In [415]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower
from pyspark.ml.feature import StringIndexer
import nltk
from nltk.corpus import stopwords
from pyspark.sql.functions import when, col, expr, regexp_replace, explode
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.sql import Window


In [416]:
spark = SparkSession.builder.master("local[*]").appName("SentimentAnalyzeTwitter").getOrCreate()

In [417]:
df_train_path = "hdfs://localhost:9000/project-uts/dataset_penyisihan_bdc_2024.csv"
df_test_path = "hdfs://localhost:9000/project-uts/zaken.csv"



In [418]:
df_train = spark.read.csv(df_train_path, header=True, sep=';')
df_test = spark.read.csv(df_test_path, header=True)

# Training

## Data Preprocessing

In [419]:
# Lowercase
df_train = df_train.withColumn("text", lower(df_train["text"]))
df_train = df_train.withColumn("label", lower(df_train["label"]))

In [420]:
# Label Encoding
valid_labels = ["politik", "sosial budaya", "pertahanan dan keamanan", "ideologi", "ekonomi", "sumber daya alam", "demografi", "geografi"]

df_train = df_train.filter(df_train.label.isin(valid_labels))

indexer = StringIndexer(inputCol="label", outputCol="label_indexed")
indexer_model = indexer.fit(df_train)
df_train = indexer_model.transform(df_train)
labels_mapping = dict(zip(indexer_model.labels, range(len(indexer_model.labels))))
print("Mapping label to label_indexed:", labels_mapping)
reverse_labels_mapping = {v: k for k, v in labels_mapping.items()}
print("Mapping prediction to label:", reverse_labels_mapping)
df_train = df_train.drop("label").withColumnRenamed("label_indexed", "label")


Mapping label to label_indexed: {'politik': 0, 'sosial budaya': 1, 'pertahanan dan keamanan': 2, 'ideologi': 3, 'ekonomi': 4, 'sumber daya alam': 5, 'demografi': 6, 'geografi': 7}
Mapping prediction to label: {0: 'politik', 1: 'sosial budaya', 2: 'pertahanan dan keamanan', 3: 'ideologi', 4: 'ekonomi', 5: 'sumber daya alam', 6: 'demografi', 7: 'geografi'}


In [421]:
# Check Retweet
df_train = df_train.withColumn("retweet", when(col("text").contains("rt"), True).otherwise(False))
df_train = df_train.withColumn("text", expr("trim(replace(text, 'rt', ''))"))

In [422]:
# Check Mention
df_train = df_train.withColumn("mention", when(col("text").contains("@"), True).otherwise(False))
df_train = df_train.withColumn("text", regexp_replace(col("text"), r'@\S+', '').alias("text"))
df_train = df_train.withColumn("text", expr("trim(text)"))


In [423]:
# Check Hashtag
df_train = df_train.withColumn("hashtag", when(col("text").contains("#"), True).otherwise(False))
df_train = df_train.withColumn("text", regexp_replace(col("text"), r'#\S+', '').alias("text"))
df_train = df_train.withColumn("text", expr("trim(text)"))

In [424]:
# Check URL
df_train = df_train.withColumn("url", when(col("text").rlike(r'https?://\S+'), True).otherwise(False))
df_train = df_train.withColumn("text", regexp_replace(col("text"), r'https?://\S+', '').alias("text"))
df_train = df_train.withColumn("text", expr("trim(text)"))

In [425]:
# Check Reply
df_train = df_train.withColumn("reply", when(col("text").rlike(r'\[.*?\]'), True).otherwise(False))
df_train = df_train.withColumn("text", regexp_replace(col("text"), r'\[.*?\]', ''))
df_train = df_train.withColumn("text", expr("trim(text)"))

In [426]:
# Check Candidate 01
df_train = df_train.withColumn("candidate01", when(col("text").rlike(r'\b(anies|baswedan|muhaimin|imin|iskandar|abah|amin|01|1)\b'), True).otherwise(False))
df_train = df_train.withColumn("text", regexp_replace(col("text"), r'\b(anies|baswedan|muhaimin|imin|iskandar|abah|amin|01|1)\b', '').alias("text"))
df_train = df_train.withColumn("text", expr("trim(text)"))

In [427]:
# Check Candidate 02
df_train = df_train.withColumn("candidate02", when(col("text").rlike(r'\b(prabowo|subianto|gibran|rakabuming|rakabumingraka|raka|pragib|prabowo-gibran|02|2)\b'), True).otherwise(False))
df_train = df_train.withColumn("text", regexp_replace(col("text"), r'\b(prabowo|subianto|gibran|rakabuming|rakabumingraka|raka|pragib|prabowo-gibran|02|2)\b', '').alias("text"))
df_train = df_train.withColumn("text", expr("trim(text)"))

