In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# 1. Buat SparkSession
# SparkSession adalah entry point untuk menggunakan fungsionalitas Spark dengan DataFrame API.
# .builder: Memulai pembangunan SparkSession.
# .appName("ProduksiIkanCumiApp"): Memberikan nama aplikasi Spark Anda.
# .getOrCreate(): Mengembalikan SparkSession yang ada atau membuat yang baru jika belum ada.
spark = SparkSession.builder \
    .appName("ProduksiIkanCumiApp") \
    .getOrCreate()

# 2. Tentukan Path Data Parquet di HDFS
# Ini adalah lokasi di HDFS tempat Hive menyimpan tabel Parquet Anda.
# Anda bisa mendapatkan path ini dengan menjalankan `DESCRIBE EXTENDED data_produksi_ikan_cumi_parquet;` di Hive CLI/Beeline
# dan mencari baris 'Location'.
# Biasanya formatnya adalah hdfs://<namenode_host>:<port>/user/hive/warehouse/<database_name>.db/<table_name>
# Jika Anda menggunakan kluster pseudo-distributed lokal (misalnya di Docker) dan port default HDFS,
# ini seringkali adalah hdfs://localhost:9000 atau hanya /user/hive/warehouse/...
# Mari kita asumsikan path yang umum:
hdfs_path = "hdfs://localhost:9000/user/hive/warehouse/data_produksi_ikan_cumi_parquet"

# Penting: Jika Anda tidak bisa menemukan port NameNode (biasanya 9000),
# atau jika Anda menggunakan Docker Compose dan nama service HDFS Anda berbeda,
# sesuaikan 'localhost:9000' dengan konfigurasi Anda.
# Seringkali, jika HDFS dan Spark berada di lingkungan yang sama (misalnya satu Docker container besar),
# Anda bisa cukup menggunakan path HDFS tanpa skema hdfs://<host>:<port>
# Contoh: hdfs_path = "/user/hive/warehouse/data_produksi_ikan_cumi_parquet"
# Coba yang tanpa hdfs://localhost:9000 dulu jika Anda di satu container.

# 3. Baca Data Parquet dari HDFS
# Spark akan secara otomatis mengidentifikasi format Parquet berdasarkan ekstensi file dan metadatanya.
df_cumi = spark.read.parquet(hdfs_path)

# 4. Tampilkan Schema dan Beberapa Baris Data
# Ini akan membantu Anda memverifikasi bahwa data telah dimuat dengan benar.
print("Schema DataFrame:")
df_cumi.printSchema()

print("\n10 Baris Pertama DataFrame:")
df_cumi.show(10)

# 5. Lakukan Pengolahan Data (Contoh)
# Anda sekarang bisa melakukan berbagai operasi Spark DataFrame.
# Contoh: Menghitung total volume produksi cumi-cumi per provinsi
total_volume_per_provinsi = df_cumi.groupBy("Provinsi") \
                                   .agg({"Volume_Produksi_Ton": "sum"}) \
                                   .orderBy("sum(Volume_Produksi_Ton)", ascending=False)

print("\nTotal Volume Produksi Cumi-Cumi per Provinsi:")
total_volume_per_provinsi.show()

25/05/22 19:19:13 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 172.17.0.2 instead (on interface eth0)
25/05/22 19:19:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/22 19:19:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Schema DataFrame:
root
 |-- tahun: integer (nullable = true)
 |-- kabupaten_kota: string (nullable = true)
 |-- wpp: string (nullable = true)
 |-- volume_produksi_ton: long (nullable = true)
 |-- nilai_produksi_rp_000: long (nullable = true)
 |-- provinsi: string (nullable = true)
 |-- kelompok: string (nullable = true)
 |-- jenis_ikan: string (nullable = true)


10 Baris Pertama DataFrame:


                                                                                

