In [None]:
# Install Java (JDK 11 direkomendasikan)
!apt-get install openjdk-11-jdk -y

# Set environment variable agar PySpark bisa menemukan Java
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += ":/usr/lib/jvm/java-11-openjdk-amd64/bin"

# Cek versi Java
!java -version


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  ca-certificates-java fonts-dejavu-core fonts-dejavu-extra java-common
  libatk-wrapper-java libatk-wrapper-java-jni libpcsclite1 libxt-dev libxtst6
  libxxf86dga1 openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
  x11-utils
Suggested packages:
  default-jre pcscd libxt-doc openjdk-11-demo openjdk-11-source visualvm
  libnss-mdns fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  | fonts-wqy-zenhei fonts-indic mesa-utils
The following NEW packages will be installed:
  ca-certificates-java fonts-dejavu-core fonts-dejavu-extra java-common
  libatk-wrapper-java libatk-wrapper-java-jni libpcsclite1 libxt-dev libxtst6
  libxxf86dga1 openjdk-11-jdk openjdk-11-jdk-headless openjdk-11-jre
  openjdk-11-jre-headless x11-utils
0 upgraded, 15 newly installed, 0 to remove and 41 not upgraded.
Need to get 122 MB of archives.


In [None]:
from pyspark.sql import SparkSession

# Create the Spark session used by every later cell (or reuse the one that is
# already running in this kernel, via getOrCreate).
spark = (
    SparkSession.builder
    .appName("Praktikum Data Preprocessing")
    .getOrCreate()
)

print("SparkSession berhasil dibuat!")


SparkSession berhasil dibuat!


In [None]:
from pyspark.sql import Row

# A Row "factory": the column names are declared once, and calling it with
# positional values produces a Row carrying those field names in this order.
PelangganRow = Row("id_pelanggan", "nama", "jenis_kelamin", "kota", "usia", "gaji")

data = [
    PelangganRow(1, "Andi", "Laki-laki", "Jakarta", 25, 7000000),
    PelangganRow(2, "Budi", "Laki-laki", "Bandung", 30, 9000000),
    PelangganRow(3, "Citra", "Perempuan", "Jakarta", 28, 12000000),
    PelangganRow(4, "Dewi", "Perempuan", "Surabaya", 35, 15000000),
    PelangganRow(5, "Eko", "Laki-laki", "Bandung", 40, 10000000),
]

# Schema (column names and types) is inferred from the Row objects.
df_bersih = spark.createDataFrame(data)
df_bersih.show()


+------------+-----+-------------+--------+----+--------+
|id_pelanggan| nama|jenis_kelamin|    kota|usia|    gaji|
+------------+-----+-------------+--------+----+--------+
|           1| Andi|    Laki-laki| Jakarta|  25| 7000000|
|           2| Budi|    Laki-laki| Bandung|  30| 9000000|
|           3|Citra|    Perempuan| Jakarta|  28|12000000|
|           4| Dewi|    Perempuan|Surabaya|  35|15000000|
|           5|  Eko|    Laki-laki| Bandung|  40|10000000|
+------------+-----+-------------+--------+----+--------+



In [None]:
from pyspark.sql.functions import max as spark_max, min as spark_min

# Per (gender, city) group: the highest salary and the youngest age.
# Spark's max/min are imported under aliases so Python's built-ins stay usable.
# groupBy output has no guaranteed row order, so sort by the grouping keys to
# make the displayed table reproducible across runs.
df_aggr = (
    df_bersih.groupBy("jenis_kelamin", "kota")
    .agg(
        spark_max("gaji").alias("gaji_maks"),
        spark_min("usia").alias("usia_min"),
    )
    .orderBy("jenis_kelamin", "kota")
)

df_aggr.show()


+-------------+--------+---------+--------+
|jenis_kelamin|    kota|gaji_maks|usia_min|
+-------------+--------+---------+--------+
|    Laki-laki| Bandung| 10000000|      30|
|    Laki-laki| Jakarta|  7000000|      25|
|    Perempuan| Jakarta| 12000000|      28|
|    Perempuan|Surabaya| 15000000|      35|
+-------------+--------+---------+--------+



