In [None]:
pip install requests pandas boto3 snowflake-connector-python


In [None]:
# extract_data.py
import requests
import pandas as pd
 
def extract_data(url="https://dummyjson.com/products", output_path="products.csv", timeout=30):
    """Download the product list from a JSON endpoint and save it as a flat CSV.

    Args:
        url: Endpoint returning a JSON object that holds a ``products`` list.
        output_path: Path of the CSV file to write.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot hang the notebook indefinitely.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On timeouts or connection failures.
        ValueError: If the response contains no products.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on HTTP errors instead of writing an error payload to disk.
    response.raise_for_status()
    data = response.json()

    # Normalize nested 'products' list
    # NOTE(review): the endpoint may paginate its results — confirm that a
    # single request returns every product the pipeline needs.
    products = data.get("products", [])
    if not products:
        # An empty CSV would only surface later as a confusing read error downstream.
        raise ValueError(f"No products returned by {url}")
    df = pd.json_normalize(products)

    # Save to CSV
    df.to_csv(output_path, index=False)
    print(f"✅ Data extracted and saved as {output_path}")
 
# Entry point: run the extraction only when this code executes as the main
# module, not when it is imported from elsewhere.
if __name__ == "__main__":
    extract_data()


In [None]:
# upload_to_s3.py
import boto3
import os
from botocore.exceptions import NoCredentialsError, ClientError
 
# AWS credentials must never be hardcoded in a notebook: anything shared or
# committed leaks them.
# NOTE(review): the key pair that was previously embedded here should be
# treated as compromised and rotated.
# Values already present in the environment are used as-is; otherwise the user
# is prompted without echo. A blank answer leaves the variable unset so that
# boto3 can fall back to its default credential lookup.
from getpass import getpass

for _cred_var in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    if not os.environ.get(_cred_var):
        _cred_value = getpass(f"{_cred_var} (leave blank to use boto3 defaults): ")
        if _cred_value:
            os.environ[_cred_var] = _cred_value
 
def upload_to_s3(file_name, bucket_name, object_name=None):
    """Upload a local file to an S3 bucket and report the outcome on stdout.

    Args:
        file_name: Path of the local file to upload.
        bucket_name: Name of the destination bucket.
        object_name: Key to store the object under; defaults to ``file_name``.

    Returns:
        None. Every failure is caught and printed rather than raised.
    """
    try:
        client = boto3.client('s3')
        object_name = file_name if object_name is None else object_name

        # Probe the bucket first so an unreachable bucket gets its own message.
        try:
            client.head_bucket(Bucket=bucket_name)
        except ClientError as e:
            print(f"❌ Bucket '{bucket_name}' not found or inaccessible: {e}")
        else:
            # Bucket is reachable: perform the actual transfer.
            client.upload_file(file_name, bucket_name, object_name)
            print(f"✅ '{file_name}' uploaded to S3 bucket '{bucket_name}' as '{object_name}'")

    except FileNotFoundError:
        print(f"❌ File '{file_name}' not found.")
    except NoCredentialsError:
        print("❌ AWS credentials not found. Check your environment variables.")
    except Exception as e:
        print(f"❌ Upload failed: {e}")
 
# Entry point: upload the extracted CSV to the "ndataassignment" bucket when
# this code executes as the main module.
if __name__ == "__main__":
    upload_to_s3("products.csv", "ndataassignment")

In [None]:
# load_to_snowflake.py
import snowflake.connector
import pandas as pd
 
def load_to_snowflake():
    """Load ``products.csv`` into Snowflake and build a per-category summary view.

    Steps:
        1. Read ``products.csv`` and check that the columns needed by the raw
           table are present.
        2. Connect to Snowflake and create ``DATA_PIPELINE_DB`` / ``RAW_SCHEMA``
           if they do not exist.
        3. Recreate ``RAW_SCHEMA.PRODUCTS_RAW`` and insert every CSV row.
        4. Create or replace the ``RAW_SCHEMA.PRODUCTS_TRANSFORMED`` view with
           the product count and average price per category.

    Connection settings are read from the ``SNOWFLAKE_USER``,
    ``SNOWFLAKE_ACCOUNT`` and ``SNOWFLAKE_PASSWORD`` environment variables.
    The password is never stored in the notebook: if the variable is unset the
    user is prompted for it without echo.

    Raises:
        ValueError: If the CSV lacks one of the expected columns.
    """
    # Function-scope imports: this cell only imports snowflake.connector and pandas.
    import os
    from getpass import getpass

    # Load CSV
    df = pd.read_csv("products.csv")

    # Ensure required columns exist
    expected_cols = ['id', 'title', 'price', 'description', 'category', 'thumbnail']
    missing = [col for col in expected_cols if col not in df.columns]
    if missing:
        raise ValueError(f"Missing expected columns in CSV: {missing}")

    def _nullable(value, cast=None):
        # Empty CSV cells arrive as NaN; bind them as SQL NULL instead.
        if pd.isna(value):
            return None
        return cast(value) if cast is not None else value

    # Build plain-Python parameter tuples up front so malformed data fails
    # before any connection is opened.
    rows = [
        (
            _nullable(pid, int),
            _nullable(title),
            _nullable(price, float),
            _nullable(description),
            _nullable(category),
            _nullable(thumbnail),
        )
        for pid, title, price, description, category, thumbnail
        in df[expected_cols].itertuples(index=False, name=None)
    ]

    password = os.environ.get("SNOWFLAKE_PASSWORD") or getpass("Snowflake password: ")

    # Connect to Snowflake
    conn = snowflake.connector.connect(
        user=os.environ.get("SNOWFLAKE_USER", "nehaa067"),
        password=password,
        account=os.environ.get("SNOWFLAKE_ACCOUNT", "gnhgh-hjng38"),
    )
    try:
        cur = conn.cursor()
        try:
            # Create database and schema
            cur.execute("CREATE DATABASE IF NOT EXISTS DATA_PIPELINE_DB;")
            cur.execute("USE DATABASE DATA_PIPELINE_DB;")
            cur.execute("CREATE SCHEMA IF NOT EXISTS RAW_SCHEMA;")

            # Create raw table
            cur.execute("""
            CREATE OR REPLACE TABLE RAW_SCHEMA.PRODUCTS_RAW (
                id INT,
                title STRING,
                price FLOAT,
                description STRING,
                category STRING,
                thumbnail STRING
            );
            """)

            # Insert data in one batched call instead of one round trip per row.
            if rows:
                cur.executemany(
                    """
                    INSERT INTO RAW_SCHEMA.PRODUCTS_RAW (id, title, price, description, category, thumbnail)
                    VALUES (%s, %s, %s, %s, %s, %s)
                    """,
                    rows,
                )

            print("✅ Raw data loaded into Snowflake")

            # Create transformed view
            cur.execute("""
            CREATE OR REPLACE VIEW RAW_SCHEMA.PRODUCTS_TRANSFORMED AS
            SELECT
                category,
                COUNT(*) AS product_count,
                AVG(price) AS avg_price
            FROM RAW_SCHEMA.PRODUCTS_RAW
            GROUP BY category;
            """)

            print("✅ Created transformed view in Snowflake")
        finally:
            # Release the cursor even when a statement above fails.
            cur.close()
    finally:
        # Always close the connection so a failed run does not leak a session.
        conn.close()
 
# Entry point: load the CSV into Snowflake only when this code executes as the
# main module, not when it is imported from elsewhere.
if __name__ == "__main__":
    load_to_snowflake()