In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, avg, year, month, desc, col, when, isnan, isnull, to_timestamp

# Build the Spark session used by every cell below (getOrCreate reuses an
# already-active session instead of starting a second one).
spark = (
    SparkSession.builder
    .appName("Fecom Inc E-commerce Analysis")
    .getOrCreate()
)

# 1. Reading CSV files with schema inference and semicolon delimiter
print("1. Reading CSV files with auto schema inference")


def _read_semicolon_csv(path):
    """Load one semicolon-delimited CSV file into a Spark DataFrame.

    The first row is treated as the header and column types are inferred
    by Spark. Keeping the reader options in one place guarantees that all
    source files are parsed with identical settings.

    Args:
        path: Location of the CSV file, passed unchanged to ``spark.read.csv``.

    Returns:
        The DataFrame produced by the reader.
    """
    return (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ";")
        .csv(path)
    )


# One DataFrame per source file; variable names are relied on by later cells.
orders_df = _read_semicolon_csv("Orders.csv")
customers_df = _read_semicolon_csv("Customer_List.csv")
order_items_df = _read_semicolon_csv("Order_Items.csv")
products_df = _read_semicolon_csv("Products.csv")
reviews_df = _read_semicolon_csv("Order_Reviews.csv")

# Print each inferred schema under its own heading so the column types
# chosen by schema inference can be checked by eye.
schema_reports = [
    ("\nOrders Schema:", orders_df),
    ("\nCustomers Schema:", customers_df),
    ("\nOrder Items Schema:", order_items_df),
    ("\nProducts Schema:", products_df),
    ("\nReviews Schema:", reviews_df),
]
for heading, frame in schema_reports:
    print(heading)
    frame.printSchema()

# Parse the purchase timestamp with an explicit pattern so date functions
# (year, month, ...) can be applied to it in later cells.
# NOTE(review): the saved schema output already lists this column as
# timestamp after inference, so this step may be a no-op — confirm.
parsed_purchase_ts = to_timestamp(col("Order_Purchase_Timestamp"), "yyyy-MM-dd HH:mm:ss")
orders_df = orders_df.withColumn("Order_Purchase_Timestamp", parsed_purchase_ts)

1. Reading CSV files with auto schema inference

Orders Schema:
root
 |-- Order_ID: string (nullable = true)
 |-- Customer_Trx_ID: string (nullable = true)
 |-- Order_Status: string (nullable = true)
 |-- Order_Purchase_Timestamp: timestamp (nullable = true)
 |-- Order_Approved_At: timestamp (nullable = true)
 |-- Order_Delivered_Carrier_Date: timestamp (nullable = true)
 |-- Order_Delivered_Customer_Date: timestamp (nullable = true)
 |-- Order_Estimated_Delivery_Date: timestamp (nullable = true)


Customers Schema:
root
 |-- Customer_Trx_ID: string (nullable = true)
 |-- Subscriber_ID: string (nullable = true)
 |-- Subscribe_Date: date (nullable = true)
 |-- First_Order_Date: date (nullable = true)
 |-- Customer_Postal_Code: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Customer_Country_Code: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)


Order Items Schem

In [8]:
# 2. Count total orders, customers, and sellers
print("\n2. Statistics on orders, customers, and sellers")


def _count_distinct_ids(df, id_column):
    """Return the number of distinct non-NULL values of ``id_column`` in ``df``.

    ``distinct()`` keeps NULL as a value of its own, so rows with a missing
    identifier are dropped first; otherwise a single NULL id would inflate
    the total by one.

    Args:
        df: DataFrame that contains ``id_column``.
        id_column: Name of the identifier column to count.

    Returns:
        Count of distinct, non-NULL identifiers as a Python int.
    """
    return df.select(id_column).dropna().distinct().count()


# Count unique orders
total_orders = _count_distinct_ids(orders_df, "Order_ID")
print(f"Total number of orders: {total_orders}")

# Count unique customers
# NOTE(review): Customer_Trx_ID is also the key present on the orders table;
# if it identifies a transaction rather than a person, Subscriber_ID may be
# the better column for "unique customers" — confirm against the data dictionary.
total_customers = _count_distinct_ids(customers_df, "Customer_Trx_ID")
print(f"Total number of unique customers: {total_customers}")

# Count unique sellers (Seller_ID is read from the order items table)
total_sellers = _count_distinct_ids(order_items_df, "Seller_ID")
print(f"Total number of sellers: {total_sellers}")



2. Statistics on orders, customers, and sellers
Total number of orders: 99441
Total number of unique customers: 99442
Total number of sellers: 3095


In [14]:
# 3. Orders by country (descending order)
print("\n3. Number of orders by country (descending order)")

# Attach the customer attributes (country among them) to every order by
# matching the Customer_Trx_ID present on both tables. Orders without a
# matching customer row are dropped by the inner join.
same_customer = orders_df["Customer_Trx_ID"] == customers_df["Customer_Trx_ID"]
orders_with_customer = orders_df.join(customers_df, on=same_customer, how="inner")

# Count orders per country and list the largest markets first
orders_by_country = (
    orders_with_customer
    .groupBy("Customer_Country")
    .agg(count("Order_ID").alias("Order_Count"))
    .orderBy(desc("Order_Count"))
)

orders_by_country.show(n=50, truncate=False)


