PREREQUISTE

1. Create Snowflake user
Create User and update the credential as described here:
https://docs.snowflake.com/en/developer-guide/snowflake-cli-v2/connecting/specify-credentials

use role sysadmin and makesure you grant following

GRANT EXECUTE TASK ON ACCOUNT TO ROLE SYSADMIN;

2. Create SnowCLI utility - for authentication only
https://docs.snowflake.com/en/developer-guide/snowflake-cli-v2/installation/installation

3. Create Python environment


conda create --name snow_env --override-channels -c https://repo.anaconda.com/pkgs/snowflake python=3.10 numpy pandas pyarrow

conda activate snow_env


conda install snowflake-snowpark-python


pip install snowflake - U


pip install ploty

4. run in Visual code or your choice of notebook

In [7]:
#Snowflake python API packages

from snowflake.core import Root
from snowflake.core.database import Database
from snowflake.core.schema import Schema
from snowflake.core.table import Table, TableColumn, PrimaryKey
from snowflake.core.warehouse import Warehouse
from snowflake.core.role import Role
from snowflake.core.grant import Grant
from snowflake.core.grant._grantee import Grantees
from snowflake.core.grant._privileges import Privileges
from snowflake.core.grant._securables import Securables
from snowflake.core.stage import Stage
from snowflake.core.stage import StageCollection
from snowflake.core.task import StoredProcedureCall, Task
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask

from snowflake.snowpark import Session
from snowflake.snowpark.types import IntegerType, StringType, StructType, FloatType, StructField, DateType, Variant
from snowflake.snowpark.functions import udf, sum, col,array_construct,month,year,call_udf,lit
from snowflake.snowpark.version import VERSION
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark.functions import sproc
#from snowflake.snowpark import Window

# Import Python packages
import pandas as pd
import plotly.express as px
import plotly.io as pio
from datetime import timedelta

In [8]:
from snowflake.connector.config_manager import CONFIG_MANAGER
# file from connection is created
print(CONFIG_MANAGER.file_path)
# https://docs.snowflake.com/en/developer-guide/snowflake-cli-v2/connecting/specify-credentials
# location in mac ~/Library/Application Support/snowflake/config.toml
# following take default connection from above file
#session = Session.builder.getOrCreate()
# following for specific connection
session=Session.builder.config("connection_name","demodc").create()
snowpark_version = VERSION
# Current Environment Details
print('Account                     : {}'.format(session.get_current_account()))
print('User                        : {}'.format(session.get_current_user()))
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

root = Root(session) 

/Users/upatel/Library/Application Support/snowflake/config.toml
Account                     : "sfsenorthamerica-upateldc"
User                        : "demouser"
Role                        : "SYSADMIN"
Database                    : None
Schema                      : None
Warehouse                   : "DEMO_WH"
Snowpark for Python version : 1.18.0


In [9]:
# create database
tb101_db = root.databases.create(Database(name="tb_101"),  mode="ifNotExists") # orReplace, errorIfExists 


In [10]:
#create schemas
raw_pos_sch = tb101_db.schemas.create(Schema(name="raw_pos"), mode="orReplace")
raw_cust_sch = tb101_db.schemas.create(Schema(name="raw_customer"), mode="orReplace")
harmonized_sch = tb101_db.schemas.create(Schema(name="harmonized"), mode="orReplace")
analytics_sch = tb101_db.schemas.create(Schema(name="analytics"), mode="orReplace")
session.use_schema('raw_pos')

# list schemas
# SQL  show schemas in database "tb_101";
schema_collection = root.databases["tb_101"].schemas.iter()
for sch in schema_collection:
  print(sch.name)

ANALYTICS
HARMONIZED
INFORMATION_SCHEMA
PUBLIC
RAW_CUSTOMER
RAW_POS


