In [1]:
import time
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, when, count as spark_count, concat_ws, length, rand, row_number
from pyspark.sql.window import Window
import pandas as pd

def measure_time(func, *args, **kwargs):
    start = time.perf_counter()
    result = func(*args, **kwargs)
    end = time.perf_counter()
    return result, end - start

measure_time

<function __main__.measure_time(func, *args, **kwargs)>

In [2]:
spark = SparkSession.builder \
    .appName("RAID-TRAIN") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

spark

In [3]:
df = spark.read.json("hdfs://namenode:8020/user/raid/raw")
df

DataFrame[adv_source_id: string, attack: string, decoding: string, domain: string, generation: string, id: string, model: string, prompt: string, repetition_penalty: string, source_id: string, title: string]

# Total Baris

In [4]:
print("Menghitung total jumlah baris...")
total_rows, time_0 = measure_time(lambda: df.count())
print(f"✅ Total baris: {total_rows:,} | Waktu: {time_0:.4f} detik")

Menghitung total jumlah baris...
✅ Total baris: 5,615,820 | Waktu: 411.4781 detik


# Filter

In [5]:
print("Memfilter data: length(generation) > 50...")
df_filtered, filter_time = measure_time(
    lambda: df.filter(length(col("generation")) > 50)
)

print(f"✅ Filter selesai dalam {filter_time:.2f} detik")

Memfilter data: length(generation) > 50...
✅ Filter selesai dalam 0.24 detik


In [6]:
print("Menghitung total jumlah baris setelah filter...")
total_rows, time_1 = measure_time(lambda: df_filtered.count())
print(f"✅ Total baris: {total_rows:,} | Waktu: {time_1:.4f} detik")

Menghitung total jumlah baris setelah filter...
✅ Total baris: 5,610,609 | Waktu: 376.3021 detik


In [7]:
df_filtered = df_filtered.repartition(32)
df_filtered

DataFrame[adv_source_id: string, attack: string, decoding: string, domain: string, generation: string, id: string, model: string, prompt: string, repetition_penalty: string, source_id: string, title: string]

# Split

In [8]:
print(f"[1] Total jumlah baris setelah filter...")
print(f"✅ Total baris: {total_rows:,} | Waktu: {time_1:.4f} detik")

[1] Total jumlah baris setelah filter...
✅ Total baris: 5,610,609 | Waktu: 376.3021 detik


In [9]:
print("[2] Membuat kolom stratifikasi 'model_domain_attack' ...")
df_with_strata, strat_time = measure_time(
    lambda: df_filtered.withColumn(
        "model_domain_attack",
        concat_ws("_", col("model"), col("domain"), col("attack"))
    )
)
print(f"✅ Kolom stratifikasi dibuat dalam {strat_time:.4f} detik")

[2] Membuat kolom stratifikasi 'model_domain_attack' ...
✅ Kolom stratifikasi dibuat dalam 0.0438 detik


In [10]:
df_with_strata = df_with_strata.repartition(32)
df_with_strata

DataFrame[adv_source_id: string, attack: string, decoding: string, domain: string, generation: string, id: string, model: string, prompt: string, repetition_penalty: string, source_id: string, title: string, model_domain_attack: string]

In [11]:
print("[3] Menghitung jumlah baris per 'model_domain_attack' ...")
df_counts, count_time = measure_time(
    lambda: df_with_strata.groupBy("model_domain_attack").agg(spark_count("*").alias("total_per_group"))
)
print(f"✅ Selesai dalam {count_time:.4f} detik")

[3] Menghitung jumlah baris per 'model_domain_attack' ...
✅ Selesai dalam 0.0352 detik


In [12]:
print("[4] Gabungkan count ke setiap baris")
df_joined, join_time = measure_time(
    lambda: df_with_strata.join(df_counts, on="model_domain_attack", how="inner")
)
print(f"✅ Data berhasil digabung dengan count per grup dalam {join_time:.4f} detik")

[4] Gabungkan count ke setiap baris
✅ Data berhasil digabung dengan count per grup dalam 0.1456 detik


In [13]:
print("[5] Memberi nomor urut acak dalam setiap grup ...")
window_spec = Window.partitionBy("model_domain_attack").orderBy(rand())
df_numbered, number_time = measure_time(
    lambda: df_joined.withColumn("row_num", row_number().over(window_spec))
)
print(f"✅ Penomoran selesai dalam {number_time:.4f} detik")

