In [1]:
import snowflake.snowpark as snp
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T

## Data Engineering
We begin where all ML use cases do: data engineering. We will extract the data from the source system and load it into Snowflake.

Input: Historical bulk data in `csv` format. 
Output: `SQUIRRELLY_DATA` table

### 1. Load credentials and connect to Snowflake

In [2]:
from steps.snowpark_connection import snowpark_connect

# Connect by handing the JSON state file path to the project helper; the
# returned session and state_dict are reused by the cells that follow.
state_file_path = './include/state.json'
session, state_dict = snowpark_connect(state_file_path)

### 2. Create a stage for loading data to Snowflake


In [3]:
import json

# Register the stage and table names used by the rest of this notebook.
state_dict.update(load_stage_name='LOAD_STAGE', table_name='SQUIRRELLY_DATA')

# Write the updated state back to the JSON state file.
with open('./include/state.json', 'w') as state_file:
    json.dump(state_dict, state_file)

In [4]:
def reset_database(session, state_dict:dict):
    """Recreate the target database, then ensure the schema and load stage exist.

    WARNING: this is destructive. ``CREATE OR REPLACE DATABASE`` replaces any
    existing database of the same name.

    Args:
        session: Object exposing ``sql(query).collect()``; in this notebook,
            the session returned by ``snowpark_connect``.
        state_dict: Must contain ``connection_parameters`` (with ``database``
            and ``schema`` keys) and ``load_stage_name``.

    Raises:
        KeyError: If a required key is missing from ``state_dict``. The check
            happens before any SQL is issued.
    """
    connection_parameters = state_dict['connection_parameters']

    # Build every statement before running any of them, so that a missing key
    # fails before the destructive CREATE OR REPLACE DATABASE is executed.
    statements = [
        'CREATE OR REPLACE DATABASE '+connection_parameters['database'],
        # A freshly created database already contains a PUBLIC schema, so a
        # plain CREATE SCHEMA fails when the configured schema is PUBLIC.
        'CREATE SCHEMA IF NOT EXISTS '+connection_parameters['schema'],
        'CREATE STAGE IF NOT EXISTS '+state_dict['load_stage_name'],
    ]
    for statement in statements:
        session.sql(statement).collect()

In [5]:
# Destructive: reset_database issues CREATE OR REPLACE DATABASE.
reset_database(session=session, state_dict=state_dict)

In [6]:
# Select the warehouse named in the state file's compute parameters.
default_warehouse = state_dict['compute_parameters']['default_warehouse']
session.use_warehouse(default_warehouse)

### 3. Extract file to a local stage

In [7]:
csv_file_name = './data/squirrel_data.csv'
target_stage = state_dict['load_stage_name']

print('Putting ' + csv_file_name + ' to stage: ' + target_stage)

# Kept as the cell's last expression so the put result is shown as output.
session.file.put(
    local_file_name=csv_file_name,
    stage_location=target_stage,
    source_compression='NONE',
    overwrite=True,
)


Putting ./data/squirrel_data.csv to stage: LOAD_STAGE


[PutResult(source='squirrel_data.csv', target='squirrel_data.csv.gz', source_size=535297, target_size=85088, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]

In [8]:
# List the files in the load stage whose names match the .gz pattern.
list_stage_sql = "list @" + state_dict['load_stage_name'] + " pattern='.*[.]gz'"
session.sql(list_stage_sql).collect()

[Row(name='load_stage/squirrel_data.csv.gz', size=85088, md5='a77388f1b2c7c84c36697fc9edd596df', last_modified='Wed, 7 Sep 2022 12:57:51 GMT')]

### 4. Load Data into Snowflake


We start by defining a schema type that we will use to ingest the data.

In [9]:
# (column name, Snowpark type class) pairs, in declaration order.
load_schema_columns = [
    ("lon", T.FloatType),
    ("lat", T.FloatType),
    ("unique_squirrel_id", T.StringType),
    ("hectare", T.StringType),
    ("shift", T.StringType),
    ("date", T.DateType),
    ("age", T.StringType),
    ("primary_fur_color", T.StringType),
    ("highlight_fur_color", T.StringType),
    ("location", T.StringType),
    ("running", T.BooleanType),
    ("chasing", T.BooleanType),
    ("climbing", T.BooleanType),
    ("eating", T.BooleanType),
    ("foraging", T.BooleanType),
    ("kuks", T.BooleanType),
    ("quaas", T.BooleanType),
    ("moans", T.BooleanType),
    ("tail_flags", T.BooleanType),
    ("tail_twitches", T.BooleanType),
    ("approaches", T.BooleanType),
    ("indifferent", T.BooleanType),
    ("runs_from", T.BooleanType),
]

# Each field gets its own freshly instantiated type object.
load_schema = T.StructType(
    [
        T.StructField(column_name, column_type())
        for column_name, column_type in load_schema_columns
    ]
)

Create empty table.

In [10]:
# Build a one-row DataFrame of all-None values against load_schema, drop that
# null row with na.drop(), and write the result over the target table.
placeholder_row = [None] * len(load_schema.names)
(
    session.create_dataframe([placeholder_row], schema=load_schema)
    .na.drop()
    .write
    .mode("overwrite")
    .save_as_table(state_dict['table_name'])
)

Load the staged CSV data into the table using the schema defined above.

In [11]:
# File-format options passed to copy_into_table.
csv_file_format_options = {
    "FIELD_OPTIONALLY_ENCLOSED_BY": "'\"'",
    "skip_header": 1,
}

# Reader over the load stage, typed with load_schema.
staged_csv = (
    session.read
    .option("SKIP_HEADER", 1)
    .schema(load_schema)
    .csv('@' + state_dict['load_stage_name'])
)

# Copy the staged data into the target table.
loaddf = staged_csv.copy_into_table(
    state_dict['table_name'],
    format_type_options=csv_file_format_options,
)

Count the number of ingested records.

In [12]:
# Row count of the target table, shown as the cell output.
ingested_table = session.table(state_dict['table_name'])
ingested_table.count()

3023

In [13]:
# Close the session now that the notebook's work is finished.
session.close()