In [1]:
# Use the %pip magic explicitly so the packages are installed into the running
# kernel's environment (the bare `pip ...` form only works while automagic is on).
%pip install -r requirements.txt











Note: you may need to restart the kernel to use updated packages.


##### Import necessary libraries and load environment variables

In [2]:

import os
import snowflake.connector
import boto3
from sqlalchemy import create_engine, MetaData, Table, Column, VARCHAR, TIMESTAMP, INTEGER
from dotenv import load_dotenv
from snowflake.sqlalchemy import URL
from sqlalchemy.types import String, DateTime, Integer

# Load variables from a local .env file into the process environment so the
# os.getenv(...) lookups in the following cells can read the credentials.
load_dotenv()


True

##### Retrieve Snowflake credentials from environment variables

In [3]:

# Snowflake login details come from the environment; each value is None when
# the corresponding variable is not set.
snowflake_user, snowflake_password, snowflake_account = (
    os.getenv(env_name)
    for env_name in ('SNOWFLAKE_USER', 'SNOWFLAKE_PASSWORD', 'SNOWFLAKE_ACCOUNT')
)

# Fixed object names used by this notebook.
role = 'SYSADMIN'  # Replace with your full access role
warehouse, database, schema = 'SF_WH_CASE1', 'SF_DB_CASE1', 'SF_CASE1'


##### Retrieve S3 credentials from environment variables

In [4]:

# AWS credentials are read from the environment (None when a variable is unset).
aws_access_key_id, aws_secret_access_key = map(
    os.getenv, ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
)


##### Connect to Snowflake and create a cursor object

In [5]:

# Open the Snowflake session with the credentials and object names defined above.
conn = snowflake.connector.connect(
    account=snowflake_account,
    user=snowflake_user,
    password=snowflake_password,
    role=role,
    warehouse=warehouse,
    database=database,
    schema=schema,
)

# Cursor for issuing SQL statements over this connection.
cur = conn.cursor()


##### Execute SQL commands for setup

In [6]:

# Provision the database, warehouse, schema and METADATA table used by the rest
# of the notebook.
#
# A new cursor is opened on every run: the cursor is closed in `finally`, so
# reusing one created in an earlier cell would make a second run of this cell
# execute against an already-closed cursor.
cur = conn.cursor()
try:
    cur.execute("CREATE DATABASE IF NOT EXISTS SF_DB_CASE1")
    cur.execute("USE DATABASE SF_DB_CASE1")
    cur.execute("""
        CREATE WAREHOUSE IF NOT EXISTS SF_WH_CASE1
        WITH WAREHOUSE_SIZE = 'MEDIUM'
        WAREHOUSE_TYPE = 'STANDARD'
        AUTO_SUSPEND = 300
        AUTO_RESUME = TRUE;
    """)
    cur.execute("CREATE SCHEMA IF NOT EXISTS SF_CASE1")
    cur.execute("USE SCHEMA SF_CASE1")
    # NOTE: CREATE OR REPLACE discards any existing METADATA rows, so every run
    # of this cell starts from an empty table.
    cur.execute("""
        CREATE OR REPLACE TABLE METADATA (
            OBJECT_NAME VARCHAR(255),
            LAST_MODIFIED TIMESTAMP,
            SIZE_BYTES NUMBER,
            LINK_TO_TXT_FILE VARCHAR(1000),
            DIRECTORY VARCHAR(1000)
        );
    """)
    print("Snowflake setup completed successfully.")
except Exception as e:
    # Best-effort: report the failure instead of raising.
    print(e)
finally:
    # Always close the cursor opened by this cell
    cur.close()


Snowflake setup completed successfully.


##### Initialize S3 client

In [7]:

# Low-level S3 client authenticated with the credentials read from the environment.
s3 = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)


#### UPLOAD Files to S3

##### Function to Upload

In [8]:
def upload_file_to_s3(bucket_name, folder_name, file_path, region_name='us-east-2'):
    """
    Uploads a local file to an Amazon S3 bucket.

    The object key is "<folder_name>/<base name of file_path>". Credentials are
    read from the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment
    variables.

    Parameters:
        bucket_name (str): The name of the S3 bucket.
        folder_name (str): The name of the folder (key prefix) in the bucket.
        file_path (str): The local file path.
        region_name (str): AWS region used for the S3 resource.
            Defaults to 'us-east-2'.

    Returns:
        None

    Raises:
        OSError: If the local file cannot be opened (e.g. it does not exist
            or is a directory).
    """
    s3_resource = boto3.resource(
        service_name='s3',
        region_name=region_name,
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    )

    # os.path.basename honours the platform's path separator, unlike
    # splitting on '/' (which keeps the whole path on Windows).
    file_key = f"{folder_name}/{os.path.basename(file_path)}"

    # Stream the open file handle instead of reading the whole file into memory.
    with open(file_path, 'rb') as file:
        s3_resource.Bucket(bucket_name).upload_fileobj(file, file_key)

##### Uploading Files

In [9]:
# Replace these with your actual values
bucket_name = 'bigdata-pypdf'  # S3 bucket name