+-----+-------------------+----------+-------------------+---------------------+----------------+---------+----------+
|tahun|     kabupaten_kota|       wpp|volume_produksi_ton|nilai_produksi_rp_000|        provinsi| kelompok|jenis_ikan|
+-----+-------------------+----------+-------------------+---------------------+----------------+---------+----------+
| 2023|               AGAM|WPP-RI-572|                 65|              4550000|  SUMATERA BARAT|CUMI-CUMI|CUMI-CUMI;|
| 2023|             ASAHAN|WPP-RI-571|                872|             52328880|  SUMATERA UTARA|CUMI-CUMI|CUMI-CUMI;|
| 2023|          BANYUASIN|WPP-RI-711|                769|             19226900|SUMATERA SELATAN|CUMI-CUMI|CUMI-CUMI;|
| 2023|          BATU BARA|WPP-RI-571|               1400|             63005535|  SUMATERA UTARA|CUMI-CUMI|CUMI-CUMI;|
| 2023|       DELI SERDANG|WPP-RI-571|                643|             26511140|  SUMATERA UTARA|CUMI-CUMI|CUMI-CUMI;|
| 2023| KEPULAUAN MENTAWAI|WPP-RI-572|          

In [3]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd

# Pastikan Anda sudah menjalankan kode untuk membuat SparkSession dan memuat df_cumi
# Contoh sederhana jika Anda baru memulai atau ingin menjalankan ulang bagian ini:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("ProduksiIkanCumiClustering").getOrCreate()
# hdfs_path = "hdfs://localhost:9000/user/hive/warehouse/data_produksi_ikan_cumi_parquet"
# df_cumi = spark.read.parquet(hdfs_path)

# Pilih kolom-kolom numerik yang akan digunakan untuk klasterisasi
# Kita akan gunakan Volume_Produksi_Ton dan Nilai_Produksi_Rp_000
feature_cols = ['volume_produksi_ton', 'nilai_produksi_rp_000']

# Hapus baris dengan nilai null pada kolom fitur
df_cumi_cleaned = df_cumi.na.drop(subset=feature_cols)

# Inisialisasi VectorAssembler
# InputCols adalah kolom-kolom yang ingin digabungkan
# OutputCol adalah nama kolom baru yang akan berisi vektor fitur
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Terapkan VectorAssembler ke DataFrame
# Ini akan menghasilkan DataFrame baru dengan kolom 'features'
df_features = assembler.transform(df_cumi_cleaned)

print("Schema DataFrame setelah VectorAssembler:")
df_features.printSchema()
df_features.select("features").show(5, truncate=False)

Schema DataFrame setelah VectorAssembler:
root
 |-- tahun: integer (nullable = true)
 |-- kabupaten_kota: string (nullable = true)
 |-- wpp: string (nullable = true)
 |-- volume_produksi_ton: long (nullable = true)
 |-- nilai_produksi_rp_000: long (nullable = true)
 |-- provinsi: string (nullable = true)
 |-- kelompok: string (nullable = true)
 |-- jenis_ikan: string (nullable = true)
 |-- features: vector (nullable = true)

+--------------------+
|features            |
+--------------------+
|[65.0,4550000.0]    |
|[872.0,5.232888E7]  |
|[769.0,1.92269E7]   |
|[1400.0,6.3005535E7]|
|[643.0,2.651114E7]  |
+--------------------+
only showing top 5 rows



In [4]:
# Inisialisasi StandardScaler
# inputCol adalah kolom fitur yang sudah digabungkan
# outputCol adalah nama kolom baru untuk fitur yang sudah diskalakan
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False) # withMean=False untuk data sparse jika ada 0

# Latih scaler pada data (menghitung rata-rata dan standar deviasi)
scaler_model = scaler.fit(df_features)

# Terapkan scaler ke DataFrame
df_scaled = scaler_model.transform(df_features)

print("\nSchema DataFrame setelah StandardScaler:")
df_scaled.printSchema()
df_scaled.select("features", "scaledFeatures").show(5, truncate=False)


Schema DataFrame setelah StandardScaler:
root
 |-- tahun: integer (nullable = true)
 |-- kabupaten_kota: string (nullable = true)
 |-- wpp: string (nullable = true)
 |-- volume_produksi_ton: long (nullable = true)
 |-- nilai_produksi_rp_000: long (nullable = true)
 |-- provinsi: string (nullable = true)
 |-- kelompok: string (nullable = true)
 |-- jenis_ikan: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)

