1. Choose a specific case study with a primary or secondary dataset that
satisfies at least 3 of the Big Data 5V categories; the data may be structured
or unstructured.

2. Use HDFS (Hadoop Distributed File System) or another file system to
store the dataset you use.

In [None]:
!curl -L -o ./e-commerce-shopper-behavior-amazonshopify-based.zip\
  https://www.kaggle.com/api/v1/datasets/download/dhrubangtalukdar/e-commerce-shopper-behavior-amazonshopify-based

!unzip e-commerce-shopper-behavior-amazonshopify-based.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 64.5M  100 64.5M    0     0  76.3M      0 --:--:-- --:--:-- --:--:--  258M
Archive:  e-commerce-shopper-behavior-amazonshopify-based.zip
  inflating: e_commerce_shopper_behaviour_and_lifestyle.csv  


3. Process the data with PySpark following the big data pipeline,
including:

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("UAS_BDPAL_Ecommerce") \
    .getOrCreate()

df = spark.read.csv(
    "./e_commerce_shopper_behaviour_and_lifestyle.csv",
    header=True,
    inferSchema=True
)

df.printSchema()
df.show(5)

root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- urban_rural: string (nullable = true)
 |-- income_level: integer (nullable = true)
 |-- employment_status: string (nullable = true)
 |-- education_level: string (nullable = true)
 |-- relationship_status: string (nullable = true)
 |-- has_children: integer (nullable = true)
 |-- household_size: integer (nullable = true)
 |-- occupation: string (nullable = true)
 |-- ethnicity: string (nullable = true)
 |-- language_preference: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- weekly_purchases: integer (nullable = true)
 |-- monthly_spend: integer (nullable = true)
 |-- cart_abandonment_rate: integer (nullable = true)
 |-- review_writing_frequency: integer (nullable = true)
 |-- average_order_value: integer (nullable = true)
 |-- preferred_payment_method: string (nullable = true)
 |-- coupon_usage_freque



*   Batch processing with MapReduce



In [None]:
rdd = df.rdd

user_activity = (
    rdd
    .map(lambda x: (x.user_id, 1))      # MAP: emit (user_id, 1) for each row
    .reduceByKey(lambda a, b: a + b)    # REDUCE: sum the counts per user_id
)

user_activity.take(5)

[(2, 1), (4, 1), (6, 1), (8, 1), (10, 1)]
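
The map and reduce steps above follow the usual MapReduce pattern: emit a `(key, 1)` pair per record, then sum the values that share a key. As a minimal sketch of the same idea in plain Python, assuming a small made-up list of user IDs rather than the real dataset (the function name `map_reduce_count` is hypothetical):

```python
from collections import defaultdict

def map_reduce_count(records):
    """Count occurrences per key with explicit map, shuffle, and reduce phases."""
    # MAP: emit a (key, 1) pair for every record
    mapped = [(key, 1) for key in records]
    # SHUFFLE: group the emitted values by key
    grouped = defaultdict(list)
    for key, value in mapped:
        grouped[key].append(value)
    # REDUCE: sum the values collected for each key
    return {key: sum(values) for key, values in grouped.items()}

# Toy input (illustrative only, not taken from the dataset)
toy_user_ids = [2, 4, 2, 6, 4, 2]
counts = map_reduce_count(toy_user_ids)
print(counts)
```

In the Spark output above, each `user_id` shown has a count of 1, which suggests that each row describes a distinct user, at least for the rows displayed.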



*  EDA (Exploratory Data Analysis)



In [None]:
# Data structure
df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- urban_rural: string (nullable = true)
 |-- income_level: integer (nullable = true)
 |-- employment_status: string (nullable = true)
 |-- education_level: string (nullable = true)
 |-- relationship_status: string (nullable = true)
 |-- has_children: integer (nullable = true)
 |-- household_size: integer (nullable = true)
 |-- occupation: string (nullable = true)
 |-- ethnicity: string (nullable = true)
 |-- language_preference: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- weekly_purchases: integer (nullable = true)
 |-- monthly_spend: integer (nullable = true)
 |-- cart_abandonment_rate: integer (nullable = true)
 |-- review_writing_frequency: integer (nullable = true)
 |-- average_order_value: integer (nullable = true)
 |-- preferred_payment_method: string (nullable = true)
 |-- coupon_usage_freque

In [None]:
# Descriptive statistics
df.select(
    "age",
    "monthly_spend",
    "average_order_value",
    "weekly_purchases"
).describe().show()

+-------+------------------+------------------+-------------------+-----------------+
|summary|               age|     monthly_spend|average_order_value| weekly_purchases|
+-------+------------------+------------------+-------------------+-----------------+
|  count|           1000000|           1000000|            1000000|          1000000|
|   mean|         49.003377|       2498.775654|         255.031632|         9.993011|
| stddev|18.193958739537862|1444.2086738790817| 141.70846605868624|6.055123683163065|
|    min|                18|                 0|                 10|                0|
|    max|                80|              5000|                500|               20|
+-------+------------------+------------------+-------------------+-----------------+
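
One way to read this summary is to compare it with what integers drawn uniformly between each column's min and max would give. The sketch below is only a sanity check under that uniformity assumption; the helper `discrete_uniform_stats` is hypothetical, and the ranges are copied from the `describe()` output above.

```python
import math

def discrete_uniform_stats(low, high):
    """Mean and standard deviation of integers drawn uniformly from low..high."""
    n = high - low + 1
    mean = (low + high) / 2
    std = math.sqrt((n * n - 1) / 12)
    return mean, std

# (min, max) pairs copied from the describe() output
ranges = {
    "age": (18, 80),
    "monthly_spend": (0, 5000),
    "average_order_value": (10, 500),
    "weekly_purchases": (0, 20),
}

expected = {name: discrete_uniform_stats(lo, hi) for name, (lo, hi) in ranges.items()}
for name, (mean, std) in expected.items():
    print(f"{name}: mean={mean:.3f}, std={std:.3f}")
```

The reported means and standard deviations sit close to these reference values, which may indicate roughly uniform (possibly synthetic) columns, although `describe()` alone cannot confirm the shape of a distribution.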



In [None]:
# Category distributions
df.groupBy("gender").count().show()
df.groupBy("country").count().show(5)

+----------+------+
|    gender| count|
+----------+------+
|    Female|479808|
|     Other| 19837|
|Non-binary| 20223|
|      Male|480132|
+----------+------+

+-------+------+
|country| count|
+-------+------+
|Germany|100097|
| France| 99992|
|  India| 99899|
|  China|100190|
|    USA| 99996|
+-------+------+
only showing top 5 rows




*   Preprocessing Data (Cleaning & Preparation)



In [None]:
# Check for missing values: count the nulls in every column
from pyspark.sql.functions import col, when, count

df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show()


In [None]:
# Handle missing values: drop every row that contains at least one null
df_clean = df.na.drop()

In [None]:
# Cast data types
df_clean = (
    df_clean
    .withColumn("monthly_spend", col("monthly_spend").cast("double"))
    .withColumn("average_order_value", col("average_order_value").cast("double"))
    .withColumn("weekly_purchases", col("weekly_purchases").cast("int"))
)

* Data Manipulation (Spark SQL)

In [None]:
# Register a temporary view so the DataFrame can be queried with SQL
df_clean.createOrReplaceTempView("ecommerce")

In [None]:
# Aggregation query: average monthly spend per country
spark.sql("""
SELECT country,
       AVG(monthly_spend) AS avg_spend
FROM ecommerce
GROUP BY country
ORDER BY avg_spend DESC
""").show()

+---------+------------------+
|  country|         avg_spend|
+---------+------------------+
|   France| 2502.496429714377|
|    India|2502.4020160361965|
|    China|2501.1314402634994|
|   Canada|2500.6225475054057|
|      USA| 2500.461798471939|
|       UK|2499.5985882352943|
|Australia|2498.7318702862744|
|   Brazil|2497.1284286794858|
|  Germany|2493.6787216400094|
|    Japan| 2491.448741224885|
+---------+------------------+



In [None]:
# Filter query: users whose monthly spend exceeds 500
spark.sql("""
SELECT user_id, monthly_spend
FROM ecommerce
WHERE monthly_spend > 500
""").show(5)

+-------+-------------+
|user_id|monthly_spend|
+-------+-------------+
|      1|       2405.0|
|      2|       3651.0|
|      3|       2045.0|
|      4|       1611.0|
|      5|       3476.0|
+-------+-------------+
only showing top 5 rows


* RDD Operations

In [None]:
# map & filter
rdd.map(lambda x: (x.country, x.monthly_spend)) \
   .filter(lambda x: x[1] > 500) \
   .take(5)

[('Germany', 2405),
 ('Japan', 3651),
 ('India', 2045),
 ('Canada', 1611),
 ('Japan', 3476)]

In [None]:
# groupByKey
rdd.map(lambda x: (x.country, x.weekly_purchases)) \
   .groupByKey() \
   .mapValues(sum) \
   .take(5)

[('Germany', 999250),
 ('Japan', 989716),
 ('France', 997499),
 ('Australia', 998991),
 ('UK', 1000414)]

In [None]:
# reduceByKey
rdd.map(lambda x: (x.country, x.weekly_purchases)) \
   .reduceByKey(lambda a, b: a + b) \
   .take(5)

[('Germany', 999250),
 ('Japan', 989716),
 ('France', 997499),
 ('Australia', 998991),
 ('UK', 1000414)]
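
`groupByKey` and `reduceByKey` return the same totals here, but they move different amounts of data: `reduceByKey` merges values inside each partition before the shuffle, whereas `groupByKey` sends every value across. A rough pure-Python simulation of that difference, assuming two hypothetical partitions of toy `(country, weekly_purchases)` pairs (the function names are illustrative, not Spark APIs):

```python
from collections import defaultdict

# Two hypothetical partitions of (country, weekly_purchases) pairs (toy values)
partitions = [
    [("Germany", 3), ("Japan", 5), ("Germany", 7)],
    [("Japan", 2), ("Germany", 1), ("Japan", 4)],
]

def group_by_key_shuffle(parts):
    """groupByKey style: every pair crosses the shuffle, summing happens afterwards."""
    shuffled = [pair for part in parts for pair in part]
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)

def reduce_by_key_shuffle(parts):
    """reduceByKey style: pre-sum inside each partition, then shuffle partial sums."""
    shuffled = []
    for part in parts:
        local = defaultdict(int)
        for key, value in part:
            local[key] += value
        shuffled.extend(local.items())
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)

grouped_totals, grouped_records = group_by_key_shuffle(partitions)
reduced_totals, reduced_records = reduce_by_key_shuffle(partitions)
print(grouped_totals, "records shuffled:", grouped_records)
print(reduced_totals, "records shuffled:", reduced_records)
```

With only a handful of distinct keys, as with the countries in this dataset, pre-aggregation shrinks the shuffle to at most one record per key per partition, which is why `reduceByKey` is usually preferred for sums.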

In [None]:
# combineByKey
def createCombiner(v):
    return (v, 1)

def mergeValue(acc, v):
    return (acc[0] + v, acc[1] + 1)

def mergeCombiners(acc1, acc2):
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

avg_spend_country = (
    rdd.map(lambda x: (x.country, x.monthly_spend))
       .combineByKey(createCombiner, mergeValue, mergeCombiners)
       .mapValues(lambda x: x[0] / x[1])
)

avg_spend_country.take(5)

[('Germany', 2493.6787216400094),
 ('Japan', 2491.448741224885),
 ('France', 2502.496429714377),
 ('Australia', 2498.7318702862744),
 ('UK', 2499.5985882352943)]
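
The three functions passed to `combineByKey` play different roles: `createCombiner` starts an accumulator the first time a key is seen in a partition, `mergeValue` folds further values into it, and `mergeCombiners` joins accumulators from different partitions. A minimal pure-Python sketch of that flow, assuming two hypothetical partitions of toy `(country, monthly_spend)` pairs (`combine_by_key` is an illustrative helper, not the Spark implementation):

```python
def create_combiner(value):
    # First value for a key in a partition: start a (sum, count) accumulator
    return (value, 1)

def merge_value(acc, value):
    # Another value for the same key in the same partition
    return (acc[0] + value, acc[1] + 1)

def merge_combiners(acc1, acc2):
    # Accumulators for the same key coming from different partitions
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

def combine_by_key(partitions):
    """Apply the three functions in the order combineByKey would use them."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            if key in local:
                local[key] = merge_value(local[key], value)
            else:
                local[key] = create_combiner(value)
        partials.append(local)
    merged = {}
    for local in partials:
        for key, acc in local.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return merged

# Toy partitions (illustrative values only)
toy_partitions = [
    [("Germany", 100), ("Japan", 300), ("Germany", 200)],
    [("Japan", 500), ("Germany", 600)],
]
sums_and_counts = combine_by_key(toy_partitions)
averages = {key: total / count for key, (total, count) in sums_and_counts.items()}
print(averages)
```

The per-country averages in the Spark output above agree with the earlier Spark SQL `AVG(monthly_spend)` results, even though the RDD is built from `df` and the SQL view from `df_clean`; this is consistent with `na.drop()` having removed few or no rows.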

4. Using the cleaned data, build models with an ML framework (MLlib) suited to your dataset's task (supervised
/ unsupervised), and compare at least 2 algorithms.

* Defining the Label & Features

In [None]:
# Label
label_col = "premium_subscription"

In [None]:
# Features (relevant numeric columns only)
feature_cols = [
    "age",
    "income_level",
    "monthly_spend",
    "average_order_value",
    "weekly_purchases",
    "brand_loyalty_score",
    "impulse_buying_score",
    "daily_session_time_minutes",
    "product_views_per_day",
    "cart_abandonment_rate"
]

* Feature Engineering (VectorAssembler)

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

df_ml = assembler.transform(df_clean) \
                 .select("features", label_col)

* Train/Test Split

In [None]:
train_data, test_data = df_ml.randomSplit([0.8, 0.2], seed=42)

* Model 1 — Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features",
    labelCol=label_col
)

lr_model = lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)

* Model 2 — Random Forest Classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol=label_col,
    numTrees=50,
    seed=42
)

rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)

5. Perform hyperparameter tuning on the best model and
interpret the results carefully.

* Evaluator Used

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol=label_col,
    metricName="areaUnderROC"
)

* Defining the Hyperparameter Grid

In [None]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

* Cross Validation

In [None]:
from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)
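
The cost of this search grows with the product of the grid sizes and the number of folds. A small sketch of that arithmetic, using the values from the grid and `numFolds` above (the variable names are illustrative):

```python
from itertools import product

# Values copied from the parameter grid and the CrossValidator settings
num_trees_values = [50, 100]
max_depth_values = [5, 10]
num_folds = 3

# Every combination of the two hyperparameters
param_combinations = [
    {"numTrees": n, "maxDepth": d}
    for n, d in product(num_trees_values, max_depth_values)
]

# One fit per combination per fold, plus one final refit of the best
# combination on the full training set
fits_during_cv = len(param_combinations) * num_folds
total_fits = fits_during_cv + 1

print(param_combinations)
print("fits during cross-validation:", fits_during_cv)
print("total fits including the final refit:", total_fits)
```

On a dataset of this size, each extra grid value or fold multiplies the training time, so a small grid like this one is a reasonable starting point.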

* Training the Model with Hyperparameter Tuning

In [None]:
cv_model = cv.fit(train_data)
best_model = cv_model.bestModel

* Displaying the Best Parameters

In [None]:
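# A notebook cell echoes only the value of its last expression, so the
# output below is maxDepth; the number of trees is not shown.
best_model.getNumTrees
best_model.getOrDefault("maxDepth")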

10

6. Evaluate the model you built using metrics such as RMSE, MSE, accuracy,
precision, recall, F1-score, AUC, or Silhouette.

* Predicting on the Test Data

In [None]:
predictions = best_model.transform(test_data)
predictions.select(
    label_col,
    "prediction",
    "probability"
).show(5)

+--------------------+----------+--------------------+
|premium_subscription|prediction|         probability|
+--------------------+----------+--------------------+
|                   0|       0.0|[0.63994516611417...|
|                   0|       0.0|[0.64393456719125...|
|                   0|       0.0|[0.65623376091409...|
|                   0|       0.0|[0.63480710015185...|
|                   1|       0.0|[0.63217829751988...|
+--------------------+----------+--------------------+
only showing top 5 rows


* AUC (Area Under ROC) Evaluation

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

auc = evaluator.evaluate(predictions)
auc

0.49841773526944133
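
AUC can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as half. A value near 0.5, as above, therefore means the scores barely separate the two classes. A minimal pure-Python sketch on made-up scores (the helper `pairwise_auc` is hypothetical and is not how Spark computes the metric):

```python
def pairwise_auc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # tie counts as half
    return wins / (len(positives) * len(negatives))

# Toy labels and scores (illustrative only)
labels = [0, 0, 1, 1]
separating_scores = [0.1, 0.2, 0.8, 0.9]    # positives always score higher
constant_scores = [0.36, 0.36, 0.36, 0.36]  # same score for every row

auc_separating = pairwise_auc(labels, separating_scores)
auc_constant = pairwise_auc(labels, constant_scores)
print(auc_separating, auc_constant)
```

The second case mirrors what the probability column above hints at: when every row receives nearly the same score, the ranking carries no information and AUC falls to about 0.5.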

* Accuracy Evaluation

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = accuracy_evaluator.evaluate(predictions)
accuracy

0.640884060722959

* Precision, Recall, and F1-Score Evaluation

In [None]:
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="weightedPrecision"
)

recall_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="weightedRecall"
)

f1_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="f1"
)

precision = precision_evaluator.evaluate(predictions)
recall = recall_evaluator.evaluate(predictions)
f1_score = f1_evaluator.evaluate(predictions)

precision, recall, f1_score

(0.41073237928874945, 0.640884060722959, 0.5006232787803233)
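
These three numbers can be checked against a simple baseline. The sketch below assumes a classifier that predicts class 0 for every test row and a test set in which the share of class 0 equals the reported accuracy; both are assumptions made for this consistency check, not facts read from the model.

```python
# Assumption: the share of class 0 in the test set equals the reported accuracy
share_class_0 = 0.640884060722959
share_class_1 = 1.0 - share_class_0

# Per-class metrics of a predictor that always outputs class 0
precision_0 = share_class_0   # all rows predicted 0, that share is truly 0
recall_0 = 1.0                # every true 0 is predicted 0
precision_1 = 0.0             # class 1 is never predicted, precision taken as 0
recall_1 = 0.0                # no true 1 is ever recovered

f1_0 = 2 * precision_0 * recall_0 / (precision_0 + recall_0)
f1_1 = 0.0

# Weighted averages use the true class shares as weights
weighted_precision = share_class_0 * precision_0 + share_class_1 * precision_1
weighted_recall = share_class_0 * recall_0 + share_class_1 * recall_1
weighted_f1 = share_class_0 * f1_0 + share_class_1 * f1_1

print(weighted_precision, weighted_recall, weighted_f1)
```

The values produced this way match the reported weighted precision, recall, and F1 to many decimal places, which suggests the tuned Random Forest behaves like a majority-class baseline on the test set. That reading is consistent with the AUC of about 0.5 and with the single misclassified positive in the prediction sample, so the 64% accuracy likely reflects the class balance rather than predictive skill.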