In [11]:
#create warehouse
# create a Warehouse instance that used to store the property of a warehouse
warehouses = root.warehouses
warehouse_name = "tb_de_wh"
de_wh= Warehouse(
    name=warehouse_name,
    warehouse_size="LARGE",
    auto_suspend=60,
    initially_suspended="true",
    comment="data engg warehouse for tasty bytes",
    auto_resume="true"
)
dev_wh= Warehouse(
    name="tb_dev_wh",
    warehouse_size="XSMALL",
    auto_suspend=60,
    initially_suspended="true",
    comment="developer warehouse for tasty bytes",
    auto_resume="true"
)
# create a warehouse 

dewh  = warehouses.create(de_wh, mode='orReplace')
devwh = warehouses.create(dev_wh,mode='orReplace')

session.use_warehouse('tb_de_wh')

# list warehosues
# SQL  show warehouses like '%demo%';
warehouse_collection = warehouses.iter(like="tb%")
for wh in warehouse_collection:
  print(wh.name, wh.auto_suspend, wh.max_cluster_count)


TB_DEV_WH 60 None
TB_DE_WH 60 None


In [12]:
# create roles
session.use_role("securityadmin")
tbadmin_role = Role(name="tb_admin",  comment='admin for tasty bytes')
tbadmin_role = root.roles.create(tbadmin_role,  mode='ifNotExists')
tbde_role = Role(name="tb_data_engineer",  comment='data engineer for tasty bytes')
tbde_role = root.roles.create(tbde_role,  mode='ifNotExists')
tbdev_role = Role(name="tb_dev",  comment='developer for tasty bytes')
tbdev_role = root.roles.create(tbdev_role,  mode='ifNotExists')

In [13]:
#give a grant to create hierarchey , tb_dev-> tb_data_engineer-> tb_admin -> sysadmin
# GRANT ROLE TB_ADMIN TO ROLE SYSADMIN
root.grants.grant(
  Grant(
    grantee=Grantees.role('sysadmin'), 
    securable=Securables.role('tb_admin'),
  )
)
# GRANT ROLE TB_DATA_ENGINEER TO ROLE TB_ADMIN
root.grants.grant(
  Grant(
    grantee=Grantees.role('tb_admin'), 
    securable=Securables.role('tb_data_engineer'),
  )
)
# GRANT ROLE TB_DEV TO ROLE TB_DATA ENGINEER
root.grants.grant(
  Grant(
    grantee=Grantees.role('tb_data_engineer'), 
    securable=Securables.role('tb_dev'),
  )
)


In [14]:
# usages grants
# GRANT USAGE ON DATABASE TB_101 TO ROLE TB_DEV

root.grants.grant(
  Grant(
    grantee=Grantees.role('tb_dev'),
    securable=Securables.database('tb_101'),
    privileges=[Privileges.usage]
  )
)





In [15]:
#create stage
#CREATE STAGE IF NOT EXISTS  tb_101.public.s3load_csv  URL = 's3://sfquickstarts/frostbyte_tastybytes/'      
session.use_role("sysadmin")
s3_stage_csv = Stage(
  name="s3load_csv", url='s3://sfquickstarts/frostbyte_tastybytes/'
)
stages = root.databases["tb_101"].schemas["public"].stages
stages.create(s3_stage_csv, mode='ifNotExists')

# this stage to use to store python procedure code so it can execute later in task
int_stage = Stage(name="mycode")
stages.create(int_stage, mode='ifNotExists')


<snowflake.core.stage._stage.StageResource at 0x148b19840>

In [16]:
# List files in the stage
# LIST @TB_101.PUBLIC.S3LOAD_CSV;
s3files = root.databases["tb_101"].schemas["public"].stages["s3load_csv"].list_files()
for stageFile in s3files:
  print(stageFile)


