In [0]:
pip install ucimlrepo

: 

In [0]:
%pip install ucimlrepo dbt-core dbt-databricks

Collecting dbt-core
  Downloading dbt_core-1.10.3-py3-none-any.whl.metadata (4.2 kB)
Collecting dbt-databricks
  Downloading dbt_databricks-1.10.4-py3-none-any.whl.metadata (6.1 kB)
Collecting agate<1.10,>=1.7.0 (from dbt-core)
  Downloading agate-1.9.1-py2.py3-none-any.whl.metadata (3.2 kB)
Collecting Jinja2<4,>=3.1.3 (from dbt-core)
  Downloading jinja2-3.1.6-py3-none-any.whl.metadata (2.9 kB)
Collecting mashumaro<3.15,>=3.9 (from mashumaro[msgpack]<3.15,>=3.9->dbt-core)
  Downloading mashumaro-3.14-py3-none-any.whl.metadata (114 kB)
Collecting jsonschema<5.0,>=4.19.1 (from dbt-core)
  Downloading jsonschema-4.24.0-py3-none-any.whl.metadata (7.8 kB)
Collecting networkx<4.0,>=2.3 (from dbt-core)
  Downloading networkx-3.5-py3-none-any.whl.metadata (6.3 kB)
Collecting snowplow-tracker<2.0,>=1.0.2 (from dbt-core)
  Downloading snowplow_tracker-1.1.0-py3-none-any.whl.metadata (5.7 kB)
Collecting dbt-extractor<=0.6,>=0.5.0 (from dbt-core)
  Downloading dbt_extractor-0.6.0-cp39-abi3-manyli

In [None]:
from ucimlrepo import fetch_ucirepo
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Load dataset
ds = fetch_ucirepo(id=468)

# Convert features and labels to pandas DataFrames (ensure columns are present)
features_pdf = ds.data.features
labels_pdf = ds.data.targets

# If they are NumPy arrays, assign column names manually
import pandas as pd

if not hasattr(features_pdf, "columns"):
    feature_names = ['Administrative', 'Administrative_Duration', 'Informational', 'Informational_Duration',
                     'ProductRelated', 'ProductRelated_Duration', 'BounceRates', 'ExitRates',
                     'PageValues', 'SpecialDay', 'Month', 'OperatingSystems', 'Browser', 'Region',
                     'TrafficType', 'VisitorType', 'Weekend']
    features_pdf = pd.DataFrame(features_pdf, columns=feature_names)

if not hasattr(labels_pdf, "columns"):
    labels_pdf = pd.DataFrame(labels_pdf, columns=['Revenue'])

# Start Spark session
spark = SparkSession.builder.getOrCreate()

# Convert to Spark DataFrames
features_df = spark.createDataFrame(features_pdf)
labels_df = spark.createDataFrame(labels_pdf)

# Add DeviceType column
features_df = features_df.withColumn(
    "DeviceType",
    when(col("OperatingSystems").rlike("(?i)android|ios"), "mobile")
    .when(col("OperatingSystems").rlike("(?i)windows|mac|linux"), "desktop")
    .otherwise("other")
)


# Combine features and target using an artificial ID
df = features_df.withColumn("id", monotonically_increasing_id()) \
    .join(labels_df.withColumn("id", monotonically_increasing_id()), on="id") \
    .withColumnRenamed("id", "session_id")

# Define funnel columns
spark_df = df.withColumn("product_view", (col("ProductRelated") > 0).cast("int")) \
             .withColumn("add_to_cart", (col("PageValues") > 0).cast("int")) \
             .withColumn("purchase", col("Revenue").cast("int"))

spark_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("default.source_data")

In [0]:
from ucimlrepo import fetch_ucirepo
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Load dataset
ds = fetch_ucirepo(id=468)  # Online Shoppers Purchasing Intention Dataset

# Create Spark DataFrames
spark = SparkSession.builder.getOrCreate()
features_df = spark.createDataFrame(ds.data.features)
labels_df = spark.createDataFrame(ds.data.targets)

