In [35]:
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F

# Build (or reuse) a Snowpark session configured with connection_name "demo".
session = (
    Session.builder
    .config("connection_name", "demo")
    .getOrCreate()
)

# Sanity check: show which account answered, then print the session itself.
current_account_df = session.sql("SELECT CURRENT_ACCOUNT()")
current_account_df.show()
print(session)

-----------------------
|"CURRENT_ACCOUNT()"  |
-----------------------
|LAB23496             |
-----------------------

<snowflake.snowpark.session.Session: account="SFSENORTHAMERICA-JHOLLAN", role="ACCOUNTADMIN", database="TEMP", schema="TEMP", warehouse="XSMALL">


In [36]:
session.use_role("ACCOUNTADMIN")

# Create-then-activate, in order: the database must be current before the
# schema is created, because the schema DDL is not database-qualified.
for ddl_statement, activate in (
    ("CREATE DATABASE IF NOT EXISTS TEMP", session.use_database),
    ("CREATE SCHEMA IF NOT EXISTS TEMP", session.use_schema),
):
    session.sql(ddl_statement).collect()
    activate("TEMP")

print(session)

<snowflake.snowpark.session.Session: account="SFSENORTHAMERICA-JHOLLAN", role="ACCOUNTADMIN", database="TEMP", schema="TEMP", warehouse="XSMALL">


In [37]:
import modin.pandas as pd

import snowflake.snowpark.modin.plugin
import numpy as np

rows = 10000
seed = 42  # fixed seed so Restart & Run All regenerates the same two tables


def _random_table(rng, n_rows):
    """Return a modin DataFrame with ``n_rows`` rows of random categorical values.

    Args:
        rng: A ``numpy.random.Generator`` used for every draw.
        n_rows: Number of rows to generate.

    Returns:
        A DataFrame with the columns 'category', 'product', 'region' and
        'status', each filled by sampling uniformly from a small fixed set.
    """
    return pd.DataFrame({
        'category': rng.choice(['A', 'B', 'C', 'D'], size=n_rows),
        'product': rng.choice(['X', 'Y', 'Z'], size=n_rows),
        'region': rng.choice(['North', 'South', 'East', 'West'], size=n_rows),
        'status': rng.choice(['Active', 'Inactive'], size=n_rows),
    })


# One generator feeds both tables, so they differ from each other but are
# identical from run to run.
rng = np.random.default_rng(seed)
table1 = _random_table(rng, rows)
table2 = _random_table(rng, rows)

# Convert to Snowpark DataFrames, exposing the pandas index as an "index" column.
snow_df1: snowflake.snowpark.DataFrame = pd.to_snowpark(table1, index_label='index')
snow_df2: snowflake.snowpark.DataFrame = pd.to_snowpark(table2, index_label='index')

In [38]:
# Persist both generated frames as temporary tables, replacing any existing ones.
for source_df, table_name in ((snow_df1, "table1"), (snow_df2, "table2")):
    source_df.write.save_as_table(table_name, table_type="temp", mode="overwrite")



In [39]:
# Display the Snowpark schema of the first table; the quoted, lower-case
# column names shown here are the ones the joins below must reference.
snow_df1.schema

StructType([StructField('"index"', LongType(), nullable=True), StructField('"category"', StringType(16777216), nullable=True), StructField('"product"', StringType(16777216), nullable=True), StructField('"region"', StringType(16777216), nullable=True), StructField('"status"', StringType(16777216), nullable=True)])

In [40]:
from time import sleep

# Four independent inner joins of the two tables, one per shared column.
# Column names are double-quoted because they were stored in lower case.
join_category = snow_df1.join(snow_df2, '"category"', "inner")
join_product = snow_df1.join(snow_df2, '"product"', "inner")
join_region = snow_df1.join(snow_df2, '"region"', "inner")
join_status = snow_df1.join(snow_df2, '"status"', "inner")

# block=False submits each write without waiting and returns a job handle,
# so all four writes are in flight at the same time.
async_write_options = dict(table_type="temp", mode="overwrite", block=False)
job1 = join_category.write.save_as_table("join_category", **async_write_options)
job2 = join_product.write.save_as_table("join_product", **async_write_options)
job3 = join_region.write.save_as_table("join_region", **async_write_options)
job4 = join_status.write.save_as_table("join_status", **async_write_options)

