In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, ArrayType
from pyspark.ml import PipelineModel
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.feature import IndexToString
from classes.TextPreprocessing import TextPreprocessing
from classes.NewsStreaming import NewsStreaming

In [2]:
# Create (or reuse) the Spark session, pulling in the Kafka source and the
# MongoDB connector packages at start-up.
spark = (
    SparkSession.builder
    .appName("KafkaIntegration")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.4,org.mongodb.spark:mongo-spark-connector_2.13:10.2.0",
    )
    .getOrCreate()
)



25/04/17 22:32:17 WARN Utils: Your hostname, Macpad5Pro. resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/04/17 22:32:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/hduser/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/student/.ivy2/cache
The jars for the packages stored in: /home/student/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b5bef409-26f4-4132-854b-762f2bb0bdb4;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;3.5.4 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.5.4 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.scala-lang.modules#scala-parallel-collec

In [3]:
# Storage path of the saved, already-fitted preprocessing pipeline
# (presumably on HDFS — confirm against the cluster's default filesystem).
preprocessing_path = "/user/student/preprocessingPipeline"
# Load the fitted PipelineModel; it is applied to the stream with .transform() further down.
preprocess = PipelineModel.load(preprocessing_path)

                                                                                

In [4]:
# Path of the persisted cross-validated classifier
# (the directory name suggests a Naive Bayes model — TODO confirm).
model_path = "/user/student/naive_bayes"
cv_model = CrossValidatorModel.load(model_path)
# Keep only the best model selected by cross-validation; it is what scores the stream below.
model = cv_model.bestModel


In [5]:
# Kafka connection settings and the output location for results.
kafka_broker = "localhost:9092"  # Change if necessary
kafka_topic = "nation_news"
hdfs_output_path = "/user/student/sentiment_results"

# Schema of a single news record. Every field is a string except `keywords`,
# which is an array of strings.
schema = (
    StructType()
    .add("title", StringType())
    .add("link", StringType())
    .add("summary", StringType())
    .add("publish_time", StringType())
    .add("section", StringType())
    .add("sub_section", StringType())
    .add("keywords", ArrayType(StringType()))
    .add("location", StringType())
    .add("body", StringType())
)

In [6]:
# Helper from classes.NewsStreaming; every Kafka read, parse, scoring, filter,
# aggregation and console-output step below goes through this object.
streamer = NewsStreaming()

In [7]:
# Open the Kafka source for `kafka_topic` on `kafka_broker`
# (presumably returns a streaming DataFrame of raw Kafka records — confirm in NewsStreaming).
kafka_data = streamer.read_from_kafka(spark, kafka_broker, kafka_topic)

In [8]:
# Decode the raw Kafka data with the news `schema`; the console output further
# down shows the schema fields as top-level columns of the result.
df = streamer.process_stream(kafka_data, schema)

In [9]:
# Text-cleaning helper from classes.TextPreprocessing, used by the next three cells.
tp = TextPreprocessing()

In [10]:
# Drop duplicate records from the stream.
# NOTE(review): de-duplicating a streaming DataFrame without a watermark keeps
# unbounded state — confirm how remove_duplicate handles this.
df = tp.remove_duplicate(df)

In [11]:
# Normalise the `summary` column to lowercase (per the helper's name; implementation not shown here).
df = tp.lowercase(df, "summary")

In [12]:
# Strip special characters from `summary` (per the helper's name; implementation not shown here).
df = tp.remove_special(df, "summary")

In [13]:
# Apply the fitted preprocessing pipeline to the stream. Spark warns that the
# StringIndexerModel stage is skipped because the live data has no `sentiment`
# column; that stage is only reused later to recover the label strings.
df = preprocess.transform(df)

25/04/17 22:32:37 WARN StringIndexerModel: Input column sentiment does not exist during transformation. Skip StringIndexerModel for this column.


In [14]:
# Score the preprocessed stream with the loaded model; the console output below
# shows `rawPrediction`, `probability` and `prediction` columns on the result.
df_result = streamer.analyse_stream(df, model)

In [15]:
# Recover the label strings from the fitted label indexer inside the
# preprocessing pipeline. The stage is looked up by the attribute we need
# rather than by a hard-coded position, so reordering the pipeline cannot
# silently pick the wrong stage or fail with an opaque AttributeError.
label_indexer_model = next(
    (stage for stage in preprocess.stages if hasattr(stage, "labels")),
    None,
)
if label_indexer_model is None:
    raise ValueError(
        "No stage exposing `labels` (e.g. a StringIndexerModel) was found in the preprocessing pipeline"
    )
labels = label_indexer_model.labels

# Map the numeric `prediction` column back to its original label text.
label_reverse = IndexToString(inputCol="prediction", outputCol="predicted_label", labels=labels)
df_result = label_reverse.transform(df_result)


In [16]:
# Print the scored stream to the console and give it up to 60 seconds to run.
query = streamer.write_output_console(df_result)
query.awaitTermination(60)