# Combine features with the target ("Revenue")
df = features_df.withColumn("id", monotonically_increasing_id()) \
    .join(labels_df.withColumn("id", monotonically_increasing_id()), on="id") \
    .drop("id")

# Define funnel columns
spark_df = df.withColumn("product_view", (col("ProductRelated")>0).cast("int")) \
                   .withColumn("add_to_cart", (col("PageValues") > 0).cast("int")) \
                   .withColumn("purchase", col("Revenue").cast("int"))



: 

In [0]:
from pyspark.sql.functions import sum

funnel = spark_df.agg(
    sum("product_view").alias("Product Views"),
    sum("add_to_cart").alias("Add to Cart"),
    sum("purchase").alias("Purchases")
).toPandas()
funnel.insert(0, "Website Visits", df.count())
funnel["Conversion Rate"] = funnel["Purchases"] / funnel["Website Visits"]
funnel["Drop Off"] = funnel["Website Visits"].shift(0) - funnel["Product Views"]  # adjust per row
funnel["Drop Off Rate"] = funnel["Drop Off"] / funnel["Website Visits"]
display(funnel)



Website Visits,Product Views,Add to Cart,Purchases,Conversion Rate,Drop Off,Drop Off Rate
12330,12292,2730,1908,0.1547445255474452,38,0.0030819140308191


VisitorType,Purchases,Product Views,Add to Cart,Purchases.1,ConvRate
New_Visitor,422,1687,381,422,0.2501481920569057
Returning_Visitor,1470,10520,2333,1470,0.1397338403041825
Other,16,85,16,16,0.188235294117647


Month,(sum(purchase) / sum(product_view))
Feb,0.0163934426229508
Mar,0.1008403361344537
May,0.1088577393379063
Aug,0.1759259259259259
Jul,0.1527777777777778
Nov,0.2545210984594775
Oct,0.2110091743119266
Sep,0.192393736017897
June,0.1010452961672473
Dec,0.1253627394080093


purchase,AvgBounce,AvgExit
1,0.0051171526404612,0.0195551682568134
0,0.0253172321978506,0.0473782705264826


In [0]:
# Cell 5: Segmentation by VisitorType & Month
seg = spark_df.groupBy("VisitorType") \
    .agg(sum("purchase").alias("Purchases"), sum("product_view").alias("Product Views"), 
         sum("add_to_cart").alias("Add to Cart"), sum("purchase").alias("Purchases"), 
         sum("purchase") / sum("product_view")).withColumnRenamed("(sum(purchase) / sum(product_view))", "ConvRate")
display(seg)

seg_m = spark_df.groupBy("Month") \
    .agg(sum("purchase") / sum("product_view")).withColumnRenamed("((sum(purchase) / sum(product_view)))", "ConvRate")
display(seg_m)

# Cell 6: Bounce & Exit rates vs purchase
from pyspark.sql.functions import avg
spark_df.groupBy("purchase") \
  .agg(avg("BounceRates").alias("AvgBounce"), avg("ExitRates").alias("AvgExit")) \
  .display()

# Cell 7: Save cleaned data to Delta for dbt
spark_df.write.format("delta").mode("overwrite").saveAsTable("default.online_shopping_intention_cleaned")

In [None]:
from ucimlrepo import fetch_ucirepo
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Load dataset
ds = fetch_ucirepo(id=468)  # Online Shoppers Purchasing Intention Dataset

# Create Spark DataFrames
spark = SparkSession.builder.getOrCreate()
features_df = spark.createDataFrame(ds.data.features)
labels_df = spark.createDataFrame(ds.data.targets)

# Combine features with the target ("Revenue")
df = features_df.withColumn("id", monotonically_increasing_id()) \
    .join(labels_df.withColumn("id", monotonically_increasing_id()), on="id") \
    .drop("id")

# Define funnel columns
spark_df = df.withColumn("product_view", (col("ProductRelated")>0).cast("int")) \
                   .withColumn("add_to_cart", (col("PageValues") > 0).cast("int")) \
                   .withColumn("purchase", col("Revenue").cast("int"))

from pyspark.sql.functions import sum

