## Data Exploration & Preprocessing

### 1.	Load all datasets (orders, payments, customers, products, reviews) into Spark DataFrames.

In [0]:
orders_df = (spark.read
      .format("csv")
      .option("header","true")
      .option("inferSchema", "true")
      .load("/FileStore/ecom/olist_orders_dataset.csv"))

payments_df = (spark.read
      .format("csv")
      .option("header","true")
      .option("inferSchema", "true")
      .load("/FileStore/ecom/olist_order_payments_dataset.csv"))

customers_df = (spark.read
      .format("csv")
      .option("header","true")
      .option("inferSchema", "true")
      .load("/FileStore/ecom/olist_customers_dataset.csv"))

products_df = (spark.read
      .format("csv")
      .option("header","true")
      .option("inferSchema", "true")
      .load("/FileStore/ecom/olist_products_dataset.csv"))

reviews_df = (spark.read
      .format("csv")
      .option("header","true")
      .option("inferSchema", "true")
      .load("/FileStore/ecom/olist_order_reviews_dataset.csv"))

order_items_df = (spark.read
      .format("csv")
      .option("header","true")
      .option("inferSchema", "true")
      .load("/FileStore/ecom/olist_order_items_dataset.csv"))

sellers_df = (spark.read
      .format("csv")
      .option("header","true")
      .option("inferSchema", "true")
      .load("/FileStore/ecom/olist_sellers_dataset.csv"))

geolocation_df = (spark.read
      .format("csv")
      .option("header","true")
      .option("inferSchema", "true")
      .load("/FileStore/ecom/olist_geolocation_dataset.csv"))


category_translation_df = (spark.read
      .format("csv")
      .option("header","true")
      .option("inferSchema", "true")
      .load("/FileStore/ecom/product_category_name_translation.csv"))

### 2.	Display schema and sample records

In [0]:
print("Orders Schema:")
orders_df.printSchema()
orders_df.show(5)

print("Payments Schema:")
payments_df.printSchema()
payments_df.show(5)

print("Customers Schema:")
customers_df.printSchema()
customers_df.show(5)

print("Products Schema:")
products_df.printSchema()
products_df.show(5)

print("Reviews Schema:")
reviews_df.printSchema()
reviews_df.show(5)

print("Order items Schema:")
order_items_df.printSchema()
order_items_df.show(5)

print("Sellers Schema:")
sellers_df.printSchema()
sellers_df.show(5)

print("Category Translation Schema:")
category_translation_df.printSchema()
category_translation_df.show(5)

print("Geolocation Schema:")
geolocation_df.printSchema()
geolocation_df.show(5)



Orders Schema:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+--------

### 3.	Count the number of unique orders and customers.

In [0]:
from pyspark.sql.functions import *

In [0]:
unique_counts_df = orders_df.select(
    countDistinct("order_id").alias("unique_orders"),
    countDistinct("customer_id").alias("unique_customers")
)
display(unique_counts_df)


unique_orders,unique_customers
99441,99441


## Sales & Revenue Analysis

### 4.	Calculate total revenue, average order value, and monthly revenue trends.

In [0]:
revenue_metrics = payments_df.agg(
    sum("payment_value").alias("total_revenue"),
    avg("payment_value").alias("avg_order_value")
)
display(revenue_metrics)

total_revenue,avg_order_value
16008872.119999208,154.10038041698792


In [0]:
# Monthly revenue trends
monthly_revenue_df = orders_df.join(payments_df, "order_id") \
    .withColumn("year", year("order_purchase_timestamp")) \
    .withColumn("month", month("order_purchase_timestamp")) \
    .groupBy("year", "month") \
    .agg(sum("payment_value").alias("monthly_revenue")) \
    .orderBy("year", "month")

display(monthly_revenue_df)

year,month,monthly_revenue
2016,9,252.24
2016,10,59090.48
2016,12,19.62
2017,1,138488.04
2017,2,291908.00999999983
2017,3,449863.5999999998
2017,4,417788.0299999998
2017,5,592918.8199999994
2017,6,511276.37999999966
2017,7,592382.92


Databricks visualization. Run in Databricks to view.

### 5.	Identify the top 5 best-selling product categories.

In [0]:
top_categories_df = order_items_df \
    .join(products_df, "product_id") \
    .join(category_translation_df, "product_category_name") \
    .groupBy("product_category_name_english") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(5)

display(top_categories_df)

product_category_name_english,count
bed_bath_table,11115
health_beauty,9670
sports_leisure,8641
furniture_decor,8334
computers_accessories,7827


Databricks visualization. Run in Databricks to view.

### 6.	Determine the top 3 payment methods used by customers.

In [0]:
payment_methods_df = payments_df \
    .groupBy("payment_type") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(3)
display(payment_methods_df)

payment_type,count
credit_card,76795
boleto,19784
voucher,5775


