## Using Python to Integrate MongoDB Data into an ETL Process
This notebook demonstrates the setup of an ETL (Extract, Transform, Load) pipeline.

In this lab you will build upon the **Northwind_DW2** dimensional database from Lab 3 by integrating new data sourced from a MongoDB instance. The new data covers two additional business processes: inventory and purchasing. You will continue to interact with both the source systems (MongoDB and MySQL) and the destination system (the Northwind_DW2 data warehouse) from a remote client running Python (Jupyter Notebooks).

The notebook fetches data into Pandas DataFrames, performs all the necessary transformations in memory on the client, and then pushes each transformed DataFrame to the RDBMS data warehouse with `DataFrame.to_sql`, a Pandas function that creates the table and fills it with data in a single operation.
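
The fetch, transform, and load flow described above can be sketched end to end without any server. This is only a minimal illustration, not the lab's pipeline: it assumes an in-memory SQLite database in place of MySQL, and the table name `fact_demo` and the sample rows are hypothetical.

```python
import sqlite3

import pandas as pd

# Extract: a small in-memory stand-in for documents fetched from a source system.
records = [
    {"order_id": "A-1", "quantity": 2, "unit_price": 10.0},
    {"order_id": "A-2", "quantity": 5, "unit_price": 20.0},
]
df = pd.DataFrame(records)

# Transform: derive a column in memory on the client.
df["total_amount"] = df["quantity"] * df["unit_price"]

# Load: to_sql creates the table and fills it in a single call.
conn = sqlite3.connect(":memory:")  # assumption: SQLite stands in for MySQL
df.to_sql("fact_demo", conn, if_exists="replace", index=False)

# Read the table back to confirm what was written.
loaded = pd.read_sql("SELECT * FROM fact_demo", conn)
print(loaded)
conn.close()
```

With a MySQL target, the same `to_sql` call would receive a SQLAlchemy engine instead of the SQLite connection.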

### Prerequisites:
This notebook uses the PyMongo library to connect to MongoDB, and the PyMySQL driver (through SQLAlchemy) to connect to MySQL. You must therefore first install PyMongo into your Python environment by executing the following command in a Terminal window.

- `python -m pip install pymongo[srv]`
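
Before running the rest of the notebook, it can help to confirm that the required packages are actually installed. The helper below is a small sketch using only the standard library; `installed_versions` is a hypothetical name, and the package list is taken from the imports used in this notebook.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {package: version string, or None if not installed}."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


# Report each package the notebook imports.
for name, ver in installed_versions(["pymongo", "sqlalchemy", "pymysql", "pandas"]).items():
    print(f"{name}: {ver or 'NOT INSTALLED'}")
```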

#### Import the Necessary Libraries

In [40]:
import os
import logging
from typing import Dict
import pandas as pd
import numpy as np
import pymysql


import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [41]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.34
Running PyMongo Version: 4.8.0


#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [42]:
from pymongo import MongoClient
import json

# Example setup of logging for the notebook
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Function to get MongoDB client
def get_mongo_client(host: str, port: int, username: str = None, password: str = None) -> MongoClient:
    """Initialize MongoDB client."""
    if username and password:
        client = MongoClient(host, port, username=username, password=password)
    else:
        client = MongoClient(host, port)
    logger.info("MongoDB client initialized.")
    return client


# SQL connection
def get_sql_connection(host: str, user: str, password: str, db: str):
    """Initialize SQL connection."""
    conn = pymysql.connect(host=host, user=user, password=password, db=db)
    return conn

In [43]:
# Set the path of the current working directory and append 'data' directory
data_dir = os.path.join(os.getcwd(), 'data')
logger.info(f"Data directory set to: {data_dir}")


INFO:__main__:Data directory set to: /Users/mac/Downloads/data-warehouse-project/data


In [44]:
# Define JSON files for MongoDB collections
json_files = {
    "sales_orders": 'StoreSales.json',
}


In [45]:
def set_mongo_collections(client: MongoClient, db_name: str, data_dir: str, json_files: dict):
    """Load JSON data into MongoDB collections."""
    db = client[db_name]
    for collection_name, file_name in json_files.items():
        file_path = os.path.abspath(os.path.join(data_dir, file_name))
        
        # Load JSON data and insert into MongoDB
        try:
            with open(file_path, 'r') as f:
                data = json.load(f)
                if isinstance(data, list): 
                    db[collection_name].insert_many(data)
                    logger.info(f"Inserted {len(data)} documents into '{collection_name}' collection.")
                else:
                    db[collection_name].insert_one(data)
                    logger.info(f"Inserted a single document into '{collection_name}' collection.")
        except Exception as e:
            logger.error(f"Error loading data for collection '{collection_name}': {str(e)}")


In [46]:
# MongoDB connection arguments (example)
mongodb_args = {
    "host": "localhost",
    "port": 27017,
    "username": "mikelangelo1",
    "password": "password123",
    "db_name": "data_ware_house"
}

# Initialize the MongoDB client
client = get_mongo_client(
    host=mongodb_args["host"],
    port=mongodb_args["port"],
    username=mongodb_args.get("username"),
    password=mongodb_args.get("password")
)

INFO:__main__:MongoDB client initialized.


#### Populate MongoDB with Source Data
You only need to run this cell once. Note that the operation is *not* idempotent as written: `set_mongo_collections` calls `insert_many` each time, so every additional run appends another copy of the documents to the collection.
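
One way to make the load safe to repeat is to clear the target collection before inserting. The sketch below assumes the collection can be rebuilt entirely from the JSON file; `reload_collection` is a hypothetical helper, and it relies only on the PyMongo collection methods `delete_many` and `insert_many`.

```python
def reload_collection(collection, documents):
    """Replace the contents of `collection` with `documents`.

    `collection` is expected to behave like a pymongo Collection
    (only delete_many and insert_many are used).
    """
    docs = list(documents)
    collection.delete_many({})   # remove documents left by earlier runs
    if docs:                     # insert_many rejects an empty list
        collection.insert_many(docs)
    return len(docs)


# Usage with a live client (assumption: `client` and `data` exist as in this notebook):
# reload_collection(client["data_ware_house"]["sales_orders"], data)
```

Running this twice leaves the same documents in the collection as running it once, which is the property the load step needs if the cell may be re-executed.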

In [47]:
# Load data into MongoDB collections
set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

INFO:__main__:Inserted 51291 documents into 'sales_orders' collection.


#### Data Extractor
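The `DataExtractor` class below provides a method to extract data from a MongoDB collection into a Pandas DataFrame. The `DatabaseConnection` class manages the MongoDB client that it uses.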

In [48]:
class DatabaseConnection:
    def __init__(self, mongodb_args: Dict):
        """Initialize the DatabaseConnection with MongoDB client parameters."""
        self.client = self.get_mongo_client(mongodb_args)
        self.db = None
        self.db_name = mongodb_args.get('db_name', 'default_db')

    def get_mongo_connection(self):
        """Return a MongoDB connection using the provided config."""
        try:
            # Connect to MongoDB and access the specified database
            self.db = self.client[self.db_name]
            logger.info(f"MongoDB connection established to database: {self.db_name}")
            return self.db  
        except Exception as e:
            logger.error(f"Error connecting to MongoDB: {str(e)}")
            raise
    
    def close_connections(self):
        """Close MongoDB connection."""
        try:
            if self.client:
                self.client.close()
                logger.info("MongoDB connection closed.")
        except Exception as e:
            logger.error(f"Error closing MongoDB connection: {str(e)}")
            raise

    def get_mongo_client(self, mongodb_args: Dict) -> MongoClient:
        """Initialize and return MongoDB client."""
        try:
            host = mongodb_args.get("host", "localhost")
            port = mongodb_args.get("port", 27017)
            username = mongodb_args.get("username")
            password = mongodb_args.get("password")

            if username and password:
                client = MongoClient(host, port, username=username, password=password)
            else:
                client = MongoClient(host, port)

            logger.info("MongoDB client initialized.")
            return client
        except Exception as e:
            logger.error(f"Error initializing MongoDB client: {str(e)}")
            raise

class DataExtractor:
    def __init__(self, db_connection: DatabaseConnection):
        """Initialize DataExtractor with the given database connection."""
        self.db_conn = db_connection
        
    def extract_from_mongodb(self, collection: str, query: Dict = None) -> pd.DataFrame:
        """Extract data from MongoDB collection."""
        try:
            mongo_db = self.db_conn.get_mongo_connection()
            data = mongo_db[collection].find(query or {})  # Find all documents or filtered by query
            return pd.DataFrame(list(data))  # Convert to DataFrame
        except Exception as e:
            logger.error(f"Error extracting from MongoDB collection '{collection}': {str(e)}")
            raise
            
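    def extract_from_api(self, endpoint: str, params: Dict = None) -> pd.DataFrame:
        """Extract data from a REST API.

        Note: this method relies on `get_api_session()` and a `config['api']`
        entry, neither of which the DatabaseConnection class above defines,
        so it raises AttributeError until those are added.
        """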
        try:
            session = self.db_conn.get_api_session()  
            api_config = self.db_conn.config['api']
            response = session.get(f"{api_config['base_url']}/{endpoint}", params=params)
            response.raise_for_status()
            return pd.DataFrame(response.json())
        except Exception as e:
            logger.error(f"Error extracting from API endpoint '{endpoint}': {str(e)}")
            raise



db_conn = DatabaseConnection(mongodb_args)
extractor = DataExtractor(db_conn)

try:
    df_mongo = extractor.extract_from_mongodb("sales_orders")
    print("MongoDB Data:")
    print(df_mongo)
except Exception as e:
    logger.error(f"Error in MongoDB extraction test: {str(e)}")

# Note: For `extract_from_api`, replace with an actual endpoint or mock response as required.


INFO:__main__:MongoDB client initialized.
INFO:__main__:MongoDB connection established to database: data_ware_house


MongoDB Data:
                             _id Row ID         Order ID  Order Date  \
0       673167cbbdc428b0f7839e83  32298   CA-2012-124891  31-07-2012   
1       673167cbbdc428b0f7839e84  26341    IN-2013-77878  05-02-2013   
2       673167cbbdc428b0f7839e85  25330    IN-2013-71249  17-10-2013   
3       673167cbbdc428b0f7839e86  13524  ES-2013-1579342  28-01-2013   
4       673167cbbdc428b0f7839e87  47221     SG-2013-4320  05-11-2013   
...                          ...    ...              ...         ...   
205159  67316ec2bdc428b0f786bff1  35398   US-2014-102288  20-06-2014   
205160  67316ec2bdc428b0f786bff2  40470   US-2013-155768  02-12-2013   
205161  67316ec2bdc428b0f786bff3   9596   MX-2012-140767  18-02-2012   
205162  67316ec2bdc428b0f786bff4   6147   MX-2012-134460  22-05-2012   
205163  67316ec2bdc428b0f786bff5                     NaN         NaN   

         Ship Date       Ship Mode Customer ID     Customer Name      Segment  \
0       31-07-2012        Same Day    RH
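
The row index above ends at 205163, that is 205,164 rows, which is four times the 51,291 documents reported by the load step and is consistent with the load cell having been run more than once. A possible cleanup before transformation, sketched below on made-up rows, is to drop MongoDB's `_id` field (it differs between copies of the same record) and then remove exact duplicates. `drop_mongo_duplicates` is a hypothetical helper. Note that source rows that are genuinely identical would also be collapsed.

```python
import pandas as pd


def drop_mongo_duplicates(df: pd.DataFrame) -> pd.DataFrame:
    """Drop the Mongo-generated _id column, then remove duplicate rows."""
    without_id = df.drop(columns=["_id"], errors="ignore")
    return without_id.drop_duplicates().reset_index(drop=True)


# Made-up sample: the same order stored twice under different _id values.
sample = pd.DataFrame([
    {"_id": "a1", "Order ID": "CA-1", "Quantity": "2"},
    {"_id": "b2", "Order ID": "CA-1", "Quantity": "2"},
    {"_id": "c3", "Order ID": "IN-7", "Quantity": "5"},
])
print(drop_mongo_duplicates(sample))
```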

#### Data Loader
The `DataLoader` class implements `load_to_warehouse`, which loads a DataFrame into the data warehouse. It connects to the MySQL database through a SQLAlchemy engine and writes the data with `DataFrame.to_sql`.


In [53]:
import logging
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine


class MsqlDatabaseConnection:
    def __init__(self, username: str, password: str, host: str = "localhost", port: int = 3306, db_name: str = "myshop"):
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.db_name = db_name

    def get_sqlalchemy_engine(self) -> Engine:
        """Return a SQLAlchemy engine connected to MySQL database."""
        try:
            connection_uri = f"mysql+pymysql://{self.username}:{self.password}@{self.host}:{self.port}/{self.db_name}"
            engine = create_engine(connection_uri)
            logger.info("Successfully connected to MySQL database.")
            return engine
        except Exception as e:
            logger.error(f"Error connecting to MySQL: {str(e)}")
            raise
    
    def get_query_result(self, query: str) -> pd.DataFrame:
        """Execute a query and return the result as a DataFrame."""
        try:
            engine = self.get_sqlalchemy_engine()
            result = pd.read_sql(query, engine)
            logger.info(f"Query executed successfully: {query}")
            return result
        except Exception as e:
            logger.error(f"Error executing query: {str(e)}")
            raise

# DataLoader class
class DataLoader:
    def __init__(self, db_connection: MsqlDatabaseConnection):
        self.db_conn = db_connection
    
    def load_to_warehouse(self, df: pd.DataFrame, table_name: str, if_exists: str = 'append') -> None:
        """Load DataFrame to data warehouse."""
        try:
            engine = self.db_conn.get_sqlalchemy_engine()
            df.to_sql(
                name=table_name,
                con=engine,
                if_exists=if_exists,
                index=False,
                chunksize=1000
            )
            logger.info(f"Successfully loaded {len(df)} rows to {table_name}")
        except Exception as e:
            logger.error(f"Error loading data to warehouse: {str(e)}")
            raise


db_connection = MsqlDatabaseConnection(
    username="root",  # Replace with your MySQL username
    password=os.environ.get("MYSQL_PASSWORD", ""),  # Read the MySQL password from an environment variable instead of hard-coding it
    db_name="myshop"           # The name of your database
)

# Create an instance of DataLoader
data_loader = DataLoader(db_connection)
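
The connection URI in `get_sqlalchemy_engine` interpolates the password directly into the string. Characters such as `@` or `/` in a password would then be misread as URL delimiters. A common remedy, sketched here with the standard library only, is to percent-encode the credentials first. `build_mysql_uri` and the sample credentials are hypothetical.

```python
from urllib.parse import quote_plus


def build_mysql_uri(username: str, password: str, host: str = "localhost",
                    port: int = 3306, db_name: str = "myshop") -> str:
    """Build a mysql+pymysql URI with percent-encoded credentials."""
    return (
        f"mysql+pymysql://{quote_plus(username)}:{quote_plus(password)}"
        f"@{host}:{port}/{db_name}"
    )


# The "@" and "/" in the sample password are encoded so they cannot break the URI.
uri = build_mysql_uri("etl_user", "p@ss/word")
print(uri)
```

The resulting string can be passed to `create_engine` in place of the hand-built URI.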

### 1.0. Create and Populate the New Dimension Tables
#### 1.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [49]:
class DataExtractor:
    def __init__(self, db_connection: DatabaseConnection):
        self.db_conn = db_connection
        
    def extract_from_mongodb(self, collection: str, query: Dict = None) -> pd.DataFrame:
        """Extract data from MongoDB collection."""
        try:
            # get_mongo_connection takes no arguments; the database name comes from mongodb_args
            mongo_db = self.db_conn.get_mongo_connection()
            data = mongo_db[collection].find(query if query else {})
            return pd.DataFrame(list(data))
        except Exception as e:
            logger.error(f"Error extracting from MongoDB: {str(e)}")
            raise

# DataTransformer class
class DataTransformer:
    @staticmethod
    def clean_customer_data(df: pd.DataFrame) -> pd.DataFrame:
        """Clean and transform customer data."""
        try:
            # Remove duplicates
            df = df.drop_duplicates()
            
            # Handle missing values (example with customer data fields)
            df['Customer Name'] = df['Customer Name'].fillna('Unknown')
            df['City'] = df['City'].fillna('Unknown')
            df['State'] = df['State'].fillna('Unknown')
            df['Country'] = df['Country'].fillna('Unknown')
            df['Postal Code'] = df['Postal Code'].fillna('')
            
            # Standardize customer-related columns (e.g., postal codes as strings)
            df['Postal Code'] = df['Postal Code'].apply(lambda x: str(x).zfill(5))  # Fill leading zeros if necessary
            
            logger.info("Customer data cleaned successfully.")
            return df
        except Exception as e:
            logger.error(f"Error cleaning customer data: {str(e)}")
            raise
    
    @staticmethod
    def transform_sales_data(df: pd.DataFrame) -> pd.DataFrame:
        """Transform sales data."""
        try:
            # Calculate derived columns
            df['Total Amount'] = df['Quantity'] * df['Sales']  # Calculate total sales amount
            df['Discount Amount'] = df['Total Amount'] * df['Discount']  # Calculate discount amount
            df['Final Amount'] = df['Total Amount'] - df['Discount Amount']  # Calculate final sales amount
            
            # Convert dates
            df['Order Date'] = pd.to_datetime(df['Order Date'], format='%d-%m-%Y')
            df['Ship Date'] = pd.to_datetime(df['Ship Date'], format='%d-%m-%Y')
            
            # Add time dimensions (Year, Month, Quarter)
            df['Order Year'] = df['Order Date'].dt.year
            df['Order Month'] = df['Order Date'].dt.month
            df['Order Quarter'] = df['Order Date'].dt.quarter
            df['Ship Year'] = df['Ship Date'].dt.year
            df['Ship Month'] = df['Ship Date'].dt.month
            df['Ship Quarter'] = df['Ship Date'].dt.quarter
            
            logger.info("Sales data transformed successfully.")
            return df
        except Exception as e:
            logger.error(f"Error transforming sales data: {str(e)}")
            raise

# Extract the sales orders from MongoDB
try:
    df_sales = extractor.extract_from_mongodb("sales_orders")
    logger.info("Sales order data extracted successfully.")
except Exception as e:
    logger.error(f"Error extracting sales data: {str(e)}")

# Apply transformations to the extracted sales data
try:
    df_transformed_sales = DataTransformer.transform_sales_data(df_sales)
    logger.info("Sales data transformed successfully.")
except Exception as e:
    logger.error(f"Error transforming sales data: {str(e)}")

# Display transformed data
print("Transformed Sales Data:")
print(df_transformed_sales)


INFO:__main__:MongoDB connection established to database: data_ware_house
INFO:__main__:Sales order data extracted successfully.
ERROR:__main__:Error transforming sales data: can't multiply sequence by non-int of type 'str'
ERROR:__main__:Error transforming sales data: can't multiply sequence by non-int of type 'str'


Transformed Sales Data:
   sale_id  quantity  unit_price  discount_rate  sale_date  total_amount  \
0      101         2        10.0           0.10 2023-04-01          20.0   
1      102         5        20.0           0.20 2023-05-12         100.0   
2      103         1        15.0           0.15 2023-06-23          15.0   

   discount_amount  final_amount  sale_year  sale_month  sale_quarter  
0             2.00         18.00       2023           4             2  
1            20.00         80.00       2023           5             2  
2             2.25         12.75       2023           6             2  
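
The error logged above, `can't multiply sequence by non-int of type 'str'`, is what Python raises when two strings are multiplied, which suggests that `Quantity` and `Sales` arrive from MongoDB as text rather than numbers. The table printed after the error uses different column names from the MongoDB extract, so it appears to come from an earlier run rather than from this cell. One possible fix, sketched below on made-up rows, is to coerce the numeric columns with `pd.to_numeric` before computing derived values. `coerce_numeric` is a hypothetical helper, and the choice of `errors="coerce"` (unparseable values become NaN) is an assumption about the desired behavior.

```python
import pandas as pd


def coerce_numeric(df: pd.DataFrame, columns) -> pd.DataFrame:
    """Return a copy of df with the given columns converted to numbers."""
    out = df.copy()
    for col in columns:
        # errors="coerce" turns unparseable values into NaN instead of raising
        out[col] = pd.to_numeric(out[col], errors="coerce")
    return out


# Made-up rows with numbers stored as strings, as in the failing case.
raw = pd.DataFrame({
    "Quantity": ["2", "5", None],
    "Sales": ["10.0", "20.0", "n/a"],
    "Discount": ["0.1", "0.2", "0.0"],
})
clean = coerce_numeric(raw, ["Quantity", "Sales", "Discount"])
clean["Total Amount"] = clean["Quantity"] * clean["Sales"]
print(clean)
```

Calling a helper like this at the start of `transform_sales_data` would let the multiplication steps operate on numeric columns.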


In [54]:
# Example of executing a SQL query
my_shop_employees = "SELECT * FROM myshop.employees;"
try:
    df_query_result = db_connection.get_query_result(my_shop_employees)
    print("Query Result:")
    print(df_query_result)
except Exception as e:
    logger.error(f"Error in query execution: {str(e)}")

INFO:__main__:Successfully connected to MySQL database.
INFO:__main__:Query executed successfully: SELECT * FROM myshop.employees;


Query Result:
   id         company       last_name first_name              email_address  \
0   1  myshop Traders       Freehafer      Nancy    nancy@myshoptraders.com   
1   2  myshop Traders         Cencini     Andrew   andrew@myshoptraders.com   
2   3  myshop Traders           Kotas        Jan      jan@myshoptraders.com   
3   4  myshop Traders       Sergienko     Mariya   mariya@myshoptraders.com   
4   5  myshop Traders          Thorpe     Steven   steven@myshoptraders.com   
5   6  myshop Traders         Neipper    Michael  michael@myshoptraders.com   
6   7  myshop Traders            Zare     Robert   robert@myshoptraders.com   
7   8  myshop Traders        Giussani      Laura    laura@myshoptraders.com   
8   9  myshop Traders  Hellung-Larsen       Anne     anne@myshoptraders.com   

               job_title business_phone     home_phone mobile_phone  \
0   Sales Representative  (123)555-0100  (123)555-0102         None   
1  Vice President, Sales  (123)555-0100  (123)555-010