# Twitter Analysis

In [1]:
import pyspark


'C:\\spark\\spark-3.0.0-preview2-bin-hadoop2.7'

In [2]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import desc

In [3]:
#creating spark, streaming and SQL context
spark_context = SparkContext()
streaming_context = StreamingContext(spark_context, 10)
sqlContext = SQLContext(spark_context)
spark = SparkSession.builder.appName('Sparksql').getOrCreate()

In [4]:
# initiate streaming text from a TCP (socket) source:
tcp_socket_stream = streaming_context.socketTextStream("127.0.0.1", 5555)
# RDDs that contain the tweets with socket_stream window of size 60, or 60 #seconds windows of time
rdds = tcp_socket_stream.window(60)

In [5]:
import string

#creating a function to remove all punctuation from the tweets
def remove_punctuation(text):
    no_punc_text = "".join([t for t in text if t not in string.punctuation])
    return no_punc_text

In [6]:
from collections import namedtuple

fields = ("word", "count")
Tweet = namedtuple( 'Tweet', fields )

(rdds.map(lambda text: remove_punctuation(text)) #this removes punctuation from every tweets before its stored in the RDDs
 .flatMap( lambda text: text.split( " " ) ) #Splits to a list
 .filter(lambda word: word.lower()) 
 .map( lambda word: (word, 1))
  .reduceByKey( lambda a, b: a + b )
  .map( lambda rec: Tweet( rec[0], rec[1]))
  .foreachRDD( lambda rdd: rdd.toDF().sort("count").createOrReplaceTempView("tweets") )) #creating a temp SQL table so that the words in the tweets can be easily accessed later

In [7]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

%matplotlib inline

Run 'requestTweets.py' file now.
After starting the Streaming Context, wait 5 minutes so that enough tweets are collected before querying the SQL Context and doing the analysis of the tweets.

In [8]:
#streaming has started and we the tweets are now being saved into the RDDs 
streaming_context.start()

In [9]:
structured_DB = sqlContext.sql( 'Select word, count from tweets' ) #filtering the SQL table by word and count so that we can analyze the results
structured_DB.show()

+------------------+-----+
|              word|count|
+------------------+-----+
|           brought|    1|
|           article|    1|
|        10monthold|    1|
|          kpolantz|    1|
|            police|    1|
|               pa…|    1|
|          ThankYou|    1|
|httpstcoV57Lg46XWw|    1|
|          these…RT|    1|
|          overstay|    1|
|            Having|    1|
|            Liste…|    1|
|         Employees|    1|
|            mkraju|    1|
|          German’s|    1|
|              Food|    1|
|              Meat|    1|
|    administration|    1|
|        Processing|    1|
|            CNN…RT|    1|
+------------------+-----+
only showing top 20 rows



In [10]:
df_tweetsDB = structured_DB.toPandas() #converts filtered SQL table to Pandas DataFrame

In [11]:
list(df_tweetsDB.columns.values) #listing the column names of the Pandas DataFrame so that we know what we working with

['word', 'count']

In [12]:
type(df_tweetsDB) #print the type of the dataframe to make sure its a pandas dataframe

pandas.core.frame.DataFrame

In [13]:
#This function removes all the website links from the tweets/words
def clean_data(df):
    result=[]
    for index, row in df_tweetsDB.iterrows():
        if ("https" not in row.word):
            result.append(str(row.word.lower()))
    return result

In [14]:
#we look at the results of the clean_data function
words = clean_data(df_tweetsDB)
words