In [None]:
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import when

# Salary bucket edges. Bucketizer buckets are left-closed [lo, hi) (the last
# one also includes its upper edge), so a salary of exactly 7,000,000 lands in
# "Menengah" and exactly 15,000,000 in "Tinggi".
splits = [-float("inf"), 7000000.0, 15000000.0, float("inf")]
# One label per bucket, listed in bucket-index order (0.0, 1.0, 2.0, ...).
label_gaji = ["Rendah", "Menengah", "Tinggi"]
assert len(label_gaji) == len(splits) - 1, "need exactly one label per bucket"

bucketizer = Bucketizer(splits=splits, inputCol="gaji", outputCol="gaji_bin")
df_binned = bucketizer.transform(df_bersih)

# Build the bucket-index -> label expression from label_gaji, so editing
# `splits` and `label_gaji` together cannot leave a bucket index unlabeled.
level_expr = None
for bin_idx, label in enumerate(label_gaji):
    cond = df_binned["gaji_bin"] == float(bin_idx)
    level_expr = when(cond, label) if level_expr is None else level_expr.when(cond, label)

df_binned = df_binned.withColumn("level_gaji", level_expr)

df_binned.select("id_pelanggan", "gaji", "level_gaji").show()


+------------+--------+----------+
|id_pelanggan|    gaji|level_gaji|
+------------+--------+----------+
|           1| 7000000|  Menengah|
|           2| 9000000|  Menengah|
|           3|12000000|  Menengah|
|           4|15000000|    Tinggi|
|           5|10000000|  Menengah|
+------------+--------+----------+



In [None]:
from pyspark.sql.functions import col

# Feature engineering: an interaction term, age multiplied by salary.
df_fe = df_bersih.withColumn(
    "usia_x_gaji",
    col("usia") * col("gaji"),
)

(
    df_fe
    .select("id_pelanggan", "usia", "gaji", "usia_x_gaji")
    .show(5)
)


+------------+----+--------+-----------+
|id_pelanggan|usia|    gaji|usia_x_gaji|
+------------+----+--------+-----------+
|           1|  25| 7000000|  175000000|
|           2|  30| 9000000|  270000000|
|           3|  28|12000000|  336000000|
|           4|  35|15000000|  525000000|
|           5|  40|10000000|  400000000|
+------------+----+--------+-----------+



In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# Explicit schema for the (deliberately dirty) product table. StructType.add
# builds the same nullable fields as StructField(name, type) would.
skema_produk = (
    StructType()
    .add("id_produk", IntegerType())
    .add("nama_produk", StringType())
    .add("kategori", StringType())
    .add("harga", IntegerType())
    .add("rating", FloatType())
    .add("terjual", IntegerType())
    .add("tgl_rilis", StringType())
    .add("status_stok", StringType())
)

data_produk = [
    (101, 'Laptop A', 'Elektronik', 15000000, 4.5, 120, '2023-01-20', 'stok_tersedia'),
    (102, 'Smartphone B', 'Elektronik', 8000000, 4.7, 250, '2023-02-10', 'stok_tersedia'),
    # missing `terjual`
    (103, 'Headphone C', 'Aksesoris', 1200000, 4.2, None, '2023-02-15', 'stok_habis'),
    # same content as 101 under a different id
    (104, 'Laptop A', 'Elektronik', 15000000, 4.5, 120, '2023-01-20', 'stok_tersedia'),
    # missing `rating`
    (105, 'Tablet D', 'Elektronik', 6500000, None, 80, '2023-03-01', 'stok_tersedia'),
    # negative rating and inconsistent casing in `status_stok`
    (106, 'Charger E', 'Aksesoris', 250000, -4.0, 500, '2023-03-05', 'Stok_Tersedia'),
    (107, 'Smartwatch F', 'Elektronik', 3100000, 4.8, 150, '2023-04-12', 'stok_habis')
]