name='s3://sfquickstarts/frostbyte_tastybytes/analytics/menu_item_aggregate_v.csv' size='22109556' md5='f073c6baf1204e746a57ddffade0fcce-2' last_modified='Tue, 25 Jun 2024 20:55:58 GMT'
name='s3://sfquickstarts/frostbyte_tastybytes/analytics/menu_item_cogs_and_price_v.csv' size='35665' md5='96759e95b794b462e714b270f0e2e76c' last_modified='Tue, 25 Jun 2024 20:55:59 GMT'
name='s3://sfquickstarts/frostbyte_tastybytes/analytics/order_item_cost_agg_v.csv' size='890483' md5='fbbebe3357b6d21bf958c980a26df40e' last_modified='Tue, 25 Jun 2024 20:55:59 GMT'
name='s3://sfquickstarts/frostbyte_tastybytes/analytics/orders_v/data_0_0_0.csv.gz' size='16803703' md5='1d115b3d9b91a0c3b7a5601de1c82152' last_modified='Thu, 27 Jun 2024 16:00:11 GMT'
name='s3://sfquickstarts/frostbyte_tastybytes/analytics/orders_v/data_0_0_1.csv.gz' size='16815782' md5='cb445b6a5a6aee40965fb06b9e17321e' last_modified='Thu, 27 Jun 2024 16:00:17 GMT'
name='s3://sfquickstarts/frostbyte_tastybytes/analytics/orders_v/data_0_0_10

In [17]:
# define and create table and load data from above stage
shift_sales = Table(
  name="shift_sales",
  columns=[TableColumn(name="location_id", datatype="number(38,0)"),
           TableColumn(name="city", datatype="string"),
           TableColumn(name="date", datatype="date"),
           TableColumn(name="shift_sales", datatype="float"),
           TableColumn(name="shift", datatype="string"),
           TableColumn(name="month", datatype="number(2,0)"),
           TableColumn(name="day_of_week", datatype="number(2,0)"),
           TableColumn(name="city_population", datatype="number(38,0)"),       
           ]
)
root.databases["tb_101"].schemas["raw_pos"].tables.create(shift_sales,mode='orReplace')


shift_sales_schema = StructType([
StructField("location_id",StringType()),
StructField("city",StringType()),
StructField("date",StringType()),
StructField("shift_sales",StringType()),
StructField("shift",StringType()),
StructField("month",StringType()),
StructField("day_of_week", StringType()),
StructField("city_population", StringType()),

])
shift_sales_df=session.read.options({"field_delimiter": ",", "skip_header": 0}).schema(shift_sales_schema).csv("@tb_101.public.s3load_csv/analytics/shift_sales/")
copy_result = shift_sales_df.copy_into_table("shift_sales", force=True)
print(copy_result)
session.table("shift_sales").show()