+--------------------+-----------------------------------------+
|features            |scaledFeatures                           |
+--------------------+-----------------------------------------+
|[65.0,4550000.0]    |[0.043537413903869615,0.0796067957227162]|
|[872.0,5.232888E7]  |[0.5840711526796047,0.9155460352875888]  |
|[769.0,1.92269E7]   |[0.5150810968011652,0.33639382432551473] |
|[1400.0,6.3005535E7]|[0.9377289148525764,1.102344781130867]   |
|[643.0,2.651114E7]  |[0.43068549446443327,0.4638388805178748] |
+----

In [5]:
# Tentukan jumlah kluster yang diinginkan (misalnya, 3 kluster)
k = 3

# Inisialisasi model KMeans
# featuresCol adalah kolom fitur yang akan digunakan (setelah scaling)
# predictionCol adalah nama kolom baru yang akan berisi ID kluster hasil prediksi
# seed untuk hasil yang reproducible
kmeans = KMeans(featuresCol="scaledFeatures", k=k, seed=1)

# Latih model KMeans pada data yang sudah diskalakan
model = kmeans.fit(df_scaled)

# Dapatkan kluster hasil prediksi untuk setiap baris
# Kolom 'prediction' akan ditambahkan ke DataFrame
predictions = model.transform(df_scaled)

print(f"\nDistribusi Kluster (untuk k={k}):")
predictions.groupBy("prediction").count().orderBy("prediction").show()

print("\n10 Baris Pertama dengan Prediksi Kluster:")
predictions.select("Tahun", "Kabupaten_Kota", "Volume_Produksi_Ton", "Nilai_Produksi_Rp_000", "prediction").show(10)

# Menampilkan pusat kluster (cluster centers)
centers = model.clusterCenters()
print("\nPusat Kluster (dalam fitur yang sudah diskalakan):")
for i, center in enumerate(centers):
    print(f"Kluster {i}: {center}")

# Jika Anda ingin melihat pusat kluster dalam skala asli (lebih interpretif), Anda perlu "inverse transform"
# Ini sedikit lebih kompleks karena StandardScaler tidak punya inverse_transform bawaan seperti scikit-learn.
# Anda bisa secara manual mengalikan kembali dengan standar deviasi dan menambahkan mean jika withMean=True
# Untuk tujuan ini, mari kita asumsikan interpretasi dari scaledFeatures sudah cukup.

25/05/22 19:26:36 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS



Distribusi Kluster (untuk k=3):
+----------+-----+
|prediction|count|
+----------+-----+
|         0|   19|
|         1|  138|
|         2|    2|
+----------+-----+


10 Baris Pertama dengan Prediksi Kluster:
+-----+-------------------+-------------------+---------------------+----------+
|Tahun|     Kabupaten_Kota|Volume_Produksi_Ton|Nilai_Produksi_Rp_000|prediction|
+-----+-------------------+-------------------+---------------------+----------+
| 2023|               AGAM|                 65|              4550000|         1|
| 2023|             ASAHAN|                872|             52328880|         1|
| 2023|          BANYUASIN|                769|             19226900|         1|
| 2023|          BATU BARA|               1400|             63005535|         1|
| 2023|       DELI SERDANG|                643|             26511140|         1|
| 2023| KEPULAUAN MENTAWAI|                 68|              2769015|         1|
| 2023|KOTA BANDAR LAMPUNG|                 59|              

In [6]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Pastikan 'predictions' DataFrame sudah ada dari langkah klasterisasi sebelumnya.
# DataFrame ini harus memiliki kolom 'prediction' (hasil klaster) dan 'scaledFeatures' (fitur yang digunakan).

# Inisialisasi evaluator untuk Silhouette Score
evaluator = ClusteringEvaluator(
    predictionCol="prediction",  # Kolom yang berisi ID kluster hasil prediksi
    featuresCol="scaledFeatures",  # Kolom yang berisi fitur yang digunakan untuk klasterisasi
    metricName="silhouette",       # Metrik yang ingin dihitung
    distanceMeasure="squaredEuclidean" # Pengukuran jarak yang digunakan oleh K-Means
)

# Hitung Silhouette Score
silhouette = evaluator.evaluate(predictions)

