In [18]:
pip install -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


##### Import necessary libraries and load environment variables

In [27]:

import os
from urllib.parse import quote

import boto3
import snowflake.connector
from dotenv import load_dotenv
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine, MetaData, Table, Column, VARCHAR, TIMESTAMP, INTEGER
from sqlalchemy.types import String, DateTime, Integer

# Load environment variables (presumably from a local .env file) so the
# os.getenv lookups in the following cells can find the credentials.
load_dotenv()


True

##### Retrieve Snowflake credentials from environment variables

In [28]:

# Secrets are read from the environment; each one is None when its variable is unset.
snowflake_user, snowflake_password, snowflake_account = map(
    os.getenv,
    ('SNOWFLAKE_USER', 'SNOWFLAKE_PASSWORD', 'SNOWFLAKE_ACCOUNT'),
)

# Names of the Snowflake objects this notebook creates and then writes to.
warehouse, database, schema = 'SF_WH_CASE1', 'SF_DB_CASE1', 'SF_CASE1'
role = 'SYSADMIN'  # Replace with your full access role


##### Retrieve S3 credentials from environment variables

In [21]:

# AWS key pair for the S3 client; each value is None when its variable is unset.
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')


##### Connect to Snowflake and create a cursor object

In [22]:

# Session parameters for the Snowflake connector, gathered in one place.
connection_params = {
    'user': snowflake_user,
    'password': snowflake_password,
    'account': snowflake_account,
    'warehouse': warehouse,
    'database': database,
    'schema': schema,
    'role': role,
}

# Open the session, then create the cursor that the setup cell executes with.
conn = snowflake.connector.connect(**connection_params)
cur = conn.cursor()


##### Execute SQL commands for setup

In [23]:

# Provision the database, warehouse, schema, and METADATA table used by the
# rest of the notebook. The table DDL is CREATE OR REPLACE, so every run
# starts from an empty METADATA table.
try:
    cur.execute("CREATE DATABASE IF NOT EXISTS SF_DB_CASE1")
    cur.execute("USE DATABASE SF_DB_CASE1")
    cur.execute("""
        CREATE WAREHOUSE IF NOT EXISTS SF_WH_CASE1
        WITH WAREHOUSE_SIZE = 'MEDIUM'
        WAREHOUSE_TYPE = 'STANDARD'
        AUTO_SUSPEND = 300
        AUTO_RESUME = TRUE;
    """)
    cur.execute("CREATE SCHEMA IF NOT EXISTS SF_CASE1")
    cur.execute("USE SCHEMA SF_CASE1")
    cur.execute("""
        CREATE OR REPLACE TABLE METADATA (
            OBJECT_NAME VARCHAR(255),
            LAST_MODIFIED TIMESTAMP,
            SIZE_BYTES NUMBER,
            LINK_TO_TXT_FILE VARCHAR(1000)
        );
    """)
    print("Snowflake setup completed successfully.")
except Exception as e:
    # Report the failure, then re-raise so a "Run All" stops here instead of
    # continuing against objects that were never created.
    print(e)
    raise
finally:
    # Always close the cursor
    cur.close()


Snowflake setup completed successfully.


##### Initialize S3 client

In [24]:

# S3 client built with the key pair read from the environment.
s3 = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)


##### Specify S3 bucket and prefix and list objects

In [25]:

bucket_name = 'bigdata-pypdf'
prefix = 'PyPDF/'

# A single list_objects_v2 call returns at most 1,000 keys, so walk every page
# of the listing to avoid silently dropping objects under larger prefixes.
paginator = s3.get_paginator('list_objects_v2')
all_objects = []
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
    # 'Contents' is absent from a page that matched no objects.
    all_objects.extend(page.get('Contents', []))

# Keep the single-response shape that later cells read through
# response.get('Contents', []).
response = {'Contents': all_objects}


In [29]:
# SQLAlchemy engine pointed at the same Snowflake account, database, schema,
# warehouse, and role as the connector session.
snowflake_url = URL(
    account=snowflake_account,
    user=snowflake_user,
    password=snowflake_password,
    database=database,
    schema=schema,
    warehouse=warehouse,
    role=role,
)
engine = create_engine(snowflake_url)

# SQLAlchemy description of the METADATA table, used to build INSERT statements.
metadata = MetaData()
metadata_columns = [
    Column('OBJECT_NAME', String(255)),
    Column('LAST_MODIFIED', DateTime),
    Column('SIZE_BYTES', Integer),
    Column('LINK_TO_TXT_FILE', String(1000)),
]
metadata_table = Table('METADATA', metadata, *metadata_columns)

# checkfirst=True makes this issue CREATE TABLE only if METADATA is missing.
metadata.create_all(bind=engine, checkfirst=True)

##### Metadata transfer to Snowflake

In [30]:

# Build one row per listed S3 object up front so the load itself is a single
# batched INSERT rather than one round trip per object.
rows = [
    {
        'OBJECT_NAME': obj['Key'],
        'LAST_MODIFIED': obj['LastModified'],
        'SIZE_BYTES': obj['Size'],
        # Percent-encode the key (quote leaves '/' intact) so names containing
        # spaces or reserved characters still produce a valid link.
        'LINK_TO_TXT_FILE': f"https://{bucket_name}.s3.amazonaws.com/{quote(obj['Key'])}",
    }
    for obj in response.get('Contents', [])
]

try:
    # engine.begin() commits when the block succeeds and rolls back on error,
    # so the table never ends up with a partial load.
    with engine.begin() as connection:
        # Skip the statement when nothing was listed: executing an INSERT
        # with an empty parameter list would not be a no-op.
        if rows:
            connection.execute(metadata_table.insert(), rows)
    print("Metadata transfer to Snowflake completed successfully.")
except Exception as e:
    # Report the failure, then re-raise so it is not mistaken for success.
    print(e)
    raise


Metadata transfer to Snowflake completed successfully.


In [31]:

# Release all Snowflake resources: the SQLAlchemy engine keeps pooled
# connections open until it is disposed, independently of the connector session.
engine.dispose()
conn.close()
