<a href="https://colab.research.google.com/github/lamyse1/Data-Engineering-Projects/blob/main/week%203/DE_Week3_Exercise2_lamyse.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Step 1: Prepare the Dataset

The dataset was prepared in Excel, saved as a CSV file (`Sales.csv`), and uploaded to GitHub.


# Step 2: Extract Data

In [59]:
import getpass
import logging
import os
from urllib.error import URLError

import pandas as pd

In [60]:
def extract_data(file_path):
    """
    Extract data from a CSV source into a DataFrame.

    Args:
        file_path: Local path or URL passed straight to ``pd.read_csv``.

    Returns:
        pandas.DataFrame holding the parsed CSV contents.

    Raises:
        Exception: If a local file does not exist. The underlying
            ``FileNotFoundError`` is attached as ``__cause__``.
        urllib.error.URLError: If a remote source cannot be fetched
            (``HTTPError`` is a subclass). Logged, then re-raised unchanged.
    """
    try:
        df = pd.read_csv(file_path)
    except FileNotFoundError as e:
        logging.error(f"File not found: {file_path} - {e}")
        # Chain the cause so the traceback keeps the original OS error.
        raise Exception(f"Error: The file {file_path} does not exist!") from e
    except URLError as e:
        # A missing remote file surfaces as HTTPError/URLError rather than
        # FileNotFoundError, so it needs its own branch to be logged.
        logging.error(f"Could not download: {file_path} - {e}")
        raise

    print("Data extracted successfully!")
    return df

# Location of the raw sales CSV on GitHub
file_path = "https://raw.githubusercontent.com/lamyse1/Data-Engineering-Projects/refs/heads/main/week%203/Sales.csv"

# Pull the CSV into a DataFrame
df_sales = extract_data(file_path=file_path)

# Preview the first rows of the extracted frame
df_sales.head()


Data extracted successfully!


Unnamed: 0,transaction_id,customer_id,product_id,quantity,price
0,T001,C001,P001,2,100
1,T002,C002,P002,1,200
2,T003,C003,P003,3,50


# Step 3: Transform Data

In [61]:
def transform_data(df):
    """
    Add a ``total_revenue`` column computed as ``quantity * price``.

    The input frame is left untouched; a new DataFrame is returned. This keeps
    the cell idempotent and keeps the extracted frame in its original state.

    Args:
        df: DataFrame containing ``quantity`` and ``price`` columns.

    Returns:
        A new pandas.DataFrame with every input column plus ``total_revenue``.

    Raises:
        Exception: If the calculation fails (for example a required column is
            missing). The original error is attached as ``__cause__``.
    """
    try:
        # assign() builds a new frame instead of writing into the caller's.
        transformed = df.assign(total_revenue=df["quantity"] * df["price"])
    except Exception as e:
        logging.error(f"Data transformation failed: {e}")
        raise Exception(f"Error during data transformation: {e}") from e

    print("Data transformed successfully!")
    return transformed

# Apply the revenue calculation to the extracted sales frame
df_transformed = transform_data(df=df_sales)

# Show the result, including the new total_revenue column
print("Transformed Sales Data:")
display(df_transformed)



Data transformed successfully!
Transformed Sales Data:


Unnamed: 0,transaction_id,customer_id,product_id,quantity,price,total_revenue
0,T001,C001,P001,2,100,200
1,T002,C002,P002,1,200,200
2,T003,C003,P003,3,50,150


# Step 4: Load Data into MongoDB

In [62]:
!pip install pymongo



In [63]:
from pymongo import MongoClient
import logging

# MongoDB connection settings (MongoDB Atlas).
# The connection string embeds a username and password, so it is never stored
# in the notebook: it is read from the MONGO_URI environment variable, with an
# interactive prompt as a fallback.
# NOTE: the password that was previously committed here should be rotated.
MONGO_URI = os.environ.get("MONGO_URI") or getpass.getpass("MongoDB connection URI: ")
DB_NAME = "ecommerce_db"
COLLECTION_NAME = "sales_transactions"

