In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import os

# 1. Resolve the notebook's working directory (the repo folder); every loading
#    cell below builds its CSV path as f"file:{current_dir}/data/...".
current_dir = os.path.abspath(os.curdir)

customer data


In [0]:
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd
from pyspark.sql.functions import count as spark_count, when

customer_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_unique_id", StringType(), True),
    StructField("customer_zip_code_prefix", StringType(), True),
    StructField("customer_city", StringType(), True),
    StructField("customer_state", StringType(), True)
])
csv_path = f"file:{current_dir}/data/olist_customers_dataset.csv"

# Read with pandas (this bypasses the Spark file-access Security Exception).
# Every column is read as a string to match the all-StringType schema above.
pdf_customers = pd.read_csv(csv_path, dtype=str)

# Convert pandas -> Spark, enforcing the explicit schema.
df_customers = spark.createDataFrame(pdf_customers, schema=customer_schema)

display(df_customers.head(5))

# Null profile in ONE aggregation job: when() without otherwise() yields null for
# non-null cells and count() skips nulls, so each aggregate counts exactly the
# null cells of its column (instead of two filter+count jobs per column).
total_rows_customers = df_customers.count()
is_null_customer = df_customers.select([
    spark_count(when(df_customers[c].isNull(), 1)).alias(c)
    for c in df_customers.columns
]).first().asDict()
# Guard: an empty CSV would otherwise raise ZeroDivisionError.
percentage_of_null_customer = {
    c: (n / total_rows_customers) * 100 if total_rows_customers else 0.0
    for c, n in is_null_customer.items()
}

df_customers.printSchema()

display({col_name: f"{percentage:.2f} %" for col_name, percentage in percentage_of_null_customer.items()})
print("\n")
display(is_null_customer)


order data

In [0]:

from pyspark.sql.functions import count as spark_count, when

order_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_status", StringType(), True),
    StructField("order_purchase_timestamp", StringType(), True),
    StructField("order_approved_at", StringType(), True),
    StructField("order_delivered_carrier_date", StringType(), True),
    StructField("order_delivered_customer_date", StringType(), True),
    StructField("order_estimated_delivery_date", StringType(), True)
])

# All columns, including the timestamps, are kept as strings.
csv_path = f"file:{current_dir}/data/olist_orders_dataset.csv"
pdf_orders = pd.read_csv(csv_path, dtype=str)
df_orders = spark.createDataFrame(pdf_orders, schema=order_schema)

df_orders.printSchema()

display(df_orders.head(5))
display(df_orders.describe())

# Null profile in ONE aggregation job (count() ignores the nulls that when()
# emits for non-null cells) instead of two filter+count jobs per column.
total_rows_orders = df_orders.count()
is_null_order = df_orders.select([
    spark_count(when(df_orders[c].isNull(), 1)).alias(c)
    for c in df_orders.columns
]).first().asDict()
# Guard: an empty CSV would otherwise raise ZeroDivisionError.
percentage_of_null_order = {
    c: (n / total_rows_orders) * 100 if total_rows_orders else 0.0
    for c, n in is_null_order.items()
}

display({col_name: f"{percentage:.2f} %" for col_name, percentage in percentage_of_null_order.items()})
print("\n")
display(is_null_order)

order items

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

from pyspark.sql.functions import count as spark_count, when

order_items_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("order_item_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("seller_id", StringType(), True),
    StructField("shipping_limit_date", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("freight_value", DoubleType(), True)
])
csv_path = f"file:{current_dir}/data/olist_order_items_dataset.csv"
pdf_order_items = pd.read_csv(csv_path, dtype=str)

# The DoubleType fields must arrive as numbers; unparseable values become NaN.
pdf_order_items["price"] = pd.to_numeric(pdf_order_items["price"], errors="coerce")
pdf_order_items["freight_value"] = pd.to_numeric(pdf_order_items["freight_value"], errors="coerce")

df_order_items = spark.createDataFrame(pdf_order_items, schema=order_items_schema)