# Maps the S3 folder (key prefix) to the local folder whose files are uploaded into it.
local_folder_paths = {
    'PyPDF': '../PDF_Extraction/PyPDF',  # Local folder with the PyPDF output
    'GORBID': '../PDF_Extraction/GROBID/txt',  # Local folder with the GROBID text output (stored under the 'GORBID' prefix)
    'CSV': '../Webscrape/CSV'
}

# Iterate through local folder paths
for folder_name, folder_path in local_folder_paths.items():
    # Sorted so the upload order is the same on every run
    for file_name in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, file_name)
        # os.listdir also returns sub-directories (e.g. .ipynb_checkpoints);
        # only regular files can be uploaded, so skip everything else.
        if not os.path.isfile(file_path):
            continue
        # Upload each file to S3
        upload_file_to_s3(bucket_name, folder_name, file_path)

print("Files uploaded successfully.")  # Print success message

Files uploaded successfully.


##### List the S3 objects under each directory prefix and load their metadata into Snowflake

In [10]:
# Records one METADATA row in Snowflake for every S3 object found under each
# directory prefix.
# Relies on names defined in earlier cells: `s3` (the boto3 client from Cell 7),
# `bucket_name`, and the Snowflake connection settings.

# Cell 10 & 12 Integrated Approach:
engine = create_engine(URL(
    account=snowflake_account,
    user=snowflake_user,
    password=snowflake_password,
    database=database,
    schema=schema,
    warehouse=warehouse,
    role=role,
))

metadata = MetaData()

# SQLAlchemy description of the METADATA table; column lengths match the
# CREATE TABLE statement issued in the Snowflake setup cell.
metadata_table = Table(
    'METADATA', metadata,
    Column('OBJECT_NAME', String(255)),
    Column('LAST_MODIFIED', DateTime),
    Column('SIZE_BYTES', Integer),
    Column('LINK_TO_TXT_FILE', String(1000)),
    Column('DIRECTORY', String(1000))  # To identify the file's source directory
)

# Create the metadata table if it doesn't exist
metadata.create_all(engine, checkfirst=True)

# Directories to process
directories = ['PyPDF', 'GORBID', 'CSV']

try:
    # list_objects_v2 returns at most 1,000 keys per response, so use a
    # paginator to walk every page instead of reading only the first one.
    paginator = s3.get_paginator('list_objects_v2')

    rows = []
    for dir_prefix in directories:
        for page in paginator.paginate(Bucket=bucket_name, Prefix=f"{dir_prefix}/"):
            for obj in page.get('Contents', []):
                object_name = obj['Key']
                rows.append({
                    'OBJECT_NAME': object_name,
                    'LAST_MODIFIED': obj['LastModified'],
                    'SIZE_BYTES': obj['Size'],
                    'LINK_TO_TXT_FILE': f"https://{bucket_name}.s3.amazonaws.com/{object_name}",
                    'DIRECTORY': dir_prefix,  # Mark the source directory
                })

    # engine.begin() commits when the block succeeds and rolls back on error,
    # so the rows are stored all-or-nothing. Skip the statement entirely when
    # nothing was listed.
    if rows:
        with engine.begin() as connection:
            # One multi-row insert instead of a round trip per object
            connection.execute(metadata_table.insert(), rows)
    print("Metadata transfer to Snowflake completed successfully for all directories.")
except Exception as e:
    print(f"An error occurred: {e}")


Metadata transfer to Snowflake completed successfully for all directories.


In [11]:

# bucket_name = 'bigdata-pypdf'
# prefix = 'PyPDF/'
# response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)


In [12]:
# engine = create_engine(URL(
#     account = snowflake_account,
#     user = snowflake_user,
#     password = snowflake_password,
#     database = database,
#     schema = schema,
#     warehouse = warehouse,
#     role = role,
# ))

# metadata = MetaData()


# metadata_table = Table(
#     'METADATA', metadata,
#     Column('OBJECT_NAME', String(255)),
#     Column('LAST_MODIFIED', DateTime),
#     Column('SIZE_BYTES', Integer),
#     Column('LINK_TO_TXT_FILE', String(1000))
# )

# metadata.create_all(engine, checkfirst=True)

##### Metadata transfer to Snowflake

In [13]:

# try:
#     with engine.connect() as connection:
#         for obj in response.get('Contents', []):
#             object_name = obj['Key']
#             last_modified = obj['LastModified']
#             size_bytes = obj['Size']
#             link_to_txt_file = f"https://{bucket_name}.s3.amazonaws.com/{object_name}"
#             connection.execute(metadata_table.insert().values(
#                 OBJECT_NAME=object_name,
#                 LAST_MODIFIED=last_modified,
#                 SIZE_BYTES=size_bytes,
#                 LINK_TO_TXT_FILE=link_to_txt_file
#             ))
#     print("Metadata transfer to Snowflake completed successfully.")
# except Exception as e:
#     print(e)


In [14]:

# Release the Snowflake connection opened earlier in the notebook.
conn.close()