[5] Memberi nomor urut acak dalam setiap grup ...
✅ Penomoran selesai dalam 0.0621 detik


In [14]:
# [6] Tentukan batas 70% → masuk train jika row_num <= 0.7 * total_per_group
print("[6] Menentukan split berdasarkan 70% per grup ...")
df_with_split, split_time = measure_time(
    lambda: df_numbered.withColumn(
        "is_train",
        col("row_num") <= (col("total_per_group") * 0.7)
    )
)
print(f"✅ Split logic selesai dalam {split_time:.4f} detik")

[6] Menentukan split berdasarkan 70% per grup ...
✅ Split logic selesai dalam 0.0740 detik


In [15]:
# [7] Pisahkan train dan test
print("[7] Memisahkan train dan test ...")
train_df = df_with_split.filter(col("is_train")).drop("row_num", "total_per_group", "is_train")
test_df = df_with_split.filter(~col("is_train")).drop("row_num", "total_per_group", "is_train")

train_count, train_time = measure_time(lambda: train_df.count())
test_count, test_time = measure_time(lambda: test_df.count())

print(f"✅ Train: {train_count:,} baris (dalam {train_time:.4f} detik)")
print(f"✅ Test:  {test_count:,} baris (dalam {test_time:.4f} detik)")

[7] Memisahkan train dan test ...
✅ Train: 3,926,925 baris (dalam 235.9291 detik)
✅ Test:  1,683,684 baris (dalam 218.4977 detik)


In [16]:
# [8] Hitung distribusi di train
print("[8] Menghitung distribusi 'model_domain_attack' di train...")
train_dist = train_df.groupBy("model_domain_attack").count().withColumnRenamed("count", "train_count")

# [9] Hitung distribusi di test
print("[9] Menghitung distribusi 'model_domain_attack' di test...")
test_dist = test_df.groupBy("model_domain_attack").count().withColumnRenamed("count", "test_count")

# [10] Gabungkan train dan test berdasarkan label
print("[10] Menggabungkan distribusi train dan test...")
from pyspark.sql.functions import coalesce, lit, round as spark_round

# Hitung total keseluruhan untuk persentase
total_train = train_count
total_test = test_count

# Join full outer agar semua label muncul (meski hanya di train atau test)
combined = train_dist.join(test_dist, on="model_domain_attack", how="full_outer") \
    .fillna(0, subset=["train_count", "test_count"])

# Urutkan berdasarkan total frekuensi (opsional)
combined_sorted = combined.orderBy(col("train_count").desc())

# Ambil sebagai Pandas DataFrame untuk tampilan
print("[11] Mengambil hasil untuk ditampilkan...")
result_pd, fetch_time = measure_time(
    lambda: combined_sorted.toPandas()
)
print(f"✅ Berhasil mengambil {len(result_pd):,} kelas dalam {fetch_time:.4f} detik")

[8] Menghitung distribusi 'model_domain_attack' di train...
[9] Menghitung distribusi 'model_domain_attack' di test...
[10] Menggabungkan distribusi train dan test...
[11] Mengambil hasil untuk ditampilkan...
✅ Berhasil mengambil 1,152 kelas dalam 120.3367 detik


In [17]:
result_pd

Unnamed: 0,model_domain_attack,train_count,test_count
0,llama-chat_books_alternative_spelling,4986,2138
1,llama-chat_books_synonym,4986,2138
2,mistral-chat_books_insert_paragraphs,4986,2138
3,mpt-chat_books_whitespace,4986,2138
4,llama-chat_books_none,4986,2138
...,...,...,...
1147,human_reviews_alternative_spelling,660,283
1148,human_reviews_zero_width_space,660,283
1149,human_reviews_number,660,283
1150,human_reviews_upper_lower,660,283


# Train

In [18]:
from pyspark.ml.feature import StringIndexer, RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import FloatType

train_df, test_df

(DataFrame[model_domain_attack: string, adv_source_id: string, attack: string, decoding: string, domain: string, generation: string, id: string, model: string, prompt: string, repetition_penalty: string, source_id: string, title: string],
 DataFrame[model_domain_attack: string, adv_source_id: string, attack: string, decoding: string, domain: string, generation: string, id: string, model: string, prompt: string, repetition_penalty: string, source_id: string, title: string])