# Poll every 5 seconds until every job reports done, printing each status.
submitted_jobs = (job1, job2, job3, job4)
while any(not job.is_done() for job in submitted_jobs):
    print(f"Waiting for jobs to complete..\nJob 1: {job1.is_done()}\nJob 2: {job2.is_done()}\nJob 3: {job3.is_done()}\nJob 4: {job4.is_done()}")
    sleep(5)

print("All jobs completed!")


Waiting for jobs to complete..
Job 1: False
Job 2: False
Job 3: False
Job 4: False
Waiting for jobs to complete..
Job 1: False
Job 2: False
Job 3: False
Job 4: False
Waiting for jobs to complete..
Job 1: False
Job 2: False
Job 3: False
Job 4: False
Waiting for jobs to complete..
Job 1: False
Job 2: False
Job 3: False
Job 4: False
Waiting for jobs to complete..
Job 1: False
Job 2: False
Job 3: False
Job 4: False
Waiting for jobs to complete..
Job 1: False
Job 2: False
Job 3: False
Job 4: False
Waiting for jobs to complete..
Job 1: True
Job 2: False
Job 3: False
Job 4: False
Waiting for jobs to complete..
Job 1: True
Job 2: False
Job 3: False
Job 4: False
Waiting for jobs to complete..
Job 1: True
Job 2: False
Job 3: True
Job 4: False
Waiting for jobs to complete..
Job 1: True
Job 2: False
Job 3: True
Job 4: False
Waiting for jobs to complete..
Job 1: True
Job 2: True
Job 3: True
Job 4: False
Waiting for jobs to complete..
Job 1: True
Job 2: True
Job 3: True
Job 4: False
Waiting for jobs

In [41]:
def get_query_duration(session, query_id):
    """Fetch status and timing details for one query from QUERY_HISTORY.

    The query ID is sent as a bind variable rather than being spliced into
    the SQL text, so quotes or other SQL inside ``query_id`` cannot change
    the statement.

    Args:
        session: Object exposing ``sql(query, params=...)`` whose result has
            a ``collect()`` method (a Snowpark ``Session`` in this notebook).
        query_id: ID of the query to look up, e.g. ``job.query_id``.

    Returns:
        The value of ``collect()``: rows with the columns QUERY_ID,
        START_TIME, END_TIME, EXECUTION_STATUS and EXECUTION_TIME_SECONDS
        for the matching query.

    NOTE(review): QUERY_HISTORY() is called with no arguments, so whatever
    default limits it applies take effect before this filter -- confirm the
    target query is still within the returned window.
    """
    query = """
        SELECT
            QUERY_ID,
            START_TIME,
            END_TIME,
            EXECUTION_STATUS,
            EXECUTION_TIME / 1000 AS EXECUTION_TIME_SECONDS -- Convert milliseconds to seconds
        FROM
            TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
        WHERE
            QUERY_ID = ?
    """
    return session.sql(query, params=[query_id]).collect()

# Report the recorded history row for each of the four async join jobs.
for job_number, finished_job in enumerate((job1, job2, job3, job4), start=1):
    print(f"Job {job_number}: ", get_query_duration(session, finished_job.query_id))

Job 1:  [Row(QUERY_ID='01b4f759-0002-7f74-003c-5f07006dee22', START_TIME=datetime.datetime(2024, 6, 12, 12, 37, 56, 263000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), END_TIME=datetime.datetime(2024, 6, 12, 12, 38, 42, 125000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), EXECUTION_STATUS='SUCCESS', EXECUTION_TIME_SECONDS=Decimal('45.680000'))]
Job 2:  [Row(QUERY_ID='01b4f75a-0002-7f74-003c-5f07006dee26', START_TIME=datetime.datetime(2024, 6, 12, 12, 38, 1, 981000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), END_TIME=datetime.datetime(2024, 6, 12, 12, 39, 2, 234000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), EXECUTION_STATUS='SUCCESS', EXECUTION_TIME_SECONDS=Decimal('60.083000'))]
Job 3:  [Row(QUERY_ID='01b4f75a-0002-7f75-003c-5f07006e2a92', START_TIME=datetime.datetime(2024, 6, 12, 12, 38, 6, 778000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), END_TIME=datetime.datetime(2