# PySpark - Project Tweets

In [1]:
# Load the libraries
import os
import numpy as np
import pandas as pd
from pyspark.sql.functions import to_timestamp
from pyspark import SparkConf

from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, StringType
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer

#  Initialize Spark Session 

In [2]:
spark = SparkSession.builder.appName('project_tweets').getOrCreate()

24/05/07 19:58:57 WARN Utils: Your hostname, muhammad-Vm resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/05/07 19:58:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/07 19:59:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Read the Dataset

In [3]:
# Load the ProjectTweets into Hadoop in the named folder 'CA2BD'

data = spark.read.csv('hdfs://localhost:9000/user/hduser/CA2BD/New_Tweets.csv', header=True, inferSchema=True)


                                                                                

In [4]:
# Display the structure of schema
data.printSchema()

root
 |-- ids: long (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [5]:
num_rows = data.count()
print(f"Number of rows: {num_rows}")


num_columns = len(data.columns)
print(f"Number of columns: {num_columns}")

                                                                                

Number of rows: 1600000
Number of columns: 5


In [6]:
data.summary().show()

24/05/07 20:00:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+--------------------+--------------------+--------+-------------------+--------------------+
|summary|                 ids|                date|    flag|               user|                text|
+-------+--------------------+--------------------+--------+-------------------+--------------------+
|  count|             1600000|             1600000| 1600000|            1600000|             1600000|
|   mean|1.9988175522956276E9|                NULL|    NULL|4.325887521835714E9|                NULL|
| stddev| 1.935760736226745E8|                NULL|    NULL|5.16273321845489E10|                NULL|
|    min|          1467810369|Fri Apr 17 20:30:...|NO_QUERY|       000catnap000|                 ...|
|    25%|          1956912543|                NULL|    NULL|            32508.0|                NULL|
|    50%|          2002093413|                NULL|    NULL|           130587.0|                NULL|
|    75%|          2177048867|                NULL|    NULL|          1100101.0|  

In [7]:
conf = SparkConf().set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark = spark.builder.config(conf=conf).getOrCreate()

In [8]:
tweets = data.withColumn("date", to_timestamp(data["date"], "EEE MMM dd HH:mm:ss zzz yyyy"))

In [9]:
tweets = tweets.select("ids", "date", "user", "text")

In [10]:
tweets.show(truncate=True, n=5)

[Stage 8:>                                                          (0 + 1) / 1]

+----------+-------------------+---------------+--------------------+
|       ids|               date|           user|                text|
+----------+-------------------+---------------+--------------------+
|1467810369|2009-04-07 06:19:45|_TheSpecialOne_|@switchfoot http:...|
|1467810672|2009-04-07 06:19:49|  scotthamilton|is upset that he ...|
|1467810917|2009-04-07 06:19:53|       mattycus|@Kenichan I dived...|
|1467811184|2009-04-07 06:19:57|        ElleCTF|my whole body fee...|
|1467811193|2009-04-07 06:19:57|         Karoli|@nationwideclass ...|
+----------+-------------------+---------------+--------------------+
only showing top 5 rows



                                                                                

In [11]:
tweets.printSchema()

root
 |-- ids: long (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [12]:
from pyspark.sql.functions import year, month, count, format_string, col

In [13]:
df = tweets.groupBy(year("date").alias("year"), month("date").alias("month")).count() \
                 .orderBy(["year", "month"])


df = df.withColumn("percentage", format_string("%.2f%%", ((col("count")/tweets.count())*100)))


df.show(truncate=False)



+----+-----+------+----------+
|year|month|count |percentage|
+----+-----+------+----------+
|2009|4    |100025|6.25%     |
|2009|5    |554060|34.63%    |
|2009|6    |945915|59.12%    |
+----+-----+------+----------+



                                                                                

# Sentiment Analysis

# Logistic Regression Classifier Model

In [14]:
#import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

In [15]:
#read csv file into dataFrame with automatically inferred schema
tweets_csv = spark.read.csv("hdfs://localhost:9000/user/hduser/CA2BD/tweets.csv", inferSchema=True, header=True)
tweets_csv.show(truncate=False, n=3)

+------+---------+---------------+---------------------------------+
|ItemID|Sentiment|SentimentSource|SentimentText                    |
+------+---------+---------------+---------------------------------+
|1038  |1        |Sentiment140   |that film is fantastic #brilliant|
|1804  |1        |Sentiment140   |this music is really bad #myband |
|1693  |0        |Sentiment140   |winter is terrible #thumbs-down  |
+------+---------+---------------+---------------------------------+
only showing top 3 rows



In [16]:
#select only "SentimentText" and "Sentiment" column, 
#and cast "Sentiment" column data into integer
data = tweets_csv.select("SentimentText", col("Sentiment").cast("Int").alias("label"))
data.show(truncate = False,n=5)

+---------------------------------+-----+
|SentimentText                    |label|
+---------------------------------+-----+
|that film is fantastic #brilliant|1    |
|this music is really bad #myband |1    |
|winter is terrible #thumbs-down  |0    |
|this game is awful #nightmare    |0    |
|I love jam #loveit               |1    |
+---------------------------------+-----+
only showing top 5 rows



In [17]:
#divide data, 70% for training, 30% for testing
dividedData = data.randomSplit([0.7, 0.3]) 
trainingData = dividedData[0] #index 0 = data training
testingData = dividedData[1] #index 1 = data testing
train_rows = trainingData.count()
test_rows = testingData.count()
print ("Training data rows:", train_rows, "; Testing data rows:", test_rows)

Training data rows: 1350 ; Testing data rows: 582


In [18]:
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

In [19]:
tokenizer = Tokenizer(inputCol="SentimentText", outputCol="SentimentWords")
tokenizedTrain = tokenizer.transform(trainingData)
tokenizedTrain.show(truncate=False, n=5)

+-------------------------+-----+------------------------------+
|SentimentText            |label|SentimentWords                |
+-------------------------+-----+------------------------------+
|I adore cheese #bestever |1    |[i, adore, cheese, #bestever] |
|I adore cheese #brilliant|1    |[i, adore, cheese, #brilliant]|
|I adore cheese #favorite |1    |[i, adore, cheese, #favorite] |
|I adore cheese #loveit   |1    |[i, adore, cheese, #loveit]   |
|I adore cheese #thumbs-up|1    |[i, adore, cheese, #thumbs-up]|
+-------------------------+-----+------------------------------+
only showing top 5 rows



In [20]:
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(truncate=False, n=5)

+-------------------------+-----+------------------------------+---------------------------+
|SentimentText            |label|SentimentWords                |MeaningfulWords            |
+-------------------------+-----+------------------------------+---------------------------+
|I adore cheese #bestever |1    |[i, adore, cheese, #bestever] |[adore, cheese, #bestever] |
|I adore cheese #brilliant|1    |[i, adore, cheese, #brilliant]|[adore, cheese, #brilliant]|
|I adore cheese #favorite |1    |[i, adore, cheese, #favorite] |[adore, cheese, #favorite] |
|I adore cheese #loveit   |1    |[i, adore, cheese, #loveit]   |[adore, cheese, #loveit]   |
|I adore cheese #thumbs-up|1    |[i, adore, cheese, #thumbs-up]|[adore, cheese, #thumbs-up]|
+-------------------------+-----+------------------------------+---------------------------+
only showing top 5 rows



In [21]:
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(truncate=False, n=5)

+-------------------------+-----+------------------------------+---------------------------+
|SentimentText            |label|SentimentWords                |MeaningfulWords            |
+-------------------------+-----+------------------------------+---------------------------+
|I adore cheese #bestever |1    |[i, adore, cheese, #bestever] |[adore, cheese, #bestever] |
|I adore cheese #brilliant|1    |[i, adore, cheese, #brilliant]|[adore, cheese, #brilliant]|
|I adore cheese #favorite |1    |[i, adore, cheese, #favorite] |[adore, cheese, #favorite] |
|I adore cheese #loveit   |1    |[i, adore, cheese, #loveit]   |[adore, cheese, #loveit]   |
|I adore cheese #thumbs-up|1    |[i, adore, cheese, #thumbs-up]|[adore, cheese, #thumbs-up]|
+-------------------------+-----+------------------------------+---------------------------+
only showing top 5 rows



[Stage 27:>                                                         (0 + 1) / 1]                                                                                

In [22]:
hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericTrainData = hashTF.transform(SwRemovedTrain).select(
    'label', 'MeaningfulWords', 'features')
numericTrainData.show(truncate=False, n=3)

+-----+---------------------------+-------------------------------------------+
|label|MeaningfulWords            |features                                   |
+-----+---------------------------+-------------------------------------------+
|1    |[adore, cheese, #bestever] |(262144,[1689,91011,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #brilliant]|(262144,[1689,45361,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #favorite] |(262144,[1689,100089,108624],[1.0,1.0,1.0])|
+-----+---------------------------+-------------------------------------------+
only showing top 3 rows



# Train our classifier model using training data

In [23]:
lr = LogisticRegression(labelCol="label", featuresCol="features", 
                        maxIter=10, regParam=0.01)
model = lr.fit(numericTrainData)
print ("Training is done!")

                                                                                

Training is done!


# Training the model

In [24]:
tokenizedTest = tokenizer.transform(testingData)
SwRemovedTest = swr.transform(tokenizedTest)
numericTest = hashTF.transform(SwRemovedTest).select(
    'Label', 'MeaningfulWords', 'features')
numericTest.show(truncate=False, n=2)

+-----+-------------------------------------+-------------------------------------------------------+
|Label|MeaningfulWords                      |features                                               |
+-----+-------------------------------------+-------------------------------------------------------+
|1    |[adore, classical, music, #bestever] |(262144,[91011,100089,102383,131250],[1.0,1.0,1.0,1.0])|
|1    |[adore, classical, music, #brilliant]|(262144,[45361,100089,102383,131250],[1.0,1.0,1.0,1.0])|
+-----+-------------------------------------+-------------------------------------------------------+
only showing top 2 rows



# Predicting testing data

In [25]:
#import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

In [26]:
prediction = model.transform(numericTest)
predictionFinal = prediction.select(
    "MeaningfulWords", "prediction", "Label")
predictionFinal.show(n=4, truncate = False)
correctPrediction = predictionFinal.filter(
    predictionFinal['prediction'] == predictionFinal['Label']).count()
totalData = predictionFinal.count()
print("correct prediction:", correctPrediction, ", total data:", totalData, 
      ", accuracy:", correctPrediction/totalData)

[Stage 42:>                                                         (0 + 1) / 1]                                                                                

+-------------------------------------+----------+-----+
|MeaningfulWords                      |prediction|Label|
+-------------------------------------+----------+-----+
|[adore, classical, music, #bestever] |1.0       |1    |
|[adore, classical, music, #brilliant]|1.0       |1    |
|[adore, classical, music, #favorite] |1.0       |1    |
|[adore, classical, music, #loveit]   |1.0       |1    |
+-------------------------------------+----------+-----+
only showing top 4 rows



                                                                                

correct prediction: 572 , total data: 582 , accuracy: 0.9828178694158075


# Tweets Dataset - cleaning

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace

In [28]:
# Definição de expressões regulares
at_regex = r"@\w+"  # Remove usernames
link_regex = r"http\S+"  # Remove links
rt_regex = r'\bRT\b'  # Remove 'RT'
ss_regex = r'[^\w\s]'  # Remove Special strings
ds_regex = r'\s+'  # Remove spaces

In [29]:
tweets= tweets.withColumn("clean_tweet", regexp_replace("text", at_regex, ""))
tweets= tweets.withColumn("clean_tweet", regexp_replace("clean_tweet", link_regex, ""))
tweets= tweets.withColumn("clean_tweet", regexp_replace("clean_tweet", rt_regex, ""))
tweets= tweets.withColumn("clean_tweet", regexp_replace("clean_tweet", ss_regex, ""))
tweets= tweets.withColumn("clean_tweet", regexp_replace("clean_tweet", ds_regex, " "))

# Exibição dos resultados
tweets.show(truncate=False, n=5)

[Stage 49:>                                                         (0 + 1) / 1]                                                                                

+----------+-------------------+---------------+-------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------+
|ids       |date               |user           |text                                                                                                               |clean_tweet                                                                                             |
+----------+-------------------+---------------+-------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------+
|1467810369|2009-04-07 06:19:45|_TheSpecialOne_|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D| Awww thats a bummer You s

In [30]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF


In [31]:
tokenizer = Tokenizer(inputCol="clean_tweet", outputCol="words")
tokenizedData = tokenizer.transform(tweets)

swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemoved = swr.transform(tokenizedData)

hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericData = hashTF.transform(SwRemoved).select('MeaningfulWords', 'features')


numericData.show(n=3)

+--------------------+--------------------+
|     MeaningfulWords|            features|
+--------------------+--------------------+
|[, awww, thats, b...|(262144,[10345,52...|
|[upset, cant, upd...|(262144,[59577,61...|
|[, dived, many, t...|(262144,[2548,392...|
+--------------------+--------------------+
only showing top 3 rows



# Predictiong Data

In [32]:
#IMPORTING LIBRARIE
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark import SparkConf
from pyspark.sql.functions import to_timestamp


import warnings
warnings.filterwarnings('ignore')

In [33]:
prediction = model.transform(numericData)

predictionFinal = prediction.select(
    "MeaningfulWords", "prediction")

In [34]:
predictionFinal.show(truncate = False, n=10)

+---------------------------------------------------------------------------------------+----------+
|MeaningfulWords                                                                        |prediction|
+---------------------------------------------------------------------------------------+----------+
|[, awww, thats, bummer, shoulda, got, david, carr, third, day, d]                      |0.0       |
|[upset, cant, update, facebook, texting, might, cry, result, school, today, also, blah]|0.0       |
|[, dived, many, times, ball, managed, save, 50, rest, go, bounds]                      |0.0       |
|[whole, body, feels, itchy, like, fire]                                                |1.0       |
|[, behaving, im, mad, cant, see]                                                       |0.0       |
|[, whole, crew]                                                                        |0.0       |
|[need, hug]                                                                            |0.

[Stage 51:>                                                         (0 + 1) / 1]                                                                                

In [35]:
predictionFinal.count()

                                                                                

1600000

In [36]:
# Create a column with id following the data's order 
tweets = tweets.withColumn("row_id", monotonically_increasing_id())
predictionFinal = predictionFinal.withColumn("row_id", monotonically_increasing_id())

# join by "row_id"
tweets_pred = tweets.select('row_id','date','user', 'text', 'clean_tweet') \
                    .join(predictionFinal.select('row_id', 'prediction'), "row_id", "inner")
                

# drop column 
tweets_pred = tweets_pred.drop("row_id")

tweets_pred.show()

[Stage 59:>                                                         (0 + 1) / 1]

+-------------------+---------------+--------------------+--------------------+----------+
|               date|           user|                text|         clean_tweet|prediction|
+-------------------+---------------+--------------------+--------------------+----------+
|2009-04-07 06:19:45|_TheSpecialOne_|@switchfoot http:...| Awww thats a bum...|       0.0|
|2009-04-07 06:20:03|        mybirch|         Need a hug |         Need a hug |       0.0|
|2009-04-07 06:20:03|           coZZ|@LOLTrish hey  lo...| hey long time no...|       0.0|
|2009-04-07 06:20:09|        mimismo|@twittera que me ...|       que me muera |       0.0|
|2009-04-07 06:20:25|       armotley|about to file taxes |about to file taxes |       0.0|
|2009-04-07 06:20:34|      gi_gi_bee|@FakerPattyPattz ...| Oh dear Were you...|       0.0|
|2009-04-07 06:20:40|      cooliodoc|@angry_barista I ...| I baked you a ca...|       0.0|
|2009-04-07 06:20:44|  ChicagoCubbie|I hate when I hav...|I hate when I hav...|       0.0|

                                                                                

# Textblod and Varder

In [37]:
from textblob import TextBlob

In [38]:
import nltk
nltk.download('vader_lexicon')
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from textblob import TextBlob
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/hduser/nltk_data...


In [39]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Função de análise de sentimento com TextBlob
@udf(FloatType())
def sentiment(tweet):
    return TextBlob(tweet).sentiment.polarity

# Função de análise de sentimento com VADER
@udf(FloatType())
def sentiment_vader(tweet):
    sid = SentimentIntensityAnalyzer()
    return sid.polarity_scores(tweet)['compound']

# Aplicar diretamente no DataFrame
tweets_pred = tweets_pred.withColumn("textblob", sentiment(tweets_pred["clean_tweet"])) \
                         .withColumn("vader", sentiment_vader(tweets_pred["clean_tweet"]))

In [40]:
tweets_pred = tweets_pred.withColumn("score", ((col("prediction") + (col("textblob")*1.5) + (col("vader")*1.5)) / 4))

In [41]:
tweets_pred.select("clean_tweet", "prediction", "textblob", "vader", "score").show(n=10)

[Stage 64:>                                                         (0 + 1) / 1]

+--------------------+----------+-----------+-------+--------------------+
|         clean_tweet|prediction|   textblob|  vader|               score|
+--------------------+----------+-----------+-------+--------------------+
| Awww thats a bum...|       0.0|        0.2|-0.3818|-0.06817499734461308|
|is upset that he ...|       0.0|        0.0|-0.7269|-0.27258749306201935|
| not the whole crew |       0.0|        0.2|    0.0| 0.07500000111758709|
|         Need a hug |       0.0|        0.0| 0.4767|  0.1787625029683113|
| hey long time no...|       0.0| 0.27333334| 0.8286|  0.4132249988615513|
|       que me muera |       0.0|        0.0|    0.0|                 0.0|
|spring break in p...|       0.0|-0.21428572|    0.0|-0.08035714365541935|
|about to file taxes |       0.0|        0.0|    0.0|                 0.0|
| Oh dear Were you...|       0.0|        0.0| 0.1779|  0.0667125005275011|
| I baked you a ca...|       0.0|        0.0|    0.0|                 0.0|
+--------------------+---

                                                                                

In [42]:
tweets_pred.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- clean_tweet: string (nullable = true)
 |-- prediction: double (nullable = false)
 |-- textblob: float (nullable = true)
 |-- vader: float (nullable = true)
 |-- score: double (nullable = true)



# Saving on Hadoop

In [45]:
from pyspark.sql import SparkSession
from hdfs import InsecureClient
from hdfs.util import HdfsError

In [51]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth
from hdfs import InsecureClient
from hdfs.util import HdfsError

def spark_hadoop(df, folder, partitionBy=None, spark=None):
    hdfs_base_path = "hdfs://localhost:9000"
    hdfs_folder_path = f"{hdfs_base_path}/CA22BD/{folder}"

    client = InsecureClient('http://localhost:9870', user='hduser')

    try:
       
        client.content(hdfs_folder_path)

        print('Os arquivos já estão no Hadoop. Lendo os arquivos.')
        df = spark.read.parquet(hdfs_folder_path)
    except HdfsError:
        print('Colocando no Hadoop.')
        if partitionBy:
            
            df = df.withColumn("year", year("date"))
            df = df.withColumn("month", month("date"))
            df = df.withColumn("day", dayofmonth("date"))
            
           
            df.write.partitionBy("year", "month", "day").parquet(hdfs_folder_path)
            print(f"Salvo em {hdfs_folder_path} particionado por {partitionBy}")
        else:
            df.write.parquet(hdfs_folder_path)
            print(f"Salvo em {hdfs_fol


SyntaxError: positional argument follows keyword argument (2851107323.py, line 37)

[Stage 69:>   (0 + 2) / 4][Stage 70:>   (0 + 0) / 2][Stage 71:>   (0 + 0) / 2]

In [None]:
df_salvo.show(2)