# PSI Twitter Notebook

In [2]:
#loading of libraries

import csv
import os
import re
from functools import reduce
from itertools import groupby
from math import log10, sqrt

import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import pandas as pd
import pixiedust
import requests
from dateutil import parser
from numpy import array, dot, isnan
from numpy.linalg import norm
from pixiedust import sc
from pyspark.ml.clustering import KMeans as MLKMeans
from pyspark.ml.linalg import Vectors
from pyspark.mllib.clustering import KMeans, KMeansModel, BisectingKMeans

In [3]:
#Initializes the PixieDust monitor
# Turns on PixieDust's Spark job progress monitor for this notebook session.
pixiedust.enableJobMonitor()
# SQL context built on the SparkContext `sc` imported from pixiedust; the later
# cells call sqlContext.createDataFrame() on it.
sqlContext = pixiedust.SQLContext(sc)

Succesfully enabled Spark Job Progress Monitor


Exception in thread Thread-5:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.6/site-packages/pixiedust/utils/sparkJobProgressMonitor.py", line 47, in startSparkJobProgressMonitor
    progressMonitor = SparkJobProgressMonitor()
  File "/opt/conda/lib/python3.6/site-packages/pixiedust/utils/sparkJobProgressMonitor.py", line 174, in __init__
    self.addSparkListener()
  File "/opt/conda/lib/python3.6/site-packages/pixiedust/utils/sparkJobProgressMonitor.py", line 203, in addSparkListener
    _env.getTemplate("sparkJobProgressMonitor/addSparkListener.scala").render()
  File "/opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2131, in run_cell_magic
    result = fn(magic_arg_s, cell)
  File "<decorator-gen-126>", line 2, in scala
  File "/opt/

- You can quickly instantiate the Spark DataFrame (used in parts 2 and 3) from a pre-parsed .csv file in code block 1.6
- Creating the original Spark DataFrame is time-consuming, but it is shown in parts 1.1 - 1.5 (which also show how the .csv file is created)
- Parts 2 and 3 of this script use the pre-parsed .csv file

# 1. Pre-processing of Twitter data


### 1.1 Initializing and loading of functions

In [4]:
#requests and os are used in the download_file function
#dateutil is used to convert strings containing temporal data into datetime objects
#pandas is used for easy queries on dataframes

