<a href="https://colab.research.google.com/github/saerarawas/AAI_634O_A11_202520/blob/main/week3/Hands_on_Lab_Implementing_the_ETL_Process.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Hands-on Lab: Implementing the ETL (Extract, Transform, Load) Process**

**Objective:**

In this hands-on lab, students will learn how to implement the fundamental steps of the ETL process by extracting data from multiple sources, transforming the data, and loading it into a database. Students will use Python along with libraries such as Pandas for data transformation and PyMongo for loading the data into a MongoDB database.

By the end of this lab, students will be able to:

* Extract data from different sources (CSV and API).
* Clean, transform, and validate the data.
* Load the transformed data into MongoDB.
* Automate the ETL process by building a reusable pipeline.

**Pre-requisites:**

* Basic knowledge of Python.
* MongoDB Atlas account (or a local MongoDB instance).
* Install the required Python libraries:



**In this Lab:**

You are tasked with creating an ETL pipeline for a fictitious retail company. You will extract product and sales data from different sources (a CSV file and a REST API), transform the data by cleaning and standardizing it, and load the transformed data into MongoDB for further analysis.

**Step 1: Extract Data**

**1.1. Extract Product Data from a CSV File**

Create a CSV file named ***products.csv*** with the following data:

product_id,product_name,category,price

1001,Laptop,Electronics,1200

1002,Smartphone,Electronics,800

1003,Chair,Furniture,150

In [1]:
import pandas as pd

# Define the data
data = {
    'product_id': [1001, 1002, 1003],
    'product_name': ['Laptop', 'Smartphone', 'Chair'],
    'category': ['Electronics', 'Electronics', 'Furniture'],
    'price': [1200, 800, 150]
}

# Create a DataFrame
df = pd.DataFrame(data)

# Save the DataFrame to a CSV file
df.to_csv('products.csv', index=False)

print('products.csv file has been created.')


products.csv file has been created.


Use Python and Pandas to extract the product data from this CSV file.

In [2]:
import pandas as pd

# Extract data from the CSV file
products_df = pd.read_csv('products.csv')
print("Extracted Product Data:")
print(products_df)


Extracted Product Data:
   product_id product_name     category  price
0        1001       Laptop  Electronics   1200
1        1002   Smartphone  Electronics    800
2        1003        Chair    Furniture    150


**1.2. Extract Sales Data from a REST API**

For the sales data, we will simulate an API response using a dictionary. In a real-world scenario, you would use the requests library to fetch data from an API.

In [3]:
import requests

# Simulated API response (in a real scenario, use requests.get(URL).json())
sales_data = [
    {"sale_id": "S001", "product_id": "1001", "quantity": 2, "total": 2400},
    {"sale_id": "S002", "product_id": "1002", "quantity": 1, "total": 800},
    {"sale_id": "S003", "product_id": "1003", "quantity": 4, "total": 600}
]

print("Extracted Sales Data:")
print(sales_data)


Extracted Sales Data:
[{'sale_id': 'S001', 'product_id': '1001', 'quantity': 2, 'total': 2400}, {'sale_id': 'S002', 'product_id': '1002', 'quantity': 1, 'total': 800}, {'sale_id': 'S003', 'product_id': '1003', 'quantity': 4, 'total': 600}]


**Step 2: Transform Data**

**2.1. Clean and Standardize the Product Data**

Use Pandas to clean and transform the product data. For this example, let's assume you need to ensure the price field is numeric and filter out products that are too expensive.

In [8]:
import pandas as pd

# Load the CSV file into a DataFrame
df = pd.read_csv('products.csv')

# Ensure the price field is numeric
df['price'] = pd.to_numeric(df['price'], errors='coerce')

# Filter out products that are too expensive (price > 1000)
df_filtered = df[df['price'] <= 1000]

# Display the cleaned and filtered DataFrame
print(df_filtered)

# Save the cleaned and filtered DataFrame to a new CSV file
df_filtered.to_csv('cleaned_products.csv', index=False)

print('cleaned_products.csv file has been created.')


   product_id product_name     category  price
1        1002   Smartphone  Electronics    800
2        1003        Chair    Furniture    150
cleaned_products.csv file has been created.


