## CSCE 676 :: Data Mining and Analysis :: Texas A&M University :: Fall 2021

***Apache PySpark***

**There are three objectives:** 

* Become familiar with Apache Spark
* Get hands-on experience with Spark's built-in algorithms, namely LDA and PageRank
* Understand and implement the Trawling algorithm to find user communities

Write your collaboration/references here
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html
- https://stackoverflow.com/questions/59129462/pyspark-getting-the-most-common-value-of-a-column
- https://github.com/alejandronotario/LDA-Topic-Modeling/blob/master/pySpark/LDA_pySpark.ipynb
- https://stackoverflow.com/questions/56115833/how-to-get-the-topic-using-pyspark-lda
- Dr. Cav's starter code for LDA on Campuswire
- https://spark.apache.org/docs/2.3.0/ml-frequent-pattern-mining.html
- https://stackoverflow.com/questions/46956026/how-to-convert-column-with-string-type-to-int-form-in-pyspark-data-frame
- https://stackoverflow.com/questions/31058504/spark-1-4-increase-maxresultsize-memory
- https://stackoverflow.com/questions/55234587/merge-two-column-in-spark-dataframe-to-form-single-column
- https://graphframes.github.io/graphframes/docs/_site/quick-start.html

## Introduction to the Dataset
We will use a dataset of tweets concerning members of the US Congress. The data span almost a year (from October 3rd, 2018 to September 25th, 2019) and cover 576 of the members. Every tweet or retweet posted by these 576 members, or directed at them by other Twitter users, was collected.

Originally, more than 200 million tweets were collected, but we have sampled 400,000 tweets for this homework.

Below is a summary of all datasets used for this homework:

| Dataset          | Description |
| :---             | :---        |
| Congress members | 576 Twitter IDs and screen names |
| Sample tweets    | 400k sample tweets |
| User hashtags    | All pairs of <user, hashtag> |
| User mentions    | All triples of <src_user_id, dst_user_id, frequency> |



Follow the steps below to install Spark. Remember that you need to execute them every time your runtime is disconnected.

In [None]:
!pip install pyspark
!pip install graphframes
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
!curl -L -o "/usr/local/lib/python3.7/dist-packages/pyspark/jars/graphframes-0.8.1-spark3.0-s_2.12.jar" https://repos.spark-packages.org/graphframes/graphframes/0.8.1-spark3.0-s_2.12/graphframes-0.8.1-spark3.0-s_2.12.jar
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=44a67a66e957fa09f715f28c73c5cfe12c5774bb9a9d522a99733a7225849307
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2
Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Collecting nose
  Downloading nose-1.3.7-py3-none-any.whl (154 kB)
Installin

The cell below will download the dataset in us-congress-tweets directory

In [None]:
!mkdir us-congress-tweets
!wget https://us-congress-tweets.s3.amazonaws.com/congress_members.csv -O us-congress-tweets/congress_members.csv
!wget https://us-congress-tweets.s3.amazonaws.com/congress-sample-400k.json -O us-congress-tweets/congress-sample-400k.json
!wget https://us-congress-tweets.s3.amazonaws.com/user_hashtags-1m.csv -O us-congress-tweets/user_hashtags-1m.csv
!wget https://us-congress-tweets.s3.amazonaws.com/user_mentions.csv -O us-congress-tweets/user_mentions.csv

--2021-10-16 17:25:09--  https://us-congress-tweets.s3.amazonaws.com/congress_members.csv
Resolving us-congress-tweets.s3.amazonaws.com (us-congress-tweets.s3.amazonaws.com)... 52.217.41.172
Connecting to us-congress-tweets.s3.amazonaws.com (us-congress-tweets.s3.amazonaws.com)|52.217.41.172|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13835 (14K) [text/csv]
Saving to: ‘us-congress-tweets/congress_members.csv’


2021-10-16 17:25:10 (475 KB/s) - ‘us-congress-tweets/congress_members.csv’ saved [13835/13835]

--2021-10-16 17:25:10--  https://us-congress-tweets.s3.amazonaws.com/congress-sample-400k.json
Resolving us-congress-tweets.s3.amazonaws.com (us-congress-tweets.s3.amazonaws.com)... 52.217.41.172
Connecting to us-congress-tweets.s3.amazonaws.com (us-congress-tweets.s3.amazonaws.com)|52.217.41.172|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 202495914 (193M) [application/json]
Saving to: ‘us-congress-tweets/congress-sample-400k

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf

In [None]:
# configure Spark: UI port (tunneled through ngrok below) and driver/executor memory
conf = SparkConf().set("spark.ui.port", "4050").set("spark.driver.memory", "3g").set("spark.executor.memory", "3g")

# create the context from this configuration, then a session on top of it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()


Now we need to set up ngrok so that we can view the Spark UI even though it is hosted on Google Colab's runtime. Go to https://dashboard.ngrok.com/login, create an account, get your authtoken, and replace 'XXXXX' with the authtoken for your user. You can also comment that line out to create a temporary tunnel, but it expires soon and won't let you use the Spark monitor further.

In [None]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

--2021-10-16 17:25:20--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.161.241.46, 54.237.133.81, 52.202.168.65, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.161.241.46|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2021-10-16 17:25:20 (33.4 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [None]:
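# save your authtoken first (replace XXXXX) so the tunnel is tied to your account
!./ngrok authtoken XXXXX
# start a tunnel to the Spark UI port in the background
get_ipython().system_raw('./ngrok http 4050 &')
# give ngrok a moment to start, then print the tunnel's public URL from ngrok's local API
!sleep 3
!curl -s http://localhost:4040/api/tunnels | python3 -c "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"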

Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml
https://fae9-35-222-84-146.ngrok.io


In [None]:
# First let's read Twitter ids and screen names of the 576 US congress members

congress_members = spark.read.csv("us-congress-tweets/congress_members.csv", header=True)
congress_members.show()
print("Number of congress members tracked:", congress_members.count())

+------------------+---------------+
|            userid|    screen_name|
+------------------+---------------+
|         776664410|  RepCartwright|
|         240363117|   RepTomMarino|
|837722935095930883| RepScottTaylor|
|        1069124515|     RepLaMalfa|
|818460870573441028|  RepTomGarrett|
|         163570705|     repcleaver|
|          19739126|      GOPLeader|
|          33563161| RepJoseSerrano|
|        2861616083|USRepGaryPalmer|
|        1074518754| SenatorBaldwin|
|         305620929|  Call_Me_Dutch|
|         381152398| RepTerriSewell|
|         834069080| RepDavidRouzer|
|         249787913|  SenatorCarper|
|         188019606|        Clyburn|
|         217543151|SenatorTimScott|
|          39249305| USRepMikeDoyle|
|          33537967|   amyklobuchar|
|         249410485|  SanfordBishop|
|          23124635|    TomColeOK04|
+------------------+---------------+
only showing top 20 rows

Number of congress members tracked: 576


We could use `spark.read.json(...)` without a schema to load the tweets into a DataFrame, but this would be slow for two reasons:
* First, Spark makes one pass over the data to infer a schema of the content, then a second pass to read the content and parse it into the DataFrame.
* Second, it reads all the content of the Tweet JSON objects, but we only need a few fields for a given task.

Thus, we define our own schema, as follows:

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

user_schema = StructType([
    StructField('created_at',StringType(),True),
    StructField('followers_count',LongType(),True),
    StructField('id',LongType(),True),
    StructField('name',StringType(),True),
    StructField('screen_name',StringType(),True)
])

hashtag_schema = ArrayType(StructType([StructField('text',StringType(),True)]))
user_mentions_schema = ArrayType(StructType([StructField('id',LongType(),True),
                                             StructField('screen_name',StringType(),True)]))
entities_schema = StructType([
    StructField('hashtags',hashtag_schema,True),
    StructField('user_mentions',user_mentions_schema,True)
    ])

retweeted_status_schema =StructType([        
        StructField("id", LongType(), True),
        StructField("in_reply_to_user_id", LongType(), True),
        StructField("in_reply_to_status_id", LongType(), True),
        StructField("created_at", TimestampType(), True),
        StructField("user", user_schema)
    ])

tweet_schema =StructType([
        StructField("text", StringType(), True),
        StructField("id", LongType(), True),
        StructField("in_reply_to_user_id", LongType(), True),
        StructField("in_reply_to_status_id", LongType(), True),
        StructField("created_at", StringType(), True),
        StructField("user", user_schema),
        StructField("entities", entities_schema),
        StructField("retweeted_status", retweeted_status_schema)
    ])

Now we are ready to read the tweets with `spark.read.json` passing our own schema as follows:

In [None]:
tweets = spark.read\
                   .json('us-congress-tweets/congress-sample-400k.json', tweet_schema)\
                   .withColumn('user_id',F.col('user.id'))
tweets.printSchema()

root
 |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- in_reply_to_status_id: long (nullable = true)
 |-- created_at: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- screen_name: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- user_mentions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- screen_name: string (nullable = true)
 |-- retweeted_status: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- in_reply_to_user_id: long (nullable 

## (10 points) Part 1: Exploratory Data Analysis (Small Scale)

How many unique users and original tweets (i.e. not retweets) are there? 

In [None]:
tweets.head(10)

[Row(text='RT @maddow: "I hear a lot about about lack of corroboration. You know, you don\'t get corroboration if you don\'t talk to corroborating witne…', id=1047999818038050816, in_reply_to_user_id=None, in_reply_to_status_id=None, created_at='2018-10-05T00:00:02.000Z', user=Row(created_at='2017-03-22T03:11:54.000Z', followers_count=527, id=844386122822119424, name='Quazy', screen_name='quazy101'), entities=Row(hashtags=[], user_mentions=[Row(id=16129920, screen_name='maddow')]), retweeted_status=Row(id=1047941326975766530, in_reply_to_user_id=None, in_reply_to_status_id=None, created_at=datetime.datetime(2018, 10, 4, 20, 7, 36), user=Row(created_at='2008-09-04T15:02:12.000Z', followers_count=9503143, id=16129920, name='Rachel Maddow MSNBC', screen_name='maddow')), user_id=844386122822119424),
 Row(text="@ChuckGrassley @SenateMajLdr It's time for you to leave Congress &amp; let the country heal from all of the damage you'… https://t.co/ncaQg3nisb", id=1047999818579292160, in_reply_to

In [None]:
#ANSWER
# your code here for unique users
"""
Steps: I select the user id column, drop all duplicates, and then get a total count
"""
unique_users = tweets.select('user_id').distinct().count()
print("Unique Users Count = ", unique_users)

Unique Users Count =  198098


In [None]:
#ANSWER
# your code here for original tweets
"""
Steps: keep only the tweets whose retweeted_status is null (i.e., drop all retweets), then count their tweet IDs
"""
unique_tweets = tweets.where(F.col('retweeted_status').isNull()).select('id').count()
print("Original Tweets Count = ", unique_tweets)

Original Tweets Count =  210753


Who are the ten most mentioned users in the sample?

In [None]:
#ANSWER
# code here
"""
Steps:
1) use explode to flatten each tweet's array of mentioned screen names into one row per mention
2) group by screen name and count the rows of each group
3) order the result by count, descending, to view the top 10
"""
from pyspark.sql.functions import desc, explode

exploded_mentions = tweets.select(explode(tweets.entities.user_mentions.screen_name).alias('screen_names'))
most_mentioned_users = exploded_mentions.groupBy('screen_names').count()
most_mentioned_users.orderBy(desc('count')).show(10)


+---------------+-----+
|   screen_names|count|
+---------------+-----+
|realDonaldTrump|24522|
|   senatemajldr|15021|
|LindseyGrahamSC|14412|
|     SenSchumer|13738|
|  RepAdamSchiff|13288|
|  SpeakerPelosi|12478|
|     marcorubio| 9957|
|     Jim_Jordan| 9453|
|     SenSanders| 6861|
|    RepSwalwell| 6224|
+---------------+-----+
only showing top 10 rows
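A minimal pure-Python sketch of what the explode, groupBy and count steps compute, using hypothetical toy tweets shaped like the schema's `entities.user_mentions` field. As with Spark's `explode`, a tweet with an empty mention list contributes no rows. The names `toy_tweets` and `top_mentioned` are illustrative only.

```python
from collections import Counter

# Hypothetical toy tweets shaped like the schema's entities.user_mentions field
toy_tweets = [
    {"entities": {"user_mentions": [{"screen_name": "SpeakerPelosi"}, {"screen_name": "SenSchumer"}]}},
    {"entities": {"user_mentions": [{"screen_name": "SpeakerPelosi"}]}},
    {"entities": {"user_mentions": []}},  # no mentions -> contributes no rows, like explode
]

def top_mentioned(tweets, n=10):
    """Flatten ('explode') every mention list into single mentions, then count screen names."""
    counts = Counter(
        mention["screen_name"]
        for tweet in tweets
        for mention in tweet["entities"]["user_mentions"]
    )
    return counts.most_common(n)

print(top_mentioned(toy_tweets))  # [('SpeakerPelosi', 2), ('SenSchumer', 1)]
```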



What are the top hashtags used?

In [None]:
#ANSWER
# code here
"""
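Steps: same process as above, but exploding the hashtag array (each element is a struct with a 'text' field, which is why the hashtags appear in braces in the output)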
"""
exploded_hashtags = tweets.select(explode(tweets.entities.hashtags).alias('hashtags'))
most_mentioned_hashtags = exploded_hashtags.groupBy('hashtags').count()
most_mentioned_hashtags.orderBy(desc('count')).show(10)

+---------------+-----+
|       hashtags|count|
+---------------+-----+
|    {Venezuela}| 1224|
|  {MoscowMitch}| 1123|
|{TrumpShutdown}|  705|
|         {MAGA}|  535|
|{MuellerReport}|  396|
|  {NancyPelosi}|  349|
| {MaduroRegime}|  347|
|        {Trump}|  338|
| {BuildTheWall}|  333|
| {GreenNewDeal}|  297|
+---------------+-----+
only showing top 10 rows



## (25 points) Part 2: Textual Analysis (LDA)
Using the LDA algorithm provided by the Spark Machine Learning (ML) library, find out the ten most important topics.

You may want to work on a small sample first but report your results on the whole dataset.

Hints:

1) For better results, aggregate all tweets of a user into a single document.
2) For debugging, sample 10% of the tweets, but report your final results on the entire dataset.
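Hint 1 plus the cleaning applied in the next cells amounts to: group tweets by user, concatenate them, strip every character other than letters, digits, whitespace and `@`, lowercase, split into words, and drop stop words. A pure-Python sketch of that pipeline on hypothetical data; the small `STOP_WORDS` set is an illustrative stand-in for the much longer default English list used by Spark's `StopWordsRemover`. Note that the retweet marker `rt` survives this cleaning, just as it does in the notebook's own output.

```python
import re
from collections import defaultdict

# Hypothetical toy data: (user_id, tweet_text) pairs
toy_rows = [
    (1, "RT @SpeakerPelosi: The #MuellerReport is out!"),
    (1, "We need answers, now."),
    (2, "Build the wall."),
]

# Illustrative stop-word subset; Spark's StopWordsRemover uses a much longer default English list
STOP_WORDS = {"the", "is", "we", "out", "now"}

def user_documents(rows):
    """Concatenate all tweets of each user into one document (like groupBy + concat_ws)."""
    texts = defaultdict(list)
    for user_id, text in rows:
        texts[user_id].append(text)
    return {uid: " ".join(parts) for uid, parts in texts.items()}

def clean_tokens(document):
    """Strip all but letters, digits, whitespace and '@', lowercase, split on whitespace, drop stop words."""
    cleaned = re.sub(r"[^\s@a-zA-Z0-9]", "", document).lower().strip()
    return [word for word in cleaned.split() if word not in STOP_WORDS]

toy_docs = user_documents(toy_rows)
for uid, doc in toy_docs.items():
    print(uid, clean_tokens(doc))
```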

In [None]:
from pyspark.sql.functions import *
from pyspark.ml.feature import * 
from pyspark.ml.clustering import LDA, LDAModel
from pyspark.sql import SQLContext

In [None]:
# your code here
# HINT - Aggregate tweets together based on user IDs and create a new column. On that new column, use preprocessing/cleaning techniques (stop-word removal, lowercasing, TF-IDF, etc.), and use LDA.

"""
1) For debugging, only 10% of the tweets were sampled at first; the final run uses 100% of them.
2) A new dataframe 'user_document' is created with each user id and the concatenation of that user's tweets
3) removePunctuation strips every character except letters, digits, whitespace and '@', then lowercases and trims the text
4) the Tokenizer splits each document into an array of words
5) StopWordsRemover drops common English stop words from that array
"""

# Remove every character that is not a letter, digit, whitespace or '@', then lowercase and trim
def removePunctuation(column):
    return trim(lower(regexp_replace(column, r'[^\s@a-zA-Z0-9]', ''))).alias("document")

# Sample 100% of the tweets
sampled_tweets = tweets   # .sample(False, 0.1, 42)

# Concat all tweets from a user and create tweet documents
user_document = sampled_tweets.groupby("user.id").agg(F.concat_ws(" ", F.collect_list("text")).alias("document"))

# Remove punctuation marks from the tweets
user_document = user_document.select("id", removePunctuation(F.col("document")))

# Tokenize to obtain words from tweets
tokenizer = Tokenizer(inputCol="document", outputCol="words")
wordsDataFrame = tokenizer.transform(user_document)

# Remove stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
wordsDataFrame = remover.transform(wordsDataFrame)

In [None]:
# Inspect the tokenized and stop-word-filtered documents before vectorizing them
wordsDataFrame.show()
user_document.take(5)

+--------+--------------------+--------------------+--------------------+
|      id|            document|               words|            filtered|
+--------+--------------------+--------------------+--------------------+
|  820694|@jennycohn1 @ronw...|[@jennycohn1, @ro...|[@jennycohn1, @ro...|
| 5385802|rt @chrismurphyct...|[rt, @chrismurphy...|[rt, @chrismurphy...|
|12953952|rt @magagwen @rep...|[rt, @magagwen, @...|[rt, @magagwen, @...|
|13290282|you folks have pr...|[you, folks, have...|[folks, proven, l...|
|13492362|rt @comradezaco r...|[rt, @comradezaco...|[rt, @comradezaco...|
|14060717|next week 2020 de...|[next, week, 2020...|[next, week, 2020...|
|14555541|rt @kim @repadams...|[rt, @kim, @repad...|[rt, @kim, @repad...|
|14883391|rt @4evernevertru...|[rt, @4evernevert...|[rt, @4evernevert...|
|14884712|@reptedlieu keeps...|[@reptedlieu, kee...|[@reptedlieu, kee...|
|15197290|rt @kylegriffin1 ...|[rt, @kylegriffin...|[rt, @kylegriffin...|
|15261298|rt @billclarkphot...|[rt, @b

[Row(id=820694, document='@jennycohn1 @ronwyden thiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiis rt @blogdiva @speakerpelosi democrats against\nimpeachmenthearingsnow \n\n1 @repcummings \n2 @adamschiff \n3 @reprichardneal\n4 @speakerpel rt @hansilowang just in chair of house appropriations subcommittee that funds 2020census @repjoseserrano dny says hes been diagnos'),
 Row(id=5385802, document='rt @chrismurphyct november is coming httpstcoeroueiu67j rt @epluribusunumep @greenjeanasset @firni @senatemajldr @gop @potus definitely not an anaconda hes a   shitviper rt @smunk8munk @bettynjackson @firni @chrismurphyct @amandablount2 installed by putin to cause helter skelter  working out perfectly lo'),
 Row(id=12953952, document='rt @magagwen @repmarkmeadows there is information coming that will curl your hair  adam schiff has seen documents that he knows wil'),
 Row(id=13290282, document='you folks have proven yourselves to be liars cheaters amp thieves if youd learn how to use your brains you could 

In [None]:
"""
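Steps: use CountVectorizer to turn each filtered word list into a token-count vector 'raw_features', keeping at most the 20,000 most frequent terms that appear in at least 10 documents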
"""
cv = CountVectorizer(inputCol='filtered', outputCol='raw_features', vocabSize=20000, minDF=10.0)
cvmodel = cv.fit(wordsDataFrame)
result_cv = cvmodel.transform(wordsDataFrame)
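To make `vocabSize` and `minDF` concrete: Spark reads a `minDF` of 1 or more as an absolute number of documents (values below 1 are fractions of documents), and `vocabSize` keeps the terms with the highest total count across the corpus. A pure-Python sketch of that vocabulary construction and of the resulting sparse count vectors, on a hypothetical toy corpus; the alphabetical tie-breaking is an assumption made here for determinism, not documented Spark behavior.

```python
from collections import Counter

def build_vocabulary(token_lists, vocab_size, min_df):
    """Keep terms found in >= min_df documents, then the vocab_size terms with the highest corpus counts."""
    term_freq = Counter()  # total occurrences across the corpus
    doc_freq = Counter()   # number of documents containing the term
    for tokens in token_lists:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))
    eligible = [term for term in term_freq if doc_freq[term] >= min_df]
    # Highest corpus count first; ties broken alphabetically (assumption, for determinism)
    eligible.sort(key=lambda term: (-term_freq[term], term))
    return eligible[:vocab_size]

def count_vector(tokens, vocabulary):
    """Sparse {index: count} vector over the vocabulary; out-of-vocabulary tokens are dropped."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in tokens if t in index)
    return dict(sorted(counts.items()))

toy_corpus = [["trump", "wall", "trump"], ["trump", "shutdown"], ["wall", "mueller"]]
toy_vocabulary = build_vocabulary(toy_corpus, vocab_size=20000, min_df=2)
print(toy_vocabulary)                                                       # ['trump', 'wall']
print(count_vector(["trump", "wall", "trump", "mueller"], toy_vocabulary))  # {0: 2, 1: 1}
```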

In [None]:
"""
Steps: use IDF to reweight the raw counts by how rare each term is across documents, storing the result in a new 'features' column
"""
idf = IDF(inputCol='raw_features', outputCol='features')
idfmodel = idf.fit(result_cv)
result_tfidf = idfmodel.transform(result_cv)
result_tfidf.show()

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      id|            document|               words|            filtered|        raw_features|            features|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  820694|@jennycohn1 @ronw...|[@jennycohn1, @ro...|[@jennycohn1, @ro...|(20000,[0,1,10,18...|(20000,[0,1,10,18...|
| 5385802|rt @chrismurphyct...|[rt, @chrismurphy...|[rt, @chrismurphy...|(20000,[0,1,5,40,...|(20000,[0,1,5,40,...|
|12953952|rt @magagwen @rep...|[rt, @magagwen, @...|[rt, @magagwen, @...|(20000,[0,1,43,37...|(20000,[0,1,43,37...|
|13290282|you folks have pr...|[you, folks, have...|[folks, proven, l...|(20000,[4,205,913...|(20000,[4,205,913...|
|13492362|rt @comradezaco r...|[rt, @comradezaco...|[rt, @comradezaco...|(20000,[0,32,568,...|(20000,[0,32,568,...|
|14060717|next week 2020 de...|[next, week, 2020...|[next, week, 2020...
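The Spark ML documentation defines the IDF weight of a term as log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term; the TF-IDF feature is the raw count times that weight. A small pure-Python sketch of the formula on hypothetical numbers shows that a term present in every document is weighted down to exactly 0, so near-ubiquitous tokens such as `rt` carry little weight in the `features` column.

```python
import math

def spark_style_idf(doc_freqs, num_docs):
    """IDF per term as given in the Spark ML docs: log((m + 1) / (df + 1)), natural log."""
    return [math.log((num_docs + 1) / (df + 1)) for df in doc_freqs]

def tf_idf(count_vec, idf_weights):
    """Multiply each raw count in a sparse {index: count} vector by that term's IDF."""
    return {i: c * idf_weights[i] for i, c in count_vec.items()}

# Toy corpus of m = 3 documents: term 0 occurs in all 3 documents, term 1 in only one
toy_idf = spark_style_idf([3, 1], num_docs=3)
print(toy_idf)                         # term 0 gets weight 0.0, term 1 gets log(2)
print(tf_idf({0: 2, 1: 1}, toy_idf))
```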

In [None]:
df_model = result_tfidf.select('id', 'filtered', 'features')
df_model.show()

+--------+--------------------+--------------------+
|      id|            filtered|            features|
+--------+--------------------+--------------------+
|  820694|[@jennycohn1, @ro...|(20000,[0,1,10,18...|
| 5385802|[rt, @chrismurphy...|(20000,[0,1,5,40,...|
|12953952|[rt, @magagwen, @...|(20000,[0,1,43,37...|
|13290282|[folks, proven, l...|(20000,[4,205,913...|
|13492362|[rt, @comradezaco...|(20000,[0,32,568,...|
|14060717|[next, week, 2020...|(20000,[92,119,18...|
|14555541|[rt, @kim, @repad...|(20000,[0,1,9,842...|
|14883391|[rt, @4evernevert...|(20000,[0,3,57,13...|
|14884712|[@reptedlieu, kee...|(20000,[37,158,50...|
|15197290|[rt, @kylegriffin...|(20000,[0,223,243...|
|15261298|[rt, @billclarkph...|(20000,[0,25,144,...|
|15416371|[rt, @speakerpelo...|(20000,[0,2,10,19...|
|15541045|                  []|(20000,[1],[1.075...|
|15651137|[rt, @nancypelosi...|(20000,[0,4,23,59...|
|15743275|[rt, @mattduss, c...|(20000,[0,20,47,6...|
|16372633|[rt, @chrismurphy...|(20000,[0,7,17,

In [None]:
"""
Steps: LDA is run to get topic distribution weights
"""
num_topics = 40
max_iterations = 50
lda_model = LDA(k=num_topics, maxIter=max_iterations)

In [None]:
model = lda_model.fit(df_model)

In [None]:
model.describeTopics(10).show()

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[250, 384, 457, 3...|[0.01206026249284...|
|    1|[71, 300, 606, 88...|[0.01715147394253...|
|    2|[438, 619, 1231, ...|[0.02916916995825...|
|    3|[633, 1, 727, 715...|[0.01072635970688...|
|    4|[189, 289, 549, 9...|[0.01903251228658...|
|    5|[152, 243, 465, 4...|[0.01626939361307...|
|    6|[379, 387, 313, 1...|[0.01394543821063...|
|    7|[326, 337, 430, 8...|[0.01501716037174...|
|    8|[224, 262, 81, 9,...|[0.01837542240277...|
|    9|[1, 309, 551, 641...|[0.00971620366182...|
|   10|[388, 1, 3, 173, ...|[0.00986935010978...|
|   11|[46, 366, 36, 341...|[0.02257380707938...|
|   12|[546, 637, 554, 1...|[0.01446753421206...|
|   13|[533, 813, 1140, ...|[0.02151886025257...|
|   14|[217, 296, 437, 9...|[0.01531259964344...|
|   15|[495, 282, 1, 759...|[0.01611321619500...|
|   16|[1034, 1, 1030, 1...|[0.01157277522048...|


In [None]:
transformed = model.transform(df_model)
transformed.show()

+--------+--------------------+--------------------+--------------------+
|      id|            filtered|            features|   topicDistribution|
+--------+--------------------+--------------------+--------------------+
|  820694|[@jennycohn1, @ro...|(20000,[0,1,10,18...|[1.72724035919036...|
| 5385802|[rt, @chrismurphy...|(20000,[0,1,5,40,...|[3.07626862502096...|
|12953952|[rt, @magagwen, @...|(20000,[0,1,43,37...|[0.36770541508102...|
|13290282|[folks, proven, l...|(20000,[4,205,913...|[3.92306231377565...|
|13492362|[rt, @comradezaco...|(20000,[0,32,568,...|[3.90648267384191...|
|14060717|[next, week, 2020...|(20000,[92,119,18...|[3.81904868593841...|
|14555541|[rt, @kim, @repad...|(20000,[0,1,9,842...|[0.00187650864282...|
|14883391|[rt, @4evernevert...|(20000,[0,3,57,13...|[3.41908755665457...|
|14884712|[@reptedlieu, kee...|(20000,[37,158,50...|[3.47856130878392...|
|15197290|[rt, @kylegriffin...|(20000,[0,223,243...|[2.44218028755828...|
|15261298|[rt, @billclarkph...|(20000,

For each topic, print out 10 words to describe it

In [None]:
"""
Steps: each term index of a topic is mapped back to its word through the CountVectorizer vocabulary (broadcast to the executors), and the resulting words form the 'Topic Descriptions' column
"""
vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)
ldatopics = model.describeTopics(10)

def map_termID_to_word(termIndices):
  words = []
  for termID in termIndices:
    words.append(vocab_broadcast.value[termID])
  return words

udf_map_termID_to_Word = udf(map_termID_to_word, ArrayType(StringType()))

ldatopics_mapped = ldatopics.withColumn("Topic Descriptions", udf_map_termID_to_Word(ldatopics.termIndices))


In [None]:
#ANSWER
### TOPIC DESCRIPTIONS TO THE RIGHT OF THE TABLE ----->>>>>>###
ldatopics_mapped.show(41, False)

+-----+---------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
|topic|termIndices                                              |termWeights                                                                                                                                                                                                                    |Topic Descriptions                                                                                                           |
+-----+---------------------------------------------------------+-------------------------------------------------------------------------------------------------------

## (45 points) Part 3: Community Detection
User-hashtag relations have been extracted and saved in the file `us-congress-tweets/user_hashtags-1m.csv`. If a user uses a hashtag, there is a record with the user ID and the hashtag.

For community detection, we will implement the Trawling algorithm based on frequent itemset mining, using [FPGrowth](https://spark.apache.org/docs/2.3.0/ml-frequent-pattern-mining.html) to find potential user communities in the dataset. FP-Growth finds the same frequent itemsets as the Apriori algorithm discussed in class, but instead of generating candidate itemsets level by level, it compresses the transactions into an FP-tree and mines that tree recursively. Although we don't need to understand the algorithm completely for this assignment, feel free to explore it on your own! Explore different values for the support parameter, and report your results in the cells below.
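Trawling looks for complete bipartite cores: t users (the "fans") who all use the same s hashtags (the "centers"). Mining hashtag itemsets of size s whose support is at least t users finds exactly these cores, and the community is the set of users whose hashtag sets contain the itemset. Below is a brute-force pure-Python sketch on hypothetical users and hashtags; enumerating every s-subset stands in for FPGrowth and is only practical on toy data. The names `trawl`, `s` and `t` are illustrative, not part of Spark.

```python
from collections import Counter
from itertools import combinations

def trawl(user_items, s, t):
    """Return {itemset: users} for every s-item set shared by at least t users.

    Each entry is a complete bipartite core: every listed user used every item in the itemset.
    """
    # Support of every s-subset of each user's items (brute force; toy data only)
    support = Counter()
    for items in user_items.values():
        for combo in combinations(sorted(set(items)), s):
            support[combo] += 1
    frequent = sorted(combo for combo, n in support.items() if n >= t)
    # Recover the community: the users whose item sets contain the whole itemset
    return {
        combo: sorted(u for u, items in user_items.items() if set(combo) <= set(items))
        for combo in frequent
    }

# Hypothetical users and hashtags
user_items = {
    "u1": ["MAGA", "BuildTheWall", "Trump"],
    "u2": ["MAGA", "BuildTheWall"],
    "u3": ["MAGA", "BuildTheWall", "NancyPelosi"],
    "u4": ["MoscowMitch", "MuellerReport"],
    "u5": ["MoscowMitch", "MuellerReport", "TrumpShutdown"],
}
communities = trawl(user_items, s=2, t=2)
for itemset, users in communities.items():
    print(itemset, users)
```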



In [None]:
# your code here. Explain all steps.
from pyspark.ml.fpm import FPGrowth
df = spark.read.csv("us-congress-tweets/user_hashtags-1m.csv")

In [None]:
"""
Steps:
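1) collect_set does roughly the opposite of explode: it gathers all hashtags used by the same user id into one set, giving one transaction per user (a set is needed because FPGrowth rejects transactions with duplicate items)
2) the FPGrowth algorithm is then run with a low min support to find frequently co-occurring hashtags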
"""
grouped_hashtags = df.groupBy('_c0').agg(collect_set('_c1').alias('hashtags'))
grouped_hashtags.show()

+-------------------+--------------------+
|                _c0|            hashtags|
+-------------------+--------------------+
|1002338860162936832|[ThankGod, Democr...|
|1002752931341299712|[impeach, POTUS, ...|
|1003231727643496448|[BelieveWomen, My...|
|1004434675044057088|        [DeportOmar]|
|1004865478274244610|       [MoscowMitch]|
|1006268755058577408|   [WAECCertificate]|
|1007154784430968834|[Chavista, MAGAve...|
|           10073222|          [RandPaul]|
|1007386606217637888|[116thCongress, B...|
|         1007629891|     [TrumpShutdown]|
|1008731226281332736|[WithoutWomenMenW...|
|1008765038432096256|      [Tech, Cities]|
|1009406348222717952|         [Kavanaugh]|
|          100973954|[WestBaltimore, R...|
|1009786407030845440|          [midterms]|
|1009829347476287493|[DCIranFreedomMarch]|
|1009830390767505409|[EndTheShutdown, ...|
|1011299850296717312|          [Axios360]|
|1011778525093867520|        [Newsmakers]|
|1012339433541160966|                [UN]|
+----------

In [None]:
fpgrowth = FPGrowth(itemsCol='hashtags', minSupport=0.0001)
model = fpgrowth.fit(grouped_hashtags)
frequent_hashtags = model.freqItemsets.orderBy(desc('freq'))

In [None]:
frequent_hashtags.filter(size('items') == 2).show(20, False)

+------------------------------+----+
|items                         |freq|
+------------------------------+----+
|[MaduroRegime, Venezuela]     |93  |
|[TrumpShutdown, MuellerReport]|83  |
|[NancyPelosi, MAGA]           |76  |
|[BREAKING, MuellerReport]     |63  |
|[Trump, NancyPelosi]          |63  |
|[TrumpShutdown, MoscowMitch]  |63  |
|[MuellerReport, MoscowMitch]  |63  |
|[MAGA, MuellerReport]         |62  |
|[ForThePeople, TrumpShutdown] |61  |
|[Kavanaugh, MAGA]             |61  |
|[Kavanaugh, MuellerReport]    |60  |
|[Mueller, MuellerReport]      |60  |
|[ForThePeople, MuellerReport] |59  |
|[Trump, MuellerReport]        |58  |
|[HR1, MuellerReport]          |56  |
|[BuildTheWall, MAGA]          |56  |
|[BREAKING, MoscowMitch]       |56  |
|[NancyPelosi, MuellerReport]  |55  |
|[Democrats, NancyPelosi]      |54  |
|[Maduro, MaduroRegime]        |54  |
+------------------------------+----+
only showing top 20 rows



In [None]:
frequent_hashtags.filter(size('items') == 3).show(20, False)

+-----+----+
|items|freq|
+-----+----+
+-----+----+



List two user communities you think are interesting. Explain why they are reasonable communities.

You can use https://twitter.com/intent/user?user_id=? to find out more info about the users

[BuildTheWall, MAGA] -> 56 users: These hashtags reasonably go together, given that most #MAGA supporters also support #BuildTheWall, which was one of Trump's failed promises. Since each transaction is one user, the community is the 56 users who used both hashtags.

[TrumpShutdown, MuellerReport] -> 83 users: These hashtags reasonably go together because both were used by Trump's critics within the same window: the government shutdown lasted from December 2018 to January 2019, and Robert Mueller's report on Russian interference in the 2016 election was released in April 2019, while many of these users were calling for Trump to be impeached.

What value for support did you choose and why?

The goal in picking the support is to set it low enough to obtain frequent itemsets with k >= 2, but not so low that the itemsets occur by chance (random, uninformative co-occurrences). I tested minsup = 0.1, 0.01, and 0.001; these were too high and did not produce any itemsets. minsup = 0.0001 produced multiple k = 2 itemsets, so I settled on it. I also tested minsup = 0.00001, which produced a k = 3 itemset, but at this support some of the itemsets looked random.
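Because `minSupport` is a fraction of transactions (here, users), it helps to translate each tested value into an absolute number of users. The most frequent 2-itemset in the output above was shared by 93 users, so any threshold above 93 users cannot yield pairs. A sketch of the conversion, assuming Spark's convention of rounding the threshold up, ceil(minSupport × number of transactions); `num_users` is a hypothetical placeholder for the real `grouped_hashtags.count()`.

```python
import math

def min_count(min_support, num_transactions):
    """Absolute support threshold: an itemset is frequent if it occurs in at least this many transactions.

    Assumption: mirrors Spark's rounding up, ceil(minSupport * number of transactions).
    """
    return math.ceil(min_support * num_transactions)

# Hypothetical transaction count; in the notebook, grouped_hashtags.count() gives the real number of users
num_users = 250_000
for min_support in (0.1, 0.01, 0.001, 0.0001, 0.00001):
    print(f"minSupport={min_support:<8} -> itemset must be shared by >= {min_count(min_support, num_users)} users")
```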

## (20 points) Part 4: Personalized PageRank
Assume you are given a task to recommend Twitter users for the speaker of the House to engage with.

Construct a user-mentions network using relations in `us-congress-tweets/user_mentions.csv`

Run Personalized PageRank with source (id=15764644) and find out top 10 accounts to recommend.
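Personalized PageRank is ordinary PageRank whose random jump always lands on the source account, so scores measure how reachable each account is from the source. A pure-Python power-iteration sketch on a hypothetical toy mention graph, using the same reset probability (0.15) as the GraphFrames call below. Two assumptions of this sketch: edge weights (the `count` column) are ignored, with each node splitting its score evenly over its out-edges, and the score of dangling nodes (no out-edges) is sent back to the source. Absolute scores therefore need not match GraphFrames; check its dangling-node and weighting behavior before comparing numbers. As in the notebook's output, the source itself ranks first.

```python
from collections import defaultdict

def personalized_pagerank(edges, source, reset_prob=0.15, max_iter=100):
    """Power iteration for PageRank whose random jumps always return to `source`.

    edges: iterable of (src, dst) pairs; weights are ignored (uniform split over out-edges).
    Assumption: score held by dangling nodes (no out-edges) is sent back to the source.
    """
    out_links = defaultdict(list)
    nodes = {source}
    for src, dst in edges:
        out_links[src].append(dst)
        nodes.update((src, dst))

    rank = {n: 0.0 for n in nodes}
    rank[source] = 1.0
    for _ in range(max_iter):
        new_rank = {n: 0.0 for n in nodes}
        for node, r in rank.items():
            targets = out_links.get(node)
            if targets:
                share = (1 - reset_prob) * r / len(targets)
                for dst in targets:
                    new_rank[dst] += share
            else:
                new_rank[source] += (1 - reset_prob) * r
        # Teleport: with probability reset_prob the walk restarts at the source
        new_rank[source] += reset_prob
        rank = new_rank
    return rank

# Hypothetical toy mention graph: source "A" mentions B and C, who mention A back; D mentions A
edges = [("A", "B"), ("A", "C"), ("B", "A"), ("C", "A"), ("D", "A")]
ranks = personalized_pagerank(edges, source="A")
print(sorted(ranks.items(), key=lambda kv: -kv[1]))
```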

In [None]:
from graphframes import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext

In [None]:
# your network construction code here
"""
Steps:
1) e: the edge list, read from the user mentions CSV, with src, dst and count cast to LongType and rows with missing values dropped
2) v: the vertex list, the distinct union of all src and dst ids, renamed to id
3) GraphFrame builds the network from v and e
"""
e = spark.read.csv("us-congress-tweets/user_mentions.csv", header=True)
e = e.withColumn("src", e["src"].cast(LongType()))
e = e.withColumn("dst", e["dst"].cast(LongType()))
e = e.withColumn("count", e["count"].cast(LongType()))
e = e.dropna()

v = e.select(col("src").alias("id")).union(e.select(col("dst").alias("id"))).distinct()

# v = e.select(explode(array('src', 'dst')).alias('id')).distinct().dropna()
# v = v.withColumn("id", v["id"].cast(LongType()))

g = GraphFrame(v, e)

In [None]:
# your Personalized PageRank code here
results = g.pageRank(sourceId=15764644, maxIter=5, resetProbability=0.15)



In [None]:
# Top 10 accounts to recommend (show 11 rows because the source account itself ranks first)
# You can use https://twitter.com/intent/user?user_id=? to find out more info about the users
results.vertices.select('id', 'pagerank').orderBy(desc('pagerank')).show(11)


+-------------------+--------------------+
|                 id|            pagerank|
+-------------------+--------------------+
|           15764644|  0.4552197044272686|
|          199325935| 0.12897891625439276|
|         1092979962| 0.12897891625439276|
|          381152398| 0.12897891625439276|
|         1487846678|0.012181342090692647|
|          131208075|0.012181342090692647|
|           43963249|0.012181342090692647|
|           10613072|0.012181342090692647|
|           30912937|0.012181342090692647|
|           60618211|0.012181342090692647|
|1080198683713507335|0.012181342090692647|
+-------------------+--------------------+
only showing top 11 rows