funnel = spark_df.agg(
    sum("product_view").alias("Product Views"),
    sum("add_to_cart").alias("Add to Cart"),
    sum("purchase").alias("Purchases")
).toPandas()
funnel.insert(0, "Website Visits", df.count())

funnel["Conversion Rate"] = funnel["Purchases"] / funnel["Website Visits"]
funnel["Drop Off"] = funnel["Website Visits"].shift(0) - funnel["Product Views"]  # adjust per row
funnel["Drop Off Rate"] = funnel["Drop Off"] / funnel["Website Visits"]


print(funnel.head(10))

# Cell 5: Segmentation by VisitorType & Month
seg = spark_df.groupBy("VisitorType") \
    .agg(sum("purchase").alias("Purchases"), sum("product_view").alias("Product Views"), 
         sum("add_to_cart").alias("Add to Cart"), sum("purchase").alias("Purchases"), 
         sum("purchase") / sum("product_view")).withColumnRenamed("(sum(purchase) / sum(product_view))", "ConvRate")
display(seg)

seg_m = spark_df.groupBy("Month") \
    .agg(sum("purchase") / sum("product_view")).withColumnRenamed("((sum(purchase) / sum(product_view)))", "ConvRate")
display(seg_m)

# Cell 6: Bounce & Exit rates vs purchase
from pyspark.sql.functions import avg
spark_df.groupBy("purchase") \
  .agg(avg("BounceRates").alias("AvgBounce"), avg("ExitRates").alias("AvgExit")) \
  .display()

# 1️⃣ Average Order Value (AOV) - Assuming 'Revenue' is present
'''
aov_df = spark_df.filter(col("purchase") == 1) \
    .agg(round(avg("Revenue"), 2).alias("Average Order Value"))
aov_df.display()
'''

# 2️⃣ Cart Abandonment Rate
cart_abandonment_df = spark_df.agg(
    (sum(col("add_to_cart") - col("purchase")) / sum("add_to_cart")).alias("Cart Abandonment Rate")
)
cart_abandonment_df.display()
'''
# 3️⃣ Estimated Total Revenue (if 'Revenue' or 'price' field exists)
total_revenue_df = spark_df.agg(round(sum("Revenue"), 2).alias("Total Revenue"))
total_revenue_df.display()
'''
# 4️⃣ Average Session Duration vs. Conversion
session_vs_purchase_df = spark_df.groupBy("purchase") \
    .agg(
        round(avg("PageValues"), 2).alias("AvgPageValue"),
        round(avg("Administrative_Duration"), 2).alias("AvgAdminDuration"),
        round(avg("Informational_Duration"), 2).alias("AvgInfoDuration"),
        round(avg("ProductRelated_Duration"), 2).alias("AvgProductDuration")
    )
session_vs_purchase_df.display()

# 5️⃣ Device Category Segmentation (if device_type exists)
if "DeviceType" in spark_df.columns:
    device_seg = spark_df.groupBy("DeviceType") \
        .agg(
            sum("purchase").alias("Purchases"),
            count("*").alias("Sessions"),
            round(sum("purchase") / count("*"), 4).alias("ConvRate")
        )
    device_seg.display()

# 6️⃣ Returning vs New Visitor Conversion
visitor_seg = spark_df.groupBy("VisitorType") \
    .agg(
        count("*").alias("Sessions"),
        sum("purchase").alias("Purchases"),
        round(sum("purchase") / count("*"), 4).alias("ConvRate")
    )
visitor_seg.display()

# 7️⃣ Funnel by Region or Traffic Type (if available)
if "Region" in spark_df.columns:
    region_funnel = spark_df.groupBy("Region") \
        .agg(
            sum("product_view").alias("Product Views"),
            sum("add_to_cart").alias("Add to Cart"),
            sum("purchase").alias("Purchases"),
            round(sum("purchase") / count("*"), 4).alias("ConvRate")
        )
    region_funnel.display()


# ========== Save Cleaned Data ==========
spark_df.write.format("delta").mode("overwrite").saveAsTable("default.online_shopping_intention_cleaned")



: 