###  00-connectivity

In [None]:
''' ENV CONFIGURATION '''
# pip install python-dotenv
import os
from dotenv import load_dotenv
from pathlib import Path


# Load variables from the ".env" file in the current working directory.
# Change the path below if the file lives somewhere else.
dotenv_path = os.path.join(os.getcwd(), ".env")
# override=True lets values from the .env file replace variables that are
# already set in the process environment.
load_dotenv(dotenv_path=dotenv_path, override=True)

# Snowflake connection settings; each one falls back to the literal string
# "default" when the variable is not set.
(ACCOUNT, USER, PASSWORD, ROLE, DATABASE, WAREHOUSE) = (
    os.environ.get(setting, "default")
    for setting in ("ACCOUNT", "USER", "PASSWORD", "ROLE", "DATABASE", "WAREHOUSE")
)
# a3491fb2-000f-4d9f-943e-127cfe29c39c

## STEP 01 - CREATE USER AND WH 

In [None]:
from snowflake.snowpark import Session
import sys
import logging

# Configure root logging: INFO level, written to stdout, with a
# "time - level - message" layout.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%I:%M:%S',
)

# snowpark session
def get_snowpark_session() -> Session:
    """Build and return a Snowpark session.

    Connection values are taken from the module-level ACCOUNT, USER,
    PASSWORD, ROLE, DATABASE and WAREHOUSE settings; the schema is fixed to
    ``ALF_SNOWPARK``. Each call goes through ``Session.builder...create()``.

    Returns:
        The session object produced by the Snowpark session builder.
    """
    # Options passed under the "session_parameters" key of the connection config.
    session_options = {
        'QUERY_TAG': 'DevOPS_deployment',
        'use_openssl_only': True,
    }
    connection_parameters = dict(
        ACCOUNT=ACCOUNT,
        USER=USER,
        PASSWORD=PASSWORD,
        ROLE=ROLE,
        DATABASE=DATABASE,
        SCHEMA="ALF_SNOWPARK",
        WAREHOUSE=WAREHOUSE,
        session_parameters=session_options,
    )
    builder = Session.builder.configs(connection_parameters)
    return builder.create()

def main():
    """Open a session, then print the session context and a customer sample."""
    snowpark = get_snowpark_session()

    # Role, database, schema and warehouse the session is currently using.
    context_query = "select current_role(), current_database(), current_schema(), current_warehouse()"
    snowpark.sql(context_query).show(2)

    # The query is limited to 10 rows; show(5) displays at most 5 of them.
    customer_query = "select c_custkey,c_name,c_phone,c_mktsegment from snowflake_sample_data.tpch_sf1.customer limit 10"
    snowpark.sql(customer_query).show(5)


if __name__ == '__main__':
    main()


## STEP 02 - CREATE SCHEMA 

## STEP 03 - CREATE INTERNAL STAGE IN SOURCE SCHEMA 

## Step 4.2 Loading Data To Internal Stage Using Snowpark File API
The following Snowpark program uses the File API to read data from the local machine and load it into a Snowflake internal stage.

In [None]:

def traverse_directory(directory, file_extension) -> tuple:
    """Recursively collect the files under *directory* that end with *file_extension*.

    Args:
        directory: Root directory to walk, given as a string path.
        file_extension: Suffix a file name must end with, e.g. ``'.csv'``.

    Returns:
        A 3-tuple of parallel lists ``(file_name, partition_dir, local_file_path)``:
        the bare file names, the folder of each file relative to *directory*
        (keeping the leading path separator, empty for files directly in
        *directory*), and the path of each file as joined from the walk root.
        All three lists are empty when nothing matches or the directory does
        not exist.
    """
    file_name = []
    partition_dir = []
    local_file_path = []
    print(directory)
    prefix_length = len(directory)
    for root, _dirs, files in os.walk(directory):
        # Every root yielded by os.walk starts with `directory`. Strip only
        # that leading prefix: str.replace would also remove later
        # occurrences of the same text inside sub-folder names.
        relative_dir = root[prefix_length:] if root.startswith(directory) else root
        for file in files:
            if file.endswith(file_extension):
                file_name.append(file)
                partition_dir.append(relative_dir)
                local_file_path.append(os.path.join(root, file))

    return file_name, partition_dir, local_file_path


def _upload_files(session, stage_location, file_names, partition_dirs, local_paths):
    """PUT each local file into its partition folder under *stage_location*.

    The three list arguments are the parallel lists returned by
    ``traverse_directory``. One status line is printed per uploaded file.
    """
    for name, partition, path in zip(file_names, partition_dirs, local_paths):
        # Stage paths use forward slashes, so convert Windows-style separators.
        stage_partition = partition.replace("\\", "/")
        put_result = session.file.put(
            local_file_name=path,
            stage_location=stage_location + stage_partition,
            auto_compress=True,
            overwrite=True,
            parallel=10,
        )
        print(name, " => ", put_result[0].status)


