Author: Edwina Hon Kai Xin

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import PipelineModel

from sparkstreaming.pipeline_trainer import PipelineTrainer
from sparkstreaming.kafka_reader import KafkaTweetReader
from sparkstreaming.hdfs_writer import HDFSWriter

import requests
import json
import re  # Import the regular expressions module
import csv
import pandas as pd
import datetime
import time
import uuid
from kafka import KafkaConsumer
from kafka import KafkaProducer

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType
from pyspark.ml.tuning import CrossValidatorModel
from classes.TextPreprocessor import TextPreprocessor
from pyspark.sql.utils import StreamingQueryException
import logging
from pyspark.sql.types import StructType, StringType


## Create Kafka topics and send tweet data to Kafka (Task 1)

In [7]:
# 1. In PowerShell, delete the existing topics
# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic PoliticsNewsTopic
# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic BusinessNewsTopic
# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic SportsNewsTopic
# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic EntertainmentNewsTopic
# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic MalaysiaNewsTopic

# # 2. Clear output data
# # hdfs dfs -rm -r /user/student/sentiments_data_json_1
# # Remove checkpoint data
# #hdfs dfs -rm -r /user/student/checkpoints/sentiments_data_json_new_1


# # In PowerShell, create the topics

# for topic in BusinessNewsTopic EntertainmentNewsTopic MalaysiaNewsTopic PoliticsNewsTopic SportsNewsTopic
# do
#   echo "Creating topic: $topic"
#   bin/kafka-topics.sh --create \
#     --topic $topic \
#     --bootstrap-server localhost:9092 \
#     --partitions 1 \
#     --replication-factor 1
# done


In [8]:
# Keyword lists that decide which Kafka topic a tweet is routed to.
# Keys are topic names; the dictionary order is the order in which
# topics are checked when a tweet is categorized.
CATEGORY_KEYWORDS = {
    "PoliticsNewsTopic": [
        "parliament", "minister", "government", "election",
        "policy", "vote", "cabinet", "PM",
        "democracy", "corruption", "political", "politician",
        "law", "bill", "constitution", "amendment",
        "opposition", "campaign", "UMNO", "PAS",
        "PKR", "DAP", "Bersatu", "Pakatan",
        "Barisan", "budget",
    ],
    "BusinessNewsTopic": [
        "economy", "market", "stock", "investment",
        "company", "business", "trade", "finance",
        "bank", "ringgit", "profit", "revenue",
        "CEO", "entrepreneur", "startup", "commerce",
        "industry", "economic", "inflation", "recession",
        "growth", "GST", "tax", "BURSA",
        "FDI",
    ],
    "SportsNewsTopic": [
        "football", "badminton", "hockey", "athlete",
        "tournament", "championship", "league", "match",
        "player", "coach", "team", "sport",
        "medal", "win", "game", "score",
        "FIFA", "Olympic", "Petronas", "stadium",
        "final", "competition", "record", "JDT",
        "Selangor", "Perak", "Malaysia Super League",
    ],
    "EntertainmentNewsTopic": [
        "movie", "music", "concert", "celebrity",
        "actor", "actress", "film", "entertainment",
        "drama", "show", "artist", "singer",
        "star", "TV", "Netflix", "performance",
        "premiere", "award", "festival", "viral",
        "album", "song", "talent", "meme",
        "trending", "Astro", "Media Prima",
    ],
}

# Read the tweets CSV into a pandas DataFrame
df = pd.read_csv('tweets_output_with_sentiment.csv')


def _encode_as_json(payload):
    """Serialize a message value to UTF-8 encoded JSON bytes."""
    return json.dumps(payload).encode('utf-8')


# Kafka producer for the local broker; every message value is passed
# through the JSON serializer defined above before it is sent
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=_encode_as_json,
)