#downloads any file from a url to a specified location
def download_file(url, location, chunk_size=1024):
    """Download ``url`` into the directory ``location`` and return the saved path.

    The local file name is the last path segment of the URL with any query
    string (everything from the first "?") removed.

    Parameters
    ----------
    url : str
        Address of the file to download.
    location : str
        Directory in which the file is stored.
    chunk_size : int, optional
        Number of bytes requested per streamed chunk (default 1024).

    Returns
    -------
    str
        Path of the written file.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status. Without this check the
        error page would be written to disk as if it were the dataset.
    """
    filename = os.path.join(location, url.split("/")[-1].split("?")[0])
    r = requests.get(url, stream=True)
    try:
        r.raise_for_status()
        with open(filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                # iter_content can yield empty keep-alive chunks; skip them
                if chunk:
                    f.write(chunk)
    finally:
        # a streamed response keeps its connection open until it is closed
        r.close()
    return filename

#removes unwanted characters found in certain tweets
def sym_filter(string):
    """Remove stray bracket characters from ``string``.

    Each of the characters "{", "}", "(" and ")" is deleted only when it
    occurs exactly once in the text; characters that occur zero times or more
    than once are left alone. The cleaned text is returned.
    """
    lone_symbols = [symbol for symbol in ("{", "}", "(", ")") if string.count(symbol) == 1]
    cleaned = string
    for symbol in lone_symbols:
        cleaned = cleaned.replace(symbol, "")
    return cleaned

#loads the PSI_tweets.txt file as a list of list if already available locally
def offline_PSI_tweets(n, txt_location = './PSI_tweets.txt'):
    """Read up to ``n`` raw tweet lines from a local copy of PSI_tweets.txt.

    Parameters
    ----------
    n : int or None
        Maximum number of lines to read. ``None`` reads the whole file and a
        value of zero or less yields an empty list.
    txt_location : str, optional
        Path of the text file (one tweet per line).

    Returns
    -------
    list of str
        The first ``n`` lines, each still ending with its newline. When the
        file holds fewer than ``n`` lines, all of them are returned (the
        previous implementation returned ``None`` in that case).
    """
    tweets = []
    with open(txt_location) as txt_file:
        for line in txt_file:
            if n is not None and len(tweets) >= n:
                break
            tweets.append(line)
    return tweets

#checks whether expanding the join limit could enable finding the next match
def possible_expand(og_match, next_match, string):
    """Tell whether ``next_match`` shows up inside the value captured for ``og_match``.

    The text between ``og_match`` and ``next_match`` is captured with a regular
    expression and split on double quotes. The first 2 to 9 quote-delimited
    pieces are re-joined (with a closing quote appended) and the function
    returns True as soon as one of those joined prefixes contains
    ``next_match``.

    Parameters
    ----------
    og_match : str
        Name of the variable whose value is being extracted.
    next_match : str
        Name of the variable that follows it in the tweet.
    string : str
        The raw tweet text.

    Returns
    -------
    bool
        True when such a prefix exists; False otherwise, including when the
        regular expression does not match at all (this used to raise
        IndexError).
    """
    match = re.findall(pattern = (".*" + og_match + " (.*),.*" + next_match), string = string)
    if not match:
        return False
    quote_parts = match[0].split('"')
    return any(next_match in ('"'.join(quote_parts[:i]) + '"') for i in range(2, 10))


#gives the (right) closest present variable found in the tweet relative to a given variable
def check_next(og_element, check_list, tweet, og_index):
    """Return the variable name that appears closest to the right of ``og_index``.

    Every other entry of ``check_list`` is searched for in ``tweet`` (it must
    be preceded by a space, "|", "{" or "}"). Occurrences located after
    ``og_index`` are collected and the name of the nearest one is returned.

    Parameters
    ----------
    og_element : str
        The variable currently being extracted; its first occurrence in
        ``check_list`` is excluded from the search.
    check_list : list of str
        All variable names to look for.
    tweet : str
        The raw tweet text.
    og_index : int
        Position of ``og_element`` in ``tweet``.

    Returns
    -------
    str or None
        The closest following variable name, or None when nothing follows.

    Raises
    ------
    ValueError
        If ``og_element`` is not part of ``check_list``.
    """
    candidates = check_list[:]
    # only the first occurrence is dropped, so duplicated names stay searchable
    candidates.remove(og_element)

    closest_list = []
    for element in candidates:
        if element not in tweet:
            continue
        for check in re.finditer(("[ |{}]" + element), tweet):
            # check.start() points at the delimiter, the name itself starts one later
            if (og_index - (check.start() + 1)) < 0:
                closest_list.append((og_index - check.start(), element))

    if not closest_list:
        return None
    # offsets are negative; the largest one belongs to the nearest name.
    # max() keeps the first entry on ties, so no DataFrame is needed for this.
    return max(closest_list, key=lambda entry: entry[0])[1]

### 1.2 Downloading of Twitter dataset

In [None]:
#Downloads prehosted dataset from Dropbox
url = 'https://www.dropbox.com/s/yz5biypudzjpc12/PSI_tweets.txt?dl=1'
location = './' #relative location for Linux, saves it in the same folder as this script
download_file(url, location)

### 1.3 Collecting all tweets as list

In [None]:
# Reads the downloaded text file as an RDD with one raw tweet string per line
tweets_RDD = sc.textFile('./PSI_tweets.txt')
# collect() pulls every line into a Python list on the driver
tweets = tweets_RDD.collect() #or tweets = tweets_RDD.take(500) for some testing 

#You can also use the offline_PSI_tweets() function to instantiate your data if it is already downloaded
## tweets = offline_PSI_tweets(x)

### 1.4 Extracting every variable from each Tweet

The JSON structure was exploited to create a list of variables that are generally contained within each tweet, albeit in a different order. For each variable in our list, our filter first checks whether the variable is present in the tweet and then determines the location of the other present variables relative to it. Using each closest pair of variables as (fixed) outer bounds makes it possible to extract the data in between with regular expressions. Since the data for a variable could itself contain the literal name of another variable (someone could tweet the string "probability:"), the script splits every tweet from the first variable onwards on commas (JSON's default delimiter) and joins the parts back together until the second variable is found, which marks the outer bound.

In [None]:
#Produces a list of lists (parsed_tweets) with one slot per check_list entry for every tweet
#The multi_list list accounts for variables of the same name (tweet_id and profile_id are both called id for example)
#The count of a name in multi_list (the variables handled so far) selects which regex match of that name is used

# Variable names searched for in every tweet. 'created_at:', 'id_str:', 'id:' and 'lang:' occur twice in this list.
check_list = ['sentiment:', 'metadata:', 'probability:', 'possibly_sensitive:', 'in_reply_to_user_id_str:', 'created_at:', 'truncated:', 'source:', 'retweet_count:', 'retweeted:', 'in_reply_to_screen_name:', 'id_str:', 'in_reply_to_user_id:', 'id:', 'text:', 'lang:', 'class:', 'favorited:', 'utc_offset:', 'friends_count:', 'profile_image_url_https:', 'profile_background_image_url:', 'listed_count:', 'default_profile_image:', 'favourites_count:', 'is_translator:', 'description:', 'created_at:', 'profile_background_image_url_https:', 'protected:', 'screen_name:', 'profile_link_color:', 'id_str:', 'is_translation_enabled:', 'translator_type:', 'geo_enabled:', 'profile_background_color:', 'id:', 'lang:', 'has_extended_profile:', 'profile_sidebar_border_color:', 'profile_text_color:', 'verified:', 'profile_image_url:', 'time_zone:', 'url:', 'contributors_enabled:', 'profile_banner_url:', 'entities:', 'statuses_count:', 'default_profile:', 'followers_count:', 'profile_use_background_image:', 'name:', 'location:', 'profile_sidebar_fill_color:', 'gtype:', 'bbox:', 'latitude:', 'type:', 'longitude:']
parsed_tweets = []

for tweet in tweets:
    # names recorded so far for this tweet (see the selection of og_index below)
    multi_list = []
    # one slot per check_list entry; slots stay None when nothing is extracted
    row = [None] * len(check_list)
    for i in range(len(check_list)):
        
        # The name only counts as present when preceded by a space, '|', '{' or '}'
        if len(re.findall( ("[ |{}]" + check_list[i]), string=tweet))  == 0:
            row[i] = None
            multi_list += [check_list[i]]

        # NOTE(review): this is a separate `if`, not an `elif`, so for an absent name the
        # `else` at the bottom runs as well and the name is appended to multi_list twice.
        if len(re.findall( ("[ |{}]" + check_list[i]), string=tweet)) > 0:
            # start positions of the name itself (+1 skips the leading delimiter character)
            index_list = []
            for check in re.finditer(("[ |{}]" + check_list[i]), string=tweet):
                if check:
                    index_list += [check.start() + 1]

            # Pick the occurrence whose number equals how often this name was already recorded.
            # NOTE(review): raises IndexError when the tweet holds fewer occurrences than that count.
            og_index = index_list[multi_list.count(check_list[i])]
            next_match = check_next(og_element = check_list[i], check_list=check_list, tweet = tweet, og_index= og_index)
            # number of quote- or comma-separated pieces joined so far
            x = 0

            if next_match:
                # captures the text between this name and a comma that is followed by next_match
                match = re.findall(pattern = (".*" + check_list[i] + " (.*),.*" + next_match), string = tweet)
                if len(match) > 0:
                    if possible_expand(check_list[i], next_match, tweet): 
                        # grow the quote-joined prefix until it contains next_match, then keep one piece fewer
                        while next_match not in ('"'.join(match[0].split('"')[:x]) + '"'):
                            x += 1
                        row[i] = [('"'.join(match[0].split('"')[:(x-1)]) + '"')]
                        multi_list += [check_list[i]]

                    elif next_match in match[0]:
                        # same idea, but joining comma-separated pieces
                        while not next_match in ",".join(match[0].split(",")[:x]):
                            x += 1
                        row[i] = [",".join(match[0].split(",")[:(x -1)])]
                        multi_list += [check_list[i]]
                    elif not next_match in match[0]:
                        # the capture is stored as a plain string here (not wrapped in a list)
                        # NOTE(review): multi_list is not updated in this branch
                        row[i] = match[0]
                        
            # No following name found: only the last check_list entry is handled,
            # capturing everything up to two closing '}' / ')' / '|' characters
            elif (i == (len(check_list) -1) and check_list[-1] in tweet):
                match = re.findall(pattern = (".*" + check_list[-1] + " (.*)[}|)][}|)]"), string = tweet)
                if len(match) == 1:
                    row[i] = match[0]
                else:
                    row[i] = None
                    

        else:
            row[i] = None
            multi_list += [check_list[i]]    
    parsed_tweets += [row]

### 1.5 Filtering unrequired columns and imposing a fixed structure for the remainder

In [None]:
#Creates a list of lists (data) that only contains the wanted columns:
#'sentiment:, probability:, possibly_sensitive:, in_reply_to_user_id_str:, created_at:, in_reply_to_screen_name:, id_str:, in_reply_to_user_id:, id:, text:, lang:, class:, utc_offset:, description:, created_at:, screen_name:, geo_enabled:, lang:, time_zone:, name:, location:, latitude:, longitude:'

# Positions in check_list (and therefore in every parsed row) of the columns that are kept
KEPT_COLUMN_INDEXES = [0, 2, 3, 4, 5, 10, 11, 12, 13, 14, 15, 16, 18, 26, 27, 30, 35, 38, 44, 53, 54, 58, 60]

# Values that were joined in step 1.4 are still wrapped in a list of len(1);
# they are unpacked and evaluated up to this many times to strip nested quoting
UNPACK_PASSES = 3


def _eval_field(text):
    """Evaluate one raw field as a Python expression in an empty namespace.

    SECURITY / NOTE(review): eval() runs arbitrary expressions taken from the
    downloaded file. The empty namespace only stops a field such as `data` or
    `max` from silently resolving to a notebook variable or builtin; it is not
    a sandbox. ast.literal_eval would be the safer replacement.
    """
    return eval(text, {"__builtins__": {}}, {})


def _to_sentiment(value):
    """Turn the raw sentiment text into [int, int, int, <fourth element as evaluated>]."""
    parsed = _eval_field(value)
    return [int(parsed[0]), int(parsed[1]), int(parsed[2]), parsed[3]]


def _to_bool(value):
    """Evaluate a true/false style field after stripping stray brackets and title-casing it."""
    return _eval_field(sym_filter(value).title())


def _to_datetime(value):
    """Evaluate the (quoted) timestamp text and parse it into a datetime."""
    return parser.parse(_eval_field(value))


def _to_int_from_literal(value):
    """Evaluate the field first (e.g. to drop quotes) and convert the result to int."""
    return int(_eval_field(value))


# One converter per kept column, in the same order as KEPT_COLUMN_INDEXES.
# A fixed type per column is required to convert the data to a Spark Dataframe.
FIELD_CONVERTERS = [
    _to_sentiment,         # 0  sentiment:
    float,                 # 1  probability:
    _to_bool,              # 2  possibly_sensitive:
    int,                   # 3  in_reply_to_user_id_str:
    _to_datetime,          # 4  created_at:
    str,                   # 5  in_reply_to_screen_name:
    _to_int_from_literal,  # 6  id_str:
    int,                   # 7  in_reply_to_user_id:
    int,                   # 8  id:
    str,                   # 9  text:
    str,                   # 10 lang:
    str,                   # 11 class:
    int,                   # 12 utc_offset:
    str,                   # 13 description:
    _to_datetime,          # 14 created_at:
    str,                   # 15 screen_name:
    _to_bool,              # 16 geo_enabled:
    str,                   # 17 lang:
    str,                   # 18 time_zone:
    str,                   # 19 name:
    str,                   # 20 location:
    float,                 # 21 latitude:
    float,                 # 22 longitude:
]


def _unpack_singleton(value):
    """Unwrap values stored as a one-element list and evaluate their content.

    Repeated UNPACK_PASSES times because evaluating may again produce a
    one-element list. Content that cannot be evaluated is kept as it is.
    """
    for _ in range(UNPACK_PASSES):
        if type(value) is list and len(value) == 1:
            value = value[0]
            try:
                value = _eval_field(value)
            except Exception:
                pass
    return value


def _coerce_field(position, raw_value):
    """Return ``raw_value`` converted to the fixed type of column ``position``.

    Empty strings and empty lists become None (null). Other falsy values
    (None, 0, False) are passed through untouched, and a value that cannot be
    converted becomes None.
    """
    value = _unpack_singleton(raw_value)
    if isinstance(value, (str, list)) and len(value) == 0:
        return None
    if not value:
        return value
    try:
        return FIELD_CONVERTERS[position](value)
    except Exception:
        return None


# Column names of the filtered table; computed once so it also exists when parsed_tweets is empty
filtered_check_list = [check_list[source_index] for source_index in KEPT_COLUMN_INDEXES]

# One typed row per tweet, holding only the kept columns
data = [
    [_coerce_field(position, parsed_row[source_index])
     for position, source_index in enumerate(KEPT_COLUMN_INDEXES)]
    for parsed_row in parsed_tweets
]

### (opt). Converting the data to Spark Dataframe

In [None]:
#If you ran the upper code, the data can be directly converted to a Spark DF
#Else, you can just use a pre-parsed .csv file available in the next codeblock
# `data` holds the typed rows and `filtered_check_list` the matching column names (both from section 1.5)
tweetDF = sqlContext.createDataFrame(data, filtered_check_list)

### (opt). Writing the parsed list containing all Tweets to a .csv file

In [None]:
#writes the result to csv file (header row first, then one row per tweet)
# newline="" lets the csv module control line endings itself; without it
# extra blank rows appear on platforms that translate "\n" on write.
# csv is already imported in the first cell of the notebook.
with open("parsed_tweets.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(filtered_check_list)
    writer.writerows(data)

### 1.6 Instantiating the Spark Dataframe from CSV file

In [5]:
#Upper code was used to produce a .csv file containing all parsed Twitter data
#Was uploaded to Dropbox and made available via a public link, lower function downloads that file

# `url` and `location` overwrite the values assigned in section 1.2
url = "https://www.dropbox.com/s/he5tvdxriqj9s9b/parsed_tweets.csv?dl=1"
location = './'
# download_file() returns the saved path, which the notebook shows as the cell output
download_file(url, location)

'./parsed_tweets.csv'

In [82]:
#Creates a csv_tweets list containing all parsed data (see upper codeblocks)

# Column positions grouped by the fixed type that is imposed on them
CSV_SENTIMENT_COLUMN = 0
CSV_STRING_COLUMNS = (5, 9, 10, 11, 13, 15, 17, 18, 19, 20)
CSV_FLOAT_COLUMNS = (1, 21, 22)
CSV_BOOL_COLUMNS = (2, 16)
CSV_DATE_COLUMNS = (4, 14)
CSV_INT_COLUMNS = (3, 6, 7, 8, 12)

# Fields are evaluated this many times to remove all nested quotation marks ("'text'")
CSV_UNQUOTE_PASSES = 3


def _eval_csv_field(text):
    """Evaluate one CSV field as a Python expression in an empty namespace.

    SECURITY / NOTE(review): eval() runs arbitrary expressions taken from the
    downloaded file, and text such as "24/7" is still evaluated to a number.
    The empty namespace only stops a field such as `x` or `max` from silently
    resolving to a notebook variable or builtin; it is not a sandbox.
    ast.literal_eval would be the safer replacement.
    """
    return eval(text, {"__builtins__": {}}, {})


def _unquote_csv_field(value):
    """Strip nested quoting from a CSV field; empty fields become None."""
    for _ in range(CSV_UNQUOTE_PASSES):
        if len(str(value)) == 0:
            return None
        try:
            value = _eval_csv_field(value)
        except Exception:
            # not a Python literal (or no longer a string): keep the value as it is
            pass
    return value


def _convert_csv_field(column, value):
    """Apply the fixed type of ``column`` to ``value``."""
    if column == CSV_SENTIMENT_COLUMN:
        return [int(value[0]), int(value[1]), int(value[2]), str(value[3])]
    if column in CSV_STRING_COLUMNS:
        return str(value)
    if column in CSV_FLOAT_COLUMNS:
        return float(value)
    if column in CSV_BOOL_COLUMNS:
        return _eval_csv_field(value.title())
    if column in CSV_DATE_COLUMNS:
        return parser.parse(value)
    if column in CSV_INT_COLUMNS:
        return int(value)
    return value


def _coerce_csv_field(column, raw_value):
    """Unquote ``raw_value`` and impose the column type, keeping the value when that fails.

    A field that evaluates to Ellipsis (the text "...") is treated as missing.
    It is returned as None straight away so that the string columns do not
    turn it into the text 'None'.
    """
    value = _unquote_csv_field(raw_value)
    if value is None or value is Ellipsis:
        return None
    try:
        return _convert_csv_field(column, value)
    except Exception:
        return value


# newline="" is what the csv module expects, so quoted fields containing line breaks are read intact
with open("./parsed_tweets.csv", newline="") as csvfile:
    readCSV = csv.reader(csvfile, delimiter = ",")
    headers = next(readCSV)
    raw_csv_rows = list(readCSV)

#Since Python imports CSV segments as strings, the fixed structure has to be imposed again.
#Dates are only parsed in the two created_at columns; running the date parser over every
#column turned free text that merely looks like a date (e.g. a weekday name) into a datetime.
csv_tweets = [
    [_coerce_csv_field(column, raw_value) for column, raw_value in enumerate(raw_row)]
    for raw_row in raw_csv_rows
]

#Creates a Spark Dataframe from the list parsed from the premade CSV file
tweetDF_csv = sqlContext.createDataFrame(csv_tweets, headers)

# 2. Finding Clusters Using Coordinates

In [175]:
## collect the coordinates (last two CSV columns: latitude, longitude) of all tweets
coordinates = []

## open .csv file as file, only extract coordinates
with open('./parsed_tweets.csv',newline='') as file:
    # the first physical line is the header; it is skipped exactly once here
    # (previously the first data row was dropped as well by an extra [1:] slice)
    csv_headers = next(file)
    for row in csv.reader(file):
        # rows without at least two fields (e.g. blank lines) carry no coordinates
        if len(row) >= 2:
            coordinates.append([row[-2], row[-1]])


## create RDD object containing all the coordinates
## NOTE(review): this rebinds `data`, which section 1.5 uses for the list of parsed rows
data = sc.parallelize(coordinates)

### filter out coordinates which are empty and also convert string coordinates to float value
coordinates_process = data.filter(lambda x : (len(x[0]) != 0) and (len(x[1])!=0)).map(lambda x : (float(x[0]),float(x[1]))).map(lambda x : (Vectors.dense(x),))

## take first to see the result
print(coordinates_process.take(1))


## convert RDD to spark dataframe
## (sqlContext is created at the top of the notebook; `spark` is never defined in it)
df = sqlContext.createDataFrame(coordinates_process, ["features"])
## give an impression of what it looks like
df.show()

## create empty cost list
cost_all = []

## specify the K value range which will be used to find the optimal number of clusters
k = range(2,100,2)

## fit a model for every k, compute its cost and save it in the cost list
## MLKMeans is the DataFrame-based pyspark.ml estimator; the RDD-based mllib KMeans
## cannot be constructed with k/seed and has no fit()
for i in k:
    kmeans = MLKMeans(k=i, seed=1)
    model = kmeans.fit(df)
    cost = model.computeCost(df)###type(cost) is float
    cost_all.append(cost)


In [174]:
## cost of the fitted model for every tested k
fig, ax = plt.subplots()
ax.plot(k, cost_all)
ax.set_xlabel('K-cluster')
ax.set_ylabel('cost')
plt.show()

In [None]:
## try to find the k value where the cost starts to be stable:
## the first position where the cost drops by less than the threshold towards the next k
COST_DROP_THRESHOLD = 0.1
index = [i for i in range(len(cost_all)-1) if (cost_all[i]-cost_all[i+1]) < COST_DROP_THRESHOLD]
if index:
    index_final = index[0]
else:
    # the cost never flattened within the tested range; fall back to the largest
    # tested k instead of failing with an IndexError on the empty list
    index_final = len(cost_all) - 1
    print('cost did not stabilise within the tested range, using the largest k')

## get the optimal k value
k_final = k[index_final]
print('perfect k is :',k_final)


## use the optimal k value and run model again
## MLKMeans is the DataFrame-based pyspark.ml estimator (the mllib KMeans has no fit())
kmeans = MLKMeans(k=k_final, seed=1)
model = kmeans.fit(df)
##retrieve the cluster centers
centers = model.clusterCenters()
## get cost value
cost_final = model.computeCost(df)###type(cost) is float
print('cost_final = ',cost_final)
print(centers)


In [None]:
### bring every feature vector together with its cluster label into pandas
transformed = model.transform(df).select("features", "prediction")
pdf = transformed.toPandas()
### cluster label of every row
label = pdf['prediction']
### one [first coordinate, second coordinate, label] entry per row
coor = [[point[0], point[1], label[row_number]] for row_number, point in enumerate(pdf['features'])]

### cluster centers and labelled points as pandas dataframes
center = pd.DataFrame(centers,columns = ['lat','lon'])
dataframe = pd.DataFrame(coor,columns = ['lat','lon','label'])

In [None]:
## plot the clustered result: points coloured by cluster label, centers drawn as squares
fig, ax = plt.subplots()
ax.scatter(dataframe['lat'], dataframe['lon'], c = dataframe['label'], cmap=plt.cm.Paired)
ax.scatter(center['lat'], center['lon'], marker='s')
plt.show()

# 3. Twitter Text Content Analysis

### 3.1 Load the data and pre-process it

In [177]:
# Preview of the pre-parsed CSV as a pandas DataFrame (the notebook renders the returned frame below)
pd.read_csv("./parsed_tweets.csv")

Unnamed: 0,sentiment:,probability:,possibly_sensitive:,in_reply_to_user_id_str:,created_at:,in_reply_to_screen_name:,id_str:,in_reply_to_user_id:,id:,text:,...,description:,created_at:.1,screen_name:,geo_enabled:,lang:.1,time_zone:,name:,location:,latitude:,longitude:
0,"[3, -2, 0, 'long']",2.627796e-06,False,,2016-10-09 02:38:13+00:00,,7.849460e+17,,6.681141e+07,Good day on the burn. @ Mount Shavano Trailhea...,...,"""I'm an avid outdoorsman originally from Sprin...",2016-10-09 02:38:13+00:00,"""CragSquatch""",True,"""en""","""Atlantic Time (Canada)""","""Alec Villarreal""","""Durango, CO""",38.595941,-106.199650
1,"[0, 0, 0, 'long']",9.053528e-07,False,,2017-06-21 15:01:11+00:00,,8.775419e+17,,3.332052e+08,Serenity. @ Mt. Shavano And Tabeguache https:/...,...,"""be patient. work hard. stay humble. NWU soccer.""",2017-06-21 15:01:11+00:00,"""rymian""",True,"""en""","""Central Time (US & Canada)""","""ryan anderson""","""""",38.596673,-106.196988
2,"[2, 0, 1, 'long']",9.536176e-08,False,,2017-06-25 03:14:41+00:00,,8.788137e+17,,2.571868e+08,Aspen trees make me happy. @ Mt. Shavano And T...,...,"""An epic failure who has received a love that ...",2017-06-25 03:14:41+00:00,"""revrickydean""",True,"""en""",Central Time (US & Canada),"""Ricky Jones""","""Tulsa, OK""",38.596673,-106.196988
3,"[0, 0, 0, 'long']",1.000000e+00,False,,2017-05-04 03:43:59+00:00,,8.599769e+17,,2.517352e+07,2 mile afternoon hike at high elevation. @ Mt....,...,"""Father, outdoor enthusiast, food and beverage...",2017-05-04 03:43:59+00:00,"""patrickau""",True,"""en""","""Mountain Time (US & Canada)""","""Patrick Payne""","""Salida, Colorado""",38.596673,-106.196988
4,"[0, 0, 0, 'long']",1.609444e-10,False,,2017-05-04 10:59:04+00:00,,8.600864e+17,,2.517352e+07,Hiking the Colorado Trail @ Mt. Shavano And Ta...,...,"""Father, outdoor enthusiast, food and beverage...",2017-05-04 10:59:04+00:00,"""patrickau""",True,"""en""","""Mountain Time (US & Canada)""","""Patrick Payne""","""Salida, Colorado""",38.596673,-106.196988
5,"[0, 0, 0, 'long']",1.052048e-09,False,,2017-05-14 02:56:17+00:00,,8.635888e+17,,2.517352e+07,Sleepy #camping @ Mt. Shavano And Tabeguache h...,...,"""Father, outdoor enthusiast, food and beverage...",2017-05-14 02:56:17+00:00,"""patrickau""",True,"""en""","""Mountain Time (US & Canada)""","""Patrick Payne""","""Salida, Colorado""",38.596673,-106.196988
6,"[0, 0, 0, 'long']",1.041908e-05,False,,2017-05-14 12:06:47+00:00,,8.637273e+17,,2.517352e+07,Just posted a photo @ Mt. Shavano And Tabeguac...,...,"""Father, outdoor enthusiast, food and beverage...",2017-05-14 12:06:47+00:00,"""patrickau""",True,"""en""","""Mountain Time (US & Canada)""","""Patrick Payne""","""Salida, Colorado""",38.596673,-106.196988
7,"[0, 0, 0, 'long']",1.041908e-05,False,,2017-05-14 12:11:14+00:00,,8.637284e+17,,2.517352e+07,Just posted a photo @ Mt. Shavano And Tabeguac...,...,"""Father, outdoor enthusiast, food and beverage...",2017-05-14 12:11:14+00:00,"""patrickau""",True,"""en""","""Mountain Time (US & Canada)""","""Patrick Payne""","""Salida, Colorado""",38.596673,-106.196988
8,"[2, -1, 1, 'short']",1.129235e-09,False,,2016-12-07 01:22:00+00:00,,8.063077e+17,,2.517352e+07,Christmas tree permit! #christmastreehunting @...,...,"""Father, outdoor enthusiast, food and beverage...",2016-12-07 01:22:00+00:00,"""patrickau""",True,"""en""","""Mountain Time (US & Canada)""","""Patrick Payne""","""Salida, Colorado""",38.594561,-106.198577
9,"[2, -1, 1, 'short']",1.000000e+00,False,,2017-05-04 03:42:42+00:00,,8.599766e+17,,2.517352e+07,Afternoon hike with moose viewing! @ Shavano/T...,...,"""Father, outdoor enthusiast, food and beverage...",2017-05-04 03:42:42+00:00,"""patrickau""",True,"""en""","""Mountain Time (US & Canada)""","""Patrick Payne""","""Salida, Colorado""",38.594561,-106.198577


In [178]:
#3 Load the csv data as a panda dataframe
tweets_df = pd.read_csv("./parsed_tweets.csv")


def _clean_tweet_text(tweet):
    """Return the tweet text without line feeds, the ' @ <place>' tail and link remnants."""
    text = re.sub('\n', '', tweet)
    # everything from the first ' @ ' onwards is dropped
    text = text.split(' @ ')[0]
    text = re.sub('https?', '', text)
    # only a standalone ' co' is removed; without the word boundary every word
    # starting with "co" lost its first letters (' coffee' became 'ffee')
    return re.sub(r' co\b', '', text)


def _leading_word(token):
    """Return the lower-cased run of word characters at the start of ``token`` ('' if there is none)."""
    return re.findall(r'\w*', token.lower())[0]


#4 Create rdd of the text of every tweet (entries that are not strings are skipped)
tweetrdd = sc.parallelize(tweets_df['text:'], 10)\
             .filter(lambda x: type(x) == str)\
             .map(_clean_tweet_text)

#5 Set the n_oftweets to analyze and create a subset rdd out of the original tweet texts
n_oftweets = 900
tweetrddsubset = sc.parallelize(tweetrdd.take(n_oftweets))

#6 Partition the tweetrdd into a list of words per individual tweet and filter out the special characters&stopwords
stopwords = ["ourselves", "hers", "between", "yourself", "but", "again", "there", "about", "once", "during", "out", "very", "having", "with", "they", "own", "an", "be", "some", "for", "do", "its", "yours", "such", "into", "of", "most", "itself", "other", "off", "is", "s", "am", "or", "who", "as", "from", "him", "each", "the", "themselves", "until", "below", "are", "we", "these", "your", "his", "through", "don", "nor", "me", "were", "her", "more", "himself", "this", "down", "should", "our", "their", "while", "above", "both", "up", "to", "ours", "had", "she", "all", "no", "when", "at", "any", "before", "them", "same", "and", "been", "have", "in", "will", "on", "does", "yourselves", "then", "that", "because", "what", "over", "why", "so", "can", "did", "not", "now", "under", "he", "you", "herself", "has", "just", "where", "too", "only", "myself", "which", "those", "i", "after", "few", "whom", "t", "being", "if", "theirs", "my", "against", "a", "by", "doing", "it", "how", "further", "was", "here", "than"]
# set copy of the stopwords for constant-time membership tests inside the Spark tasks
stopword_set = frozenset(stopwords)
# words are compared with != '' (equality); `is not ''` compared object identity
wordrdd_tweet = tweetrddsubset.map(lambda line: [_leading_word(word) for word in line.split(' ')])\
                        .map(lambda words: [word for word in words if word != ''])\
                        .map(lambda wordlist: [word for word in wordlist if word not in stopword_set])

#7 Construct the rdd of all words of all tweets together (stopwords are kept here)
wordrdd = tweetrddsubset.flatMap(lambda line: line.split())\
                  .map(_leading_word)\
                  .filter(lambda word: word != '')

### 3.2 Compute the dissimilarity between every [*tweet with every tweet*]

In [180]:
#1 count the number of words as a summary statistic
total_wordcount = wordrdd.count()

#2 Count the number of times the word repeats within the same tweet for all tweets
countedwordpairrdd_tweet = wordrdd_tweet.map(lambda tweet: sorted(tweet))\
                                    .map(lambda tweetsorted: [(word,len(list(n))) for word,n in groupby(tweetsorted)])

#3 Calculate the term frequency (TF) per tweet as RDD
# NOTE(review): the count is divided by the number of distinct words in the tweet,
# not by the total number of words - confirm that this is the intended definition
tf_tweet = countedwordpairrdd_tweet.map(lambda wordpair: [(word, n/len(wordpair)) for word, n in wordpair])

#4 Create a unique word rdd and count the number of words as a summary statistic
wordrdd_unique = wordrdd.map(lambda word: (word,1))\
                        .reduceByKey(lambda key, next_key : key + next_key)\
                        .map(lambda wordpair: wordpair[0])

n_uniquewords = wordrdd_unique.count()

#5 Convert&collect uniquewordrdd to K-V pair list with zeros to append to every tweet to equalize tweetsize later on
uniquewordvector_withzeros = wordrdd_unique.map(lambda word: (word, 0)).collect()

#6 Remove duplicate words per tweet and count total number of tweets each word occurs in
idf  = countedwordpairrdd_tweet.map(lambda tweet: [(word, 1) for word,n in tweet])\
                               .flatMap(lambda tweet: tweet)\
                               .reduceByKey(lambda value_so_far, next_value : value_so_far + next_value)\
                               .map(lambda wordpair: (wordpair[0], log10(n_oftweets/wordpair[1])))

#7 Collect the IDF of all unique words as a dictionary
idfdict = idf.collectAsMap()

#8 Create TFIDF by multiplying TF of every word per tweet with the lookedup IDF value from the IDF dictionary above
tfidf_word_tweet = tf_tweet.map(lambda wordpair: [ (word, tf*idfdict[word]) for word,tf in wordpair ] )


def _pad_with_missing_words(tweetwordpairs):
    """Append a (word, 0) pair for every vocabulary word the tweet does not contain.

    The words of the tweet are put in a set once, so each vocabulary entry is
    checked in constant time instead of rebuilding a word list per entry.
    """
    present_words = {word for word, weight in tweetwordpairs}
    return tweetwordpairs + [pair for pair in uniquewordvector_withzeros if pair[0] not in present_words]


def _tweet_cosine_dissimilarity(vector_a, vector_b):
    """Return 1 - cosine similarity of two vectors, or 1.0 when that is undefined (NaN)."""
    dissimilarity = 1-(vector_a.dot(vector_b)/(vector_a.norm(2)*vector_b.norm(2)))
    return 1.0 if isnan(dissimilarity) else dissimilarity


#9 Equalize every tweet's length by inserting the missing unique wordpairs created in step #5, zip result with index
total_indexedtfidfmatrix = tfidf_word_tweet.map(_pad_with_missing_words)\
                                           .map(lambda tweet: sorted(tweet, key=lambda x: x[0]))\
                                           .map(lambda wordvector: Vectors.dense([tfidf for word, tfidf in wordvector]))\
                                           .zipWithIndex()

# 10 Create a matrix that uses the cartesian product between every tweet to construct tweet comparison containers.
# Then compute the cosine dissimilarity between every such container to get a measure of distance between each tweet
# Every entry has the form (dissimilarity, (index of first tweet, index of second tweet))
dissimilarity_rdd = total_indexedtfidfmatrix.cartesian(total_indexedtfidfmatrix)\
                                            .sortBy(lambda pairof_vectorindexpairs: (pairof_vectorindexpairs[0][1],  pairof_vectorindexpairs[1][1]) )\
                                            .map(lambda pairof_vectorindexpairs: (_tweet_cosine_dissimilarity(pairof_vectorindexpairs[0][0], pairof_vectorindexpairs[1][0]),  (pairof_vectorindexpairs[0][1], pairof_vectorindexpairs[1][1]) ))

# 11 Compartmentalize every entry of the dissimilarityrdd per tweet to construct an indexed matrix that compares each
# tweet with every other tweet (watch out this results in a very LARGE MATRIX!)
dissimilarity_matrix = dissimilarity_rdd.groupBy(lambda x: x[1][0])\
                                        .map(lambda tweetvsrest: list(tweetvsrest[1]))
    
    


### 3.3  Compute the dissimilarity vectors of  [*co-occurring words*]

In [None]:
#1 Construct a term frequency matrix with one row per word (instead of one row per tweet)
def _tf_pairs_for_all_words(tweetwordpairs):
    """Return the tweet's (word, tf) pairs plus the vocabulary entry of every word it lacks.

    `uniquewordvector_withzeros` is defined in an earlier step; presumably it holds one
    (word, 0) pair per unique word -- TODO confirm.
    """
    # Build the membership lookup once per tweet instead of once per vocabulary entry
    words_in_tweet = {word for word, tf in tweetwordpairs}
    return tweetwordpairs + [pair for pair in uniquewordvector_withzeros if pair[0] not in words_in_tweet]

# reduceByKey guarantees neither the order of the words nor the order in which the per-tweet
# values are concatenated. Every tf is therefore tagged with its tweet index so each word vector
# can be put back in tweet order, and the words are sorted so that row i is the i-th word in
# sorted order.
total_indexedtfmatrix = (
    tf_tweet
    .map(_tf_pairs_for_all_words)
    .zipWithIndex()
    .flatMap(lambda indexedtweet: [(word, [(indexedtweet[1], tf)]) for word, tf in indexedtweet[0]])
    .reduceByKey(lambda x, y: x + y)
    .sortByKey()
    .map(lambda wordvector: Vectors.dense([tf for tweetindex, tf in sorted(wordvector[1])]))
    .zipWithIndex()
)

#2 Calculate the dissimilarity between every pair of words
def _word_pair_dissimilarity(pairof_vectorindexpairs):
    """Score one ((vector, index), (vector, index)) pair as (cosine dissimilarity, (index, index)).

    The dissimilarity is 1 - (a . b) / (|a| * |b|); a NaN result is replaced by 1.0.
    """
    (left_vector, left_index), (right_vector, right_index) = pairof_vectorindexpairs
    dissimilarity = 1 - left_vector.dot(right_vector) / (left_vector.norm(2) * right_vector.norm(2))
    if isnan(dissimilarity):
        dissimilarity = 1.0
    return (dissimilarity, (left_index, right_index))

dissimilarity_rdd_word = (
    total_indexedtfmatrix
    .cartesian(total_indexedtfmatrix)
    .sortBy(lambda pair: (pair[0][1], pair[1][1]))
    .map(_word_pair_dissimilarity)
)

# Make a matrix out of every word vs every other word.
# groupBy guarantees neither the order of the groups nor the order of the values inside a group,
# so rows are explicitly sorted by word index and every row by the index of the compared word.
dissimilarity_wordmatrix = (
    dissimilarity_rdd_word
    .groupBy(lambda entry: entry[1][0])
    .sortByKey()
    .map(lambda wordvsrest: sorted(wordvsrest[1], key=lambda entry: entry[1][1]))
)

### (Optional) Print summary statistics of the data

In [None]:
# Report the corpus totals (word count, vocabulary size, tweet count) on a single line
print(
    'total words:', total_wordcount,
    ', unique words:', n_uniquewords,
    ', number of tweets:', n_oftweets,
)

### 3.4 Cluster the [*co-occurring words*] dissimilarity vectors with bisecting K-Means and plot the cost for different values of k

In [None]:
# 1. Turn every row of the word dissimilarity matrix into a one-column ("features") dataframe row
Data = dissimilarity_wordmatrix.map(
    lambda similarityvector: (Vectors.dense(array([entry[0] for entry in similarityvector])),)
)
df = spark.createDataFrame(Data, ["features"])

# 2. Fit one bisecting k-means model per candidate k and record its cost
# NOTE(review): BisectingKMeans(k=...).fit(df) is the DataFrame-based pyspark.ml estimator API,
# while the import cell at the top of the notebook imports BisectingKMeans from pyspark.mllib --
# confirm that the pyspark.ml class is the one in scope here.
k = range(2, 12, 2)
cost_all = []

for i in k:
    bkm = BisectingKMeans(k=i, seed=1)
    model = bkm.fit(df)
    cost = model.computeCost(df)  # type(cost) is float
    cost_all.append(cost)

# 3. Plot the cost against the number of clusters
plt.plot(k, cost_all)
plt.xlabel('K-cluster')
plt.ylabel('cost')
plt.show()

### 3.5 Choose the optimal model for the [*co-occurring words*] clusters and compute the cluster label of every word

In [None]:
# Number of word clusters used for the final model
k_final = 8

# Train the bisecting k-means model with the chosen k and a fixed seed
bkm = BisectingKMeans(k=k_final, seed=1)
model = bkm.fit(df)

# Collect the predicted cluster of every dataframe row into a plain Python list
prediction = model.transform(df).select('prediction').collect()
labels = [row.prediction for row in prediction]

# Evaluate the clustering
cost = model.computeCost(df)
print("Within Set Sum of Squared Errors = " + str(cost))

### 3.6 Print the top results of the [*co-occurring words*] per cluster

In [None]:
# All unique words in sorted order.
# NOTE(review): this cell assumes labels[i] is the cluster of the i-th word in this sorted
# order -- confirm that the clustered word matrix uses the same row order.
orderedwordrdd_unique = sorted(wordrdd_unique.collect())

#0. Set a threshold for the minimum number of occurrences
threshold = 4

#1. Calculate the number of times each word occurs per tweet
wordcounts_tweet = countedwordpairrdd_tweet.map(lambda wordpair: [(word, n) for word, n in wordpair])

#2. Create a dictionary to look up the total count of every word, i.e. its per-tweet counts
#   summed over all tweets. The dictionary starts from the vocabulary entries so that every
#   vocabulary word has a value, then the summed counts overwrite them. This replaces padding
#   every tweet with the whole vocabulary just to add up zeros.
noftweet_worddict = {pair[0]: pair[1] for pair in uniquewordvector_withzeros}
noftweet_worddict.update(
    wordcounts_tweet.flatMap(lambda tweetwordpairs: tweetwordpairs)
                    .reduceByKey(lambda x, y: x + y)
                    .collectAsMap()
)

#3. Group the (word, total count) pairs per cluster label, most frequent word first
wordclusters = (
    sc.parallelize(labels)
      .zipWithIndex()
      .map(lambda pair: (pair[0], [(orderedwordrdd_unique[pair[1]], noftweet_worddict[orderedwordrdd_unique[pair[1]]])]))
      .reduceByKey(lambda x, y: x + y)
      .map(lambda cluster: (cluster[0], sorted(cluster[1], key=lambda wordcount: -wordcount[1])))
)

#4. Keep the words that reach the threshold set in step 0; sort by cluster label because
#   reduceByKey returns the clusters in no particular order
topwordclusters = (
    wordclusters.map(lambda cluster: (cluster[0], [word for word, count in cluster[1] if count >= threshold]))
                .sortByKey()
                .collect()
)

#5. Print out the results under the real cluster label (not a positional counter)
print('Most representative words for each cluster:')

for clusterlabel, clusterwords in topwordclusters:
    print (clusterlabel, '\n', ', '.join(clusterwords))

### 3.7 Plot a histogram of the clusters of [*co-occurring words*]

In [None]:
#1. Histogram of the word cluster labels: how many words fall into each cluster
fig, ax = plt.subplots()
ax.hist(labels, facecolor='purple')
ax.set_xlabel('Clusters')
ax.set_ylabel('Number of word')
ax.grid(True)

plt.show()

### 3.8 Cluster the [*tweet dissimilarity*] vector matrix with bisecting K-Means and plot the cost for different values of k

In [None]:
# 1. Turn every row of the tweet dissimilarity matrix into a one-column ("features") dataframe row
Data = dissimilarity_matrix.map(
    lambda similarityvector: (Vectors.dense(array([entry[0] for entry in similarityvector])),)
)
df = spark.createDataFrame(Data, ["features"])

# 2. Fit one bisecting k-means model per candidate k and record its cost
k = range(2, 10, 2)
cost_all = []

for i in k:
    bkm = BisectingKMeans(k=i, seed=1)
    model = bkm.fit(df)
    cost = model.computeCost(df)
    cost_all.append(cost)

# 3. Plot the cost against the number of clusters
plt.plot(k, cost_all)
plt.xlabel('K-cluster')
plt.ylabel('cost')
plt.show()

### 3.9 Choose the optimal model for the [*tweet*] clustering

In [None]:
# Number of tweet clusters used for the final model
k_final = 6

# 1. Train the bisecting k-means model with the chosen k and a fixed seed
bkm = BisectingKMeans(k=k_final, seed=1)
final_model = bkm.fit(df)

# Collect the predicted cluster of every dataframe row into a plain Python list
prediction = final_model.transform(df).select('prediction').collect()
labels = [row.prediction for row in prediction]

# 2. Evaluate the clustering
cost = final_model.computeCost(df)
print("Within Set Sum of Squared Errors = " + str(cost))

### 3.10 Couple every [*tweet*] with its cluster and analyze the TF-IDF of every word that falls in a tweet cluster. Then retrieve the top 10 distinctive words for each cluster.

In [None]:
# 0 Create tweetclusterindices: a list of (tweet index, cluster label) pairs.
#   labels is already a local list, so no Spark job is needed for this.
tweetclusterindices = list(enumerate(labels))

# Cluster labels that actually occur, in ascending order. The per-cluster RDDs below are sorted
# by label, so position i in every collected result belongs to clusterlabels[i]. Fewer than
# k_final labels may occur.
clusterlabels = sorted(set(labels))
n_clusters = len(clusterlabels)

# Number of representative words to keep per cluster
top_n = 10

# 1 Merge all tweets of a cluster into one list of words, clusters ordered by label
# NOTE(review): assumes labels[i] is the cluster of the i-th tweet of wordrdd_tweet -- confirm
# that the clustered tweet matrix uses the same row order.
words_clusterrdd = (
    wordrdd_tweet
    .map(lambda tweetwords: ' '.join(tweetwords))
    .zipWithIndex()
    .map(lambda pair: (labels[pair[1]], pair[0]))
    .groupByKey()
    .sortByKey()
    .map(lambda cluster: ' '.join(cluster[1]).split())
)

# 2 Count how often every word occurs in each cluster; cached because the result feeds three
#   separate Spark actions below
countedwordpairrdd_cluster = (
    words_clusterrdd
    .map(lambda clusterwords: [(word, len(list(group))) for word, group in groupby(sorted(clusterwords))])
    .cache()
)

#3 Calculate the term frequency (TF) per cluster: the count of a word divided by the number of
#  distinct words in the cluster
tf_cluster = countedwordpairrdd_cluster.map(lambda wordpair: [(word, n/len(wordpair)) for word, n in wordpair])

#4 Count the number of clusters each word occurs in and turn it into the inverse document
#  frequency (IDF), using the number of clusters that actually contain tweets
idf_cluster = (
    countedwordpairrdd_cluster
    .flatMap(lambda cluster: [(word, 1) for word, n in cluster])
    .reduceByKey(lambda value_so_far, next_value: value_so_far + next_value)
    .map(lambda wordpair: (wordpair[0], log10(n_clusters/wordpair[1])))
)

#5 Collect the per-cluster IDF of all unique words as a dictionary
#  (this previously read `idf`, a different variable than the one computed in step 4)
idfdictcluster = idf_cluster.collectAsMap()

#6 Create TFIDF by multiplying TF of every word per cluster with the IDF value from the IDF dictionary above
tfidf_word_cluster = tf_cluster.map(lambda wordpair: [(word, tf*idfdictcluster[word]) for word, tf in wordpair])

#7 Sort the words of every cluster by descending score (tfidf and tf); ties are broken by the
#  (word, score) pair itself
sortedtfidf_word_cluster = tfidf_word_cluster.map(lambda cluster: sorted(cluster, key=lambda tup: (-tup[1], tup)))
sortedtf_word_cluster = tf_cluster.map(lambda cluster: sorted(cluster, key=lambda tup: (-tup[1], tup)))

#8 Get the representative words per cluster; every RDD is collected exactly once
tfidf_representative_words_cluster = sortedtfidf_word_cluster.map(lambda cluster: cluster[:top_n]).collect()
toptfidf_representative_words_cluster = [[word for word, score in cluster] for cluster in tfidf_representative_words_cluster]

tf_representative_words_cluster = sortedtf_word_cluster.map(lambda cluster: cluster[:top_n]).collect()
toptf_representative_words_cluster = [[word for word, score in cluster] for cluster in tf_representative_words_cluster]

print('Most representative words for each cluster with TFIDF:')
for clusterlabel, cluster in zip(clusterlabels, toptfidf_representative_words_cluster):
    print (clusterlabel, cluster)

print('\n')

print('Most representative words for each cluster with TF:')
for clusterlabel, cluster in zip(clusterlabels, toptf_representative_words_cluster):
    print (clusterlabel, cluster)

### 3.11 Create a histogram of all the tweet clusters

In [None]:
#1. Extract the cluster label from every (tweet index, cluster label) pair
tweets_cluster_plot = [pair[1] for pair in tweetclusterindices]

#2. Histogram of the tweet cluster labels: how many tweets fall into each cluster
fig, ax = plt.subplots()
ax.hist(tweets_cluster_plot, facecolor='purple')
ax.set_xlabel('Clusters')
ax.set_ylabel('Number tweets')
ax.grid(True)

plt.show()