Databricks visualization. Run in Databricks to view.

## Customer Behavior Analysis

### 7.	Find the average number of orders per customer.

In [0]:
avg_orders_df = orders_df \
    .groupBy("customer_id") \
    .agg(count("order_id").alias("order_count")) \
    .agg(avg("order_count").alias("avg_orders_per_customer"))

display(avg_orders_df)

avg_orders_per_customer
1.0


### 8.	Identify the top 5 cities with the highest number of purchases.

In [0]:
top_cities_df = customers_df \
    .join(orders_df, "customer_id") \
    .groupBy("customer_city") \
    .agg(count("order_id").alias("purchase_count")) \
    .orderBy(col("purchase_count").desc()) \
    .limit(5)

display(top_cities_df)

customer_city,purchase_count
sao paulo,15540
rio de janeiro,6882
belo horizonte,2773
brasilia,2131
curitiba,1521


Databricks visualization. Run in Databricks to view.

### 9.	Segment customers based on their purchase frequency (one-time, occasional (2 – 5 orders), frequent (6 – 15 orders) and loyal (> 15 orders)).

In [0]:
customer_segments_df = customers_df \
    .groupBy("customer_unique_id") \
    .agg(count("customer_id").alias("total_orders")) \
    .withColumn("customer_segment", when(col("total_orders") == 1, "One-time")
        .when((col("total_orders") >= 2) & (col("total_orders") <= 5), "Occasional")
        .when((col("total_orders") >= 6) & (col("total_orders") <= 15), "Frequent")
        .otherwise("Loyal")) \
    .groupBy("customer_segment") \
    .agg(count("customer_segment").alias("numberofcustomers")) \
    .orderBy("customer_segment")

# Display the results
display(customer_segments_df)


customer_segment,numberofcustomers
Frequent,10
Loyal,1
Occasional,2986
One-time,93099


## Delivery & Logistics Analysis

### 10.	Calculate the average shipping time per order.

In [0]:
shipping_time_df = orders_df \
    .withColumn("shipping_days", 
        datediff("order_delivered_customer_date", "order_purchase_timestamp")) \
    .agg(avg("shipping_days").alias("avg_shipping_days"))

display(shipping_time_df)

avg_shipping_days
12.497336125046644


### 11.	Identify the top 10 orders with worst delivery experience based on delays. 

In [0]:
worst_delays_df = orders_df \
    .withColumn("delay_days", datediff("order_delivered_customer_date", "order_estimated_delivery_date")) \
    .where(col("order_delivered_customer_date") > col("order_estimated_delivery_date")) \
    .select("order_id", "delay_days") \
    .orderBy(col("delay_days").desc()) \
    .limit(10)

display(worst_delays_df)

order_id,delay_days
1b3190b2dfa9d789e1f14c05b647a14a,188
ca07593549f1816d26a572e06dc1eab6,181
47b40429ed8cce3aee9199792275433f,175
2fe324febf907e3ea3f2aa9650869fa5,167
285ab9426d6982034523a855f55a885e,166
440d0d17af552815d15a9e41abe49359,165
c27815f7e3dd0b926b58552628481575,162
d24e8541128cea179a11a65176e0a96f,161
0f4519c5f1c541ddec9f21b3bddd533a,161
2d7561026d542c8dbd8f0daeadf67a43,159


### 12.	Find out which regions have the longest shipping times.

In [0]:
longest_shipping_regions_df = orders_df \
    .join(customers_df, "customer_id") \
    .join(geolocation_df, customers_df.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix) \
    .where(col("order_delivered_customer_date").isNotNull()) \
    .withColumn("shipping_days", datediff("order_delivered_customer_date", "order_purchase_timestamp")) \
    .groupBy("geolocation_state") \
    .agg(avg("shipping_days").alias("avg_shipping_days")) \
    .orderBy(col("avg_shipping_days").desc())
display(longest_shipping_regions_df)

geolocation_state,avg_shipping_days
AP,28.41779820346773
AM,25.03014799926914
RR,24.92204899777283
AL,23.516066555154502
PA,22.95213375913984
SE,21.89394895084311
CE,21.47808588957055
MA,21.121620564184827
AC,20.796251993620416
PB,19.91945959989608


## Customer Reviews & Satisfaction

### 13.	Analyze the distribution of review scores.

In [0]:
review_distribution_df = reviews_df \
    .groupBy("review_score") \
    .count() \
    .where(col("count") > 5) \
    .orderBy("review_score")
display(review_distribution_df)

review_score,count
,2380
1.0,11424
2.0,3151
3.0,8179
4.0,19142
5.0,57328


Databricks visualization. Run in Databricks to view.

### 14.	Find the percentage of orders with low (1-2) vs. high (4-5) reviews.

