In [1]:
# Import the necessary packages

import pandas as pd
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, HiveContext, Row, functions as F
from pyspark.sql.functions import avg, count, max, min, udf, substring, expr, concat, col, concat_ws
from pyspark.sql.types import FloatType, BooleanType, DateType
from transformers import pipeline

In [2]:
# Create the spark Session
spark = SparkSession.builder.appName("sentiment_load").config("spark.driver.extraClassPath","/usr/local/spark/jars/mysql-connector-java-8.0.33.jar").getOrCreate()

23/10/29 15:37:39 WARN Utils: Your hostname, muhammad-Vm resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/10/29 15:37:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/29 15:37:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Create the sentiment analysis function to be applied to the tweets
sentiment_analyzer = pipeline("sentiment-analysis", model = "distilbert-base-uncased-finetuned-sst-2-english")

In [4]:
# Create a user defined fucntion (UDF) to apply the sentiment analysis to the select text of the tweets df
def get_sentiment_score(text):
    result = sentiment_analyzer(text)
    score = result[0]["score"]
    return score

In [5]:
# Create a user defined fucntion (UDF) to create a new date column with an abbreviated value
def extract_and_concat(input_string):
    first_5 = input_string[:10]
    last_4 = input_string[-4:]
    return f"{first_5} {last_4}"

In [6]:
# Register the UDF's with the current spark session
spark.udf.register("get_sentiment_score", get_sentiment_score)
spark.udf.register("extract_and_concat", extract_and_concat)

<function __main__.extract_and_concat(input_string)>

In [7]:
# Set the MySQL connection settings
jdbc_url = "jdbc:mysql://localhost:3306/Tweets"
connection_properties = {
    "user":"root",
    "password":"password",
    "driver":"com.mysql.cj.jdbc.Driver"
}

In [8]:
# create the spark df by reading the data from the MySWL table
df = spark.read.jdbc(
    url = jdbc_url, 
    table = "tweetable1", 
    properties = connection_properties
)

In [9]:
# Apply the sentiment analysis UDF to the text column, and create a new column "sentiment_score", with the results
df = df.withColumn("sentiment_score", udf(get_sentiment_score)(df["text"]))

In [10]:
# Apply the date abbreviation UDF to the date column, and create a new column "full_date", with the results
df = df.withColumn("full_date", udf(extract_and_concat)(df["date"]))

In [11]:
# Check the sentiment_score column data type
df.schema["sentiment_score"].dataType

StringType()

In [12]:
# Convert the dt from string to float
df = df.withColumn("sentiment_score", col("sentiment_score").cast("Float"))

In [13]:
# Create a seperate df with just the full_date and sentiment_score columns neccessary for the time series sentiment analysis.
df1 = df.select("full_date", "sentiment_score")

In [14]:
# Optionally write to a csv file for backup/later use
#df1.write.format("csv").mode("overwrite").save("/home/hduser/Downloads/testsheet.csv")

In [15]:
# Convert the pyspark df to a pandas df
pandas_df = df1.limit(10).toPandas()

                                                                                

In [16]:
# Check the top 10 rows
pandas_df.head()

Unnamed: 0,full_date,sentiment_score
0,Mon Apr 06 2009,0.998989
1,Mon Apr 06 2009,0.982745
2,Mon Apr 06 2009,0.976971
3,Mon Apr 06 2009,0.996757
4,Mon Apr 06 2009,0.996205


In [17]:
# End the current spark session
spark.stop()

In [18]:
# Create a new spark session for connection to Apache Hive
spark = SparkSession.builder.appName(
    "sentiment_write"
).config(
    "hive.metastore.uris", 
    "thrift://localhost:9083", 
    conf = SparkConf()
).enableHiveSupport().getOrCreate()

In [19]:
# Write the pandas df to a df in the current spark session
df = spark.createDataFrame(pandas_df)

In [20]:
# Save as a new table within the hive db
df.write.saveAsTable('sentiment_over_time')

23/10/29 15:38:30 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/10/29 15:38:31 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/10/29 15:38:36 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
23/10/29 15:38:36 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore hduser@127.0.1.1
23/10/29 15:38:42 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
23/10/29 15:38:43 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
23/10/29 15:38:43 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/10/29 15:38:43 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/10/29 15:38:43 WARN ObjectStore: 

In [21]:
# Check the first 5 rows of the stored data
df_load = spark.sql('SELECT * FROM sentiment_over_time limit 5')
df_load.show()

+---------------+------------------+
|      full_date|   sentiment_score|
+---------------+------------------+
|Mon Apr 06 2009|0.9989890456199646|
|Mon Apr 06 2009|0.9827452301979065|
|Mon Apr 06 2009|0.9769710898399353|
|Mon Apr 06 2009| 0.996757447719574|
|Mon Apr 06 2009|0.9962053894996643|
+---------------+------------------+

+---------------+------------------+
|      full_date|   sentiment_score|
+---------------+------------------+
|Mon Apr 06 2009|0.9989890456199646|
|Mon Apr 06 2009|0.9827452301979065|
|Mon Apr 06 2009|0.9769710898399353|
|Mon Apr 06 2009| 0.996757447719574|
|Mon Apr 06 2009|0.9962053894996643|
+---------------+------------------+

None


In [26]:
df_final = df_load.toPandas()