In [1]:
import code_library as cl
import json
from schema import src_stg_schema

# Loading semi-structured (Avro) files from S3 into Snowflake

In [2]:
# Load the copy configuration for the Avro load: database/schema, target and reject
# tables, target column list, ON_ERROR behaviour, and the stage location of the files.
# The `with` block closes the file handle as soon as the JSON has been parsed; UTF-8 is
# the encoding JSON requires, so it is stated explicitly instead of relying on the
# platform default.
with open('config_files/copy_to_snowstg_avro.json', encoding='utf-8') as config_file:
    config_snow_copy = json.load(config_file)

config_snow_copy

{'Database_name': 'DEMO_DB',
 'Schema_name': 'PUBLIC',
 'Target_table': 'INT_EMP_DETAILS_AVRO',
 'Reject_table': 'EMPLOYEE_AVRO_REJECTS',
 'target_columns': ['REGISTRATION_DTTM',
  'ID',
  'FIRST_NAME',
  'LAST_NAME',
  'EMAIL',
  'GENDER',
  'IP_ADDRESS',
  'CC',
  'COUNTRY',
  'BIRTHDATE',
  'SALARY',
  'TITLE',
  'COMMENTS'],
 'on_error': 'CONTINUE',
 'Source_location': '@DEMO_DB.EXTERNAL_STAGES.S3_STG2/sp_avro/',
 'Source_file_type': 'avro'}

In [3]:
# Open the Snowflake (Snowpark) session used by every cell below, built from the
# connection parameters kept in code_library.
session = cl.snowconnection(cl.connection_parameters)

In [4]:
# # Commenting the entire block of code as the stage is allready created, if you are running for the first time recreate the stage
# #creating a new stage with a file format for avro file
# result = session.sql(
#     f"""
#         CREATE OR REPLACE stage demo_db.external_stages.s3_stg2
#             URL = {cl.secrets['s3_bucket_for_stg']}
#             STORAGE_INTEGRATION = s3_int
#             file_format = (type = 'AVRO')
#     """
#     )

# result.show()
# """
# --------------------------------------------
# |"status"                                  |
# --------------------------------------------
# |Stage area S3_STG2 successfully created.  |
# --------------------------------------------
# """

In [5]:
# Describe the external stage to confirm its configuration
# (e.g. STAGE_FILE_FORMAT TYPE should be AVRO).
# The SQL has no placeholders, so a plain string is used rather than an f-string.
result = session.sql(
    """
        desc stage DEMO_DB.EXTERNAL_STAGES.S3_STG2;
    """
    )

# DESC STAGE returns one row per property; show up to 1000 rows instead of the
# default 10 so no property is left out.
result.show(1000)

-----------------------------------------------------------------------------------------------------------------------------------------------
|"parent_property"   |"property"                  |"property_type"  |"property_value"                                    |"property_default"  |
-----------------------------------------------------------------------------------------------------------------------------------------------
|STAGE_FILE_FORMAT   |TYPE                        |String           |AVRO                                                |CSV                 |
|STAGE_FILE_FORMAT   |TRIM_SPACE                  |Boolean          |false                                               |false               |
|STAGE_FILE_FORMAT   |NULL_IF                     |List             |[]                                                  |[\\N]               |
|STAGE_FILE_FORMAT   |COMPRESSION                 |String           |AUTO                                                |AUTO          

In [6]:
# List the files in the stage to confirm Snowflake can access the S3 location behind it.
# The SQL has no placeholders, so a plain string is used rather than an f-string.
result = session.sql(
    """
        list @DEMO_DB.EXTERNAL_STAGES.S3_STG2;
    """
    )

result.show()

-------------------------------------------------------------------------------------------------------------------------------
|"name"                                           |"size"  |"md5"                             |"last_modified"                |
-------------------------------------------------------------------------------------------------------------------------------
|s3://inzs3bucketlearn/sp_avro/userdata1.avro     |93561   |e60871d1105f230b90d5ae2f8d9481ce  |Wed, 21 Feb 2024 11:05:57 GMT  |
|s3://inzs3bucketlearn/sp_csv/employee01.csv      |342     |8e77d4649e70c82d47e0d4d8b76a13ee  |Wed, 21 Feb 2024 11:08:40 GMT  |
|s3://inzs3bucketlearn/sp_csv/employee02.csv      |391     |2ccf600775f9db9c78b49964cb5ff821  |Wed, 21 Feb 2024 11:08:41 GMT  |
|s3://inzs3bucketlearn/sp_csv/employee03.csv      |408     |a3f1099a25248926a1d9fa32947ca289  |Wed, 21 Feb 2024 11:08:41 GMT  |
|s3://inzs3bucketlearn/sp_csv/employee04.csv      |387     |9906f9f4449edb68f7570737d6999b3b  |Wed, 21 F

In [7]:
# dir(session.read)

In [8]:
# help(session.read)

In [9]:
#   # Example 6:
#   #      Loading an AVRO file and infer the schema:
#   #          >>> from snowflake.snowpark.functions import col
#   #          >>> _ = session.file.put("tests/resources/test.avro", "@mystage", auto_compress=False)
#   #          >>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
#   #          >>> df = session.read.avro("@mystage/test.avro").where(col('"num"') == 2)
#   #          >>> # Load the data into the DataFrame and return an array of rows containing the results.
#   #          >>> df.collect()
#   #          [Row(str='str2', num=2)]

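# # Adapted from the docstring example above: the file in this stage is `userdata1.avro`
# # (see the LIST output), and it has no "num" column, so filter on a real column such as "id".
# from snowflake.snowpark.functions import col
# df = session.read.avro("@DEMO_DB.EXTERNAL_STAGES.S3_STG2/sp_avro/userdata1.avro").where(col('"id"') == 2)
# df.collect()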

In [10]:
# Read every Avro file under the configured source location into a Snowpark DataFrame,
# letting Snowpark infer the schema from the Avro files.
# The path comes from config_snow_copy['Source_location'] so the stage path lives in one
# place (the JSON config) instead of being duplicated as a literal here.
df = session.read.avro(config_snow_copy['Source_location'])

In [11]:
# Column names inferred from the Avro files; Snowpark returns them double-quoted and lower-case.
df.columns

['"registration_dttm"',
 '"id"',
 '"first_name"',
 '"last_name"',
 '"email"',
 '"gender"',
 '"ip_address"',
 '"cc"',
 '"country"',
 '"birthdate"',
 '"salary"',
 '"title"',
 '"comments"']

In [12]:
# Schema Snowpark inferred from the Avro files (in the recorded run: id and cc are LongType,
# salary is DoubleType, all other columns are StringType).
df.schema

StructType([StructField('"registration_dttm"', StringType(16777216), nullable=False), StructField('"id"', LongType(), nullable=False), StructField('"first_name"', StringType(16777216), nullable=False), StructField('"last_name"', StringType(16777216), nullable=False), StructField('"email"', StringType(16777216), nullable=False), StructField('"gender"', StringType(16777216), nullable=False), StructField('"ip_address"', StringType(16777216), nullable=False), StructField('"cc"', LongType(), nullable=True), StructField('"country"', StringType(16777216), nullable=False), StructField('"birthdate"', StringType(16777216), nullable=False), StructField('"salary"', DoubleType(), nullable=True), StructField('"title"', StringType(16777216), nullable=False), StructField('"comments"', StringType(16777216), nullable=False)])

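# Copy the schema above into the `schema` package (module `src_stg_schema`), removing the extra double quotes around the column names.
## We use these column names because the target table was created manually in Snowflake with the same names.
## DDL for the target table. The name is unquoted, so Snowflake stores it as INT_EMP_DETAILS_AVRO (the `Target_table` in the config, which expects it in DEMO_DB.PUBLIC). Note that every column is declared STRING, although Snowpark inferred `id`/`cc` as LongType and `salary` as DoubleType.
```sql
// create a table to store the employee details
CREATE TABLE int_emp_details_avro
(
    REGISTRATION_DTTM STRING,
    ID STRING,
    FIRST_NAME STRING,
    LAST_NAME STRING,
    EMAIL STRING,
    GENDER STRING,
    IP_ADDRESS STRING,
    CC STRING,
    COUNTRY STRING,
    BIRTHDATE STRING,
    SALARY STRING,
    TITLE STRING,
    COMMENTS STRING
);
```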


In [13]:
# Copy the staged Avro data into the target table named in config_snow_copy, passing the
# column schema defined in schema/src_stg_schema. qid is presumably the Snowflake query id
# of the load - confirm against code_library.
# NOTE(review): the recorded run of this cell fails. The helper prints "maped_columns : None"
# and then raises AttributeError: 'NoneType' object has no attribute 'keys'. Presumably the
# helper derives a column mapping from the config and/or src_stg_schema.int_emp_details_avro
# and got None. TODO: check what cl.copy_to_table_semi_struct_data expects before relying
# on this cell.
copied_into_result, qid = cl.copy_to_table_semi_struct_data(session, config_snow_copy, src_stg_schema.int_emp_details_avro)

maped_columns : None


AttributeError: 'NoneType' object has no attribute 'keys'