df_order_items.printSchema()
display(df_order_items.head(5))

# Null profile in ONE aggregation job (count() ignores the nulls that when()
# emits for non-null cells) instead of two filter+count jobs per column.
total_rows_order_items = df_order_items.count()
is_null_order_items = df_order_items.select([
    spark_count(when(df_order_items[c].isNull(), 1)).alias(c)
    for c in df_order_items.columns
]).first().asDict()
# Guard: an empty CSV would otherwise raise ZeroDivisionError.
percentage_of_null_order_items = {
    c: (n / total_rows_order_items) * 100 if total_rows_order_items else 0.0
    for c, n in is_null_order_items.items()
}

display({col_name: f"{percentage:.2f} %" for col_name, percentage in percentage_of_null_order_items.items()})
print("\n")
display(is_null_order_items)
display(df_order_items.describe())

order payments

In [0]:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType,IntegerType

from pyspark.sql.functions import count as spark_count, when

order_payments_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("payment_sequential", StringType(), True),
    StructField("payment_type", StringType(), True),
    StructField("payment_installments", IntegerType(), True),
    StructField("payment_value", DoubleType(), True)
])

csv_path = f"file:{current_dir}/data/olist_order_payments_dataset.csv"
pdf_orders_payments = pd.read_csv(csv_path, dtype=str)

# Numeric schema fields must arrive as numbers; unparseable values become NaN.
pdf_orders_payments["payment_installments"] = pd.to_numeric(pdf_orders_payments["payment_installments"], errors="coerce")
pdf_orders_payments["payment_value"] = pd.to_numeric(pdf_orders_payments["payment_value"], errors="coerce")

df_order_payments = spark.createDataFrame(pdf_orders_payments, schema=order_payments_schema)

df_order_payments.printSchema()

display(df_order_payments.head(5))

# Null profile in ONE aggregation job (count() ignores the nulls that when()
# emits for non-null cells) instead of two filter+count jobs per column.
total_rows_order_payments = df_order_payments.count()
is_null_order_payments = df_order_payments.select([
    spark_count(when(df_order_payments[c].isNull(), 1)).alias(c)
    for c in df_order_payments.columns
]).first().asDict()
# Guard: an empty CSV would otherwise raise ZeroDivisionError.
percentage_of_null_order_payments = {
    c: (n / total_rows_order_payments) * 100 if total_rows_order_payments else 0.0
    for c, n in is_null_order_payments.items()
}
display(is_null_order_payments)
print("\n")
display({col_name:f"{percentage:.2f} %" for col_name, percentage in percentage_of_null_order_payments.items()})
display(df_order_payments.describe())


order reviews


In [0]:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

from pyspark.sql.functions import count as spark_count, when

order_reviews_schema = StructType([
    StructField("review_id", StringType(), True),
    StructField("order_id", StringType(), True),
    StructField("review_score", IntegerType(), True),
    StructField("review_comment_title", StringType(), True),
    StructField("review_comment_message", StringType(), True),
    StructField("review_creation_date", StringType(), True),
    StructField("review_answer_timestamp", StringType(), True)
])

csv_path = f"file:{current_dir}/data/olist_order_reviews_dataset.csv"
pdf_orders_reviews = pd.read_csv(csv_path, dtype=str)
# review_score is IntegerType in the schema; unparseable values become NaN.
pdf_orders_reviews["review_score"] = pd.to_numeric(pdf_orders_reviews["review_score"], errors="coerce")
df_order_reviews = spark.createDataFrame(pdf_orders_reviews, schema=order_reviews_schema)

df_order_reviews.printSchema()

display(df_order_reviews.limit(5))

