In [2]:
!pip install snowflake
import os
import pandas as pd
from dotenv import load_dotenv
import snowflake.connector
import datetime

# Record the wall-clock start time; the end of the script subtracts it from
# the finish time to report how long the whole run took.
start = datetime.datetime.now()

def get_snowflake_connection(env_path=None):
    """Open a Snowflake connection configured from environment variables.

    A ``.env`` file is loaded first: the one at ``env_path`` when that
    argument is truthy, otherwise whatever ``load_dotenv()`` picks up by
    default. The connection settings are then read from the ``SNOWFLAKE_*``
    environment variables; a variable that is not set is handed to the
    connector as ``None``.

    Args:
        env_path: Optional path to a ``.env`` file.

    Returns:
        The object returned by ``snowflake.connector.connect``.
    """
    dotenv_kwargs = {"dotenv_path": env_path} if env_path else {}
    load_dotenv(**dotenv_kwargs)

    # Connector keyword -> name of the environment variable supplying it.
    env_names = {
        "user": "SNOWFLAKE_USER",
        "password": "SNOWFLAKE_PASSWORD",
        "account": "SNOWFLAKE_ACCOUNT",
        "warehouse": "SNOWFLAKE_WAREHOUSE",
        "database": "SNOWFLAKE_DATABASE",
        "schema": "SNOWFLAKE_SCHEMA",
        "role": "SNOWFLAKE_ROLE",
    }
    settings = {keyword: os.getenv(name) for keyword, name in env_names.items()}
    return snowflake.connector.connect(**settings)

# === Step 1: Load environment variables from a custom path ===
# The path is relative, so it is resolved against the current working
# directory of the running notebook/script.
dotenv_path = r"../.env"
load_dotenv(dotenv_path=dotenv_path)

# === Step 2: Define file paths ===
# local_folder is scanned for input .parquet files; cleaned_folder receives
# the gzipped CSV output and is created up front if it does not exist yet.
local_folder = "../data/raw"
cleaned_folder = "../data/cleaned_csv"
os.makedirs(cleaned_folder, exist_ok=True)

# === Step 3: Clean and lowercase column names, convert datetimes, and save as gzipped CSV ===
for filename in os.listdir(local_folder):
    # Only Parquet inputs are converted; every other directory entry is skipped.
    if not filename.endswith(".parquet"):
        continue

    original_path = os.path.join(local_folder, filename)
    cleaned_path = os.path.join(cleaned_folder, filename.replace(".parquet", ".csv.gz"))

    df = pd.read_parquet(original_path)
    df.columns = [name.lower() for name in df.columns]

    # Render both trip timestamps as 'YYYY-MM-DD HH:MM:SS' strings.
    for datetime_column in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
        df[datetime_column] = pd.to_datetime(df[datetime_column]).dt.strftime('%Y-%m-%d %H:%M:%S')

    # The text before the first '.' must consist of exactly three
    # '_'-separated parts: an ignored prefix, then the year, then the month.
    name_parts = filename.partition('.')[0].split('_')
    _, year, month = name_parts
    df['month'] = int(month)
    df['year'] = int(year)

    # Write the cleaned frame as a gzip-compressed CSV without the index column.
    df.to_csv(cleaned_path, index=False, compression="gzip")
    print(f"✅ Cleaned and saved: {cleaned_path}")

# SQL used by steps 5-7, kept as named statements so the control flow below
# stays readable. The statement text is unchanged.
create_table_sql = """
CREATE OR REPLACE TABLE yellow_taxi_data_raw (
    vendorid INTEGER,
    tpep_pickup_datetime TIMESTAMP,
    tpep_dropoff_datetime TIMESTAMP,
    passenger_count FLOAT,
    trip_distance FLOAT,
    ratecodeid FLOAT,
    store_and_fwd_flag STRING,
    pulocationid INTEGER,
    dolocationid INTEGER,
    payment_type INTEGER,
    fare_amount FLOAT,
    extra FLOAT,
    mta_tax FLOAT,
    tip_amount FLOAT,
    tolls_amount FLOAT,
    improvement_surcharge FLOAT,
    total_amount FLOAT,
    congestion_surcharge FLOAT,
    airport_fee FLOAT,
    month INTEGER,
    year INTEGER
)
"""