# Test only `isActive`: the query object itself is always truthy, so the
# previous `query or query.isActive` condition could never reach the else branch.
if query.isActive:
    print("Query is still running after timeout, stopping it now")
    query.stop()
    print("Query stopped")
else:
    print("Query terminated within the timeout period")

25/04/17 22:32:38 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-89341763-067b-467f-bf95-17824ccfa607. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/17 22:32:38 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/17 22:32:39 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/04/17 22:32:43 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+--------------------+--------------------+-------------------+-------+-----------+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+---------------+
|               title|                link|             summary|       publish_time|section|sub_section|keywords|          location|                body|               words|      filtered_words|        raw_features|            features|       rawPrediction|         probability|prediction|predicted_label|
+--------------------+--------------------+--------------------+-------------------+-------+-----------+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------

## Extra: filtering, aggregation and windowed queries on the scored stream

In [17]:
# Show only the articles whose sub-section is "Aseanplus".
news_by_section = streamer.filter_by_sub_section(df_result, "Aseanplus")
query = streamer.write_output_console(news_by_section)
finished = query.awaitTermination(60)  # True only if the query ended within 60 s

# Stop the stream before the next cell rebinds `query`; otherwise it keeps
# running in the background and keeps printing to the console.
if query.isActive:
    query.stop()
finished


25/04/17 22:33:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-396698c0-8b22-4c96-8245-d8d724ae1d6b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/17 22:33:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/17 22:33:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/04/17 22:33:50 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+----+-------+------------+-------+-----------+--------+--------+----+-----+--------------+------------+--------+-------------+-----------+----------+---------------+
|title|link|summary|publish_time|section|sub_section|keywords|location|body|words|filtered_words|raw_features|features|rawPrediction|probability|prediction|predicted_label|
+-----+----+-------+------------+-------+-----------+--------+--------+----+-----+--------------+------------+--------+-------------+-----------+----------+---------------+
+-----+----+-------+------------+-------+-----------+--------+--------+----+-----+--------------+------------+--------+-------------+-----------+----------+---------------+



False

In [18]:
# Show only the articles predicted as "positive".
positive_news = streamer.filter_by_sentiment(df_result, "positive")
query = streamer.write_output_console(positive_news)
finished = query.awaitTermination(60)  # True only if the query ended within 60 s

# Stop the stream before the next cell rebinds `query`; otherwise it keeps
# running in the background and keeps printing to the console.
if query.isActive:
    query.stop()
finished


25/04/17 22:34:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6e136d96-0bdb-4e4b-8334-0e6df1ad287e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/17 22:34:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/17 22:34:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/04/17 22:34:50 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+--------------------+--------------------+-------------------+-------+-----------+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+---------------+
|               title|                link|             summary|       publish_time|section|sub_section|keywords|          location|                body|               words|      filtered_words|        raw_features|            features|       rawPrediction|         probability|prediction|predicted_label|
+--------------------+--------------------+--------------------+-------------------+-------+-----------+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------

False

In [19]:
# Show only the articles published in March 2025.
# NOTE(review): the windowed output at the end of this notebook shows 2020-2021
# timestamps, which would explain why this range returns no rows — confirm.
date_filtered_news = streamer.filter_by_date(
    df_result,
    "2025-03-01 00:00:00",
    "2025-03-31 23:59:59"
)
query = streamer.write_output_console(date_filtered_news)
finished = query.awaitTermination(60)  # True only if the query ended within 60 s

# Stop the stream before the next cell rebinds `query`; otherwise it keeps
# running in the background and keeps printing to the console.
if query.isActive:
    query.stop()
finished


25/04/17 22:35:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-fe9f3c12-9cf8-42db-a9d4-5effb7ada127. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/17 22:35:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/17 22:35:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/04/17 22:35:50 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+----+-------+------------+-------+-----------+--------+--------+----+-----+--------------+------------+--------+-------------+-----------+----------+---------------+
|title|link|summary|publish_time|section|sub_section|keywords|location|body|words|filtered_words|raw_features|features|rawPrediction|probability|prediction|predicted_label|
+-----+----+-------+------------+-------+-----------+--------+--------+----+-----+--------------+------------+--------+-------------+-----------+----------+---------------+
+-----+----+-------+------------+-------+-----------+--------+--------+----+-----+--------------+------------+--------+-------------+-----------+----------+---------------+



False

In [20]:
# Project the scored stream down to the title, location and numeric prediction.
selected = streamer.select_columns(df_result, ["title", "location", "prediction"])
query = streamer.write_output_console(selected)
finished = query.awaitTermination(60)  # True only if the query ended within 60 s

# Stop the stream before the next cell rebinds `query`; otherwise it keeps
# running in the background and keeps printing to the console.
if query.isActive:
    query.stop()
finished