def main():
    """Upload the CSV, Parquet and JSON files under ../dataset/sales to the internal stage.

    The folder layout below ``dataset/sales`` is reproduced under the stage.
    A single Snowpark session is opened for all uploads and closed when they
    finish (or fail). No session is opened when there is nothing to upload.
    """
    # The dataset lives in "dataset/sales" next to the parent of the working directory.
    parent_dir = os.path.abspath('..')
    directory_path = os.path.join(parent_dir, 'dataset', 'sales')
    stage_location = "@DEMO_DB.ALF_SOURCE.MY_INTERNAL_STAGE"

    # Discover all files first, one batch per format, in upload order.
    batches = [
        traverse_directory(directory_path, extension)
        for extension in ('.csv', '.parquet', '.json')
    ]
    if not any(file_names for file_names, _, _ in batches):
        return

    session = get_snowpark_session()
    try:
        for file_names, partition_dirs, local_paths in batches:
            _upload_files(session, stage_location, file_names, partition_dirs, local_paths)
    finally:
        session.close()


if __name__ == '__main__':
    main()



## STEP 05: FILE FORMATS IN COMMON SCHEMA

## Step-5.1 Select Statements On Internal Stage (CSV, Parquet, JSON)


## STEP - 6: FOREX DATA 

## Step-7.1 Loading Data From Internal Stage to Source Tables


## 7.2 Source Table DDL Script


In [None]:

def ingest_in_sales(session)-> None:
    """Run the COPY INTO statement that loads DEMO_DB.ALF_SOURCE.in_sales_order.

    The statement selects from the stage path
    @DEMO_DB.ALF_SOURCE.MY_INTERNAL_STAGE/source=IN/format=csv/ using the file
    format DEMO_DB.ALF_COMMON.my_csv_format. Its select list is, in order: the
    next value of DEMO_DB.ALF_SOURCE.in_sales_order_seq, the sixteen positional
    file columns $1..$16 cast to text/number/date, and the three staging
    metadata columns (file name, file row number, file last-modified time).
    The copy option on_error = 'Continue' is set.

    Args:
        session: Object exposing ``sql(...)``; the caller in this file passes
            the result of ``get_snowpark_session()``.

    Returns:
        None. The result of ``collect()`` is discarded.
    """


    # NOTE: the SQL is one string literal built with backslash line
    # continuations, so comments cannot be placed inside it.
    session.sql(" \
            copy into DEMO_DB.ALF_SOURCE.in_sales_order from ( \
            select \
        DEMO_DB.ALF_SOURCE.in_sales_order_seq.nextval, \
            t.$1::text as order_id, \
            t.$2::text as customer_name, \
            t.$3::text as mobile_key,\
            t.$4::number as order_quantity, \
            t.$5::number as unit_price, \
            t.$6::number as order_valaue,  \
            t.$7::text as promotion_code , \
            t.$8::number(10,2)  as final_order_amount,\
            t.$9::number(10,2) as tax_amount,\
            t.$10::date as order_dt,\
            t.$11::text as payment_status,\
            t.$12::text as shipping_status,\
            t.$13::text as payment_method,\
            t.$14::text as payment_provider,\
            t.$15::text as mobile,\
            t.$16::text as shipping_address,\
            metadata$filename as stg_file_name,\
            metadata$file_row_number as stg_row_numer,\
            metadata$file_last_modified as stg_last_modified\
            from \
            @DEMO_DB.ALF_SOURCE.MY_INTERNAL_STAGE/source=IN/format=csv/ \
            (                                                             \
                file_format => 'DEMO_DB.ALF_COMMON.my_csv_format'           \
            ) t  )  on_error = 'Continue'     \
            "
            ).collect()

