In [1]:
#IMPORTING LIBRARIE
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark import SparkConf
from pyspark.sql.functions import to_timestamp


import warnings
warnings.filterwarnings('ignore')

## Reading Data from Hadoop

In [2]:
spark_conf = SparkConf().setMaster("local[*]").setAppName("Tweets_Hadoop")

spark = SparkSession.builder.config(conf=spark_conf).config('spark.sql.session.timeZone', 'UTC').getOrCreate()

sc = spark.sparkContext

sc.setLogLevel('ERROR')



24/01/03 16:49:28 WARN Utils: Your hostname, BDS-2023 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/01/03 16:49:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/03 16:49:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
tweets_spark = spark.read.parquet("hdfs://localhost:9000/CA4/TWEETS_NEW.parquet")


                                                                                

In [4]:
tweets_spark.printSchema()

root
 |-- Index: long (nullable = true)
 |-- ID: long (nullable = true)
 |-- Date/Time: string (nullable = true)
 |-- Info: string (nullable = true)
 |-- User: string (nullable = true)
 |-- Tweet: string (nullable = true)
 |-- Tokenized_Tweet: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- __index_level_0__: long (nullable = true)



In [5]:
tweets_spark.summary().show()

                                                                                

+-------+-----------------+--------------------+--------------------+--------+--------------------+--------------------+-----------------+
|summary|            Index|                  ID|           Date/Time|    Info|                User|               Tweet|__index_level_0__|
+-------+-----------------+--------------------+--------------------+--------+--------------------+--------------------+-----------------+
|  count|            83941|               83941|               83941|   83941|               83941|               83941|            83941|
|   mean|876774.3877604508|1.9935392360531683E9|                null|    null| 8.126396164285715E7|                null|876774.3877604508|
| stddev|455942.6130676904|1.8773316488771933E8|                null|    null|2.5422699526890153E8|                null|455942.6130676904|
|    min|               18|          1467813579|Fri Apr 17 20:30:...|NO_QUERY|            007peter|            Miss ...|               18|
|    25%|           492103|

In [6]:
conf = SparkConf().set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark = spark.builder.config(conf=conf).getOrCreate()

In [7]:
tweets_spark = tweets_spark.withColumn("Date/Time", to_timestamp(tweets_spark["Date/Time"], "EEE MMM dd HH:mm:ss zzz yyyy"))

In [8]:
tweets_spark = tweets_spark.select("ID", "Date/Time", "User", "Tweet")

In [9]:
tweets_spark.show(truncate=True, n=5)

+----------+-------------------+-------------+--------------------+
|        ID|          Date/Time|         User|               Tweet|
+----------+-------------------+-------------+--------------------+
|1467813579|2009-04-07 05:20:31|   starkissed|@LettyA ahh ive a...|
|1467814438|2009-04-07 05:20:44|ChicagoCubbie|I hate when I hav...|
|1467816149|2009-04-07 05:21:11|     Pbearfox|@julieebaby awe i...|
|1467818603|2009-04-07 05:21:49|    kennypham|Sad, sad, sad. I ...|
|1467822384|2009-04-07 05:22:47|  Lindsey0920|@oanhLove I hate ...|
+----------+-------------------+-------------+--------------------+
only showing top 5 rows





In [10]:
tweets_spark.printSchema()

root
 |-- ID: long (nullable = true)
 |-- Date/Time: timestamp (nullable = true)
 |-- User: string (nullable = true)
 |-- Tweet: string (nullable = true)



In [11]:
from pyspark.sql.functions import year, month, count, format_string, col

In [12]:

df = tweets_spark.groupBy(year("Date/Time").alias("year"), month("Date/Time").alias("month")).count() \
                 .orderBy(["year", "month"])


df = df.withColumn("percentage", format_string("%.2f%%", ((col("count")/tweets_spark.count())*100)))


df.show(truncate=False)



+----+-----+-----+----------+
|year|month|count|percentage|
+----+-----+-----+----------+
|2009|4    |4943 |5.89%     |
|2009|5    |30346|36.15%    |
|2009|6    |48652|57.96%    |
+----+-----+-----+----------+



                                                                                

# Sentiment Analysis