In [19]:
# --- 1. Siapkan hanya kolom teks (label akan diindex) ---
print("[12] Menyiapkan kolom 'text' dan 'label_string'...")
train_input = train_df.select(
    col("generation").alias("text"),
    col("model").alias("label_string")
)
test_input = test_df.select(
    col("generation").alias("text"),
    col("model").alias("label_string")
)

train_input = train_input.repartition(128)
test_input = test_input.repartition(128)

train_input, test_input

[12] Menyiapkan kolom 'text' dan 'label_string'...


(DataFrame[text: string, label_string: string],
 DataFrame[text: string, label_string: string])

In [20]:
# --- 2. Bangun pipeline dengan StringIndexer ---
print("[13] Membangun pipeline: StringIndexer → TF-IDF → LogisticRegression...")

# StringIndexer: string label → numeric (0, 1, 2, ...)
string_indexer = StringIndexer(
    inputCol="label_string",
    outputCol="label",
    handleInvalid="error"  # atau "skip" jika ada nilai tidak valid
)

# Tokenizer
tokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="words",
    pattern="\\W"
)

# Stopwords remover
stopwords_remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered"
)

# TF-IDF
hashing_tf = HashingTF(
    inputCol="filtered",
    outputCol="raw_features",
    numFeatures=1000
)

idf = IDF(
    inputCol="raw_features",
    outputCol="features"
)

# Logistic Regression (multinomial)
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    family="multinomial",
    regParam=0.1,
    maxIter=50
)

# Pipeline lengkap
pipeline = Pipeline(stages=[
    string_indexer,
    tokenizer,
    stopwords_remover,
    hashing_tf,
    idf,
    lr
])

pipeline

[13] Membangun pipeline: StringIndexer → TF-IDF → LogisticRegression...


Pipeline_be5b34b2eff5

In [21]:
# --- 3. Latih model ---
print("[14] Melatih model pada train_input...")
model, train_model_time = measure_time(
    lambda: pipeline.fit(train_input)
)
print(f"✅ Model berhasil dilatih dalam {train_model_time:.4f} detik")

[14] Melatih model pada train_input...


Py4JJavaError: An error occurred while calling o163.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 143.0 failed 4 times, most recent failure: Lost task 4.3 in stage 143.0 (TID 2534) (172.18.0.8 executor 1): org.apache.spark.memory.SparkOutOfMemoryError: [UNABLE_TO_ACQUIRE_MEMORY] Unable to acquire 16384 bytes of memory, got 0.
	at org.apache.spark.errors.SparkCoreErrors$.outOfMemoryError(SparkCoreErrors.scala:467)
	at org.apache.spark.errors.SparkCoreErrors.outOfMemoryError(SparkCoreErrors.scala)
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:384)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:467)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:487)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.$anonfun$add$2(ExternalAppendOnlyUnsafeRowArray.scala:149)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.$anonfun$add$2$adapted(ExternalAppendOnlyUnsafeRowArray.scala:143)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(ExternalAppendOnlyUnsafeRowArray.scala:143)
	at org.apache.spark.sql.execution.window.WindowExec$$anon$1.fetchNextPartition(WindowExec.scala:147)
	at org.apache.spark.sql.execution.window.WindowExec$$anon$1.next(WindowExec.scala:180)
	at org.apache.spark.sql.execution.window.WindowExec$$anon$1.next(WindowExec.scala:107)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$10(ShuffleExchangeExec.scala:375)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: [UNABLE_TO_ACQUIRE_MEMORY] Unable to acquire 16384 bytes of memory, got 0.
	at org.apache.spark.errors.SparkCoreErrors$.outOfMemoryError(SparkCoreErrors.scala:467)
	at org.apache.spark.errors.SparkCoreErrors.outOfMemoryError(SparkCoreErrors.scala)
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:384)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:467)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:487)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.$anonfun$add$2(ExternalAppendOnlyUnsafeRowArray.scala:149)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.$anonfun$add$2$adapted(ExternalAppendOnlyUnsafeRowArray.scala:143)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(ExternalAppendOnlyUnsafeRowArray.scala:143)
	at org.apache.spark.sql.execution.window.WindowExec$$anon$1.fetchNextPartition(WindowExec.scala:147)
	at org.apache.spark.sql.execution.window.WindowExec$$anon$1.next(WindowExec.scala:180)
	at org.apache.spark.sql.execution.window.WindowExec$$anon$1.next(WindowExec.scala:107)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$10(ShuffleExchangeExec.scala:375)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
