# Midterm

## Imports/setup

In [5]:
import os
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
import json
import pymongo
import certifi

In [6]:
mysql_args = {
    "uid" : "root",
    "pwd" : "Passw0rd123",
    "hostname" : "localhost",
    "dbname" : "chinook"
}


In [7]:
mysqldw_args = {
    "uid" : "root",
    "pwd" : "Passw0rd123",
    "hostname" : "localhost",
    "dbname" : "chinook_dw"
}

In [8]:

mongodb_args = {
    "user_name" : "xpt3bn",
    "password" : "zrxjU6ULjbxkfuAi",
    "cluster_name" : "cluster0",
    "cluster_subnet" : "nbrar",
    "cluster_location" : "atlas", # "local"
    "db_name" : "chinook_customers"
}

In [9]:
# Create connection engine mysql_args
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}"
engine = create_engine(conn_str)

# Create chinook_dw if it doesn't exist and switch to it
with engine.connect() as connection:
    connection.execute(text(f"CREATE DATABASE IF NOT EXISTS {mysql_args['dbname']};"))
    connection.execute(text(f"USE {mysql_args['dbname']};"))




In [10]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()


## Data Extraction

### JSON from MongoDB

In [11]:
#import json to MongoDB
client = get_mongo_client(**mongodb_args)
set_mongo_collections(client, mongodb_args["db_name"], os.getcwd(), {"Customer": "customer_data.json"})


In [12]:
#get JSON from MongoDB
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "Customer"

df_customer = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customer.head(2)


Unnamed: 0,CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email,SupportRepId
0,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,+55 (12) 3923-5566,luisg@embraer.com.br,3
1,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,,leonekohler@surfeu.de,5


In [13]:
#Transform it

df_customer = df_customer.rename(columns={'CustomerId': 'customer_id', 'FirstName': 'first_name', 'LastName': 'last_name'})

# Verify data 
df_customer.head(2)

Unnamed: 0,customer_id,first_name,last_name,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email,SupportRepId
0,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,+55 (12) 3923-5566,luisg@embraer.com.br,3
1,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,,leonekohler@surfeu.de,5


### CSV from Local Files

In [14]:
#load track
df_track = pd.read_csv("trackCSV.csv")

In [15]:
#fix names
df_track = df_track.rename(columns={'TrackId': 'track_id', 'Name': 'track_name', 'AlbumId': 'album_id', 'GenreId': 'genre_id', 'MediaTypeId': 'media_type_id', 'UnitPrice': 'price'})

# Verify data 
df_track.head(2)

Unnamed: 0,track_id,track_name,album_id,media_type_id,genre_id,Composer,Milliseconds,Bytes,price
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99
1,2,Balls to the Wall,2,2,1,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Ba...",342562,5510424,0.99


### MySQL extraction

In [16]:
# mysql extraction/transform
employee_query = """
    SELECT EmployeeId AS employee_id, FirstName AS first_name, LastName AS last_name, 
           Title, City, State, Country, PostalCode, Phone, Email 
    FROM Employee;
"""


df_employee = get_sql_dataframe(employee_query, **mysql_args)

## Load Into Chinook_dw in MySQL

In [17]:
# Add dim track to chinookdw
set_dataframe(df_track, "dim_track", "track_id", db_operation="insert", **mysqldw_args)

In [18]:
# add dimcustomer to chinookdw
set_dataframe(df_customer, "dim_customer", "customer_id", db_operation="insert", **mysqldw_args)

In [19]:

# add dimemplooyee to chinookdw
set_dataframe(df_employee, "dim_employee", "employee_id", db_operation="insert", **mysqldw_args)

# Verify data 
df_employee.head(2)


Unnamed: 0,employee_id,first_name,last_name,Title,City,State,Country,PostalCode,Phone,Email
0,1,Andrew,Adams,General Manager,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,andrew@chinookcorp.com
1,2,Nancy,Edwards,Sales Manager,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,nancy@chinookcorp.com


## Joins and summaries

In [20]:
# Extract invoice data with invoice dates formatted as dates
invoice_query = """
SELECT 
    InvoiceId AS invoice_id, 
    CustomerId AS customer_id, 
    InvoiceDate AS invoice_date 
FROM Invoice;
"""
df_invoice = get_sql_dataframe(invoice_query, **mysql_args)


df_invoice['invoice_date'] = pd.to_datetime(df_invoice['invoice_date']).dt.date
df_invoice



Unnamed: 0,invoice_id,customer_id,invoice_date
0,1,2,2021-01-01
1,2,4,2021-01-02
2,3,8,2021-01-03
3,4,14,2021-01-06
4,5,23,2021-01-11
...,...,...,...
407,408,25,2025-12-05
408,409,29,2025-12-06
409,410,35,2025-12-09
410,411,44,2025-12-14


In [21]:
# Retrieve dim_date to map dates with surrogate keys
date_query = """
SELECT date_key, full_date 
FROM dim_date;
"""
df_dim_date = get_sql_dataframe(date_query, **mysqldw_args)
df_dim_date

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02
2,20000103,2000-01-03
3,20000104,2000-01-04
4,20000105,2000-01-05
...,...,...
8396,20221227,2022-12-27
8397,20221228,2022-12-28
8398,20221229,2022-12-29
8399,20221230,2022-12-30


In [30]:
# Merge df_invoice with df_dim_date on date to get date_key
df_invoice_with_date_key = pd.merge(
    df_invoice, 
    df_dim_date, 
    left_on='invoice_date', 
    right_on='full_date', 
    how='left'
)
# Drop the full_date and invoice_date columns and keep date_key
df_invoice_with_date_key.drop(columns=['invoice_date', 'full_date'], inplace=True)
df_invoice_with_date_key.head()


Unnamed: 0,invoice_id,customer_id,date_key
0,1,2,20210101.0
1,2,4,20210102.0
2,3,8,20210103.0
3,4,14,20210106.0
4,5,23,20210111.0


In [31]:
# Query to extract necessary fields from Invoice and InvoiceLine tables
invoice_query = """
SELECT 
    il.InvoiceLineId AS order_id,
    il.InvoiceId AS invoice_id,
    i.CustomerId AS customer_id,
    il.TrackId AS product_id,
    il.Quantity AS quantity,
    il.UnitPrice AS price,
    i.InvoiceDate AS invoice_date
FROM InvoiceLine AS il
JOIN Invoice AS i ON il.InvoiceId = i.InvoiceId;
"""

df_fact_order = get_sql_dataframe(invoice_query, **mysql_args)
df_fact_order


Unnamed: 0,order_id,invoice_id,customer_id,product_id,quantity,price,invoice_date
0,1,1,2,2,1,0.99,2021-01-01
1,2,1,2,4,1,0.99,2021-01-01
2,3,2,4,6,1,0.99,2021-01-02
3,4,2,4,8,1,0.99,2021-01-02
4,5,2,4,10,1,0.99,2021-01-02
...,...,...,...,...,...,...,...
2235,2236,411,44,3136,1,0.99,2025-12-14
2236,2237,411,44,3145,1,0.99,2025-12-14
2237,2238,411,44,3154,1,0.99,2025-12-14
2238,2239,411,44,3163,1,0.99,2025-12-14


In [None]:
# Merge fact_order with the invoice data including date_key
df_fact_order = pd.merge(
    df_fact_order, 
    df_invoice_with_date_key[['invoice_id', 'date_key']], 
    on='invoice_id', 
    how='left'
)

# Load the updated fact_order with date_key into MySQL
set_dataframe(df_fact_order, "fact_order", "order_id", db_operation="insert", **mysqldw_args)

In [36]:
# Assuming `get_sql_dataframe` is your function to run SQL queries
query = "SELECT * FROM fact_order;"
df_fact_order = get_sql_dataframe(query, **mysqldw_args)

# Display the dataframe
df_fact_order.head(10)  # Shows the first 10 rows


Unnamed: 0,order_id,invoice_id,customer_id,product_id,quantity,price,invoice_date,date_key_x,date_key_y
0,1,1,2,2,1,0.99,2021-01-01,20210101.0,20210101.0
1,2,1,2,4,1,0.99,2021-01-01,20210101.0,20210101.0
2,3,2,4,6,1,0.99,2021-01-02,20210102.0,20210102.0
3,4,2,4,8,1,0.99,2021-01-02,20210102.0,20210102.0
4,5,2,4,10,1,0.99,2021-01-02,20210102.0,20210102.0
5,6,2,4,12,1,0.99,2021-01-02,20210102.0,20210102.0
6,7,3,8,16,1,0.99,2021-01-03,20210103.0,20210103.0
7,8,3,8,20,1,0.99,2021-01-03,20210103.0,20210103.0
8,9,3,8,24,1,0.99,2021-01-03,20210103.0,20210103.0
9,10,3,8,28,1,0.99,2021-01-03,20210103.0,20210103.0


In [34]:
# SQL query to calculate total sales by customer
query = """
SELECT 
    c.customer_id,
    CONCAT(c.first_name, ' ', c.last_name) AS customer_name,
    SUM(o.quantity * o.price) AS total_sales
FROM fact_order AS o
JOIN dim_customer AS c ON o.customer_id = c.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name
ORDER BY total_sales DESC;
"""

# Run the query using get_sql_dataframe
df_total_sales_by_customer = get_sql_dataframe(query, **mysqldw_args)
df_total_sales_by_customer


Unnamed: 0,customer_id,customer_name,total_sales
0,6,Helena Holý,49.62
1,26,Richard Cunningham,47.62
2,57,Luis Rojas,46.62
3,46,Hugh O'Reilly,45.62
4,45,Ladislav Kovács,45.62
5,37,Fynn Zimmermann,43.62
6,24,Frank Ralston,43.62
7,28,Julia Barnett,43.62
8,25,Victor Stevens,42.62
9,7,Astrid Gruber,42.62