## Logistic Regression Classifier Model

In [13]:
#import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover


In [14]:
#read csv file into dataFrame with automatically inferred schema
tweets_csv = spark.read.csv("hdfs://localhost:9000/CA4/tweets.csv", inferSchema=True, header=True)
tweets_csv.show(truncate=False, n=3)

+------+---------+---------------+---------------------------------+
|ItemID|Sentiment|SentimentSource|SentimentText                    |
+------+---------+---------------+---------------------------------+
|1038  |1        |Sentiment140   |that film is fantastic #brilliant|
|1804  |1        |Sentiment140   |this music is really bad #myband |
|1693  |0        |Sentiment140   |winter is terrible #thumbs-down  |
+------+---------+---------------+---------------------------------+
only showing top 3 rows



In [15]:
#select only "SentimentText" and "Sentiment" column, 
#and cast "Sentiment" column data into integer
data = tweets_csv.select("SentimentText", col("Sentiment").cast("Int").alias("label"))
data.show(truncate = False,n=5)

+---------------------------------+-----+
|SentimentText                    |label|
+---------------------------------+-----+
|that film is fantastic #brilliant|1    |
|this music is really bad #myband |1    |
|winter is terrible #thumbs-down  |0    |
|this game is awful #nightmare    |0    |
|I love jam #loveit               |1    |
+---------------------------------+-----+
only showing top 5 rows



In [16]:
#divide data, 70% for training, 30% for testing
dividedData = data.randomSplit([0.7, 0.3]) 
trainingData = dividedData[0] #index 0 = data training
testingData = dividedData[1] #index 1 = data testing
train_rows = trainingData.count()
test_rows = testingData.count()
print ("Training data rows:", train_rows, "; Testing data rows:", test_rows)

[Stage 19:>                                                         (0 + 1) / 1]

Training data rows: 1339 ; Testing data rows: 593


                                                                                

In [17]:
tokenizer = Tokenizer(inputCol="SentimentText", outputCol="SentimentWords")
tokenizedTrain = tokenizer.transform(trainingData)
tokenizedTrain.show(truncate=False, n=5)

[Stage 22:>                                                         (0 + 1) / 1]

+----------------------------------+-----+----------------------------------------+
|SentimentText                     |label|SentimentWords                          |
+----------------------------------+-----+----------------------------------------+
|I adore cheese #brilliant         |1    |[i, adore, cheese, #brilliant]          |
|I adore cheese #favorite          |1    |[i, adore, cheese, #favorite]           |
|I adore cheese #loveit            |1    |[i, adore, cheese, #loveit]             |
|I adore cheese #thumbs-up         |1    |[i, adore, cheese, #thumbs-up]          |
|I adore classical music #brilliant|1    |[i, adore, classical, music, #brilliant]|
+----------------------------------+-----+----------------------------------------+
only showing top 5 rows



                                                                                

In [18]:
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(truncate=False, n=5)

[Stage 23:>                                                         (0 + 1) / 1]

+----------------------------------+-----+----------------------------------------+-------------------------------------+
|SentimentText                     |label|SentimentWords                          |MeaningfulWords                      |
+----------------------------------+-----+----------------------------------------+-------------------------------------+
|I adore cheese #brilliant         |1    |[i, adore, cheese, #brilliant]          |[adore, cheese, #brilliant]          |
|I adore cheese #favorite          |1    |[i, adore, cheese, #favorite]           |[adore, cheese, #favorite]           |
|I adore cheese #loveit            |1    |[i, adore, cheese, #loveit]             |[adore, cheese, #loveit]             |
|I adore cheese #thumbs-up         |1    |[i, adore, cheese, #thumbs-up]          |[adore, cheese, #thumbs-up]          |
|I adore classical music #brilliant|1    |[i, adore, classical, music, #brilliant]|[adore, classical, music, #brilliant]|
+-----------------------

                                                                                

In [19]:
hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericTrainData = hashTF.transform(SwRemovedTrain).select(
    'label', 'MeaningfulWords', 'features')
numericTrainData.show(truncate=False, n=3)

