# Snowflake SQL Alchemy

### Uploading the web-scaped cleaned data, after using pydantic, 'items.csv' into snowflake using Snowflake-SQLAlchemy into our created db

In [1]:
!pip install python-dotenv

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple


In [2]:
!pip install bcrypt

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple


In [3]:
!pip install snowflake-sqlalchemy

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple


In [1]:
#!/usr/bin/env python
import warnings
warnings.filterwarnings("ignore")
from dotenv import load_dotenv
from sqlalchemy import create_engine
import bcrypt
import os

### Snowflake Testing environment setup & data upload

In [2]:
create_test_stage = """CREATE STAGE TEST_URL_STAGING DIRECTORY = ( ENABLE = true );"""
drop_test_stage = """DROP STAGE if exists TEST_URL_STAGING;"""

In [19]:
create_test_table = """CREATE OR REPLACE TABLE test_cfa_courses (
        NameOfTopic STRING,
        Title STRING,
        Year STRING,
        Level STRING,
        Introduction STRING,
        LearningOutcome STRING,
        LinkToPDF STRING,
        Summary STRING
        );"""

In [4]:
import os
data_file_path = os.path.join('..', 'data', 'cfa_data.csv')
print(os.path.exists(data_file_path))
print(data_file_path)

True
../data/cfa_data.csv


In [5]:
upload_to_test_stage = f"""PUT file://{data_file_path} @PC_DBT_DB.public.TEST_URL_STAGING;"""

In [23]:
copy_stage_to_test_table = """COPY INTO test_cfa_courses
  FROM @PC_DBT_DB.public.TEST_URL_STAGING
  FILE_FORMAT = (type = csv field_optionally_enclosed_by='"' )
  PATTERN = 'pdf_data.csv.gz'
  ON_ERROR = 'CONTINUE';"""

### Snowflake Production Environment setup & data upload

In [7]:
create_prod_stage = """CREATE STAGE URL_STAGING DIRECTORY = ( ENABLE = true );"""
drop_prod_stage = """DROP STAGE if exists URL_STAGING;"""

In [18]:
create_prod_table_query = """CREATE OR REPLACE TABLE cfa_courses (
        NameOfTopic STRING,
        Title STRING,
        Year STRING,
        Level STRING,
        Introduction STRING,
        LearningOutcome STRING,
        LinkToPDF STRING,
        Summary STRING
        );"""

In [9]:
upload_to_prod_stage = f"""PUT file://{data_file_path} @PC_DBT_DB.public.URL_STAGING;"""

In [22]:
copy_stage_to_prod_table = """COPY INTO cfa_courses
  FROM @PC_DBT_DB.public.URL_STAGING
  FILE_FORMAT = (type = csv field_optionally_enclosed_by='"' )
  PATTERN = 'pdf_data.csv.gz'
  ON_ERROR = 'skip_file';"""

In [26]:
load_dotenv()

u=os.getenv("SNOWFLAKE_USER")
p=os.getenv("SNOWFLAKE_PASS")
ai=os.getenv("SNOWFLAKE_ACCOUNTID")


engine = create_engine(
    'snowflake://{user}:{password}@{account_identifier}/'.format(
        user=u,
        password=p,
        account_identifier=ai,
    )
)



try:
    connection = engine.connect()
    connection.execute("USE DATABASE PC_DBT_DB")
    connection.execute("USE WAREHOUSE COMPUTE_WH")
    print('1')
    results = connection.execute(create_test_stage) 
    results = connection.execute(create_test_table)
    results = connection.execute(upload_to_test_stage)
    results = connection.execute(copy_stage_to_test_table)
    #results = connection.execute(drop_test_stage)
    print('2')
    results = connection.execute(create_prod_stage) 
    results = connection.execute(create_prod_table_query)
    results = connection.execute(upload_to_prod_stage)
    results = connection.execute(copy_stage_to_prod_table)
    #results = connection.execute(drop_prod_stage)
    print('3')
finally:
    print("Done")
    connection.close()
    engine.dispose()

1
2
3
Done


In [51]:
# engine = create_engine(
#     'snowflake://{user}:{password}@{account_identifier}/'.format(
#         user=u,
#         password=p,
#         account_identifier=ai,
#     )
# )
# try:
#     connection = engine.connect()
#     connection.execute("USE DATABASE PC_DBT_DB")
#     connection.execute("USE WAREHOUSE COMPUTE_WH")
#     results = connection.execute(drop_test_stage)
#     results = connection.execute(drop_prod_stage)
# except NameError as e:
#     print(e)
# finally:
#     print("Done")
#     connection.close()
#     engine.dispose()

Done