# Configure logging.
# force=True replaces handlers already attached to the root logger; without it
# basicConfig() does nothing when the root logger is already configured.
logging.basicConfig(filename="pipeline.log", level=logging.ERROR,
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    force=True)

def load_data_to_mongodb(df, uri, db_name, collection_name):
    """
    Load a DataFrame into a MongoDB collection, one document per row.

    Args:
        df: DataFrame to insert.
        uri: MongoDB connection string.
        db_name: Name of the target database.
        collection_name: Name of the target collection.

    Returns:
        None. An empty frame is skipped with a warning, because
        ``insert_many`` rejects an empty document list.

    Raises:
        Exception: If conversion, connection or insertion fails. The original
            error is attached as ``__cause__``.
    """
    try:
        # Convert DataFrame rows to dictionaries for MongoDB insertion
        records = df.to_dict(orient="records")
        if not records:
            logging.warning("No records to load into MongoDB; skipping insert.")
            print("No records to load into MongoDB Atlas.")
            return

        # The context manager closes the client (and its connection pool)
        # even when the insert fails.
        with MongoClient(uri) as client:
            client[db_name][collection_name].insert_many(records)
        print("Data loaded into MongoDB Atlas successfully!")

    except Exception as e:
        logging.error(f" Error loading data into MongoDB: {e}")
        raise Exception(f"Error loading data into MongoDB: {e}") from e

# Push the transformed rows into the configured Atlas collection
load_data_to_mongodb(
    df=df_transformed,
    uri=MONGO_URI,
    db_name=DB_NAME,
    collection_name=COLLECTION_NAME,
)


Data loaded into MongoDB Atlas successfully!


In [52]:
# Read the documents back to confirm the insert. The context manager closes
# the client once the query results have been materialised into a list.
with MongoClient(MONGO_URI) as client:
    db = client[DB_NAME]
    collection = db[COLLECTION_NAME]

    # Retrieve inserted data, excluding MongoDB's generated _id field
    data_from_db = list(collection.find({}, {"_id": 0}))

# Display inserted data
print(" Inserted Data from MongoDB Atlas:")
for record in data_from_db:
    print(record)


 Inserted Data from MongoDB Atlas:
{'transaction_id': 'T001', 'customer_id': 'C001', 'product_id': 'P001', 'quantity': 2, 'price': 100, 'total_revenue': 200}
{'transaction_id': 'T002', 'customer_id': 'C002', 'product_id': 'P002', 'quantity': 1, 'price': 200, 'total_revenue': 200}
{'transaction_id': 'T003', 'customer_id': 'C003', 'product_id': 'P003', 'quantity': 3, 'price': 50, 'total_revenue': 150}


# Step 5: Implement Logging

In [64]:
import logging

# Define log file
log_file = "pipeline.log"

# Configure logging (overwrite existing logs).
# force=True is required here: the root logger may already have handlers from
# an earlier basicConfig() call or from the notebook environment, and in that
# case basicConfig() silently ignores level, format and filemode.
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filemode="w",  # Overwrite logs to avoid old data
    force=True,
)

# Force log initialization message
logging.info(" Logging system initialized.")


In [65]:
import pandas as pd
from pymongo import MongoClient

# The connection string embeds a username and password, so it is read from the
# MONGO_URI environment variable (or prompted for) instead of being stored in
# the notebook.
# NOTE: the password that was previously committed here should be rotated.
MONGO_URI = os.environ.get("MONGO_URI") or getpass.getpass("MongoDB connection URI: ")
DB_NAME = "ecommerce_db"
COLLECTION_NAME = "sales_transactions"

