<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" />

# Data analytics with Spark and Grafana

<a id='0'></a>
## Objective
**Customer Understanding & Segmentation**  
- Profile customers by age, gender, and spending patterns  
- Track average purchase values and preferred payment methods  
- Identify high-value customers and their shopping frequency

**Product Category Performance**  
- Measure revenue and profitability by category  
- Monitor pricing effectiveness and return rates, and identify the best-performing categories  
- Analyze seasonal performance trends  

**Sales and Revenue Optimization**  
- Track total revenue and analyze transaction values  
- Monitor payment method distribution and identify peak sales periods  
- Calculate key metrics like average order value  

These objectives drive actionable decisions for inventory management, marketing, and business growth.

**Environment**  
Configure the environment: remove the pandas limit on displayed column width, make PySpark fetch the MariaDB JDBC connector as a Maven package, and point it at the Python interpreter of the Jupyter virtual environment. Then start the Spark session.

In [1]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.mariadb.jdbc:mariadb-java-client:3.3.0" pyspark-shell'
os.environ['PYSPARK_PYTHON'] = '/home/osbdet/.jupyter_venv/bin/python'
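The `--packages` value above holds a single Maven coordinate (`groupId:artifactId:version`). As a minimal sketch, the string can be built from a list, which keeps the comma-separated syntax right if more connectors are added later. `build_submit_args` is a hypothetical helper, not part of PySpark. PySpark reads this variable when it launches the JVM, so it has to be set before the first `SparkSession` is created.

```python
import os

def build_submit_args(packages):
    """Build a PYSPARK_SUBMIT_ARGS value asking spark-submit to resolve the
    given Maven coordinates (groupId:artifactId:version) through Ivy."""
    if not packages:
        # The argument string is expected to end with "pyspark-shell"
        return "pyspark-shell"
    # --packages takes a single comma-separated list of coordinates
    return f'--packages "{",".join(packages)}" pyspark-shell'

# Must run before the first SparkSession starts the JVM
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(
    ["org.mariadb.jdbc:mariadb-java-client:3.3.0"]
)
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```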

In [2]:
import pandas as pd

pd.set_option('display.max_colwidth', None)

In [3]:
from pyspark.sql.session import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T

# Column-name resolution is made case sensitive. The raw schema of this
# dataset has no names that differ only by letter case (such as 'e' and 'E'),
# so here the setting acts as a safeguard rather than a strict requirement.

spark_session = \
  SparkSession.builder\
              .appName("ecommerce_analytics")\
              .config("spark.sql.caseSensitive", "true")\
              .getOrCreate()

print(f"This cluster relies on Spark '{spark_session.version}'")

This cluster relies on Spark '3.5.0'
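Whether `spark.sql.caseSensitive` is needed depends on whether any column names differ only by letter case. A minimal sketch of that check, using a hypothetical helper `case_insensitive_collisions`; the raw column names are copied from the schema printed further below. In PySpark, passing `synthetic_raw_df.columns` would run the same check on the real DataFrame.

```python
from collections import defaultdict

def case_insensitive_collisions(columns):
    """Return groups of column names that differ only by letter case; with
    spark.sql.caseSensitive=false (the default) each group would resolve to
    the same name."""
    groups = defaultdict(list)
    for name in columns:
        groups[name.lower()].append(name)
    return [names for names in groups.values() if len(names) > 1]

# Raw column names, copied from the schema printed further below
raw_columns = ["Customer Age", "Customer ID", "Gender", "Payment Method",
               "Product Category", "Product Price", "Quantity", "Returns",
               "Timestamp", "Total Purchase Amount"]
print(case_insensitive_collisions(raw_columns))        # []
print(case_insensitive_collisions(["e", "E", "s"]))    # [['e', 'E']]

# With PySpark: case_insensitive_collisions(synthetic_raw_df.columns)
```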


<a id='2.1'></a>
### DataFrame creation

We need to tell the Spark session **how to access MinIO**, our *S3-compatible object store*:

In [4]:
spark_session.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "s3access")
spark_session.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "_s3access123$")
spark_session.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark_session.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark_session.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://localhost:9000")
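The cell above reaches the Hadoop configuration through `_jsc`, a private attribute of the Spark context. As an alternative sketch, the same keys can be passed to `SparkSession.builder.config()` with a `spark.hadoop.` prefix, which Spark copies into the Hadoop configuration when the session is created. `s3a_builder_configs` is a hypothetical helper; these properties only take effect if applied before `getOrCreate()` builds the session, so they would replace the calls above rather than follow them.

```python
def s3a_builder_configs(endpoint, access_key, secret_key):
    """Return the S3A settings above as Spark properties. Keys prefixed with
    "spark.hadoop." are copied into the Hadoop configuration at start-up."""
    hadoop_conf = {
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        "fs.s3a.path.style.access": "true",
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "fs.s3a.endpoint": endpoint,
    }
    return {f"spark.hadoop.{key}": value for key, value in hadoop_conf.items()}

confs = s3a_builder_configs("http://localhost:9000", "s3access", "_s3access123$")

# Usage with PySpark, before the session exists (not executed here):
# builder = SparkSession.builder.appName("ecommerce_analytics")
# for key, value in confs.items():
#     builder = builder.config(key, value)
# spark_session = builder.getOrCreate()
```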

We are now ready to **create the DataFrame on top of the data in MinIO**, which will allow us to *implement our use case*:

In [5]:
synthetic_raw_df = spark_session.read\
                              .json("s3a://raw-synthetic/synthetic_data_*.json")

# Let's double-check the inferred schema to understand what's in front of us.
synthetic_raw_df.printSchema()


root
 |-- Customer Age: double (nullable = true)
 |-- Customer ID: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Product Price: double (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Returns: long (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- Total Purchase Amount: double (nullable = true)
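By default `spark.read.json()` expects JSON Lines: one complete JSON object per line (a file holding a single multi-line document needs `.option("multiLine", "true")`). Spark creates one column per key found across all records and stores both a missing key and an explicit `null` as NULL. A minimal stdlib sketch of that behavior on two hypothetical records, handy for inspecting a sample file locally; `read_json_lines` and `null_counts` are hypothetical helpers.

```python
import json

def read_json_lines(text):
    """Parse JSON Lines text: one JSON object per non-blank line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

def null_counts(records):
    """For every key seen in any record, count the records that lack it or
    hold null; Spark stores both cases as NULL in the inferred column."""
    keys = sorted({key for record in records for key in record})
    return {key: sum(1 for r in records if r.get(key) is None) for key in keys}

# Two hypothetical records shaped like the raw schema above
sample = (
    '{"Customer ID": 29627, "Gender": "Female", "Returns": 1}\n'
    '{"Customer ID": 7307, "Gender": "Female", "Returns": null}\n'
)
print(null_counts(read_json_lines(sample)))
# {'Customer ID': 0, 'Gender': 0, 'Returns': 1}
```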



In [6]:
print("Source data count:", synthetic_raw_df.count())
print("\nSource data sample:")
synthetic_raw_df.show(2)

Source data count: 126

Source data sample:
+-----------------+-----------+------+--------------+----------------+-------------+--------+-------+--------------------+---------------------+
|     Customer Age|Customer ID|Gender|Payment Method|Product Category|Product Price|Quantity|Returns|           Timestamp|Total Purchase Amount|
+-----------------+-----------+------+--------------+----------------+-------------+--------+-------+--------------------+---------------------+
|40.75771835727266|      29627|Female|   Credit Card|            Home|        500.0|       4|      1|2025-01-10T19:55:...|   2603.8907954988026|
|54.07769909125408|       7307|Female|        PayPal|     Electronics|         10.0|       5|      1|2025-01-10T19:56:...|   1657.7618313707655|
+-----------------+-----------+------+--------------+----------------+-------------+--------+-------+--------------------+---------------------+
only showing top 2 rows



<a id='2.2'></a>
### Initial shaping of the DataFrame to implement our use case

In [7]:
# Transform the DataFrame with proper names and types
# (cast first, then alias, so each output column gets exactly the given name)
ecommerce_df = synthetic_raw_df.select(
    F.col("Customer ID").cast("long").alias("customer_id"),
    F.col("Customer Age").cast("double").alias("customer_age"),
    F.col("Gender").cast("string").alias("Gender"),
    F.col("Payment Method").cast("string").alias("payment_method"),
    F.col("Product Category").cast("string").alias("product_category"),
    F.col("Product Price").cast("double").alias("product_price"),
    F.col("Quantity").cast("long").alias("Quantity"),
    F.col("Returns").cast("long").alias("Returns"),
    F.col("Total Purchase Amount").cast("double").alias("total_purchase_amount"),
    F.col("Timestamp").cast("timestamp").alias("Timestamp")  # Changed from string to timestamp
)
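The select above renames columns one at a time and keeps `Gender`, `Quantity`, `Returns` and `Timestamp` capitalized. A minimal sketch of deriving snake_case names programmatically; `to_snake_case` is a hypothetical helper, and applying it to every column would also lowercase names that later cells rely on (for example `F.sum("Quantity")`), so those references would need updating too.

```python
import re

def to_snake_case(name):
    """Turn a raw column name such as 'Total Purchase Amount' into
    'total_purchase_amount'."""
    # Replace every run of non-alphanumeric characters with one underscore
    snake = re.sub(r"[^0-9A-Za-z]+", "_", name.strip())
    return snake.strip("_").lower()

raw_columns = ["Customer ID", "Customer Age", "Payment Method",
               "Total Purchase Amount", "Gender"]
print([to_snake_case(c) for c in raw_columns])
# ['customer_id', 'customer_age', 'payment_method', 'total_purchase_amount', 'gender']

# Usage with PySpark (not executed here):
# renamed_df = synthetic_raw_df.select(
#     [F.col(c).alias(to_snake_case(c)) for c in synthetic_raw_df.columns])
```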

In [8]:
# Review the intermediate schema you're building.
ecommerce_df.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- customer_age: double (nullable = true)
 |-- Gender: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Returns: long (nullable = true)
 |-- total_purchase_amount: double (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



<a id='2.3'></a>
### Bucketing sales data into 1-hour intervals

In [9]:
# Create buckets
ecommerce_bucketed_df = ecommerce_df.withColumn(
    "time_bucket",
    F.date_trunc("hour", F.col("Timestamp"))  # Truncate each Timestamp to the start of its hour
).drop("Timestamp")  
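`F.date_trunc("hour", ...)` keeps the date and hour of each timestamp and zeroes the minutes, seconds and fractions, so every event between 19:00:00 and 19:59:59 lands in the 19:00 bucket. A plain-Python sketch of the same truncation on one hypothetical event time (`hour_bucket` is a hypothetical helper). If a bucket end is also needed, Spark's `F.window(F.col("Timestamp"), "1 hour")` produces a struct with `start` and `end` fields instead of a single timestamp.

```python
from datetime import datetime

def hour_bucket(ts):
    """Truncate a timestamp to the start of its hour, as
    F.date_trunc("hour", ...) does for each row."""
    return ts.replace(minute=0, second=0, microsecond=0)

# Hypothetical event time; the seconds are made up for illustration
event = datetime.fromisoformat("2025-01-10T19:55:30")
print(hour_bucket(event))  # 2025-01-10 19:00:00
```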

In [10]:
# Check the schema structure
print("DataFrame Schema:")
ecommerce_bucketed_df.printSchema()

# Check the number of rows
print("\nTotal number of records:", ecommerce_bucketed_df.count())

# Look at sample data (first 5 rows)
print("\nSample of data:")
ecommerce_bucketed_df.show(5)

# Get basic statistics
print("\nBasic statistics:")
ecommerce_bucketed_df.describe().show()

DataFrame Schema:
root
 |-- customer_id: long (nullable = true)
 |-- customer_age: double (nullable = true)
 |-- Gender: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Returns: long (nullable = true)
 |-- total_purchase_amount: double (nullable = true)
 |-- time_bucket: timestamp (nullable = true)


Total number of records: 126

Sample of data:
+-----------+------------------+------+--------------+----------------+------------------+--------+-------+---------------------+-------------------+
|customer_id|      customer_age|Gender|payment_method|product_category|     product_price|Quantity|Returns|total_purchase_amount|        time_bucket|
+-----------+------------------+------+--------------+----------------+------------------+--------+-------+---------------------+-------------------+
|      29627| 40.75771835727266|Female|   C


+-------+------------------+------------------+------+--------------+----------------+------------------+------------------+------------------+---------------------+
|summary|       customer_id|      customer_age|Gender|payment_method|product_category|     product_price|          Quantity|           Returns|total_purchase_amount|
+-------+------------------+------------------+------+--------------+----------------+------------------+------------------+------------------+---------------------+
|  count|               126|               126|   126|           126|             126|               126|               126|               106|                  126|
|   mean| 28103.85714285714|46.534653531742094|  NULL|          NULL|            NULL|   273.11072151371|3.1507936507936507|0.4716981132075472|   2522.2533598833675|
| stddev|13968.485324738273|13.765980154253576|  NULL|          NULL|            NULL|138.72815049268405|1.4145951240829882|0.5015698625755192|    1365.817033104756|
|   
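In the `describe()` output above, `Returns` has a count of 106 while the other columns have 126, so 20 rows hold a null value; its mean of about 0.47 is computed over the 106 non-null rows only. A minimal sketch of the same null-aware summary in plain Python, on hypothetical values (`null_aware_summary` is a hypothetical helper). Spark's `describe()` reports the sample standard deviation, which is what `statistics.stdev` computes.

```python
import statistics

def null_aware_summary(values):
    """Summarize a numeric column like DataFrame.describe(): None (null)
    values are left out of count, mean, stddev, min and max."""
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "nulls": len(values) - len(present),
        "mean": statistics.mean(present) if present else None,
        # Sample standard deviation, as reported by describe()
        "stddev": statistics.stdev(present) if len(present) > 1 else None,
        "min": min(present) if present else None,
        "max": max(present) if present else None,
    }

# Hypothetical Returns values with one missing entry
print(null_aware_summary([1, 0, None, 1, 0]))
```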

### Create new DataFrames with aggregated category and customer metrics

In [11]:

# New DataFrame with per-category metrics for each hourly bucket
category_metrics_df = ecommerce_bucketed_df.groupBy(
    "product_category", 
    "time_bucket"
).agg(
    F.countDistinct("customer_id").alias("unique_customers"),
    F.sum("total_purchase_amount").alias("total_revenue"),
    F.avg("product_price").alias("avg_price"),
    F.sum("Quantity").alias("total_quantity"),
    F.sum("Returns").alias("total_returns"),
)

print("\nCATEGORY METRICS:")
print("Number of records:", category_metrics_df.count())
print("\nSchema:")
category_metrics_df.printSchema()
print("\nSample data:")
category_metrics_df.show(5)

# New DataFrame with one profile per customer
client_profiles_df = ecommerce_bucketed_df.groupBy(
    "customer_id",
    "Gender",
    "customer_age"
).agg(
    F.avg("product_price").alias("avg_purchase_price"),
    F.avg("total_purchase_amount").alias("avg_transaction_value"),
    F.count("*").alias("total_transactions"),
    # Most frequent payment method per customer (F.mode needs Spark 3.4+)
    F.mode("payment_method").alias("preferred_payment"),
    F.sum("Returns").alias("total_returns")
)

print("\nCLIENT PROFILES:")
print("Number of records:", client_profiles_df.count())
print("\nSchema:")
client_profiles_df.printSchema()
print("\nSample data:")
client_profiles_df.show(5)

# Basic statistics for each DataFrame
print("\nBASIC STATISTICS:")
print("Category Metrics Stats:")
category_metrics_df.describe().show()

print("\nClient Profiles Stats:")
client_profiles_df.describe().show()

# Example filters and sorts
print("\nTop Categories by Revenue:")
category_metrics_df.orderBy(F.col("total_revenue").desc()).show(5)

print("\nTop Customers by Transaction Value:")
client_profiles_df.orderBy(F.col("avg_transaction_value").desc()).show(5)


CATEGORY METRICS:
Number of records: 33

Schema:
root
 |-- product_category: string (nullable = true)
 |-- time_bucket: timestamp (nullable = true)
 |-- unique_customers: long (nullable = false)
 |-- total_revenue: double (nullable = true)
 |-- avg_price: double (nullable = true)
 |-- total_quantity: long (nullable = true)
 |-- total_returns: long (nullable = true)


Sample data:
+----------------+-------------------+----------------+------------------+------------------+--------------+-------------+
|product_category|        time_bucket|unique_customers|     total_revenue|         avg_price|total_quantity|total_returns|
+----------------+-------------------+----------------+------------------+------------------+--------------+-------------+
|     Electronics|2025-01-10 21:00:00|               6|11270.763396404705| 341.7803635973901|            16|            1|
|            Home|2025-01-09 20:00:00|               6|12262.784723079485| 274.5063781064674|            19|            2|
|

**Before we can store results in a relational database, we need to create the destination *database* and *tables*.** The `final-assignment-sql.sql` file contains the SQL code to do so:

```
-- Final Assignment SQL Script
-- Created: 2025-01-09

-- Drop the database if it exists
DROP DATABASE IF EXISTS final_assignment;

-- Create the database
CREATE DATABASE final_assignment;

-- Switch to the newly created database
USE final_assignment;

-- Drop views if they exist
DROP VIEW IF EXISTS category_metrics;
DROP VIEW IF EXISTS client_profiles;

-- Drop the tables if they exist
DROP TABLE IF EXISTS ecommerce_events;
DROP TABLE IF EXISTS category_metrics;
DROP TABLE IF EXISTS client_profiles;

-- Create the e-commerce events table
CREATE TABLE ecommerce_events (
    id BIGINT AUTO_INCREMENT,
    customer_id BIGINT,
    customer_age DOUBLE,
    gender VARCHAR(50),
    payment_method VARCHAR(100),
    product_category VARCHAR(100),
    product_price DOUBLE,
    quantity BIGINT,
    returns BIGINT,
    event_timestamp TIMESTAMP,
    total_purchase_amount DOUBLE,
    time_bucket TIMESTAMP,
    PRIMARY KEY (id),
    INDEX idx_time_bucket (time_bucket),
    INDEX idx_customer (customer_id),
    INDEX idx_category (product_category)
);

-- Create the category metrics table
CREATE TABLE category_metrics (
    id BIGINT AUTO_INCREMENT,
    product_category VARCHAR(100),
    time_bucket TIMESTAMP,
    unique_customers BIGINT,
    total_revenue DOUBLE,
    avg_price DOUBLE,
    total_quantity BIGINT,
    total_returns BIGINT,
    PRIMARY KEY (id),
    INDEX idx_category (product_category),
    INDEX idx_time (time_bucket)
);

-- Create the client profiles table
CREATE TABLE client_profiles (
    id BIGINT AUTO_INCREMENT,
    customer_id BIGINT,
    gender VARCHAR(50),
    customer_age DOUBLE,
    avg_purchase_price DOUBLE,
    avg_transaction_value DOUBLE,
    total_transactions BIGINT,
    preferred_payment VARCHAR(100),
    total_returns BIGINT,
    PRIMARY KEY (id),
    INDEX idx_customer (customer_id)
);
```
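The `ecommerce_events` table declares an `event_timestamp` column, but the bucketing step drops `Timestamp`, so the DataFrame written later has no source for it. A minimal sketch of a case-insensitive comparison between DataFrame and table columns makes such gaps visible (`compare_columns` is a hypothetical helper; MariaDB treats column names as case-insensitive, so `Gender` matches `gender`). `id` is expected to be missing because `AUTO_INCREMENT` fills it, while `event_timestamp` would not be filled from the data.

```python
def compare_columns(df_columns, table_columns):
    """Compare DataFrame and table column names case-insensitively."""
    df_names = {c.lower() for c in df_columns}
    table_names = {c.lower() for c in table_columns}
    return {
        "missing_in_dataframe": sorted(table_names - df_names),
        "missing_in_table": sorted(df_names - table_names),
    }

# Columns of ecommerce_bucketed_df, as printed by printSchema() above
ecommerce_bucketed_columns = [
    "customer_id", "customer_age", "Gender", "payment_method",
    "product_category", "product_price", "Quantity", "Returns",
    "total_purchase_amount", "time_bucket",
]
# Columns of the ecommerce_events table in the SQL script
ecommerce_events_columns = [
    "id", "customer_id", "customer_age", "gender", "payment_method",
    "product_category", "product_price", "quantity", "returns",
    "event_timestamp", "total_purchase_amount", "time_bucket",
]
print(compare_columns(ecommerce_bucketed_columns, ecommerce_events_columns))
# {'missing_in_dataframe': ['event_timestamp', 'id'], 'missing_in_table': []}
```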

We'll use the `mariadb` command-line tool: open a *Terminal* window from Jupyter and run the SQL script like this (don't type the `$` prompt):

```
$ cd /home/osbdet/notebooks/mda2/FINALWORK
$ mariadb -u osbdet -p < final-assignment-sql.sql
```
**Note:** type `osbdet123$` **when** you're asked for the password; nothing is displayed while you type, which is a security feature.

<a id='5'></a>
## Storing the results of the Spark analysis in MariaDB

In [13]:
# Write to MariaDB
driver_name = "org.mariadb.jdbc.Driver"
connection_string = "jdbc:mariadb://localhost:3306/final_assignment"
table_name = "ecommerce_events"
user_name = "osbdet"
password = "osbdet123$"

(ecommerce_bucketed_df.write
    .format("jdbc")
    .mode("overwrite")
    .option("driver", driver_name)
    .option("url", connection_string)
    .option("dbtable", table_name)
    .option("user", user_name)
    .option("password", password)
    .option("batchsize", 1000)
    .option("truncate", "true")
    .option("numPartitions", 4)
    .save())

# Write category_metrics to MariaDB
(category_metrics_df.write
    .format("jdbc")
    .mode("overwrite")
    .option("driver", driver_name)
    .option("url", connection_string)
    .option("dbtable", "category_metrics")
    .option("user", user_name)
    .option("password", password)
    .option("batchsize", 1000)
    .option("truncate", "true")
    .save())

# Write client_profiles to MariaDB
(client_profiles_df.write
    .format("jdbc")
    .mode("overwrite")
    .option("driver", driver_name)
    .option("url", connection_string)
    .option("dbtable", "client_profiles")
    .option("user", user_name)
    .option("password", password)
    .option("batchsize", 1000)
    .option("truncate", "true")
    .save())

# Show the DataFrames that were written (in-memory data; this does not read back from MariaDB)
print("Category Metrics Data:")
category_metrics_df.show(5)

print("\nClient Profiles Data:")
client_profiles_df.show(5)

Category Metrics Data:
+----------------+-------------------+----------------+------------------+------------------+--------------+-------------+
|product_category|        time_bucket|unique_customers|     total_revenue|         avg_price|total_quantity|total_returns|
+----------------+-------------------+----------------+------------------+------------------+--------------+-------------+
|     Electronics|2025-01-10 21:00:00|               6|11270.763396404705| 341.7803635973901|            16|            1|
|            Home|2025-01-09 20:00:00|               6|12262.784723079485| 274.5063781064674|            19|            2|
|        Clothing|2025-01-10 21:00:00|               5|12769.918995999655|302.37018109248555|            15|            2|
|            Home|2025-01-10 20:00:00|               3| 4934.453705040595| 432.8838179830964|            12|            1|
|            Home|2025-01-11 00:00:00|               5|11631.103413336197|262.76213835473453|            15|        
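The three writes above repeat the same connection options. A minimal sketch that gathers them in one dictionary for `DataFrameWriter.options(**opts)`; `jdbc_options` is a hypothetical helper whose defaults copy the values used above. The same options, minus the write-only `batchsize` and `truncate`, can be passed to `spark_session.read.format("jdbc")` to read a table back from MariaDB, which checks the stored rows rather than the in-memory DataFrames.

```python
def jdbc_options(url, table, user, password,
                 driver="org.mariadb.jdbc.Driver", batchsize=1000):
    """Gather the JDBC options shared by the three writes above."""
    return {
        "driver": driver,
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        "batchsize": str(batchsize),
        "truncate": "true",
    }

opts = jdbc_options("jdbc:mariadb://localhost:3306/final_assignment",
                    "category_metrics", "osbdet", "osbdet123$")
# batchsize and truncate only matter when writing, so drop them for reads
read_opts = {k: v for k, v in opts.items() if k not in ("batchsize", "truncate")}

# Usage with PySpark (not executed here):
# category_metrics_df.write.format("jdbc").mode("overwrite").options(**opts).save()
# spark_session.read.format("jdbc").options(**read_opts).load().show(5)
```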

<a id='5.1'></a>
### Verify the contents of the *ecommerce events* table


```
$ mariadb -u osbdet -p -D final_assignment -e "SELECT * FROM ecommerce_events ORDER BY time_bucket DESC LIMIT 20;"
```
**Note:** type `osbdet123$` **when** you're asked for the password; nothing is displayed while you type, which is a security feature.