create_file_format_sql = """
    CREATE OR REPLACE FILE FORMAT nyc_csv_format
    TYPE = 'CSV'
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    PARSE_HEADER = TRUE
"""

copy_into_sql = """
COPY INTO yellow_taxi_data_raw
FROM @nyc_taxi_stage
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
"""

# === Step 4: Connect to Snowflake using utility function ===
conn = get_snowflake_connection(env_path=dotenv_path)
try:
    cursor = conn.cursor()
    try:
        # NOTE: this drops and recreates the whole NYC_DATA database, so every
        # run starts from an empty database.
        cursor.execute("DROP DATABASE IF EXISTS NYC_DATA")
        cursor.execute("CREATE DATABASE NYC_DATA")
        cursor.execute("USE DATABASE NYC_DATA")
        cursor.execute("CREATE SCHEMA IF NOT EXISTS PUBLIC")
        cursor.execute("USE SCHEMA PUBLIC")

        # === Step 5: Create table (updated to match cleaned CSV schema) ===
        cursor.execute(create_table_sql)

        # === Step 6: Create file format, stage, and upload cleaned gzipped CSV files ===
        cursor.execute(create_file_format_sql)
        cursor.execute("CREATE OR REPLACE STAGE nyc_taxi_stage FILE_FORMAT = nyc_csv_format")
        cursor.execute("REMOVE @nyc_taxi_stage")
        print("🧹 Cleared stage: nyc_taxi_stage")

        for filename in os.listdir(cleaned_folder):
            if filename.endswith(".csv.gz"):
                full_path = os.path.abspath(os.path.join(cleaned_folder, filename))
                # Single-quote the file URI so absolute paths containing spaces
                # or other special characters are accepted by PUT. A quoted URI
                # must use forward slashes, so the platform separator is
                # normalised (a no-op where the separator is already "/").
                file_uri = "file://" + full_path.replace(os.sep, "/")
                put_command = f"PUT '{file_uri}' @nyc_taxi_stage OVERWRITE = TRUE"
                print(f"⬆️ Uploading: {filename}")
                cursor.execute(put_command)

        # === Step 7: Load data into the table from stage ===
        cursor.execute(copy_into_sql)

        print("✅ All data loaded into Snowflake!")
    finally:
        # === Step 8: Close the connection ===
        # Runs even when a statement above raises, so a failed load does not
        # leave the cursor open.
        cursor.close()
finally:
    # Always release the session, including when cursor creation or closing fails.
    conn.close()

# Report the total wall-clock time elapsed since `start` was recorded.
end = datetime.datetime.now()
duration = end - start
# total_seconds() gives the whole interval in seconds; dividing by 60 yields
# fractional minutes.
minutes = duration.total_seconds() / 60

print("Duration:", duration)
print("Duration in minutes:", minutes)


Collecting snowflake
  Downloading snowflake-1.4.0-py3-none-any.whl.metadata (2.0 kB)
Collecting snowflake-core==1.4.0 (from snowflake)
  Downloading snowflake_core-1.4.0-py3-none-any.whl.metadata (2.0 kB)
Collecting snowflake-legacy (from snowflake)
  Downloading snowflake_legacy-1.0.0-py3-none-any.whl.metadata (2.5 kB)
Collecting snowflake-connector-python (from snowflake-core==1.4.0->snowflake)
  Downloading snowflake_connector_python-3.15.0-cp311-cp311-macosx_11_0_arm64.whl.metadata (70 kB)
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python->snowflake-core==1.4.0->snowflake)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting pyOpenSSL<26.0.0,>=22.0.0 (from snowflake-connector-python->snowflake-core==1.4.0->snowflake)
  Downloading pyOpenSSL-25.0.0-py3-none-any.whl.metadata (16 kB)
Collecting tomlkit (from snowflake-connector-python->snowflake-core==1.4.0->snowflake)
  Downloading tomlkit-0.13.2-py3-none-any.whl.metadata (2.7 kB)
Downlo