In [428]:
# Check Candidate 03
df_train = df_train.withColumn("candidate03", when(col("text").rlike(r'\b(ganjar|pranowo|mahfud|md|prof|ganjar-pranowo|03|3)\b'), True).otherwise(False))
df_train = df_train.withColumn("text", regexp_replace(col("text"), r'\b(ganjar|pranowo|mahfud|md|prof|ganjar-pranowo|03|3)\b', '').alias("text"))
df_train = df_train.withColumn("text", expr("trim(text)"))

In [429]:
# Clean Text Whitespace
df_train = df_train.withColumn("text", regexp_replace(col("text"), r'[^\w\s]', ''))
df_train = df_train.withColumn("text", regexp_replace(col("text"), r'\s+', ' '))
df_train = df_train.withColumn("text", expr("trim(text)"))

In [430]:
# Clean Text Number
df_train = df_train.withColumn("text", regexp_replace(col("text"), r'\d+', ''))
df_train = df_train.withColumn("text", expr("trim(text)"))

In [431]:
# Clean Text Stopword
stopwords_english = stopwords.words('english')
stopwords_indo = stopwords.words('indonesian')
stopwords_custom = ['duh', 'ya', 'emang', 'emng', 'sih', 'mas', 'wait','yang', 'yg', 'dan', 'atau', 'saya', 'kami', 'aku', 'kamu','dia','sok', 'juga','jg','dn','dgn','dg', 'dengan', 'hanya', 'hny', 'hnya', 'saja', 'sj' ,'kalo', 'kl', 'sekarang', 'skrg', 'nih', 'ini']
all_stopwords = list(set(stopwords_english+stopwords_indo + stopwords_custom))
tokenizer = Tokenizer(inputCol="text", outputCol="words_token")
remover = StopWordsRemover(inputCol="words_token", outputCol="filtered_words", stopWords=all_stopwords)

In [432]:
# Tokenize
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=2000)
idf = IDF(inputCol="raw_features", outputCol="features")


In [433]:
# Model ML
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
pipeline = Pipeline(stages=[tokenizer, remover, hashing_tf, idf, lr])

In [434]:
lr_model = pipeline.fit(df_train)

In [435]:
predictions = lr_model.transform(df_train)

In [436]:
predictions.select("text", "label", "prediction").show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------+
|text                                                                                                                                                                                                                                                                                                                                                                                                                                                  |label|prediction|
+-----------------------------------------------------------------------------------

## Balance Accuracy

In [437]:
predictions = predictions.withColumn("is_correct", F.when(F.col("label") == F.col("prediction"), 1).otherwise(0))

class_metrics = predictions.groupBy("label").agg(
    F.sum("is_correct").alias("TP"),
    F.count("label").alias("total_samples")
)

class_metrics = class_metrics.withColumn("FN", F.col("total_samples") - F.col("TP"))

class_metrics = class_metrics.withColumn("recall", F.col("TP") / (F.col("TP") + F.col("FN")))

balanced_accuracy = class_metrics.select(F.mean("recall")).collect()[0][0]

print(f"Balanced Accuracy: {balanced_accuracy}")


Balanced Accuracy: 0.90323520839575


# Testing

In [438]:
df_test = df_test.drop('conversation_id_str').drop('created_at').drop('favorite_count').drop('id_str').drop('image_url').drop('in_reply_to_screen_name').drop('lang').drop('location').drop('quote_count').drop('reply_count').drop('retweet_count').drop('tweet_url').drop('user_id_str').drop('username')
df_test = df_test.withColumnRenamed('full_text','text')

## Data Preprocessing

In [439]:
# Lowercase
df_test = df_test.withColumn("text", lower(df_test["text"]))

In [440]:
# Check Retweet
df_test = df_test.withColumn("retweet", when(col("text").contains("rt"), True).otherwise(False))
df_test = df_test.withColumn("text", expr("trim(replace(text, 'rt', ''))"))

In [441]:
# Check Mention
df_test = df_test.withColumn("mention", when(col("text").contains("@"), True).otherwise(False))
df_test = df_test.withColumn("text", regexp_replace(col("text"), r'@\S+', '').alias("text"))
df_test = df_test.withColumn("text", expr("trim(text)"))


In [442]:
# Check Hastag
df_test = df_test.withColumn("hashtag", when(col("text").contains("#"), True).otherwise(False))
df_test = df_test.withColumn("text", regexp_replace(col("text"), r'#\S+', '').alias("text"))
df_test = df_test.withColumn("text", expr("trim(text)"))