# Null profile in ONE aggregation job (count() ignores the nulls that when()
# emits for non-null cells) instead of two filter+count jobs per column.
total_rows_order_reviews = df_order_reviews.count()
is_null_order_reviews = df_order_reviews.select([
    spark_count(when(df_order_reviews[c].isNull(), 1)).alias(c)
    for c in df_order_reviews.columns
]).first().asDict()
# Guard: an empty CSV would otherwise raise ZeroDivisionError.
percentage_of_null_order_reviews = {
    c: (n / total_rows_order_reviews) * 100 if total_rows_order_reviews else 0.0
    for c, n in is_null_order_reviews.items()
}
display(is_null_order_reviews)
print("\n")
display({col_name:f"{percentage:.2f} %" for col_name, percentage in percentage_of_null_order_reviews.items()})
display(df_order_reviews.describe())

products

In [0]:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

from pyspark.sql.functions import count as spark_count, when

# Note: the "lenght" spellings match the CSV headers and must not be corrected here.
products_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("product_category_name", StringType(), True),
    StructField("product_name_lenght", IntegerType(), True),
    StructField("product_description_lenght", IntegerType(), True),
    StructField("product_photos_qty", IntegerType(), True),
    StructField("product_weight_g", DoubleType(), True),
    StructField("product_length_cm", DoubleType(), True),
    StructField("product_height_cm", DoubleType(), True),
    StructField("product_width_cm", DoubleType(), True)
])

csv_path = f"file:{current_dir}/data/olist_products_dataset.csv"
pdf_products = pd.read_csv(csv_path, dtype=str)

# Coerce every non-string schema field to a number (unparseable values -> NaN).
# Deriving the list from the schema keeps the cast list and schema in sync.
# NOTE(review): the null profile below relies on createDataFrame turning NaN
# into null; isNull() does not match NaN — verify on your Spark version.
numeric_product_cols = [
    field.name for field in products_schema.fields
    if not isinstance(field.dataType, StringType)
]
for numeric_col in numeric_product_cols:
    pdf_products[numeric_col] = pd.to_numeric(pdf_products[numeric_col], errors="coerce")
df_products = spark.createDataFrame(pdf_products, schema=products_schema)

df_products.printSchema()
display(df_products.limit(5))

# Null profile in ONE aggregation job (count() ignores the nulls that when()
# emits for non-null cells) instead of two filter+count jobs per column.
total_rows_products = df_products.count()
is_null_products = df_products.select([
    spark_count(when(df_products[c].isNull(), 1)).alias(c)
    for c in df_products.columns
]).first().asDict()
# Guard: an empty CSV would otherwise raise ZeroDivisionError.
percentage_of_null_products = {
    c: (n / total_rows_products) * 100 if total_rows_products else 0.0
    for c, n in is_null_products.items()
}
display(is_null_products)
print("\n")
display({col_name:f"{percentage:.2f} %" for col_name, percentage in percentage_of_null_products.items()})
display(df_products.describe())

sellers


In [0]:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql.functions import  sum as spark_sum, when,count as spark_count,col as spark_col
import builtins
sellers_schema = StructType([
    StructField("seller_id", StringType(), True),
    StructField("seller_zip_code_prefix", StringType(), True),
    StructField("seller_city", StringType(), True),
    StructField("seller_state", StringType(), True)
])

csv_path = f"file:{current_dir}/data/olist_sellers_dataset.csv"
pdf_sellers = pd.read_csv(csv_path, dtype=str)
df_sellers = spark.createDataFrame(pdf_sellers, schema=sellers_schema)

df_sellers.printSchema()

display(df_sellers.limit(5))

# Null profile in ONE aggregation job: when() without otherwise() yields null
# for non-null cells and count() skips nulls, so each aggregate is the column's
# null count.
total_rows = df_sellers.count()

null_counts = df_sellers.select([
    spark_count(when(spark_col(c).isNull(), 1)).alias(c)
    for c in df_sellers.columns
]).collect()[0].asDict()

# builtins.round is used explicitly because another cell imports
# pyspark.sql.functions.round into the notebook namespace, shadowing Python's
# round on re-runs. Empty frames report 0% instead of raising ZeroDivisionError.
percentage_of_null_sellers = {
    c: builtins.round((null_counts[c] / total_rows) * 100, 2) if total_rows else 0.0
    for c in null_counts
}

