In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Build (or reuse) the Hive-enabled Spark session used by every cell below.
# Adaptive query execution is switched off explicitly for this demo.
spark = (
    SparkSession.builder
    .appName("SparkHiveDemo")
    .config('spark.sql.warehouse.dir', 'hdfs:/user/hive/warehouse/')
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.adaptive.enabled", "false")
    .enableHiveSupport()
    .getOrCreate()
)

24/04/26 08:39:37 WARN Utils: Your hostname, jupy-06 resolves to a loopback address: 127.0.1.1; using 10.123.51.206 instead (on interface ens18)
24/04/26 08:39:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/26 08:39:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Use the schema/db used by JY, so later queries can reference its tables
# without a database prefix.
spark.sql('USE sentiment_data')
# DataFrame.show() prints and returns None, so `tables = ....show()` left
# `tables` bound to None. Keep the DataFrame and display it as a separate step.
tables = spark.sql("SHOW TABLES")
tables.show()

24/04/26 08:39:40 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.


+--------------+--------------------+-----------+
|     namespace|           tableName|isTemporary|
+--------------+--------------------+-----------+
|sentiment_data|   prediction_result|      false|
|sentiment_data|prediction_result...|      false|
|sentiment_data|      temp_csv_table|      false|
|sentiment_data|           test_data|      false|
|sentiment_data|          train_data|      false|
|sentiment_data|      train_data_eva|      false|
+--------------+--------------------+-----------+



<h2>1. Displaying Negative Sentiments (management wants to know what customers are unhappy about)</h2>

In [3]:
# Encoding of the numeric `prediction` column:
# Positive = 0.0
# Negative = 1.0
# Neutral  = 2.0

In [3]:
# Preview the prediction_result table; show() prints the first 20 rows by
# default and truncates long string values.
spark.sql("SELECT * FROM prediction_result").show()

24/04/26 08:39:49 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


+--------------------+---------+----------+
|             Comment|Sentiment|prediction|
+--------------------+---------+----------+
|a gun with    met...| Negative|       1.0|
|a ww  mythic with...| Negative|       2.0|
|already a w  alth...|  Neutral|       2.0|
|              always|  Neutral|       2.0|
|am i the only one...| Positive|       0.0|
|american college ...|  Neutral|       2.0|
|among ars i feel ...| Positive|       0.0|
|and we appreciate...| Positive|       2.0|
|appearently it s ...|  Neutral|       2.0|
|are we ever getti...| Positive|       0.0|
|as a persistence ...| Positive|       0.0|
|as long as there ...| Positive|       0.0|
|as long as there ...|  Neutral|       0.0|
|asks redditors fo...| Positive|       0.0|
|ay another loose ...| Negative|       1.0|
|because i m a dum...| Negative|       2.0|
|best record for l...| Positive|       0.0|
|better not  buttt...| Negative|       0.0|
|   bot match invites| Positive|       2.0|
|bro is going to d...|  Neutral|

In [4]:
# Keep only the rows predicted as Negative (prediction == 1.0) and discard the
# Sentiment column, leaving Comment + prediction.
hive_table_df = spark.table("prediction_result")
is_negative = hive_table_df['prediction'] == 1.0
filtered_df = hive_table_df.filter(is_negative).drop("Sentiment")
filtered_df.show(10)

+--------------------+----------+
|             Comment|prediction|
+--------------------+----------+
|a gun with    met...|       1.0|
|ay another loose ...|       1.0|
|bruh what    it s...|       1.0|
|by that logic sni...|       1.0|
|far from balanced...|       1.0|
|for once i agree ...|       1.0|
|fully upgraded th...|       1.0|
|           hell yeah|       1.0|
|hey there  alcatr...|       1.0|
|i forgot about re...|       1.0|
+--------------------+----------+
only showing top 10 rows



In [5]:
# Number of rows that passed the prediction == 1.0 filter.
# count() is an action, so this triggers a Spark job.
record_count = filtered_df.count()
print("Number of records in filtered_df:", record_count)

Number of records in filtered_df: 210


In [6]:
# Print the column names and types of filtered_df (the frame written to Parquet below).
filtered_df.printSchema()

root
 |-- Comment: string (nullable = true)
 |-- prediction: double (nullable = true)



In [7]:
# HDFS location where the negative-prediction rows are written as Parquet.
# NOTE(review): the namenode address is hard-coded; consider moving it to a config cell.
parquet_output_path = "hdfs://10.123.51.194/user/g23/predicted.parquet"

In [8]:
# Write filtered_df as Parquet; mode("overwrite") replaces whatever already
# exists at parquet_output_path.
filtered_df.write.mode("overwrite").parquet(parquet_output_path)

                                                                                

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [10]:
# Explicit schema handed to the streaming Parquet reader below; it matches the
# two nullable columns of filtered_df.
schema = (
    StructType()
    .add("Comment", StringType(), True)
    .add("prediction", DoubleType(), True)
    # Add more fields as needed
)