25/04/17 22:36:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-40bf84f5-c654-4b5c-9f41-214340211ea4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/17 22:36:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/17 22:36:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/04/17 22:36:50 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+------------------+----------+
|               title|          location|prediction|
+--------------------+------------------+----------+
|Covid-19: Four ne...|     PETALING JAYA|       0.0|
|Over 3,400 arrest...|            MELAKA|       1.0|
|JPJ to conduct Op...|         PUTRAJAYA|       2.0|
|Delay in ratifyin...|      KUALA LUMPUR|       1.0|
|Melaka Polls: Peo...|            MELAKA|       0.0|
|Viral message war...|        JOHOR BARU|       1.0|
|Masidi: Fleet of ...|     KOTA KINABALU|       0.0|
|Customs foil atte...|        PORT KLANG|       1.0|
|Scheduled water c...|Location not found|       0.0|
|Singapore to tigh...|Location not found|       2.0|
|Melaka polls: Per...|            MELAKA|       1.0|
|Kota Baru 'Gold r...|Location not found|       1.0|
|Road rage: Fight ...|      KUALA LUMPUR|       1.0|
|Covid-19: Sabah r...|         PUTRAJAYA|       0.0|
|O

False

In [21]:
# Count articles per location; the aggregate is written in "complete" output
# mode, so each batch prints the full table of counts.
location_agg = streamer.aggregate_by_location(df_result)
query = streamer.write_output_console(location_agg, output_mode = "complete")
finished = query.awaitTermination(60)  # True only if the query ended within 60 s

# Stop the stream before the next cell rebinds `query`; otherwise it keeps
# running in the background and keeps printing to the console.
if query.isActive:
    query.stop()
finished

25/04/17 22:37:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c59c6503-7519-4113-94e6-eac5cc1a976e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/17 22:37:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/17 22:37:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------+----------+
|      location|news_count|
+--------------+----------+
|          IPOH|        12|
|          SIBU|         1|
|   BUTTERWORTH|         1|
|        KAJANG|         1|
|       BEIJING|         1|
|        LABUAN|         1|
|  SUNGAI SIPUT|         1|
|         MARAN|         1|
|         KLANG|         4|
|BUKIT MERTAJAM|         2|
|     PUTRAJAYA|        27|
|   KOTA TINGGI|         4|
|    BATU PAHAT|         2|
|         KULIM|         1|
|      SEREMBAN|         2|
|    JOHOR BARU|        18|
|         KAPAR|         1|
|  PORT DICKSON|         1|
|        RAWANG|         1|
|     SHAH ALAM|         2|
+--------------+----------+
only showing top 20 rows



False

In [22]:
# Count articles per section; the aggregate is written in "complete" output
# mode, so each batch prints the full table of counts.
section_agg = streamer.aggregate_by_section(df_result)
query = streamer.write_output_console(section_agg, output_mode = "complete")
finished = query.awaitTermination(60)  # True only if the query ended within 60 s

# Stop the stream before the next cell rebinds `query`; otherwise it keeps
# running in the background and keeps printing to the console.
if query.isActive:
    query.stop()
finished

25/04/17 22:38:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-fac72768-7b31-4300-80cf-7d6ee828e57b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/17 22:38:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/17 22:38:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+
|section|news_count|
+-------+----------+
|   News|       400|
+-------+----------+



False

In [23]:
from pyspark.sql.functions import to_timestamp

# Convert publish_time to timestamp for windowing
structured_df = df_result.withColumn("publish_time", to_timestamp("publish_time"))

# Count predicted labels over 10-minute windows that slide every 5 minutes.
sentiment_windowed = streamer.windowed_sentiment_count(
    structured_df,
    window_duration="10 minutes",
    slide_duration="5 minutes"
)

query = streamer.write_output_console(sentiment_windowed)
finished = query.awaitTermination(60)  # True only if the query ended within 60 s

# Stop the stream once the demo window is over; otherwise it keeps running in
# the background for as long as the kernel is alive.
if query.isActive:
    query.stop()
finished

25/04/17 22:39:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-73b25398-416c-406b-a277-a6304a817aa6. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/17 22:39:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/17 22:39:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/04/17 22:39:51 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
25/04/17 22:39:56 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+---------------+-----+
|window|predicted_label|count|
+------+---------------+-----+
+------+---------------+-----+



25/04/17 22:40:02 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
25/04/17 22:40:06 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+---------------+-----+
|              window|predicted_label|count|
+--------------------+---------------+-----+
|{2020-12-01 21:05...|       positive|    1|
|{2020-11-27 19:40...|       positive|    2|
|{2021-11-28 22:20...|       positive|    2|
|{2020-11-14 18:55...|       positive|    1|
|{2020-11-16 21:50...|       positive|    1|
|{2020-11-30 21:00...|       positive|    1|
|{2021-11-16 20:50...|       positive|    1|
|{2020-11-24 19:45...|       positive|    1|
|{2020-11-14 18:20...|       negative|    1|
|{2021-11-29 22:25...|       positive|    1|
|{2021-11-12 23:05...|       positive|    1|
|{2020-11-15 21:30...|       positive|    1|
|{2020-11-17 20:50...|       positive|    1|
|{2021-11-14 21:10...|       positive|    1|
|{2020-12-01 22:40...|       positive|    1|
|{2021-11-16 20:15...|       positive|    1|
|{2021-11-24 19:30...|       positive|    1|
|{2

False