In [None]:
# SQL query to count orders by track
query = """
SELECT 
    t.track_name,
    COUNT(o.order_id) AS order_count
FROM fact_order AS o
JOIN dim_track AS t ON o.track_id = t.track_id
GROUP BY t.track_name
ORDER BY order_count DESC;
"""

# Run the query using get_sql_dataframe
df_order_count_by_track = get_sql_dataframe(query, **mysqldw_args)
df_order_count_by_track


Unnamed: 0,track_name,order_count
0,Linha Do Equador,3
1,Good Golly Miss Molly,3
2,Balls to the Wall,2
3,Inject The Venom,2
4,Overdose,2
...,...,...
563,Be Aggressive,1
564,Get Out,1
565,Take This Bottle,1
566,Surprise! You're Dead!,1


In [None]:
# SQL query to calculate total quantity ordered by each customer
query = """
SELECT 
    c.customer_id,
    CONCAT(c.first_name, ' ', c.last_name) AS customer_name,
    SUM(o.quantity) AS total_quantity_ordered
FROM fact_order AS o
JOIN dim_customer AS c ON o.customer_id = c.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name
ORDER BY total_quantity_ordered DESC;
"""

# Run the query using get_sql_dataframe
df_total_quantity_by_customer = get_sql_dataframe(query, **mysqldw_args)
df_total_quantity_by_customer



Unnamed: 0,customer_id,customer_name,total_quantity_ordered
0,2,Leonie Köhler,38.0
1,4,Bjørn Hansen,38.0
2,8,Daan Peeters,38.0
3,14,Mark Philips,38.0
4,23,John Gordon,38.0
5,37,Fynn Zimmermann,38.0
6,38,Niklas Schröder,38.0
7,40,Dominique Lefebvre,38.0
8,42,Wyatt Girard,38.0
9,46,Hugh O'Reilly,38.0


In [None]:
# SQL query to calculate total revenue by track
query = """
SELECT 
    t.track_name,
    SUM(o.quantity * o.price) AS total_revenue
FROM fact_order AS o
JOIN dim_track AS t ON o.track_id = t.track_id
GROUP BY t.track_name
ORDER BY total_revenue DESC;
"""

# Run the query using get_sql_dataframe
df_total_revenue_by_track = get_sql_dataframe(query, **mysqldw_args)
df_total_revenue_by_track


Unnamed: 0,track_name,total_revenue
0,Linha Do Equador,2.97
1,Good Golly Miss Molly,2.97
2,Balls to the Wall,1.98
3,Inject The Venom,1.98
4,Overdose,1.98
...,...,...
563,Be Aggressive,0.99
564,Get Out,0.99
565,Take This Bottle,0.99
566,Surprise! You're Dead!,0.99


In [None]:
query = """
SELECT 
    d.calendar_year,
    d.month_name,
    SUM(o.quantity * o.price) AS monthly_revenue
FROM fact_order AS o
JOIN dim_date AS d ON o.date_key = d.date_key
GROUP BY d.calendar_year, d.month_name
ORDER BY d.calendar_year, d.month_name;
"""

# Execute the query and fetch results
df_monthly_sales_revenue = get_sql_dataframe(query, **mysqldw_args)
print(df_monthly_sales_revenue)


    calendar_year month_name  monthly_revenue
0            2021      April            37.62
1            2021     August            37.62
2            2021   December            37.62
3            2021   February            37.62
4            2021    January            35.64
5            2021       July            37.62
6            2021       June            37.62
7            2021      March            37.62
8            2021        May            37.62
9            2021   November            37.62
10           2021    October            37.62
11           2021  September            37.62
12           2022   February            44.63
13           2022    January            52.62


In [None]:
query = """
SELECT 
    t.track_name AS product_name,
    SUM(o.quantity) AS total_quantity,
    MIN(o.quantity) AS min_quantity,
    MAX(o.quantity) AS max_quantity,
    STD(o.quantity) AS std_quantity
FROM fact_order AS o
JOIN dim_track AS t ON o.track_id = t.track_id
GROUP BY t.track_name
ORDER BY total_quantity DESC;
"""


# Execute the query and fetch results
df_order_stats_by_product = get_sql_dataframe(query, **mysqldw_args)
print(df_order_stats_by_product)


               product_name  total_quantity  min_quantity  max_quantity  \
0          Linha Do Equador             3.0             1             1   
1     Good Golly Miss Molly             3.0             1             1   
2         Balls to the Wall             2.0             1             1   
3          Inject The Venom             2.0             1             1   
4                  Overdose             2.0             1             1   
..                      ...             ...           ...           ...   
563           Be Aggressive             1.0             1             1   
564                 Get Out             1.0             1             1   
565        Take This Bottle             1.0             1             1   
566  Surprise! You're Dead!             1.0             1             1   
567          Medo De Escuro             1.0             1             1   

     std_quantity  
0             0.0  
1             0.0  
2             0.0  
3             0.0  