In [11]:
# Open the Parquet directory as a streaming file source, using the explicit
# schema defined above.
parquet_stream = (
    spark.readStream
    .schema(schema)
    .format("parquet")
    .load(parquet_output_path)
)

In [12]:
# Print the schema carried by the streaming DataFrame.
parquet_stream.printSchema()

root
 |-- Comment: string (nullable = true)
 |-- prediction: double (nullable = true)



In [13]:
# Stream the Parquet rows into an in-memory table named "memory_query" so they
# can be read back with spark.sql(). No checkpointLocation is configured, so
# Spark falls back to a temporary one.
memory_query = (
    parquet_stream
    .writeStream
    .queryName("memory_query")
    .format("memory")
    .outputMode("append")
    .start()
)
# start() returns immediately and the in-memory table is filled in the
# background. Block until all data currently available at the source has been
# processed, so the following cells read the complete table instead of racing
# the stream.
memory_query.processAllAvailable()

24/04/26 08:40:12 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-eee8fbe6-35f7-46bf-990e-04f777a96d1d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [15]:
# memory_query.awaitTermination()  # left disabled: it would block this cell until the streaming query terminates

In [14]:
# Row count of the in-memory table fed by the "memory_query" streaming query.
# The value reflects only the data the stream has processed so far.
spark.sql("SELECT count(*) FROM memory_query").show()

+--------+
|count(1)|
+--------+
|     210|
+--------+



In [15]:
# Show the first 10 rows of the in-memory streaming table.
spark.sql("SELECT * FROM memory_query").show(10)

+--------------------+----------+
|             Comment|prediction|
+--------------------+----------+
|350mbps internet ...|       1.0|
|activision seriou...|       1.0|
|almost 4 year pla...|       1.0|
|amazing game kind...|       1.0|
|attention  please...|       1.0|
|battle royal suck...|       1.0|
|best game world l...|       1.0|
|change 1 star fre...|       1.0|
|consistently deal...|       1.0|
|could always use ...|       1.0|
+--------------------+----------+
only showing top 10 rows



In [16]:
# Print every comment from the in-memory table in full; the show() calls
# elsewhere in this notebook truncate long strings.
comment_df = spark.sql("SELECT Comment FROM memory_query")
comment_content = comment_df.collect()  # pulls all rows to the driver
print("Full content of the Comment column:")
print("-" * 50)
# enumerate() supplies the 1-based position, so no module-level `count`
# variable is created. A global `count` would shadow
# pyspark.sql.functions.count, which the star import at the top brings in.
for comment_number, row in enumerate(comment_content, start=1):
    print("Comment", comment_number)
    print(row.Comment)
    print("-" * 50)

Full content of the Comment column:
--------------------------------------------------
Comment 1
350mbps internet battle royales totally unplayable lag ridiculous everytime encounter enemy god forbid fire weapon screen lock decided work youre dead used bad manageable albeit frustrating much worse pointless
--------------------------------------------------
Comment 2
activision seriously need look ive wasted 2gb data already trying update app find 4gb space left phone memory prior update mysteriously vanished downloading additional resource result update couldnt get completed shouldnt said 14gb fact 153gb size download additional file installing update sad seriously regretting trying update
--------------------------------------------------
Comment 3
almost 4 year playing game launch day changing review 5 star 1 star cannot go 1 star main reason spending thousand dollar thousand gaming hour account got hacked activision customer service bad 3 week trying get help gave facebook psn accou

<h2>2. Count and Percentage for Each Predicted Sentiment</h2>

In [17]:
# Reload the complete prediction_result table (all prediction values) for this section.
hive_table_df = spark.table("prediction_result")

In [18]:
# Translate the numeric prediction into a readable label. There is no
# otherwise() branch, so any value other than 0.0/1.0/2.0 yields NULL.
sentiment_label = (
    when(col("prediction") == 0.0, "Positive")
    .when(col("prediction") == 1.0, "Negative")
    .when(col("prediction") == 2.0, "Neutral")
)
prediction_df = hive_table_df.withColumn("Sentiment_Predicted", sentiment_label)

In [19]:
# Spot-check the new Sentiment_Predicted label against the numeric prediction column.
prediction_df.show(10)

+--------------------+---------+----------+-------------------+
|             Comment|Sentiment|prediction|Sentiment_Predicted|
+--------------------+---------+----------+-------------------+
|a gun with    met...| Negative|       1.0|           Negative|
|a ww  mythic with...| Negative|       2.0|            Neutral|
|already a w  alth...|  Neutral|       2.0|            Neutral|
|              always|  Neutral|       2.0|            Neutral|
|am i the only one...| Positive|       0.0|           Positive|
|american college ...|  Neutral|       2.0|            Neutral|
|among ars i feel ...| Positive|       0.0|           Positive|
|and we appreciate...| Positive|       2.0|            Neutral|
|appearently it s ...|  Neutral|       2.0|            Neutral|
|are we ever getti...| Positive|       0.0|           Positive|
+--------------------+---------+----------+-------------------+
only showing top 10 rows



