#### Why Snowpark?
The purpose of Snowpark lies in its ability to provide more flexibility, scalability, and integration for data processing and orchestration tasks.|

Orchestrating Jobs and Pipelines: We can automate Snowpark-based pipelines (UDFs, views, and other transformations) in a more flexible way. Snowpark code can be versioned and tested, making it easier to maintain and extend in your pipeline.

#### Note:
This notebook is to validate, clean and update data in tables of higher priority in terms of updating speed.

Data tables that need a faster updating speed will be processed here.

#### Desired Flow
1. Grab data from raw tables
2. Check for data, validate, check and update cleaned data onto the cleaned tables
3. Join tables via foreign keys and make hybrid master table
4. Create Schema Views

#### 1. Loading Raw Data In

Since we already push our data onto Snowflake, we can call for them in this notebook to run in Snowpark. This will be the first step to the data flow overview for establishing the CI/CD deployment & finalizing the ELT pipepline.

##### Tables in this notebook
- Sales.CustomerTransactions 
- Sales.CustomerCategories 
- Application.People 
- Purchasing.SupplierTransactions 
- Purchasing.PurchaseOrderLines 
- Warehouse.ColdRoomTemperatures 
- Warehouse.VehicleTemperatures
- Sales.Orders 
- Purchasing.PurchaseOrders
- Warehouse.StockItemTransactions
- Warehouse.StockItems
- Warehouse.StockItemHoldings
- Sales.Customers
- Sales.Invoices
- Sales.InvoiceLines
- Sales.Orderlines

In [None]:
# RAW TABLES
# ---------------------------------------------------------------------------------------------
import time
from snowflake.snowpark import Session

# Define all tables organized by categories or schemas (CALL FOR RAW TABLES)
TABLE_DICT = {
    "application": {
        "schema": "KN_LOGISTICS.SNOWSQL", 
        "tables": [
            "APPLICATION_PEOPLE_RAW"
        ]
    },
    "purchasing": {
        "schema": "KN_LOGISTICS.SNOWSQL",
        "tables": [
            "PURCHASING_PURCHASEORDERLINES_RAW",
            "PURCHASING_PURCHASEORDERS_RAW"
        ]
    },
    "sales": {
        "schema": "KN_LOGISTICS.SNOWSQL",
        "tables": [
            "SALES_CUSTOMERCATEGORIES_RAW",
            "SALES_CUSTOMERS_RAW",
            "SALES_CUSTOMERTRANSACTIONS_RAW",
            "SALES_INVOICELINES_RAW",
            "SALES_INVOICES_RAW",
            "SALES_ORDERLINES_RAW",
            "SALES_ORDERS_RAW"
        ]
    },
    "warehouse": {
        "schema": "KN_LOGISTICS.SNOWSQL",
        "tables": [
            "WAREHOUSE_COLDROOMTEMPERATURES_RAW",
            "WAREHOUSE_STOCKITEMHOLDINGS_RAW",
            "WAREHOUSE_STOCKITEMS_RAW",
            "WAREHOUSE_STOCKITEMTRANSACTIONS_RAW",
            "WAREHOUSE_VEHICLETEMPERATURES_RAW"
        ]
    }
}

def load_raw_table(session, tname=None, schema=None):
    # Adjusted for direct use (no S3 staging assumed in your case)
    session.use_schema(schema)
    print(f"Loading table: {tname}")
    # If additional logic for transformations/loading is needed, add it here
    df = session.table(tname)
    df.show()  # Example action to verify table content

def load_all_tables(session):
    for category, data in TABLE_DICT.items():
        schema = data['schema']
        tables = data['tables']
        for tname in tables:
            load_raw_table(session, tname=tname, schema=schema)

def validate_tables(session):
    for category, data in TABLE_DICT.items():
        schema = data['schema']
        tables = data['tables']
        for tname in tables:
            session.use_schema(schema)
            print(f"Validating table: {tname}")
            print(f"Columns: {session.table(tname).columns}")

In [None]:
# Add the utils package to our path and import the snowpark_utils function
import os, sys
current_dir = os.getcwd()
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)

In [None]:
#from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
load_all_tables(session)

