# Snowflake utils example   

`SnowflakeDataConnector` is a utility class that provides a set of methods to simplify working with Snowflake databases.

**Functionality:**

- Connects to Snowflake using user or technical credentials.  
- Downloads data from Snowflake and saves it as pickle files.  
- Creates temporary tables in Snowflake.  
- Executes SQL queries and returns results as DataFrames.
- Uploads data from DataFrames to Snowflake tables.
- Deletes selected rows from a table or clears the table entirely.
- Closes the Snowflake connection when done.

1. [Connection](#1)
2. [Download data](#2)
3. [Execute queries to df](#3)
4. [Create tmp tables](#4)
5. [Upload data](#5)
6. [Clean data from table](#6)
7. [Close connection](#7)

# 1
## Connection 

A connector is created when the class is initialized.

There are two types of connection:
- **By user**, with normal authorization. To connect with user credentials, pass only the `user` account when initializing the class.
- **By tech account**. To connect with a technical account, pass both `tech_login` and `tech_key` when initializing the class.
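
The choice between the two modes can be pictured as a small dispatch on which arguments were passed. The following is a minimal sketch, not the actual `SnowflakeDataConnector` code: `resolve_auth_mode` is a hypothetical helper, and treating a half-supplied tech credential pair as an error is an assumption.

```python
from typing import Optional


def resolve_auth_mode(user: Optional[str] = None,
                      tech_login: Optional[str] = None,
                      tech_key: Optional[str] = None) -> str:
    """Hypothetical helper: decide which kind of connection to open.

    Returns "tech" when both technical credentials are given and
    "user" when only a user account is given.
    """
    if tech_login and tech_key:
        return "tech"
    if tech_login or tech_key:
        # Assumption: half of a credential pair is treated as a mistake.
        raise ValueError("tech_login and tech_key must be passed together")
    if user:
        return "user"
    raise ValueError("pass either user, or tech_login and tech_key")


print(resolve_auth_mode(user="jane.doe"))                      # user
print(resolve_auth_mode(tech_login="svc_ml", tech_key="***"))  # tech
```

The account names above are placeholders, not real credentials.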

In [5]:
# !pip3 install --upgrade snowflake-sqlalchemy

In [1]:
import os
from getpass import getpass

from utils.snowflake_utils import SnowflakeDataConnector

In [2]:
# With user
sf_user = '---'

# With ML tech account
tech_login = '...'
tech_key   = getpass("Enter tech_key: ")

Enter tech_key:  ········


In [1]:
# with sf_user
sf_connector = SnowflakeDataConnector(user=sf_user)

In [3]:
# Or with tech acc
sf_connector_tech = SnowflakeDataConnector(tech_key=tech_key, tech_login=tech_login)

Connected to Snowflake with Tech acccount


# 2
## Download data

In [10]:
DEPTH      = 10        # Download Depth -> make it really big for full history 
BATCH_SIZE = 3000000   # SnowFlake Batches

In [11]:
########################### Queries
###########################################################

START_DATE = '2023-10-01'
END_DATE   = '2023-10-02'

users = {
    'query': """
             SELECT 
             *
             FROM tbl
             LIMIT 1000
             """,
    'raw_folder_path': 'data/raw/users',
    'full_file_folder': 'data/',
    'file_name': 'users', 
}

subscription = {
    'query':  f'''
                select *
                from tbl1
                where True
                and TRANSACTION_TS>='{START_DATE}'
                and TRANSACTION_TS<'{END_DATE}'
                ''',
    
    'raw_folder_path': 'data/raw/subscription',
    'full_file_folder': 'data/',
    'file_name': 'subscription'
}

transaction = {
    'query':  f'''
                select *
                from tbl2
                where True
                and TRANSACTION_TS>='{START_DATE}'
                and TRANSACTION_TS<'{END_DATE}'
                ''',
    
    'raw_folder_path': 'data/raw/transaction',
    'full_file_folder': 'data/',
    'file_name': 'transaction'
}


load_list = [users, subscription, transaction]

In [12]:
########################### Download
###########################################################

for data in load_list:
    sf_connector.download_data(query=data['query'],
                               raw_folder_path=data['raw_folder_path'],
                               full_file_folder=data['full_file_folder'],
                               file_name=data['file_name'],
                               depth=DEPTH,
                               batch=BATCH_SIZE
                               # join=True, ---- DEFAULT - to join into a single file
                              )

users data loading...
users data loaded in data/raw/users, 1 files

subscription data loading...
subscription data loaded in data/raw/subscription, 1 files

transaction data loading...
transaction data loaded in data/raw/transaction, 1 files
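
The `depth` and `batch` arguments suggest a batched download: fetch up to `depth` batches of `batch` rows each, pickle every batch into the raw folder, then optionally join them into one file. That reading is an assumption based on the parameter names and the progress bar total, not on the connector's source. The sketch below illustrates the pattern with the standard library only. An in-memory SQLite database stands in for Snowflake, and `download_in_batches` and `join_batches` are hypothetical names.

```python
import pickle
import sqlite3
import tempfile
from pathlib import Path


def download_in_batches(conn, query, raw_folder, file_name, depth, batch):
    """Fetch at most `depth` batches of `batch` rows and pickle each one."""
    raw_folder = Path(raw_folder)
    raw_folder.mkdir(parents=True, exist_ok=True)
    cursor = conn.cursor()
    cursor.execute(query)
    paths = []
    for i in range(depth):
        rows = cursor.fetchmany(batch)  # standard DB-API call
        if not rows:                    # result set exhausted
            break
        path = raw_folder / f"{file_name}_{i}.pkl"
        with open(path, "wb") as fh:
            pickle.dump(rows, fh)
        paths.append(path)
    cursor.close()
    return paths


def join_batches(paths):
    """Concatenate the pickled batches back into one list of rows."""
    rows = []
    for path in paths:
        with open(path, "rb") as fh:
            rows.extend(pickle.load(fh))
    return rows


# Stand-in database: an in-memory SQLite table with 7 rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tbl (id INTEGER)")
conn.executemany("INSERT INTO tbl VALUES (?)", [(i,) for i in range(7)])

with tempfile.TemporaryDirectory() as tmp:
    files = download_in_batches(conn, "SELECT id FROM tbl ORDER BY id",
                                Path(tmp) / "raw" / "tbl", "tbl",
                                depth=10, batch=3)
    all_rows = join_batches(files)

print(len(files), len(all_rows))  # 3 7
conn.close()
```

With 7 rows and a batch size of 3, the loop writes three files and stops on the fourth, empty fetch, well before `depth` is reached. A small `depth` would instead truncate the download, which is consistent with the advice to make it large for full history.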


# 3
## Execute queries to df

In [4]:
df = sf_connector.execute_query_to_pandas("select * from event_tbl limit 3")
df # returns df from the query

In [9]:
query = """
        select count(*) as cnt 
        from meal_tbl
        """

df = sf_connector_tech.execute_query_to_pandas(query)
df

|   | CNT  |
|---|------|
| 0 | 6586 |


In [10]:
type(df)

pandas.core.frame.DataFrame
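
A query result becomes tabular data by pairing the column names reported by the cursor with each fetched row. The sketch below shows that pairing with the standard library only; it is an illustration, not the implementation of `execute_query_to_pandas`. `query_to_records` is a hypothetical helper, SQLite stands in for Snowflake, and the two inserted rows are made-up sample data.

```python
import sqlite3


def query_to_records(conn, query):
    """Run a query and return its rows as a list of {column: value} dicts."""
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        # DB-API cursors expose column metadata in `description`;
        # the first item of each entry is the column name.
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        cursor.close()


# Stand-in table with two sample rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE meal_tbl (name TEXT)")
conn.executemany("INSERT INTO meal_tbl VALUES (?)", [("soup",), ("salad",)])

records = query_to_records(conn, "SELECT COUNT(*) AS cnt FROM meal_tbl")
print(records)  # [{'cnt': 2}]
conn.close()
```

A list of dicts like this can be passed directly to `pandas.DataFrame(records)` to obtain a DataFrame.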

# 4 
## Create tmp tables

In [7]:
database  = "SANDBOX"
schema    = "ML"

idx = {
    'tmp_table_name': 'idx',
    'tmp_query': f"""
                select user_id
                from tbl1
                where TRUE
                and TRANSACTION_TS>='{START_DATE}'
                and TRANSACTION_TS<'{END_DATE}'
                """,
}

click = {
    'tmp_table_name': 'click',
    'tmp_query': f"""
                select
                    ue.user_id,
                    cast(ue.event_time as date) as event_date,
                    COUNT(CASE WHEN EVENT_TYPE like '%BlockView%' THEN 4 END) as block_view,
                    COUNT(CASE WHEN EVENT_TYPE like '%ScreenView%' THEN 4 END) as screen_view
                from simple.product.f_user_event as ue
                join {database}.{schema}.{idx['tmp_table_name']} ids 
                on ue.user_id = ids.user_id
                where ue.event_time >= '{START_DATE}'
                group by ue.user_id, event_date
                order by ue.user_id, event_date
                """,
}

tmp_queries = [idx, click]

In [8]:
for tmp_tbl in tmp_queries:
    sf_connector.create_temp_tbl(database=database,
                                 schema=schema,
                                 tmp_table_name=tmp_tbl['tmp_table_name'],
                                 tmp_query=tmp_tbl['tmp_query'])
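
The order of `tmp_queries` matters: the `click` query joins against the `idx` table, so `idx` has to exist first. The sketch below shows one way such a call could translate into SQL. `CREATE OR REPLACE TEMPORARY TABLE ... AS` is valid Snowflake syntax, but whether `create_temp_tbl` emits exactly this statement is an assumption. `build_temp_table_sql` is a hypothetical helper, and the queries are shortened placeholders.

```python
def build_temp_table_sql(database, schema, tmp_table_name, tmp_query):
    """Hypothetical helper: wrap a SELECT in a CREATE ... AS statement."""
    target = f"{database}.{schema}.{tmp_table_name}"
    return f"CREATE OR REPLACE TEMPORARY TABLE {target} AS\n{tmp_query.strip()}"


# Shortened placeholder queries; `events` is an illustrative table name.
steps = [
    {"tmp_table_name": "idx",
     "tmp_query": "select user_id from tbl1"},
    {"tmp_table_name": "click",
     "tmp_query": ("select ue.user_id from events ue "
                   "join SANDBOX.ML.idx ids on ue.user_id = ids.user_id")},
]

# Build the statements in dependency order: idx first, then click.
statements = [build_temp_table_sql("SANDBOX", "ML", **step) for step in steps]

for statement in statements:
    print(statement, end="\n\n")
```

Keeping the definitions in a list, as the notebook does, makes the dependency order explicit and easy to extend.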

In [9]:
click_df = sf_connector.execute_query_to_pandas(f"select * from {database}.{schema}.click")
click_df.head()

|   | USER_ID | EVENT_DATE | BLOCK_VIEW | SCREEN_VIEW |
|---|---------|------------|------------|-------------|
| 0 | 5c1fb4d9-ecb8-4674-a824-114323454412 | 2023-10-01 | 4 | 6 |
| 1 | 5c251826-a445-42f1-8efa-fd81df9f4c5e | 2023-10-01 | 0 | 0 |
| 2 | 5c251826-a445-42f1-8efa-fd81df9f4c5e | 2023-10-02 | 0 | 0 |
| 3 | 5c38f721-785f-4f76-b08d-208fa0320769 | 2023-10-01 | 25 | 14 |
| 4 | 5c38f721-785f-4f76-b08d-208fa0320769 | 2023-10-02 | 0 | 2 |


# 5
## Upload data 

Upload data using the tech account.

In [10]:
!pip3 show sqlalchemy

Name: SQLAlchemy
Version: 1.4.49
Summary: Database Abstraction Library
Home-page: https://www.sqlalchemy.org
Author: Mike Bayer
Author-email: mike_mp@zzzcomputing.com
License: MIT
Location: /home/ec2-user/.local/lib/python3.7/site-packages
Requires: greenlet, importlib-metadata
Required-by: snowflake-sqlalchemy


In [11]:
type(sf_connector.connector)

snowflake.connector.connection.SnowflakeConnection

In [12]:
sf_connector_tech.upload_data(df=click_df,
                              database=database, 
                              schema=schema, 
                              table_name='tmp_example', 
                              if_exists="append" # DEFAULT
                             )

Uploading....
Data uploaded to PALTA_SANDBOX.ML.tmp_example - 6958 rows


In [2]:
sf_connector_tech.execute_query_to_pandas("select * from sandbox.ml.tmp_example limit 10")

In [15]:
# Drop table

# cursor = sf_connector_tech.connector.cursor()
# cursor.execute("DROP TABLE IF EXISTS sandbox.ml.tmp_example")
# cursor.close()

True

In [3]:
sf_connector_tech.execute_query_to_pandas("select * from sandbox.ml.tmp_example")

# returns pandas df

# 6
## Clean data from table

Clean data using the tech account.

- Provide the names of the Snowflake database, schema, and table from which you want to delete data.
- To delete only specific rows, pass a condition as a string in `drop_condition` (the equivalent of a SQL WHERE clause). For example, to delete rows where `PREDICTION_AT` is a specific date, pass `"PREDICTION_AT='1990-05-15'"`. If no condition is provided, the function prompts you to enter one interactively.
- If `drop_all` is True, the method deletes all data in the table without any condition, after asking you to confirm the deletion.
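
The three rules above can be sketched as a function that builds the DELETE statement. This is an illustration of the described behavior, not the code behind `delete_data`: `build_delete_sql` and its `confirm` parameter are hypothetical, and the prompt texts are invented.

```python
def build_delete_sql(database, schema, table_name,
                     drop_condition=None, drop_all=False, confirm=input):
    """Hypothetical helper: return the DELETE statement, or None if cancelled.

    `confirm` is any callable that takes a prompt and returns the answer;
    it defaults to the built-in `input` for interactive use.
    """
    target = f"{database}.{schema}.{table_name}"
    if drop_all:
        # Clearing the whole table requires an explicit confirmation.
        answer = confirm(f"Delete ALL rows from {target}? [y/N] ")
        if answer.strip().lower() != "y":
            return None
        return f"DELETE FROM {target}"
    if drop_condition is None:
        # No condition passed: ask for one interactively.
        drop_condition = confirm("Enter the delete condition: ")
    if not drop_condition.strip():
        raise ValueError("an empty condition would delete every row")
    return f"DELETE FROM {target} WHERE {drop_condition}"


sql = build_delete_sql("sandbox", "ml", "MODEL_1",
                       drop_condition="PREDICTION_AT='2023-10-06'")
print(sql)  # DELETE FROM sandbox.ml.MODEL_1 WHERE PREDICTION_AT='2023-10-06'
```

Because the condition is interpolated into the statement verbatim, a helper like this should only ever receive conditions from trusted sources.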

In [4]:
database_d = 'sandbox'
schema     = 'ml'
table_del  = 'MODEL_1'
part       = "PREDICTION_AT='2023-10-06'"

In [5]:
sf_connector_tech.delete_data(database=database_d, schema=schema, table_name=table_del, drop_condition=part)

delete part - PREDICTION_AT='2023-10-06'
done


# 7
## Close connection

Close the Snowflake connection when you are done.

In [13]:
sf_connector.disconnect()

Disconnected from Snowflake.


In [6]:
sf_connector_tech.disconnect()

Disconnected from Snowflake.
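
If a query raises an exception before `disconnect()` is reached, the connection stays open. One way to guard against that is a small context manager that always disconnects on exit. The sketch below assumes only that the connector has a `disconnect()` method, as shown above. `FakeConnector` is a stand-in so the example runs without Snowflake, and `disconnecting` is a hypothetical helper.

```python
from contextlib import contextmanager


class FakeConnector:
    """Stand-in exposing the same disconnect() method as the connector."""

    def __init__(self):
        self.connected = True

    def disconnect(self):
        self.connected = False
        print("Disconnected from Snowflake.")


@contextmanager
def disconnecting(connector):
    """Yield the connector and disconnect it even if the body raises."""
    try:
        yield connector
    finally:
        connector.disconnect()


connector = FakeConnector()
try:
    with disconnecting(connector) as sf:
        raise RuntimeError("query failed")  # simulate an error mid-session
except RuntimeError:
    pass

print(connector.connected)  # False
```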
