# Spark session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob

def preprocessing(lines):
    words = lines.select(explode(split(lines.value, "t_end")).alias("word"))
    words = words.na.replace('', None)
    words = words.na.drop()
    words = words.withColumn('word', F.regexp_replace('word', r'http\S+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '@\w+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '#', ''))
    words = words.withColumn('word', F.regexp_replace('word', 'RT', ''))
    words = words.withColumn('word', F.regexp_replace('word', ':', ''))
    return words

# text classification
def polarity_detection(text):
    return TextBlob(text).sentiment.polarity
def subjectivity_detection(text):
    return TextBlob(text).sentiment.subjectivity
def text_classification(words):
    # polarity detection
    polarity_detection_udf = udf(polarity_detection, StringType())
    words = words.withColumn("polarity", polarity_detection_udf("word"))
    # subjectivity detection
    subjectivity_detection_udf = udf(subjectivity_detection, StringType())
    words = words.withColumn("subjectivity", subjectivity_detection_udf("word"))
    return words

if __name__ == "__main__":
    # create Spark session

    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    # read the tweet data from socket
    lines = spark.readStream.format("socket").option("host", "0.0.0.0").option("port", 5656).load()
    # Preprocess the data
    words = preprocessing(lines)
    # text classification to define polarity and subjectivity
    words = text_classification(words)

    words = words.repartition(10)
    query = words.writeStream.queryName("all_tweets")\
        .outputMode("append").format("parquet")\
        .option("path", "./parc")\
        .option("checkpointLocation", "./check")\
        .trigger(processingTime='60 seconds').start()
    query.awaitTermination()

22/05/28 21:41:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/28 21:41:53 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


root
 |-- value: string (nullable = true)



22/05/28 21:42:25 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
22/05/28 21:42:40 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
22/05/28 21:42:55 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources


In [3]:
from pyspark.sql import SparkSession
# initialise sparkContext
spark = SparkSession.builder \
    .master('local') \
    .appName('TwitterAggregations') \
    .config('spark.executor.memory', '2gb') \
    .config("spark.cores.max", "2") \
    .getOrCreate()

sc = spark.sparkContext

# using SQLContext to read parquet file
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# to read parquet file
df = sqlContext.read.parquet('parc/')

df.createOrReplaceTempView("ParquetTable")
df.printSchema()
# df.show(truncate = False)

root
 |-- created_at: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- full_text: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- retweet_count: string (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)
 |-- friends_count: string (nullable = true)
 |-- followers_count: string (nullable = true)
 |-- statuses_count: string (nullable = true)



In [4]:
df.count()

507

In [9]:
from pyspark.sql.functions import *

df.filter((df['full_text']=='')|df['full_text'].isNull()|isnan(df['full_text'])).count()

314

In [14]:
select_columns=['text','full_text','retweet_count','screen_name','description','location','friends_count','followers_count','statuses_count']
# Subsetting the required columns from the DataFrame
df_strings_only=df.select(*select_columns)
#  calculate all the missing values in the DataFrame, you can use the following command:
df_strings_only.select([count(when((col(c)=='') | col(c).isNull() | isnan(c), c)).alias(c) for c in df_strings_only.columns]).show()

+----+---------+-------------+-----------+-----------+--------+-------------+---------------+--------------+
|text|full_text|retweet_count|screen_name|description|location|friends_count|followers_count|statuses_count|
+----+---------+-------------+-----------+-----------+--------+-------------+---------------+--------------+
|   0|      314|            0|          0|        149|     193|            0|              0|             0|
+----+---------+-------------+-----------+-----------+--------+-------------+---------------+--------------+



In [20]:
#  frequencies of categorical variables
#  Categorical variables are any string variables that exist in a dataset.
df.groupBy(df['screen_name']).count().sort(desc("count")).show(10, False)



+---------------+-----+
|screen_name    |count|
+---------------+-----+
|Numrahkhalid   |24   |
|AbdulQayyum998 |13   |
|Arbab_Sajjad786|10   |
|khanadel       |9    |
|azadpakistan4  |7    |
|Shafeeq83017532|6    |
|YousifB48749828|6    |
|aksidr         |4    |
|Mahnoor1005    |4    |
|Shamshair_ch   |4    |
+---------------+-----+
only showing top 10 rows



                                                                                

In [25]:
# Subsetting and creating a temporary DataFrame to eliminate any missing values
# Dropping null descriptions
df_temp_drop_na_description=df.filter((df['description']!='')&(df['description'].isNotNull()) &(~isnan(df['description'])))