3. Number of orders by country (descending order)
+----------------+-----------+
|Customer_Country|Order_Count|
+----------------+-----------+
|Germany         |41754      |
|France          |12848      |
|Netherlands     |11629      |
|Belgium         |5464       |
|Austria         |5043       |
|Switzerland     |3640       |
|United Kingdom  |3382       |
|Poland          |2139       |
|Czechia         |2034       |
|Italy           |2025       |
|Spain           |1651       |
|Portugal        |1336       |
|Sweden          |975        |
|Denmark         |905        |
|Serbia          |746        |
|Norway          |716        |
|Slovakia        |534        |
|Slovenia        |495        |
|Turkey          |485        |
|Greece          |412        |
|Lithuania       |351        |
|Latvia          |280        |
|Croatia         |254        |
|Estonia         |148        |
|Finland         |81         |
|Luxembourg      |68         |
|Andorra         |46         |
+----------------+-

In [13]:
# 4. Orders by year and month (year ascending, month descending)
print("\n4. Number of orders by year and month (year ascending, month descending)")

# Derive the calendar year and month of each purchase, count the orders in
# every (year, month) pair, then sort by year ascending and month descending.
purchase_ts_col = col("Order_Purchase_Timestamp")
orders_by_time = (
    orders_df
    .withColumn("Year", year(purchase_ts_col))
    .withColumn("Month", month(purchase_ts_col))
    .groupBy("Year", "Month")
    .agg(count("Order_ID").alias("Order_Count"))
    .orderBy("Year", desc("Month"))
)

orders_by_time.show(n=50)


4. Number of orders by year and month (year ascending, month descending)
+----+-----+-----------+
|Year|Month|Order_Count|
+----+-----+-----------+
|2022|   12|          1|
|2022|   10|        324|
|2022|    9|          4|
|2023|   12|       5673|
|2023|   11|       7544|
|2023|   10|       4631|
|2023|    9|       4285|
|2023|    8|       4331|
|2023|    7|       4026|
|2023|    6|       3245|
|2023|    5|       3700|
|2023|    4|       2404|
|2023|    3|       2682|
|2023|    2|       1780|
|2023|    1|        800|
|2024|   10|          4|
|2024|    9|         16|
|2024|    8|       6512|
|2024|    7|       6292|
|2024|    6|       6167|
|2024|    5|       6873|
|2024|    4|       6939|
|2024|    3|       7211|
|2024|    2|       6728|
|2024|    1|       7269|
+----+-----+-----------+



In [15]:
# 5. Review score analysis
print("\n5. Review score statistics")

# Cast Review_Score to double so the score is numeric for the checks and
# aggregations below.
reviews_df = reviews_df.withColumn("Review_Score", col("Review_Score").cast("double"))

# Normalise missing scores: NULL and NaN both become NULL, so that avg() and
# count(<column>) skip them.
clean_reviews_df = reviews_df.withColumn(
    "Review_Score_Clean",
    when(col("Review_Score").isNull() | isnan(col("Review_Score")), None).otherwise(col("Review_Score"))
)

# A single aggregation pass yields the average together with the total and
# non-NULL row counts, instead of launching one Spark job per number.
review_summary = clean_reviews_df.agg(
    avg("Review_Score_Clean").alias("Average_Review_Score"),
    count("*").alias("Total_Reviews"),
    count("Review_Score_Clean").alias("Non_Null_Reviews"),
).collect()[0]

avg_review_score = review_summary["Average_Review_Score"]
total_non_null_reviews = review_summary["Non_Null_Reviews"]
null_count = review_summary["Total_Reviews"] - total_non_null_reviews

# avg() returns NULL (None in Python) when there is no non-NULL score;
# formatting None with ":.2f" would raise TypeError, so report it explicitly.
if avg_review_score is None:
    print("Average review score: N/A (no non-NULL review scores)")
else:
    print(f"Average review score: {avg_review_score:.2f}")

# Count reviews by score (the NULL group is kept so missing scores stay visible)
reviews_by_score = clean_reviews_df.groupBy("Review_Score_Clean") \
    .agg(count("*").alias("Review_Count")) \
    .orderBy("Review_Score_Clean")

print("\nDistribution of review scores:")
reviews_by_score.show()

# Report how many reviews have no usable score
print(f"\nNumber of NULL review scores: {null_count}")

# Additional analysis: percentage share of each score among non-NULL reviews.
# Skipped when there are no non-NULL reviews to avoid dividing by zero.
if total_non_null_reviews > 0:
    percentage_by_score = clean_reviews_df.filter(col("Review_Score_Clean").isNotNull()) \
        .groupBy("Review_Score_Clean") \
        .agg(count("*").alias("Count")) \
        .withColumn("Percentage", (col("Count") * 100 / total_non_null_reviews)) \
        .orderBy("Review_Score_Clean")

    print("\nPercentage distribution of review scores:")
    percentage_by_score.select("Review_Score_Clean", "Count", "Percentage").show()



5. Review score statistics
Average review score: 4.09

Distribution of review scores:
+------------------+------------+
|Review_Score_Clean|Review_Count|
+------------------+------------+
|              NULL|          47|
|               1.0|       11424|
|               2.0|        3151|
|               3.0|        8179|
|               4.0|       19141|
|               5.0|       57328|
+------------------+------------+


Number of NULL review scores: 47

Percentage distribution of review scores:
+------------------+-----+------------------+
|Review_Score_Clean|Count|        Percentage|
+------------------+-----+------------------+
|               1.0|11424|11.513459580943934|
|               2.0| 3151| 3.175674994708888|
|               3.0| 8179| 8.243048486741985|
|               4.0|19141|19.290890216985982|
|               5.0|57328| 57.77692672061921|
+------------------+-----+------------------+

