1. Import Packages

In [None]:
###################################################################################################
# Import Snowpark Package and a few Functions and Types
###################################################################################################
import snowflake.snowpark as snowpark
import yaml
from snowflake.core.stage import Stage
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col
from snowflake.snowpark.types import StructField, StructType, IntegerType, StringType, TimestampType, DoubleType, DateType, FloatType


2. Load Parameters from the YAML file

In [None]:
###################################################################################################
# Load parameters
###################################################################################################
# Open and read the YAML file (path is relative to the notebook's working directory).
# The encoding is pinned so parsing does not depend on the platform's default locale.
with open('environment.yml', 'r', encoding='utf-8') as file:
    params = yaml.safe_load(file)

# yaml.safe_load returns None for an empty document and can return a list or scalar for other
# layouts; fail here with a clear message rather than with a cryptic TypeError on the lookups below.
if not isinstance(params, dict):
    raise ValueError("environment.yml must contain a top-level mapping of parameters")

# Report every missing key at once instead of stopping at the first failed lookup.
missing_params = [key for key in ('stage_path', 'train_file', 'weather_file') if key not in params]
if missing_params:
    raise KeyError(f"environment.yml is missing required parameter(s): {', '.join(missing_params)}")

# Access parameters
stage_path = params['stage_path']      # used as the URL of the external stage
train_file = params['train_file']      # file loaded into NY_TAXI_DATA.RAW.TAXI_DATA
weather_file = params['weather_file']  # file loaded into NY_TAXI_DATA.RAW.WEATHER

3. Prepare stage

In [None]:
---------------------------------------------------------------------------------------------------
-- Create database, schema and prepare external stage
-- (SQL cell: comments use "--" because "#" is not a comment marker in Snowflake SQL)
---------------------------------------------------------------------------------------------------
CREATE DATABASE IF NOT EXISTS NY_TAXI_DATA;
CREATE SCHEMA IF NOT EXISTS NY_TAXI_DATA.RAW;
CREATE SCHEMA IF NOT EXISTS NY_TAXI_DATA.PROCESS;
-- The file format name is unqualified, so it is created in the session's current schema.
CREATE OR ALTER FILE FORMAT csv_format TYPE = CSV skip_header = 1;
-- stage_path is the Python variable loaded from environment.yml. It is injected with Jinja and
-- quoted, because URL expects a string literal; a bare identifier is not replaced by its value.
CREATE STAGE IF NOT EXISTS NY_TAXI_DATA.PROCESS.S3_STAGE_TAXI STORAGE_INTEGRATION = S3INTEGRATION URL = '{{stage_path}}' FILE_FORMAT = csv_format;

4. Ingest data from stage into tables

In [None]:
###################################################################################################
# Load data from external stage into raw layer
###################################################################################################
# get_active_session is imported with the other packages at the top of the notebook.
session = get_active_session()

# Explicit column names and types applied to the taxi CSV when it is read from the stage.
taxi_data_schema = StructType([
    StructField("id", StringType()),
    StructField("vendor_id", IntegerType()),
    StructField("pickup_datetime", TimestampType()),
    StructField("dropoff_datetime", TimestampType()),
    StructField("passenger_count", IntegerType()),
    StructField("pickup_longitude", DoubleType()),
    StructField("pickup_latitude", DoubleType()),
    StructField("dropoff_longitude", DoubleType()),
    StructField("dropoff_latitude", DoubleType()),
    StructField("store_and_fwd_flag", StringType(1)),
    StructField("trip_duration", IntegerType()),
])

# Create a Dataframe from our taxi_data file from our stage
file_path = train_file
stage_read_df = (
    session.read
    .options({"skip_header": 1})
    .schema(taxi_data_schema)
    .csv(file_path)
)

# Save our Dataframe as a taxi_data table in our ny_taxi_data DB and Raw Schema.
# mode("overwrite") already replaces an existing table, so no separate DROP TABLE is issued first;
# that way a failed load does not leave the raw layer without a TAXI_DATA table.
stage_read_df.write.mode("overwrite").save_as_table("NY_TAXI_DATA.RAW.TAXI_DATA")


In [None]:
###################################################################################################
# Load weather data
###################################################################################################
# get_active_session is imported with the other packages at the top of the notebook.
session = get_active_session()

# Explicit column names and types applied to the weather CSV when it is read from the stage.
# Kept in its own variable so the taxi schema defined earlier is not overwritten.
weather_schema = StructType([
    StructField("date", StringType()),
    StructField("maximum_temperature", IntegerType()),
    StructField("minimum_temperature", IntegerType()),
    StructField("average_temperature", FloatType()),
    StructField("precipitation", StringType()),
    StructField("snow_fall", StringType()),
    StructField("snow_depth", StringType()),
])

# Create a Dataframe from our weather file from our stage
file_path = weather_file
stage_read_df = (
    session.read
    .options({"skip_header": 1})
    .schema(weather_schema)
    .csv(file_path)
)

# Save our Dataframe as a weather table in our ny_taxi_data DB and Raw Schema.
# mode("overwrite") already replaces an existing table, so no separate DROP TABLE is issued first;
# that way a failed load does not leave the raw layer without a WEATHER table.
stage_read_df.write.mode("overwrite").save_as_table("NY_TAXI_DATA.RAW.WEATHER")
