Semi-structured data: flattening JSON with Snowpark

In [8]:

# Install the Snowflake connector and the Snowpark client library.
# %pip (rather than !pip) installs into the environment of the running kernel.
# NOTE(review): no versions are pinned here — consider pinning for reproducibility.
%pip install snowflake-connector-python
%pip install snowflake-snowpark-python

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.
Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [31]:
import os
import configparser
from snowflake.snowpark import Session
import snowflake.snowpark.functions as f
from snowflake.snowpark.window import Window
from datetime import date
from snowflake.snowpark.types import ArrayType, VariantType 

In [10]:
# Load connection settings from config.ini; the session factory below reads
# its values from the [snowflake] section of this parser.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns only the
# names it actually parsed. Fail fast here instead of hitting a confusing
# NoSectionError later, when the session is created.
loaded_config_files = config.read('config.ini')
if not loaded_config_files:
    raise FileNotFoundError(
        "config.ini was not found (or could not be read) in the working directory"
    )
# Last expression of the cell: displays the list of files that were loaded.
loaded_config_files


['config.ini']

In [11]:

def snowpark_session_create():
    """Create a Snowpark session from the ``[snowflake]`` section of ``config``.

    Reads the account, user, password, role, warehouse, database and schema
    options from the module-level ``config`` parser and hands them to
    ``Session.builder``.

    Returns:
        The object returned by ``Session.builder.configs(...).create()``.
    """
    option_names = (
        "account",
        "user",
        "password",
        "role",
        "warehouse",
        "database",
        "schema",
    )
    # Each connection parameter uses the same key as its option name in the ini file.
    settings = {name: config.get("snowflake", name) for name in option_names}
    return Session.builder.configs(settings).create()


In [12]:
# Open the Snowpark session that every later cell in this notebook reuses.
demo_session = snowpark_session_create()

In [17]:
# Select every column of DEMO.RAW.RAW_CREDIT_SALES and print a preview.
# As the output below shows, the DATA column holds a JSON array of sale objects.
df = demo_session.sql("SELECT * FROM DEMO.RAW.RAW_CREDIT_SALES ")
df.show()