[Row(file='s3://sfquickstarts/frostbyte_tastybytes/analytics/shift_sales/shift_sales.csv.gz', status='LOADED', rows_parsed=1938202, rows_loaded=1938202, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]
----------------------------------------------------------------------------------------------------------------------
|"LOCATION_ID"  |"CITY"         |"DATE"      |"SHIFT_SALES"  |"SHIFT"  |"MONTH"  |"DAY_OF_WEEK"  |"CITY_POPULATION"  |
----------------------------------------------------------------------------------------------------------------------
|2574           |New York City  |2023-05-02  |NULL           |AM       |5        |2              |8804190            |
|2574           |New York City  |2023-05-05  |NULL           |AM       |5        |5              |8804190            |
|2574           |New York City  |2023-05-01  |NULL           |AM       |5        |1              |8804190            |
|25

In [18]:
# show tables
tables = root.databases["tb_101"].schemas["raw_pos"].tables.iter(like="%")
for table_obj in tables:
  print(table_obj.name)

SHIFT_SALES


In [19]:
# change warehouse size to xsmall

whobj=root.warehouses["tb_de_wh"].fetch()
whobj.warehouse_size="xsmall"
whobj.resource_monitor = None
root.warehouses["tb_de_wh"].create_or_update(whobj)

In [20]:
table_ref = root.databases['tb_101'].schemas['raw_pos'].tables['shift_sales']
demo_table = table_ref.fetch()
demo_table.to_dict()

{'name': 'SHIFT_SALES',
 'kind': '',
 'enable_schema_evolution': False,
 'change_tracking': False,
 'data_retention_time_in_days': 1,
 'max_data_extension_time_in_days': 14,
 'default_ddl_collation': '',
 'columns': [{'name': 'LOCATION_ID',
   'datatype': 'NUMBER(38,0)',
   'nullable': True},
  {'name': 'CITY', 'datatype': 'VARCHAR(16777216)', 'nullable': True},
  {'name': 'DATE', 'datatype': 'DATE', 'nullable': True},
  {'name': 'SHIFT_SALES', 'datatype': 'FLOAT', 'nullable': True},
  {'name': 'SHIFT', 'datatype': 'VARCHAR(16777216)', 'nullable': True},
  {'name': 'MONTH', 'datatype': 'NUMBER(2,0)', 'nullable': True},
  {'name': 'DAY_OF_WEEK', 'datatype': 'NUMBER(2,0)', 'nullable': True},
  {'name': 'CITY_POPULATION', 'datatype': 'NUMBER(38,0)', 'nullable': True}]}

In [21]:
# join an create view
shift_sales_df = session.table("tb_101.raw_pos.shift_sales")
shift_sales_df.show()
    

----------------------------------------------------------------------------------------------------------------------
|"LOCATION_ID"  |"CITY"         |"DATE"      |"SHIFT_SALES"  |"SHIFT"  |"MONTH"  |"DAY_OF_WEEK"  |"CITY_POPULATION"  |
----------------------------------------------------------------------------------------------------------------------
|2574           |New York City  |2023-05-02  |NULL           |AM       |5        |2              |8804190            |
|2574           |New York City  |2023-05-05  |NULL           |AM       |5        |5              |8804190            |
|2574           |New York City  |2023-05-01  |NULL           |AM       |5        |1              |8804190            |
|2574           |New York City  |2023-05-07  |NULL           |AM       |5        |0              |8804190            |
|2574           |New York City  |2023-05-04  |NULL           |AM       |5        |4              |8804190            |
|2574           |New York City  |2023-05-03  |NU

Create database from Snowflake Marketplace called "frostbyte_safegraph" and grant access to the role you are using (eg. sysadmin)

 https://app.snowflake.com/marketplace/listing/GZSNZL1CN82/safegraph-safegraph-frostbyte

In [22]:


tb_safegraph_df=session.table('frostbyte_safegraph.public.frostbyte_tb_safegraph_s')
tb_safegraph_df.count();

tb_safegraph_df.show(10)

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"PLACEKEY"            |"PARENT_PLACEKEY"  |"SAFEGRAPH_BRAND_IDS"                      |"LOCATION_NAME"          |"BRANDS"      |"STORE_ID"  |"TOP_CATEGORY"                                      |"SUB_CATEGORY"                               |"NAICS_CODE"  |"LAT

In [23]:
#create a view in analytics schema

shift_sales_harmoized_df = shift_sales_df.join(tb_safegraph_df, shift_sales_df.col("LOCATION_ID") == tb_safegraph_df.col("LOCATION_ID"))\
        .select(shift_sales_df["LOCATION_ID"].as_("LOCATION_ID"),
                shift_sales_df["CITY"].as_("CITY"),
                shift_sales_df["DATE"],
                shift_sales_df["SHIFT_SALES"],
                shift_sales_df["SHIFT"],
                shift_sales_df["MONTH"],
                shift_sales_df["DAY_OF_WEEK"],
                shift_sales_df["CITY_POPULATION"],
                tb_safegraph_df["LATITUDE"],
                tb_safegraph_df["LONGITUDE"]
        )

shift_sales_harmoized_df.show()
shift_sales_harmoized_df.create_or_replace_view("TB_101.ANALYTICS.SHIFT_SALES_VW")
            


-------------------------------------------------------------------------------------------------------------------------------------------------
|"LOCATION_ID"  |"CITY"         |"DATE"      |"SHIFT_SALES"  |"SHIFT"  |"MONTH"  |"DAY_OF_WEEK"  |"CITY_POPULATION"  |"LATITUDE"  |"LONGITUDE"  |
-------------------------------------------------------------------------------------------------------------------------------------------------
|2574           |New York City  |2023-05-02  |NULL           |AM       |5        |2              |8804190            |40.731148   |-74.003081   |
|2574           |New York City  |2023-05-05  |NULL           |AM       |5        |5              |8804190            |40.731148   |-74.003081   |
|2574           |New York City  |2023-05-01  |NULL           |AM       |5        |1              |8804190            |40.731148   |-74.003081   |
|2574           |New York City  |2023-05-07  |NULL           |AM       |5        |0              |8804190            |40.731

[Row(status='View SHIFT_SALES_VW successfully created.')]

In [24]:
ss_df = session.table("tb_101.analytics.shift_sales_vw")
ss_df.show()

-------------------------------------------------------------------------------------------------------------------------------------------------
|"LOCATION_ID"  |"CITY"         |"DATE"      |"SHIFT_SALES"  |"SHIFT"  |"MONTH"  |"DAY_OF_WEEK"  |"CITY_POPULATION"  |"LATITUDE"  |"LONGITUDE"  |
-------------------------------------------------------------------------------------------------------------------------------------------------
|2574           |New York City  |2023-05-02  |NULL           |AM       |5        |2              |8804190            |40.731148   |-74.003081   |
|2574           |New York City  |2023-05-05  |NULL           |AM       |5        |5              |8804190            |40.731148   |-74.003081   |
|2574           |New York City  |2023-05-01  |NULL           |AM       |5        |1              |8804190            |40.731148   |-74.003081   |
|2574           |New York City  |2023-05-07  |NULL           |AM       |5        |0              |8804190            |40.731

In [25]:
#create stage to load data from  parquet files
s3_stage_parquet = Stage(
  name="s3load_parquet", url='s3://sfquickstarts/data-engineering-with-snowpark-python/'
)
stages_parquet = root.databases["tb_101"].schemas["public"].stages
stages_parquet.create(s3_stage_parquet, mode='orReplace')

# List files in the parquet stage
# LIST @TB_101.PUBLIC.s3load_parquet;
s3files = root.databases["tb_101"].schemas["public"].stages["s3load_parquet"].list_files(pattern=".*truck.*")
for stageFile in s3files:
  print(stageFile)



name='s3://sfquickstarts/data-engineering-with-snowpark-python/pos/truck/truck.snappy.parquet' size='12120' md5='1ea78d12010e999358d882b92c7c49ee' last_modified='Wed, 11 Jan 2023 19:45:15 GMT'


In [26]:
# craete table truck using schema detection
stage_loc= "@tb_101.public.s3load_parquet/pos/truck/"
truckdf = session.read.option("compression", "snappy").parquet(stage_loc)
tref=truckdf.copy_into_table("truck")
print(tref)
table_ref = root.databases['tb_101'].schemas['raw_pos'].tables['truck']
truck_table = table_ref.fetch()
truck_table.to_dict()

[Row(file='s3://sfquickstarts/data-engineering-with-snowpark-python/pos/truck/truck.snappy.parquet', status='LOADED', rows_parsed=450, rows_loaded=450, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]


{'name': 'TRUCK',
 'kind': '',
 'enable_schema_evolution': False,
 'change_tracking': False,
 'data_retention_time_in_days': 1,
 'max_data_extension_time_in_days': 14,
 'default_ddl_collation': '',
 'columns': [{'name': 'TRUCK_ID',
   'datatype': 'NUMBER(38,0)',
   'nullable': True},
  {'name': 'MENU_TYPE_ID', 'datatype': 'NUMBER(38,0)', 'nullable': True},
  {'name': 'PRIMARY_CITY', 'datatype': 'VARCHAR(16777216)', 'nullable': True},
  {'name': 'REGION', 'datatype': 'VARCHAR(16777216)', 'nullable': True},
  {'name': 'ISO_REGION', 'datatype': 'VARCHAR(16777216)', 'nullable': True},
  {'name': 'COUNTRY', 'datatype': 'VARCHAR(16777216)', 'nullable': True},
  {'name': 'ISO_COUNTRY_CODE',
   'datatype': 'VARCHAR(16777216)',
   'nullable': True},
  {'name': 'FRANCHISE_FLAG', 'datatype': 'NUMBER(38,0)', 'nullable': True},
  {'name': 'YEAR', 'datatype': 'NUMBER(38,0)', 'nullable': True},
  {'name': 'MAKE', 'datatype': 'VARCHAR(16777216)', 'nullable': True},
  {'name': 'MODEL', 'datatype': 'VAR

In [27]:
truck_df = session.table("tb_101.raw_pos.truck")

print(truck_df.count())
shift_sales_df.show()

450
----------------------------------------------------------------------------------------------------------------------
|"LOCATION_ID"  |"CITY"         |"DATE"      |"SHIFT_SALES"  |"SHIFT"  |"MONTH"  |"DAY_OF_WEEK"  |"CITY_POPULATION"  |
----------------------------------------------------------------------------------------------------------------------
|2574           |New York City  |2023-05-02  |NULL           |AM       |5        |2              |8804190            |
|2574           |New York City  |2023-05-05  |NULL           |AM       |5        |5              |8804190            |
|2574           |New York City  |2023-05-01  |NULL           |AM       |5        |1              |8804190            |
|2574           |New York City  |2023-05-07  |NULL           |AM       |5        |0              |8804190            |
|2574           |New York City  |2023-05-04  |NULL           |AM       |5        |4              |8804190            |
|2574           |New York City  |2023-05-03 

In [28]:
@sproc(name='tb_101.raw_pos.ingest_data_sproc', 
       packages=['snowflake-snowpark-python'], 
       is_permanent=True, 
       replace=True,
       stage_location='@tb_101.public.mycode', 
       session=session)
def ingest_data_sproc(session: Session) -> T.Variant:
       shift_sales_schema = StructType([
              StructField("location_id",StringType()),
              StructField("city",StringType()),
              StructField("date",StringType()),
              StructField("shift_sales",StringType()),
              StructField("shift",StringType()),
              StructField("month",StringType()),
              StructField("day_of_week", StringType()),
              StructField("city_population", StringType()),

       ])
       shift_sales_df=session.read.options({"field_delimiter": ",", "skip_header": 0}).schema(shift_sales_schema).csv("@tb_101.public.s3load_csv/analytics/shift_sales/")
       copy_result = shift_sales_df.copy_into_table("shift_sales", force=True)
       stage_loc= "@tb_101.public.s3load_parquet/pos/truck/"
       truckdf = session.read.option("compression", "snappy").parquet(stage_loc)
       tref=truckdf.copy_into_table("truck")
       return ("Data loading complete!")



In [29]:
@sproc(name='tb_101.raw_pos.change_wh_size', 
       packages=['snowflake-snowpark-python','snowflake.core'], 
       is_permanent=True, 
       replace=True,
       stage_location='@tb_101.public.mycode', 
       session=session)
def change_wh_size(session: Session, whname: str, whsize: str) -> T.Variant:
       root = Root(session) 
       whobj=root.warehouses[whname].fetch()
       whobj.warehouse_size=whsize
       whobj.resource_monitor = None
       root.warehouses[whname].create_or_update(whobj)
       return ("Warehouse Size changed")



Package 'snowflake.core' is not installed in the local environment. Your UDF might not work when the package is installed on the server but not on your local environment.


In [30]:
# create a Task instance in the client side
#serverless task
increase_whsize_task = Task(
    "increase_wh_size",
    definition=StoredProcedureCall(change_wh_size, args=["tb_de_wh","xlarge"], stage_location="@tb_101.public.mycode", packages=["snowflake-snowpark-python","snowflake.core"]),    
    warehouse="tb_de_wh",
    schedule=timedelta(hours=1)
)
ingest_task = Task(
    "loaddata_task",
    definition=StoredProcedureCall(ingest_data_sproc, stage_location="@tb_101.public.mycode", packages=["snowflake-snowpark-python"]),
    warehouse="tb_de_wh"
)
reduce_whsize_task = Task(
    "reduce_wh_size",
    definition="call tb_101.raw_pos.change_wh_size('tb_de_wh','xsmall')" ,
    user_task_managed_initial_warehouse_size="xsmall"
)
# create chain of task/DAG
reduce_whsize_task.predecessors = [ingest_task.name]
ingest_task.predecessors = [increase_whsize_task.name]

In [31]:
tasks = root.databases["tb_101"].schemas["public"].tasks
task0 = tasks.create(increase_whsize_task, mode="orreplace")
task1 = tasks.create(ingest_task, mode="orreplace")
task2 = tasks.create(reduce_whsize_task, mode="orreplace")
for t in tasks.iter(like="%"):
    print(f"Definition of {t.name}: \n",  t.definition, sep="", end="\n--------------------------\n")

Definition of DATA_INGESTION_DAG: 
select 'dag dummy root'
--------------------------
Definition of DATA_INGESTION_DAG$INCREASE_WH_SIZE: 
CALL tb_101.raw_pos.change_wh_size('tb_de_wh', 'xlarge')
--------------------------
Definition of DATA_INGESTION_DAG$LOADDATA_TASK: 
CALL tb_101.raw_pos.ingest_data_sproc()
--------------------------
Definition of DATA_INGESTION_DAG$REDUCE_WH_SIZE: 
CALL tb_101.raw_pos.change_wh_size('tb_de_wh', 'xsmall')
--------------------------
Definition of INCREASE_WH_SIZE: 
CALL tb_101.raw_pos.change_wh_size('tb_de_wh', 'xlarge')
--------------------------
Definition of LOADDATA_TASK: 
CALL tb_101.raw_pos.ingest_data_sproc()
--------------------------
Definition of REDUCE_WH_SIZE: 
call tb_101.raw_pos.change_wh_size('tb_de_wh','xsmall')
--------------------------


In [32]:
#task2.resume()
#task1.resume()
#task0.resume()
task0.execute()

In [33]:
task0.get_current_graphs()

[TaskRun(root_task_name='INCREASE_WH_SIZE', database_name='TB_101', schema_name='PUBLIC', state='SCHEDULED', first_error_task_name=None, first_error_code=0, first_error_message=None, scheduled_time=datetime.datetime(2024, 7, 11, 23, 27, 57, 218000, tzinfo=TzInfo(UTC)), query_start_time=None, next_scheduled_time=datetime.datetime(2024, 7, 12, 0, 27, 57, 218000, tzinfo=TzInfo(UTC)), completed_time=None, root_task_id='01b59b5f-a877-c43d-0000-0000000000f7', graph_version=1, run_id=-1541408478)]

In [34]:
task0.suspend()

In [35]:
#. OR Create a DAG

dag_name = "data_ingestion_dag" 
dag = DAG(dag_name, schedule=timedelta(hours=1))
with dag:
    dag_task1 = DAGTask("increase_wh_size",
           definition=StoredProcedureCall(change_wh_size, args=["tb_de_wh","xlarge"], stage_location="@tb_101.public.mycode", 
                    packages=["snowflake-snowpark-python","snowflake.core"]),  warehouse="tb_de_wh")    
    dag_task2 = DAGTask("loaddata_task", StoredProcedureCall(ingest_data_sproc, stage_location="@tb_101.public.mycode", packages=["snowflake-snowpark-python"]), warehouse="tb_de_wh")
    dag_task3 = DAGTask("reduce_wh_size",definition=StoredProcedureCall(change_wh_size, args=["tb_de_wh","xsmall"], stage_location="@tb_101.public.mycode", 
                    packages=["snowflake-snowpark-python","snowflake.core"]),  warehouse="tb_de_wh")
    dag_task1 >> dag_task2 >> dag_task3  # task1 is a predecessor of task2 which is predecssor of task3
pubschema = root.databases["tb_101"].schemas["public"]
dag_op = DAGOperation(pubschema)
dag_op.deploy(dag, mode="orreplace")  

dagiter = dag_op.iter_dags(like='%')
for dag_name in dagiter:
    print(dag_name)

DATA_INGESTION_DAG
INCREASE_WH_SIZE


In [36]:
dag_op.run(dag)

In [37]:
# data transformation and analytics
# Select
location_df = ss_df.select("date", "shift", "shift_sales", "location_id", "city")

# Filter
location_df = location_df.filter(F.col("location_id") == 1135)

# Sort
location_df = location_df.order_by(["date", "shift"], ascending=[0, 0])

# Display
location_df.show(n=20)

--------------------------------------------------------------------
|"DATE"      |"SHIFT"  |"SHIFT_SALES"  |"LOCATION_ID"  |"CITY"     |
--------------------------------------------------------------------
|2023-05-07  |PM       |NULL           |1135           |Vancouver  |
|2023-05-07  |AM       |NULL           |1135           |Vancouver  |
|2023-05-06  |PM       |NULL           |1135           |Vancouver  |
|2023-05-06  |AM       |NULL           |1135           |Vancouver  |
|2023-05-05  |PM       |NULL           |1135           |Vancouver  |
|2023-05-05  |AM       |NULL           |1135           |Vancouver  |
|2023-05-04  |PM       |NULL           |1135           |Vancouver  |
|2023-05-04  |AM       |NULL           |1135           |Vancouver  |
|2023-05-03  |PM       |NULL           |1135           |Vancouver  |
|2023-05-03  |AM       |NULL           |1135           |Vancouver  |
|2023-05-02  |PM       |NULL           |1135           |Vancouver  |
|2023-05-02  |AM       |NULL      

In [38]:
#shift sales table
print(location_df.count())
location_df.explain()

160
---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT "DATE", "SHIFT", "SHIFT_SALES", "LOCATION_ID", "CITY" FROM tb_101.analytics.shift_sales_vw WHERE ("LOCATION_ID" = 1135 :: INT) ORDER BY "DATE" DESC NULLS LAST, "SHIFT" DESC NULLS LAST
Logical Execution Plan:
GlobalStats:
    partitionsTotal=2
    partitionsAssigned=2
    bytesAssigned=23433728
Operations:
1:0     ->Result  SHIFT_SALES.DATE, SHIFT_SALES.SHIFT, SHIFT_SALES.SHIFT_SALES, SHIFT_SALES.LOCATION_ID, SHIFT_SALES.CITY  
1:1          ->Sort  SHIFT_SALES.DATE DESC NULLS LAST, SHIFT_SALES.SHIFT DESC NULLS LAST  
1:2               ->InnerJoin  joinKey: (FROSTBYTE_TB_SAFEGRAPH_S.LOCATION_ID = SHIFT_SALES.LOCATION_ID)  
1:3                    ->Filter  (FROSTBYTE_TB_SAFEGRAPH_S.LOCATION_ID = 1135) AND (FROSTBYTE_TB_SAFEGRAPH_S.LOCATION_ID IS NOT NULL)  
1:4                         ->TableScan  FROSTBYTE_SAFEGRAPH.PUBLIC.FROSTBYTE_TB_SAFEGRAPH_S  LOCATION_ID  {partitionsTotal=1, partitionsAssigned=1, bytesAssigned=62

In [39]:
# Group by city and average shift sales
analysis_df = ss_df.group_by("city").agg(F.mean("shift_sales").alias("avg_shift_sales"))

# Sort by average shift sales
analysis_df = analysis_df.sort("avg_shift_sales", ascending=True)

# Pull to pandas and plot
analysis_df.to_pandas().plot.barh(x="CITY", y="AVG_SHIFT_SALES")

ImportError: matplotlib is required for plotting when the default backend "matplotlib" is selected.

In [None]:
# Filter to Vancouver
analysis_df = ss_df.filter(F.col("city") == "Vancouver")

# Group by location and average shift sales
analysis_df = analysis_df.group_by("location_id").agg(F.mean("shift_sales").alias("avg_shift_sales"))

# Get the location count
print("Vancouver location count:", analysis_df.count())

In [None]:
ax = analysis_df.to_pandas().hist(column="AVG_SHIFT_SALES", bins=20)


In [None]:
#cleanup
#task2.delete()
#task1.delete()
#task0.delete()
#dag_op.delete(dag)
#tb101_db.delete()

In [None]:
#session.close()