
# ETL Pipeline using Python

This notebook demonstrates how to build a simple **ETL pipeline** (Extract, Transform, Load) using:
- `requests` to extract data from a public API
- `pandas` to transform and clean data
- `sqlite3` to load data into a local SQLite database

---

### What is ETL?
ETL stands for:
- **Extract**: Retrieve raw data from sources like APIs or files
- **Transform**: Clean, enrich, and format the data
- **Load**: Store the final data into databases or data warehouses

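The three stages can be sketched as plain functions chained together. This is only an illustration: the function names, the sample records, and the list standing in for a database are all assumptions, chosen so the example runs without a network connection or a database file.

```python
# Illustrative only: hard-coded records stand in for a real data source.
def extract():
    """Extract: return raw records (here, a fixed in-memory sample)."""
    return [{"name": " widget ", "price": "9.5"}, {"name": "gadget", "price": "20"}]


def transform(rows):
    """Transform: trim names and convert price strings to floats."""
    return [{"name": r["name"].strip(), "price": float(r["price"])} for r in rows]


def load(rows, target):
    """Load: append the cleaned rows to a list standing in for a database."""
    target.extend(rows)
    return len(rows)


warehouse = []
loaded = load(transform(extract()), warehouse)
print(f"Loaded {loaded} rows: {warehouse}")
```

Each stage only depends on the output of the previous one, which is what makes the stages easy to test and replace independently.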


## ETL Workflow Overview

```plaintext
           +------------+         +------------+         +------------+
           |            |         |            |         |            |
           |   Extract  +-------->+ Transform  +-------->+    Load    |
           | (API Call) |         | (Clean-up) |         | (SQLite DB)|
           +------------+         +------------+         +------------+
```


# ETL Pipeline for Product Pricing Analysis
Problem Statement:

You are tasked with designing a basic ETL (Extract, Transform, Load) pipeline that pulls product data from a public API, processes the pricing information for analysis, and stores the result in a local SQLite database.

Your goal is to implement the following:

## Tasks:
### Step 1: Extraction
Write a function `extract_data()` that:

- Sends a GET request to the public API endpoint `https://dummyjson.com/products`.
- Extracts the list of products from the response.
- Handles potential HTTP and JSON parsing errors gracefully.
- Returns the product list (a list of dicts parsed from the JSON response), or `None` if extraction fails.

### Step 2: Transformation
Write a function `transform_data(data)` that:

- Converts the raw product list into a pandas DataFrame.
- Rounds the `price` to 2 decimal places.
- Calculates the `price_difference` of each product from a baseline price of $20.0.
- Keeps only these fields: `id`, `title`, `price`, `category`, `price_difference`.

### Step 3: Loading
Write a function `load_data(df)` that:

- Loads the transformed DataFrame into a SQLite database (`pricing_data.db`).
- Saves the data into a table called `products`.
- Overwrites the table if it already exists.

### Step 4: Verification
Write a function `count_records()` that:

- Connects to the database and prints the total number of records in the `products` table.

### Step 5: Preview
Write a function `preview_data()` that:

- Retrieves the first 5 records from the `products` table and returns them as a DataFrame.

In [None]:

# Setup: Install necessary libraries
!pip install -q pandas requests


In [None]:
# Setup: Import libraries
import requests
import pandas as pd
import sqlite3


In [None]:
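# Step 1: Extract product data from the public API
def extract_data():
    url = "https://dummyjson.com/products"  # Public API providing product pricing data
    try:
        # A timeout keeps the notebook from hanging on an unresponsive server
        response = requests.get(url, timeout=10)
    except requests.RequestException as exc:
        # Covers connection errors, timeouts, and other request failures
        print(f"Request failed: {exc}")
        return None

    print(f"Status Code: {response.status_code}")
    if response.status_code != 200:
        print(f"Failed to retrieve data. Status Code: {response.status_code}")
        return None

    try:
        json_data = response.json()
    except ValueError:  # Raised when the body is not valid JSON
        print("JSON Decode Error")
        return None

    products = json_data.get("products", [])
    print(f"Extracted {len(products)} products")
    return products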
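A single request may return only one page of products. The sketch below shows one way to collect every page, assuming the API accepts `limit` and `skip` query parameters and reports a `total` field in each response; check the API documentation before relying on this. `extract_all` and `fake_fetch_page` are hypothetical helpers, and the page fetcher is passed in as an argument so the paging loop can be run without network access.

```python
def extract_all(fetch_page, page_size=30):
    """Collect products from every page.

    fetch_page(skip, limit) must return a dict with a "products" list and a
    "total" count (assumed response shape).
    """
    products = []
    skip = 0
    while True:
        page = fetch_page(skip, page_size)
        batch = page.get("products", [])
        products.extend(batch)
        skip += len(batch)
        # Stop on an empty page or once the reported total has been reached
        if not batch or skip >= page.get("total", 0):
            break
    return products


# Stand-in fetcher over 70 fake products, used instead of a live HTTP call
FAKE_PRODUCTS = [{"id": i} for i in range(1, 71)]


def fake_fetch_page(skip, limit):
    return {"products": FAKE_PRODUCTS[skip:skip + limit], "total": len(FAKE_PRODUCTS)}


all_products = extract_all(fake_fetch_page, page_size=30)
print(len(all_products))
```

With a live API, the fetcher would wrap something like `requests.get(url, params={"limit": limit, "skip": skip}, timeout=10)` and return the parsed JSON.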


In [None]:
# Step 2: Transform data for pricing analysis
def transform_data(data):
    df = pd.DataFrame(data)

    # Transformations
    df['price'] = df['price'].round(2)  # Round the prices to two decimal places
    baseline_price = 20.0
    df['price_difference'] = df['price'] - baseline_price

    # Keep relevant columns
    df = df[['id', 'title', 'price', 'category', 'price_difference']]

    # Print the first 5 rows of the transformed data for debugging
    print(f"Transformed Data:\n{df.head()}")

    return df
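Because `price_difference` comes from floating-point subtraction, some values may carry tiny binary rounding residue (a well-known example is `0.1 + 0.2`). The sketch below uses plain Python on made-up prices to show the idea of rounding the difference as well; in the pandas version this would correspond to calling `.round(2)` on the new column. Whether to round the difference is a design choice that the task does not specify.

```python
BASELINE_PRICE = 20.0  # Same baseline as in transform_data

# Hypothetical sample prices, not taken from the API
sample_prices = [9.99, 20.0, 1499.456]

rows = []
for price in sample_prices:
    price = round(price, 2)
    # Round the difference too, so stored values have at most two decimals
    difference = round(price - BASELINE_PRICE, 2)
    rows.append({"price": price, "price_difference": difference})

for row in rows:
    print(row)

# Classic illustration of floating-point residue
print(0.1 + 0.2 == 0.3)  # False
```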


In [None]:
# Step 3: Load the transformed data into SQLite
def load_data(df):
    # Connect to the SQLite database (the file is created if it doesn't exist)
    conn = sqlite3.connect('pricing_data.db')
    try:
        # if_exists='replace' overwrites the table on every run instead of appending
        df.to_sql('products', conn, if_exists='replace', index=False)
        conn.commit()
    finally:
        # Close the connection even if the write fails
        conn.close()

    print(f"Loaded {len(df)} records into the 'products' table.")
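To make the overwrite behaviour concrete, here is a rough sketch of what replacing a table amounts to, written with the standard-library `sqlite3` module and an in-memory database instead of `pricing_data.db`. The `replace_table` helper, the column types, and the sample rows are hypothetical; the way pandas does this internally may differ in detail.

```python
import sqlite3


def replace_table(conn, rows):
    """Drop and recreate `products`, then insert the given rows."""
    with conn:  # Commits on success, rolls back on error
        conn.execute("DROP TABLE IF EXISTS products")
        conn.execute(
            "CREATE TABLE products ("
            "id INTEGER, title TEXT, price REAL, category TEXT, price_difference REAL)"
        )
        conn.executemany("INSERT INTO products VALUES (?, ?, ?, ?, ?)", rows)


conn = sqlite3.connect(":memory:")  # In-memory database for the demo
replace_table(conn, [(1, "Sample A", 9.99, "demo", -10.01),
                     (2, "Sample B", 25.0, "demo", 5.0)])
replace_table(conn, [(3, "Sample C", 20.0, "demo", 0.0)])  # Second run overwrites

remaining = conn.execute("SELECT id FROM products").fetchall()
print(remaining)
conn.close()
```

After the second call only the newest rows remain, which is why rerunning the pipeline does not duplicate records.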


In [None]:
# Step 4: Verify the number of records in the database
def count_records():
    # Connect to SQLite database
    conn = sqlite3.connect('pricing_data.db')
    cursor = conn.cursor()

    # Query to count records
    cursor.execute("SELECT COUNT(*) FROM products")
    count = cursor.fetchone()[0]
    conn.close()

    print(f"Number of records in the database: {count}")
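If the count runs before the table exists, SQLite raises `sqlite3.OperationalError`. A possible variant, sketched here against an in-memory database, returns the count instead of only printing it and reports a missing table as `None`. The name `safe_count` is hypothetical.

```python
import sqlite3


def safe_count(conn, table="products"):
    """Return the row count of `table`, or None if the query fails."""
    try:
        # Table names cannot be bound as parameters; only pass trusted names here
        return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    except sqlite3.OperationalError:
        return None


conn = sqlite3.connect(":memory:")
before = safe_count(conn)  # Table has not been created yet

conn.execute("CREATE TABLE products (id INTEGER)")
conn.executemany("INSERT INTO products VALUES (?)", [(1,), (2,), (3,)])
after = safe_count(conn)

print(before, after)
conn.close()
```

Returning the value makes the check usable in assertions, for example comparing it with the length of the DataFrame that was loaded.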



In [None]:
# Run the ETL Pipeline
data = extract_data()

if data:
    transformed_data = transform_data(data)
    load_data(transformed_data)
    count_records()
else:
    print("ETL Pipeline terminated due to extraction error.")


In [None]:
# Step 5: Preview data directly from SQLite
def preview_data():
    conn = sqlite3.connect('pricing_data.db')
    df = pd.read_sql_query("SELECT * FROM products LIMIT 5", conn)
    conn.close()
    return df

preview_data()
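Once the table is loaded, the same connection pattern can answer simple pricing questions. As an illustrative sketch (the sample rows and the query are assumptions, not part of the assignment), the following counts how many products in each category are priced above the baseline, using a parameterized query against an in-memory database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products ("
    "id INTEGER, title TEXT, price REAL, category TEXT, price_difference REAL)"
)
conn.executemany(
    "INSERT INTO products VALUES (?, ?, ?, ?, ?)",
    [
        (1, "Sample A", 9.99, "beauty", -10.01),
        (2, "Sample B", 25.0, "beauty", 5.0),
        (3, "Sample C", 120.0, "furniture", 100.0),
        (4, "Sample D", 45.5, "furniture", 25.5),
    ],
)

# The `?` placeholder keeps the threshold out of the SQL string itself
above_baseline = conn.execute(
    "SELECT category, COUNT(*) FROM products "
    "WHERE price_difference > ? GROUP BY category ORDER BY category",
    (0,),
).fetchall()
print(above_baseline)
conn.close()
```

The same query could be passed to `pd.read_sql_query` to get the result as a DataFrame.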