df_produk = spark.createDataFrame(data_produk, schema=skema_produk)
df_produk.show(truncate=False)


+---------+------------+----------+--------+------+-------+----------+-------------+
|id_produk|nama_produk |kategori  |harga   |rating|terjual|tgl_rilis |status_stok  |
+---------+------------+----------+--------+------+-------+----------+-------------+
|101      |Laptop A    |Elektronik|15000000|4.5   |120    |2023-01-20|stok_tersedia|
|102      |Smartphone B|Elektronik|8000000 |4.7   |250    |2023-02-10|stok_tersedia|
|103      |Headphone C |Aksesoris |1200000 |4.2   |NULL   |2023-02-15|stok_habis   |
|104      |Laptop A    |Elektronik|15000000|4.5   |120    |2023-01-20|stok_tersedia|
|105      |Tablet D    |Elektronik|6500000 |NULL  |80     |2023-03-01|stok_tersedia|
|106      |Charger E   |Aksesoris |250000  |-4.0  |500    |2023-03-05|Stok_Tersedia|
|107      |Smartwatch F|Elektronik|3100000 |4.8   |150    |2023-04-12|stok_habis   |
+---------+------------+----------+--------+------+-------+----------+-------------+



In [None]:
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import mean

# Normalize status_stok (lower-case, trimmed) so 'Stok_Tersedia' matches
# 'stok_tersedia'.
df_produk = df_produk.withColumn("status_stok", F.trim(F.lower(F.col("status_stok"))))

# Fix negative ratings by taking the absolute value (nulls stay null).
df_produk = df_produk.withColumn("rating", F.abs(F.col("rating")))

# Remove duplicate products. id_produk is a per-row identifier, so two rows
# describing the same product differ in it, and a plain dropDuplicates() over
# all columns would keep both. Compare on every other column instead, and keep
# the row with the smallest id_produk so the surviving row is deterministic.
kolom_konten = [c for c in df_produk.columns if c != "id_produk"]
jendela_dup = Window.partitionBy(*kolom_konten).orderBy("id_produk")
df_produk = (
    df_produk
    .withColumn("_urutan_dup", F.row_number().over(jendela_dup))
    .filter(F.col("_urutan_dup") == 1)
    .drop("_urutan_dup")
)

# Imputation statistics: mean rating and median units sold. They are computed
# after deduplication so a repeated product does not bias them.
mean_rating = df_produk.select(mean(F.col("rating"))).collect()[0][0]
median_terjual = df_produk.stat.approxQuantile("terjual", [0.5], 0.0)[0]

# Fill the missing values. Sort by id so this and later displays are
# reproducible (dedup shuffles the rows).
df_produk = df_produk.na.fill({
    "rating": float(mean_rating),
    "terjual": int(median_terjual)
}).orderBy("id_produk")

df_produk.show(truncate=False)


+---------+------------+----------+--------+------+-------+----------+-------------+
|id_produk|nama_produk |kategori  |harga   |rating|terjual|tgl_rilis |status_stok  |
+---------+------------+----------+--------+------+-------+----------+-------------+
|101      |Laptop A    |Elektronik|15000000|4.5   |120    |2023-01-20|stok_tersedia|
|102      |Smartphone B|Elektronik|8000000 |4.7   |250    |2023-02-10|stok_tersedia|
|106      |Charger E   |Aksesoris |250000  |4.0   |500    |2023-03-05|stok_tersedia|
|107      |Smartwatch F|Elektronik|3100000 |4.8   |150    |2023-04-12|stok_habis   |
|104      |Laptop A    |Elektronik|15000000|4.5   |120    |2023-01-20|stok_tersedia|
|103      |Headphone C |Aksesoris |1200000 |4.2   |120    |2023-02-15|stok_habis   |
|105      |Tablet D    |Elektronik|6500000 |4.45  |80     |2023-03-01|stok_tersedia|
+---------+------------+----------+--------+------+-------+----------+-------------+



