# MySQL Companion to MongoDB II: Aggregation Pipelines

# Install MySQL and Import Libraries

In [None]:
# refresh apt package lists (output suppressed)
!apt-get update > /dev/null 2>&1

# install the MySQL server package (output suppressed)
!apt-get install -y mysql-server > /dev/null 2>&1

# point the mysql service account's home at its data directory
# (presumably to silence the "cannot change directory" warning on service start -- confirm)
!usermod -d /var/lib/mysql mysql

# start the database server
!service mysql start

# confirm the server is running
!service mysql status

# SQL magic and MySQL driver
# %pip (rather than !pip) installs into the environment the running kernel uses
%pip install ipython-sql
%pip install mysql-connector-python
%load_ext sql
# NOTE(review): '_DEPRECATED_DEFAULT' presumably keeps ipython-sql working with newer prettytable releases
%config SqlMagic.style = '_DEPRECATED_DEFAULT'
# return %sql / %%sql results as pandas DataFrames
%config SqlMagic.autopandas = True

# csv handling
import pandas as pd

# for connecting to database
import mysql.connector

# for creating database engine connections
from sqlalchemy import create_engine

# for executing raw SQL queries
from sqlalchemy.sql import text

# numerical operations and data types
import numpy as np

# tracking execution time
import time

# Connect to MySQL

In [2]:
# Create a MySQL client option file (~/.my.cnf) holding root's credentials, so the
# !mysql / !mysqldump commands later in the notebook can connect without -u/-p flags.
# NOTE(review): the password 'pw' is hard-coded here and in the connection URLs below;
# only acceptable for a throwaway local server.
!rm -f ~/.my.cnf /root/.my.cnf                              # remove any existing option file first
!echo -e "[client]\nuser=root\npassword=pw" > ~/.my.cnf     # write the [client] section into ~/.my.cnf
!chmod 600 ~/.my.cnf                                        # owner read/write only, no group/other access

In [3]:
# Switch MySQL root to password authentication with password 'pw', matching ~/.my.cnf
# and the SQLAlchemy connection URLs used later in this notebook.
# NOTE(review): mysql_native_password is deprecated from MySQL 8.0.34 and disabled by
# default in 8.4 -- confirm the installed server version still accepts it.
!sudo mysql -e "ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'pw'; FLUSH PRIVILEGES;"

In [4]:
# Start from a clean database. IF EXISTS keeps this cell from erroring
# (ERROR 1008) on a fresh server where clickstream has not been created yet.
!mysql -e 'DROP DATABASE IF EXISTS clickstream;'


In [5]:
# Create the (empty) clickstream database; IF NOT EXISTS makes this safe to re-run.
!mysql --host=localhost -e "CREATE DATABASE IF NOT EXISTS clickstream;"

In [6]:
# Point the %sql / %%sql magics at the clickstream database through mysql-connector.
%sql mysql+mysqlconnector://root:pw@localhost/clickstream

# Import Data

In [None]:
# connection details
# NOTE(review): credentials are hard-coded to match the throwaway local server set up above
user = 'root'
password = 'pw'
host = 'localhost'
database = 'clickstream'
table_name = 'clicks'

# file path
csv_file = 'clicks_flattened.csv'

# connection
engine = create_engine(f'mysql+mysqlconnector://{user}:{password}@{host}/{database}')

# read CSV in chunks, infer schema from a sample
chunk_size = 100000
sample_size = 1000

sample_df = pd.read_csv(csv_file, nrows=sample_size)
# NOTE(review): dtypes inferred from the first `sample_size` rows are enforced on every
# chunk below; a column that is all-integer in the sample but has a missing or
# non-numeric value later will make read_csv raise.
dtypes = sample_df.dtypes.to_dict()

# map pandas dtypes to MySQL dtypes
# (read_csv is not asked to parse dates, so date strings such as VisitDateTime are
# inferred as 'object' and stored as TEXT; MySQL converts them implicitly in date functions)
mysql_dtypes = {
    'int64': 'BIGINT',
    'float64': 'DOUBLE',
    'object': 'TEXT',
    'bool': 'BOOLEAN',
    'datetime64': 'DATETIME'
}

# create table schema; anything not in the map falls back to TEXT
columns = [f"`{col}` {mysql_dtypes.get(str(dtype).split('[')[0], 'TEXT')}" for col, dtype in dtypes.items()]
create_table_query = f"CREATE TABLE IF NOT EXISTS `{table_name}` ({', '.join(columns)})"

# execute table creation; engine.begin() commits on exit
# (a plain engine.connect() block does not autocommit under SQLAlchemy 2.x)
with engine.begin() as conn:
    conn.execute(text(create_table_query))

record_count = 0
start_time = time.time()