In [443]:
# Check URL
df_test = df_test.withColumn("url", when(col("text").rlike(r'https?://\S+'), True).otherwise(False))
df_test = df_test.withColumn("text", regexp_replace(col("text"), r'https?://\S+', '').alias("text"))
df_test = df_test.withColumn("text", expr("trim(text)"))

In [444]:
# Check Reoly
df_test = df_test.withColumn("reply", when(col("text").rlike(r'\[.*?\]'), True).otherwise(False))
df_test = df_test.withColumn("text", regexp_replace(col("text"), r'\[.*?\]', ''))
df_test = df_test.withColumn("text", expr("trim(text)"))

In [445]:
# Check Candidate01
df_test = df_test.withColumn("candidate01", when(col("text").rlike(r'\b(anies|baswedan|muhaimin|imin|iskandar|abah|amin|01|1)\b'), True).otherwise(False))
df_test = df_test.withColumn("text", regexp_replace(col("text"), r'\b(anies|baswedan|muhaimin|imin|iskandar|abah|amin|01|1)\b', '').alias("text"))
df_test = df_test.withColumn("text", expr("trim(text)"))

In [446]:
# Check Candidate02
df_test = df_test.withColumn("candidate02", when(col("text").rlike(r'\b(prabowo|subianto|gibran|rakabuming|rakabumingraka|raka|pragib|prabowo-gibran|02|2)\b'), True).otherwise(False))
df_test = df_test.withColumn("text", regexp_replace(col("text"), r'\b(prabowo|subianto|gibran|rakabuming|rakabumingraka|raka|pragib|prabowo-gibran|02|2)\b', '').alias("text"))
df_test = df_test.withColumn("text", expr("trim(text)"))

In [447]:
# Check Candidate03
df_test = df_test.withColumn("candidate03", when(col("text").rlike(r'\b(ganjar|pranowo|mahfud|md|prof|ganjar-pranowo|03|3)\b'), True).otherwise(False))
df_test = df_test.withColumn("text", regexp_replace(col("text"), r'\b(ganjar|pranowo|mahfud|md|prof|ganjar-pranowo|03|3)\b', '').alias("text"))
df_test = df_test.withColumn("text", expr("trim(text)"))

In [448]:
# Clean Text White Space
df_test = df_test.withColumn("text", regexp_replace(col("text"), r'[^\w\s]', ''))
df_test = df_test.withColumn("text", regexp_replace(col("text"), r'\s+', ' '))
df_test = df_test.withColumn("text", expr("trim(text)"))

In [449]:
# Clean Text Number
df_test = df_test.withColumn("text", regexp_replace(col("text"), r'\d+', ''))
df_test = df_test.withColumn("text", expr("trim(text)"))

In [450]:
predictions_new = lr_model.transform(df_test)

In [451]:
predictions_new = predictions_new.drop('retweet').drop('mention').drop('hashtag').drop('url').drop('reply').drop('candidate01').drop('candidate02').drop('candidate03').drop('words_token').drop('filtered_words').drop('raw_features').drop('rawPrediction').drop('probability').drop("features")

In [452]:
# Reverse Label Encoding
predictions_new = predictions_new.withColumn(
    "label", 
    when(col("prediction") == 0.0, "politik")
    .when(col("prediction") == 1.0, "sosial budaya")
    .when(col("prediction") == 2.0, "pertahanan dan keamanan")
    .when(col("prediction") == 3.0, "ideologi")
    .when(col("prediction") == 4.0, "ekonomi")
    .when(col("prediction") == 5.0, "sumber daya alam")
    .when(col("prediction") == 6.0, "demografi")
    .when(col("prediction") == 7.0, "geografi")
)

predictions_new = predictions_new.drop('prediction')
predictions_new.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+
|text                                                                                                                                                                                                                                                                         |label           |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+
|zaken itu apaan                                                                                                                     

In [453]:
label_counts = predictions_new.groupBy("label").agg(F.count("label").alias("count"))
label_counts.show(truncate=False)

+-----------------------+-----+
|label                  |count|
+-----------------------+-----+
|sumber daya alam       |14   |
|ideologi               |20   |
|ekonomi                |16   |
|sosial budaya          |13   |
|pertahanan dan keamanan|9    |
|politik                |357  |
+-----------------------+-----+



# Saving Output

In [454]:
csv_output_path = "./output"
predictions_new.coalesce(1).write.option("header", "true").csv(csv_output_path)

In [455]:
spark.stop()