# Define function to categorize tweets based on keywords
def categorize_tweet(tweet_text, category_keywords=None):
    """Return the Kafka topic name that a tweet should be routed to.

    Categories are checked in dictionary order and the first category with
    a matching keyword wins.

    Matching rules:
      * A keyword written entirely in capitals (e.g. 'PM', 'TV', 'UMNO') is
        treated as an acronym: it must appear in the tweet as a whole word
        with the same capitalization. A plain substring test would make
        'PM' match words such as "development".
      * Any other keyword is matched case-insensitively as a substring, so
        'election' matches "Elections" and 'Netflix' matches "netflix".

    Args:
        tweet_text: The tweet text. Anything that is not a ``str`` (for
            example ``None`` or a NaN from pandas) falls through to the
            default topic.
        category_keywords: Optional mapping of topic name to a list of
            keywords. Defaults to the module-level ``CATEGORY_KEYWORDS``.

    Returns:
        The matching topic name, or ``'MalaysiaNewsTopic'`` when no keyword
        matches or the input is not a string.
    """
    # Default topic if no category is found or the tweet is invalid
    if not isinstance(tweet_text, str):
        return 'MalaysiaNewsTopic'

    if category_keywords is None:
        category_keywords = CATEGORY_KEYWORDS

    lowered_text = tweet_text.lower()
    for category, keywords in category_keywords.items():
        for keyword in keywords:
            if keyword.isupper():
                # Acronym: whole-word, case-sensitive match on the original text
                if re.search(r'\b' + re.escape(keyword) + r'\b', tweet_text):
                    return category
            elif keyword.lower() in lowered_text:
                # Lower-case both sides so mixed-case keywords can match too
                return category

    return 'MalaysiaNewsTopic'

# Message key -> CSV column, in the order the keys appear in each message
PAYLOAD_COLUMNS = (
    ('user_id', 'User ID'),
    ('name', 'Name'),
    ('followers_count', 'Followers Count'),
    ('tweet_text', 'Tweet'),
    ('Location', 'Location'),
    ('Time', 'Tweet Time'),
    ('Friends Count', 'Friends Count'),
    ('sentiment', 'Sentiment'),
)

# Publish every CSV row to the topic chosen from its tweet text
for index, row in df.iterrows():
    # Build the message body from the mapped columns
    tweet_data = {key: row[column] for key, column in PAYLOAD_COLUMNS}

    # Pick the destination topic from the tweet's keywords
    topic = categorize_tweet(row['Tweet'])

    producer.send(topic, value=tweet_data)

    # Progress output, one line per tweet (optional)
    print(f"Sent tweet from {row['Name']} to {topic}")

# Flush any buffered messages, then shut the producer down
producer.flush()
producer.close()
print("All tweets have been sent to Kafka")


Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to PoliticsNewsTopic
Sent tweet from The Star to SportsNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to BusinessNewsTopic
Sent tweet from The Star to BusinessNewsTopic
Sent tweet from The Star to BusinessNewsTopic
Sent tweet from The Star to EntertainmentNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to PoliticsNewsTopic
Sent tweet from The Star to BusinessNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to MalaysiaNewsTopic
Sent tweet from The Star to Pol

In [2]:
# Display the most recently built message payload.
# NOTE(review): the recorded output for this cell is a NameError, so it was
# presumably run in a session where the producer cell had not been executed —
# confirm before relying on this cell.
tweet_data

NameError: name 'tweet_data' is not defined

In [3]:
# Named logger used by the cells below
logger = logging.getLogger("SentimentAnalysisStream")

# Configure root logging at INFO level
logging.basicConfig(level=logging.INFO)

# Start Here

## Initialize Spark Session

In [4]:
# 1. Initialize Spark Session
# Collect the builder options first: the application name and the
# Kafka connector package passed via spark.jars.packages
spark_builder = (
    SparkSession.builder
    .appName("Real-Time Sentiment Analysis")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.1")
)

# Create the session, or reuse one that already exists
spark = spark_builder.getOrCreate()

# Set Spark's log level to WARN
spark.sparkContext.setLogLevel("WARN")

25/04/15 02:03:00 WARN Utils: Your hostname, Ck. resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/04/15 02:03:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/hduser/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/student/.ivy2/cache
The jars for the packages stored in: /home/student/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cce3fdb2-12a6-4e21-8e87-a25d8986deb0;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 in central
	found org.apache.commons#commons-pool2

## Apply preprocessing pipeline

In [5]:
try:
    # 2. Fit preprocessing pipeline on static data
    trainer = PipelineTrainer(spark)
    pipeline_model = trainer.train_pipeline()