# process CSV in chunks
for chunk in pd.read_csv(csv_file, chunksize=chunk_size, dtype=dtypes):
    chunk.to_sql(table_name, engine, if_exists='append', index=False)
    # count rows actually written -- the final chunk is usually smaller than chunk_size
    record_count += len(chunk)
    print(f"Records processed: {record_count}")

elapsed_time = time.time() - start_time
print(f"Completed: Imported data to database in {elapsed_time:.0f} seconds")

In [None]:
# Records processed: 100000
# Records processed: 200000
# ...
# Records processed: 6000000
# Records processed: 6100000
# Completed: Imported data to database in 418 seconds

# Create Aggregated <code>users_weekly</code> Table

In [None]:
# drop table if re-constructing
# !mysql -e 'use clickstream; drop table users_weekly'

In [None]:
%%sql

-- Build one row per (user, week, country) summarising the user activity in that week.
-- NOTE(review) this CREATE TABLE is not idempotent - it fails if users_weekly
-- already exists, so drop the table first (commented-out cell above) before re-running.
CREATE TABLE users_weekly (
    userID VARCHAR(255),      -- clicks.user_UserID
    weeknum INT,              -- WEEK mode 3 of VisitDateTime shifted back one day, see NOTE in the INSERT
    numDays INT,              -- distinct calendar dates of VisitDateTime in this user-week
    City VARCHAR(100),        -- one arbitrary user_City within the group (ANY_VALUE)
    Country VARCHAR(100),     -- user_Country, with NULL stored as the string Null
    pageloads_mobile BIGINT,  -- mobile pageload events
    pageloads_desktop BIGINT, -- desktop pageload events
    pageloads_bot BIGINT,     -- bot pageload events
    clicks_mobile BIGINT,     -- mobile click events
    clicks_desktop BIGINT,    -- desktop click events
    clicks_bot BIGINT,        -- bot click events
    PRIMARY KEY (userID, weeknum, Country)  -- same keys as the GROUP BY of the INSERT below
);

-- Rows with no user or no device type are excluded. Each counter matches one exact
-- (Activity, device_type) pair, so rows with any other values are counted nowhere.
INSERT INTO users_weekly
SELECT
    user_UserID AS userID,
    -- NOTE(review) WEEK mode 3 starts weeks on Monday, but subtracting one day first
    -- moves Monday visits into the previous week, so the effective week runs
    -- Tuesday to Monday. Confirm this is the intended boundary. WEEK also carries
    -- no year, so the same week number from different years is merged into one group.
    WEEK(DATE_SUB(VisitDateTime, INTERVAL 1 DAY), 3) AS weeknum,
    COUNT(DISTINCT DATE_FORMAT(VisitDateTime, '%Y-%m-%d')) AS numDays,
    -- ANY_VALUE picks an arbitrary city if a user-week has several
    ANY_VALUE(user_City) AS City,
    -- map NULL to the string Null, since primary key columns cannot hold NULL
    COALESCE(user_Country, 'Null') AS Country,
    SUM(CASE WHEN Activity = 'pageload' AND device_type = 'mobile' THEN 1 ELSE 0 END) AS pageloads_mobile,
    SUM(CASE WHEN Activity = 'pageload' AND device_type = 'desktop' THEN 1 ELSE 0 END) AS pageloads_desktop,
    SUM(CASE WHEN Activity = 'pageload' AND device_type = 'bot' THEN 1 ELSE 0 END) AS pageloads_bot,
    SUM(CASE WHEN Activity = 'click' AND device_type = 'mobile' THEN 1 ELSE 0 END) AS clicks_mobile,
    SUM(CASE WHEN Activity = 'click' AND device_type = 'desktop' THEN 1 ELSE 0 END) AS clicks_desktop,
    SUM(CASE WHEN Activity = 'click' AND device_type = 'bot' THEN 1 ELSE 0 END) AS clicks_bot
FROM clicks
WHERE user_UserID IS NOT NULL AND device_type IS NOT NULL
-- grouping on raw user_Country puts all NULL countries of a user-week into one group
GROUP BY user_UserID, user_Country, WEEK(DATE_SUB(VisitDateTime, INTERVAL 1 DAY), 3);

 * mysql+mysqlconnector://root:***@localhost/clickstream
0 rows affected.
54222 rows affected.


# Inspect Top-Line Results

In [None]:
%%sql

-- Grand total of each activity/device counter across every user-week row.
SELECT
    SUM(uw.pageloads_mobile)  AS total_pageloads_mobile,
    SUM(uw.pageloads_desktop) AS total_pageloads_desktop,
    SUM(uw.pageloads_bot)     AS total_pageloads_bot,
    SUM(uw.clicks_mobile)     AS total_clicks_mobile,
    SUM(uw.clicks_desktop)    AS total_clicks_desktop,
    SUM(uw.clicks_bot)        AS total_clicks_bot
