# What's in this exercise?
1) Create Storage Intergration
2) Load reference data in staging directory into Snowflake Tables directly and get the performance of Snowflake's elastic performance and scalability.
3) Create FILE FORMAT for the external files
4) Create external stage for ADLS Gen-2 storage account
5) Load csv files available in ADLS Gen-2 Storage into Snowflake tables

In [0]:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark.types import StructType, StructField
import os

In [0]:
User = dbutils.secrets.get("snowparkdetails", "username")
Password = dbutils.secrets.get("snowparkdetails", "password")
Account = dbutils.secrets.get("snowparkdetails", "account")
TenandId = dbutils.secrets.get("gen2-storage", "tenant-id")
SchemaName = "taxi"
DatabaseName = "NYCTAXI"
Warehouse = "cluster1"
DBrole = "dba_role"
CONNECTION_PARAMETERS = {
    'account': Account,
    'user': User,
    'password': Password,
    'schema': SchemaName,
    'database': DatabaseName,
    'warehouse': Warehouse,
    'role':DBrole,
}

session = Session.builder.configs(CONNECTION_PARAMETERS).create()


In [0]:

print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

In [0]:
# Define source and destination directories
# srcDataDirRoot = "/mnt/workshop/staging/reference-data/" #Root dir for source data
# destDataDirRoot = "/mnt/workshop/curated/nyctaxi/reference/" #Root dir for consumable data
allowed_location = ('azure://demostgacct.blob.core.windows.net/staging','azure://demostgacct.blob.core.windows.net/curated')

session.sql(f"CREATE OR REPLACE STORAGE INTEGRATION azure_integration_gen2 TYPE = EXTERNAL_STAGE \
  STORAGE_PROVIDER = AZURE \
  ENABLED = TRUE \
  AZURE_TENANT_ID = '{TenandId}' \
  STORAGE_ALLOWED_LOCATIONS = {allowed_location}").collect()

In [0]:
session.sql("desc storage integration azure_integration").show()

In [0]:
session.sql(f"CREATE OR REPLACE FILE FORMAT referencedata_csv_format \
            TYPE = CSV \
            COMPRESSION = NONE \
            FIELD_DELIMITER=',' \
            FILE_EXTENSION = 'csv' \
            ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE \
             skip_header=1 ").collect();

In [0]:
session.sql(f"show file formats").show(max_width=50)

In [0]:
session.sql(f"create or replace stage azure_csv_stage \
  storage_integration = azure_integration \
  url = 'azure://demostgacct.blob.core.windows.net/staging/' \
  file_format = referencedata_csv_format").collect()
  

In [0]:
session.sql("select * from information_schema.stages").toPandas()

Unnamed: 0,STAGE_CATALOG,STAGE_SCHEMA,STAGE_NAME,STAGE_URL,STAGE_REGION,STAGE_TYPE,STAGE_OWNER,COMMENT,CREATED,LAST_ALTERED
0,NYCTAXI,TAXI,AZURE_CSV_STAGE,azure://[REDACTED]demostgacct.blob.core.windows.net...,westus2,External Named,ACCOUNTADMIN,,2022-06-08 20:39:15.858000-07:00,2022-06-08 20:39:15.975000-07:00


In [0]:
session.sql("list @azure_csv_stage/customer/csvFiles/").show(max_width=1000)

In [0]:
df_stages=session.sql(" list @azure_csv_stage").collect()
for val in df_stages:
  if 'reference-data' in val.name:
    print(val.name,val, val.size,val.last_modified)


### List reference datasets

In [0]:
#display(dbutils.fs.ls(srcDataDirRoot)) use below code to list ref data to load
df_stages=session.sql(" list @azure_csv_stage").collect()
for val in df_stages:
  if 'reference-data' in val.name and '.csv' in val.name:
    print(val.name)
    
    
   # @azure_csv_stage/reference-data/taxi_zone_lookup.csv

### 3. Define schema for raw reference data

In [0]:
# 1.  Taxi zone lookup
taxiZoneSchema = StructType([
    StructField("location_id", T.StringType(), True),
    StructField("borough", T.StringType(), True),
    StructField("zone", T.StringType(), True),
    StructField("service_zone", T.StringType(), True)])