In [None]:
validate_tables(session)

#### 2. Check for data, validate, check and update cleaned data onto the cleaned tables
After pulling in the raw data, we can check and validate these raw data to make sure they are of a certain format eligible to be pushed over to the cleaned tables. 

If not, we will clean the tables accordingly then update them over to the cleaned tables.

Some of the validation we can do is:
- Check for null values
- Check for duplicates in PK and UQ
- Check for invalid datetypes & text formats

After cleaning the data, we need to validate ONE MORE TIME to make sure the data has not been imported into the cleaned tables before. If validation succeeds, we update the current records over to the cleaned tables.
- Check if the data records exist in the cleaned tables (checkj if they are identitcal)

#### 4. Create SNOWSQL View
We will simplify the SNOWSQL schema by joining together the tables and picking only the columns we need. This will be done using Snowpark Dataframe API. Then we'll create a Snowflake stream on that view so that we can incrementally process changes to any of the SNOWSQL tables.

This setup is crucial for ensuring that once the initial data is loaded, we can efficiently manage incremental updates to the SNOWSQL data through the Snowflake stream.

This joining of tables can only be done when the group has finalized cleaning the tables in Snowflake (Snowsight).

In [None]:
# SNOWFLAKE ADVANTAGE: Snowpark DataFrame API
# SNOWFLAKE ADVANTAGE: Streams for incremental processing (CDC)
# SNOWFLAKE ADVANTAGE: Streams on views

### I NEED HELP ON JOINING THE DATA HERE!!!!!!
# ---------------------------------------------------------------------------------------------

from snowflake.snowpark import Session
import snowflake.snowpark.functions as F
from snowflake.snowpark.functions import col