In [26]:
# Subsetting the DataFrame to titles that are repeated more than one time
df_temp_text_repeat = df.groupby(df['text']).count().filter("`count` >1").sort(col("count").desc()).show(10,False)

+--------------------------------------------------------------------------------------------------------------------------------------------+-----+
|text                                                                                                                                        |count|
+--------------------------------------------------------------------------------------------------------------------------------------------+-----+
|RT @ImranKhanPTI: My name is Imran Khan, I am from Pakistan and I #SupportGaza                                                              |57   |
|RT @LahoriElite: Thank you Imran Khan, Keyboard Warriors and Social Media? https://t.co/vxMxIawUzn                                          |24   |
|RT @PTIPoliticsss: Imran Khan will be addressing a Jalsa in Shangla on 2nd June 2022.                                                       |20   |
|RT @JehangirMirzaa: It took an Imran Khan after 70 odd years of 'independence', to lay bare the myth that

                                                                                

In [51]:
#Casting
df_casted = df.withColumn('friends_count',df['friends_count'].cast("float"))
#After Casting
df_casted.dtypes

[('created_at', 'timestamp'),
 ('text', 'string'),
 ('full_text', 'string'),
 ('hashtags', 'array<string>'),
 ('retweet_count', 'string'),
 ('screen_name', 'string'),
 ('description', 'string'),
 ('location', 'string'),
 ('friends_count', 'float'),
 ('followers_count', 'string'),
 ('statuses_count', 'string')]

In [33]:
df.describe()

DataFrame[summary: string, text: string, full_text: string, retweet_count: string, screen_name: string, description: string, location: string, friends_count: string, followers_count: string, statuses_count: string]

In [36]:
median=df_casted.approxQuantile('friends_count',[0.5],0.1)
#Printing the Value
print ('The median of friends_count is '+str(median))

The median of friends_count is [176.0]


In [40]:
# Counts the distinct occurances of titles
df.select('screen_name').distinct().count()


                                                                                

389

In [50]:
# Filtering
df.filter(df['text'].rlike('\w*imran')).show(1,False)

+-------------------+----------------------------------------------------------------------------------------------------------------------------------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------+---------------------------------------+--------+-------------+---------------+--------------+
|created_at         |text                                                                                                                              |full_text|hashtags                                                                                                                                         |retweet_count|screen_name |description                            |location|friends_count|followers_count|statuses_count|
+-------------------+-----------------------------------------------------------------------------------------------------------------------

In [52]:
mean_pop=df_casted.agg({'friends_count': 'mean'}).collect()[0]['avg(friends_count)']
count_obs= df_casted.count()

In [53]:

df=df.withColumn('mean_friends_count',lit(mean_pop))

In [56]:
from pyspark.sql.window import *
# Step 1: Filtering the missing values
df_with_newcols=df.filter( (df['full_text'].isNotNull()) & (~isnan(df['full_text'])) )
# Step 2: Applying the window functions for calculating deciles
df_with_newcols = df_with_newcols.select('location','friends_count','followers_count','statuses_count',ntile(10).over(Window.partitionBy().orderBy(df_with_newcols['statuses_count'].desc())).alias("decile_rank"))

# Step 3:Dispalying the values
df_with_newcols.groupby("decile_rank").agg(min('statuses_count').alias('min_statuses_count'),max('statuses_count').alias('max_statuses_count'),count('statuses_count')).show()

22/06/18 20:47:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----------+------------------+------------------+---------------------+
|decile_rank|min_statuses_count|max_statuses_count|count(statuses_count)|
+-----------+------------------+------------------+---------------------+
|          1|              8290|               998|                   20|
|          2|              6384|              8192|                   20|
|          3|            421778|              6326|                   20|
|          4|             31084|               402|                   19|
|          5|             24893|             31081|                   19|
|          6|            232650|             24892|                   19|
|          7|            211481|              2322|                   19|
|          8|              1560|             20335|                   19|
|          9|             12133|              1559|                   19|
|         10|              1000|             11996|                   19|
+-----------+------------------+------

In [61]:
df_second_best=df.withColumn('created_at_sec',minute('created_at'))
# Step 4: Define partition function
year_window = Window.partitionBy(df_second_best['created_at_sec']).orderBy(df['location'].desc())

df_second_best=df_second_best.select('text', 'created_at',rank().over(year_window).alias("rank"))
# Step 6: Find the second best rating for the year 1970
df_second_best.filter((df_second_best['created_at']==DatetimeConverter().convert('18:28:18.000Z') & (df_second_best['rank']==2)).show()

SyntaxError: unexpected EOF while parsing (339783851.py, line 7)