# Snowflake Data Loader - Step by Step Tutorial

This notebook breaks down the `load_to_snowflake_pandas.py` script into executable steps.
You can run each cell individually to understand how the code works.


## Step 1: Set the Working Directory and Import Required Libraries


In [None]:
# IMPORTANT: Set working directory to project root
# This ensures all file paths work correctly regardless of where notebook is opened from
import os
from pathlib import Path

# File whose presence marks the project root directory.
PROJECT_MARKER = "aws_config.json"
# The author's original absolute path; used only as a last resort, and only if it exists here.
FALLBACK_PROJECT_ROOT = "/Users/karthikdhina/PycharmProjects/learnpython"

def find_project_root(start, marker=PROJECT_MARKER, fallback=FALLBACK_PROJECT_ROOT):
    """
    Locate the project root directory.

    Checks ``start`` and then each of its parent directories for ``marker``.
    If none contains it, returns ``fallback`` when that directory exists,
    otherwise ``start`` itself, so the caller's ``os.chdir`` never targets a
    path that does not exist on this machine.

    Args:
        start (str | Path): Directory to begin the search from.
        marker (str): File name that identifies the project root.
        fallback (str | None): Directory to use when no marker file is found.

    Returns:
        str: Path of the chosen directory.
    """
    start_path = Path(start).resolve()
    for candidate in (start_path, *start_path.parents):
        if (candidate / marker).exists():
            return str(candidate)
    if fallback and Path(fallback).is_dir():
        return str(fallback)
    return str(start_path)

# Path() is the current working directory, which Jupyter normally sets to the notebook's folder
notebook_dir = Path().resolve()
project_root = find_project_root(notebook_dir)

os.chdir(project_root)
print(f"✓ Working directory set to: {os.getcwd()}")

# Verify config files exist
config_files = ["aws_config.json", "snowflake_config.json"]
print("\nChecking for config files:")
for config_file in config_files:
    full_path = os.path.join(project_root, config_file)
    if os.path.exists(full_path):
        print(f"  ✓ Found: {config_file}")
    else:
        print(f"  ✗ Missing: {config_file} (looking in: {full_path})")


In [None]:
import json
import os
import sys
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional
import numpy as np
import pandas as pd
import snowflake.connector as snowflake
from tqdm import tqdm
import boto3
from botocore.exceptions import ClientError, NoCredentialsError

print("✓ All libraries imported successfully!")


## Step 2: Load AWS Configuration

This function loads AWS credentials and S3 bucket information from `aws_config.json`


In [None]:
def load_aws_config(config_path="aws_config.json"):
    """
    Load the ``aws`` section of a JSON configuration file.

    The file must be a JSON object with an ``aws`` object inside it that has
    non-empty ``access_key_id``, ``secret_access_key`` and ``bucket_name``
    values. Problems are reported with a printed message rather than raised.

    Args:
        config_path (str): Path to the AWS config JSON file

    Returns:
        dict: The ``aws`` section as stored in the file, or None if the file
        is missing, unreadable, not valid JSON, wrongly shaped, or lacks a
        required field.
    """
    config_file = Path(config_path)

    if not config_file.exists():
        print(f"✗ Error: AWS config file '{config_path}' not found.")
        return None

    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except json.JSONDecodeError as e:
        print(f"✗ Error: Invalid JSON in AWS config file: {e}")
        return None
    except Exception as e:
        # Permission, decoding or "is a directory" errors: keep the best-effort None contract.
        print(f"✗ Error loading AWS config: {e}")
        return None

    # Validate the shape explicitly instead of relying on an AttributeError from .get().
    aws_config = config.get('aws', {}) if isinstance(config, dict) else None
    if not isinstance(aws_config, dict):
        print("✗ Error: AWS config file must be a JSON object with an 'aws' object inside it.")
        return None

    required_fields = ['access_key_id', 'secret_access_key', 'bucket_name']
    # Empty strings / nulls count as missing, matching the original truthiness check.
    missing = [field for field in required_fields if not aws_config.get(field)]
    if missing:
        print(f"✗ Error: Missing required fields in AWS config. Required: {required_fields}; missing: {missing}")
        return None

    print(f"✓ Loaded AWS configuration from: {config_path}")
    return aws_config

# Smoke-test the loader, then echo the non-secret settings it returned
aws_config = load_aws_config()
if aws_config:
    for setting_label, setting_key, setting_default in (
        ("Bucket", 'bucket_name', None),
        ("Region", 'region', 'us-east-1'),
        ("S3 Prefix", 's3_prefix', ''),
    ):
        print(f"{setting_label}: {aws_config.get(setting_key, setting_default)}")


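> **Note:** If this cell fails with `NameError: name 'Path' is not defined`, the Step 1 cells have not been run yet. Run them first, then re-run this cell.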

## Step 3: Initialize S3 Client and List JSON Files

Create a boto3 S3 client and list all JSON files in the bucket (limited to the optional `s3_prefix`, if one is configured).


In [None]:
def initialize_s3_client(aws_config: Dict):
    """
    Build a boto3 S3 client from the explicit keys in ``aws_config``.

    Args:
        aws_config (dict): Must contain 'access_key_id' and 'secret_access_key';
            'region' is optional and defaults to 'us-east-1'.

    Returns:
        The client object returned by ``boto3.client('s3', ...)``.

    Raises:
        Whatever exception occurs (including KeyError for a missing key) is
        printed and then re-raised unchanged.
    """
    try:
        region_name = aws_config.get('region', 'us-east-1')
        credential_kwargs = dict(
            aws_access_key_id=aws_config['access_key_id'],
            aws_secret_access_key=aws_config['secret_access_key'],
        )
        client = boto3.client('s3', region_name=region_name, **credential_kwargs)
        print(f"✓ Initialized S3 client for region: {region_name}")
        return client
    except Exception as exc:
        print(f"✗ Error initializing S3 client: {exc}")
        raise

def list_s3_json_files(s3_client, aws_config: Dict) -> List[Dict]:
    """
    List every object whose key ends in ``.json`` (case-insensitive).

    Pages through ``list_objects_v2`` for the configured bucket and optional
    ``s3_prefix``, following continuation tokens until the listing is complete.

    Args:
        s3_client: boto3 S3 client (anything with ``list_objects_v2``).
        aws_config (dict): Needs 'bucket_name'; 's3_prefix' is optional and may be null.

    Returns:
        list[dict]: One ``{'Key', 'Size', 'LastModified'}`` dict per JSON object,
        in listing order. On a listing error the error is printed and an empty
        list is returned.
    """
    bucket_name = aws_config['bucket_name']
    # `or ''` also covers an explicit JSON null, which would otherwise crash .strip()
    s3_prefix = (aws_config.get('s3_prefix') or '').strip()

    json_files = []
    continuation_token = None

    try:
        while True:
            list_kwargs = {'Bucket': bucket_name, 'Prefix': s3_prefix}
            if continuation_token:
                list_kwargs['ContinuationToken'] = continuation_token

            response = s3_client.list_objects_v2(**list_kwargs)

            # A page without 'Contents' contributes nothing but must not end pagination early.
            for obj in response.get('Contents', []):
                key = obj['Key']
                if key.lower().endswith('.json'):
                    json_files.append({
                        'Key': key,
                        'Size': obj['Size'],
                        'LastModified': obj.get('LastModified')
                    })

            if not response.get('IsTruncated', False):
                break
            continuation_token = response.get('NextContinuationToken')
            if not continuation_token:
                # Without a token the next request would restart at page one and loop forever.
                print("⚠ S3 listing was truncated but returned no continuation token; stopping early.")
                break

        return json_files
    except Exception as e:
        print(f"✗ Error listing S3 files: {e}")
        return []

# Initialize S3 and list files.
# Pre-define json_files so the Step 4 cell can test it safely even when the AWS config failed to load.
json_files = []
if aws_config:
    s3_client = initialize_s3_client(aws_config)
    json_files = list_s3_json_files(s3_client, aws_config)
    print(f"\n✓ Found {len(json_files)} JSON file(s) in S3")
    for i, file_info in enumerate(json_files[:5], 1):  # Show first 5
        print(f"  {i}. {file_info['Key']} ({file_info['Size']} bytes)")
else:
    print("⚠ Skipping S3 listing: AWS configuration was not loaded (see Step 2).")


## Step 4: Read JSON Files from S3 and Create DataFrame

Read all JSON files from S3 and convert them into a pandas DataFrame


In [None]:
def read_json_from_s3(s3_client, s3_key: str, aws_config: Dict) -> Optional[Dict]:
    """
    Download one S3 object and parse it as JSON.

    The raw bytes go straight to ``json.loads``, which detects UTF-8/16/32
    and accepts a leading UTF-8 BOM. A plain ``decode('utf-8')`` would make
    such files fail to parse.

    Args:
        s3_client: boto3 S3 client (anything with ``get_object``).
        s3_key (str): Key of the object to read.
        aws_config (dict): Needs 'bucket_name'.

    Returns:
        The parsed JSON value (normally a dict; may be a list if the file holds
        an array), or None if the download or parse fails. Failures are printed.
    """
    bucket_name = aws_config['bucket_name']
    try:
        response = s3_client.get_object(Bucket=bucket_name, Key=s3_key)
        body = response['Body']
        try:
            raw_bytes = body.read()
        finally:
            # Release the underlying HTTP stream even if reading fails.
            body.close()
        return json.loads(raw_bytes)
    except Exception as e:
        print(f"✗ Error reading S3 object '{s3_key}': {e}")
        return None

def read_json_files_pandas(s3_client, json_files: List[Dict], aws_config: Dict) -> pd.DataFrame:
    """
    Read JSON objects from S3 and combine them into one flattened DataFrame.

    Each file may hold a single JSON object or a JSON array of objects; each
    object becomes one row. ``pd.json_normalize`` flattens nested keys into
    dotted column names (e.g. ``main.temp``). Three bookkeeping columns are
    added to every row:

    * ``filename`` - last '/'-separated segment of the S3 key
    * ``filepath`` - the full S3 key
    * ``raw_json`` - that row's own original object, re-serialized

    Files that cannot be read (``read_json_from_s3`` returns None) or
    normalized are reported and skipped.

    Args:
        s3_client: boto3 S3 client.
        json_files (list[dict]): Entries with at least a 'Key' field.
        aws_config (dict): Passed through to ``read_json_from_s3``.

    Returns:
        pd.DataFrame: All rows concatenated with a fresh RangeIndex, or an
        empty DataFrame if nothing could be loaded.
    """
    from pathlib import PurePosixPath  # S3 keys always use '/', whatever the local OS

    if not json_files:
        return pd.DataFrame()

    print(f"Processing {len(json_files)} JSON file(s)...")
    all_dataframes = []

    for s3_file in tqdm(json_files, desc="Reading files", unit="files"):
        s3_key = s3_file['Key']
        try:
            data = read_json_from_s3(s3_client, s3_key, aws_config)
            if data is None:
                continue

            # A top-level array is a batch of records; keep each record's own raw JSON
            # instead of copying the whole array into every row.
            records = data if isinstance(data, list) else [data]
            if not records:
                continue

            normalized = pd.json_normalize(records)
            normalized['filename'] = PurePosixPath(s3_key).name
            normalized['filepath'] = s3_key
            normalized['raw_json'] = [json.dumps(record) for record in records]

            all_dataframes.append(normalized)
        except Exception as e:
            print(f"\n✗ Error processing '{s3_key}': {e}")

    if not all_dataframes:
        return pd.DataFrame()

    df = pd.concat(all_dataframes, ignore_index=True)
    print(f"\n✓ Total records loaded: {len(df)}")
    print(f"  Memory usage: {df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
    return df

# Read all JSON files into DataFrame.
# globals().get(...) keeps this cell runnable even if Step 3 was skipped or the AWS config failed.
if globals().get('json_files') and 's3_client' in globals():
    df = read_json_files_pandas(s3_client, json_files, aws_config)
    print(f"\nDataFrame shape: {df.shape}")
    print(f"Column names (first 10): {list(df.columns)[:10]}")
    if not df.empty:
        print("\nSample data:")
        # Preview only the weather columns that actually exist, to avoid a KeyError on 'main.temp'.
        preview_cols = [col for col in ('filename', 'name', 'main.temp') if col in df.columns]
        display(df[preview_cols].head() if 'name' in df.columns else df.head())


## Step 5: Normalize DataFrame for RAW Table

Prepare the DataFrame for the WEATHER_DATA_RAW table (VARIANT column)


In [None]:
def _default_weather_payload() -> Dict:
    """Return a fresh fallback WEATHER_JSON object holding the default value for each top-level key."""
    return {'coord': {}, 'weather': [], 'main': {}, 'wind': {}, 'sys': {}, 'name': '', 'id': 0}

def _is_missing_scalar(value) -> bool:
    """Return True for scalar missing markers (None, NaN, pd.NA); containers are never treated as missing."""
    if isinstance(value, (list, tuple, dict)):
        return False
    try:
        return bool(pd.isna(value))
    except (TypeError, ValueError):
        return False

def _to_builtin(value):
    """Convert NumPy scalars to plain Python values so json.dumps can serialize them."""
    return value.item() if isinstance(value, np.generic) else value

def _record_to_weather_json(record: Dict) -> str:
    """Rebuild the nested weather object from one flattened (dotted-column) record and serialize it."""
    payload = _default_weather_payload()
    for column, value in record.items():
        parts = str(column).split('.')
        # Only the known top-level keys are kept; missing values keep their defaults
        # (and never become the non-standard token NaN in the JSON text).
        if parts[0] not in payload or _is_missing_scalar(value):
            continue
        value = _to_builtin(value)
        if isinstance(value, dict):
            value = dict(value)  # never mutate objects owned by the source DataFrame
        node = payload
        for part in parts[:-1]:
            child = node.get(part)
            if not isinstance(child, dict):
                child = {}
                node[part] = child
            node = child
        node[parts[-1]] = value
    return json.dumps(payload)

def normalize_dataframe_for_raw_table(df: pd.DataFrame) -> pd.DataFrame:
    """
    Prepare DataFrame rows for the WEATHER_DATA_RAW table (VARIANT column).

    Produces exactly four columns, aligned to ``df``'s index:

    * CITY_NAME - from ``name``; missing values become 'Unknown'
    * CITY_ID - ``id`` coerced to int64; missing or unparsable values become 0
    * COUNTRY_CODE - from ``sys.country``; missing values become ''
    * WEATHER_JSON - the ``raw_json`` column if present. Otherwise a JSON object
      rebuilt from the flattened ``coord``/``weather``/``main``/``wind``/``sys``/
      ``name``/``id`` columns, e.g. ``coord.lon`` becomes ``{"coord": {"lon": ...}}``.

    Args:
        df (pd.DataFrame): Output of ``read_json_files_pandas`` or a similar frame.

    Returns:
        pd.DataFrame: The four-column frame, or an empty DataFrame for empty input.
    """
    if df.empty:
        return pd.DataFrame()

    # Build on df's own index so defaults line up row-for-row with the source data.
    result_df = pd.DataFrame(index=df.index)
    result_df['CITY_NAME'] = df.get('name', pd.Series('Unknown', index=df.index)).fillna('Unknown')
    # A Series default (not a bare scalar 0) keeps .fillna() valid when the 'id' column is absent.
    result_df['CITY_ID'] = (
        pd.to_numeric(df.get('id', pd.Series(0, index=df.index)), errors='coerce')
        .fillna(0)
        .astype(np.int64)
    )
    result_df['COUNTRY_CODE'] = df.get('sys.country', pd.Series('', index=df.index)).fillna('')

    if 'raw_json' in df.columns:
        result_df['WEATHER_JSON'] = df['raw_json']
    else:
        # json_normalize has flattened nested objects into dotted columns, so rebuild them here.
        result_df['WEATHER_JSON'] = pd.Series(
            [_record_to_weather_json(record) for record in df.to_dict('records')],
            index=df.index,
        )

    return result_df

# Build the RAW-table frame only when Step 4 produced a non-empty DataFrame
if 'df' in locals() and not df.empty:
    df_raw = normalize_dataframe_for_raw_table(df)
    for summary_line in (
        "✓ Normalized DataFrame for RAW table",
        f"Shape: {df_raw.shape}",
        f"Columns: {list(df_raw.columns)}",
    ):
        print(summary_line)
    display(df_raw.head())


## Step 6: Connect to Snowflake

Load Snowflake credentials and establish connection


In [None]:
def load_snowflake_config(config_path="snowflake_config.json"):
    """
    Load Snowflake connection settings from the ``snowflake`` section of a JSON file.

    Problems (missing file, bad JSON, wrong shape, missing fields) are printed
    rather than raised, matching ``load_aws_config``.

    Args:
        config_path (str): Path to the Snowflake config JSON file.

    Returns:
        dict: Exactly the keys account, user, password, warehouse, database and
        schema (any other keys in the file are dropped), or None on any problem.
    """
    required_fields = ['account', 'user', 'password', 'warehouse', 'database', 'schema']

    config_file = Path(config_path)
    if not config_file.exists():
        print(f"✗ Error: Snowflake config file '{config_path}' not found.")
        return None

    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except Exception as e:
        print(f"✗ Error reading config: {e}")
        return None

    snowflake_config = config.get('snowflake', {}) if isinstance(config, dict) else None
    if not isinstance(snowflake_config, dict):
        print("✗ Error: Snowflake config file must be a JSON object with a 'snowflake' object inside it.")
        return None

    # Empty strings / nulls count as missing, matching the original truthiness check.
    missing = [field for field in required_fields if not snowflake_config.get(field)]
    if missing:
        print(f"✗ Error: Missing required fields in Snowflake config. Required: {required_fields}; missing: {missing}")
        return None

    print(f"✓ Found Snowflake credentials in config file: {config_path}")
    return {field: snowflake_config[field] for field in required_fields}

def connect_to_snowflake(credentials: Dict):
    """
    Connect to Snowflake, trying a few spellings of the account identifier.

    The identifier exactly as configured is tried first. After it come the
    part before the first '-' and that part with '.us-east-1' / '.us-west-2'
    appended. Duplicates are removed so no login is attempted twice.

    Args:
        credentials (dict): account, user, password, warehouse, database, schema.

    Returns:
        tuple: ``(conn, cursor)`` on success, otherwise ``(None, None)`` after
        printing the error seen for each attempted identifier.
    """
    account = credentials['account']
    account_locator = account.split('-')[0] if '-' in account else account

    candidates = [
        account,
        account_locator,
        f"{account_locator}.us-east-1",
        f"{account_locator}.us-west-2",
    ]
    # dict.fromkeys de-duplicates while preserving order.
    account_variations = list(dict.fromkeys(candidates))

    errors = []
    for acc in account_variations:
        print(f"Trying to connect with account: {acc}...")
        try:
            conn = snowflake.connect(
                account=acc,
                user=credentials['user'],
                password=credentials['password'],
                warehouse=credentials['warehouse'],
                database=credentials['database'],
                schema=credentials['schema']
            )
        except Exception as e:
            errors.append((acc, e))
            continue

        try:
            cursor = conn.cursor()
        except Exception as e:
            errors.append((acc, e))
            try:
                conn.close()  # don't leak a live session we cannot use
            except Exception as close_error:
                print(f"  (could not close connection for {acc}: {close_error})")
            continue

        print("✓ Connected to Snowflake successfully!")
        return conn, cursor

    print("✗ Could not connect to Snowflake with any account identifier:")
    for acc, err in errors:
        print(f"  - {acc}: {err}")
    return None, None

# Load Snowflake config and connect, reporting clearly when either step fails
snowflake_credentials = load_snowflake_config()
if snowflake_credentials:
    conn, cursor = connect_to_snowflake(snowflake_credentials)
    if conn:
        print("Connection established!")
    else:
        print("✗ Connection failed; check the account identifier and credentials in snowflake_config.json.")
else:
    print("✗ Snowflake credentials not available; skipping connection.")


## Step 7: Load Data to RAW Table (VARIANT)

Insert data into WEATHER_DATA_RAW table with JSON in VARIANT column

**Note:** Uncomment the code below to actually insert data. The call is commented out to prevent accidental inserts while you are learning.


In [None]:
def load_dataframe_to_raw_table(cursor, conn, df: pd.DataFrame, progress_every: int = 5) -> bool:
    """
    Insert the rows of a RAW-table DataFrame into WEATHER_DATA_RAW, one statement per row.

    Values are passed to ``cursor.execute`` as bound parameters (``%s``
    placeholders) instead of being spliced into the SQL text. Quotes or
    backslashes in city names, country codes or JSON therefore cannot break or
    alter the statement. The ``INSERT ... SELECT ... PARSE_JSON(...)`` form of
    the original is kept; it was chosen there as more reliable for JSON parsing.

    Args:
        cursor: Open DB-API cursor (e.g. a Snowflake cursor).
        conn: Connection that owns ``cursor``; committed after the loop,
            rolled back on a fatal error.
        df (pd.DataFrame): Needs CITY_NAME, CITY_ID, COUNTRY_CODE and WEATHER_JSON
            columns, e.g. the output of ``normalize_dataframe_for_raw_table``.
        progress_every (int): Print a progress line every N processed rows;
            0 or a negative value disables it.

    Returns:
        bool: True only if every row was inserted. False for empty input, when
        any row failed (the other rows are still committed), or on a fatal error
        (after a rollback).
    """
    if df.empty:
        print("⚠ No data to load.")
        return False

    insert_sql = (
        "INSERT INTO WEATHER_DATA_RAW (CITY_NAME, CITY_ID, COUNTRY_CODE, WEATHER_JSON) "
        "SELECT %s, %s, %s, PARSE_JSON(%s)"
    )
    columns = ['CITY_NAME', 'CITY_ID', 'COUNTRY_CODE', 'WEATHER_JSON']
    total = len(df)

    try:
        print(f"\nLoading {total:,} record(s) into WEATHER_DATA_RAW table...")

        # Check existing count
        cursor.execute("SELECT COUNT(*) FROM WEATHER_DATA_RAW")
        existing_count = cursor.fetchone()[0]
        print(f"   Current records in table: {existing_count:,}")

        inserted = 0
        failed = 0
        rows = df[columns].itertuples(index=False, name=None)
        for idx, (city_name, city_id, country_code, weather_json) in enumerate(rows, 1):
            try:
                params = (str(city_name), int(city_id), str(country_code), str(weather_json))
                cursor.execute(insert_sql, params)
                inserted += 1
            except Exception as e:
                # One bad row should not abort the batch; it is counted and reported instead.
                failed += 1
                print(f"\n✗ Error inserting record {idx}: {e}")

            if progress_every > 0 and idx % progress_every == 0:
                print(f"   Progress: {idx}/{total} records processed...")

        conn.commit()
        if failed:
            print(f"⚠ Loaded {inserted:,} of {total:,} record(s) into WEATHER_DATA_RAW; {failed:,} failed")
        else:
            print(f"✓ Successfully loaded {inserted:,} record(s) into WEATHER_DATA_RAW")
        return failed == 0
    except Exception as e:
        print(f"✗ Error loading data: {e}")
        conn.rollback()
        return False

# Load to RAW table (UNCOMMENT TO EXECUTE)
# if 'df_raw' in locals() and 'cursor' in locals() and 'conn' in locals():
#     load_dataframe_to_raw_table(cursor, conn, df_raw)


## Summary

This notebook breaks down the data loading process into these steps:

1. **Import libraries** - All required Python packages
2. **Load AWS config** - Read S3 credentials and bucket info
3. **Initialize S3 client** - Create boto3 S3 client and list files
4. **Read JSON from S3** - Download and parse JSON files into DataFrame
5. **Normalize for RAW table** - Prepare data for VARIANT column
6. **Connect to Snowflake** - Load credentials and establish connection
7. **Load to RAW table** - Insert data with JSON in VARIANT column

You can execute each cell step by step to understand how the code works!

**Tips:**
- Run cells in order (top to bottom)
- Check the output of each cell before proceeding
- Uncomment the load call in the last code cell only when you're ready to insert data
- Use `display()` or `print()` to inspect DataFrames at each step