display(null_counts)

print("\n")
display({col_name:f"{percentage:.2f} %" for col_name, percentage in percentage_of_null_sellers.items()})

display(df_sellers.describe())


geolocation


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

from pyspark.sql.functions import count as spark_count, when

geolocation_schema = StructType([
    StructField("geolocation_zip_code_prefix", StringType(), True),
    StructField("geolocation_lat", DoubleType(), True),
    StructField("geolocation_lng", DoubleType(), True),
    StructField("geolocation_city", StringType(), True),
    StructField("geolocation_state", StringType(), True)
])

csv_path = f"file:{current_dir}/data/olist_geolocation_dataset.csv"
pdf_geolocation = pd.read_csv(csv_path, dtype=str)

# The DoubleType coordinates must arrive as numbers; unparseable values become NaN.
pdf_geolocation["geolocation_lat"] = pd.to_numeric(pdf_geolocation["geolocation_lat"], errors="coerce")
pdf_geolocation["geolocation_lng"] = pd.to_numeric(pdf_geolocation["geolocation_lng"], errors="coerce")

df_geolocation = spark.createDataFrame(pdf_geolocation, schema=geolocation_schema)

df_geolocation.printSchema()
display(df_geolocation.limit(5))

# Null profile in ONE aggregation job (count() ignores the nulls that when()
# emits for non-null cells) instead of two filter+count jobs per column.
total_rows_geolocation = df_geolocation.count()
is_null_geolocation = df_geolocation.select([
    spark_count(when(df_geolocation[c].isNull(), 1)).alias(c)
    for c in df_geolocation.columns
]).first().asDict()
# Guard: an empty CSV would otherwise raise ZeroDivisionError.
percentage_of_null_geolocation = {
    c: (n / total_rows_geolocation) * 100 if total_rows_geolocation else 0.0
    for c, n in is_null_geolocation.items()
}
display(is_null_geolocation)
print("\n")
display({col_name:f"{percentage:.2f} %" for col_name, percentage in percentage_of_null_geolocation.items()})
display(df_geolocation.describe())


products category translation

In [0]:
from pyspark.sql.types import StructType, StructField, StringType

from pyspark.sql.functions import count as spark_count, when

# Lookup table: Portuguese category name -> English category name.
products_category_schema = StructType([
    StructField("product_category_name", StringType(), True),
    StructField("product_category_name_english", StringType(), True)
])

csv_path = f"file:{current_dir}/data/product_category_name_translation.csv"
pdf_products_category = pd.read_csv(csv_path, dtype=str)
df_products_category = spark.createDataFrame(pdf_products_category, schema=products_category_schema)

df_products_category.printSchema()
display(df_products_category.limit(5))

# Null profile in ONE aggregation job (count() ignores the nulls that when()
# emits for non-null cells) instead of two filter+count jobs per column.
total_rows_products_category = df_products_category.count()
is_null_products_category = df_products_category.select([
    spark_count(when(df_products_category[c].isNull(), 1)).alias(c)
    for c in df_products_category.columns
]).first().asDict()
# Guard: an empty CSV would otherwise raise ZeroDivisionError.
percentage_of_null_products_category = {
    c: (n / total_rows_products_category) * 100 if total_rows_products_category else 0.0
    for c, n in is_null_products_category.items()
}
display(is_null_products_category)
print("\n")
display({col_name:f"{percentage:.2f} %" for col_name, percentage in percentage_of_null_products_category.items()})
display(df_products_category.describe())

In [0]:

def indian_number_format(num):
    """Format a number with Indian magnitude words (Thousand / Lakh / Crore).

    The unit is chosen from the absolute value, so negative amounts are scaled
    the same way as positive ones (e.g. -250000 -> "-2.50 Lakh").

    Args:
        num: Any value accepted by ``float()`` (int, float, numeric string).

    Returns:
        A two-decimal string with an optional unit suffix, e.g. "12.35 Lakh"
        for 1,234,567. Returns None when ``num`` is None, non-numeric, NaN
        or infinite.
    """
    import math  # local import keeps the function self-contained when shipped as a UDF

    try:
        value = float(num)
    except (TypeError, ValueError, OverflowError):
        return None
    if not math.isfinite(value):
        # NaN/inf would otherwise fall through every comparison to "nan Crore".
        return None

    magnitude = abs(value)
    if magnitude < 1_000:
        return f"{value:.2f}"
    if magnitude < 100_000:
        return f"{value / 1_000:.2f} Thousand"
    if magnitude < 10_000_000:
        return f"{value / 100_000:.2f} Lakh"
    return f"{value / 10_000_000:.2f} Crore"

# Spark UDF wrapper: applies indian_number_format row by row and returns a string column.
indian_format_udf = udf(f=indian_number_format, returnType=StringType())


**Handling Nulls in Orders (df_orders)**

The Problem: You have nulls in order_delivered_customer_date (2.98%) and order_approved_at (0.16%).

The Logic: If a delivery date is missing, the order might still be in progress or canceled.

The Fix:

For Delivered analysis: Drop rows where the delivery date is null.

For Funnel analysis: Keep them but flag them as "Undelivered".

In [0]:
from pyspark.sql.functions import col, when, datediff, round,count

# 1. Delivery-analysis subset: keep only orders that have a customer delivery date.
#    Only this null check is applied; order_status is not consulted here.
df_orders_clean = df_orders.filter(col("order_delivered_customer_date").isNotNull())

# 2. Copy for funnel analysis: a missing approval time is assumed to be instant,
#    so it falls back to the purchase timestamp.
df_orders_filled = df_orders.withColumn(
    "order_approved_at",
    when(col("order_approved_at").isNotNull(), col("order_approved_at"))
    .otherwise(col("order_purchase_timestamp")),
)

**Handling Nulls in Reviews (df_order_reviews)**

The Problem: review_comment_title is 88% empty and review_comment_message is 60% empty.

The Logic: This is normal. People rate stars (score) but don't always write text.

The Fix: Replace nulls with placeholder text ("No Title" for review_comment_title, "No Message" for review_comment_message) so downstream text-analysis code doesn't crash on nulls.

In [0]:
# Placeholder text for the free-text review fields so downstream string processing
# never sees nulls; review_score and the other columns are left untouched.
review_text_placeholders = {
    "review_comment_title": "No Title",
    "review_comment_message": "No Message",
}
df_reviews_clean = df_order_reviews.na.fill(review_text_placeholders)

**Handling Product Categories (df_products)**

The Problem: product_category_name has 1.85% nulls.

The Fix: Fill with "unknown". Also, join with df_products_category to get the English names, as the original names are likely in Portuguese.

In [0]:
# 1. Bucket products without a (Portuguese) category name as "unknown".
df_products_clean = df_products.fillna({"product_category_name": "unknown"})

# 2. Attach the English name via the translation table; categories with no
#    translation (including the "unknown" bucket) are labelled "Unknown".
english_or_unknown = (
    when(col("product_category_name_english").isNotNull(), col("product_category_name_english"))
    .otherwise("Unknown")
    .alias("category_name")
)

df_products_enriched = (
    df_products_clean
    .join(df_products_category, on="product_category_name", how="left")
    .select(df_products_clean["*"], english_or_unknown)
)

**Analysis A: Delivery Performance (Logistics)**

# Business Question: "How long does it really take to deliver our products? Are we late?"

In [0]:
from pyspark.sql.functions import col, when, datediff, round, count, avg, sum
# Per delivered order: calendar days from purchase to actual delivery and to the
# estimated delivery date; the order is flagged late when it took longer than estimated.
purchased_at = col("order_purchase_timestamp")