except StreamingQueryException as e:
    # Log with traceback, then re-raise: swallowing the error would leave
    # `pipeline_model` undefined and make a later cell fail with an
    # unrelated NameError instead of the real cause.
    logger.exception("Streaming Query Exception: %s", e)
    raise
except Exception as e:
    # Same reasoning as above for any other failure while fitting.
    logger.exception("General Exception: %s", e)
    raise

                                                                                

## Define kafka tweet schema

In [6]:
try:
    # 3. Define Kafka tweet schema
    # Every field is declared as a string; fields are added in the order listed
    tweet_schema = StructType()
    for field_name in (
        "user_id",
        "name",
        "followers_count",
        "tweet_text",
        "sentiment",
        "Location",
        "Time",
    ):
        tweet_schema = tweet_schema.add(field_name, StringType())

except StreamingQueryException as e:
    logger.error("Streaming Query Exception: %s", e)
except Exception as e:
    logger.error("General Exception: %s", e)

In [7]:
# Display the schema to check its fields and types
tweet_schema

StructType([StructField('user_id', StringType(), True), StructField('name', StringType(), True), StructField('followers_count', StringType(), True), StructField('tweet_text', StringType(), True), StructField('sentiment', StringType(), True), StructField('Location', StringType(), True), StructField('Time', StringType(), True)])

## Read data from Kafka and convert to a structured DataFrame

In [8]:
# 4. Read from Kafka
# The reader is built on the active Spark session; read_and_parse() returns
# the DataFrame that the later cells transform and write out.
# NOTE(review): topic subscription and JSON parsing happen inside
# KafkaTweetReader, which is not visible here — confirm it uses the same
# field names as the producer's messages.
reader = KafkaTweetReader(spark)
parsed_data = reader.read_and_parse()

## Apply preprocessing and Perform sentiment analysis

In [9]:
# 5. Apply preprocessing
# Run the fitted pipeline over the parsed tweets and drop its "prediction"
# column (presumably so the model below can add its own — confirm)
preprocessed_data = pipeline_model.transform(parsed_data).drop("prediction")

# 6. Load trained model
# The cross-validated model is stored on HDFS
model = CrossValidatorModel.load("hdfs://localhost:9000/user/student/dt_model")

# 7. Predict sentiment
# Score the preprocessed tweets with the best model from cross-validation
best_model = model.bestModel
predicted_data = best_model.transform(preprocessed_data)

## Display predicted data

In [None]:
# 8A. Write to memory table instead of console
import time

# Stream the selected columns into an in-memory table named "sentimentStream"
memory_query = (
    predicted_data
    .select("topic", "name", "user_id", "followers_count", "Tweet", "Location", "Time", "prediction", "sentiment")
    .writeStream
    .queryName("sentimentStream")
    .outputMode("append")
    .format("memory")
    .start()
)

# Wait until data appears.
# Poll only while the query is running: if it terminates (e.g. it failed),
# the loop ends instead of spinning forever on an empty table.
try:
    while memory_query.isActive:
        df = spark.sql("SELECT * FROM sentimentStream")
        if df.count() > 0:
            df.show(truncate=False)
            break
        time.sleep(1)
    else:
        # The query stopped before any rows arrived; surface its error, if any
        failure = memory_query.exception()
        if failure is not None:
            raise failure
finally:
    # Always stop the query, including on interrupt or error, so it does not
    # keep running in the background
    memory_query.stop()

25/04/15 02:07:21 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-94eb3183-5f1a-454b-8be4-8f5595c84032. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/15 02:07:21 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/15 02:07:21 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/04/15 02:07:22 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
25/04/15 02:07:22 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Uninterrupti

## Output to HDFS in Parquet and Json format

In [None]:
# 8B. Output to HDFS in JSON format
# NOTE(review): the writer is given `parsed_data`, not `predicted_data`, so the
# model's predictions are presumably not part of what is written — confirm
# this is intended.
writer = HDFSWriter(parsed_data)
# The output path and any checkpoint location are set inside HDFSWriter (not visible here)
writer.write_json()

In [None]:
# Read the JSON files back from HDFS as a static DataFrame and report the row count.
# Presumably this is the directory written by the HDFS writer cell — verify the path matches.
df = spark.read.json("hdfs://localhost:9000/user/student/sentiments_data_json_1")
print(f"Total tweets saved: {df.count()}")