In [15]:
print("Hola Mundo")

Hola Mundo


In [16]:
# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
    .appName("ETL") \
    .master("local") \
    .getOrCreate()


In [17]:
!ls work/dataset/

product_dataset.csv


In [18]:
csv_path = "./work/dataset/product_dataset.csv"

In [19]:
df_resource = spark.read.csv(csv_path, header=True, inferSchema=True)
df_resource.printSchema()
df_resource.show(truncate=True)

root
 |-- transaction_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- category: string (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: string (nullable = true)

+--------------+----------+--------+--------+--------+------------+
|transaction_id|      date|category| product|quantity|       price|
+--------------+----------+--------+--------+--------+------------+
|             1|2024-07-01|  Widget|Widget-A|      10|        9.99|
|             2|2024-07-01|  Gadget|Gadget-X|       5|       19.99|
|             3|2024-07-02|  Widget|Widget-B|       7|        9.99|
|             4|2024-07-02|  Doodad|Doodad-1|    NULL|       4.99 |
|             5|2024-07-03|  Widget|Widget-C|       3|        9.99|
|             6|2024-07-03|  Gadget|Gadget-Y|       8|       19.99|
|             7|2024-07-04|  Widget|Widget-A|       2|        9.99|
|             8|2024-07-04|  Doodad|Doodad-2|       4|not_a_number|
|       

1.	Data Cleaning:

    •	Handle missing values:

        •	Replace missing quantity with 0.

        •	Replace missing or invalid price values (not_a_number) with the median price for the same product category.

        •	Drop rows where both quantity and price are invalid or missing.


In [20]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Replace missing quantity with 0.
df = df_resource.fillna({"quantity": 0})

# Replace missing or invalid price values (not_a_number) with the median price for the same product category.
df = df.withColumn("price", F.when(F.col("price") == "not_a_number", None).otherwise(F.col("price").cast("float")))

# Calculate Median by category
filtered_for_median = df.filter(F.col("price").isNotNull())
window_spec = Window.partitionBy("category")
median_price_df = filtered_for_median.withColumn("median_price", F.expr("percentile_approx(price, 0.5)").over(window_spec))

# Replace missing or invalid price values
median_price_df = median_price_df.select("category", "median_price").dropDuplicates(["category"])
df = df.join(median_price_df, on="category", how="left")
df = df.withColumn("price", F.when(F.col("price").isNull(), F.col("median_price")).otherwise(F.col("price")))

# Drop rows where both quantity and price are invalid or missing
df = df.filter(~(F.col("quantity").isNull() & F.col("price").isNull()))

# Delete median_price_column
df = df.drop("median_price")

# Show data clean results
df.show(10)

+--------+--------------+----------+--------+--------+-----+
|category|transaction_id|      date| product|quantity|price|
+--------+--------------+----------+--------+--------+-----+
|  Widget|             1|2024-07-01|Widget-A|      10| 9.99|
|  Gadget|             2|2024-07-01|Gadget-X|       5|19.99|
|  Widget|             3|2024-07-02|Widget-B|       7| 9.99|
|  Doodad|             4|2024-07-02|Doodad-1|       0| 4.99|
|  Widget|             5|2024-07-03|Widget-C|       3| 9.99|
|  Gadget|             6|2024-07-03|Gadget-Y|       8|19.99|
|  Widget|             7|2024-07-04|Widget-A|       2| 9.99|
|  Doodad|             8|2024-07-04|Doodad-2|       4| 4.99|
|  Widget|             9|2024-07-05|Widget-B|       6| 9.99|
|  Gadget|            10|2024-07-05|Gadget-X|       3|19.99|
+--------+--------------+----------+--------+--------+-----+
only showing top 10 rows



In [21]:
#transaction_ids = [4, 8, 11, 17, 27, 32, 40, 47]
transaction_ids = [8,32]
filtered_df = df.filter(F.col("transaction_id").isin(transaction_ids))

# Mostrar los resultados
filtered_df.show(20)

+--------+--------------+----------+--------+--------+-----+
|category|transaction_id|      date| product|quantity|price|
+--------+--------------+----------+--------+--------+-----+
|  Doodad|             8|2024-07-04|Doodad-2|       4| 4.99|
|  Gadget|            32|2024-07-16|Gadget-Y|       5|19.99|
+--------+--------------+----------+--------+--------+-----+



2.	Derived Columns:

    •	Calculate total_sales (quantity * price) for each transaction.

    •	Create a day_of_week column based on the date.

    •	Add a high_volume flag: True if quantity > 10, otherwise False.

In [22]:
# Calculate total_sales (quantity * price) for each transaction
df = df.withColumn("total_sales", F.col("quantity") * F.col("price"))

# Create a day_of_week column based on the date.
df = df.withColumn("day_of_week", F.date_format(F.col("date"), "EEEE"))

# Add a high_volume flag: True if quantity > 10, otherwise False.
df = df.withColumn("high_volume", F.when(F.col("quantity") > 10, True).otherwise(False))

# Show dervied columns results
df.show(10)

+--------+--------------+----------+--------+--------+-----+-----------+-----------+-----------+
|category|transaction_id|      date| product|quantity|price|total_sales|day_of_week|high_volume|
+--------+--------------+----------+--------+--------+-----+-----------+-----------+-----------+
|  Widget|             1|2024-07-01|Widget-A|      10| 9.99|  99.899994|     Monday|      false|
|  Gadget|             2|2024-07-01|Gadget-X|       5|19.99|      99.95|     Monday|      false|
|  Widget|             3|2024-07-02|Widget-B|       7| 9.99|      69.93|    Tuesday|      false|
|  Doodad|             4|2024-07-02|Doodad-1|       0| 4.99|        0.0|    Tuesday|      false|
|  Widget|             5|2024-07-03|Widget-C|       3| 9.99|      29.97|  Wednesday|      false|
|  Gadget|             6|2024-07-03|Gadget-Y|       8|19.99|     159.92|  Wednesday|      false|
|  Widget|             7|2024-07-04|Widget-A|       2| 9.99|      19.98|   Thursday|      false|
|  Doodad|             8|2024-

In [23]:
filtered_df = df.filter(F.col("high_volume")==True)

# Mostrar los resultados
filtered_df.show(20)

+--------+--------------+----+-------+--------+-----+-----------+-----------+-----------+
|category|transaction_id|date|product|quantity|price|total_sales|day_of_week|high_volume|
+--------+--------------+----+-------+--------+-----+-----------+-----------+-----------+
+--------+--------------+----+-------+--------+-----+-----------+-----------+-----------+



3.	Complex Transformations:

    •	Group data by category and calculate:

        •	Average price per product in the category.

        •	Total revenue for each category.

        •	Day with highest sales for the category.

    •	Identify outliers in the data:

        •	Transactions where quantity is more than 2 standard deviations from the category mean.

        •	Mark these rows with an outlier flag.
        


In [24]:
### Group data by category and calculate

# Average price per product in the category.
df_category_avg_price = df.groupBy("category", "product").agg(
    F.avg("price").alias("avg_price")
)

# Total revenue for each category.
df_category_revenue = df.groupBy("category").agg(
    F.sum("total_sales").alias("total_revenue")
)

# Day with highest sales for the category.
window_spec = Window.partitionBy("category").orderBy(F.desc("total_sales"))
df_sales_ranked = df.withColumn("rank", F.rank().over(window_spec))
df_day_highest_sales = df_sales_ranked.filter(F.col("rank") == 1).select("category", "day_of_week").dropDuplicates(["category"])

### Identify outliers in the data:

# Transactions where quantity is more than 2 standard deviations from the category mean.
df_stats = df.groupBy("category").agg(
    F.mean("quantity").alias("mean_quantity"),
    F.stddev("quantity").alias("stddev_quantity")
)

# Mark these rows with an outlier flag.
df_stats = df_stats.dropDuplicates(["category"])
df = df.join(df_stats, on="category", how="left")
df = df.withColumn("outlier_flag", F.when(
    (F.col("quantity") > F.col("mean_quantity") + 2 * F.col("stddev_quantity")) |
    (F.col("quantity") < F.col("mean_quantity") - 2 * F.col("stddev_quantity")),
    True
).otherwise(False))

# Joins results
df = df.drop("mean_quantity", "stddev_quantity")

# Show Resultds
df.show(10)


+--------+--------------+----------+--------+--------+-----+-----------+-----------+-----------+------------+
|category|transaction_id|      date| product|quantity|price|total_sales|day_of_week|high_volume|outlier_flag|
+--------+--------------+----------+--------+--------+-----+-----------+-----------+-----------+------------+
|  Widget|             1|2024-07-01|Widget-A|      10| 9.99|  99.899994|     Monday|      false|       false|
|  Gadget|             2|2024-07-01|Gadget-X|       5|19.99|      99.95|     Monday|      false|       false|
|  Widget|             3|2024-07-02|Widget-B|       7| 9.99|      69.93|    Tuesday|      false|       false|
|  Doodad|             4|2024-07-02|Doodad-1|       0| 4.99|        0.0|    Tuesday|      false|       false|
|  Widget|             5|2024-07-03|Widget-C|       3| 9.99|      29.97|  Wednesday|      false|       false|
|  Gadget|             6|2024-07-03|Gadget-Y|       8|19.99|     159.92|  Wednesday|      false|       false|
|  Widget|

In [25]:
filtered_df = df.filter(F.col("outlier_flag")==True)

# Mostrar los resultados
filtered_df.show()

+--------+--------------+----+-------+--------+-----+-----------+-----------+-----------+------------+
|category|transaction_id|date|product|quantity|price|total_sales|day_of_week|high_volume|outlier_flag|
+--------+--------------+----+-------+--------+-----+-----------+-----------+-----------+------------+
+--------+--------------+----+-------+--------+-----+-----------+-----------+-----------+------------+



4.	Storage:

    •	Store the cleaned and processed data in a SQLite database with separate tables for:

        •	Transactions

        •	Aggregated metrics by category

        •	Outliers

In [26]:
import sqlite3
import os

# Path to DB SQLite with the shared volumne of docker
db_path = "/home/jovyan/work/data/sales_dashboard.db"

# Check DB if null, create
if not os.path.exists(db_path):
    open(db_path, 'w').close()  # Make empty file for SQLite

# Connecto to SQLite
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# Delete tables if any exis
cursor.executescript("""
DROP TABLE IF EXISTS transactions;
DROP TABLE IF EXISTS aggregated_metrics;
DROP TABLE IF EXISTS outliers;
""")
conn.commit()

# Save Transactions in SQLite
df_transactions = df.select(
    "transaction_id", "date", "category", "product", "quantity", "price",
    "total_sales", "day_of_week", "high_volume"
)
df_transactions.toPandas().to_sql("transactions", conn, if_exists="replace", index=False)

# Save Aggregated Metrics
df_aggregated_metrics = df_category_avg_price \
    .join(df_category_revenue, on="category", how="inner") \
    .join(df_day_highest_sales, on="category", how="left")

df_aggregated_metrics.toPandas().to_sql("aggregated_metrics", conn, if_exists="replace", index=False)

# Save Guardar Outliers (Only transactiones like True)
#df_outliers = df.filter(F.col("outlier_flag") == True).select("transaction_id", "category", "product", "quantity", "price", "total_sales", "outlier_flag")
df_outliers = df.select("transaction_id", "category", "product", "quantity", "price", "total_sales", "outlier_flag")
df_outliers.toPandas().to_sql("outliers", conn, if_exists="replace", index=False)

# Close connection to SQLite
conn.close()

print("Base de datos SQLite actualizada correctamente en el volumen compartido.")


Base de datos SQLite actualizada correctamente en el volumen compartido.


In [27]:
spark.stop()