<a href="https://colab.research.google.com/github/manoharpavuluri/DE_SnowflakeSnowpark/blob/main/Snowflake_Snowpark_1sttry.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Installs**

In [1]:
!pip install config
!pip install snowflake
!pip install pandas
!pip install toml
!pip install snowflake-snowpark-python



Collecting config
  Downloading config-0.5.1-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading config-0.5.1-py2.py3-none-any.whl (20 kB)
Installing collected packages: config
Successfully installed config-0.5.1
Collecting snowflake
  Downloading snowflake-0.13.0-py3-none-any.whl.metadata (2.0 kB)
Collecting snowflake-core==0.13.0 (from snowflake)
  Downloading snowflake_core-0.13.0-py3-none-any.whl.metadata (1.8 kB)
Collecting snowflake-legacy (from snowflake)
  Downloading snowflake_legacy-0.13.0-py3-none-any.whl.metadata (2.5 kB)
Collecting snowflake-connector-python (from snowflake-core==0.13.0->snowflake)
  Downloading snowflake_connector_python-3.12.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (65 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.3/65.3 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python->snowflake-core==0.13.0->snowflake)
  Downloading asn1crypto-1.5.1-py

# **Imports**

In [2]:
import configparser
import os
import subprocess

In [3]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *
import pandas as pd
import toml
import datetime


# **Adding Config.ini to .gitignore**

In [4]:
# to create or update .gitignore
def update_gitignore(file_name):
  with open('.gitignore', 'a') as gitignore_file:
    gitignore_file.write(f"{file_name}\n")

# to untrake a file
def untrack_file(file_name):
  subprocess.run(['git', 'rm', '--cached', file_name])


In [5]:
# Update .gitignore
update_gitignore('/content/drive/MyDrive/Colab Notebooks/Snowflake_Snowpark_1sttry_config.toml')

# Untrack the config.ini file if it exists
if os.path.exists('/content/drive/MyDrive/Colab Notebooks/Snowflake_Snowpark_1sttry_config.toml'):
    untrack_file('/content/drive/MyDrive/Colab Notebooks/Snowflake_Snowpark_1sttry_config.ini')

# **Snowflake Session**

In [6]:
# Read the configuration file using toml library
config = toml.load('/content/drive/MyDrive/Colab Notebooks/Snowflake_Snowpark_1sttry_config.toml')

# Check if any sections were loaded
if config.items():
    print("Sections found in Snowflake_Snowpark_1sttry_config.ini:", config.keys())
else:
    print("No sections found in Snowflake_Snowpark_1sttry_config.ini.")

Sections found in Snowflake_Snowpark_1sttry_config.ini: dict_keys(['snowflake'])


In [7]:

# Prepare the connection parameters
connection_parameters = {
    "account": config['snowflake']['account'],
    "user": config['snowflake']['user'],
    "password": config['snowflake']['password'],
    "role": config['snowflake']['role'],
    "warehouse": config['snowflake']['warehouse'],
    "database": config['snowflake']['database'],
    "schema": config['snowflake']['schema']
}

# Attempt to create the Snowpark session
try:
    session = Session.builder.configs(connection_parameters).create()
    print("Successfully connected to Snowflake.")
except Exception as e:
    print("Error connecting to Snowflake:", e)

Successfully connected to Snowflake.


In [8]:
# Commit changes to .gitignore
subprocess.run(['git', 'add', '.gitignore'])
subprocess.run(['git', 'commit', '-m', 'Update .gitignore to exclude config.ini'])

# Optional: Push to GitHub (Make sure you have set up remote and authenticated)
# subprocess.run(['git', 'push', 'origin', 'main'])  # Uncomment this if you want to push

print("Snowpark session created and .gitignore updated.")

Snowpark session created and .gitignore updated.


# **Data**

In [9]:
# form the query
showtables_qry = """
SELECT TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
AND TABLE_CATALOG = 'DE_PROJECT'
"""

# run the query
showtables_qry_result = session.sql(showtables_qry).collect()

# show tables

for row in showtables_qry_result:
  print(row[0])


RAW_CREDITCO_SALES
RAW_CREDITCO_SALES_ITEMS
SALES_ITEMS


In [10]:
sales = session.table("RAW_CREDITCO_SALES")
sales.show()