FROM users_weekly AS uw;

 * mysql+mysqlconnector://root:***@localhost/clickstream
1 rows affected.


Unnamed: 0,total_pageloads_mobile,total_pageloads_desktop,total_pageloads_bot,total_clicks_mobile,total_clicks_desktop,total_clicks_bot
0,46649,132217,62,39884,383409,72


# Activity Counts by Country

In [None]:
%%sql

-- Total events (all pageloads plus all clicks, every device type) per country, busiest first.
SELECT
    uw.Country,
    SUM(uw.pageloads_mobile)
      + SUM(uw.pageloads_desktop)
      + SUM(uw.pageloads_bot)
      + SUM(uw.clicks_mobile)
      + SUM(uw.clicks_desktop)
      + SUM(uw.clicks_bot) AS total_count
FROM users_weekly AS uw
GROUP BY uw.Country
ORDER BY total_count DESC;

 * mysql+mysqlconnector://root:***@localhost/clickstream
158 rows affected.


Unnamed: 0,Country,total_count
0,India,452969
1,United States,29541
2,Null,16416
3,United Kingdom,6182
4,Singapore,4436
...,...,...
153,Mali,2
154,Togo,1
155,Honduras,1
156,El Salvador,1


# Export Checkpoint to File

In [None]:
# Checkpoint the raw clicks table to clicks.sql
# (-e is --extended-insert, i.e. multi-row INSERT statements, which mysqldump enables by default).
!mysqldump -e clickstream clicks > clicks.sql

In [None]:
# Checkpoint the aggregated users_weekly table to users_weekly.sql
# (-e is --extended-insert, i.e. multi-row INSERT statements, which mysqldump enables by default).
!mysqldump -e clickstream users_weekly > users_weekly.sql

# Restore Checkpoint From File

In [10]:
# Restore users_weekly from its checkpoint dump into the existing clickstream database;
# only this table is queried by the plotting cell below.
!mysql --host=localhost clickstream < users_weekly.sql

# Plot Statistics by Country and User (Top 5 Countries)

In [18]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sqlalchemy import bindparam

# --- configuration ------------------------------------------------------------
# number of countries to plot, ranked by total pageloads + clicks
TOP_N = 5
# scatter colours, assigned to countries in rank order (cycled if TOP_N exceeds the palette)
COUNTRY_COLORS = ["#FF6347", "#4682B4", "#32CD32", "#FFD700", "#9932CC"]
# fixed scatter-panel axis ranges; users beyond these fall outside the visible area
# NOTE(review): confirm these still cover the data if the dataset changes
SCATTER_X_RANGE = [0, 750]
SCATTER_Y_RANGE = [0, 5000]
# unknown countries are stored as NULL or as the string 'Null'; both are excluded
KNOWN_COUNTRY_SQL = "Country IS NOT NULL AND Country != 'Null'"

# connect to database
engine = create_engine('mysql+mysqlconnector://root:pw@localhost/clickstream')


def _read_for_countries(sql, countries):
    """Run ``sql`` with ``:countries`` bound as an expanding IN-list.

    Binding the names (instead of splicing quoted strings into the SQL text)
    keeps country names that contain quotes, e.g. "Cote d'Ivoire", from
    breaking the query.

    Args:
        sql: query text containing ``IN :countries``.
        countries: iterable of country names to bind.

    Returns:
        pandas.DataFrame holding the query result.
    """
    stmt = text(sql).bindparams(bindparam("countries", expanding=True))
    return pd.read_sql(stmt, engine, params={"countries": list(countries)})


# function to fetch top N countries by total pageloads and clicks
def fetch_top_countries(n):
    """Return up to ``n`` known countries ranked by total pageloads + clicks.

    Args:
        n: maximum number of countries to return (coerced to int).

    Returns:
        list[str]: country names, busiest first.
    """
    # ORDER BY is on the outermost query so the returned order is guaranteed;
    # int() keeps the interpolated LIMIT a plain integer literal
    query = text(f"""
        SELECT Country,
               SUM(pageloads_mobile + pageloads_desktop + pageloads_bot
                   + clicks_mobile + clicks_desktop + clicks_bot) AS total_count
        FROM users_weekly
        WHERE {KNOWN_COUNTRY_SQL}
        GROUP BY Country
        ORDER BY total_count DESC
        LIMIT {int(n)}
    """)
    return pd.read_sql(query, engine)['Country'].tolist()


