## CSCE 676 :: Data Mining and Analysis :: Texas A&M University :: Fall 2019


# Homework 2

- **100 points [10% of your final grade]**
- **Due Saturday, October 19 by 11:59pm**

**Goals of this homework:** This homework has five objectives:

* Become familiar with Apache Spark and working in a distributed environment in the cloud
* Get hands-on experience designing and running a simple MapReduce data transformation job
* Get hands-on experience using Spark built-in functions; namely, LDA and PageRank
* Design a Pregel algorithm to find tree depth in a network
* Understand and implement the Trawling algorithm to find user communities

*Submission instructions:* Post your notebook to ecampus (look for the homework 2 assignment there). Name your submission **your-uin_hw2.ipynb**; for example, my submission would be something like **555001234_hw2.ipynb**. Your notebook should be fully executed when you submit it, so run all the cells so that we can see the output, then submit. Follow the AWS guide to create a Hadoop/Spark cluster and create an empty notebook. Copy all the cells in this notebook to the AWS notebook and continue working there. When you are done, download your notebook from AWS (navigate to the location on S3 where your notebook is saved and click download) and submit it to ecampus.

## Introduction to the Dataset
We will use a dataset of tweets concerning members of the US Congress. The data spans almost a year (from October 3rd, 2018 to September 25th, 2019) and covers 577 of the members. Every tweet or retweet posted by the 577 members, or directed to them by other Twitter users, was collected.

The data is in an S3 bucket named s3://us-congress-tweets that you can access. There are 277,744,063 tweets. Because the dataset is this large, we will mostly work on samples or subsets of it, but in some cases we will ask you to run your task on the whole dataset.

Below is a summary of all datasets used for this homework:

| Dataset                | Location in S3                                      | Description |
| :---                   | :---                                                | :--- |
| Congress members       | s3://us-congress-tweets/congress_members.csv        | 577 Twitter ids and screen names |
| Raw tweets             | s3://us-congress-tweets/raw/\*.snappy               | the whole JSON objects of the tweets |
| Sample tweets          | s3://us-congress-tweets/congress-sample-10k.json.gz | 10k sample tweets |
| Trimmed tweets         | s3://us-congress-tweets/trimmed/\*.parquet          | selected fields for all tweets |
| User hashtags          | s3://us-congress-tweets/user_hashtags.csv           | all pairs of <user, hashtag> |
| User replies           | s3://us-congress-tweets/reply_network.csv           | all pairs of <reply_tweet, replied_to_tweet> |
| User mentions          | s3://us-congress-tweets/user_mentions.csv           | all pairs of <src_user_id, dst_user_id, frequency> |

Let's run some exploration below!

In [1]:
# First let's read Twitter ids and screen names of the 577 US congress members

congress_members = spark.read.csv("s3://us-congress-tweets/congress_members.csv", header=True)
congress_members.show()
print("Number of congress members tracked:", congress_members.count())

+------------------+---------------+
|            userid|    screen_name|
+------------------+---------------+
|         776664410|  RepCartwright|
|         240363117|   RepTomMarino|
|837722935095930883| RepScottTaylor|
|        1069124515|     RepLaMalfa|
|818460870573441028|  RepTomGarrett|
|         163570705|     repcleaver|
|          19739126|      GOPLeader|
|          33563161| RepJoseSerrano|
|        2861616083|USRepGaryPalmer|
|        1074518754| SenatorBaldwin|
|         305620929|  Call_Me_Dutch|
|         381152398| RepTerriSewell|
|         834069080| RepDavidRouzer|
|         249787913|  SenatorCarper|
|         188019606|        Clyburn|
|         217543151|SenatorTimScott|
|          39249305| USRepMikeDoyle|
|          33537967|   amyklobuchar|
|         249410485|  SanfordBishop|
|          23124635|    TomColeOK04|
+------------------+---------------+
only showing top 20 rows

('Number of congress members tracked:', 577)


We can use `spark.read.json(...)` without a schema to load the tweets into a dataframe, but this will be slow for two reasons:
* First, it makes one pass over the data to infer a schema, then a second pass to read the content and parse it into the dataframe.
* Second, it reads all the content of the tweet JSON objects, but we only need a few fields for a given task.

Thus, we define our own schema, something like the following:

In [2]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
twitter_date_format="EEE MMM dd HH:mm:ss ZZZZZ yyyy"

user_schema = StructType([
    StructField('created_at',TimestampType(),True),
    StructField('followers_count',LongType(),True),
    StructField('id',LongType(),True),
    StructField('name',StringType(),True),
    StructField('screen_name',StringType(),True)
])

hashtag_schema = ArrayType(StructType([StructField('text',StringType(),True)]))
user_mentions_schema = ArrayType(StructType([StructField('id',LongType(),True),
                                             StructField('screen_name',StringType(),True)]))
entities_schema = StructType([
    StructField('hashtags',hashtag_schema,True),
    StructField('user_mentions',user_mentions_schema,True)
    ])

retweeted_status_schema =StructType([        
        StructField("id", LongType(), True),
        StructField("in_reply_to_user_id", LongType(), True),
        StructField("in_reply_to_status_id", LongType(), True),
        StructField("created_at", TimestampType(), True),
        StructField("user", user_schema)
    ])

tweet_schema =StructType([
        StructField("text", StringType(), True),
        StructField("id", LongType(), True),
        StructField("in_reply_to_user_id", LongType(), True),
        StructField("in_reply_to_status_id", LongType(), True),
        StructField("created_at", TimestampType(), True),
        StructField("user", user_schema),
        StructField("entities", entities_schema),
        StructField("retweeted_status", retweeted_status_schema)
    ])

Now we are ready to read the tweets with `spark.read.json` passing our own schema as follows:

In [3]:
tweets = spark.read.option("timestampFormat", twitter_date_format)\
                   .json('s3://us-congress-tweets/congress-sample-10k.json.gz', tweet_schema)\
                   .withColumn('user_id',F.col('user.id'))
tweets.printSchema()

root
 |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- in_reply_to_status_id: long (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- created_at: timestamp (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- screen_name: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- user_mentions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- screen_name: string (nullable = true)
 |-- retweeted_status: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- in_reply_to_user_id: long (nul

## (6 points) Part 1a: Exploratory Data Analysis (Small Scale)

How many unique users and original tweets (i.e. not retweets) are there? 

In [4]:
# your code here for unique users
users = tweets.select(F.col("user.id"))
users = users.dropDuplicates()
print("Unique user:", users.count())

('Unique user:', 9735)


In [63]:
# your code here for original tweets
originals = tweets.select(F.col("retweeted_status.id"))
print("Original tweets:", originals.filter("id is null").count())

('Original tweets:', 3327)


Who are the ten most mentioned users in the sample?

In [78]:
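# code and output here
# Note: this counts the authors of retweeted tweets (retweeted_status.user.name),
# which serves here as a proxy for the most mentioned users.
mentioned = tweets.select(F.col("retweeted_status.user.name")).filter("name is not null")
mentioned.groupBy("name").count().sort("count", ascending=False).limit(10).show()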

+--------------------+-----+
|                name|count|
+--------------------+-----+
|         Adam Schiff|  333|
|         Marco Rubio|  250|
|       Chuck Schumer|  210|
|        Nancy Pelosi|  210|
|        Chris Murphy|  148|
|     Rep. Matt Gaetz|  124|
|   Senator Rand Paul|   98|
|Senator Jeff Merkley|   90|
|       Amy Klobuchar|   84|
|       Kamala Harris|   74|
+--------------------+-----+
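
Tweet objects also carry explicit mentions in `entities.user_mentions`, which the schema defined earlier already includes. As a rough illustration of counting those directly, here is a minimal pure-Python sketch. The function name `count_mentions` and the toy records are hypothetical; on the real data the same idea would be expressed with an explode followed by a group-by, as in the hashtag query.

```python
from collections import Counter


def count_mentions(tweets):
    """Count how often each screen name appears in entities.user_mentions."""
    counts = Counter()
    for tweet in tweets:
        # "entities" or "user_mentions" may be missing or null in a record.
        entities = tweet.get("entities") or {}
        for mention in entities.get("user_mentions") or []:
            name = mention.get("screen_name")
            if name is not None:
                counts[name] += 1
    return counts


# Hypothetical toy records shaped like the tweet schema used in this notebook.
sample = [
    {"id": 1, "entities": {"user_mentions": [{"id": 10, "screen_name": "alice"}]}},
    {"id": 2, "entities": {"user_mentions": [{"id": 10, "screen_name": "alice"},
                                             {"id": 11, "screen_name": "bob"}]}},
    {"id": 3, "entities": None},
]

mention_counts = count_mentions(sample)
print(mention_counts.most_common(10))
```

Counting by `screen_name` rather than by `id` is a choice made for readability of the output; either field would work as the grouping key.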



What are the top hashtags used?

In [107]:
# code and output here
tweets.select(F.explode(F.col("entities.hashtags"))).groupBy("col").count().sort("count", ascending=False).limit(10).show()

+---------------+-----+
|            col|count|
+---------------+-----+
|    [Venezuela]|  102|
|[TrumpShutdown]|   42|
| [MaduroRegime]|   29|
|       [Maduro]|   20|
|         [MAGA]|   20|
|  [NancyPelosi]|   19|
|[MuellerReport]|   17|
| [ForThePeople]|   14|
|    [Kavanaugh]|   14|
| [BuildTheWall]|   14|
+---------------+-----+



## (4 points) Part 1b: Exploratory Data Analysis (Large Scale)
Repeat the above queries but now against the whole dataset defined in the dataframe below. 

In [5]:
trimmed_files = [x[0] for x in spark.read.csv("s3://us-congress-tweets/trimmed/files.txt").collect()]
tweets_all = spark.read.parquet(*trimmed_files)
tweets_all.printSchema()

root
 |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- in_reply_to_status_id: long (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- created_at: timestamp (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- screen_name: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- user_mentions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- screen_name: string (nullable = true)
 |-- retweeted_status: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- in_reply_to_user_id: long (nul

In [110]:
# your code here for unique users
users_all = tweets_all.select(F.col("user.id"))
users_all = users_all.dropDuplicates()
print("Unique user:", users_all.count())

('Unique user:', 10749403)


In [111]:
# your code here for original tweets
originals_all = tweets_all.select(F.col("retweeted_status.id"))
print("Original tweets:", originals_all.filter("id is null").count())

('Original tweets:', 96887299)


In [112]:
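# Top mentioned users code and output here
# Note: this counts the authors of retweeted tweets (retweeted_status.user.name),
# which serves here as a proxy for the most mentioned users.
mentioned_all = tweets_all.select(F.col("retweeted_status.user.name")).filter("name is not null")
mentioned_all.groupBy("name").count().sort("count", ascending=False).limit(10).show()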

+------------------+-------+
|              name|  count|
+------------------+-------+
|       Adam Schiff|5279430|
|   Rep. Jim Jordan|3549600|
|     Chuck Schumer|3441048|
|      Nancy Pelosi|3231205|
|       Marco Rubio|3190516|
|    Bernie Sanders|2786791|
|Rep. Eric Swalwell|2767102|
|    Lindsey Graham|2579218|
|      Mark Meadows|2295392|
|      Chris Murphy|2170298|
+------------------+-------+



In [113]:
# Top hashtags code and output here
tweets_all.select(F.explode(F.col("entities.hashtags"))).groupBy("col").count().sort("count", ascending=False).limit(10).show()

+---------------+-------+
|            col|  count|
+---------------+-------+
|    [Venezuela]|1206418|
|  [MoscowMitch]|1105708|
|[TrumpShutdown]| 632069|
|         [MAGA]| 469507|
|[MuellerReport]| 405471|
|  [NancyPelosi]| 359063|
| [MaduroRegime]| 349757|
|        [Trump]| 312651|
| [BuildTheWall]| 311186|
| [GreenNewDeal]| 272289|
+---------------+-------+



## (10 points) Part 2: Textual Analysis (LDA)
Using the LDA algorithm provided by the Spark Machine Learning (ML) library, find the ten most important topics. Use `s3://us-congress-tweets/trimmed/*.parquet` for this task (you can reuse the `tweets_all` dataframe from Part 1b).

You may want to work on a small sample first, but report your results on the whole dataset.

Hint: for better results, aggregate all tweets for a user into a single document.

In [6]:
# your code here
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, StopWordsRemover

print('Start preprocessing')
print('Group by user IDs')
documents = tweets_all.select(F.col("user.id"), "text").groupBy("id").agg(F.collect_list("text").alias("text"))
print('Aggregate posts by same user')
documents = documents.withColumn("text", F.concat_ws(" ", "text")).sort("id")
print('Split string into arrays')
documents = documents.select("id", F.split("text", " ").alias("words"))
print('Remove generic stopwords')
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
documents = remover.transform(documents).select("id", "filtered")
print('Explode list back into words')
documents = documents.select("id", F.explode("filtered"))
documents = documents.select("id", F.lower(F.col("col")))
print('Now remove all symbols and the word RT')
expr = "^[a-z]+$"
documents = documents.filter(documents["lower(col)"].rlike(expr))
documents = documents.filter(documents["lower(col)"] != "rt")
print('Preprocessing almost done... combining rows back into list')
documents = documents.select("id", "lower(col)").groupBy("id").agg(F.collect_list("lower(col)").alias("words"))

# fit a CountVectorizerModel from the corpus.
print('CountVectorizer conversion')
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=65535, minDF=2.0)
model = cv.fit(documents)
result = model.transform(documents)

# user1: aggie aggie -> [2, 0]
# user2: aggie dior -> [1, 1]

# Trains a LDA model.
print('LDA fitting')
lda = LDA(k=10, maxIter=10)
model2 = lda.fit(result)

topics = model2.describeTopics(10)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

Start preprocessing
Group by user IDs
Aggregate posts by same user
Split string into arrays
Remove generic stopwords
Explode list back into words
Now remove all symbols and the word RT
Preprocessing almost done... combining rows back into list
CountVectorizer conversion
LDA fitting
The topics described by their top-weighted terms:
+-----+------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topic|termIndices                                     |termWeights                                                                                                                                                                                                                    |
+-----+------------------------------------------------+-------------------------------------------------

In [7]:
# your code here
from pyspark import SparkContext
vocab = model.vocabulary
vocab_broadcast = sc.broadcast(vocab)

def map_termID_to_Word(termIndices):
    words = []
    for termID in termIndices:
        words.append(vocab_broadcast.value[termID])
    return words

udf_map_termID_to_Word = F.udf(map_termID_to_Word , ArrayType(StringType()))

topics_mapped = topics.withColumn("topic_desc", udf_map_termID_to_Word(topics.termIndices))
topics_mapped.select("topic", "topic_desc").show(truncate=False)


+-----+---------------------------------------------------------------------------------+
|topic|topic_desc                                                                       |
+-----+---------------------------------------------------------------------------------+
|0    |[de, transport, la, le, les, en, et, un, des, pour]                              |
|1    |[please, de, thank, el, la, need, support, que, y, help]                         |
|2    |[democrats, people, house, president, trump, like, us, border, one, american]    |
|3    |[trump, president, people, house, one, senate, like, american, us, congress]     |
|4    |[like, trump, people, get, know, one, think, president, time, need]              |
|5    |[transport, public, people, need, like, get, us, one, new, please]               |
|6    |[de, la, que, el, en, y, los, es, del, por]                                      |
|7    |[transport, public, new, free, people, hong, please, tak, naik, road]            |
|8    |[am

## (10 points) Part 3a: MapReduce
In this task, design a MapReduce program in Python that reads all the original tweets (no retweets) in the sample tweets (`congress-sample-10k.json.gz`) and, if a tweet is a reply to another tweet, outputs a record of the form <src_id, src_user, dst_id, dst_user>.

Create a small cluster (2 or 3 nodes) as per the AWS Guide, then SSH into your cluster and use Hadoop streaming to execute your MapReduce program.

Note: the Hadoop streaming jar file can be found at `/usr/lib/hadoop-mapreduce/hadoop-streaming.jar`
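
Hadoop streaming feeds each input line to the mapper on standard input and passes the sorted mapper output to the reducer. The sketch below shows one possible shape for the two stages as plain Python functions, so the logic can be checked locally before it is wrapped in scripts. It assumes one JSON tweet per line with the fields named in the schema above; the function names, the tab-separated output, and the de-duplicating reducer are illustrative choices, not requirements of the assignment.

```python
import json


def map_line(line):
    """Return a tab-separated reply record, or None if the tweet does not qualify."""
    line = line.strip()
    if not line:
        return None
    try:
        tweet = json.loads(line)
    except ValueError:
        return None  # skip malformed lines
    if not isinstance(tweet, dict):
        return None
    if tweet.get("retweeted_status") is not None:
        return None  # retweets are not original tweets
    dst_id = tweet.get("in_reply_to_status_id")
    if dst_id is None:
        return None  # not a reply
    user = tweet.get("user") or {}
    fields = (tweet.get("id"), user.get("id"), dst_id, tweet.get("in_reply_to_user_id"))
    return "\t".join(str(f) for f in fields)


def run_mapper(lines):
    """Yield one record per qualifying input line."""
    for line in lines:
        record = map_line(line)
        if record is not None:
            yield record


def run_reducer(sorted_lines):
    """Pass records through, dropping consecutive duplicates (input is sorted)."""
    previous = None
    for line in sorted_lines:
        line = line.rstrip("\n")
        if line and line != previous:
            yield line
        previous = line


# Hypothetical toy input: a reply, a non-reply, a retweet, and a malformed line.
toy_lines = [
    json.dumps({"id": 5, "user": {"id": 50},
                "in_reply_to_status_id": 4, "in_reply_to_user_id": 40}),
    json.dumps({"id": 6, "user": {"id": 60}, "in_reply_to_status_id": None}),
    json.dumps({"id": 7, "user": {"id": 70}, "in_reply_to_status_id": 4,
                "in_reply_to_user_id": 40, "retweeted_status": {"id": 3}}),
    "not json",
]
mapped = sorted(run_mapper(toy_lines))   # stands in for the shuffle/sort phase
reduced = list(run_reducer(mapped))
print(reduced)
```

In actual streaming scripts, each function would be driven from `sys.stdin` and its results printed line by line; sorting the mapper output locally, as done here, only imitates what the framework does between the two stages.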

In [None]:
# your mapper function


In [None]:
# your reducer function

In [None]:
# your Hadoop job submission command (copy/paste your command from the terminal)

How many reply relationships did you get?

In [None]:
# code to read job output and count

## (5 points) Part 3b: Going Large-Scale with MapReduce

Rerun the same MapReduce job above but on the whole dataset (`s3://us-congress-tweets/raw/*.snappy`).
All the files under `s3://us-congress-tweets/raw` can be read from the following file:

`s3://us-congress-tweets/raw/files.txt`

Use shell scripting to parse this file and prepare the input to your MapReduce job as a comma-separated string of all the files (e.g. your input should look like this: `s3://us-congress-tweets/raw/part-00000.snappy,s3://us-congress-tweets/raw/part-00001.snappy,s3://us-congress-tweets/raw/part-00002.snappy,...`).

Inspecting the job logs, how many files did the job operate on? How many input splits were there?

In [None]:
# Your answer here

How many reply relationships did you get?

In [None]:
# Number of reply records

## (30 points) Part 4: Graph Analysis
In this task, we would like to compute the longest path in *tweet reply* graphs and then perform some statistical calculations on the result. We will use the Pregel implementation from GraphFrames for this task. Ignore paths that are longer than 20.

First, construct your tweet reply network using the tweet-reply records in the file `s3://us-congress-tweets/reply_network.csv`.
From this file, use src_id and dst_id: dst_id is the id of the tweet being replied to, and src_id is the id of the replying tweet.

In [1]:
# your network construction code here
from graphframes import *
from graphframes.lib import Pregel
sc.setCheckpointDir("hdfs:///tmp/graphframes_checkpoint") # this is needed for any GraphFrames operation
reply_network = spark.read.csv("s3://us-congress-tweets/reply_network.csv", header=True)
reply_network = reply_network.sort(['src_id', 'dst_id'], ascending=[1, 1])

In [2]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
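vertices = reply_network.select('src_id')
# union returns a new DataFrame, so assign the result; otherwise tweets that
# only appear as dst_id (replied to, but never replying) are left out
vertices = vertices.union(reply_network.select('dst_id'))
vertices = vertices.distinct()
vertices = vertices.selectExpr('src_id as id')
edges = reply_network.select('src_id','dst_id')
edges = edges.selectExpr('src_id as src','dst_id as dst')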

In [3]:
graph = GraphFrame(vertices, edges)
indegrees = graph.inDegrees
indegrees = indegrees.orderBy('inDegree', ascending=False)

What are the top replied-to tweets? (Show 20.)

In [4]:
# your code here
indegrees.show(n=20)

+-------------------+--------+
|                 id|inDegree|
+-------------------+--------+
|1157787985041088513|   94351|
|1048314564826292227|   76396|
|1111289977143545856|   68172|
|1155949756792725510|   65241|
|1137060666223878144|   57764|
|1062461047892787204|   53767|
|1158036816089497601|   50059|
|1144730911889428480|   45905|
|1155949605147648006|   44810|
|1098312693436596226|   41836|
|1150408691713265665|   41825|
|1129831615952236546|   41556|
|1150859069084905472|   39020|
|1144078421670150144|   38687|
|1155132215208161281|   38572|
|1088141172638400512|   37102|
|1155469517092470784|   36961|
|1168938037071482881|   36728|
|1154161356171599877|   36552|
|1131740851909083137|   36386|
+-------------------+--------+
only showing top 20 rows



How many graphs are in the reply network? (Hint: use the connectedComponents function)
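
To make the notion of "number of graphs" concrete: each separate reply graph is one weakly connected component, that is, a maximal set of tweets linked by reply edges when direction is ignored. The sketch below counts components on a toy edge list with a simple union-find. It only illustrates the concept on made-up ids; `count_components` is a hypothetical helper, not how GraphFrames implements `connectedComponents`.

```python
def count_components(vertices, edges):
    """Count weakly connected components using union-find with path halving."""
    parent = {v: v for v in vertices}

    def find(v):
        # Walk up to the root, shortening the path along the way.
        while parent[v] != v:
            parent[v] = parent[parent[v]]
            v = parent[v]
        return v

    for src, dst in edges:
        # Endpoints missing from the vertex list are added on the fly.
        parent.setdefault(src, src)
        parent.setdefault(dst, dst)
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            parent[root_src] = root_dst

    return len({find(v) for v in parent})


# Hypothetical toy network: tweets 2 and 3 reply to 1, tweet 5 replies to 4,
# and tweet 6 has no replies at all.
toy_vertices = [1, 2, 3, 4, 5, 6]
toy_edges = [(2, 1), (3, 1), (5, 4)]
num_toy_components = count_components(toy_vertices, toy_edges)
print(num_toy_components)
```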

In [5]:
# your code here
connectedComponents = graph.connectedComponents()
connectedComponents.show()

+-------------------+---------+
|                 id|component|
+-------------------+---------+
|1047540049506533376|       26|
|1047540679885373441|       29|
|1047608661822959616|      474|
|1047675670011109377|      964|
|1047859519496249345|     1470|
|1047862190311702528|     1697|
|1047871839924621313|     1806|
|1047881600938041344|     1950|
|1047887394773585920|     2040|
|1047899767890690048|     2214|
|1047903874399461376|     2250|
|1047922640390950913|     2453|
|1047926495782608897|     2509|
|1047927555611922434|     2529|
|1047954452605546496|     2927|
|1047965454638051328|     3091|
|1048001380756144129|     3506|
|1048025435345473536|     3764|
|1048206673880518656|     4590|
|1048223881528594432|     2533|
+-------------------+---------+
only showing top 20 rows



In [6]:
numComponents = connectedComponents.select(F.countDistinct("component"))
numComponents.show()

+-------------------------+
|count(DISTINCT component)|
+-------------------------+
|                 51575731|
+-------------------------+



Now, design and execute a Pregel program that will calculate the longest paths for all reply graphs in the network. Explain your design.

In [None]:
# Idea: in every iteration each node receives max(Pregel.msg()), the largest
# "dist" value among the nodes with an edge pointing to it, and sets its own
# dist to that value plus 1.
# A node with no incoming edge receives no message, so coalesce keeps it at 0.
# A node whose only incoming edge comes from such a node stays at 1 no matter
# how many iterations we run, because 0 is the only message it ever receives.
# After n iterations, a node whose longest incoming path is longer than n
# holds n, while every other node has converged to its longest path length.
# Running 20 iterations therefore caps paths at length 20.
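
The message-passing scheme described in the comments can be traced by hand on a small graph. The following pure-Python sketch simulates the same update rule (send `dist` along each edge, aggregate with max, add 1, default to 0 when no message arrives) so the convergence behaviour is easy to inspect. The function name and the toy edge list are hypothetical, and the sketch assumes every edge endpoint is listed as a vertex.

```python
def longest_incoming_path(vertices, edges, max_iter=20):
    """Simulate the Pregel supersteps: dist = max(incoming dist) + 1, else 0."""
    dist = {v: 0 for v in vertices}
    for _ in range(max_iter):
        inbox = {}
        # Each edge src -> dst delivers the current dist of src to dst;
        # messages for the same destination are aggregated with max.
        for src, dst in edges:
            msg = dist[src]
            inbox[dst] = max(inbox.get(dst, msg), msg)
        # Vertices without any message fall back to 0 (the coalesce default).
        dist = {v: inbox[v] + 1 if v in inbox else 0 for v in vertices}
    return dist


# Hypothetical toy reply graph: 4 replies to 3, 3 replies to 2,
# 2 replies to 1, and 5 also replies to 1.
toy_vertices = [1, 2, 3, 4, 5]
toy_edges = [(2, 1), (3, 2), (4, 3), (5, 1)]

converged = longest_incoming_path(toy_vertices, toy_edges, max_iter=20)
capped = longest_incoming_path(toy_vertices, toy_edges, max_iter=2)
print(converged)
print(capped)
```

With enough iterations the root tweet 1 ends at 3, the number of edges on its longest reply chain. With only two iterations it is capped at 2, which mirrors how a fixed iteration count bounds the reported path length.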

In [7]:
# your pregel code here
import pyspark.sql.functions as F
dists = graph.pregel \
             .setMaxIter(20) \
             .withVertexColumn("dist", 
                               F.lit(0),
                               F.coalesce(Pregel.msg() + F.lit(1), F.lit(0))
                              ) \
             .sendMsgToDst(Pregel.src("dist"))\
             .aggMsgs(F.max(Pregel.msg())).run() 

In [8]:
longest_paths = connectedComponents.join(dists, connectedComponents.id == dists.id)
longest_paths.show()

+-------------------+---------+-------------------+----+
|                 id|component|                 id|dist|
+-------------------+---------+-------------------+----+
|1047537184780238848|        0|1047537184780238848|   2|
|1047537374744498179|        1|1047537374744498179|   0|
|1047537449914839040|        2|1047537449914839040|   0|
|1047537740940828673|        3|1047537740940828673|   0|
|1047537748826050561|        4|1047537748826050561|   0|
|1047537817008705537|        5|1047537817008705537|   0|
|1047538087675514880|        6|1047538087675514880|   0|
|1047538120223334403|        7|1047538120223334403|   0|
|1047538157472821248|        8|1047538157472821248|   0|
|1047538405347872768|        9|1047538405347872768|   0|
|1047538563015987200|       10|1047538563015987200|   3|
|1047538681131753472|       11|1047538681131753472|   2|
|1047538715516723202|       12|1047538715516723202|   0|
|1047538733069885440|       13|1047538733069885440|   0|
|1047538846605475844|       14|

What is the average longest path length for all reply graphs in the network?

In [9]:
# your code here
answer = longest_paths.select("component", "dist").groupBy("component").agg(F.max("dist"))
answer.show()

+----------+---------+
| component|max(dist)|
+----------+---------+
|8589934658|        0|
|8589934965|        0|
|8589935171|        0|
|8589935183|        1|
|8589935298|        0|
|8589935317|        0|
|8589935770|        1|
|8589935936|        0|
|8589936112|        0|
|8589936348|        0|
|8589936424|        0|
|8589936566|        0|
|8589936761|        9|
|8589936870|        0|
|8589936972|        1|
|8589937582|       10|
|8589937874|       20|
|8589937892|        1|
|8589937972|        5|
|8589938024|        0|
+----------+---------+
only showing top 20 rows



In [10]:
answer.select("max(dist)").groupBy().avg().collect()

[Row(avg(max(dist))=0.1638112506830005)]

In [1]:
# This result may be lower by 1 than other results because the algorithm counts the edges
# on the longest path. If paths are measured in vertices instead, the answer is 1.1638...

## (30 points) Part 5: Community Detection
User-hashtag relations have been extracted and saved in the file `s3://us-congress-tweets/user_hashtags.csv`. If a user uses a hashtag there will be a record with the userid and the hashtag.

Use the Trawling algorithm discussed in class to find potential user communities in the dataset. (Hint: use FPGrowth in the Spark ML package). Explore different values for the support parameter.
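
One common way to frame trawling as a frequent-itemset problem is to treat each hashtag as a basket and the users who used it as its items: a set of users that co-occurs in many baskets corresponds to a dense bipartite core. The brute-force sketch below illustrates that framing on toy data. It is not a substitute for FPGrowth, which finds the same kind of sets far more efficiently. The function name, the toy pairs, and the absolute `min_count` threshold are assumptions of this sketch (FPGrowth's `minSupport` is a fraction of baskets instead).

```python
from collections import Counter, defaultdict
from itertools import combinations


def frequent_user_sets(pairs, set_size, min_count):
    """Find user sets of a given size that share at least min_count hashtags."""
    # Build baskets: hashtag -> set of users who used it.
    baskets = defaultdict(set)
    for user, hashtag in pairs:
        baskets[hashtag].add(user)

    # Count, for every user combination, the number of baskets containing it.
    counts = Counter()
    for users in baskets.values():
        for combo in combinations(sorted(users), set_size):
            counts[combo] += 1

    return {combo: n for combo, n in counts.items() if n >= min_count}


# Hypothetical <user, hashtag> pairs.
toy_pairs = [
    ("u1", "a"), ("u1", "b"), ("u1", "c"),
    ("u2", "a"), ("u2", "b"), ("u2", "c"),
    ("u3", "a"),
    ("u4", "d"),
]
communities = frequent_user_sets(toy_pairs, set_size=2, min_count=2)
print(communities)
```

Here `u1` and `u2` share three hashtags and are reported as a candidate community, while pairs involving `u3` share only one hashtag and fall below the threshold. Enumerating all combinations grows very quickly with basket size, which is one reason a dedicated algorithm is needed at the scale of this dataset.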

In [11]:
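# your code here. Explain all steps.
import pyspark.sql.functions as F
# Each record pairs a user id with a hashtag that user posted.
# Note: the column names seen below ("154408627", "shotoniphone") look like a data
# record, which suggests the file has no header row and header=True consumed its first line.
hash_tags = spark.read.csv("s3://us-congress-tweets/user_hashtags.csv", header=True)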

In [12]:
# Group by hashtag: each hashtag becomes one basket holding the set of users who used it.
# collect_set (rather than collect_list) is used because FPGrowth requires the items
# within a basket to be unique.
hash_tags_2 = hash_tags.groupby("shotoniphone").agg(F.collect_set("154408627"))

In [34]:
hash_tags_2.show()

+------------+----------------------+
|shotoniphone|collect_set(154408627)|
+------------+----------------------+
|       0by30|  [228852304, 27080...|
| 100silences|  [825277375235633154]|
| 1015PinCode|  [1116725511353294...|
|       10ENE|  [150330846, 19434...|
|    10points|          [1555824914]|
|  12Russians|  [1059589752419168...|
|  12Senators|  [42373491, 198080...|
|     13Febre|  [78509222, 446281...|
|    15thJTSR|  [1243575894, 5130...|
|     1611KJB|  [704032426788425728]|
|         17f|           [404845688]|
|    187oNADA|  [9787211844718141...|
|       1970s|  [107438367, 84199...|
| 1986AMNESTY|            [15730096]|
|      19DAYS|  [9392143817321512...|
|      1Ahole|  [831606285086900225]|
|  1AntifaDad|  [101896473, 24509...|
|     1DDrive|  [8091302832913121...|
|1DollarNancy|  [746565631391080450]|
|          1L|  [1049838435928813...|
+------------+----------------------+
only showing top 20 rows



In [None]:
hash_tags_2.dtypes

In [13]:
from pyspark.ml.fpm import FPGrowth
# Items are user ids; minSupport is the minimum fraction of baskets (hashtags)
# in which a set of users must appear together.
fpGrowth = FPGrowth(itemsCol="collect_set(154408627)", minSupport=0.0005)
model = fpGrowth.fit(hash_tags_2)

In [14]:
# Display frequent itemsets.
model.freqItemsets.show()
# This call failed with an error I could not resolve: an executor was lost
# (container exit code 50), as shown in the trace below.

Py4JJavaError: An error occurred while calling o350.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 41.0 failed 4 times, most recent failure: Lost task 2.3 in stage 41.0 (TID 3345, ip-172-31-12-159.ec2.internal, executor 34): ExecutorLostFailure (executor 34 exited caused by one of the running tasks) Reason: Container from a bad node: container_1572567793815_0001_01_000039 on host: ip-172-31-12-159.ec2.internal. Exit status: 50. Diagnostics: Exception from container-launch.
Container id: container_1572567793815_0001_01_000039
Exit code: 50
Stack trace: ExitCodeException exitCode=50: 
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
	at org.apache.hadoop.util.Shell.run(Shell.java:869)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
	at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:235)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


Container exited with a non-zero exit code 50
.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:401)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


List two user communities you think are interesting. Explain why they are reasonable communities.

You can use https://twitter.com/intent/user?user_id=? (replace the trailing `?` with a user id) to find out more about the users.

In [None]:
# community 1

In [None]:
# community 2

What value for support did you choose and why?

In [None]:
# Answer here

## (10 points) Part 6: Personalized PageRank
Assume you are asked to recommend Twitter users for the Speaker of the House to engage with.

Construct a user-mentions network using the relations in `s3://us-congress-tweets/user_mentions.csv`.

Run Personalized PageRank with the source vertex (id=15764644) and find the top accounts to recommend.
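
To make the idea concrete, here is a minimal pure-Python sketch of Personalized PageRank by power iteration. It is only an illustration under stated assumptions: the function name `personalized_pagerank`, the toy edge list, and the ids `"s"`, `"a"`, `"b"`, `"c"` are hypothetical and are not part of the dataset or of the GraphFrames API. Unlike ordinary PageRank, the teleport mass (and, in this sketch, the mass held by vertices with no outgoing edges) returns only to the source vertex.

```python
from collections import defaultdict


def personalized_pagerank(edges, source, damping=0.85, max_iter=100, tol=1e-10):
    """Power iteration for Personalized PageRank on a directed edge list.

    All teleport mass (1 - damping), plus the mass held by dangling
    vertices, returns to `source` (an assumption of this sketch).
    """
    out_links = defaultdict(list)
    nodes = {source}
    for src, dst in edges:
        out_links[src].append(dst)
        nodes.update((src, dst))

    # Start with all probability mass on the source vertex.
    rank = {n: 0.0 for n in nodes}
    rank[source] = 1.0

    for _ in range(max_iter):
        new_rank = {n: 0.0 for n in nodes}
        dangling = 0.0
        for n in nodes:
            targets = out_links.get(n)
            if targets:
                # Split the damped rank evenly over the outgoing edges.
                share = damping * rank[n] / len(targets)
                for t in targets:
                    new_rank[t] += share
            else:
                dangling += damping * rank[n]
        # Teleport and dangling mass go back to the source only.
        new_rank[source] += (1.0 - damping) + dangling
        delta = sum(abs(new_rank[n] - rank[n]) for n in nodes)
        rank = new_rank
        if delta < tol:
            break
    return rank


# Toy mention graph with hypothetical ids; "s" plays the role of the source account.
toy_edges = [("s", "a"), ("s", "b"), ("a", "b"), ("b", "s"), ("c", "a")]
scores = personalized_pagerank(toy_edges, source="s")

# Candidate recommendations: every account except the source, best first.
recommendations = sorted((n for n in scores if n != "s"), key=scores.get, reverse=True)
```

In this toy graph, vertex `"c"` cannot be reached from the source, so it receives no rank. That is the behavior that makes the ranking personal to the source rather than global.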

In [1]:
# your network construction code here

from graphframes import *
from graphframes.lib import Pregel
sc.setCheckpointDir("hdfs:///tmp/graphframes_checkpoint") # this is needed for any GraphFrames operation
reply_network = spark.read.csv("s3://us-congress-tweets/user_mentions.csv", header=True)


In [2]:
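# Keep only the mentions made by the source account (id=15764644).
# Note: this reassigns reply_network, so the cells below see only these edges.
reply_network = reply_network.filter(reply_network.src == 15764644)
reply_network.show()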

+--------+-------------------+-----+
|     src|                dst|count|
+--------+-------------------+-----+
|15764644|          199325935|    5|
|15764644|         1092979962|    4|
|15764644|          381152398|    6|
|15764644|1079854463211397120|    2|
|15764644|         2916086925|    2|
|15764644|          294084341|    1|
|15764644| 716458790229581824|    1|
|15764644|         1051127714|    2|
|15764644| 855482971868069890|    3|
|15764644|         2966570782|    2|
|15764644|1076161611033968640|    2|
|15764644|          122453931|    1|
|15764644|           14857525|    1|
|15764644|          233783568|    3|
|15764644|1078355119920562176|    6|
|15764644| 827258161841135623|    1|
|15764644|         2244340904|    1|
|15764644|         1339931490|    1|
|15764644|           15751055|    1|
|15764644|          584912320|    2|
+--------+-------------------+-----+
only showing top 20 rows



In [3]:
from graphframes import *
from graphframes.lib import Pregel
sc.setCheckpointDir("hdfs:///tmp/graphframes_checkpoint") # this is needed for any GraphFrames operation
from pyspark.sql.types import *
import pyspark.sql.functions as F
# Vertices are every id that appears as a source or a destination.
# union() returns a new DataFrame, so its result has to be assigned.
vertices = reply_network.selectExpr('src as id') \
                        .union(reply_network.selectExpr('dst as id')) \
                        .distinct()
edges = reply_network.select('src','dst')
graph = GraphFrame(vertices, edges)
# Rebuild the graph with an outDegree column on the vertices.
# graph.outDegrees only lists vertices that have at least one outgoing edge.
vertices = graph.outDegrees
graph = GraphFrame(vertices, edges)

In [4]:
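# your Personalized PageRank code here
import pyspark.sql.functions as F # needed for defining literals below and the sum(...) function
# Personalized PageRank: the reset mass (0.15) returns only to the source vertex
# instead of being added to every vertex as in ordinary PageRank.
source_id = "15764644"  # ids are strings because the CSV was read without a schema
is_source = F.col("id") == source_id
ranks = graph.pregel \
             .setMaxIter(5) \
             .withVertexColumn("rank", 
                               F.when(is_source, F.lit(1.0)).otherwise(F.lit(0.0)),
                               F.coalesce(Pregel.msg(), F.lit(0.0)) * F.lit(0.85)
                               + F.when(is_source, F.lit(0.15)).otherwise(F.lit(0.0))
                              ) \
             .sendMsgToDst(Pregel.src("rank")/Pregel.src("outDegree"))\
             .aggMsgs(F.sum(Pregel.msg())).run()  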

In [7]:
# Top 10 accounts to recommend 
# orderBy returns a new DataFrame, so show() is chained onto the sorted result.
ranks.orderBy("rank", ascending=False).show(10)

# You can use https://twitter.com/intent/user?user_id=? to find out more info about the users

+--------+---------+-------------------+
|      id|outDegree|               rank|
+--------+---------+-------------------+
|15764644|      342|0.15037373589338263|
+--------+---------+-------------------+



# Troubleshooting Tips

* If you get a "spark not available" error, the kernel is most likely Python rather than PySpark. Change the kernel to PySpark and it should work.


* If your notebook seems stuck (this may happen if you force-stop a cell), you may need to ssh to your master node and kill the Spark application associated with the notebook.
    Use `yarn application -list` to find the application id and then `yarn application -kill <application_id>` to kill it. After that, restart your notebook from the browser.


* If you like, you may also ssh to the master node and run `pyspark` and execute your code directly in the shell.

* If you have difficulty accessing the job pages (for example, to see logs), you can open all the needed ports (e.g. 8088) when you create the cluster.

* If you want to see the logs for a MapReduce job from the terminal, use the following command:

    `yarn logs -applicationId <application_id>`


* To kill a MapReduce job, use:

    `yarn application -kill <application_id>`