# Step 2: Extract Data
def extract_data(file_path):
    """
    Extract data from a CSV source into a DataFrame, logging the outcome.

    Args:
        file_path: Local path or URL passed straight to ``pd.read_csv``.

    Returns:
        pandas.DataFrame holding the parsed CSV contents.

    Raises:
        Exception: If a local file does not exist. The underlying
            ``FileNotFoundError`` is attached as ``__cause__``.
        urllib.error.URLError: If a remote source cannot be fetched
            (``HTTPError`` is a subclass). Logged, then re-raised unchanged.
    """
    try:
        df = pd.read_csv(file_path)
    except FileNotFoundError as e:
        logging.error(f" File not found: {file_path} - {e}")
        # Chain the cause so the traceback keeps the original OS error.
        raise Exception(f"Error: The file {file_path} does not exist!") from e
    except URLError as e:
        # A missing remote file surfaces as HTTPError/URLError rather than
        # FileNotFoundError, so it needs its own branch to be logged.
        logging.error(f" Could not download: {file_path} - {e}")
        raise

    logging.info(" Data extracted successfully from CSV.")
    return df

# Step 3: Transform Data
def transform_data(df):
    """
    Add a ``total_revenue`` column computed as ``quantity * price``.

    The input frame is left untouched; a new DataFrame is returned, so
    re-running the pipeline does not depend on earlier in-place edits.

    Args:
        df: DataFrame containing ``quantity`` and ``price`` columns.

    Returns:
        A new pandas.DataFrame with every input column plus ``total_revenue``.

    Raises:
        Exception: If the calculation fails (for example a required column is
            missing). The original error is attached as ``__cause__``.
    """
    try:
        # assign() builds a new frame instead of writing into the caller's.
        transformed = df.assign(total_revenue=df["quantity"] * df["price"])
    except Exception as e:
        logging.error(f" Data transformation failed: {e}")
        raise Exception(f"Error during data transformation: {e}") from e

    logging.info(" Data transformation successful.")
    return transformed

# Step 4: Load Data into MongoDB
def load_data_to_mongodb(df, uri, db_name, collection_name):
    """
    Load a DataFrame into a MongoDB collection, one document per row.

    Args:
        df: DataFrame to insert.
        uri: MongoDB connection string.
        db_name: Name of the target database.
        collection_name: Name of the target collection.

    Returns:
        None. An empty frame is skipped with a warning, because
        ``insert_many`` rejects an empty document list.

    Raises:
        Exception: If conversion, connection or insertion fails. The original
            error is attached as ``__cause__``.
    """
    try:
        records = df.to_dict(orient="records")
        if not records:
            logging.warning(" No records to load into MongoDB; skipping insert.")
            print(" No records to load into MongoDB Atlas.")
            return

        # The context manager closes the client (and its connection pool)
        # even when the insert fails.
        with MongoClient(uri) as client:
            client[db_name][collection_name].insert_many(records)

        logging.info(" Data loaded into MongoDB Atlas successfully.")
        print(" Data loaded into MongoDB Atlas successfully!")
    except Exception as e:
        logging.error(f" Error loading data into MongoDB: {e}")
        raise Exception(f"Error loading data into MongoDB: {e}") from e

# Run the pipeline: extract -> transform -> load, logging the outcome.
try:
    file_path = "https://raw.githubusercontent.com/lamyse1/Data-Engineering-Projects/refs/heads/main/week%203/Sales.csv"

    # Step 2: Extract
    df_sales = extract_data(file_path)

    # Step 3: Transform
    df_transformed = transform_data(df_sales)

    # Step 4: Load
    load_data_to_mongodb(df_transformed, MONGO_URI, DB_NAME, COLLECTION_NAME)

    logging.info(" Data pipeline completed successfully!")

except Exception as e:
    # logging.exception records the traceback along with the message.
    logging.exception(f" Pipeline execution failed: {e}")
    print(" Pipeline execution failed! Check pipeline.log for details.")

finally:
    # Flush instead of calling logging.shutdown(): shutdown closes the
    # handlers and is meant for application exit, so log calls made later in
    # the same kernel could be lost. Flushing here also covers the failure path.
    for handler in logging.getLogger().handlers:
        handler.flush()


 Data loaded into MongoDB Atlas successfully!


In [66]:
import time

# Pause for one second before reading the log back.
# NOTE(review): presumably meant to give the log file time to be written - confirm it is still needed.
time.sleep(1)

# Print the contents of pipeline.log using the shell
!cat pipeline.log


📝 Logging initialized.