----------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"DATA"                                              |
----------------------------------------------------------------------------------------------------
|1               |2024-10-03 22:42:16.929000  |[                                                   |
|                |                            |  {                                                 |
|                |                            |    "creditCardNumber": "30211992825494",           |
|                |                            |    "creditCardType": "diners-club-carte-blanche",  |
|                |                            |    "dateTime": "2022-04-06 12:24:38",              |
|                |                            |    "id": "bad9cffb24fd560624859ae0c21f9135f5b0...  |
|                |                            |    "saleAmount": 302.2                     

In [11]:
sales_items = session.table("RAW_CREDITCO_SALES_ITEMS")
sales_items.show()

----------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"DATA"                                              |
----------------------------------------------------------------------------------------------------
|1               |2024-10-03 22:42:33.125000  |[                                                   |
|                |                            |  {                                                 |
|                |                            |    "creditCardNumber": "30211992825494",           |
|                |                            |    "dateTime": "2022-04-06 12:24:38",              |
|                |                            |    "id": "bad9cffb24fd560624859ae0c21f9135f5b0...  |
|                |                            |    "items": [                                      |
|                |                            |      "T-shirt",                            

In [12]:
sales_items_flat = (
    sales_items.join_table_function("flatten", col("data"))
    .withColumn("creditCardNumber", json_extract_path_text("value",lit("creditCardNumber")))
    .withColumn("dateTime", json_extract_path_text("value",lit("dateTime")))
    .withColumn("id", md5(json_extract_path_text("value",lit("id"))))
    .withColumn("items", (json_extract_path_text("value",lit("items"))))
    .select("ingestion_id","ingested_at","creditCardNumber","dateTime","id","items")
)

sales_items_flat.show()

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"CREDITCARDNUMBER"   |"DATETIME"           |"ID"                              |"ITEMS"                                             |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1               |2024-10-03 22:42:33.125000  |30211992825494       |2022-04-06 12:24:38  |690863fd43c06792f23fdc2700ae090e  |["T-shirt","Jeans","Sneakers"]                      |
|1               |2024-10-03 22:42:33.125000  |4405443328071522     |2023-02-16 10:21:46  |2b0de9a34332c8f27f11d07d11b4eaa2  |["Dress","Sandals"]                                 |
|1               |2024-10-03 22:42:33.125000  |3579855610552574     |2022-06-01 23:16:18  |be83739b5

In [13]:
# get system time as its on google colab so to understand why there's a difference in the ingested time.
print(datetime.datetime.now())

2024-10-07 01:57:56.437943


# **UDF**

In [14]:
# create a stage
session.sql("create or replace stage sf_sp_stage").collect()

[Row(status='Stage area SF_SP_STAGE successfully created.')]

In [15]:
# put udf file in the newly created stage
session.file.put("/content/drive/MyDrive/Colab Notebooks/Snowflake_Snowpark_1sttry_udf.py", "@SF_SP_STAGE", auto_compress=False)

[PutResult(source='Snowflake_Snowpark_1sttry_udf.py', target='Snowflake_Snowpark_1sttry_udf.py', source_size=67, target_size=80, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [16]:
# register the udf

sort_alphabetically = session.udf.register_from_file(
    file_path="@SF_SP_STAGE/Snowflake_Snowpark_1sttry_udf.py",
    func_name="sort_alphabetically",
    return_type=ArrayType(),
    input_types=[VariantType()],
    name = "sort_alphabetically",
    replace=True,
    stage_location="@SF_SP_STAGE",
    is_permanent=True
)

In [17]:
# applying UDF on the existing data

sales_items_flat_with_udf = (
    sales_items.join_table_function("flatten", col("data"))
    .withColumn("creditCardNumber", json_extract_path_text("value",lit("creditCardNumber")))
    .withColumn("dateTime", json_extract_path_text("value",lit("dateTime")))
    .withColumn("id", md5(json_extract_path_text("value",lit("id"))))
    .withColumn("items", sort_alphabetically(parse_json(json_extract_path_text("value",lit("items")))))
    .select("ingestion_id","ingested_at","creditCardNumber","dateTime","id","items")
)

sales_items_flat_with_udf.show()

-----------------------------------------------------------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"CREDITCARDNUMBER"   |"DATETIME"           |"ID"                              |"ITEMS"               |
-----------------------------------------------------------------------------------------------------------------------------------------------------
|1               |2024-10-03 22:42:33.125000  |30211992825494       |2022-04-06 12:24:38  |690863fd43c06792f23fdc2700ae090e  |[                     |
|                |                            |                     |                     |                                  |  "Jeans",            |
|                |                            |                     |                     |                                  |  "Sneakers",         |
|                |                            |                     |                     |         

In [18]:
# save the UDF transformed data into a table

sales_items_flat_with_udf.write.mode("overwrite").saveAsTable("sales_items")

In [19]:
# create a stage
session.sql("select * from sales_items").collect()

[Row(INGESTION_ID=1, INGESTED_AT=datetime.datetime(2024, 10, 3, 22, 42, 33, 125000), CREDITCARDNUMBER='30211992825494', DATETIME='2022-04-06 12:24:38', ID='690863fd43c06792f23fdc2700ae090e', ITEMS='[\n  "Jeans",\n  "Sneakers",\n  "T-shirt"\n]'),
 Row(INGESTION_ID=1, INGESTED_AT=datetime.datetime(2024, 10, 3, 22, 42, 33, 125000), CREDITCARDNUMBER='4405443328071522', DATETIME='2023-02-16 10:21:46', ID='2b0de9a34332c8f27f11d07d11b4eaa2', ITEMS='[\n  "Dress",\n  "Sandals"\n]'),
 Row(INGESTION_ID=1, INGESTED_AT=datetime.datetime(2024, 10, 3, 22, 42, 33, 125000), CREDITCARDNUMBER='3579855610552574', DATETIME='2022-06-01 23:16:18', ID='be83739b50b41880570c132f092d57ae', ITEMS='[\n  "Flip flops",\n  "Shirt",\n  "Shorts"\n]'),
 Row(INGESTION_ID=1, INGESTED_AT=datetime.datetime(2024, 10, 3, 22, 42, 33, 125000), CREDITCARDNUMBER='50383603573914002', DATETIME='2022-06-29 01:56:53', ID='7e25f863e835c2b79819a6a4f8e5ff98', ITEMS='[\n  "Blouse",\n  "Heels",\n  "Skirt"\n]'),
 Row(INGESTION_ID=1, INGEST

# **Merge**

In [20]:
# read local csv
new_sales_items_data = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/Snowflake_Snowpark_1sttry/new_sales_item_data.csv")

In [21]:
# lets look into the data

new_sales_items_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16 entries, 0 to 15
Data columns (total 6 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   INGESTION_ID        16 non-null     int64 
 1   INGESTED_AT         16 non-null     object
 2   CREDIT_CARD_NUMBER  16 non-null     int64 
 3   DATE_TIME           16 non-null     object
 4   ID                  16 non-null     object
 5   ITEMS               16 non-null     object
dtypes: int64(2), object(4)
memory usage: 896.0+ bytes


In [22]:
# lets look into the data
new_sales_items_data.head()

Unnamed: 0,INGESTION_ID,INGESTED_AT,CREDIT_CARD_NUMBER,DATE_TIME,ID,ITEMS
0,3,2023-04-06 08:28:53.892000,3534118906250666,2022-12-07 22:57:35,4cd1974f5be885220ab629a5228c2ebf,"[""Blazer"",""Slacks"",""Loafers""]"
1,3,2023-04-06 08:28:53.892000,3575177399469647,2023-02-11 20:01:45,0aba79867bd7d7b7823264e9408d095b,"[""Button-up Shirt"",""Khakis""]"
2,3,2023-04-06 08:28:53.892000,6333020302312124279,2023-03-09 11:22:30,0484eb3108c06aed5e6a354e5203d721,"[""Vest"",""Jeans"",""Sneakers""]"
3,3,2023-04-06 08:28:53.892000,3551078881931920,2023-02-12 22:10:49,5ff7f4e02b59ad20f55f53b94a77368f,"[""Crop Top"",""Shorts""]"
4,3,2023-04-06 08:28:53.892000,3537111546909475,2023-03-16 06:46:57,4b48465f9060140312d689dfd076a71f,"[""Sweater Dress"",""Tights"",""Heels""]"


In [23]:
# now lets write the new_sales_items_data data into temp table and show the new snowpark df

new_sales_items_data_snowpark_temp = session.write_pandas(
    df = new_sales_items_data,
    table_name= "new_sales_items_data",
    overwrite=True,
    auto_create_table=True,
    table_type="temp",
    quote_identifiers=False
)

new_sales_items_data_snowpark_temp.show()

--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"INGESTION_ID"  |"INGESTED_AT"               |"CREDIT_CARD_NUMBER"  |"DATE_TIME"          |"ID"                              |"ITEMS"                             |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|3               |2023-04-06 08:28:53.892000  |3534118906250666      |2022-12-07 22:57:35  |4cd1974f5be885220ab629a5228c2ebf  |["Blazer","Slacks","Loafers"]       |
|3               |2023-04-06 08:28:53.892000  |3575177399469647      |2023-02-11 20:01:45  |0aba79867bd7d7b7823264e9408d095b  |["Button-up Shirt","Khakis"]        |
|3               |2023-04-06 08:28:53.892000  |6333020302312124279   |2023-03-09 11:22:30  |0484eb3108c06aed5e6a354e5203d721  |["Vest","Jeans","Sneakers"]         |
|3        

In [24]:

# let do the actual merge

target = session.table("sales_items")

merged = target.merge(
    source=new_sales_items_data_snowpark_temp,
    join_expr=(target["ID"] == new_sales_items_data_snowpark_temp["ID"]),
    clauses=[
        when_not_matched().insert(
            {
                "INGESTION_ID" : new_sales_items_data_snowpark_temp["INGESTION_ID"],
                "INGESTED_AT" : new_sales_items_data_snowpark_temp["INGESTED_AT"],
                "creditCardNumber" : new_sales_items_data_snowpark_temp["CREDIT_CARD_NUMBER"],
                "DATETIME" : new_sales_items_data_snowpark_temp["DATE_TIME"],
                "ID" : new_sales_items_data_snowpark_temp["ID"],
                "ITEMS" : parse_json(new_sales_items_data_snowpark_temp["ITEMS"])
            }
        )
    ]

)

merged

MergeResult(rows_inserted=10, rows_updated=0, rows_deleted=0)

# **Read**

In [25]:
# uploading the local file into stage
session.file.put("/content/drive/MyDrive/Colab Notebooks/Snowflake_Snowpark_1sttry/new_sales.json","@SF_SP_STAGE",auto_compress = False)
session.file.put("/content/drive/MyDrive/Colab Notebooks/Snowflake_Snowpark_1sttry/new_sales_items.json","@SF_SP_STAGE",auto_compress = False)


[PutResult(source='new_sales_items.json', target='new_sales_items.json', source_size=3648, target_size=3664, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [26]:
# displaying the content of the file.

json = session.read.options({"STRIP_OUTER_ARRAY":True}).json("@SF_SP_STAGE/new_sales_items.json")
json.show()

------------------------------------------------------
|"$1"                                                |
------------------------------------------------------
|{                                                   |
|  "creditCardNumber": "3552372594286931",           |
|  "dateTime": "2022-12-15 01:17:57",                |
|  "id": "eeb62e335642e773c8c1ced13fe86c06e96d08...  |
|  "items": [                                        |
|    "Polo shirt",                                   |
|    "Shorts",                                       |
|    "Sandals"                                       |
|  ]                                                 |
|}                                                   |
|{                                                   |
|  "creditCardNumber": "5038306986883304079",        |
|  "dateTime": "2023-01-03 10:05:00",                |
|  "id": "ad187740d176791cd73aab91956c78974b720a...  |
|  "items": [                                        |
|    "Blue

# **Applying Automation**

In [38]:
extract_sales = (
    sales
    .join_table_function("flatten", col("data"))
    .withColumn("credit_card_number", json_extract_path_text("value",lit("creditCardNumber")))
    .withColumn("credit_card_type", json_extract_path_text("value",lit("creditCardType")))
    .withColumn("date_time", json_extract_path_text("value",lit("dateTime")))
    .withColumn("id", md5(json_extract_path_text("value",lit("id"))))
    .withColumn("sale_amount", (json_extract_path_text("value",lit("saleAmount"))))
    .select("credit_card_number","credit_card_type", "date_time","id","sale_amount")
)

In [39]:
extract_sales_items = (
    sales_items
    .join_table_function("flatten", col("data"))
    .withColumn("credit_card_number", json_extract_path_text("value",lit("creditCardNumber")))
    .withColumn("date_time", json_extract_path_text("value",lit("dateTime")))
    .withColumn("id", md5(json_extract_path_text("value",lit("id"))))
    .withColumn("items", sort_alphabetically(parse_json(json_extract_path_text("value",lit("items")))))
    .select("credit_card_number", "date_time","id","items")
)

In [40]:
join_sales_and_sales_items = (
    extract_sales.join(
        right = extract_sales_items,
        on = extract_sales.id == extract_sales_items.id,
        rsuffix = "_si"
    )
    .select(
        "credit_card_number",
        "date_time","id", "items",
        "sale_amount"
    )
)



In [41]:
join_sales_and_sales_items.write.mode("overwrite").save_as_table(
    table_name = "sales_data"
)

In [42]:
join_sales_and_sales_items.show()

------------------------------------------------------------------------------------------------------------------------
|"CREDIT_CARD_NUMBER"  |"DATE_TIME"          |"ID"                              |"ITEMS"               |"SALE_AMOUNT"  |
------------------------------------------------------------------------------------------------------------------------
|30211992825494        |2022-04-06 12:24:38  |690863fd43c06792f23fdc2700ae090e  |[                     |302.2          |
|                      |                     |                                  |  "Jeans",            |               |
|                      |                     |                                  |  "Sneakers",         |               |
|                      |                     |                                  |  "T-shirt"           |               |
|                      |                     |                                  |]                     |               |
|4405443328071522      |2023-02-