#2. Months of the year
tripMonthNameSchema = StructType([
    StructField("trip_month", T.StringType(), True),
    StructField("month_name_short", T.StringType(), True),
    StructField("month_name_full", T.StringType(), True)])

#3.  Rate code id lookup
rateCodeSchema = StructType([
    StructField("rate_code_id", T.IntegerType(), True),
    StructField("description", T.StringType(), True)])

#4.  Payment type lookup
paymentTypeSchema = StructType([
    StructField("payment_type", T.IntegerType(), True),
    StructField("abbreviation", T.StringType(), True),
    StructField("description", T.StringType(), True)])

#5. Trip type
tripTypeSchema = StructType([
    StructField("trip_type", T.IntegerType(), True),
    StructField("description", T.StringType(), True)])


#6. Vendor ID
vendorSchema = StructType([
    StructField("vendor_id", T.IntegerType(), True),
    StructField("abbreviation", T.StringType(), True),
    StructField("description", T.StringType(), True)])

### Load reference data

##### Create function to load data

In [0]:
#def loadReferenceData(srcDatasetName, srcDataFile, destDataDir, srcSchema, delimiter ):
def loadReferenceData(srcDatasetName, srcDataFile, destTableName, srcSchema, delimiter ):
  print("Dataset:  " + srcDatasetName)
  print(".......................................................")
  

  #---Snowpark code
  refDF = session.read \
                    .schema(srcSchema) \
                    .options({"skip_header": 1, "field_delimiter":delimiter ,"FIELD_OPTIONALLY_ENCLOSED_BY": '"' })\
                    .csv(srcDataFile)
      
  print(f"....reading source {srcDataFile} and saving as Snowflake Table {destTableName}")
  refDF.write.mode("overwrite").saveAsTable(destTableName)
  print("....done")


##### Loading data

In [0]:
srcDataDirRoot="@azure_csv_stage/reference-data/"

loadReferenceData("taxi zone",srcDataDirRoot + "taxi_zone_lookup.csv","taxi_zone_lookup",taxiZoneSchema,",")
loadReferenceData("trip month",srcDataDirRoot + "trip_month_lookup.csv","trip_month_lookup",tripMonthNameSchema,",")
loadReferenceData("rate code",srcDataDirRoot + "rate_code_lookup.csv","rate_code_lookup",rateCodeSchema,",")
loadReferenceData("payment type",srcDataDirRoot + "payment_type_lookup.csv","payment_type_lookup",paymentTypeSchema,",")
loadReferenceData("trip type",srcDataDirRoot + "trip_type_lookup.csv","trip_type_lookup",tripTypeSchema,",")
loadReferenceData("vendor",srcDataDirRoot + "vendor_lookup.csv","vendor_lookup",vendorSchema,",")

In [0]:
session.sql("select * from vendor_lookup").toPandas()

Unnamed: 0,VENDOR_ID,ABBREVIATION,DESCRIPTION
0,1,Creative Mobile Technologies LLC,Creative Mobile Technologies LLC
1,2,VeriFone Inc.,VeriFone Inc.


In [0]:
session.sql("select * from taxi_zone_lookup").limit(10).toPandas()

Unnamed: 0,LOCATION_ID,BOROUGH,ZONE,SERVICE_ZONE
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone
5,6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
6,7,Queens,Astoria,Boro Zone
7,8,Queens,Astoria Park,Boro Zone
8,9,Queens,Auburndale,Boro Zone
9,10,Queens,Baisley Park,Boro Zone


##### Validate load

In [0]:
session.sql("select * from taxi_zone_lookup").toPandas()
session.sql("select * from trip_month_lookup").toPandas()
session.sql("select * from rate_code_lookup").toPandas()
session.sql("select * from payment_type_lookup").toPandas()
session.sql("select * from trip_type_lookup").toPandas()
session.sql("select * from vendor_lookup").toPandas()

Unnamed: 0,VENDOR_ID,ABBREVIATION,DESCRIPTION
0,1,Creative Mobile Technologies LLC,Creative Mobile Technologies LLC
1,2,VeriFone Inc.,VeriFone Inc.
