# Data Engineering with Snowpark - Python

This notebook uses Snowflake Snowpark for Python to:
- Load raw Parquet data files from S3 into Snowflake
- Create a harmonized work area that merges change data (CDC) using views
- Create a denormalized table for downstream analytics


#### A more comprehensive version of this notebook can be found [here](https://quickstarts.snowflake.com/guide/data_engineering_pipelines_with_snowpark_python/index.html?index=..%2F..index#0) as a Snowflake Quickstart


---

## Step 1 - Import Necessary Libraries

If any of these packages are missing from your Python environment, you can install them with conda or pip. For example, to install the seaborn visualization package from inside the notebook (the `{sys.prefix}` expansion requires `import sys` first):

`!conda install --yes --prefix {sys.prefix} seaborn`
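To see which packages are missing before installing anything, one option is to probe each top-level module with `importlib.util.find_spec`, which returns `None` when a module cannot be found. This is a minimal sketch: the `REQUIRED_MODULES` list is an assumption that simply mirrors the imports in the next cell, not an official requirements list, and `missing_modules` is a hypothetical helper.

```python
import importlib.util

# Modules imported in this notebook (assumed list; adjust to your environment)
REQUIRED_MODULES = ["snowflake.snowpark", "pandas", "numpy", "tqdm", "plotly"]

def missing_modules(modules):
    """Return the names whose import spec cannot be found."""
    missing = []
    for name in modules:
        try:
            spec = importlib.util.find_spec(name)
        except ModuleNotFoundError:
            # Raised for dotted names when the parent package itself is absent
            spec = None
        if spec is None:
            missing.append(name)
    return missing

print(missing_modules(REQUIRED_MODULES))
```

Anything reported can then be installed with conda or pip; note that the pip distribution providing `snowflake.snowpark` is named `snowflake-snowpark-python`.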

In [1]:
# Snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
# Snowpark data types (StructType, StringType, etc.)
from snowflake.snowpark.types import *
from snowflake.snowpark.version import VERSION

# pandas, NumPy & JSON
import pandas as pd
import numpy as np
import json

# Progress bar
from tqdm.notebook import tqdm

# Plotting
import plotly.express as px

# Time
import time

---

## Step 2 - Connect to Snowflake

Connection credentials are stored in a local JSON file, which is read and passed to `Session.builder.configs(...).create()` to create a Snowpark session.

In [3]:
# Read credentials
with open("creds_vshiv.json") as f:
    connection_parameters = json.load(f)
session = Session.builder.configs(connection_parameters).create()
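The notebook does not show the contents of `creds_vshiv.json`. Below is a sketch of what such a file might hold, assuming the common Snowflake connection keys (`account`, `user`, `password`, `role`, `warehouse`, `database`, `schema`), plus a small loader that fails early when a required key is missing. The file name `creds_example.json`, the placeholder values, the `load_connection_parameters` helper, and the choice of which keys are required are all illustrative assumptions.

```python
import json
import os
import tempfile

# Keys treated as mandatory in this sketch (assumption; key-pair or SSO logins differ)
REQUIRED_KEYS = {"account", "user", "password"}

def load_connection_parameters(path):
    """Read a JSON credentials file and check that the required keys are present."""
    with open(path) as f:
        params = json.load(f)
    missing = REQUIRED_KEYS - set(params)
    if missing:
        raise ValueError(f"Missing connection parameters: {sorted(missing)}")
    return params

# Placeholder credentials; never commit real ones to source control
example = {
    "account": "<account_identifier>",
    "user": "<user>",
    "password": "<password>",
    "role": "HOL_ROLE",
    "warehouse": "HOL_WH",
    "database": "HOL_DB",
    "schema": "ANALYTICS",
}

# Round-trip the placeholder through a temporary file to exercise the loader
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "creds_example.json")
    with open(path, "w") as f:
        json.dump(example, f)
    params = load_connection_parameters(path)

# With a real credentials file, the session is then created as in the cell above:
# session = Session.builder.configs(params).create()
print(f"Loaded {len(params)} connection parameters")
```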

In [7]:

# Session details
print(f'Database.Schema  : {session.get_fully_qualified_current_schema()}')
print(f'Warehouse \t : {session.get_current_warehouse()}')
print(f'Role \t\t : {session.get_current_role()}')

# Resize the warehouse used for loading to SMALL
session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = SMALL").collect()[0][0]


Database.Schema  : "HOL_DB"."ANALYTICS"
Warehouse 	 : "HOL_WH"
Role 		 : "HOL_ROLE"


'Statement executed successfully.'
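The cell above scales `HOL_WH` to `SMALL` before loading. If you want the warehouse returned to a smaller size once the heavy work is done, one option is a context manager that issues `ALTER WAREHOUSE ... SET WAREHOUSE_SIZE` on entry and again on exit. This is a sketch under stated assumptions: `resize_sql` and `resized_warehouse` are hypothetical helpers, restoring to `XSMALL` is an assumed default, and `session` can be any object with a Snowpark-style `sql(...).collect()` method.

```python
from contextlib import contextmanager

# Size keywords accepted in this sketch (a subset of Snowflake's documented values)
VALID_SIZES = {"XSMALL", "SMALL", "MEDIUM", "LARGE", "XLARGE", "XXLARGE"}

def resize_sql(warehouse, size):
    """Build an ALTER WAREHOUSE statement after validating the size keyword."""
    size = size.upper()
    if size not in VALID_SIZES:
        raise ValueError(f"Unsupported warehouse size: {size}")
    return f"ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = {size}"

@contextmanager
def resized_warehouse(session, warehouse, size, restore_size="XSMALL"):
    """Temporarily resize a warehouse, then set it to restore_size."""
    # Build (and so validate) both statements before changing anything
    scale_up = resize_sql(warehouse, size)
    scale_down = resize_sql(warehouse, restore_size)
    session.sql(scale_up).collect()
    try:
        yield
    finally:
        # Runs even if the body raises, so the warehouse is not left oversized
        session.sql(scale_down).collect()

# Usage with a live Snowpark session:
# with resized_warehouse(session, "HOL_WH", "MEDIUM"):
#     ...  # run the COPY INTO loads
print(resize_sql("HOL_WH", "small"))  # ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = SMALL
```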

In [5]:
# Create all database objects (database, schemas, file format, stage) in one cell

# Optional - drops and recreates HOL_DB, deleting any existing objects in it
DASHES = '-'*50
session.sql("CREATE OR REPLACE DATABASE HOL_DB").collect()


# -- Schemas
session.sql("CREATE OR REPLACE SCHEMA EXTERNAL").collect()
session.sql("CREATE OR REPLACE SCHEMA RAW_POS").collect()
session.sql("CREATE OR REPLACE SCHEMA RAW_CUSTOMER").collect()
session.sql("CREATE OR REPLACE SCHEMA HARMONIZED").collect()
session.sql("CREATE OR REPLACE SCHEMA ANALYTICS").collect()

# -- External Frostbyte objects
session.sql("""
    CREATE OR REPLACE FILE FORMAT EXTERNAL.PARQUET_FORMAT
    TYPE = PARQUET
    COMPRESSION = SNAPPY
""").collect()

session.sql("""
    CREATE OR REPLACE STAGE EXTERNAL.FROSTBYTE_RAW_STAGE
    URL = 's3://sfquickstarts/data-engineering-with-snowpark-python/'
""").collect()


# Define locations and objects
POS_TABLES = [
    "country",
    "franchise",
    "location",
    "menu",
    "truck",
    # "order_header",
    # "order_detail"
    ]

TABLE_DICT = {"pos": {"schema": "RAW_POS", "tables": POS_TABLES}}
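`TABLE_DICT` maps each S3 sub-directory to a target schema and a list of tables, and the loading loop turns every entry into a stage path plus a fully qualified table name. Pulling that expansion into a pure function makes the load plan easy to print or check before any data is copied. A minimal sketch: `build_load_plan` and `EXAMPLE_TABLE_DICT` are hypothetical names (the example dict is deliberately separate so it does not overwrite `TABLE_DICT`), while the stage name and `HOL_DB` come from this notebook.

```python
# Same structure as TABLE_DICT, trimmed to two tables for illustration
EXAMPLE_TABLE_DICT = {"pos": {"schema": "RAW_POS", "tables": ["country", "franchise"]}}

def build_load_plan(table_dict, stage="@external.frostbyte_raw_stage", database="HOL_DB"):
    """Return (stage_location, [database, schema, table]) pairs, one per table."""
    plan = []
    for s3dir, data in table_dict.items():
        for table in data["tables"]:
            location = f"{stage}/{s3dir}/{table}"
            plan.append((location, [database, data["schema"], table]))
    return plan

for location, target in build_load_plan(EXAMPLE_TABLE_DICT):
    print(location, "->", ".".join(target))
# @external.frostbyte_raw_stage/pos/country -> HOL_DB.RAW_POS.country
# @external.frostbyte_raw_stage/pos/franchise -> HOL_DB.RAW_POS.franchise
```

Each pair could then be passed to `session.read.option("compression", "snappy").parquet(location).copy_into_table(target)`, which accepts the table name as a list of name parts.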


In [6]:

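# Load each table's Parquet files from the stage into HOL_DB.<schema>.<table>
for s3dir, data in TABLE_DICT.items():
    tnames = data["tables"]
    schema = data["schema"]

    for tname in tnames:
        # Stage path, e.g. @external.frostbyte_raw_stage/pos/country
        location = "@external.frostbyte_raw_stage/" + s3dir + "/" + tname

        # Fully qualified target table as [database, schema, table]
        table_name = ['HOL_DB', schema, tname]
        print(f'\n{DASHES}\nLoading {table_name}..\n{DASHES}\n')

        # Read the Snappy-compressed Parquet files and COPY them into the table,
        # creating it from the DataFrame's schema if it does not exist yet
        df = session.read.option("compression", "snappy").parquet(location)
        copied_into_result = df.copy_into_table(table_name)

        # COPY INTO returns one row per loaded file; report the first file's result
        out_dict = copied_into_result[0].asDict()
        print("Completed!")

        print(f"Status     : {out_dict['status']}")
        for key in ['rows_parsed', 'rows_loaded', 'errors_seen']:
            print(f"{key.replace('_', ' ').title()}: {out_dict[key]:,}")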


--------------------------------------------------
Loading ['HOL_DB', 'RAW_POS', 'country']..
--------------------------------------------------

Completed!
Status     : LOADED
Rows Parsed: 30
Rows Loaded: 30
Errors Seen: 0

--------------------------------------------------
Loading ['HOL_DB', 'RAW_POS', 'franchise']..
--------------------------------------------------

Completed!
Status     : LOADED
Rows Parsed: 335
Rows Loaded: 335
Errors Seen: 0

--------------------------------------------------
Loading ['HOL_DB', 'RAW_POS', 'location']..
--------------------------------------------------

Completed!
Status     : LOADED
Rows Parsed: 13,093
Rows Loaded: 13,093
Errors Seen: 0

--------------------------------------------------
Loading ['HOL_DB', 'RAW_POS', 'menu']..
--------------------------------------------------

Completed!
Status     : LOADED
Rows Parsed: 100
Rows Loaded: 100
Errors Seen: 0

--------------------------------------------------
Loading ['HOL_DB', 'RAW_POS', 'truck