['kumar',
 'research',
 'administrator',
 'twometerrule',
 'hedge',
 'regarding',
 'visa',
 'uae',
 'wrapped',
 'plastic',
 'talking',
 'resurgence…rt',
 'thought',
 'focusing',
 'o…rt',
 'whowhat',
 'globalwitness',
 'eu',
 'cannot',
 'wed',
 'like',
 'debkilroy',
 'i”m',
 'result',
 'among',
 'save',
 'assanges',
 'hes',
 'lung',
 'no',
 'cases',
 'tourists',
 'v…',
 'respiratory',
 'droplet',
 'dissemination',
 'bbcrealitycheck',
 'came',
 'pregnant',
 'b52malmet',
 'w…rt',
 'towards',
 'them',
 'etsy',
 'turn',
 'offers',
 'therapy',
 'interesting',
 'osha',
 'owner',
 'he',
 'worke…',
 'amidst',
 'vada',
 'pav',
 '“feed',
 'arabtimeskuwait',
 'infections',
 'mojahedineng',
 'novel',
 '334',
 'letha…rt',
 'thehindu',
 'claimed',
 '‘fight',
 'cs’',
 'traders',
 'loss',
 'incident…',
 'tracing',
 'happen',
 'getting',
 'want',
 'milords',
 'alert',
 'ever',
 'prayers',
 'anytime',
 'pump',
 'economy',
 'heartbreaking',
 'carolina',
 'isolated',
 'during…analysis',
 'scotgov',
 'below

In [15]:
tweets = pd.DataFrame({'word': words}) #creating a Pandas DataFrame that contains only the words

In [16]:
from profanity_check import predict
from autocorrect import Speller

spell = Speller(lang='en')

#function to generate the profanity of each tweet
def offensive(df):
    result = []
    for word in df:
        result.append((predict([word])==1)[0])
    return result

#function to generate a list of boolean values that corresponds to whether the tweets are misspelt or not
def misspelt(df):
    result = []
    for word in df:
        if (word[:2])=="RT" or ("@" in word) or ("#" in word) or ("." in word) or ("'" in word): #if a tweets contains any of these then its not misspelt
            result.append(False)
        else:
            result.append(word != spell(word))
    return result



In [None]:
tweets["offensive"] = offensive(tweets["word"]) #adding a 'offensive' column to our Pandas DataFrame
tweets["misspelt"] = misspelt(tweets["word"]) #adding a 'misspelt' column to our Pandas DataFrame

In [None]:
tweets #visualizing results of previous additions

In [None]:
not_offensive = tweets['offensive'].value_counts()[0] #getting the number of words that are not offensive
offensive = tweets.shape[0] - not_offensive #subtracting the number of not offensive words from the total number of words so that we can get the number of offensive words 

In [None]:
#printing a table of the results for the offensive and non-offensive words and their associated word count
fig, ax = plt.subplots()
fig.patch.set_visible(False)
ax.axis('off')
ax.axis('tight')
df = pd.DataFrame([[not_offensive, offensive]], columns=['Non-offensive', 'Offensive'])
ax.table(cellText=df.values, colLabels=df.columns, loc='center')
plt.savefig("offensive_table", transparent=True)
plt.show()

In [None]:
correctly_spelt = tweets['misspelt'].value_counts()[0]
misspelt = tweets.shape[0] - correctly_spelt

In [None]:
#printing a table of the results for the misspelt and correctly words and their associated word count
fig, ax = plt.subplots()
fig.patch.set_visible(False)
ax.axis('off')
ax.axis('tight')
df = pd.DataFrame([[correctly_spelt, misspelt]], columns=['Correctly Spelt', 'Misspelt'])
ax.table(cellText=df.values, colLabels=df.columns, loc='center')
plt.savefig("misspelt_table", transparent=True)
plt.show()

In [None]:
#visualizing results for offensive vs non-offensive words in a bar graph
x_labels = ('Offensive', 'Non-offensive')
y_pos = np.arange(len(x_labels))
performance = [offensive, not_offensive]

plt.bar(y_pos, performance, align='center', alpha=0.5)
plt.xticks(y_pos, x_labels)
plt.ylabel('Count')
plt.title('Offensive vs Non-offensive')
plt.savefig("offensive", transparent=True)
plt.show()

In [None]:
#visualizing results for misspelt vs correctly spelt words in a bar graph

x_labels = ('Correctly Spelt', 'Misspelt')
y_pos = np.arange(len(x_labels))
performance = [correctly_spelt, misspelt]

plt.bar(y_pos, performance, align='center', alpha=0.5)
plt.xticks(y_pos, x_labels)
plt.ylabel('Count')
plt.title('Correctly Spelt vs Misspelt Words')
plt.savefig("misepelt", transparent=True)
plt.show()

In [None]:
tweets

In [None]:
#this function returns the correctly spelt words
def correct(df):
    result = []
    for word in df:
        result.append(spell(word))
    return result

In [None]:
misspelt_words = tweets[tweets['misspelt']==True]['word']
misspelt_words = pd.DataFrame({'misspelt word': misspelt_words})
misspelt_words['correctly spelt'] = correct(misspelt_words['misspelt word'])

In [None]:
#visualizing results for misspelt words and their correct spelling
misspelt_words

In [32]:
#this ends the spark session or streaming session
streaming_context.stop()