----------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"DATA"                                              |
----------------------------------------------------------------------------------------------------
|1               |2024-11-20 00:52:36.762000  |[                                                   |
|                |                            |  {                                                 |
|                |                            |    "creditCardNumber": "30211992825494",           |
|                |                            |    "creditCardType": "diners-club-carte-blanche",  |
|                |                            |    "dateTime": "2022-04-06 12:24:38",              |
|                |                            |    "id": "bad9cffb24fd560624859ae0c21f9135f5b0...  |
|                |                            |    "saleAmount": 302.2                     

In [20]:
# Set the session's current database and schema so later cells can refer to
# tables by their bare names.
demo_session.use_database("demo")
demo_session.use_schema("raw")
# Session.table() only builds a handle to a table. The handle for
# raw_credit_sales was created and immediately discarded, so that call has
# been dropped. The remaining expression is the cell's displayed value.
demo_session.table("raw_credit_sales_items")

<snowflake.snowpark.table.Table at 0x718c2ea92d10>

In [21]:
# Handle to the raw_credit_sales_items table, followed by a printed preview.
# As the output below shows, DATA holds a JSON array whose objects carry a
# nested "items" list.
sales_items = demo_session.table("raw_credit_sales_items")
sales_items.show()

----------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"DATA"                                              |
----------------------------------------------------------------------------------------------------
|1               |2024-11-20 01:06:59.493000  |[                                                   |
|                |                            |  {                                                 |
|                |                            |    "creditCardNumber": "30211992825494",           |
|                |                            |    "dateTime": "2022-04-06 12:24:38",              |
|                |                            |    "id": "bad9cffb24fd560624859ae0c21f9135f5b0...  |
|                |                            |    "items": [                                      |
|                |                            |      "T-shirt",                            

Transform the data into a structured format

In [23]:
# Flatten the JSON array in DATA into one row per array element, then pull
# each field of that element out of the flattened VALUE column as text.
# f.lit() wraps a Python value as a literal column expression (here: the JSON path).
sales_items_strcd = sales_items.join_table_function(
    "flatten", f.col("data")
).select(
    "ingestion_id",
    "ingested_at",
    f.json_extract_path_text("value", f.lit("creditCardNumber")).alias("credit_card_number"),
    f.json_extract_path_text("value", f.lit("dateTime")).alias("date_time"),
    f.json_extract_path_text("value", f.lit("id")).alias("id"),
    f.json_extract_path_text("value", f.lit("items")).alias("items"),
)

sales_items_strcd.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"CREDIT_CARD_NUMBER"  |"DATE_TIME"          |"ID"                                                |"ITEMS"                                             |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1               |2024-11-20 01:06:59.493000  |30211992825494        |2022-04-06 12:24:38  |bad9cffb24fd560624859ae0c21f9135f5b005673dbd1d5...  |["T-shirt","Jeans","Sneakers"]                      |
|1               |2024-11-20 01:06:59.493000  |4405443328071522      |2023-02-16 10:21:46  |f70a7d8263289ad52bc948916d45325215396c6d8910405...  |["Dress","Sandals"]                                 |
|1   

Create a stage for the UDF

In [26]:
# Create (or replace, if it already exists) the stage named demo_stage.
# collect() returns the status row shown in the output below.
demo_session.sql("create or replace stage demo_stage").collect()

[Row(status='Stage area DEMO_STAGE successfully created.')]

In [27]:
# Upload the local udf.py to @demo_stage without compressing it.
# NOTE(review): presumably left uncompressed so it can be referenced later as
# plain "@demo_stage/udf.py" — confirm.
demo_session.file.put("udf.py","@demo_stage",auto_compress=False)

[PutResult(source='udf.py', target='udf.py', source_size=56, target_size=64, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [33]:
# Register the UDF from the file that was uploaded to the stage.
# - func_name is the Python function inside udf.py; name is the UDF's name in Snowflake.
# - The UDF is declared as taking one VARIANT argument and returning an ARRAY.
# - replace=True overwrites an existing UDF with the same name.
# - is_permanent=True together with stage_location registers it as a permanent UDF
#   (NOTE(review): confirm that a permanent UDF, rather than a session-scoped one, is intended).
sort_items = demo_session.udf.register_from_file(
    file_path="@demo_stage/udf.py",
    func_name="sortitems",
    return_type=ArrayType(),
    input_types=[VariantType()],
    name="sort_items",
    replace=True,
    stage_location="@demo_stage",
    is_permanent=True
)

In [34]:
# Same structured view as sales_items_strcd, but with ITEMS passed through the
# sort_items UDF.
# Built on top of sales_items_strcd instead of repeating the whole
# flatten/extract pipeline, so the two results cannot drift apart.
# ITEMS in sales_items_strcd is JSON text, so it is parsed back into a VARIANT
# before being handed to the UDF (which is declared with a VARIANT input).
sales_items_strcd_wudf = (
    sales_items_strcd
    .withColumn("items", sort_items(f.parse_json(f.col("items"))))
    .select("ingestion_id", "ingested_at", "credit_card_number", "date_time", "id", "items")
)

In [35]:
# Preview the result; ITEMS is now rendered as an array rather than a JSON string.
sales_items_strcd_wudf.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"CREDIT_CARD_NUMBER"  |"DATE_TIME"          |"ID"                                                |"ITEMS"               |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1               |2024-11-20 01:06:59.493000  |30211992825494        |2022-04-06 12:24:38  |bad9cffb24fd560624859ae0c21f9135f5b005673dbd1d5...  |[                     |
|                |                            |                      |                     |                                                    |  "Jeans",            |
|                |                            |                      |                     |                                                    |  "Sneaker

We can now save this table :)

In [36]:
# Persist the structured result as the table "sales_items"; mode("overwrite")
# replaces the table's contents if it already exists.
# NOTE(review): the name is unqualified, so this presumably lands in the session's
# current schema (set to "raw" earlier via use_schema) — confirm that is the intended destination.
sales_items_strcd_wudf.write.mode("overwrite").save_as_table("sales_items")