delivery_performance = (
    df_orders_clean
    .withColumn("actual_days", datediff(col("order_delivered_customer_date"), purchased_at))
    .withColumn("estimated_days", datediff(col("order_estimated_delivery_date"), purchased_at))
    .withColumn("is_late", when(col("actual_days") > col("estimated_days"), 1).otherwise(0))
)

# Single-row summary of the delivery KPIs.
late_share = round(col("total_late_orders") / col("total_orders") * 100, 2)
delivery_metrics = (
    delivery_performance
    .agg(
        round(avg("actual_days"), 2).alias("avg_delivery_days"),
        round(avg("estimated_days"), 2).alias("avg_estimated_days"),
        sum("is_late").alias("total_late_orders"),
        count("*").alias("total_orders"),
    )
    .withColumn("late_percentage", late_share)
)

display(delivery_metrics)


**Top Performing Product Categories (Sales)**

# Business Question: "Which categories generate the most revenue?"

Requires Joining: Order Items -> Products

In [0]:
from pyspark.sql.functions import countDistinct

# Revenue (sum of item prices) and number of orders per English category.
# Inner join: items whose product_id is missing from df_products_enriched are dropped.
revenue_by_category = (
    df_order_items.join(df_products_enriched, "product_id")
    .groupBy("category_name")
    .agg(
        round(sum("price"), 2).alias("total_revenue"),
        # df_order_items has one row per item, so an order with several items in the
        # same category would be counted several times by count(); count orders once.
        countDistinct("order_id").alias("total_orders"),
    )
    .withColumn("total_revenue_formatted", indian_format_udf(col("total_revenue")))
    .orderBy(col("total_revenue").desc())
)

display(revenue_by_category)

**Customer Value (RFM Proxy)**

# Business Question: "Who are our big spenders? (States)"

Requires Joining: Orders -> Order Items -> Customers

In [0]:
# 1. Revenue per order = sum of its item prices (freight_value is not included).
order_spend = (
    df_order_items
    .groupBy("order_id")
    .agg(sum("price").alias("order_total"))
)

# 2. Attach each order to its customer's state, then aggregate per state.
orders_with_state = (
    df_orders
    .join(order_spend, "order_id")
    .join(df_customers, "customer_id")
)
state_revenue = (
    orders_with_state
    .groupBy("customer_state")
    .agg(
        round(sum("order_total"), 2).alias("total_revenue"),
        count("order_id").alias("order_count"),
    )
    .withColumn("total_revenue_formatted", indian_format_udf(col("total_revenue")))
    .orderBy(col("total_revenue").desc())
)

display(state_revenue)

**Mapping Sellers vs. Customers (Geospatial Analysis)**

# Business Question: "Are sellers located near our customers, or are we shipping across the country?"

Requires Joining: df_geolocation -> Customers and Sellers (on zip code prefix)

In [0]:
# One coordinate pair per zip prefix: the raw table can repeat a prefix, so average it.
df_geo_clean = (
    df_geolocation
    .groupBy("geolocation_zip_code_prefix")
    .agg(avg("geolocation_lat").alias("lat"), avg("geolocation_lng").alias("lng"))
)


def _attach_coordinates(frame, zip_column):
    """Left-join the averaged lat/lng onto ``frame`` where ``zip_column`` equals the geo prefix."""
    return frame.join(
        df_geo_clean,
        frame[zip_column] == df_geo_clean["geolocation_zip_code_prefix"],
        "left",
    )


# Customers and sellers with coordinates, ready for map visualisation.
df_customers_geo = _attach_coordinates(df_customers, "customer_zip_code_prefix")
df_sellers_geo = _attach_coordinates(df_sellers, "seller_zip_code_prefix")

display(df_customers_geo.limit(5))
display(df_sellers_geo.limit(5))

**RFM Analysis (Customer Segmentation)**

Goal: Segment customers into "Champion", "New Customer", "Can't Lose", and "Hibernating" based on their buying behavior.

Recency: How many days since their last purchase?

Frequency: How many total orders have they made?

Monetary: How much have they spent in total?