In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Pack the numeric columns into a single vector column...
assembler = VectorAssembler(
    inputCols=["harga", "rating", "terjual"],
    outputCol="features_num",
)
df_num = assembler.transform(df_produk)

# ...then standardize it: subtract the mean and scale to unit standard
# deviation (withMean / withStd).
scaler = StandardScaler(
    inputCol=assembler.getOutputCol(),
    outputCol="features_num_scaled",
    withMean=True,
    withStd=True,
)
scaler_model = scaler.fit(df_num)
df_scaled = scaler_model.transform(df_num)

df_scaled.select(scaler.getOutputCol()).show(5, truncate=False)


+-------------------------------------------------------------+
|features_num_scaled                                          |
+-------------------------------------------------------------+
|[1.3091259608901722,0.18156843105475512,-0.48887433956125864]|
|[0.16261707646446283,0.907840671071254,0.40087695844023225]  |
|[-1.1067320455782867,-1.6341139005560985,2.1119371469046375] |
|[-0.6399391426335337,1.27097852264911,-0.28354711694553]     |
|[1.3091259608901722,0.18156843105475512,-0.48887433956125864]|
+-------------------------------------------------------------+
only showing top 5 rows



In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Release month: parse tgl_rilis (yyyy-MM-dd) to a timestamp, then take its
# month number.
df_scaled = (
    df_scaled
    .withColumn("ts_rilis", F.to_timestamp("tgl_rilis", "yyyy-MM-dd"))
    .withColumn("bulan_rilis", F.month("ts_rilis"))
)

# Fit one StringIndexer per categorical column, then apply both in sequence.
idx_kategori = StringIndexer(inputCol="kategori", outputCol="kategori_index").fit(df_scaled)
idx_status = StringIndexer(inputCol="status_stok", outputCol="status_index").fit(df_scaled)
df_idx = idx_status.transform(idx_kategori.transform(df_scaled))

# One-hot encode both index columns. The encoder's default dropLast=True
# drops the last category, which is why two categories give length-1 vectors.
ohe = OneHotEncoder(
    inputCols=["kategori_index", "status_index"],
    outputCols=["kategori_ohe", "status_ohe"],
)
ohe_model = ohe.fit(df_idx)
df_final = ohe_model.transform(df_idx)

df_final.select(
    "id_produk", "kategori", "status_stok", "bulan_rilis", "kategori_ohe", "status_ohe"
).show(truncate=False)


+---------+----------+-------------+-----------+-------------+-------------+
|id_produk|kategori  |status_stok  |bulan_rilis|kategori_ohe |status_ohe   |
+---------+----------+-------------+-----------+-------------+-------------+
|101      |Elektronik|stok_tersedia|1          |(1,[0],[1.0])|(1,[0],[1.0])|
|102      |Elektronik|stok_tersedia|2          |(1,[0],[1.0])|(1,[0],[1.0])|
|106      |Aksesoris |stok_tersedia|3          |(1,[],[])    |(1,[0],[1.0])|
|107      |Elektronik|stok_habis   |4          |(1,[0],[1.0])|(1,[],[])    |
|104      |Elektronik|stok_tersedia|1          |(1,[0],[1.0])|(1,[0],[1.0])|
|103      |Aksesoris |stok_habis   |2          |(1,[],[])    |(1,[],[])    |
|105      |Elektronik|stok_tersedia|3          |(1,[0],[1.0])|(1,[0],[1.0])|
+---------+----------+-------------+-----------+-------------+-------------+



In [None]:
# Final view, grouped into: original product columns, date/status columns,
# numeric feature vectors, and categorical encodings.
cols_to_show = (
    ["id_produk", "nama_produk", "kategori", "harga", "rating", "terjual"]
    + ["tgl_rilis", "bulan_rilis", "status_stok"]
    + ["features_num", "features_num_scaled"]
    + ["kategori_index", "kategori_ohe", "status_index", "status_ohe"]
)

df_final.select(cols_to_show).show(truncate=False)
