# Project 03 - Due Monday, November 13 at 12pm

*Objectives*: Use Spark to process and perform basic analysis on non-relational data, including its DataFrame and SQL interfaces.

*Grading criteria*: The tasks should all be completed, and questions should all be answered with Python code, SQL queries, shell commands, and markdown cells.  The notebook itself should be completely reproducible (using an AWS EC2 instance based on the provided AMI) from start to finish; another person should be able to use the code to obtain the same results as yours.  Note that you will receive no more than partial credit if you do not add text/markdown cells explaining your thinking when appropriate.

*Attestation*: **Work in groups**.  At the end of your submitted notebook, identify the work each partner performed and attest that each contributed substantially to the work.

*Deadline*: Monday, November 13, 12pm.  One member of each group must submit the group's notebook to Blackboard; the other members should not submit it separately.

## Part 1 - Setup

Begin by setting up Spark and fetching the project data.  

**Note**: you may want to use a larger EC2 instance type than normal.  This project was prepared using a `t2.xlarge` instance.  Just remember that the larger the instance, the higher the per-hour charge, so be sure to shut your instance down when you're done, as always.

### About the data

We will use JSON data from Twitter; we saw an example of this in class.  It should parse cleanly, allowing you to focus on analysis.

This data was gathered using GWU Libraries' [Social Feed Manager](http://sfm.library.gwu.edu/) application during a recent game of the MLB World Series featuring the Los Angeles Dodgers and Houston Astros.  This first file tells you a little bit about how it was gathered:

In [1]:
!wget https://s3.amazonaws.com/2017-dmfa/project-3/9670f3399f774789b7c3e18975d25611-README.txt

--2017-11-12 23:13:27--  https://s3.amazonaws.com/2017-dmfa/project-3/9670f3399f774789b7c3e18975d25611-README.txt
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.1.227
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.1.227|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1920 (1.9K) [text/plain]
Saving to: ‘9670f3399f774789b7c3e18975d25611-README.txt’


2017-11-12 23:13:27 (160 MB/s) - ‘9670f3399f774789b7c3e18975d25611-README.txt’ saved [1920/1920]



In [2]:
!cat 9670f3399f774789b7c3e18975d25611-README.txt

This is an export created with Social Feed Manager.

EXPORT INFORMATION
Selected seeds: All seeds
Export id: 9670f3399f774789b7c3e18975d25611
Export type: twitter_filter
Format: Full JSON
Export completed:  Oct. 30, 2017, 11:21:04 p.m. EDT
Deduplicate: No

COLLECTION INFORMATION
Collection name: test set for world series
Collection id: 34e3f7460b5c4df09d64a1e61fd81238
Collection set: mlb-test (collection set id d6e8c27b1bc942e78790aa55a82b3a7a)
Harvest type: Twitter filter
Collection description: running for just one hour, just for fun.

Harvest options:
Media: No
Web resources: No

Seeds:
* Track: dodgers,astros - Active

Change log:

Change to test set for world series (collection) on Oct. 30, 2017, 10:59:56 p.m. EDT by dchud:
* is_active: "True" changed to "False"

Change to test set for world series (collection) on Oct. 30, 2017, 10:58:51 p.m. EDT by dchud:
* is_on: "True" changed to "False"

Change to test set for world series (collection) on Oct. 2

The most important pieces in that metadata are:

 * It tracked tweets that mentioned "dodgers" or "astros".  Every item in this set should refer to one or the other, or both.
 * This data was not deduplicated; we may see individual items more than once.
 * Data was collected between October 29 and October 30.  Game 5 of the Series was played during this time.
 
You should not need to know anything about baseball to complete this assignment.

**Please note**: sometimes social media data contains offensive material.  This data set has not been filtered; if you do come across something inappropriate, please do your best to ignore it if you can.

## Fetch the data

The following files are available:

 * https://s3.amazonaws.com/2017-dmfa/project-3/9670f3399f774789b7c3e18975d25611_003.json
 * https://s3.amazonaws.com/2017-dmfa/project-3/9670f3399f774789b7c3e18975d25611_004.json
 * https://s3.amazonaws.com/2017-dmfa/project-3/9670f3399f774789b7c3e18975d25611_005.json
 * https://s3.amazonaws.com/2017-dmfa/project-3/9670f3399f774789b7c3e18975d25611_006.json
 
### Q1.1 - Select at least one and obtain it using `wget`.  Verify the file sizes using the command line.

Each file should contain exactly 100,000 tweets.  

*Note*: you are only required to use one of these files, but you may use more than one.  It will be easier to process more data if you use a larger EC2 instance type, as suggested above.  Use the exact same set of files throughout the assignment.
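A minimal sketch of one way to check a download from Python, assuming the export stores one JSON object per line (the layout that `shuf -n 1` and Spark's JSON reader both expect). The helper name `summarize_export` and the tiny stand-in file are hypothetical; point the helper at the file you actually fetched.

```python
import json
import os
import tempfile


def summarize_export(path):
    """Return (size_in_bytes, tweet_count) for a line-delimited JSON file."""
    tweet_count = 0
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            if line.strip():      # skip blank lines
                json.loads(line)  # raises ValueError if a line is not valid JSON
                tweet_count += 1
    return os.path.getsize(path), tweet_count


# Stand-in data so the sketch runs without the large download.
sample_path = os.path.join(tempfile.mkdtemp(), "sample.json")
with open(sample_path, "w", encoding="utf-8") as handle:
    for tweet_id in range(3):
        handle.write(json.dumps({"id": tweet_id, "text": "go astros"}) + "\n")

size_bytes, tweet_count = summarize_export(sample_path)
print(size_bytes, tweet_count)
```

On the real data, the second value should match the 100,000 tweets per file stated above.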

**Answer**

In [47]:
!wget https://s3.amazonaws.com/2017-dmfa/project-3/9670f3399f774789b7c3e18975d25611_003.json

--2017-11-13 00:59:32--  https://s3.amazonaws.com/2017-dmfa/project-3/9670f3399f774789b7c3e18975d25611_003.json
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.80.235
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.80.235|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 595711407 (568M) [application/json]
Saving to: ‘9670f3399f774789b7c3e18975d25611_003.json’


2017-11-13 00:59:38 (91.6 MB/s) - ‘9670f3399f774789b7c3e18975d25611_003.json’ saved [595711407/595711407]



For your reference, here is the text of one Tweet, randomly selected from one of these files.  You might wish to study its structure and refer to it later.

In [38]:
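# Write the sample to its own file (example-tweet.json is an arbitrary name)
# so that the downloaded data file is not overwritten.
!cat 9670f3399f774789b7c3e18975d25611_*.json | shuf -n 1 > example-tweet.json

In [40]:
import json
with open("example-tweet.json") as f:
    print(json.dumps(json.load(f), indent=2))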

{
  "source": "<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>",
  "favorite_count": 0,
  "is_quote_status": false,
  "truncated": false,
  "id": 924874009405145088,
  "favorited": false,
  "retweeted": false,
  "coordinates": null,
  "place": null,
  "entities": {
    "hashtags": [],
    "user_mentions": [],
    "symbols": [],
    "urls": []
  },
  "id_str": "924874009405145088",
  "created_at": "Mon Oct 30 05:42:01 +0000 2017",
  "retweet_count": 0,
  "geo": null,
  "in_reply_to_screen_name": null,
  "quote_count": 0,
  "timestamp_ms": "1509342121827",
  "lang": "en",
  "in_reply_to_status_id": null,
  "text": "So it's going to be hard AS FUCK to win for the Dodgers and close any of the remaining games even with the best of your bullpen...",
  "in_reply_to_user_id": null,
  "user": {
    "profile_sidebar_border_color": "000000",
    "description": "Professional Bettor. #Money = Motivation. #FreePicks All Sports KING Access \ud83c\udfc8\u26bd\ufe

You can find several key elements in this example: the text, time, and language of the tweet, whether it was a reply to another user, the user's screen name along with their primary language and other account information like creation date, follower/friend/tweet counts, and perhaps their location.  If there are hashtags, user mentions, or URLs present in their tweet, they will be present in the `entities` section; these are not present in every tweet.  If this is a retweet, you will see the original tweet and its information nested within.
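As a rough illustration of that structure, the sketch below pulls those elements out of one parsed tweet. The helper `summarize_tweet` and the hand-made `example` dict are hypothetical, and the sketch assumes that retweets nest the original tweet under a `retweeted_status` key, which does not appear in the truncated example above.

```python
def summarize_tweet(tweet):
    """Pull the key elements out of one parsed tweet dict."""
    user = tweet.get("user") or {}
    entities = tweet.get("entities") or {}
    return {
        "text": tweet.get("text"),
        "created_at": tweet.get("created_at"),
        "lang": tweet.get("lang"),
        "reply_to": tweet.get("in_reply_to_screen_name"),
        "screen_name": user.get("screen_name"),
        "followers": user.get("followers_count"),
        "hashtags": [tag["text"] for tag in entities.get("hashtags") or []],
        # Assumption: a retweet carries the original under "retweeted_status".
        "is_retweet": "retweeted_status" in tweet,
    }


# Hand-made stand-in with the same shape as a real tweet.
example = {
    "text": "What a game #WorldSeries",
    "created_at": "Mon Oct 30 05:42:01 +0000 2017",
    "lang": "en",
    "in_reply_to_screen_name": None,
    "entities": {"hashtags": [{"indices": [12, 24], "text": "WorldSeries"}]},
    "user": {"screen_name": "example_user", "followers_count": 42},
}

summary = summarize_tweet(example)
print(summary)
```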

### Q1.2 - Start up Spark, and verify the file sizes.

We will use our normal startup sequence here:

In [5]:
import os

In [6]:
os.environ['SPARK_HOME'] = '/usr/local/lib/spark'

In [7]:
import findspark

In [8]:
findspark.init()

In [9]:
from pyspark import SparkContext

In [10]:
spark = SparkContext(appName='project-03')

In [11]:
spark

In [12]:
from pyspark import SQLContext

In [13]:
sqlc = SQLContext(spark)

In [14]:
sqlc

<pyspark.sql.context.SQLContext at 0x7fa130db06a0>

In [48]:
tweets = sqlc.read.json("9670f3399f774789b7c3e18975d25611_003.json")

In [31]:
tweets.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |

Verify that Spark has loaded the same number of tweets you saw before:

**Answer**

In [46]:
!wc -l 9670f3399f774789b7c3e18975d25611_003.json

1 9670f3399f774789b7c3e18975d25611_003.json


In [14]:
tweets.count()

100000

## Part 2 - Comparing DataFrames and Spark SQL

For the next three questions, we will look at operations using both DataFrames and SQL queries. Note that `tweets` is already a DataFrame:

In [17]:
tweets

DataFrame[contributors: string, coordinates: struct<coordinates:array<double>,type:string>, created_at: string, display_text_range: array<bigint>, entities: struct<hashtags:array<struct<indices:array<bigint>,text:string>>,media:array<struct<display_url:string,expanded_url:string,id:bigint,id_str:string,indices:array<bigint>,media_url:string,media_url_https:string,sizes:struct<large:struct<h:bigint,resize:string,w:bigint>,medium:struct<h:bigint,resize:string,w:bigint>,small:struct<h:bigint,resize:string,w:bigint>,thumb:struct<h:bigint,resize:string,w:bigint>>,source_status_id:bigint,source_status_id_str:string,source_user_id:bigint,source_user_id_str:string,type:string,url:string>>,symbols:array<struct<indices:array<bigint>,text:string>>,urls:array<struct<display_url:string,expanded_url:string,indices:array<bigint>,url:string>>,user_mentions:array<struct<id:bigint,id_str:string,indices:array<bigint>,name:string,screen_name:string>>>, extended_entities: struct<media:array<struct<display_

To issue SQL queries, we need to register a table based on `tweets`:

In [49]:
tweets.createOrReplaceTempView("tweets")

### Q2.1 - Which 10 languages are most commonly used in tweets?  Verify your result by executing it with both the dataframe and with SQL.

Hint: for the dataframe, use `groupBy`, `count`, and `orderBy`.  See the documentation at https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html for details on these and other functions.

**Answer**

In [15]:
tweets.groupBy("lang").count().orderBy("count", ascending=False).show(10)

+----+-----+
|lang|count|
+----+-----+
|  en|88987|
|  es| 6825|
| und| 3058|
|  in|  210|
|  fr|  181|
|  pt|  133|
|  nl|   89|
|  ht|   83|
|  ja|   81|
|  tl|   77|
+----+-----+
only showing top 10 rows



In [19]:
sqlc.sql("SELECT lang, COUNT(*) AS count FROM tweets GROUP BY lang ORDER BY count DESC").show(10)

+----+-----+
|lang|count|
+----+-----+
|  en|88987|
|  es| 6825|
| und| 3058|
|  in|  210|
|  fr|  181|
|  pt|  133|
|  nl|   89|
|  ht|   83|
|  ja|   81|
|  tl|   77|
+----+-----+
only showing top 10 rows



### Q2.2 - Which 10 time zones are most common among users?  Verify your result with both the dataframe and SQL.

*Note*: for this question, you may leave NULL values present in your results, as a way to help you understand what data is present and what is missing.

**Answer**

In [17]:
tweets.groupBy("user.time_zone").count().orderBy("count", ascending=False).show(10)

+--------------------+-----+
|           time_zone|count|
+--------------------+-----+
|                null|42239|
|Central Time (US ...|17436|
|Pacific Time (US ...|17097|
|Eastern Time (US ...| 8664|
|             Arizona| 2490|
|Mountain Time (US...| 2475|
|Atlantic Time (Ca...| 1048|
|             Caracas| 1006|
|              Hawaii|  821|
|         Mexico City|  793|
+--------------------+-----+
only showing top 10 rows



In [20]:
sqlc.sql("SELECT user.time_zone, COUNT(*) AS count FROM tweets GROUP BY user.time_zone ORDER BY count DESC").show(10)

+--------------------+-----+
|           time_zone|count|
+--------------------+-----+
|                null|42239|
|Central Time (US ...|17436|
|Pacific Time (US ...|17097|
|Eastern Time (US ...| 8664|
|             Arizona| 2490|
|Mountain Time (US...| 2475|
|Atlantic Time (Ca...| 1048|
|             Caracas| 1006|
|              Hawaii|  821|
|         Mexico City|  793|
+--------------------+-----+
only showing top 10 rows



### Q2.3 - How many tweets mention the Dodgers?  How many mention the Astros?  How many mention both?

You may use either the dataframe or SQL to answer.  Explain why you have chosen that approach.

Hint:  you will want to look at the value of the `text` field.

**Answer**

In [22]:
sqlc.sql("SELECT text FROM tweets WHERE text LIKE '%Astros%' ").count()

34162

In [23]:
sqlc.sql("SELECT text FROM tweets WHERE text LIKE '%Dodgers%' ").count()

27822

In [24]:
sqlc.sql("SELECT text FROM tweets WHERE (text LIKE '%Dodgers%' AND text LIKE '%Astros%') ").count()

10421
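Spark SQL's `LIKE` is case-sensitive, so the three counts above cover only the capitalized spellings "Astros" and "Dodgers". By inclusion-exclusion they imply that 34162 + 27822 - 10421 = 51563 tweets match at least one capitalized name. The sketch below, in plain Python on a made-up sample, shows a case-insensitive count and that arithmetic; `count_mentions` and `sample_texts` are hypothetical names. In SQL, a comparable filter would presumably be `lower(text) LIKE '%astros%'`.

```python
def count_mentions(texts):
    """Count texts that mention each team, ignoring case."""
    counts = {"astros": 0, "dodgers": 0, "both": 0}
    for text in texts:
        lowered = (text or "").lower()  # treat a missing text as empty
        has_astros = "astros" in lowered
        has_dodgers = "dodgers" in lowered
        counts["astros"] += int(has_astros)
        counts["dodgers"] += int(has_dodgers)
        counts["both"] += int(has_astros and has_dodgers)
    return counts


# Made-up sample texts, not taken from the data set.
sample_texts = ["Go Astros!", "DODGERS in six", "astros vs dodgers tonight", None]
mention_counts = count_mentions(sample_texts)
print(mention_counts)

# Inclusion-exclusion on the case-sensitive counts reported above.
astros_hits, dodgers_hits, both_hits = 34162, 27822, 10421
either_hits = astros_hits + dodgers_hits - both_hits
print(either_hits)  # 51563
```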

## Part 3 - More complex queries

For this section, you may choose to use dataframe queries or SQL.  If you wish, you may verify results by using both, as in Part 2, but this is not required for this section.

### Q3.1 - Team mentions by location

In which users' locations are the Astros and the Dodgers being mentioned the most?  Consider each team separately, one at a time.  Discuss your findings.

Hint:  you may use either the time zones or user-specified locations for this question.

**Answer**

In [28]:
sqlc.sql("SELECT user.time_zone, count(*) As count_tz FROM tweets WHERE text LIKE '%Astros%' GROUP BY user.time_zone ORDER BY count_tz DESC").show(10)

+--------------------+--------+
|           time_zone|count_tz|
+--------------------+--------+
|                null|   13475|
|Central Time (US ...|    6286|
|Pacific Time (US ...|    5382|
|Eastern Time (US ...|    3489|
|Mountain Time (US...|     927|
|             Arizona|     708|
|Atlantic Time (Ca...|     422|
|             Caracas|     394|
|               Quito|     353|
|         Mexico City|     342|
+--------------------+--------+
only showing top 10 rows



In [25]:
sqlc.sql("SELECT user.time_zone, count(*) As count_tz FROM tweets WHERE text LIKE '% Dodgers%' GROUP BY user.time_zone ORDER BY count_tz DESC").show(10)

+--------------------+--------+
|           time_zone|count_tz|
+--------------------+--------+
|                null|    6246|
|Pacific Time (US ...|    3394|
|Central Time (US ...|    1611|
|Eastern Time (US ...|    1426|
|             Arizona|     517|
|Mountain Time (US...|     309|
|         Mexico City|     214|
|Atlantic Time (Ca...|     177|
|             Caracas|     171|
|              Alaska|     145|
+--------------------+--------+
only showing top 10 rows



### Q3.2 - Which Twitter users are being replied to the most?

Discuss your findings.

Hint: use the top-level `in_reply_to_screen_name` for this.

**Answer**

In [54]:
sqlc.sql("SELECT in_reply_to_screen_name, COUNT(*) AS count_replying FROM tweets GROUP BY in_reply_to_screen_name ORDER by count_replying DESC").show(10)

+-----------------------+--------------+
|in_reply_to_screen_name|count_replying|
+-----------------------+--------------+
|                   null|         91099|
|                 astros|           821|
|                Dodgers|           624|
|                    MLB|           196|
|          stephenasmith|           106|
|               MLBONFOX|            68|
|          DodgerInsider|            61|
|          Nick_Offerman|            54|
|                ABREG_1|            49|
|                trvisXX|            46|
+-----------------------+--------------+
only showing top 10 rows
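Most tweets are not replies, so `null` dominates the ranking above. A minimal sketch of the same count with missing reply targets dropped, in plain Python on made-up rows; `top_reply_targets` is a hypothetical helper. In SQL, adding `WHERE in_reply_to_screen_name IS NOT NULL` should have the same effect.

```python
from collections import Counter


def top_reply_targets(tweets, n=10):
    """Rank reply targets by frequency, skipping tweets that are not replies."""
    targets = (tweet.get("in_reply_to_screen_name") for tweet in tweets)
    return Counter(name for name in targets if name is not None).most_common(n)


# Made-up rows with the same field name as the real data.
sample_replies = [
    {"in_reply_to_screen_name": "astros"},
    {"in_reply_to_screen_name": None},
    {"in_reply_to_screen_name": "astros"},
    {"in_reply_to_screen_name": "Dodgers"},
    {},
]
top_targets = top_reply_targets(sample_replies, n=2)
print(top_targets)
```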



### Q3.3 - Which 10 verified users have the most followers?  Which 10 unverified users have the most followers?

Provide both the screen names and follower counts for each.

Discuss your findings.

**Answer**

In [51]:
sqlc.sql("SELECT user.name, MAX(user.followers_count) AS count_verified FROM tweets WHERE user.verified = 'true' GROUP BY user.name ORDER by count_verified DESC").show(10)

+--------------------+--------------+
|                name|count_verified|
+--------------------+--------------+
|    Reuters Top News|      18937529|
|            Fox News|      16272836|
|            ABC News|      12551437|
|     Washington Post|      11417638|
|                 MLB|       7841255|
|                 NPR|       7289492|
|        Bill Simmons|       6000106|
|            NBC News|       5442705|
|         John Legere|       4630104|
|ABS-CBN News Channel|       4453229|
+--------------------+--------------+
only showing top 10 rows



In [52]:
sqlc.sql("SELECT user.name, MAX(user.followers_count) AS count_verified FROM tweets WHERE user.verified = 'false' GROUP BY user.name ORDER by count_verified DESC").show(10)

+--------------------+--------------+
|                name|count_verified|
+--------------------+--------------+
|    TENIENTE CHOCHOS|        833669|
|Diario El Carabobeño|        725952|
|          ❤Ƥ▲ϻ(❛‿❛)❤|        712254|
|  🗽Jeffrey Levin 🗽|        568341|
|       It's Bernard®|        559669|
|          EP | Mundo|        538525|
|              LALATE|        516139|
|    Captain Gigawatt|        503015|
|     BLACK GOKU 😈🔥|        496825|
|      EP | Venezuela|        493446|
+--------------------+--------------+
only showing top 10 rows
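The queries above group by the display name (`user.name`), while the question asks for screen names; display names are not unique on Twitter, whereas screen names are. A rough sketch of the same top-N logic keyed by `screen_name`, in plain Python on made-up rows (`top_by_followers` is a hypothetical helper). Because the data is not deduplicated and one account can tweet many times, it keeps the largest follower count seen per account.

```python
def top_by_followers(tweets, verified, n=10):
    """Return the n accounts with the most followers for one verified status."""
    best = {}
    for tweet in tweets:
        user = tweet.get("user") or {}
        name = user.get("screen_name")
        if name is None or bool(user.get("verified")) != verified:
            continue
        followers = user.get("followers_count") or 0
        best[name] = max(best.get(name, 0), followers)
    return sorted(best.items(), key=lambda item: item[1], reverse=True)[:n]


# Made-up accounts; "news_a" appears twice with different counts.
sample_users = [
    {"user": {"screen_name": "news_a", "verified": True, "followers_count": 900}},
    {"user": {"screen_name": "news_a", "verified": True, "followers_count": 950}},
    {"user": {"screen_name": "team_b", "verified": True, "followers_count": 500}},
    {"user": {"screen_name": "fan_c", "verified": False, "followers_count": 70}},
]
top_verified = top_by_followers(sample_users, verified=True)
top_unverified = top_by_followers(sample_users, verified=False)
print(top_verified, top_unverified)
```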



### Q3.4 - What are the most popular sets of hashtags among users with many followers?  Are they the same as among users with few followers?

Decide for yourself exactly how many followers you believe to be "many", and explain your decision.  You may use queries and statistics to support this decision if you wish.

Hint: if your sample tweet above does not include hashtags under the `entities` field, generate a new example by running the `shuf` command again until you find one that does.

Hint 2: the hashtag texts will be in an array, so you may need some functions you haven't used before.  If you're using SQL, see the docs for [Hive SQL](https://docs.treasuredata.com/articles/hive-functions) for details (and consider `CONCAT_WS`, for example).

Discuss your findings.

**Answer**

In [59]:
sqlc.sql("SELECT entities.hashtags, count(*) as count_entity FROM tweets GROUP BY entities.hashtags ORDER BY count_entity DESC").show(10)

+--------------------+------------+
|            hashtags|count_entity|
+--------------------+------------+
|                  []|       54788|
|[[WrappedArray(55...|        4588|
|[[WrappedArray(34...|        3510|
|[[WrappedArray(29...|        1753|
|[[WrappedArray(12...|        1709|
|[[WrappedArray(54...|        1233|
|[[WrappedArray(98...|         961|
|[[WrappedArray(71...|         909|
|[[WrappedArray(62...|         841|
|[[WrappedArray(31...|         655|
+--------------------+------------+
only showing top 10 rows
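The grouping above keys on the whole `hashtags` array. According to the schema, each element holds `indices` as well as `text`, so the same tags at different character positions land in different groups, and the output shows `WrappedArray(...)` rather than readable tags. A minimal sketch of one alternative, in plain Python on made-up tweets: reduce each tweet to its sorted, lowercased tag texts, then count per follower tier. The names `hashtag_key` and `popular_hashtag_sets` are hypothetical, and the 10,000-follower threshold is only a placeholder, not a recommendation.

```python
from collections import Counter


def hashtag_key(tweet):
    """Join a tweet's hashtag texts into one order-independent key."""
    tags = (tweet.get("entities") or {}).get("hashtags") or []
    return ",".join(sorted({tag["text"].lower() for tag in tags}))


def popular_hashtag_sets(tweets, many_followers=10000, n=10):
    """Count hashtag sets separately for users with many and few followers."""
    tiers = {"many": Counter(), "few": Counter()}
    for tweet in tweets:
        key = hashtag_key(tweet)
        if not key:  # skip tweets without hashtags
            continue
        followers = (tweet.get("user") or {}).get("followers_count") or 0
        tier = "many" if followers >= many_followers else "few"
        tiers[tier][key] += 1
    return {tier: counter.most_common(n) for tier, counter in tiers.items()}


def make_tweet(followers, tags):
    """Build a made-up tweet with only the fields this sketch reads."""
    return {
        "user": {"followers_count": followers},
        "entities": {"hashtags": [{"indices": [0, 1], "text": t} for t in tags]},
    }


sample_tweets = [
    make_tweet(50000, ["WorldSeries", "EarnHistory"]),
    make_tweet(20000, ["earnhistory", "worldseries"]),
    make_tweet(10, ["ThisTeam"]),
    make_tweet(5, []),
]
hashtag_sets = popular_hashtag_sets(sample_tweets)
print(hashtag_sets)
```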



### Q3.5 - Analyze common words in tweet text

Following the example in class, use `tweets.rdd` to find the most common interesting words in tweet text.  To keep it "interesting", add a filter that removes at least 10 common stop words found in tweets, like "a", "an", "the", and "RT" (you might want to derive these stop words from initial results).  To split lines into words, a simple split on text whitespace like we had in class is sufficient; you do not have to account for punctuation.

After you find the most common words, use dataframe or SQL queries to find patterns among how those words are used.  For example, are they more frequently used by Dodgers or Astros fans, or by people in one part of the country over another?  Explore and see what you can find, and discuss your findings.

Hint: don't forget all the word count pipeline steps we used earlier in class.

**Answer**

In [61]:
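# Common stop words to exclude; a set gives fast membership tests.
stop_words = {'rt', 'the', 'in', 'a', 'to', 'of', 'is', 'i', 'and', 'this', 'for', 'it'}
# Split each tweet's text on spaces, drop empty tokens and stop words, then count.
tweets.rdd.flatMap(lambda r: r['text'].lower().split(' ')) \
    .filter(lambda t: t and t not in stop_words) \
    .map(lambda t: (t, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .takeOrdered(10, key=lambda pair: -pair[1])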

[('astros', 26765),
 ('dodgers', 17965),
 ('game', 15800),
 ('@astros:', 15408),
 ('#worldseries', 12688),
 ('win', 12252),
 ('#earnhistory', 11280),
 ('@astros', 10600),
 ('world', 8366),
 ('series', 7985)]
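Because the split is on spaces only, `@astros:` and `@astros` are counted as different words in the result above. The sketch below mirrors the same pipeline steps in plain Python on made-up texts, with an optional `strip_chars` argument that trims punctuation from both ends of each token; `top_words` and `sample_texts` are hypothetical names, and the stripping goes beyond what the assignment requires.

```python
from collections import Counter

STOP_WORDS = {"rt", "the", "in", "a", "to", "of", "is", "i", "and", "this", "for", "it"}


def top_words(texts, n=10, strip_chars=""):
    """Count lowercased, space-separated tokens, skipping stop words."""
    counts = Counter()
    for text in texts:
        for token in (text or "").lower().split(" "):  # like flatMap + split
            token = token.strip(strip_chars)           # "" strips nothing
            if token and token not in STOP_WORDS:      # like the filter step
                counts[token] += 1                     # like map + reduceByKey
    return counts.most_common(n)


# Made-up texts, not taken from the data set.
sample_texts = [
    "RT @astros: What a game",
    "The @astros win",
    "@astros win again",
    "go @astros",
]
as_in_notebook = top_words(sample_texts, n=1)
merged = top_words(sample_texts, n=1, strip_chars=":,.!?")
print(as_in_notebook, merged)
```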

In [62]:
sqlc.sql("SELECT user.location, COUNT(*) AS count FROM tweets WHERE text LIKE '%astros%' GROUP BY user.location ORDER BY count DESC").show(10)

+---------------+-----+
|       location|count|
+---------------+-----+
|           null| 8809|
|    Houston, TX| 3191|
|     Texas, USA|  691|
|          Texas|  398|
|        Houston|  340|
| Houston, Texas|  316|
|Los Angeles, CA|  216|
|     Austin, TX|  215|
|  United States|  191|
|San Antonio, TX|  163|
+---------------+-----+
only showing top 10 rows



In [63]:
sqlc.sql("SELECT user.location, COUNT(*) AS count FROM tweets WHERE text LIKE '%dodgers%' GROUP BY user.location ORDER BY count DESC").show(10)

+--------------------+-----+
|            location|count|
+--------------------+-----+
|                null| 1528|
|     Los Angeles, CA|  187|
|         Houston, TX|  109|
|     California, USA|   95|
|         Los Angeles|   33|
|       United States|   32|
|          California|   24|
|      Long Beach, CA|   21|
|Del otro lado del...|   18|
|       Las Vegas, NV|   17|
+--------------------+-----+
only showing top 10 rows



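Among users who list a location, tweets containing 'astros', the most frequent keyword, come mostly from Houston, TX, while tweets containing 'dodgers', the second most frequent keyword, come mostly from Los Angeles, CA.  In both cases, the largest group is users with no location set.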