# Install Snowpark

In [1]:
!pip install snowflake-snowpark-python

Collecting snowflake-snowpark-python
  Downloading snowflake_snowpark_python-1.4.0-py3-none-any.whl (284 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m284.3/284.3 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting cloudpickle<=2.0.0,>=1.6.0
  Using cached cloudpickle-2.0.0-py3-none-any.whl (25 kB)
Collecting snowflake-connector-python<4.0.0,>=2.7.12
  Using cached snowflake_connector_python-3.0.3-cp38-cp38-macosx_10_14_x86_64.whl (15.4 MB)
Collecting idna<4,>=2.5
  Using cached idna-3.4-py3-none-any.whl (61 kB)
Collecting oscrypto<2.0.0
  Using cached oscrypto-1.3.0-py2.py3-none-any.whl (194 kB)
Collecting asn1crypto<2.0.0,>0.24.0
  Using cached asn1crypto-1.5.1-py2.py3-none-any.whl (105 kB)
Collecting pyjwt<3.0.0
  Using cached PyJWT-2.6.0-py3-none-any.whl (20 kB)
Collecting urllib3<1.27,>=1.21.1
  Using cached urllib3-1.26.15-py2.py3-none-any.whl (140 kB)
Collecting filelock<4,>=3.5
  Using cached filelock-3.12.0-py3-none-any.whl (10 kB)
C



---



# Connect to Snowflake via Snowpark

In [16]:
import time
import json


# --->  PYSPARK

# import pyspark.sql.functions as f
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import udf,col
# from pyspark.sql.types import IntegerType
# spark = SparkSession.builder.appName("DataEngeering1").getOrCreate()

# <---  PYSPARK

import snowflake.snowpark.functions as f
from snowflake.snowpark import Session, DataFrame
from snowflake.snowpark.functions import udf, col
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import call_udf


# <----- Make these changes before running the notebook -------
# 1. Change Connection params to match your environment:
#    creds.json (read below) must provide the keys "account", "username" and "password".
# <----------------------------------------------------------------------------

# Names of the warehouse and database this notebook creates, uses and finally drops.
Warehouse_Name = 'MY_ETL_WH'
DB_NAME = 'DEMO_SNOWPARK'

# Template showing the shape of the connection parameters.
# It is not used below; the real values are loaded from creds.json.
CONNECTION_PARAMETERS1 = {
    "host": "<YourAccount>.snowflakecomputing.com",
    'account': '<YourAccount>',
    'user': '<Your_UserID>',
    'password': '<Your_Password>',
    'role': 'SYSADMIN',
}

# The file handle must NOT be called `f`: that name is the alias of
# snowflake.snowpark.functions, and rebinding it here breaks the later
# f.sum / f.min / f.max calls when the notebook is run top to bottom.
with open('creds.json') as creds_file:
    data = json.load(creds_file)

username = data['username']
password = data['password']
account = data["account"]

CONNECTION_PARAMETERS = {
    'account': account,
    'user': username,
    'password': password,
    'role': 'SYSADMIN',
}

print("Connecting to Snowflake.....\n")
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
print("Connected Successfully!...\n\n")

# Create the demo warehouse (starting at X-Small) and the demo database.
# NOTE: CREATE OR REPLACE discards any existing warehouse / database with the same name.
sql_cmd = "CREATE OR REPLACE WAREHOUSE {} WAREHOUSE_SIZE = 'X-Small' ".format(Warehouse_Name)
session.sql(sql_cmd).collect()

sql_cmd = "CREATE OR REPLACE DATABASE {}".format(DB_NAME)
session.sql(sql_cmd).collect()

# Point the session at the new database's PUBLIC schema and at the new warehouse.
sql_cmd = "USE SCHEMA {}.PUBLIC".format(DB_NAME)
session.sql(sql_cmd).collect()

sql_cmd = "USE WAREHOUSE {}".format(Warehouse_Name)
session.sql(sql_cmd).collect()


Connecting to Snowflake.....

Connected Successfully!...




[Row(status='Statement executed successfully.')]

## Start Data Engineering Process

In [11]:


# 1 - INCREASE COMPUTE TO 8 NODES
# The message must describe the size actually requested by the ALTER statement
# below ('LARGE'); it previously announced MEDIUM (4 nodes).
print("Resizing from XS(1 Node) to LARGE(8 Nodes) ..\n")

# WAIT_FOR_COMPLETION makes the statement return only once the resize has finished.
sql_cmd = "ALTER WAREHOUSE {} SET WAREHOUSE_SIZE = 'LARGE' WAIT_FOR_COMPLETION = TRUE".format(Warehouse_Name)
session.sql(sql_cmd).collect()

print("Completed!...\n\n")


# 2 - READ & JOIN 2 LARGE TABLES (600M & 1M rows)
print("Joining, Aggregating with 2 large tables(600M & 1M rows) & Writing results to new table(80M rows) ..\n")

dfLineItems = session.table("SFC_SAMPLES_SAMPLE_DATA.TPCH_SF100.LINEITEM")  # 600 Million Rows
dfSuppliers = session.table("SFC_SAMPLES_SAMPLE_DATA.TPCH_SF100.SUPPLIER")  # 1 Million Rows

# count() is an action: each call sends a query to Snowflake.
print('Lineitems Table: %s rows' % "{:,}".format(dfLineItems.count()))
print('Suppliers Table: %s rows' % "{:,}".format(dfSuppliers.count()))

# 3 - JOIN TABLES (line items to their supplier, on the supplier key)
dfJoinTables = dfLineItems.join(dfSuppliers,
                                dfLineItems.col("L_SUPPKEY") == dfSuppliers.col("S_SUPPKEY"))

# 4 - SUMMARIZE THE DATA BY SUPPLIER, PART, SUM, MIN & MAX
# `f` is the snowflake.snowpark.functions module imported at the top of the notebook.
dfSummary = dfJoinTables.groupBy("S_NAME", "L_PARTKEY").agg([
    f.sum("L_QUANTITY").alias("TOTAL_QTY"),
    f.min("L_QUANTITY").alias("MIN_QTY"),
    f.max("L_QUANTITY").alias("MAX_QTY"),
])


Resizing to from XS(1 Node) to MEDIUM(4 Nodes) ..

Completed!...


Joining, Aggregating with 2 large tables(600M & 1M rows) & Writing results to new table(80M rows) ..

Lineitems Table: 600,037,902 rows
Suppliers Table: 1,000,000 rows


### **↑ The join and aggregation have NOT been executed** up to this point — only the warehouse resize and the two row counts have run. (Lazy Execution Model) !!!

## 3. Storing the results in a table or showing them triggers the compute for the previous steps.

In [12]:
# 5 - WRITE THE RESULTS TO A NEW TABLE ( 80 Million Rows)
# <-- This is when all the previous operations are compiled & executed as a single job
start_time = time.time()
dfSummary.write.mode("overwrite").saveAsTable("SALES_SUMMARY")
# Stop the clock right after the write so the reported duration covers only
# join + summarize + write (what the final message claims), not the read-back below.
end_time = time.time()
print("Completed!...\n\n")

# 6 - QUERY THE RESULTS (80 Million Rows)
print("Query the results..\n")
dfSales = session.table("SALES_SUMMARY")
dfSales.show()

print("Completed!...\n\n")

# 7 - SCALE DOWN COMPUTE TO 1 NODE
print("Reducing the warehouse to XS..\n")
sql_cmd = "ALTER WAREHOUSE {} SET WAREHOUSE_SIZE = 'XSMALL'".format(Warehouse_Name)
session.sql(sql_cmd).collect()

print("Completed!...\n")

# Report the elapsed whole seconds and the number of rows now in the result table.
print("--- %s seconds to Join, Summarize & Write Results to a new Table --- \n" % int(end_time - start_time))
print("--- %s Rows Written to SALES_SUMMARY table" % "{:,}".format(dfSales.count()))

Completed!...


Query the results..

--------------------------------------------------------------------------
|"S_NAME"            |"L_PARTKEY"  |"TOTAL_QTY"  |"MIN_QTY"  |"MAX_QTY"  |
--------------------------------------------------------------------------
|Supplier#000564493  |5564492      |207.00       |1.00       |50.00      |
|Supplier#000676924  |2676923      |193.00       |4.00       |47.00      |
|Supplier#000639927  |9139908      |282.00       |10.00      |50.00      |
|Supplier#000542648  |1292646      |243.00       |6.00       |49.00      |
|Supplier#000697355  |10697354     |151.00       |3.00       |50.00      |
|Supplier#000062149  |19562110     |203.00       |6.00       |47.00      |
|Supplier#000548120  |15798074     |277.00       |7.00       |49.00      |
|Supplier#000685994  |2935987      |100.00       |19.00      |50.00      |
|Supplier#000383108  |2133105      |151.00       |16.00      |39.00      |
|Supplier#000971264  |8971263      |235.00       |1.00       |4

# That's all there is to it!

In [15]:


















# Clean it all up
# IF EXISTS keeps this cell re-runnable: without it, a second run fails on the
# already-dropped warehouse and never reaches the database drop.
sql_cmd = "DROP WAREHOUSE IF EXISTS {}".format(Warehouse_Name)
session.sql(sql_cmd).collect()

sql_cmd = "DROP DATABASE IF EXISTS {}".format(DB_NAME)
session.sql(sql_cmd).collect()

[Row(status='DEMO_SNOWPARK successfully dropped.')]