def create_pos_view(session):
    session.use_schema('SNOWSQL')

    # Define DataFrames for each table with selected columns
    application_cities = session.table("KN_LOGISTICS.SNOWSQL.APPLICATION_CITIES").select(
        F.col("CITYID"),
        F.col("CITYNAME"),
        F.col("COUNTRYID"),
        F.col("LATITUDE"),
        F.col("LONGITUDE"),
        F.col("LATESTRECORDEDPOPULATION")
    )

    application_countries_sea = session.table("KN_LOGISTICS.SNOWSQL.APPLICATION_COUNTRIES_SEA").select(
        F.col("COUNTRYID"),
        F.col("COUNTRYNAME"),
        F.col("FORMALNAME"),
        F.col("LATESTRECORDEDPOPULATION"),
        F.col("CONTINENT"),
        F.col("REGION"),
        F.col("SUBREGION")
    )

    application_deliverymethods = session.table("KN_LOGISTICS.SNOWSQL.APPLICATION_DELIVERYMETHODS").select(
        F.col("DELIVERYMETHODID"),
        F.col("DELIVERYMETHODNAME")
    )

    application_paymentmethods = session.table("KN_LOGISTICS.SNOWSQL.APPLICATION_PAYMENTMETHODS").select(
        F.col("PAYMENTMETHODID"),
        F.col("PAYMENTMETHODNAME")
    )

    application_transactiontypes = session.table("KN_LOGISTICS.SNOWSQL.APPLICATION_TRANSACTIONTYPES").select(
        F.col("TRANSACTIONTYPEID"),
        F.col("TRANSACTIONTYPENAME")
    )

    application_people = session.table("KN_LOGISTICS.SNOWSQL.APPLICATION_PEOPLE").select(
        F.col("PERSONID"),
        F.col("FULLNAME"),
        F.col("ISEMPLOYEE"),
        F.col("ISSALESPERSON"),
        F.col("PREFERREDNAME"),
        F.col("SEARCHNAME")
    )

    purchasing_purchaseorderlines = session.table("KN_LOGISTICS.SNOWSQL.PURCHASING_PURCHASEORDERLINES").select(
        F.col("DESCRIPTION"),
        F.col("EXPECTEDUNITPRICEPEROUTER"),
        F.col("ISORDERLINEFINALIZED"),
        F.col("LASTRECEIPTDATE"),
        F.col("ORDERDOUTERS"),
        F.col("PACKAGETYPEID"),
        F.col("PURCHASEORDERID"),
        F.col("PURCHASEORDERLINEID"),
        F.col("RECEIVEDOUTERS"),
        F.col("STOCKITEMID")
    )

    purchasing_purchaseorders = session.table("KN_LOGISTICS.SNOWSQL.PURCHASING_PURCHASEORDERS").select(
        F.col("CONTACTPERSONID"),
        F.col("DELIVERYMETHODID"),
        F.col("EXPECTEDDELIVERYDATE"),
        F.col("ISORDERFINALIZED"),
        F.col("ORDERDATE"),
        F.col("PURCHASEORDERID"),
        F.col("SUPPLIERID"),
        F.col("SUPPLIERREFERENCE")
    )

    purchasing_suppliercategories = session.table("KN_LOGISTICS.SNOWSQL.PURCHASING_SUPPLIERCATEGORIES").select(
        F.col("SUPPLIERCATEGORYID"),
        F.col("SUPPLIERCATEGORYNAME")
    )

    purchasing_suppliers = session.table("KN_LOGISTICS.SNOWSQL.PURCHASING_SUPPLIERS").select(
        F.col("ALTERNATECONTACTPERSONID"),
        F.col("DELIVERYMETHODID"),
        F.col("PAYMENTDAYS"),
        F.col("PHONENUMBER"),
        F.col("PRIMARYCONTACTPERSONID"),
        F.col("SUPPLIERCATEGORYID"),
        F.col("SUPPLIERID"),
        F.col("SUPPLIERNAME"),
        F.col("SUPPLIERREFERENCE"),
        F.col("WEBSITEURL")
    )

    purchasing_suppliertransactions = session.table("KN_LOGISTICS.SNOWSQL.PURCHASING_SUPPLIERTRANSACTIONS").select(
        F.col("AMOUNTEXCLUDINGTAX"),
        F.col("FINALIZATIONDATE"),
        F.col("ISFINALIZED"),
        F.col("LASTCOSTPRICE_NUM"),
        F.col("OUTSTANDINGBALANCE"),
        F.col("PAYMENTMETHODID"),
        F.col("PURCHASEORDERID"),
        F.col("SUPPLIERID"),
        F.col("SUPPLIERINVOICENUMBER"),
        F.col("SUPPLIERTRANSACTIONID"),
        F.col("TAXAMOUNT"),
        F.col("TRANSACTIONAMOUNT"),
        F.col("TRANSACTIONDATE"),
        F.col("TRANSACTIONTYPEID")
    )

    sales_buyinggroups = session.table("KN_LOGISTICS.SNOWSQL.SALES_BUYINGGROUPS").select(
        F.col("BUYINGGROUPID"),
        F.col("BUYINGGROUPNAME")
    )

    sales_customercategories = session.table("KN_LOGISTICS.SNOWSQL.SALES_CUSTOMERCATEGORIES").select(
        F.col("CUSTOMERCATEGORYID"),
        F.col("CUSTOMERCATEGORYNAME")
    )

    
    sales_customers = session.table("KN_LOGISTICS.SNOWSQL.SALES_CUSTOMERS").select(
        F.col("CUSTOMERID"),
        F.col("CUSTOMERNAME"),
        F.col("BILLTOCUSTOMERID"),
        F.col("CUSTOMERCATEGORYID"),
        F.col("BUYINGGROUPID"),
        F.col("PRIMARYCONTACTPERSONID"),
        F.col("ALTERNATECONTACTPERSONID"),
        F.col("DELIVERYMETHODID"),
        F.col("DELIVERYCITYID"),
        F.col("CREDITLIMIT"),
        F.col("ACCOUNTOPENEDDATE"),
        F.col("STANDARDDISCOUNTPERCENTAGE"),
        F.col("ISSTATEMENTSENT"),
        F.col("ISONCREDITHOLD"),
        F.col("PAYMENTDAYS"),
        F.col("PHONENUMBER"),
        F.col("WEBSITEURL")
    )

    sales_customertransactions = session.table("KN_LOGISTICS.SNOWSQL.SALES_CUSTOMERTRANSACTIONS").select(
        F.col("AMOUNTEXCLUDINGTAX"),
        F.col("CUSTOMERID"),
        F.col("CUSTOMERTRANSACTIONID"),
        F.col("FINALIZATIONDATE"),
        F.col("INVOICEID"),
        F.col("ISFINALIZED"),
        F.col("OUTSTANDINGBALANCE"),
        F.col("PAYMENTMETHODID"),
        F.col("TAXAMOUNT"),
        F.col("TRANSACTIONAMOUNT"),
        F.col("TRANSACTIONDATE"),
        F.col("TRANSACTIONTYPEID")
    )

    sales_invoicelines = session.table("KN_LOGISTICS.SNOWSQL.SALES_INVOICELINES").select(
        F.col("DESCRIPTION"),
        F.col("EXTENDEDPRICE"),
        F.col("INVOICEID"),
        F.col("INVOICELINEID"),
        F.col("LINEPROFIT"),
        F.col("PACKAGETYPEID"),
        F.col("QUANTTY"),
        F.col("STOCKITEMID"),
        F.col("TAXAMOUNT"),
        F.col("TAXRATE"),
        F.col("UNITPRICE")
    )

    sales_invoices = session.table("KN_LOGISTICS.SNOWSQL.SALES_INVOICES").select(
        F.col("ACCOUNTSPERSONID"),
        F.col("BILLTOCUSTOMERID"),
        F.col("CONFIRMEDDELIVERYTIME"),
        F.col("CONFIRMEDRECEIVEDBY"),
        F.col("CONTACTPERSONID"),
        F.col("CUSTOMERID"),
        F.col("CUSTOMERPURCHASEORDERNUMBER"),
        F.col("DELIVERYMETHODID"),
        F.col("INVOICEDATE"),
        F.col("INVOICEID"),
        F.col("ORDERID"),
        F.col("PACKEDBYPERSONID"),
        F.col("SALESPERSONPERSONID"),
        F.col("TOTALCHILLERITEMS"),
        F.col("TOTALDRYITEMS")
    )

    sales_orderlines = session.table("KN_LOGISTICS.SNOWSQL.SALES_ORDERLINES").select(
        F.col("DESCRIPTION"),
        F.col("ORDERID"),
        F.col("ORDERLINEID"),
        F.col("PACKAGETYPEID"),
        F.col("PICKEDQUANTITY"),
        F.col("PICKINGCOMPLETEDWHEN"),
        F.col("QUANTITY"),
        F.col("STOCKITEMID"),
        F.col("TAXRATE"),
        F.col("UNITPRICE")
    )

    sales_ordes = session.table("KN_LOGISTICS.SNOWSQL.SALES_ORDERS").select(
        F.col("CONTACTPERSONID"),
        F.col("CUSTOMERID"),
        F.col("CUSTOMERPURCHASEORDERNUMBER"),
        F.col("EXPECTEDDELIVERYDATE"),
        F.col("ISUNDERSUPPLYBACKORDERED"),
        F.col("ORDERDATE"),
        F.col("ORDERID"),
        F.col("PICKEDBYPERSONID"),
        F.col("PICKINGCOMPLETEDWHEN"),
        F.col("SALESPERSONPERSONID")
    )

    warehouse_coldroomtemperatures = session.table("KN_LOGISTICS.SNOWSQL.WAREHOUSE_COLDROOMTEMPERATURES").select(
        F.col("COLDROOMSENSORNUMBER"),
        F.col("COLDROOMTEMPERATUREID"),
        F.col("RECORDEDWHEN"),
        F.col("RECORDEDWHEN_T"),
        F.col("RECORDEDWHEN_TS"),
        F.col("TEMPERATURE"),
        F.col("TEMPERATURE_FLOAT")
    )

    warehouse_colors = session.table("KN_LOGISTICS.SNOWSQL.WAREHOUSE_COLORS").select(
        F.col("COLORID"),
        F.col("COLORNAME")
    )

    warehouse_packagetypes = session.table("KN_LOGISTICS.SNOWSQL.WAREHOUSE_PACKAGETYPES").select(
        F.col("PACKAGETYPEID"),
        F.col("PACKAGETYPENAME")
    )
    
    warehouse_stockgroups = session.table("KN_LOGISTICS.SNOWSQL.WAREHOUSE_STOCKGROUPS").select(
        F.col("STOCKGROUPID"),
        F.col("STOCKGROUPNAME")
    )

    warehouse_stockitemholdings = session.table("KN_LOGISTICS.SNOWSQL.WAREHOUSE_STOCKITEMHOLDINGS").select(
        F.col("BINLOCATION"),
        F.col("LASTCOSTPRICE"),
        F.col("LASTSTOCKTAKEQUANTITY"),
        F.col("QUANTITYONHAND"),
        F.col("REORDERLEVEL"),
        F.col("STOCKITEMID"),
        F.col("TARGETSTOCKLEVEL")
    )
    
    warehouse_stockitems = session.table("KN_LOGISTICS.SNOWSQL.WAREHOUSE_STOCKITEMS").select(
        F.col("STOCKITEMID"),
        F.col("STOCKITEMNAME"),
        F.col("UNITPACKAGEID"),
        F.col("OUTERPACKAGEID"),
        F.col("BRAND"),
        F.col("SIZE"),
        F.col("LEADTIMEDAYS"),
        F.col("QUANTITYPEROUTER"),
        F.col("ISCHILLERSTOCK"),
        F.col("TAXRATE"),
        F.col("UNITPRICE"),
        F.col("RECOMMENDEDRETAILPRICE"),
        F.col("TYPICALWEIGHTPERUNIT"),
        F.col("SUPPLIERID"),
        F.col("COLORID")
    )

    warehouse_stockitemtransactions = session.table("KN_LOGISTICS.SNOWSQL.WAREHOUSE_STOCKITEMTRANSACTIONS").select(
        F.col("CUSTOMERID"),
        F.col("INVOICEID"),
        F.col("PURCHASEORDERID"),
        F.col("QUANTITY"),
        F.col("STOCKITEMID"),
        F.col("STOCKITEMTRANSACTIONID"),
        F.col("SUPPLIERID"),
        F.col("TRANSACTIONOCCURREDWHEN"),
        F.col("TRANSACTIONTYPEID")
    )

    warehouse_vehicletemperatures = session.table("KN_LOGISTICS.SNOWSQL.WAREHOUSE_VEHICLETEMPERATURES").select(
        F.col("CHILLERSENSORNUMBER"),
        F.col("FULLSENSORDATA"),
        F.col("RECORDEDWHEN"),
        F.col("TEMPERATURE"),
        F.col("VEHICLEREGISTRATION"),
        F.col("VEHICLETEMPERATUREID")
    )





    
    # Join Cities and Countries
    cities_with_countries = application_cities.join(
        application_countries_sea,
        application_cities["COUNTRYID"] == application_countries_sea["COUNTRYID"]
    ).select(
        application_cities["CITYID"],
        application_cities["CITYNAME"],
        application_countries_sea["COUNTRYNAME"],
        application_countries_sea["REGION"],
        application_cities["LATITUDE"],
        application_cities["LONGITUDE"]
    )

    # Join Purchasing Tables
    purchase_orders_with_details = purchasing_purchaseorders.join(
        purchasing_suppliers,
        purchasing_purchaseorders["SUPPLIERID"] == purchasing_suppliers["SUPPLIERID"]
    ).join(
        application_people,
        purchasing_purchaseorders["CONTACTPERSONID"] == application_people["PERSONID"]
    ).join(
        application_deliverymethods,
        purchasing_purchaseorders["DELIVERYMETHODID"] == application_deliverymethods["DELIVERYMETHODID"]
    ).select(
        purchasing_purchaseorders["PURCHASEORDERID"],
        purchasing_purchaseorders["ORDERDATE"],
        application_people["FULLNAME"].alias("CONTACT_PERSON"),
        purchasing_suppliers["SUPPLIERNAME"],
        application_deliverymethods["DELIVERYMETHODNAME"]
    )

    # Join Sales Tables
    sales_data = sales_invoices.join(
        sales_customers,
        sales_invoices["CUSTOMERID"] == sales_customers["CUSTOMERID"]
    ).join(
        sales_orderlines,
        sales_invoices["ORDERID"] == sales_orderlines["ORDERID"]
    ).join(
        application_people,
        sales_invoices["CONTACTPERSONID"] == application_people["PERSONID"]
    ).select(
        sales_invoices["INVOICEID"],
        sales_invoices["INVOICEDATE"],
        sales_customers["CUSTOMERNAME"],
        application_people["FULLNAME"].alias("CONTACT_PERSON"),
        sales_orderlines["DESCRIPTION"],
        sales_orderlines["QUANTITY"],
        sales_orderlines["UNITPRICE"]
    )

    # Join Stock Tables
    stock_with_transactions = warehouse_stockitems.join(
        warehouse_stockitemtransactions,
        warehouse_stockitems["STOCKITEMID"] == warehouse_stockitemtransactions["STOCKITEMID"]
    ).join(
        warehouse_colors,
        warehouse_stockitems["COLORID"] == warehouse_colors["COLORID"]
    ).select(
        warehouse_stockitems["STOCKITEMNAME"],
        warehouse_colors["COLORNAME"],
        warehouse_stockitemtransactions["TRANSACTIONOCCURREDWHEN"],
        warehouse_stockitemtransactions["QUANTITY"]
    )

    # Final Unified DataFrame
    final_dataframe = sales_data.join(
        stock_with_transactions,
        sales_data["DESCRIPTION"] == stock_with_transactions["STOCKITEMNAME"],
        how="left"
    #).join(
    #    cities_with_countries,
     #   sales_customers["DELIVERYCITYID"] == cities_with_countries["CITYID"],
      #  how="left"
    ).join(
        purchase_orders_with_details,
        sales_data["INVOICEID"] == purchase_orders_with_details["PURCHASEORDERID"],
        how="left"
    ).select(
        sales_data["INVOICEID"],
        sales_data["INVOICEDATE"],
        sales_data["CUSTOMERNAME"],
        sales_data["CONTACT_PERSON"],
        sales_data["DESCRIPTION"],
        sales_data["QUANTITY"],
        stock_with_transactions["COLORNAME"],
        #cities_with_countries["CITYNAME"].alias("DELIVERY_CITY"),
        #cities_with_countries["COUNTRYNAME"].alias("DELIVERY_COUNTRY"),
        purchase_orders_with_details["SUPPLIERNAME"],
        purchase_orders_with_details["DELIVERYMETHODNAME"]
    )

    final_dataframe.create_or_replace_view("SQL_FLATTENED_V")
    