print(f"\nSilhouette Score: {silhouette}")

# Anda juga bisa menampilkan WSSSE (Within Set Sum of Squared Errors)
# Ini adalah metrik yang dioptimalkan oleh K-Means, semakin kecil semakin baik.
# Namun, tidak ada batasan bawah, jadi lebih cocok untuk membandingkan model dengan K yang berbeda.
wssse = model.summary.trainingCost
print(f"Within Set Sum of Squared Errors (WSSSE): {wssse}")


Silhouette Score: 0.880911329039544
Within Set Sum of Squared Errors (WSSSE): 45.50444148055718


In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

# Pastikan SparkSession sudah dibuat dan DataFrame 'predictions' sudah tersedia
# dari langkah klasterisasi sebelumnya. Jika tidak, Anda perlu menjalankannya lagi:
# spark = SparkSession.builder.appName("ProduksiIkanCumiClusteringInterpretation").getOrCreate()
# hdfs_path = "hdfs://localhost:9000/user/hive/warehouse/data_produksi_ikan_cumi_parquet"
# df_cumi = spark.read.parquet(hdfs_path)
# ... (jalankan VectorAssembler, StandardScaler, dan KMeans untuk mendapatkan 'predictions') ...

# Hitung rata-rata fitur asli untuk setiap kluster
cluster_summary = predictions.groupBy("prediction").agg(
    avg("Volume_Produksi_Ton").alias("Avg_Volume_Produksi_Ton"),
    avg("Nilai_Produksi_Rp_000").alias("Avg_Nilai_Produksi_Rp_000")
).orderBy("prediction")

print("\nRata-rata Volume dan Nilai Produksi per Kluster:")
cluster_summary.show(truncate=False)

# spark.stop() # Hentikan SparkSession jika ini adalah operasi terakhir


Rata-rata Volume dan Nilai Produksi per Kluster:
+----------+-----------------------+-------------------------+
|prediction|Avg_Volume_Produksi_Ton|Avg_Nilai_Produksi_Rp_000|
+----------+-----------------------+-------------------------+
|0         |2679.0                 |1.1246813531578948E8     |
|1         |340.09420289855075     |1.3997276956521738E7     |
|2         |11194.0                |3.9181044E8              |
+----------+-----------------------+-------------------------+



In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Pastikan SparkSession sudah dibuat dan DataFrame 'predictions' sudah tersedia
# Jika Anda baru memulai atau ingin menjalankan ulang bagian ini, Anda perlu menjalankan
# langkah-langkah sebelumnya (load data, feature engineering, scaling, clustering)
# untuk mendapatkan DataFrame 'predictions'.

# Filter DataFrame hanya untuk Kluster 2 (Produksi Tinggi)
df_cluster2 = predictions.filter(col("prediction") == 2)

print("---")
print("Top 10 Kabupaten/Kota di Kluster 2 (Produksi Tinggi):")
# Hitung jumlah entri per Kabupaten/Kota dalam Kluster 2
kab_kota_cluster2 = df_cluster2.groupBy("Kabupaten_Kota").count().orderBy(col("count").desc())
kab_kota_cluster2.show(10, truncate=False)

print("---")
print("Top 10 Provinsi di Kluster 2 (Produksi Tinggi):")
# Hitung jumlah entri per Provinsi dalam Kluster 2
provinsi_cluster2 = df_cluster2.groupBy("Provinsi").count().orderBy(col("count").desc())
provinsi_cluster2.show(10, truncate=False)

# Jangan lupa menghentikan SparkSession jika ini adalah operasi terakhir Anda
# spark.stop()

---
Top 10 Kabupaten/Kota di Kluster 2 (Produksi Tinggi):
+------------------+-----+
|Kabupaten_Kota    |count|
+------------------+-----+
|KOTA TANJUNG BALAI|2    |
+------------------+-----+

---
Top 10 Provinsi di Kluster 2 (Produksi Tinggi):
+--------------+-----+
|Provinsi      |count|
+--------------+-----+
|SUMATERA UTARA|2    |
+--------------+-----+



In [23]:
from pyspark.sql import SparkSession