+-----+---------------------------+-------------------------------------------+
|label|MeaningfulWords            |features                                   |
+-----+---------------------------+-------------------------------------------+
|1    |[adore, cheese, #brilliant]|(262144,[1689,45361,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #favorite] |(262144,[1689,100089,108624],[1.0,1.0,1.0])|
|1    |[adore, cheese, #loveit]   |(262144,[1689,100089,254974],[1.0,1.0,1.0])|
+-----+---------------------------+-------------------------------------------+
only showing top 3 rows



## Training the model


In [20]:
lr = LogisticRegression(labelCol="label", featuresCol="features", 
                        maxIter=10, regParam=0.01)
model = lr.fit(numericTrainData)
print ("Training is done!")

                                                                                

Training is done!


## Testing data

In [21]:
tokenizedTest = tokenizer.transform(testingData)
SwRemovedTest = swr.transform(tokenizedTest)
numericTest = hashTF.transform(SwRemovedTest).select(
    'Label', 'MeaningfulWords', 'features')
numericTest.show(truncate=False, n=2)

[Stage 37:>                                                         (0 + 1) / 1]

+-----+---------------------------+------------------------------------------+
|Label|MeaningfulWords            |features                                  |
+-----+---------------------------+------------------------------------------+
|1    |[adore, cheese, #bestever] |(262144,[1689,91011,100089],[1.0,1.0,1.0])|
|1    |[adore, cheese, #toptastic]|(262144,[1689,42010,100089],[1.0,1.0,1.0])|
+-----+---------------------------+------------------------------------------+
only showing top 2 rows



                                                                                

## Predicting testing data

In [22]:
prediction = model.transform(numericTest)
predictionFinal = prediction.select(
    "MeaningfulWords", "prediction", "Label")
predictionFinal.show(n=4, truncate = False)
correctPrediction = predictionFinal.filter(
    predictionFinal['prediction'] == predictionFinal['Label']).count()
totalData = predictionFinal.count()
print("correct prediction:", correctPrediction, ", total data:", totalData, 
      ", accuracy:", correctPrediction/totalData)

+------------------------------------+----------+-----+
|MeaningfulWords                     |prediction|Label|
+------------------------------------+----------+-----+
|[adore, cheese, #bestever]          |1.0       |1    |
|[adore, cheese, #toptastic]         |1.0       |1    |
|[adore, classical, music, #bestever]|1.0       |1    |
|[adore, classical, music, #loveit]  |1.0       |1    |
+------------------------------------+----------+-----+
only showing top 4 rows

correct prediction: 584 , total data: 593 , accuracy: 0.984822934232715


## Tweets Dataset - cleaning

In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace

In [24]:
# Definição de expressões regulares
at_regex = r"@\w+"  # Remove usernames
link_regex = r"http\S+"  # Remove links
rt_regex = r'\bRT\b'  # Remove 'RT'
ss_regex = r'[^\w\s]'  # Remove Special strings
ds_regex = r'\s+'  # Remove spaces

In [25]:
tweets_spark = tweets_spark.withColumn("clean_tweet", regexp_replace("Tweet", at_regex, ""))
tweets_spark = tweets_spark.withColumn("clean_tweet", regexp_replace("clean_tweet", link_regex, ""))
tweets_spark = tweets_spark.withColumn("clean_tweet", regexp_replace("clean_tweet", rt_regex, ""))
tweets_spark = tweets_spark.withColumn("clean_tweet", regexp_replace("clean_tweet", ss_regex, ""))
tweets_spark = tweets_spark.withColumn("clean_tweet", regexp_replace("clean_tweet", ds_regex, " "))

# Exibição dos resultados
tweets_spark.show(truncate=False, n=5)

                                                                                

+----------+-------------------+-------------+-----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|ID        |Date/Time          |User         |Tweet                                                                                    |clean_tweet                                                                       |
+----------+-------------------+-------------+-----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|1467813579|2009-04-07 05:20:31|starkissed   |@LettyA ahh ive always wanted to see rent  love the soundtrack!!                         | ahh ive always wanted to see rent love the soundtrack                            |
|1467814438|2009-04-07 05:20:44|ChicagoCubbie|I hate when I have to call and wake people up                             

In [26]:
tokenizer = Tokenizer(inputCol="clean_tweet", outputCol="words")
tokenizedData = tokenizer.transform(tweets_spark)

swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemoved = swr.transform(tokenizedData)

hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericData = hashTF.transform(SwRemoved).select('MeaningfulWords', 'features')


numericData.show(n=3)

+--------------------+--------------------+
|     MeaningfulWords|            features|
+--------------------+--------------------+
|[, ahh, ive, alwa...|(262144,[8538,524...|
|[hate, call, wake...|(262144,[72709,10...|
|[, awe, love, 1, ...|(262144,[92651,13...|
+--------------------+--------------------+
only showing top 3 rows



## Predicting Data

In [27]:
prediction = model.transform(numericData)

predictionFinal = prediction.select(
    "MeaningfulWords", "prediction")

In [28]:
predictionFinal.show(truncate = False, n=10)

+-------------------------------------------------------------------------------------------------------------+----------+
|MeaningfulWords                                                                                              |prediction|
+-------------------------------------------------------------------------------------------------------------+----------+
|[, ahh, ive, always, wanted, see, rent, love, soundtrack]                                                    |1.0       |
|[hate, call, wake, people]                                                                                   |0.0       |
|[, awe, love, 1, miss]                                                                                       |1.0       |
|[sad, sad, sad, dont, know, hate, feeling, wanna, sleep, still, cant]                                        |0.0       |
|[, hate, happens]                                                                                            |0.0       |
|[really, hate, 



In [29]:
predictionFinal.count()

                                                                                

83941

In [30]:
# Create a column with id following the data's order 
tweets_spark = tweets_spark.withColumn("row_id", monotonically_increasing_id())
predictionFinal = predictionFinal.withColumn("row_id", monotonically_increasing_id())

# join by "row_id"
tweets_pred = tweets_spark.select('row_id','Date/Time','User', 'Tweet', 'clean_tweet') \
                    .join(predictionFinal.select('row_id', 'prediction'), "row_id", "inner")
                

# drop column 
tweets_pred = tweets_pred.drop("row_id")

tweets_pred.show()

                                                                                

+-------------------+---------------+--------------------+--------------------+----------+
|          Date/Time|           User|               Tweet|         clean_tweet|prediction|
+-------------------+---------------+--------------------+--------------------+----------+
|2009-04-07 05:20:31|     starkissed|@LettyA ahh ive a...| ahh ive always w...|       1.0|
|2009-04-07 05:20:44|  ChicagoCubbie|I hate when I hav...|I hate when I hav...|       0.0|
|2009-04-07 05:21:11|       Pbearfox|@julieebaby awe i...| awe i love you t...|       1.0|
|2009-04-07 05:21:49|      kennypham|Sad, sad, sad. I ...|Sad sad sad I don...|       0.0|
|2009-04-07 05:22:47|    Lindsey0920|@oanhLove I hate ...| I hate when that...|       0.0|
|2009-04-07 05:23:43|   BrookeAmanda|i really hate how...|i really hate how...|       0.0|
|2009-04-07 05:25:52|     thelazyboy|sleep soon... i j...|sleep soon i just...|       0.0|
|2009-04-07 05:27:08|         eyezup|@mercedesashley D...| Damn The grind i...|       1.0|

# Textblod and Varder

In [31]:
from textblob import TextBlob

In [32]:
import nltk
nltk.download('vader_lexicon')
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from textblob import TextBlob
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType




[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/hduser/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [33]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Função de análise de sentimento com TextBlob
@udf(FloatType())
def sentiment(tweet):
    return TextBlob(tweet).sentiment.polarity

# Função de análise de sentimento com VADER
@udf(FloatType())
def sentiment_vader(tweet):
    sid = SentimentIntensityAnalyzer()
    return sid.polarity_scores(tweet)['compound']

# Aplicar diretamente no DataFrame
tweets_pred = tweets_pred.withColumn("textblob", sentiment(tweets_pred["clean_tweet"])) \
                         .withColumn("vader", sentiment_vader(tweets_pred["clean_tweet"]))


In [34]:
tweets_pred = tweets_pred.withColumn("score", ((col("prediction") + (col("textblob")*1.5) + (col("vader")*1.5)) / 4))

In [35]:
tweets_pred.select("clean_tweet", "prediction", "textblob", "vader", "score").show(n=10)



+--------------------+----------+-----------+-------+--------------------+
|         clean_tweet|prediction|   textblob|  vader|               score|
+--------------------+----------+-----------+-------+--------------------+
| ahh ive always w...|       1.0|        0.5| 0.6369|  0.6763375028967857|
|I hate when I hav...|       0.0|       -0.8|-0.5719| -0.5144625082612038|
| awe i love you t...|       1.0|        0.5| 0.5574|  0.6465249955654144|
|Sad sad sad I don...|       0.0|     -0.575|-0.8505| -0.5345624908804893|
| I hate when that...|       0.0|       -0.8|-0.5719| -0.5144625082612038|
|i really hate how...|       0.0|     -0.225|  0.105|-0.04499999899417162|
|sleep soon i just...|       0.0|       -0.8|-0.5719| -0.5144625082612038|
| Damn The grind i...|       1.0| 0.33333334| 0.3975|  0.5240625068545341|
|Late night snack ...|       0.0|-0.45357144|-0.7906|-0.46656429022550583|
|Im missing you ba...|       1.0|      0.125| 0.9366|  0.6481000110507011|
+--------------------+---

                                                                                

In [36]:
tweets_pred.printSchema()

root
 |-- Date/Time: timestamp (nullable = true)
 |-- User: string (nullable = true)
 |-- Tweet: string (nullable = true)
 |-- clean_tweet: string (nullable = true)
 |-- prediction: double (nullable = false)
 |-- textblob: float (nullable = true)
 |-- vader: float (nullable = true)
 |-- score: double (nullable = true)



## Saving on Hadoop

In [37]:
from pyspark.sql import SparkSession
from hdfs import InsecureClient
from hdfs.util import HdfsError

In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth
from hdfs import InsecureClient
from hdfs.util import HdfsError

def spark_hadoop(df, folder, partitionBy=None, spark=None):
    hdfs_base_path = "hdfs://localhost:9000"
    hdfs_folder_path = f"{hdfs_base_path}/CA4/{folder}"

    client = InsecureClient('http://localhost:9870', user='hduser')

    try:
       
        client.content(hdfs_folder_path)

        print('Os arquivos já estão no Hadoop. Lendo os arquivos.')
        df = spark.read.parquet(hdfs_folder_path)
    except HdfsError:
        print('Colocando no Hadoop.')
        if partitionBy:
            
            df = df.withColumn("year", year("Date/Time"))
            df = df.withColumn("month", month("Date/Time"))
            df = df.withColumn("day", dayofmonth("Date/Time"))
            
           
            df.write.partitionBy("year", "month", "day").parquet(hdfs_folder_path)
            print(f"Salvo em {hdfs_folder_path} particionado por {partitionBy}")
        else:
            df.write.parquet(hdfs_folder_path)
            print(f"Salvo em {hdfs_folder_path}")

    return df



df_salvo = spark_hadoop(tweets_pred, folder="sentiment", partitionBy="Date/Time", spark=spark)


Colocando no Hadoop.




Salvo em hdfs://localhost:9000/CA4/sentiment particionado por Date/Time


                                                                                

In [39]:
df_salvo.show(2)



+-------------------+-------------+--------------------+--------------------+----------+--------+-------+-------------------+----+-----+---+
|          Date/Time|         User|               Tweet|         clean_tweet|prediction|textblob|  vader|              score|year|month|day|
+-------------------+-------------+--------------------+--------------------+----------+--------+-------+-------------------+----+-----+---+
|2009-04-07 05:20:31|   starkissed|@LettyA ahh ive a...| ahh ive always w...|       1.0|     0.5| 0.6369| 0.6763375028967857|2009|    4|  7|
|2009-04-07 05:20:44|ChicagoCubbie|I hate when I hav...|I hate when I hav...|       0.0|    -0.8|-0.5719|-0.5144625082612038|2009|    4|  7|
+-------------------+-------------+--------------------+--------------------+----------+--------+-------+-------------------+----+-----+---+
only showing top 2 rows



                                                                                