In [1]:
import pandas as pd

customers_df = pd.read_csv('customers.csv')
orders_df = pd.read_csv('orders.csv')
categories_df = pd.read_csv('categories.csv')
products_df = pd.read_csv('products.csv')
order_items_df = pd.read_csv('order_items.csv')
reviews_df = pd.read_csv('reviews.csv')

Data type Mapping

In [2]:
table_df_mapping = {'customers_df':'customers',
          'orders_df':'orders',
          'categories_df':'categories',
          'products_df':'products',
          'order_items_df':'order_items',
          'reviews_df':'reviews'}
all_df = [customers_df,orders_df,categories_df,products_df,order_items_df,reviews_df]
all_df_str = [table_df_mapping[x] for x in table_df_mapping]
type_mapping = {
        'int': 'int64',
        'smallint': 'int64',
        'tinyint': 'int64',
        'bigint': 'int64',
        'bit': 'bool',
        'decimal': 'float64',
        'numeric': 'float64',
        'float': 'float64',
        'real': 'float64',
        'money': 'float64',
        'smallmoney': 'float64',
        'char': 'object',
        'varchar': 'object',
        'text': 'object',
        'nchar': 'object',
        'nvarchar': 'object',
        'ntext': 'object',
        'date': 'datetime64[ns]',
        'datetime': 'datetime64[ns]',
        'datetime2': 'datetime64[ns]',
        'smalldatetime': 'datetime64[ns]',
        'time': 'object',
        'timestamp': 'datetime64[ns]',
    }

Data CLeanup Function

In [3]:
def clean_dataframe(df):
    """
    Cleans the DataFrame by removing rows with null values and duplicate rows.
    """
    # Check for null values
    # null_values = df.isna().any()

    # Remove rows with null values
    df_cleaned = df.dropna()

    # Remove duplicate rows
    df_combined_cleaned = df_cleaned.drop_duplicates()
    if 'email' in df_combined_cleaned.columns:
        df_combined_cleaned = df_cleaned.drop_duplicates(subset='email')


    return df_combined_cleaned

# order_items_df = clean_dataframe(order_items_df)

# print("Final cleaned DataFrame:")
# order_items_df.head()

def clean_all_dataframes(dataframes, cleaning_function):
    """
    Applies the cleaning function to each DataFrame in the list and updates the original DataFrame.
    
    Parameters:
    - dataframes (list): List of DataFrames to clean.
    - cleaning_function (function): Function to clean a DataFrame.
    """
    
    globals_dict = globals()
    # for name in dataframes:
    #     if name in globals_dict:
    #         globals_dict[name] = cleaning_function(globals_dict[name])
    
    for i in dataframes:
        x = cleaning_function(globals_dict[i])
        globals_dict[i] = x

Cleaning all the dataframes

In [4]:
clean_all_dataframes(table_df_mapping.keys(),clean_dataframe)

SSMS Connection Script

In [5]:
import pyodbc

globals_dict = globals()

# Define the connection parameters
server = 'DESKTOP-9O0F96A\SQLEXPRESS'  # e.g., '192.168.1.1'
database = 'Advarisk'  # e.g., 'test_db'
username = 'yashu'      # e.g., 'admin'
password = 'password'      # e.g., 'password123'
driver = 'SQL Server'  # Use the correct ODBC driver name
# driver = 'ODBC Driver 17 for SQL Server'

# Create a connection string
connection_string = f'''
DRIVER={{{driver}}};
SERVER={server};
Trust_connection=yes;
PORT=1433;
DATABASE={database};
'''

try:
    # Establish a connection to the SQL Server
    connection = pyodbc.connect(connection_string)
    print("Connection successful!")

    # Create a cursor object
    cursor = connection.cursor()
    # Write a query
    for i in table_df_mapping:
        query = f""" SELECT COLUMN_NAME, DATA_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = '{table_df_mapping[i]}'"""

    # Execute the query
        cursor.execute(query)

    # Fetch the results
        columns = cursor.fetchall()

    
        schema = {col[0]: [col[1],i] for col in columns}
        # print(schema)
        for col, sql_type in schema.items():
                if col in globals_dict[sql_type[1]].columns:
                    if sql_type[0] in type_mapping:
                       globals_dict[sql_type[1]][col]  = globals_dict[sql_type[1]][col].astype(type_mapping[sql_type[0]])
                    else:
                        print(f"Warning: No mapping for SQL type {sql_type[0]}")
    
except Exception as e:
    print(f"Error: {e}")

finally:
# Close the connection
    if connection:
        connection.close()
        print("Connection closed.")

Connection successful!
Connection closed.


Emptying tables before loading

In [6]:

# List of tables to truncate
tables_to_truncate = [table_df_mapping[i] for i in table_df_mapping]  # Replace with your actual table names
try:
    # Connect to SQL Server
    connection = pyodbc.connect(connection_string)
    cursor = connection.cursor()
    print("Connection successful!")

    # Step 1: Disable foreign key constraints for the selected tables
    for table in tables_to_truncate:
        cursor.execute(f"ALTER TABLE {table} NOCHECK CONSTRAINT ALL;")
        print(f"Foreign key constraints disabled for {table}")

    # Step 2: Delete data from the selected tables
    for table in tables_to_truncate:
        try:
            cursor.execute(f"DELETE FROM {table};")
            print(f"Deleted all rows from {table}")
        except Exception as e:
            print(f"Error deleting rows from {table}: {e}")

    # Step 3: Re-enable foreign key constraints for the selected tables
    for table in tables_to_truncate:
        cursor.execute(f"ALTER TABLE {table} WITH CHECK CHECK CONSTRAINT ALL;")
        print(f"Foreign key constraints enabled for {table}")

    # Commit the transaction
    connection.commit()
    print("Data deleted successfully from all tables.")

except Exception as e:
    print(f"Error: {e}")
    if connection:
        connection.rollback()

finally:
    # Close the connection
    if connection:
        connection.close()
        print("Connection closed.")

Connection successful!
Foreign key constraints disabled for customers
Foreign key constraints disabled for orders
Foreign key constraints disabled for categories
Foreign key constraints disabled for products
Foreign key constraints disabled for order_items
Foreign key constraints disabled for reviews
Deleted all rows from customers
Deleted all rows from orders
Deleted all rows from categories
Deleted all rows from products
Deleted all rows from order_items
Deleted all rows from reviews
Foreign key constraints enabled for customers
Foreign key constraints enabled for orders
Foreign key constraints enabled for categories
Foreign key constraints enabled for products
Foreign key constraints enabled for order_items
Foreign key constraints enabled for reviews
Data deleted successfully from all tables.
Connection closed.


Loading Dataframe into SSMS Table

In [7]:
from sqlalchemy import create_engine

# Define your connection string
connection_string = f'mssql+pyodbc://{server}/{database}?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server'

# Create an SQLAlchemy engine
engine = create_engine(connection_string)

try:

    # Load DataFrame into SQL Server
    for table_name in table_df_mapping:
        globals_dict[table_name].to_sql(table_df_mapping[table_name], engine, if_exists='append', index=False)
        print(f"DataFrame has been loaded into table '{table_name}' in the database '{database}'.")

except Exception as e:
    print(f"Error: {e}")

finally:
    # Close the SQLAlchemy engine
    if engine:
        engine.dispose()
        print("Engine disposed.")


DataFrame has been loaded into table 'customers_df' in the database 'Advarisk'.
DataFrame has been loaded into table 'orders_df' in the database 'Advarisk'.
DataFrame has been loaded into table 'categories_df' in the database 'Advarisk'.
DataFrame has been loaded into table 'products_df' in the database 'Advarisk'.
DataFrame has been loaded into table 'order_items_df' in the database 'Advarisk'.
DataFrame has been loaded into table 'reviews_df' in the database 'Advarisk'.
Engine disposed.


Runnig aggregation query on the tables stored in SSMS
and Saving the result to a dataframe

In [8]:
connection_string = f'mssql+pyodbc://{server}/{database}?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server'

# Create an SQLAlchemy engine
engine = create_engine(connection_string)

# Define your SQL query
query = """ 
with combined as (
select c.*,o.total_amount, oi.product_id, oi.quantity, r.rating
from customers c
join orders o
on c.customer_id = o.customer_id
join order_items oi
on oi.order_id = o.order_id
join reviews r
on r.customer_id = c.customer_id)
select customer_id, name, email, country, 
sum(total_amount) as total_amount_spent,
avg(total_amount) as average_order_value,
sum(quantity) as total_products_ordered,
avg(rating) as average_rating,
CURRENT_TIMESTAMP as Time_run
from combined
group by combined.customer_id, combined.name, combined.email, combined.country
"""  

# Execute the query and save the result to a DataFrame
with engine.connect() as connection:
    df_agg = pd.read_sql_query(query, connection)

# Display the DataFrame
df_agg.head()

Unnamed: 0,customer_id,name,email,country,total_amount_spent,average_order_value,total_products_ordered,average_rating
0,1,Alexander Williams,alexander.williams@company.com,India,592.44,98.74,9,4
1,2,Isabella Garcia,isabella.garcia@company.com,Mexico,1099.14,183.19,15,4
2,4,Emma Jones,emma.jones@company.com,USA,344.18,172.09,5,3
3,5,Michael Brown,michael.brown@company.com,Germany,1249.86,208.31,9,3
4,6,Olivia Rodriguez,olivia.rodriguez@company.com,France,750.72,125.12,9,5


Insights Generation

1. Top 5 customers by total amount spent.

In [9]:
# Merge the orders and customers data on customer_id
merged_df = pd.merge(orders_df, customers_df, on='customer_id')

# Group by customer and calculate total amount spent
customer_spending = merged_df.groupby(['customer_id', 'name', 'email', 'country'])['total_amount'].sum().reset_index()

# Sort the customers by total amount spent in descending order and select the top 5
top_customers = customer_spending.sort_values(by='total_amount', ascending=False).head(5)
print("Top 5 customers by total amount spent:")
top_customers

Top 5 customers by total amount spent:


Unnamed: 0,customer_id,name,email,country,total_amount
11,12,Emily Garcia,emily.garcia@company.com,USA,614.28
26,27,Lily Martin,lily.martin@company.com,Spain,575.59
7,8,Charlotte Wilson,charlotte.wilson@company.com,UK,548.82
28,29,Michael black,michael.black@company.com,Germany,539.31
21,22,Benjamin Thomas,benjamin.thomas@company.com,Germany,526.57


2. Top 5 products by number of orders.

In [10]:
# Merge the order items and products data on product_id
merged_df = pd.merge(order_items_df, products_df, on='product_id')

# Group by product and calculate the number of orders
product_orders = merged_df.groupby(['product_id', 'product_name']).size().reset_index(name='number_of_orders')

# Sort the products by the number of orders in descending order and select the top 5
top_products = product_orders.sort_values(by='number_of_orders', ascending=False).head(5)

print("Top 5 products by number of orders:")
top_products

Top 5 products by number of orders:


Unnamed: 0,product_id,product_name,number_of_orders
2,3,Sofa,4
0,1,Laptop,3
3,4,Aspirin,3
27,31,Wireless Earphones,2
4,5,Protein Powder,2


3. Average rating of products by category.

In [11]:
reviews_df = pd.read_csv('reviews.csv')
# Merge the reviews and products data on product_id
merged_df = pd.merge(reviews_df, products_df, on='product_id')

# Merge the resulting DataFrame with categories data on category_id
merged_df = pd.merge(merged_df, categories_df, on='category_id')

# Group by category and calculate the average rating
category_ratings = merged_df.groupby(['category_id', 'category_name'])['rating'].mean().reset_index()

# Sort the categories by average rating in descending order
category_ratings = category_ratings.sort_values(by='rating', ascending=False)

print("Average rating of products by category:")
category_ratings

Average rating of products by category:


Unnamed: 0,category_id,category_name,rating
1,2,clothing,4.333333
0,1,electronics,4.166667
2,3,furniture,4.1
4,5,health supplement,4.0
3,4,medicine,3.833333


DataFrame - Collection mapping

In [12]:
df_collection_map = {
    "df_agg":['aggregation', df_agg],
    "top_customers":['top_customers', top_customers],
    "top_products":['top_products', top_products],
    "category_ratings":['category_ratings', category_ratings]
}

Loading Aggregations in Mongodb

In [22]:
# from pymongo.mongo_client import MongoClient
# from pymongo.server_api import ServerApi

# uri = "mongodb+srv://Yasshuu:pass123@cluster0.x8qfcnt.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

# # Create a new client and connect to the server
# client = MongoClient(uri, server_api=ServerApi('1'))
# try:
#     client.admin.command('ping')
#     print("Pinged your deployment. You successfully connected to MongoDB!")
# except Exception as e:
#     print(e)

# db = client['Advarisk_aggregation_insights']

# # List of collections that should be present based on df_collection_map
# collections_to_keep = [value[0] for value in df_collection_map.values()]

# # Get the list of all collections currently in the database
# existing_collections = db.list_collection_names()

# # Find collections that are not in the df_collection_map
# collections_to_delete = set(existing_collections) - set(collections_to_keep)

# # Delete collections that are not in the df_collection_map
# for collection_name in collections_to_delete:
#     db[collection_name].drop()
#     print(f"Collection '{collection_name}' has been deleted as it is not present in df_collection_map.")

# for key, value in df_collection_map.items():
#     collection_name = value[0]
#     df = value[1]

#     collection = db[collection_name]

#     # Check if the collection exists and if it has documents
#     if collection.count_documents({}) > 0:
#         # Empty the collection
#         collection.delete_many({})
#         print(f"The collection '{collection_name}' was not empty. It has been cleared.")
    
#     # Convert DataFrame to dictionary
#     data_dict = df.to_dict("records")

#     # Insert data into MongoDB
#     collection.insert_many(data_dict)
#     print(f"Data from DataFrame '{key}' inserted successfully into collection '{collection_name}'.")

# # Close the connection
# client.close()


Pinged your deployment. You successfully connected to MongoDB!
Collection 'insights' has been deleted as it is not present in df_collection_map.
Collection 'monthly_sales' has been deleted as it is not present in df_collection_map.
The collection 'aggregation' was not empty. It has been cleared.
Data from DataFrame 'df_agg' inserted successfully into collection 'aggregation'.
The collection 'top_customers' was not empty. It has been cleared.
Data from DataFrame 'top_customers' inserted successfully into collection 'top_customers'.
The collection 'top_products' was not empty. It has been cleared.
Data from DataFrame 'top_products' inserted successfully into collection 'top_products'.
The collection 'category_ratings' was not empty. It has been cleared.
Data from DataFrame 'category_ratings' inserted successfully into collection 'category_ratings'.


In [15]:
# from pymongo.mongo_client import MongoClient
# from pymongo.server_api import ServerApi

# uri = "mongodb+srv://Yasshuu:pass123@cluster0.x8qfcnt.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

# # Create a new client and connect to the server
# client = MongoClient(uri, server_api=ServerApi('1'))
# try:
#     client.admin.command('ping')
#     print("Pinged your deployment. You successfully connected to MongoDB!")
# except Exception as e:
#     print(e)

# db = client['Advarisk_aggregation_insights']

# for key, value in df_collection_map.items():
#     collection_name = value[0]
#     df = value[1]

#     collection = db[collection_name]

#     # Check if the collection exists and if it has documents
#     if collection.count_documents({}) > 0:
#         # Empty the collection
#         collection.delete_many({})
#         print(f"The collection '{collection_name}' was not empty. It has been cleared.")
    
#     # Convert DataFrame to dictionary
#     data_dict = df.to_dict("records")

#     # Insert data into MongoDB
#     collection.insert_many(data_dict)
#     print(f"Data from DataFrame '{key}' inserted successfully into collection '{collection_name}'.")

# # Close the connection
# client.close()

Pinged your deployment. You successfully connected to MongoDB!
The collection 'aggregation' was not empty. It has been cleared.
Data from DataFrame 'df_agg' inserted successfully into collection 'aggregation'.
Data from DataFrame 'top_customers' inserted successfully into collection 'top_customers'.
Data from DataFrame 'top_products' inserted successfully into collection 'top_products'.
Data from DataFrame 'category_ratings' inserted successfully into collection 'category_ratings'.


In [13]:
# monthly_sales.to_pickle('monthly_sales.pkl')  #insight 4
category_ratings.to_pickle('category_ratings.pkl')  #insight 3
top_products.to_pickle('top_products.pkl')  #insight 2
top_customers.to_pickle('top_customers.pkl')  #insight 1
df_agg.to_pickle('aggregation.pkl')