In [20]:
# Keep only the comment text and its readable predicted label.
prediction_df = prediction_df.drop("Sentiment", "prediction")
prediction_df.show(10)

+--------------------+-------------------+
|             Comment|Sentiment_Predicted|
+--------------------+-------------------+
|a gun with    met...|           Negative|
|a ww  mythic with...|            Neutral|
|already a w  alth...|            Neutral|
|              always|            Neutral|
|am i the only one...|           Positive|
|american college ...|            Neutral|
|among ars i feel ...|           Positive|
|and we appreciate...|            Neutral|
|appearently it s ...|            Neutral|
|are we ever getti...|           Positive|
+--------------------+-------------------+
only showing top 10 rows



In [21]:
# Print the columns and types of prediction_df after the column drops.
prediction_df.printSchema()

root
 |-- Comment: string (nullable = true)
 |-- Sentiment_Predicted: string (nullable = true)



In [22]:
# Total number of rows in prediction_df (all predicted sentiments).
# count() is an action, so this triggers a Spark job.
recCount = prediction_df.count()
print("Number of records in prediction_df:", recCount)

Number of records in prediction_df: 1176


In [23]:
# HDFS location where the labelled predictions for all sentiments are written as Parquet.
# NOTE(review): the namenode address is hard-coded; consider moving it to a config cell.
parquet_output_path2 = "hdfs://10.123.51.194/user/g23/all_predicted.parquet"

In [24]:
# Write prediction_df as Parquet; mode("overwrite") replaces whatever already
# exists at parquet_output_path2.
prediction_df.write.mode("overwrite").parquet(parquet_output_path2)

In [25]:
# Explicit schema for the second streaming Parquet reader: two nullable string
# columns. NOTE: this rebinds the name `schema`, replacing the
# Comment/prediction schema defined earlier in the notebook.
schema = (
    StructType()
    .add("Comment", StringType(), True)
    .add("Sentiment_Predicted", StringType(), True)
    # Add more fields as needed
)

In [26]:
# Open the all-sentiments Parquet directory as a streaming file source, using
# the explicit schema defined above.
parquet_stream2 = (
    spark.readStream
    .schema(schema)
    .format("parquet")
    .load(parquet_output_path2)
)

In [27]:
# Print the schema carried by the second streaming DataFrame.
parquet_stream2.printSchema()

root
 |-- Comment: string (nullable = true)
 |-- Sentiment_Predicted: string (nullable = true)



In [28]:
# The explicit import guarantees `count` is the Spark aggregate function here,
# even if the name was rebound to something else earlier in the session.
from pyspark.sql.functions import count

# Streaming aggregation: number of rows per predicted sentiment.
sentiment_total = count("*").alias("total")
grouped_df = parquet_stream2.groupBy("Sentiment_Predicted").agg(sentiment_total)
grouped_df.printSchema()


root
 |-- Sentiment_Predicted: string (nullable = true)
 |-- total: long (nullable = false)



In [29]:
# Stream the per-sentiment totals into an in-memory table named
# "memory_query2". "complete" output mode rewrites the whole aggregate result
# on every trigger.
# NOTE(review): this rebinds `memory_query`, dropping the handle to the first
# streaming query; that query stays reachable through spark.streams.active.
memory_query = (
    grouped_df
    .writeStream
    .queryName("memory_query2")
    .format("memory")
    .outputMode("complete")
    .start()
)
# start() returns immediately and the in-memory table is filled in the
# background. Block until all data currently available at the source has been
# processed, so the percentage query below sees the final totals instead of an
# empty or partial table.
memory_query.processAllAvailable()

24/04/26 08:41:20 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-dd6e8914-81fe-4a3a-98bd-885399779a1d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

In [30]:
# Add each sentiment's share of the grand total: the scalar subquery sums
# `total` over the whole memory_query2 table, and the share is expressed as a
# percentage rounded to 2 decimals. Rows are ordered by total, largest first.
# spark.sql() only builds the DataFrame; nothing is printed until show() is called.
final_result = spark.sql("SELECT *, ROUND(total / (SELECT SUM(total) FROM memory_query2) * 100, 2) AS percent FROM memory_query2 ORDER BY total DESC")


In [31]:
# Run the query and print the per-sentiment totals and percentages.
final_result.show()

+-------------------+-----+-------+
|Sentiment_Predicted|total|percent|
+-------------------+-----+-------+
|           Positive|  827|  70.32|
|           Negative|  210|  17.86|
|            Neutral|  139|  11.82|
+-------------------+-----+-------+



In [32]:
# Stop every streaming query that is still running before tearing down the
# session. Stopping the session while queries are active leaves their
# background threads running against a dead SparkEnv, which surfaces as
# "SparkEnv not active, cannot do maintenance on StateStores".
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()

24/04/26 08:42:20 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:632)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:610)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:453)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor