# Snowpark for Python Advanced DEMO

## Imports

In [None]:
# Import login values from config
#from config import *

In [None]:
# Import packages needed
!pip install snowflake-snowpark-python
from snowflake.snowpark import Session
from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *
import pandas as pd

Collecting snowflake-snowpark-python
  Downloading snowflake_snowpark_python-1.24.0-py3-none-any.whl.metadata (105 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/105.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m105.6/105.6 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
Collecting snowflake-connector-python<4.0.0,>=3.10.0 (from snowflake-snowpark-python)
  Downloading snowflake_connector_python-3.12.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (65 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.5/65.5 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
Collecting cloudpickle!=2.1.0,!=2.2.0,<=2.2.1,>=1.6.0 (from snowflake-snowpark-python)
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python<4.0.0,>=3.10.0->snowflake-snowpark-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.

## Session

In [None]:
# Function for creating session object
from google.colab import userdata

def snowpark_session_create():
    account=userdata.get('account')
    user=userdata.get('user')
    password=userdata.get('password')
    role=userdata.get('role')


    connection_params={
      "account":account,
      "user": user,
      "password": password,
      "role": role
    }
    session = Session.builder.configs(connection_params).create()
    session.sql_simplifier_enabled = True
    return session

In [None]:
# Creation of session using the above function
demo_session = snowpark_session_create()

In [None]:
# TODO: Replace with values relevant to you
demo_session.use_database("SAMPLE_SNOWPARK")
#demo_session.use_schema("")

## Look At Data

In [None]:
# TODO: Run the below after importing the data in Snowflake UI
sales = demo_session.table("RAW_CREDITCO_SALES")
sales.show()

----------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"DATA"                                              |
----------------------------------------------------------------------------------------------------
|1               |2024-10-28 11:59:58.248000  |[                                                   |
|                |                            |  {                                                 |
|                |                            |    "creditCardNumber": "30211992825494",           |
|                |                            |    "creditCardType": "diners-club-carte-blanche",  |
|                |                            |    "dateTime": "2022-04-06 12:24:38",              |
|                |                            |    "id": "bad9cffb24fd560624859ae0c21f9135f5b0...  |
|                |                            |    "saleAmount": 302.2                     

In [None]:
sales_items = demo_session.table("RAW_CREDITCO_SALES_ITEMS")
sales_items.show()

----------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"DATA"                                              |
----------------------------------------------------------------------------------------------------
|1               |2024-10-28 12:00:00.435000  |[                                                   |
|                |                            |  {                                                 |
|                |                            |    "creditCardNumber": "30211992825494",           |
|                |                            |    "dateTime": "2022-04-06 12:24:38",              |
|                |                            |    "id": "bad9cffb24fd560624859ae0c21f9135f5b0...  |
|                |                            |    "items": [                                      |
|                |                            |      "T-shirt",                            

## UDF

In [None]:
# Run the below to see the transformed data
sales_items_flat = (
    sales_items.join_table_function("flatten", col("data"))
    .withColumn("credit_card_number", json_extract_path_text("value", lit("creditCardNumber")))
    .withColumn("date_time", json_extract_path_text("value", lit("dateTime")))
    .withColumn("id", md5(json_extract_path_text("value", lit("id"))))
    .withColumn("items", json_extract_path_text("value", lit("items")))
    .select("ingestion_id", "ingested_at", "credit_card_number", "date_time", "id", "items")
)

sales_items_flat.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"CREDIT_CARD_NUMBER"  |"DATE_TIME"          |"ID"                              |"ITEMS"                                             |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1               |2024-10-28 12:00:00.435000  |30211992825494        |2022-04-06 12:24:38  |690863fd43c06792f23fdc2700ae090e  |["T-shirt","Jeans","Sneakers"]                      |
|1               |2024-10-28 12:00:00.435000  |4405443328071522      |2023-02-16 10:21:46  |2b0de9a34332c8f27f11d07d11b4eaa2  |["Dress","Sandals"]                                 |
|1               |2024-10-28 12:00:00.435000  |3579855610552574      |2022-06-01 23:16:18  |be8

In [None]:
# Create an interal stage for our UDF
demo_session.sql("CREATE OR REPLACE STAGE DEMO_STAGE").collect()

[Row(status='Stage area DEMO_STAGE successfully created.')]

In [22]:
# Place our UDF file in the internal stage
demo_session.file.put("udf.py" ,"@DEMO_STAGE",auto_compress=False)

[PutResult(source='udf.py', target='udf.py', source_size=67, target_size=80, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [23]:
# TODO: Register the UDF from the stage
sort_alphabetically = demo_session.udf.register_from_file(
    file_path = "@DEMO_STAGE/udf.py",
    func_name="sort_alphabetically",
    return_type=ArrayType(),
    input_types=[VariantType()],
    name="sort_alphabetically",
    is_permanent=True,
    stage_location="@DEMO_STAGE",
    replace=True
)

In [24]:
# TODO: Repeat the transformation above with the UDF
sales_items_flat_wudf =(
    sales_items.join_table_function("flatten", col("data"))
    .withColumn("credit_card_number", json_extract_path_text("value", lit("creditCardNumber")))
    .withColumn("date_time", json_extract_path_text("value", lit("dateTime")))
    .withColumn("id", md5(json_extract_path_text("value", lit("id"))))
    .withColumn("items", sort_alphabetically(parse_json(json_extract_path_text("value", lit("items")))))
    .select("ingestion_id", "ingested_at", "credit_card_number", "date_time", "id", "items")
)


In [25]:
# TODO: .show()
sales_items_flat_wudf.show()

------------------------------------------------------------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"CREDIT_CARD_NUMBER"  |"DATE_TIME"          |"ID"                              |"ITEMS"               |
------------------------------------------------------------------------------------------------------------------------------------------------------
|1               |2024-10-28 12:00:00.435000  |30211992825494        |2022-04-06 12:24:38  |690863fd43c06792f23fdc2700ae090e  |[                     |
|                |                            |                      |                     |                                  |  "Jeans",            |
|                |                            |                      |                     |                                  |  "Sneakers",         |
|                |                            |                      |                     |  

In [27]:
# TODO: Save as a table called `sales_items`
sales_items_flat_wudf.write.mode("overwrite").saveAsTable("sales_items")

## Merge

In [28]:
# Read CSV
new_sales_item_data = pd.read_csv("new_sales_item_data.csv")

In [29]:
# Print the pandas DF
new_sales_item_data

Unnamed: 0,INGESTION_ID,INGESTED_AT,CREDIT_CARD_NUMBER,DATE_TIME,ID,ITEMS
0,3,2023-04-06 08:28:53.892000,3534118906250666,2022-12-07 22:57:35,4cd1974f5be885220ab629a5228c2ebf,"[""Blazer"",""Slacks"",""Loafers""]"
1,3,2023-04-06 08:28:53.892000,3575177399469647,2023-02-11 20:01:45,0aba79867bd7d7b7823264e9408d095b,"[""Button-up Shirt"",""Khakis""]"
2,3,2023-04-06 08:28:53.892000,6333020302312124279,2023-03-09 11:22:30,0484eb3108c06aed5e6a354e5203d721,"[""Vest"",""Jeans"",""Sneakers""]"
3,3,2023-04-06 08:28:53.892000,3551078881931920,2023-02-12 22:10:49,5ff7f4e02b59ad20f55f53b94a77368f,"[""Crop Top"",""Shorts""]"
4,3,2023-04-06 08:28:53.892000,3537111546909475,2023-03-16 06:46:57,4b48465f9060140312d689dfd076a71f,"[""Sweater Dress"",""Tights"",""Heels""]"
5,3,2023-04-06 08:28:53.892000,3557546242325275,2022-08-23 11:26:17,cd1675b2f0e20c56c8ea1f3e47afc577,"[""Turtleneck"",""Corduroy Pants""]"
6,4,2023-04-11 09:30:15.123000,1234567890123456,2023-04-10 18:15:30,83f15e2fbb7f1b2208ef8423f07f54e8,"[""T-Shirt"",""Jeans"",""Sneakers""]"
7,4,2023-04-11 09:30:15.123000,9876543210987654,2023-04-09 21:45:10,f439b6fc0f0bc8f2317b36afbdca19f7,"[""Hoodie"",""Leggings"",""Boots""]"
8,4,2023-04-11 09:30:15.123000,5678901234567890,2023-04-08 14:30:45,3b095264fc87e528edf91e16f2d51cd1,"[""Dress"",""Sandals""]"
9,4,2023-04-11 09:30:15.123000,9876543210123456,2023-04-07 10:20:15,83ff2aa7d37d0e7836ca5cf6f5c5a5d2,"[""Jacket"",""Pants"",""Loafers""]"


In [30]:
# Write pandas to `new_sales_item_data` as a temp table and show the new Snowpark DF

new_sales_item_data_sp = demo_session.write_pandas(
    df = new_sales_item_data,
    table_name = "new_sales_item_data",
    overwrite=True,
    auto_create_table=True,
    table_type="temp",
    quote_identifiers=False
)

new_sales_item_data_sp.show()

--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"CREDIT_CARD_NUMBER"  |"DATE_TIME"          |"ID"                              |"ITEMS"                             |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|3               |2023-04-06 08:28:53.892000  |3534118906250666      |2022-12-07 22:57:35  |4cd1974f5be885220ab629a5228c2ebf  |["Blazer","Slacks","Loafers"]       |
|3               |2023-04-06 08:28:53.892000  |3575177399469647      |2023-02-11 20:01:45  |0aba79867bd7d7b7823264e9408d095b  |["Button-up Shirt","Khakis"]        |
|3               |2023-04-06 08:28:53.892000  |6333020302312124279   |2023-03-09 11:22:30  |0484eb3108c06aed5e6a354e5203d721  |["Vest","Jeans","Sneakers"]         |
|3        

In [33]:
# TODO: Merge `sales_items` with `new_sales_item_data`
target=demo_session.table("sales_items")
merged =target.merge(
    source=new_sales_item_data_sp,
    join_expr = (target["ID"]==new_sales_item_data_sp["ID"]),
    clauses=[
        when_not_matched().insert(
            {
                "ingestion_id":new_sales_item_data_sp["ingestion_id"],
                "INGESTED_AT":new_sales_item_data_sp["ingested_at"],
                "CREDIT_CARD_NUMBER":new_sales_item_data_sp["CREDIT_CARD_NUMBER"],
                "DATE_TIME":new_sales_item_data_sp["DATE_TIME"],
                "ID":new_sales_item_data_sp["ID"],
                "ITEMS":parse_json(new_sales_item_data_sp["ITEMS"])
            }
        )
    ]
)

In [34]:
# TODO: Run the below to see how many rows were inserted, updated and deleted
merged

MergeResult(rows_inserted=10, rows_updated=0, rows_deleted=0)

## Reading from Stages

In [58]:
# TODO: Put the two json files into our `demo stage`
demo_session.file.put("new_sales.json" ,"@DEMO_STAGE",auto_compress=False)
demo_session.file.put("new_sales_items.json" ,"@DEMO_STAGE",auto_compress=False)

[PutResult(source='new_sales_items.json', target='new_sales_items.json', source_size=3648, target_size=3664, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [37]:
# TODO: Create a DataFrameReader to read `new_sales_items.json`
json = demo_session.read.options({"STRIP_OUTER_ARRAY":True}).json("@DEMO_STAGE/new_sales_items.json")
json.show()

------------------------------------------------------
|"$1"                                                |
------------------------------------------------------
|{                                                   |
|  "creditCardNumber": "3552372594286931",           |
|  "dateTime": "2022-12-15 01:17:57",                |
|  "id": "eeb62e335642e773c8c1ced13fe86c06e96d08...  |
|  "items": [                                        |
|    "Polo shirt",                                   |
|    "Shorts",                                       |
|    "Sandals"                                       |
|  ]                                                 |
|}                                                   |
|{                                                   |
|  "creditCardNumber": "5038306986883304079",        |
|  "dateTime": "2023-01-03 10:05:00",                |
|  "id": "ad187740d176791cd73aab91956c78974b720a...  |
|  "items": [                                        |
|    "Blue

## Automated ELT Snowpark

In [57]:
# Run the below to create our `transformed` table
extract_sales = (
    sales
    .join_table_function("flatten", col("data"))
    .withColumn("credit_card_number", json_extract_path_text("value", lit("creditCardNumber")))
    .withColumn("credit_card_type", json_extract_path_text("value", lit("creditCardType")))
    .withColumn("date_time", json_extract_path_text("value", lit("dateTime")))
    .withColumn("id", md5(json_extract_path_text("value", lit("id"))))
    .withColumn("sale_amount", json_extract_path_text("value", lit("saleAmount")))
    .select("credit_card_number", "credit_card_type", "date_time", "id", "sale_amount")
)

extract_sales_items = (
    sales_items
    .join_table_function("flatten", col("data"))
    .withColumn("credit_card_number", json_extract_path_text("value", lit("creditCardNumber")))
    .withColumn("date_time", json_extract_path_text("value", lit("dateTime")))
    .withColumn("id", md5(json_extract_path_text("value", lit("id"))))
    .withColumn("items", sort_alphabetically(parse_json(json_extract_path_text("value", lit("items")))))
    .select("credit_card_number", "date_time", "id", "items")
)

join_sales_and_sales_items = (
    extract_sales.join(
        right= extract_sales_items,
        on= extract_sales.id == extract_sales_items.id,
        rsuffix = "_si"
    )
    .select(
        "credit_card_number",
        "date_time", "id", "items",
        "sale_amount"
    )
)

join_sales_and_sales_items.write.mode("overwrite").save_as_table(
    table_name = "sales_data"
)

In [39]:
# TODO: Run and show
join_sales_and_sales_items.show()

------------------------------------------------------------------------------------------------------------------------
|"CREDIT_CARD_NUMBER"  |"DATE_TIME"          |"ID"                              |"ITEMS"               |"SALE_AMOUNT"  |
------------------------------------------------------------------------------------------------------------------------
|30211992825494        |2022-04-06 12:24:38  |690863fd43c06792f23fdc2700ae090e  |[                     |302.2          |
|                      |                     |                                  |  "Jeans",            |               |
|                      |                     |                                  |  "Sneakers",         |               |
|                      |                     |                                  |  "T-shirt"           |               |
|                      |                     |                                  |]                     |               |
|4405443328071522      |2023-02-

In [43]:
# TODO: Create DataFrameReaders of the two .json files in our stage
new_sales_items = demo_session.read.options({"STRIP_OUTER_ARRAY":True}).json("@DEMO_STAGE/new_sales_items.json")

new_sales_items.write.mode("overwrite").save_as_table(
    table_name = "new_sales_items_transient",
    table_type="transient"
)
new_sales = demo_session.read.options({"STRIP_OUTER_ARRAY":True}).json("@DEMO_STAGE/new_sales.json")

new_sales.write.mode("overwrite").save_as_table(
    table_name ="new_sales_transient",
    table_type="transient"
)

In [46]:
# TODO: Transform the new data as you did the old data
extract_new_sales = (
    demo_session.table("new_sales_transient")
    .withColumn("credit_card_number", json_extract_path_text("$1", lit("creditCardNumber")))
    .withColumn("credit_card_type", json_extract_path_text("$1", lit("creditCardType")))
    .withColumn("date_time", json_extract_path_text("$1", lit("dateTime")))
    .withColumn("id", md5(json_extract_path_text("$1", lit("id"))))
    .withColumn("sale_amount", json_extract_path_text("$1", lit("saleAmount")))
    .select("credit_card_number", "credit_card_type", "date_time", "id", "sale_amount")
)

extract_new_sales_items = (
    demo_session.table("new_sales_items_transient")
    .withColumn("credit_card_number", json_extract_path_text("$1", lit("creditCardNumber")))
    .withColumn("date_time", json_extract_path_text("$1", lit("dateTime")))
    .withColumn("id", md5(json_extract_path_text("$1", lit("id"))))
    .withColumn("items", sort_alphabetically(parse_json(json_extract_path_text("$1", lit("items")))))
    .select("credit_card_number", "date_time", "id", "items")
)

join_new_sales_and_sales_items = (
    extract_new_sales.join(
        right= extract_new_sales_items,
        on= extract_new_sales.id == extract_new_sales_items.id,
        rsuffix = "_si"
    )
    .select(
        "credit_card_number",
        "date_time", "id", "items",
        "sale_amount"
    )
)

join_new_sales_and_sales_items.show()

-------------------------------------------------------------------------------------------------------------------
|"CREDIT_CARD_NUMBER"  |"DATE_TIME"          |"ID"                              |"ITEMS"          |"SALE_AMOUNT"  |
-------------------------------------------------------------------------------------------------------------------
|1234567812345678      |2022-05-10 10:30:45  |720a209f737489338e85c86339bbc080  |[                |100.25         |
|                      |                     |                                  |  "Jeans",       |               |
|                      |                     |                                  |  "Sneakers",    |               |
|                      |                     |                                  |  "T-shirt"      |               |
|                      |                     |                                  |]                |               |
|6789012367890123      |2022-10-12 15:35:40  |5d14cbe23215c81c7e155b1c97

In [54]:
# TODO: Merge the new data into the old
target=demo_session.table("sales_data")
merged =target.merge(
    source=join_new_sales_and_sales_items,
    join_expr = (target["ID"]==new_sales_item_data_sp["ID"]),
    clauses=[
        when_not_matched().insert(
            {
                "CREDIT_CARD_NUMBER":join_new_sales_and_sales_items["CREDIT_CARD_NUMBER"],
                "DATE_TIME":join_new_sales_and_sales_items["DATE_TIME"],
                "ID":join_new_sales_and_sales_items["ID"],
                "ITEMS":join_new_sales_and_sales_items["ITEMS"],
                "SALE_AMOUNT":join_new_sales_and_sales_items["SALE_AMOUNT"]
            }
        )
    ]
)
merged

MergeResult(rows_inserted=0, rows_updated=0, rows_deleted=0)

In [55]:
# TODO: Drop the transient tables
demo_session.table("new_sales_items_transient").drop_table()
demo_session.table("new_sales_transient").drop_table()

In [56]:
# TODO: Remove the files from the stage
demo_session.sql("REMOVE @DEMO_STAGE/new_sales_items.json").collect()
demo_session.sql("REMOVE @DEMO_STAGE/new_sales.json").collect()

[Row(name='demo_stage/new_sales.json', result='removed')]