In [0]:
review_percentages_df = reviews_df \
    .agg(
        (sum(when(col("review_score").isin(1, 2), 1).otherwise(0)) * 100.0 / count("*"))
        .alias("low_review_percentage"),
        (sum(when(col("review_score").isin(4, 5), 1).otherwise(0)) * 100.0 / count("*"))
        .alias("high_review_percentage")
    )

display(review_percentages_df)


low_review_percentage,high_review_percentage
13.99262686968376,73.41448896910582


### 15.	Identify the most common complaints in low-rated reviews.

In [0]:
low_rated_complaints_df = reviews_df \
    .where((col("review_score").isin(1, 2)) & (col("review_comment_message").isNotNull())) \
    .groupby("review_comment_message") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(10)
display(low_rated_complaints_df)

review_comment_message,count
Não recebi o produto,33
Não recebi o produto,13
Não recebi,12
Ainda não recebi o produto,11
Ainda não recebi,11
nao recebi o produto,10
Nao recebi o produto,10
Ainda não recebi o produto,9
Produto não entregue,7
Bom,7


## Time-Series & Seasonal Trends

### 16.	Find the month with the highest sales in 2018.

In [0]:
highest_sales_month_2018_df = orders_df \
    .where(year("order_purchase_timestamp") == 2018) \
    .withColumn("month", month("order_purchase_timestamp")) \
    .groupBy("month") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(1)
display(highest_sales_month_2018_df)

month,count
1,7269


### 17.	Identify seasonal trends in customer purchases.

In [0]:
seasonal_trends_df = orders_df \
    .join(payments_df, "order_id") \
    .withColumn("year", year("order_purchase_timestamp")) \
    .withColumn("month", month("order_purchase_timestamp")) \
    .withColumn("season", when(col("month").isin(12, 1, 2), "Summer")
        .when(col("month").isin(3, 4, 5), "Fall")
        .when(col("month").isin(6, 7, 8), "Winter")
        .otherwise("Spring")) \
    .groupBy("year", "season") \
    .agg(count("order_id").alias("total_orders"), sum("payment_value").alias("total_revenue"),
         avg("payment_value").alias("avg_order_value")) \
    .orderBy("year", "season")
display(seasonal_trends_df)

year,season,total_orders,total_revenue,avg_order_value
2016,Spring,345,59342.72,172.007884057971
2016,Summer,1,19.62,19.62
2017,Fall,9352,1460570.4500000027,156.1773363986316
2017,Spring,17239,2702323.1300000064,156.7563739196013
2017,Summer,8631,1308797.529999998,151.63915305294844
2017,Winter,12303,1778055.620000008,144.52211818255776
2018,Fall,21856,3474419.7500000005,158.96869280746708
2018,Spring,20,5029.210000000001,251.46050000000005
2018,Summer,14515,2107467.520000008,145.19238856355548
2018,Winter,19624,3112846.569999997,158.62446850794933


### 18.	Determine whether higher review scores correlate with faster delivery times.

In [0]:
delivery_review_correlation_df = orders_df \
    .join(reviews_df, "order_id") \
    .where(col("order_delivered_customer_date").isNotNull()) \
    .withColumn("delivery_days", datediff("order_delivered_customer_date", "order_purchase_timestamp")) \
    .groupBy("review_score") \
    .agg(avg("delivery_days").alias("avg_delivery_days"), count("*").alias("review_count")) \
    .orderBy("review_score")
display(delivery_review_correlation_df)

review_score,avg_delivery_days,review_count
1,21.25188649165693,9409
2,16.6059163549813,2941
3,14.204345641798543,7962
4,12.25312055617001,18987
5,10.625359270942868,57060


## Fraud Detection & Business Insights

### 19.	Identify orders with suspiciously high transaction values.

In [0]:
payment_fraud = payments_df \
    .agg(avg("payment_value").alias("avg_payment"), stddev("payment_value").alias("stddev_payment")).collect()[0]
suspicious_transactions_df = payments_df \
    .withColumn("threshold", lit(payment_fraud["avg_payment"] + (3 * payment_fraud["stddev_payment"]))) \
    .where(col("payment_value") > col("threshold")) \
    .select("order_id", "payment_value") \
    .orderBy(col("payment_value").desc())
display(suspicious_transactions_df)


order_id,payment_value
03caa2c082116e1d31e67e9ae3700499,13664.08
736e1922ae60d0d6a89247b851902527,7274.88
0812eb902a67711a1cb742b3cdaa65ae,6929.31
fefacc66af859508bf1a7934eab1e97f,6922.21
f5136e38d1a14a4dbd87dff67da82701,6726.66
2cc9089445046817a7539d90805e6e5a,6081.54
a96610ab360d42a2e5335a3998b4718a,4950.34
b4c4b76c642808cbe472a32b86cddc95,4809.44
199af31afc78c699f0dbf71fb178d4d4,4764.34
8dbc85d1447242f3b127dda390d56e19,4681.78