**2.2. Enrich the Sales Data**

For the sales data, we'll perform a simple enrichment by adding the product_name to each sale by joining the sales_data and products_df on the product_id.

In [9]:
# Convert sales_data to a DataFrame
sales_df = pd.DataFrame(sales_data)

# Convert 'product_id' to numeric in sales_df before merging
sales_df['product_id'] = pd.to_numeric(sales_df['product_id'])

# Join sales data with product data to add product_name
sales_df = pd.merge(sales_df, products_df[['product_id', 'product_name']], on='product_id', how='left')
print("Enriched Sales Data:")
print(sales_df)


Enriched Sales Data:
  sale_id  product_id  quantity  total product_name
0    S001        1001         2   2400       Laptop
1    S002        1002         1    800   Smartphone
2    S003        1003         4    600        Chair


**Step 3: Load Data into MongoDB**

Now that the data is transformed and cleaned, load the product and sales data into MongoDB.

**3.1. Connect to MongoDB**

Ensure you have MongoDB running locally or use MongoDB Atlas. Connect to MongoDB using PyMongo.

In [13]:
!pip install pymongo
!pip install --upgrade pymongo

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
uri = "mongodb+srv://tsjannoun123:KufyyNNqnno0atX9@cluster0.sb8py.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))
# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)
from pymongo import MongoClient

# Access a specific database
db = client['retail_db']

# Access a collection within the database
#collection = db['sales']



Collecting pymongo
  Downloading pymongo-4.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.4 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.4 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m39.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dnspython-2.7.0-py3-none-any.whl (313 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.6/313.6 kB[0m [31m17.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.7.0 pymongo-4.11
Pinged your deployment. You successfully connected to MongoDB!


**3.2. Load Product Data**

Insert the transformed product data into the MongoDB products collection.

In [15]:
# Convert DataFrame to dictionary and insert into MongoDB
#product_records = filtered_products_df.to_dict(orient='records')
#db.products.insert_many(product_records)
#print("Loaded Product Data into MongoDB")
# Convert DataFrame to dictionary and insert into MongoDB
# The DataFrame df_filtered created in the transform step (ipython-input-7-699028617ea2) is used here.
product_records = df_filtered.to_dict(orient='records')
db.products.insert_many(product_records)
print("Loaded Product Data into MongoDB")

Loaded Product Data into MongoDB


**3.3. Load Sales Data**

Insert the enriched sales data into the MongoDB sales collection.

In [16]:
# Convert DataFrame to dictionary and insert into MongoDB
sales_records = sales_df.to_dict(orient='records')
db.sales.insert_many(sales_records)
print("Loaded Sales Data into MongoDB")


Loaded Sales Data into MongoDB


**Step 4: Automate the ETL Process**

To make the ETL process reusable, wrap the steps into functions and run the ETL pipeline from start to finish.

In [18]:
def extract_products():
    return pd.read_csv('products.csv')

def extract_sales():
    return pd.DataFrame(sales_data)

def transform_products(products_df):
    products_df['price'] = pd.to_numeric(products_df['price'], errors='coerce')
    return products_df[products_df['price'] < 1000]

def transform_sales(sales_df, products_df):
    # Convert 'product_id' to numeric in sales_df before merging
    sales_df['product_id'] = pd.to_numeric(sales_df['product_id'], errors='coerce')
    # Now you can merge
    return pd.merge(sales_df, products_df[['product_id', 'product_name']], on='product_id', how='left')

def load_data(products_df, sales_df):
    db.products.insert_many(products_df.to_dict(orient='records'))
    db.sales.insert_many(sales_df.to_dict(orient='records'))

# Run the ETL pipeline
products_df = extract_products()
sales_df = extract_sales()
transformed_products_df = transform_products(products_df)
transformed_sales_df = transform_sales(sales_df, products_df)
load_data(transformed_products_df, transformed_sales_df)
print("ETL Process Completed!")

ETL Process Completed!


**Conclusion:**
This hands-on lab provides a comprehensive introduction to the ETL process, from extracting raw data from multiple sources, transforming it for quality and consistency, and finally loading it into MongoDB.