def create_pos_view_stream(session):
    session.use_schema('SNOWSQL')
    _ = session.sql('CREATE OR REPLACE STREAM SNOWSQL_FLATTENED_V_STREAM \
                        ON VIEW SQL_FLATTENED_V \
                        SHOW_INITIAL_ROWS = TRUE').collect()

def test_pos_view(session):
    session.use_schema('SNOWSQL')
    tv = session.table('SQL_FLATTENED_V')
    tv.limit(5).show()


In [None]:
# use this place to validate the tables
# test the code
create_pos_view(session)

In [None]:
# visualize
# Test the view created by create_pos_view
def validate_view(session):
    session.use_schema('SNOWSQL')
    # Load the view into a Snowpark DataFrame
    df = session.table("SQL_FLATTENED_V")
    # Display the first few rows for validation
    df.show()

validate_view(session)

## Data UDF

During this step we will be creating and deploying our first Snowpark Python object to Snowflake, a user-defined function (or UDF). To begin with the UDF will be very basic, but in a future step we'll update it to include a third-party Python package. Also in this step you will be introduced to the new SnowCLI, a new developer command line tool. SnowCLI makes building and deploying Snowpark Python objects to Snowflake a consistent experience for the developer. More details below on SnowCLI.

## Orders Update Sproc

During this step we will be creating and deploying our first Snowpark Python stored procedure (or sproc) to Snowflake. This sproc will merge changes from the `HARMONIZED.POS_FLATTENED_V_STREAM` stream into our target `HARMONIZED.ORDERS` table.

### Running the Sproc Locally
To test the procedure locally, you will execute the following script.

In [None]:
# close session
session.close()