# Pastikan SparkSession sudah dibuat dan DataFrame 'predictions' tersedia.
# Contoh:
# spark = SparkSession.builder.appName("ExportLocalParquet").getOrCreate()
# asumsi 'predictions' adalah DataFrame yang berisi data cumi-cumi Anda dengan kolom 'prediction'

# Tentukan jalur output di filesystem lokal container JupyterLab Anda.
# Misalnya, di dalam folder /home/jovyan/work/output_parquet
# Atau, jika Anda mau, bisa langsung di root folder kerja Anda: "./output_cumi_parquet"
local_output_path = "/home/jovyan/work/output_cumi_parquet"

# --- OPSI 1: Menyimpan seluruh hasil klasterisasi ke Parquet lokal ---
print(f"Menyimpan seluruh DataFrame 'predictions' ke {local_output_path}_all_clusters...")
predictions.write.mode("overwrite").parquet(f"{local_output_path}_all_clusters")
print("Selesai.")
# Jangan lupa menghentikan SparkSession jika sudah selesai dengan semua operasi.
# spark.stop()

Menyimpan seluruh DataFrame 'predictions' ke /home/jovyan/work/output_cumi_parquet_all_clusters...


[Stage 70:>                                                         (0 + 1) / 1]

Selesai.


                                                                                

In [24]:
from pyspark.ml.clustering import KMeansModel # Ini penting untuk memuat model nanti
import os

# Pastikan objek 'model' (model KMeans yang sudah dilatih) tersedia di sesi Jupyter Anda.
# Jika Anda menjalankan cell ini terpisah, Anda perlu memastikan 'model' sudah didefinisikan
# dari langkah klasterisasi sebelumnya.

# Tentukan jalur di filesystem lokal container JupyterLab Anda untuk menyimpan model
# Misalnya, di dalam folder /home/jovyan/work/models
local_model_path = "/home/jovyan/work/kmeans_cumi_model"

# --- Opsional: Hapus folder model lama jika sudah ada ---
# Ini penting karena Spark tidak akan menimpa folder secara default
# dan akan memberikan error jika folder sudah ada.
# os.path.exists adalah fungsi Python standar untuk memeriksa keberadaan file/folder.
if os.path.exists(local_model_path):
    print(f"Direktori model '{local_model_path}' sudah ada. Menghapus...")
    # Menggunakan shutil.rmtree untuk menghapus folder secara rekursif
    # Pastikan Anda mengimpor 'shutil' jika ingin menggunakan ini
    import shutil
    shutil.rmtree(local_model_path)
    print("Direktori model lama berhasil dihapus.")
else:
    print(f"Direktori model '{local_model_path}' belum ada. Akan dibuat.")

# --- Simpan Model K-Means ke Filesystem Lokal ---
model.save(local_model_path)
print(f"Model K-Means berhasil disimpan di lokal: {local_model_path}")

# Jangan lupa menghentikan SparkSession jika sudah selesai dengan semua operasi.
# spark.stop()

Direktori model '/home/jovyan/work/kmeans_cumi_model' belum ada. Akan dibuat.
Model K-Means berhasil disimpan di lokal: /home/jovyan/work/kmeans_cumi_model


In [26]:
# Ganti 'localhost:9000' dengan host dan port NameNode HDFS Anda yang sebenarnya.
# Jika Anda menggunakan Docker Compose, kemungkinan ini adalah nama service HDFS (misal: namenode:9000).
HDFS_NAMENODE_URI="hdfs://localhost:9000"

# Jalur lokal di dalam container JupyterLab
LOCAL_SOURCE_PATH="/home/jovyan/work/output_cumi_parquet"

# Jalur tujuan di HDFS
HDFS_DEST_PATH="/output" # Nama folder tujuan di HDFS

# Perintah untuk mengunggah folder
!hdfs dfs -put -f "$LOCAL_SOURCE_PATH" "$HDFS_DEST_PATH"
# -f : Opsional, untuk menimpa file/folder tujuan jika sudah ada. Hati-hati menggunakannya!

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/hadoop-3.4.1/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/apache-tez-0.10.4-bin/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
put: `/home/jovyan/work/output_cumi_parquet': No such file or directory