# function to fetch bar plot data
def fetch_bar_data(top_countries):
    """Return total pageloads and total clicks for each country in ``top_countries``.

    Args:
        top_countries: country names, in the order the bars should appear.

    Returns:
        tuple: ``(countries, pageloads_counts, clicks_counts)``. The two count
        lists are aligned with ``countries``; a country with no rows gets 0.
    """
    totals = _read_for_countries(f"""
        SELECT Country,
               SUM(pageloads_mobile + pageloads_desktop + pageloads_bot) AS pageloads_count,
               SUM(clicks_mobile + clicks_desktop + clicks_bot) AS clicks_count
        FROM users_weekly
        WHERE {KNOWN_COUNTRY_SQL} AND Country IN :countries
        GROUP BY Country
    """, top_countries)
    # reorder to match top_countries and fill any country missing from the result with 0
    aligned = totals.set_index('Country').reindex(top_countries, fill_value=0)
    return top_countries, aligned['pageloads_count'].tolist(), aligned['clicks_count'].tolist()


# function to fetch scatter plot data
def fetch_scatter_data(top_countries):
    """Return per-user total pageloads and clicks for users in ``top_countries``.

    Args:
        top_countries: country names to include.

    Returns:
        pandas.DataFrame with columns userID, Country, total_pageloads, total_clicks
        (one row per user and country, summed over all weeks).
    """
    return _read_for_countries(f"""
        SELECT userID, Country,
               SUM(pageloads_mobile + pageloads_desktop + pageloads_bot) AS total_pageloads,
               SUM(clicks_mobile + clicks_desktop + clicks_bot) AS total_clicks
        FROM users_weekly
        WHERE {KNOWN_COUNTRY_SQL} AND Country IN :countries
        GROUP BY userID, Country
    """, top_countries)


# --- fetch data -----------------------------------------------------------------
top_countries = fetch_top_countries(TOP_N)
bar_data = fetch_bar_data(top_countries)
scatter_data = fetch_scatter_data(top_countries)

# per-country lists of user totals for the scatter traces
countries_data = {}
for country in top_countries:
    rows = scatter_data[scatter_data['Country'] == country]
    countries_data[country] = {
        "pageloads": rows['total_pageloads'].tolist(),
        "clicks": rows['total_clicks'].tolist(),
        "userIDs": rows['userID'].tolist(),
    }

# one colour per plotted country; works for any number of countries
color_map = {
    country: COUNTRY_COLORS[i % len(COUNTRY_COLORS)]
    for i, country in enumerate(top_countries)
}

# --- build figure ---------------------------------------------------------------
# create subplot: 2 rows, 1 column
fig = make_subplots(
    rows=2, cols=1,
    subplot_titles=(f'Country Distribution (Top {TOP_N})', 'Pageloads vs Clicks by User'),
    vertical_spacing=0.15
)

# grouped bars (pageloads and clicks per country), listed in the first legend
for bar_name, bar_counts, bar_color in (
    ('Pageloads', bar_data[1], '#40E0D0'),
    ('Clicks', bar_data[2], '#C71585'),
):
    fig.add_trace(
        go.Bar(
            name=bar_name,
            x=bar_data[0],
            y=bar_counts,
            marker_color=bar_color,
            showlegend=True,
            legendgroup='bar',
            legend='legend'
        ),
        row=1, col=1
    )

# one scatter trace per country that has at least one user, listed in the second legend
for country in top_countries:
    if countries_data[country]["pageloads"]:
        fig.add_trace(
            go.Scatter(
                x=countries_data[country]["pageloads"],
                y=countries_data[country]["clicks"],
                mode='markers',
                name=country,
                marker=dict(size=10, color=color_map[country], opacity=0.5, line=dict(width=0.5, color='black')),
                text=countries_data[country]["userIDs"],
                hovertemplate="User: %{text}<br>Pageloads: %{x}<br>Clicks: %{y}<extra></extra>",
                showlegend=True,
                legendgroup='scatter',
                legend='legend2'
            ),
            row=2, col=1
        )

# update layout (separate legends per panel; multiple legends need plotly >= 5.15)
fig.update_layout(
    barmode='group',
    title_text='Users Weekly: Country Distribution and Pageloads vs Clicks',
    template='plotly_dark',
    showlegend=True,
    legend=dict(x=1.05, y=1.0, xanchor='left', yanchor='top', title='Bar Chart'),
    legend2=dict(x=1.05, y=0.2, xanchor='left', yanchor='bottom', title='Scatter Plot'),
    height=800
)

# update axes
fig.update_xaxes(title_text='Country', row=1, col=1)
fig.update_xaxes(title_text='Total Pageloads', range=SCATTER_X_RANGE, row=2, col=1)
fig.update_yaxes(title_text='Count', row=1, col=1)
fig.update_yaxes(title_text='Total Clicks', range=SCATTER_Y_RANGE, row=2, col=1)

# save and show
fig.write_html(f'top_{TOP_N}_country_pageloads_and_clicks.html')
fig.show()