Requires joining df_orders, df_order_items, and df_customers.

In [0]:
from pyspark.sql.functions import max, countDistinct, sum, datediff, current_date, col, lit

# 1. Prepare the base table: one row per customer_unique_id with the last purchase
#    timestamp, the number of distinct orders and the total item spend.
#
# Recency is measured against the latest purchase in the dataset (snapshot date),
# not current_date(). The data is a static CSV extract, so with current_date()
# recency would grow on every re-run and eventually push every customer past the
# 90-day threshold, making "Champion"/"New Customer" impossible and results
# irreproducible.
# NOTE(review): max() on the string column assumes zero-padded
# "yyyy-MM-dd HH:mm:ss"-style timestamps, where lexicographic order equals
# chronological order — confirm.
snapshot_date = df_orders.agg(max("order_purchase_timestamp")).first()[0]

rfm_table = (
    df_orders.join(df_order_items, "order_id")
    .join(df_customers, "customer_id")
    .groupBy("customer_unique_id")
    .agg(
        max("order_purchase_timestamp").alias("last_purchase_date"),
        countDistinct("order_id").alias("frequency"),
        sum("price").alias("monetary"),
    )
    .withColumn("recency", datediff(lit(snapshot_date), col("last_purchase_date")))
)

# 2. Rule-based segments using fixed thresholds (not quantiles):
#    low recency is good; high frequency/monetary is good.
rfm_scored = rfm_table.withColumn(
    "segment",
    when((col("recency") < 90) & (col("frequency") >= 2), "Champion")
    .when((col("recency") < 90) & (col("frequency") == 1), "New Customer")
    .when((col("recency") >= 90) & (col("monetary") > 500), "Can't Lose")
    .otherwise("Hibernating")
)

display(rfm_scored.groupBy("segment").count())

**Cohort Analysis (Retention)**

Goal: See if customers acquired in January stick around longer than those acquired in February. 

Metric: Retention Rate (Month 0 to Month 12).

Requires: df_orders and df_customers.

In [0]:
from pyspark.sql.functions import date_format, min, month, year

from pyspark.sql.functions import count as spark_count, months_between, round as spark_round, to_date

# 1. Cohort month = calendar month ("yyyy-MM") of each customer's first purchase.
customer_cohorts = (
    df_orders.join(df_customers, "customer_id")
    .groupBy("customer_unique_id")
    .agg(min("order_purchase_timestamp").alias("first_purchase_date"))
    .withColumn("cohort_month", date_format(col("first_purchase_date"), "yyyy-MM"))
)

# Cohort size (customers acquired in that month) is the retention-rate denominator.
cohort_sizes = customer_cohorts.groupBy("cohort_month").agg(spark_count("*").alias("cohort_size"))

# 2. Join back to get ALL activities and count distinct active customers per
#    (cohort_month, activity_month). Then add:
#    - month_number: months since the cohort month (0 = acquisition month)
#    - retention_rate: active_users as a percentage of the cohort size
cohort_analysis = (
    df_orders.join(df_customers, "customer_id")
    .join(customer_cohorts, "customer_unique_id")
    .withColumn("activity_month", date_format(col("order_purchase_timestamp"), "yyyy-MM"))
    .groupBy("cohort_month", "activity_month")
    .agg(countDistinct("customer_unique_id").alias("active_users"))
    .join(cohort_sizes, "cohort_month", "left")
    .withColumn(
        "month_number",
        # Both sides are the 1st of a month, so months_between is a whole number.
        months_between(
            to_date(col("activity_month"), "yyyy-MM"),
            to_date(col("cohort_month"), "yyyy-MM"),
        ).cast("int"),
    )
    .withColumn("retention_rate", spark_round(col("active_users") / col("cohort_size") * 100, 2))
    .orderBy("cohort_month", "activity_month")
)

# Pivot cohort_month x month_number on retention_rate (e.g. in Excel/Pandas) for the
# classic retention triangle.
display(cohort_analysis)