def ingest_us_sales(session)-> None:
    """Run the COPY INTO statement that loads DEMO_DB.ALF_SOURCE.us_sales_order.

    The statement selects from the stage path
    @DEMO_DB.ALF_SOURCE.MY_INTERNAL_STAGE/source=US/format=parquet/ using the
    file format DEMO_DB.ALF_COMMON.my_parquet_format. Its select list is, in
    order: the next value of DEMO_DB.ALF_SOURCE.us_sales_order_seq, sixteen
    named fields extracted from $1 (e.g. "Order ID", "Customer Name") and
    cast or converted, and the three staging metadata columns (file name, file
    row number, file last-modified time). The copy option on_error = continue
    is set.

    Args:
        session: Object exposing ``sql(...)``; the caller in this file passes
            the result of ``get_snowpark_session()``.

    Returns:
        None. The result of ``collect()`` is discarded.
    """
    # NOTE: the SQL is one string literal built with backslash line
    # continuations, so comments cannot be placed inside it.
    session.sql(' \
            copy into DEMO_DB.ALF_SOURCE.us_sales_order                \
            from                                    \
            (                                       \
                select                              \
                DEMO_DB.ALF_SOURCE.us_sales_order_seq.nextval, \
                $1:"Order ID"::text as orde_id,   \
                $1:"Customer Name"::text as customer_name,\
                $1:"Mobile Model"::text as mobile_key,\
                to_number($1:"Quantity") as quantity,\
                to_number($1:"Price per Unit") as unit_price,\
                to_decimal($1:"Total Price") as total_price,\
                $1:"Promotion Code"::text as promotion_code,\
                $1:"Order Amount"::number(10,2) as order_amount,\
                to_decimal($1:"Tax") as tax,\
                $1:"Order Date"::date as order_dt,\
                $1:"Payment Status"::text as payment_status,\
                $1:"Shipping Status"::text as shipping_status,\
                $1:"Payment Method"::text as payment_method,\
                $1:"Payment Provider"::text as payment_provider,\
                $1:"Phone"::text as phone,\
                $1:"Delivery Address"::text as shipping_address,\
                metadata$filename as stg_file_name,\
                metadata$file_row_number as stg_row_numer,\
                metadata$file_last_modified as stg_last_modified\
                from                                \
                    @DEMO_DB.ALF_SOURCE.MY_INTERNAL_STAGE/source=US/format=parquet/\
                    (file_format => DEMO_DB.ALF_COMMON.my_parquet_format)\
                    ) on_error = continue \
            '
            ).collect()
    
def ingest_fr_sales(session)-> None:
    """Run the COPY INTO statement that loads DEMO_DB.ALF_SOURCE.fr_sales_order.

    The statement selects from the stage path
    @DEMO_DB.ALF_SOURCE.MY_INTERNAL_STAGE/source=FR/format=json/ using the
    file format DEMO_DB.ALF_COMMON.my_json_format. Its select list is, in
    order: the next value of DEMO_DB.ALF_SOURCE.fr_sales_order_seq, sixteen
    named fields extracted from $1 (e.g. "Order ID", "Customer Name") and
    cast or converted, and the three staging metadata columns (file name, file
    row number, file last-modified time). The copy option on_error=continue
    is set.

    Args:
        session: Object exposing ``sql(...)``; the caller in this file passes
            the result of ``get_snowpark_session()``.

    Returns:
        None. The result of ``collect()`` is discarded.
    """
    # NOTE: the SQL is one string literal built with backslash line
    # continuations, so comments cannot be placed inside it.
    session.sql(' \
        copy into DEMO_DB.ALF_SOURCE.fr_sales_order                                \
        from                                                    \
        (                                                       \
            select                                              \
            DEMO_DB.ALF_SOURCE.fr_sales_order_seq.nextval,         \
            $1:"Order ID"::text as orde_id,                   \
            $1:"Customer Name"::text as customer_name,          \
            $1:"Mobile Model"::text as mobile_key,              \
            to_number($1:"Quantity") as quantity,               \
            to_number($1:"Price per Unit") as unit_price,       \
            to_decimal($1:"Total Price") as total_price,        \
            $1:"Promotion Code"::text as promotion_code,        \
            $1:"Order Amount"::number(10,2) as order_amount,    \
            to_decimal($1:"Tax") as tax,                        \
            $1:"Order Date"::date as order_dt,                  \
            $1:"Payment Status"::text as payment_status,        \
            $1:"Shipping Status"::text as shipping_status,      \
            $1:"Payment Method"::text as payment_method,        \
            $1:"Payment Provider"::text as payment_provider,    \
            $1:"Phone"::text as phone,                          \
            $1:"Delivery Address"::text as shipping_address ,    \
            metadata$filename as stg_file_name,\
            metadata$file_row_number as stg_row_numer,\
            metadata$file_last_modified as stg_last_modified\
            from                                                \
            @DEMO_DB.ALF_SOURCE.MY_INTERNAL_STAGE/source=FR/format=json/\
            (file_format => DEMO_DB.ALF_COMMON.my_json_format)\
            ) on_error=continue\
        '
        ).collect()

def main():
    """Run the IN, US and FR sales loads and print each target table's row count."""
    snowpark = get_snowpark_session()

    # One entry per source, in run order: (loader, banner printed after the
    # copy, row-count query for the target table).
    steps = (
        (
            ingest_in_sales,
            '<IN Sals after copy>',
            'SELECT COUNT(*) FROM  DEMO_DB.ALF_SOURCE.IN_SALES_ORDER',
        ),
        (
            ingest_us_sales,
            '<US Sals after copy>',
            'SELECT COUNT(*) FROM  DEMO_DB.ALF_SOURCE.US_SALES_ORDER',
        ),
        (
            ingest_fr_sales,
            '<FR Sals after copy>',
            'SELECT COUNT(*) FROM  DEMO_DB.ALF_SOURCE.FR_SALES_ORDER',
        ),
    )
    for load, banner, count_query in steps:
        load(snowpark)
        print(banner)
        snowpark.sql(count_query).show()


if __name__ == '__main__':
    main()