<h3>📊 Data Enrichment Transformations (Olist E-commerce)</h3>
<p>Below are the key enrichment transformations applied on the raw Olist datasets to create analytics-ready, business-focused features using PySpark.</p>
<br>
<h4>1️⃣ Profit Margin</h4>
<ul>
  <li><b>Goal:</b> Financial insight — estimate product/order profitability.</li>
  <li><b>Techniques:</b> Derived columns in Spark.</li>
  <li><b>Process:</b> profit_margin = price - (0.7 * price) - freight_value</li>
  <li><b>Features:</b> profit_margin, margin_percent</li>
</ul>
<br>
<h4>2️⃣ Distance</h4>
<ul>
  <li><b>Goal:</b> Optimize delivery cost vs distance.</li>
  <li><b>Techniques:</b> Haversine formula (lat/lng from geolocation).</li>
  <li><b>Process:</b> Average lat/lng per ZIP prefix, join customers and sellers to those coordinates → compute distance_km.</li>
  <li><b>Features:</b> distance_km, freight_per_km</li>
</ul>
<br>
<h4>3️⃣ RFM (Recency, Frequency, Monetary)</h4>
<ul>
  <li><b>Goal:</b> Customer segmentation.</li>
  <li><b>Techniques:</b> Window + aggregation.</li>
  <li><b>Process:</b> Calculate days since last order, count of orders, total spend → rank 1–5.</li>
  <li><b>Features:</b> recency, frequency, monetary, rfm_score</li>
</ul>
<br>
<h4>4️⃣ Delivery Delay</h4>
<ul>
  <li><b>Goal:</b> Monitor delivery performance.</li>
  <li><b>Techniques:</b> Date difference + conditional logic.</li>
  <li><b>Process:</b> Compare actual vs estimated delivery → classify as Late/Early/On-time.</li>
  <li><b>Features:</b> delivery_status, delay_days</li>
</ul>
<br>
<h4>5️⃣ Sentiment</h4>
<ul>
  <li><b>Goal:</b> Measure customer satisfaction (CX).</li>
  <li><b>Techniques:</b> VADER sentiment UDF on review text.</li>
  <li><b>Process:</b> Compute the VADER compound polarity score of review_comment_message.</li>
  <li><b>Features:</b> sentiment_score, sentiment_label</li>
</ul>
<br>
<h4>6️⃣ Seasonality</h4>
<ul>
  <li><b>Goal:</b> Identify trends and seasonality.</li>
  <li><b>Techniques:</b> Date extraction.</li>
  <li><b>Process:</b> Extract month, year, quarter, and a festive-season flag (November to January).</li>
  <li><b>Features:</b> order_month, order_year, order_quarter, is_festive_season</li>
</ul>

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('olist_ecom_anz') \
    .getOrCreate()

In [0]:
from datetime import datetime
now = datetime.now().date().isoformat()

<h4>Load Data From Amazon S3 Using Spark</h4>
<p>An external location pointing to the S3 bucket has already been created in Databricks.</p>

In [0]:
# Read one CSV from the Bronze layer: header row, malformed fields set to null instead of failing (PERMISSIVE)
def read_bronze_csv(path):
    return (spark.read
            .format('csv')
            .option('header', 'true')
            .option('mode', 'PERMISSIVE')
            .load(path))

bronze = 's3://amz-s3-databricks-conn/Bronze'

customer = read_bronze_csv(f'{bronze}/{now}/olist_customers_dataset.csv')
# The geolocation file is read from the Bronze root, not from the dated folder
geo = read_bronze_csv(f'{bronze}/olist_geolocation_dataset.csv')
order_item = read_bronze_csv(f'{bronze}/{now}/olist_order_items_dataset.csv')
pay = read_bronze_csv(f'{bronze}/{now}/olist_order_payments_dataset.csv')
reviews = read_bronze_csv(f'{bronze}/{now}/olist_order_reviews_dataset.csv')
order = read_bronze_csv(f'{bronze}/{now}/olist_orders_dataset.csv')
products = read_bronze_csv(f'{bronze}/{now}/olist_products_dataset.csv')
seller = read_bronze_csv(f'{bronze}/{now}/olist_sellers_dataset.csv')

In [0]:
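# The wildcard import shadows Python built-ins such as max, sum and round with
# Spark column functions; later cells (e.g. the RFM aggregation) rely on that
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math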

<h3>Cleaning The Reviews Dataframe</h3>
<p>The columns filtered below contained invalid or misleading values, so only rows whose values match the expected format are kept.</p>

In [0]:
# Create character-length columns to inspect values for anomalies

reviews = reviews.withColumns({
    f'{c}_c_count': char_length(c)
    for c in ['review_id', 'order_id', 'review_score', 'review_creation_date', 'review_answer_timestamp']
})

In [0]:
# Regex patterns for valid values; keep only the rows that match.
# rlike() matches anywhere in the string, so each pattern is anchored with ^ and $.

regex_pattern_review_id = r'^[0-9A-Za-z]{32,40}$'
regex_pattern_review_score = r'^[1-5]$'  # review scores are integers from 1 to 5
regex_pattern_review_creation_date = r'^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}$'
reviews = reviews.filter(col('review_id').rlike(regex_pattern_review_id))
reviews = reviews.filter(col('review_score').rlike(regex_pattern_review_score))
reviews = reviews.filter(col('review_creation_date').rlike(regex_pattern_review_creation_date))

In [0]:
# Dropping Character Length Columns

reviews = reviews.drop('review_id_c_count','order_id_c_count','review_score_c_count','review_creation_date_c_count','review_answer_timestamp_c_count')
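<p>Why the anchors matter: Spark's rlike reports a match if the pattern occurs anywhere in the value, so an unanchored pattern such as [0-9] also accepts a timestamp or "5 stars". The plain-Python sketch below mimics that behaviour with re.search, assuming Java's find semantics and Python's search agree for these simple patterns, so the review patterns can be checked on sample strings without a Spark session. The helper name rlike and the sample values are illustrative only.</p>

```python
import re


def rlike(value, pattern):
    """Mimic Spark's rlike(): True if the pattern is found anywhere in the value.

    None stands in for a NULL column value; filter() drops those rows,
    so it is treated as non-matching here.
    """
    return value is not None and re.search(pattern, value) is not None


REVIEW_ID = r'^[0-9A-Za-z]{32,40}$'
SCORE_UNANCHORED = r'[0-9]'   # unanchored: any value containing a digit passes
SCORE_ANCHORED = r'^[1-5]$'   # anchored: exactly one digit from 1 to 5
CREATION_DATE = r'^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}$'

# Compare how the two score patterns treat a few sample values
for sample in ['4', '5 stars', '2018-01-18 00:00:00', '0']:
    print(repr(sample), rlike(sample, SCORE_UNANCHORED), rlike(sample, SCORE_ANCHORED))
```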


<h4>Change Data Types</h4>

In [0]:
# Use double, not float: a 32-bit float keeps only about 7 significant digits of a coordinate
geo = geo.withColumns(
    {
        'geolocation_lat': geo.geolocation_lat.cast('double'),
        'geolocation_lng': geo.geolocation_lng.cast('double')
    }
)

order_item = order_item.withColumns(
    {
        'order_item_id':col('order_item_id').cast('int'),
        'shipping_date':split(col('shipping_limit_date'),' ')[0].cast('date'),
        'shipping_time':split(col('shipping_limit_date'),' ')[1],
        'price':col('price').cast('float'),
        'freight_value':col('freight_value').cast('float'),
        'shipping_limit_date':to_timestamp(col('shipping_limit_date'),'yyyy-MM-dd HH:mm:ss').cast(TimestampNTZType())
    }
)

pay = pay.withColumns(
    {
        'payment_sequential':col('payment_sequential').cast('int'),
        'payment_installments':col('payment_installments').cast('int'),
        'payment_value':col('payment_value').cast('float')
    }
)

reviews = reviews.withColumns(
    {
        'review_score':col('review_score').cast('int'),
        'review_creation_date':to_timestamp(col('review_creation_date'),'yyyy-MM-dd HH:mm:ss').cast(TimestampNTZType()),
        'review_answer_timestamp':to_timestamp(col('review_answer_timestamp'),'yyyy-MM-dd HH:mm:ss').cast(TimestampNTZType())
    }
)

order = order.withColumns(
    {
        'order_purchase_timestamp':to_timestamp(col('order_purchase_timestamp'),'yyyy-MM-dd HH:mm:ss').cast(TimestampNTZType()),
        'order_approved_at':to_timestamp(col('order_approved_at'),'yyyy-MM-dd HH:mm:ss').cast(TimestampNTZType()),
        'order_delivered_carrier_date':to_timestamp(col('order_delivered_carrier_date'),'yyyy-MM-dd HH:mm:ss').cast(TimestampNTZType()),
        'order_delivered_customer_date':to_timestamp(col('order_delivered_customer_date'),'yyyy-MM-dd HH:mm:ss').cast(TimestampNTZType()),
        'order_estimated_delivery_date':to_timestamp(col('order_estimated_delivery_date'),'yyyy-MM-dd HH:mm:ss').cast(TimestampNTZType())
    }
)

products = products.withColumns(
    {
        'product_name_lenght':col('product_name_lenght').cast('int'),
        'product_description_lenght':col('product_description_lenght').cast('int'),
        'product_photos_qty':col('product_photos_qty').cast('int'),
        'product_weight_g':col('product_weight_g').cast('int'),
        'product_length_cm':col('product_length_cm').cast('int'),
        'product_height_cm':col('product_height_cm').cast('int'),
        'product_width_cm':col('product_width_cm').cast('int')
    }
)

<h3>Profit Margin Estimation (Financial Enrichment)</h3>
<h4>💡 Business Rationale</h4>
<p>Olist data doesn't include the product cost price, only the selling price (price) and freight (freight_value).
By estimating margins, we can calculate profitability per order, per category, or per seller — valuable for CFO-type insights.</p>

<h4>🧱 Features</h4>
<ul>
  <li>estimated_cost_price</li>
  <li>profit_margin = price - estimated_cost_price - freight_value</li>
  <li>margin_percent = profit_margin / price * 100</li>
</ul>
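<p>A plain-Python check of the three formulas above for a single order item, assuming, as stated in the notebook, that the cost price is 70% of the selling price. The function name profit_features is hypothetical; in the notebook the same arithmetic runs as Spark column expressions.</p>

```python
COST_RATIO = 0.7  # assumption carried over from the notebook: cost = 70% of price


def profit_features(price, freight_value, cost_ratio=COST_RATIO):
    """Return (estimated_cost_price, profit_margin, margin_percent) for one order item."""
    estimated_cost_price = price * cost_ratio
    profit_margin = price - estimated_cost_price - freight_value
    # Guard against price == 0, similar to try_divide returning NULL in Spark
    margin_percent = profit_margin / price * 100 if price else None
    return estimated_cost_price, profit_margin, margin_percent


cost, margin, pct = profit_features(price=100.0, freight_value=10.0)
print(round(cost, 2), round(margin, 2), round(pct, 2))
```

<p>Under the 70% assumption the margin turns negative whenever freight_value exceeds 30% of price; for example, price 20 and freight 15 give a margin of -9 (-45%).</p>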

In [0]:
# Assume the cost price is 70% of the selling price (Olist does not publish cost data)

order_item = order_item.withColumn('estimated_cost_price', col('price') * lit(0.7))

order_item = order_item.withColumn('profit_margin', col('price') - col('estimated_cost_price') - col('freight_value'))

# try_divide returns NULL when price is 0 (a plain / would raise under ANSI mode)
order_item = order_item.withColumn('margin_percent', expr('try_divide(profit_margin, price) * 100'))

<h3>Distance Between Seller and Customer (Logistics Enrichment)</h3>
<h4>💡 Business Rationale</h4>

<p>Delivery delays and freight costs are heavily impacted by distance.
By deriving geographic distances, you can measure logistics efficiency, shipping cost fairness, and warehouse planning.</p>

<h4>🧱 Features</h4>
<ul>
  <li>seller_lat, seller_lng, customer_lat, customer_lng</li>
  <li>distance_km (using Haversine formula)</li>
  <li>freight_per_km</li>
</ul>
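<p>The Haversine formula gives the great-circle distance between two latitude/longitude points. Below is a minimal plain-Python version (same 6371 km Earth radius as the notebook) plus a freight-per-km helper that returns None for a zero or missing distance, similar to try_divide. The function names are hypothetical and meant only for sanity-checking values on the driver.</p>

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius, the same constant the notebook uses


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lng) points given in degrees."""
    if None in (lat1, lon1, lat2, lon2):
        return None
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    return EARTH_RADIUS_KM * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def freight_per_km(freight_value, distance_km):
    """Freight cost per km, or None when the distance is missing or zero (like try_divide)."""
    if freight_value is None or not distance_km:
        return None
    return freight_value / distance_km


# One degree of longitude along the equator
print(round(haversine_km(0.0, 0.0, 0.0, 1.0), 2))
```

<p>One degree of longitude along the equator comes out at roughly 111.19 km, a quick check that degrees are converted to radians before the trigonometry.</p>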

In [0]:
# The geolocation table has many rows per ZIP code prefix, because one prefix covers
# several nearby coordinates. A direct 1-to-1 join would duplicate rows, so first
# aggregate to one average coordinate per ZIP prefix.

geo_avg = geo.groupBy('geolocation_zip_code_prefix') \
    .agg(
        avg('geolocation_lat').alias('geolocation_lat'),
        avg('geolocation_lng').alias('geolocation_lng'),
        first('geolocation_city').alias('geolocation_city'),
        first('geolocation_state').alias('geolocation_state')
    )

# Join customer with geo_avg

customer_geo = (customer.join(geo_avg, customer.customer_zip_code_prefix == geo_avg.geolocation_zip_code_prefix, 'left') \
    .select('customer_id','customer_city','customer_state','customer_zip_code_prefix',col('geolocation_lat').alias('customer_lat'),col('geolocation_lng').alias('customer_lng')))

# Join seller with geo_avg

seller_geo = (seller.join(geo_avg, seller.seller_zip_code_prefix == geo_avg.geolocation_zip_code_prefix, 'left') \
    .select('seller_id','seller_city','seller_state','seller_zip_code_prefix',col('geolocation_lat').alias('seller_lat'),col('geolocation_lng').alias('seller_lng')))

# Attach customer and seller coordinates to every order item

order_geo = order.join(order_item,on='order_id',how='inner') \
    .join(customer_geo,on='customer_id',how='inner') \
        .join(seller_geo,on='seller_id',how='inner')

# Apply the Haversine formula with built-in column functions instead of a Python UDF,
# so rows are not serialized to Python; NULL coordinates give a NULL distance
R = 6371.0  # mean Earth radius in km

seller_lat_rad = radians(col('seller_lat'))
customer_lat_rad = radians(col('customer_lat'))
lat_diff = radians(col('customer_lat') - col('seller_lat'))
lon_diff = radians(col('customer_lng') - col('seller_lng'))

a = sin(lat_diff / 2) ** 2 + cos(seller_lat_rad) * cos(customer_lat_rad) * sin(lon_diff / 2) ** 2

order_geo = order_geo.withColumn('distance_km', lit(R) * 2 * atan2(sqrt(a), sqrt(1 - a)))

# try_divide returns NULL when seller and customer share a ZIP prefix (distance 0)
order_geo = order_geo.withColumn('freight_per_km', expr('try_divide(freight_value, distance_km)'))

<h3>Customer Segmentation (RFM Analysis)</h3>
<h4>💡 Business Rationale</h4>

<p>Segmenting customers based on Recency, Frequency, and Monetary helps marketing and CRM teams identify:</p>
<ul>
  <li>Loyal customers</li>
  <li>At-risk customers</li>
  <li>High spenders (VIPs)</li>
</ul>

<h4>🧱 Feature(s)</h4>
<ul>
  <li>recency (days since the customer's last purchase)</li>
  <li>frequency (number of distinct orders)</li>
  <li>monetary (total item price spent)</li>
</ul>
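<p>The overview promises an rfm_score ranked 1–5, which the code cell below does not compute yet. One common approach, sketched here in plain Python as an assumption rather than the notebook's method, splits customers into quintiles per metric the way Spark's ntile(5) window function does, reversing the order for recency so that the most recent buyers score 5. The functions ntile and rfm_scores and the three-digit score string are hypothetical.</p>

```python
def ntile(values, n=5, descending=False):
    """Bucket numbers 1..n for each value, like Spark's ntile(n) over an ordered window.

    Rows are sorted, then split into n groups whose sizes differ by at most one;
    the first len(values) % n groups get the extra row. Ties keep input order.
    """
    order = sorted(range(len(values)), key=lambda i: values[i], reverse=descending)
    size, extra = divmod(len(values), n)
    buckets = [0] * len(values)
    pos = 0
    for bucket in range(1, n + 1):
        count = size + (1 if bucket <= extra else 0)
        for i in order[pos:pos + count]:
            buckets[i] = bucket
        pos += count
    return buckets


def rfm_scores(rows):
    """rows: (customer_id, recency_days, frequency, monetary) tuples.

    Returns {customer_id: (r, f, m, 'rfm')} where 5 is best on each axis.
    """
    r = ntile([row[1] for row in rows], descending=True)  # fewest days since purchase -> 5
    f = ntile([row[2] for row in rows])
    m = ntile([row[3] for row in rows])
    return {row[0]: (r[i], f[i], m[i], f'{r[i]}{f[i]}{m[i]}') for i, row in enumerate(rows)}
```

<p>In PySpark the equivalent step would be ntile(5).over(Window.orderBy(col('recency').desc())) with Window imported from pyspark.sql, repeated in ascending order for frequency and monetary. A window without partitionBy moves all rows into a single partition, which is usually acceptable for a customer-level table but worth keeping in mind.</p>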

In [0]:
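# Prepare the complete order dataframe (one row per order item)

orders_whole = order.join(order_item,on="order_id",how='inner')

# Reference date for recency: the latest purchase timestamp in the data

max_date = orders_whole.agg(max('order_purchase_timestamp')).collect()[0][0]

# Note: Olist assigns a new customer_id to every order, so grouping by customer_id gives
# every row a frequency of 1. Group by customer_unique_id (from the customer table) to
# measure repeat purchases.
rfm = (orders_whole.groupBy('customer_id')
    .agg(
        max('order_purchase_timestamp').alias('last_purchase_date'),
        countDistinct('order_id').alias('frequency'),  # distinct orders, not item rows
        sum('price').alias('monetary')                 # item prices only, freight excluded
    )
    .withColumn('recency',datediff(lit(max_date),col('last_purchase_date'))))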

<h3>Delivery Time and Delay Classification</h3>
<h4>💡 Business Rationale</h4>

<p>Customers expect on-time delivery. You can derive delivery performance KPIs and link them to satisfaction (review scores).</p>

<h4>🧱 Features</h4>
<ul>
  <li>delivery_days (order approval to customer delivery)</li>
  <li>delay_days (actual minus estimated delivery date)</li>
  <li>delivery_status = early, on time, delayed</li>
</ul>
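<p>A plain-Python sketch of the classification, useful for spotting edge cases: datediff compares calendar dates only, so a parcel delivered at 23:59 on the estimated day still counts as on time, and an order with no delivery date arguably should get no status at all rather than "on time". The function delivery_features is hypothetical and uses the status labels from the code cell below.</p>

```python
from datetime import datetime


def delivery_features(delivered, estimated):
    """Return (delay_days, delivery_status) following the notebook's datediff/when logic.

    Like Spark's datediff(), only the calendar dates are compared; the time of day
    is ignored. Undelivered orders (delivered is None) get (None, None).
    """
    if delivered is None or estimated is None:
        return None, None
    delay_days = (delivered.date() - estimated.date()).days
    if delay_days > 0:
        status = 'delayed'
    elif delay_days < 0:
        status = 'early'
    else:
        status = 'on time'
    return delay_days, status


print(delivery_features(datetime(2018, 1, 10, 23, 59), datetime(2018, 1, 10, 0, 0)))
```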

In [0]:
order = order.withColumn('delivery_days', datediff(col('order_delivered_customer_date'), col('order_approved_at')))
order = order.withColumn('delay_days', datediff(col('order_delivered_customer_date'), col('order_estimated_delivery_date')))

# No otherwise(): orders without a delivery date (NULL delay_days) get a NULL status instead of 'on time'
order = order.withColumn(
    'delivery_status',
    when(col('delay_days') > 0, 'delayed')
    .when(col('delay_days') < 0, 'early')
    .when(col('delay_days') == 0, 'on time')
)

<h3>Review Sentiment Analysis (Text Enrichment)</h3>
<h4>💡 Business Rationale</h4>

<p>Review comments can be turned into sentiment polarity scores, providing a more nuanced customer satisfaction metric than 1–5 star ratings.</p>

<h4>🧱 Features</h4>
<ul>
  <li>sentiment_score ∈ [−1, 1]</li>
  <li>sentiment_label (Positive / Neutral / Negative)</li>
</ul>

In [0]:
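from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf, when, col
from pyspark.sql.types import FloatType

# Caveat: VADER's lexicon is English, while Olist review comments are in Portuguese,
# so scores computed on untranslated text should be interpreted with caution

_analyzer = None  # created on first use instead of once per row

def vader_score(text):
    global _analyzer
    if text is None:
        return None
    if _analyzer is None:
        _analyzer = SentimentIntensityAnalyzer()
    return float(_analyzer.polarity_scores(text)["compound"])

vader_udf = udf(vader_score, FloatType())

reviews = reviews.withColumn("sentiment_score", vader_udf("review_comment_message"))

# Conventional VADER thresholds: compound >= 0.05 positive, <= -0.05 negative, otherwise neutral
reviews = reviews.withColumn(
    "sentiment_label",
    when(col("sentiment_score") >= 0.05, "Positive")
    .when(col("sentiment_score") <= -0.05, "Negative")
    .when(col("sentiment_score").isNotNull(), "Neutral")
)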

<h3>Seasonality and Trend Enrichment</h3>
<h4>💡 Business Rationale</h4>

<p>E-commerce demand has strong monthly, quarterly, and holiday-driven seasonality.
These features support demand forecasting and inventory planning.</p>

<h4>🧱 Features</h4>
<ul>
  <li>order_month</li>
  <li>order_year</li>
  <li>order_quarter</li>
  <li>is_festive_season (November to January)</li>
</ul>
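<p>A plain-Python mirror of these features for one purchase timestamp, handy for checking expected values: Spark's quarter() maps months 1–3 to 1 through months 10–12 to 4, which is (month - 1) // 3 + 1. The November to January festive window follows the code cell below; the function name season_features is hypothetical.</p>

```python
from datetime import datetime

FESTIVE_MONTHS = {11, 12, 1}  # November to January, as in the notebook's festive flag


def season_features(purchase_ts):
    """Mirror month(), year(), quarter() and the festive-season flag for one timestamp."""
    month = purchase_ts.month
    return {
        'order_month': month,
        'order_year': purchase_ts.year,
        'order_quarter': (month - 1) // 3 + 1,
        'is_festive_season': month in FESTIVE_MONTHS,
    }


print(season_features(datetime(2017, 11, 24, 10, 30)))
```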

In [0]:
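order = order.withColumns(
    {
        'order_month':month(col('order_purchase_timestamp')),
        'order_year':year(col('order_purchase_timestamp')),
        'order_quarter':quarter(col('order_purchase_timestamp')),
        # Derive the flag from the timestamp directly: all columns in one withColumns call
        # are computed from the input DataFrame, so avoid referencing 'order_month' here
        'is_festive_season':when(month(col('order_purchase_timestamp')).isin(11,12,1),True).otherwise(False)
    }
)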

<h3>Text Enrichment</h3>
<p>Load product translation data from MongoDB into Databricks</p>

In [0]:
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from bson import ObjectId

# Never hard-code credentials in a notebook: read the connection string from a
# Databricks secret scope (the scope and key names below are hypothetical placeholders)
uri = dbutils.secrets.get(scope='mongodb', key='olist_atlas_uri')

def clean_doc(doc):
    # Convert ObjectId values (such as _id) to strings so Spark can infer a schema
    return {k: str(v) if isinstance(v, ObjectId) else v for k, v in doc.items()}

client = MongoClient(uri, server_api=ServerApi('1'))
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
    db = client['olist_translation']
    collection = db['port_to_en']
    olist_trans = [clean_doc(doc) for doc in collection.find()]
    trans_df = spark.createDataFrame(olist_trans)
finally:
    # Errors propagate instead of being printed, so a failed connection stops the run here
    # rather than surfacing later as an undefined trans_df; the connection is always closed
    client.close()

Pinged your deployment. You successfully connected to MongoDB!


In [0]:
trans_df = trans_df.drop('_id')
products = products.join(trans_df,on='product_category_name',how='left')

In [0]:
# Write every cleaned and enriched DataFrame to the Silver layer as Parquet (overwrite)
silver = 's3://amz-s3-databricks-conn/Silver'

silver_tables = {
    'customer': customer,
    'customer_geo': customer_geo,
    'geo': geo,
    'order_item': order_item,
    'orders_whole': orders_whole,   # complete order dataframe
    'rfm': rfm,
    'order': order,
    'order_geo': order_geo,
    'payment': pay,
    'seller': seller,
    'seller_geo': seller_geo,
    'products': products,
    'reviews': reviews,
    'product_with_translation': trans_df,
}

for name, df in silver_tables.items():
    df.write \
        .format('parquet') \
        .mode('overwrite') \
        .save(f'{silver}/{name}')