In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (10,6)
plt.rcParams['font.size'] = 18
plt.style.use('fivethirtyeight')

# Uncovering World Events using Twitter Hashtags

## ... and learning about Spark `DataFrames` along the way

In this notebook, we will use temporal information about Twitter hashtags to discover trending topics and potentially uncover world events as they occurred. You've already used this dataset in a previous homework, but here we will be working with it in Apache Spark.

### Hashtags

The idea here is that when an event is happening and people are discussing it on Twitter, a set of common hashtags representing the event emerges spontaneously. Twitter users then use those hashtags to communicate with one another. Some hashtags, like `#RT` for "retweet" or just `#retweet`, are used frequently and don't tell us much about what is going on. But the sudden appearance of a hashtag like `#oscars` probably indicates that the Oscars are underway. For a particularly cool example of this type of analysis, check out [this blog post about earthquake detection using Twitter data](https://blog.twitter.com/official/en_us/a/2015/usgs-twitter-data-earthquake-detection.html) (although they search the tweet text and not necessarily hashtags).

## Initialize the `SparkSession`

In [2]:
import getpass
import pyspark
from pyspark.sql import SparkSession

conf = pyspark.conf.SparkConf()
conf.setMaster('yarn')
conf.setAppName('twitter-{0}'.format(getpass.getuser()))
conf.set('spark.executor.memory', '4g')
conf.set('spark.executor.instances', '6')
conf.set('spark.port.maxRetries', '100')
sc = pyspark.SparkContext.getOrCreate(conf)
conf = sc.getConf()
sc

In [3]:
spark = SparkSession(sc)

## Set up the hashtag `DataFrame` (10 points / 60)

We have prepared hashtag data spanning April to July 2016. This was a significant period in modern European history; see, for example, [Brexit](https://en.wikipedia.org/wiki/Brexit). Let's see if we can find any interesting trends related to these events in the Twitter data.

In [4]:
from datetime import datetime

import pyspark.sql.functions as functions

### TODO (1 point/10)

Load the **parquet** data from `/datasets/twitter_hashtags` into a Spark dataframe using the appropriate `SparkSession` method. 

Look at the first few rows of the dataset - note the timestamp and its units!

In [5]:
df = spark.read.parquet('/datasets/twitter_hashtags')

In [6]:
df.show()

+-------------+----+-------------+
| timestamp_ms|lang|      hashtag|
+-------------+----+-------------+
|1459492860657| und|   2896Nation|
|1459492860657| und|     Republik|
|1459492860658|  en|          win|
|1459492860658|  en|  competition|
|1459492860658|  en|     giveaway|
|1459492860664|  tl| ALDUBYayaWho|
|1459492860663|  th|        ไคตัล|
|1459492860660|  es|      muybien|
|1459492860660|  th|    FLY2NDWIN|
|1459492860665|  en|      5SOSFam|
|1459492860665|  en|  BestFanArmy|
|1459492860665|  en| iHeartAwards|
|1459492861661|  ja|      hempire|
|1459492861657|  th|        ไคตัล|
|1459492861657|  es|    Venezuela|
|1459492861657|  es|   cristianos|
|1459492861660| und|         RUSH|
|1459492861660| und|        TFBJP|
|1459492861660| und|       TFBANG|
|1459492861661|  en|BirdieSanders|
+-------------+----+-------------+
only showing top 20 rows
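As a quick sanity check on the units, the sketch below uses only the Python standard library (not Spark) to interpret one `timestamp_ms` value from the table above as milliseconds since the Unix epoch. The variable names are illustrative.

```python
from datetime import datetime, timezone

# A sample value taken from the timestamp_ms column above
timestamp_ms = 1459492860657

# Read as seconds, this value would land tens of thousands of years in the
# future, so the unit must be milliseconds: divide by 1000 to get seconds.
timestamp_s = timestamp_ms / 1000

as_utc = datetime.fromtimestamp(timestamp_s, tz=timezone.utc)
print(as_utc.strftime('%Y-%m-%d %H:%M:%S'))
```

This prints a time of 06:41:00 on 2016-04-01 in UTC. The `08:41:00` that appears in this notebook's expected output is consistent with a Spark session time zone two hours ahead of UTC, which is an assumption about the cluster configuration rather than something stated in the notebook.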



### User-defined functions

A neat feature of Spark DataFrames is that you can apply something very much like an RDD `map` without switching to the RDD API. If you are familiar with databases, this works much like a user-defined function in SQL.

So, for example, if we wanted to make a user-defined python function that returns the hashtags in lowercase, we could do something like this:

In [7]:
@functions.udf
def lowercase(text):
    """Convert text to lowercase"""
    return text.lower()

The `@functions.udf` line is a "decorator". This is handy Python syntactic sugar and in this case is equivalent to:

```
def lowercase(text):
    return text.lower()
    
lowercase = functions.udf(lowercase)
```

A decorator takes our function and wraps it with extra functionality. In this case, it turns our function into a PySpark DataFrame user-defined function (UDF). Since we did not specify a return type, the UDF returns a string column by default.
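To see the decorator mechanism in isolation, here is a minimal plain-Python sketch that does not involve Spark. The `none_safe` decorator is hypothetical (it is not part of PySpark); it is chosen because a function like `text.lower()` would fail on a missing value, so wrapping it is a natural example of "adding functionality".

```python
def none_safe(func):
    """Hypothetical decorator: return None instead of calling func on None."""
    def wrapper(value):
        if value is None:
            return None
        return func(value)
    return wrapper


@none_safe
def lowercase(text):
    """Convert text to lowercase"""
    return text.lower()


def lowercase_plain(text):
    return text.lower()


# Manual wrapping: this is what the @ syntax does behind the scenes
lowercase_manual = none_safe(lowercase_plain)

print(lowercase('BestFanArmy'), lowercase_manual('BestFanArmy'))
print(lowercase(None))
```

Both spellings produce a wrapped function with the same behavior; the `@` form just saves the explicit reassignment.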

Using these UDFs is very straightforward and analogous to other Spark dataframe operations. For example:

In [8]:
df.select(lowercase(df.hashtag)).show()

+------------------+
|lowercase(hashtag)|
+------------------+
|        2896nation|
|          republik|
|               win|
|       competition|
|          giveaway|
|      aldubyayawho|
|             ไคตัล|
|           muybien|
|         fly2ndwin|
|           5sosfam|
|       bestfanarmy|
|      iheartawards|
|           hempire|
|             ไคตัล|
|         venezuela|
|        cristianos|
|              rush|
|             tfbjp|
|            tfbang|
|     birdiesanders|
+------------------+
only showing top 20 rows



Using a framework like Spark is all about understanding the ins and outs of how it functions and knowing what it offers. One of the cool things about the dataframe API is that many functions are already defined for you (turning strings into lowercase being one of them). 

### TODO (2 points / 10)

Find the Spark python API documentation. Look for the `sql` section and find the listing of `sql.functions`. Repeat the above (turning hashtags into lowercase) but use the built-in function.

In [9]:
from pyspark.sql.functions import lower
df.select(lower(df.hashtag)).show()

+--------------+
|lower(hashtag)|
+--------------+
|    2896nation|
|      republik|
|           win|
|   competition|
|      giveaway|
|  aldubyayawho|
|         ไคตัล|
|       muybien|
|     fly2ndwin|
|       5sosfam|
|   bestfanarmy|
|  iheartawards|
|       hempire|
|         ไคตัล|
|     venezuela|
|    cristianos|
|          rush|
|         tfbjp|
|        tfbang|
| birdiesanders|
+--------------+
only showing top 20 rows



We'll work with a combination of these built-in functions and user-defined functions for the remainder of this homework. 

### TODO (7 points/10)

Create `english_df` consisting of **lowercase** hashtags from only English-language tweets. In addition, convert the timestamp to a more readable format like the one below and name the new column `date`:

```
2016-04-01 08:41:00
```

Your `english_df` should look something like this:

```
+-------------+----+-----------+-------------------+
| timestamp_ms|lang|    hashtag|               date|
+-------------+----+-----------+-------------------+
|1459492860658|  en|        win|2016-04-01 08:41:00|
|1459492860658|  en|competition|2016-04-01 08:41:00|
|1459492860658|  en|   giveaway|2016-04-01 08:41:00|
|1459492860665|  en|    5sosfam|2016-04-01 08:41:00|
|1459492860665|  en|bestfanarmy|2016-04-01 08:41:00|
+-------------+----+-----------+-------------------+
```

## Twitter hashtag trends (50 points / 60)

In this section we will try to do a slightly more complicated analysis of the tweets. Our goal is to get an idea of tweet frequency as a function of time for certain hashtags. 

Let's build this up in steps. First, let's see how we can start to organize the tweets by their timestamps.

As a first easy example, let's say we just want to count the number of tweets per minute over the entire span of our data. For this, we first need a "global" minute value, e.g. "minute of the year" or something similar.
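One way to get such a global minute value, sketched here in plain Python rather than Spark, is integer division of the millisecond timestamp by 60,000. The first three timestamps come from the sample rows shown earlier; the last two are made up for illustration, and all variable names are assumptions.

```python
from collections import Counter

MS_PER_MINUTE = 60 * 1000

timestamps_ms = [
    1459492860657,
    1459492860658,
    1459492861661,
    1459492860657 + MS_PER_MINUTE,      # hypothetical: one minute later
    1459492860657 + 2 * MS_PER_MINUTE,  # hypothetical: two minutes later
]

# Integer division gives the number of whole minutes since the Unix epoch.
# Unlike minute-of-hour, this value never repeats across the dataset.
global_minutes = [ts // MS_PER_MINUTE for ts in timestamps_ms]

# Count how many entries fall into each global minute
per_minute = Counter(global_minutes)
for minute, count in sorted(per_minute.items()):
    print(minute, count)
```

The first three timestamps fall into the same minute, and the two made-up ones each get a minute of their own.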

Spark provides us with some handy built-in dataframe functions that are made for transforming date and time fields. 

Have a look [here](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) to see the whole list of built-in dataframe functions. You will need them to complete the next set of TODO items.

Note that the functions can be combined. Consider the following dataframe and its transformation:

In [10]:
from pyspark.sql import Row

# create a sample dataframe with one column "degrees" going from 0 to 179
test_df = spark.createDataFrame(sc.range(180).map(lambda x: Row(degrees=x)), ['degrees'])

# build a column expression "sin_rad" that first converts degrees to radians and then takes the sine using built-in functions
sin_rad = functions.sin(functions.radians(test_df.degrees))

# show the result
test_df.select(sin_rad).show()

+---------------------+
|SIN(RADIANS(degrees))|
+---------------------+
|                  0.0|
|  0.01745240643728351|
|  0.03489949670250097|
|  0.05233595624294383|
|   0.0697564737441253|
|  0.08715574274765817|
|  0.10452846326765346|
|  0.12186934340514748|
|  0.13917310096006544|
|  0.15643446504023087|
|  0.17364817766693033|
|   0.1908089953765448|
|  0.20791169081775931|
|  0.22495105434386498|
|  0.24192189559966773|
|  0.25881904510252074|
|  0.27563735581699916|
|   0.2923717047227367|
|   0.3090169943749474|
|   0.3255681544571567|
+---------------------+
only showing top 20 rows



### DataFrames `groupBy`

We used `groupBy` already in the previous notebook, but here we will take more advantage of its features. 

One important thing to note is that unlike other RDD or DataFrame transformations, the `groupBy` does not return another DataFrame, but a `GroupedData` object instead, with its own methods. These methods allow you to do various transformations and aggregations on the data of the grouped rows. 

Conceptually the procedure is a lot like this:

![groupby](https://i.stack.imgur.com/sgCn1.jpg)

The column that is used for the `groupBy` is the `key` - once we have the values of a particular key all together, we can use various aggregation functions on them to generate a transformed dataset. In this example, the aggregation function is a simple `sum`. In the simple procedure below, the `key` will be the hashtag.
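The split-apply-combine idea in the figure can be sketched in a few lines of plain Python (not Spark). The rows here are made up for illustration; the point is only to show what grouping on a key and then aggregating means.

```python
from collections import defaultdict

# (key, value) rows; made-up data for illustration
rows = [('a', 1), ('b', 2), ('a', 3), ('c', 4), ('b', 5)]

# Split: collect the values of each key together
groups = defaultdict(list)
for key, value in rows:
    groups[key].append(value)

# Apply and combine: reduce each group to a single value per key
sums = {key: sum(values) for key, values in groups.items()}
counts = {key: len(values) for key, values in groups.items()}

print(sums)
print(counts)
```

Swapping `sum` for `len` changes the aggregation from a total to a count, which is the kind of aggregation needed when counting how often each hashtag occurs.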

### TODO (1 point / 50)

Calculate the top five most common hashtags in the whole English-language dataset.

This should be your result:

```
+-------------+------+
|      hashtag| count|
+-------------+------+
|   mtvhottest|800829|
|veranomtv2016|539028|
| iheartawards|447651|
|  bestfanarmy|429739|
|   teenchoice|345250|
+-------------+------+
```

In [11]:
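from pyspark.sql.functions import from_unixtime

# timestamp_ms is in milliseconds, but from_unixtime expects seconds.
# Use 'yyyy' (calendar year) and 'HH' (24-hour clock) in the pattern:
# 'YYYY' is the week-based year and 'hh' is the 12-hour clock.
english_df = (df.withColumn('hashtag', lower(df.hashtag))
                .withColumn('date', from_unixtime(df.timestamp_ms / 1000, 'yyyy-MM-dd HH:mm:ss'))
                .filter(df.lang == 'en')
             )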
english_df.show(5)

+-------------+----+-----------+-------------------+
| timestamp_ms|lang|    hashtag|               date|
+-------------+----+-----------+-------------------+
|1459492860658|  en|        win|2016-04-01 08:41:00|
|1459492860658|  en|competition|2016-04-01 08:41:00|
|1459492860658|  en|   giveaway|2016-04-01 08:41:00|
|1459492860665|  en|    5sosfam|2016-04-01 08:41:00|
|1459492860665|  en|bestfanarmy|2016-04-01 08:41:00|
+-------------+----+-----------+-------------------+
only showing top 5 rows



## Daily hashtag trends

Now we will start to complicate the analysis a bit. Remember, our goal is to uncover trending topics on a timescale of a few days. A much needed column then is simply `day`. To convert the date string into day-of-year, you can use the built-in [dayofyear](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.dayofyear) function. 
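As a quick cross-check that does not need Spark, the Python standard library can compute the same calendar fields for 2016-04-01, the date of the first rows in the dataset. This assumes that Spark's `weekofyear` follows the ISO 8601 week-numbering convention, which is what its documentation describes.

```python
from datetime import date

d = date(2016, 4, 1)

month = d.month
iso_week = d.isocalendar()[1]        # ISO 8601 week number
day_of_year = d.timetuple().tm_yday  # 1-based day of the year

print(month, iso_week, day_of_year)
```

Note that 2016 is a leap year, so day-of-year values after February are one higher than in a regular year.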

In the subsequent sections we will then not only see which hashtags are globally most popular, but which ones experience the biggest changes in popularity - those are the "trending" topics. If there is suddenly a substantial increase of a hashtag over a matter of a day or two, it may signify an event taking place. 
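To make "change in popularity" concrete, here is a small plain-Python sketch of one simple measure, the difference from the previous day's count. The numbers are the `iheartawards` daily counts that appear in this notebook's outputs; the day-over-day difference is just one possible definition and is not necessarily the one used in later sections.

```python
# Daily counts for 'iheartawards', keyed by day of year
daily_counts = {92: 36288, 93: 124645, 94: 190884, 95: 91921}

days = sorted(daily_counts)

# Change relative to the previous day; the first day has no predecessor
changes = {
    day: daily_counts[day] - daily_counts[prev]
    for prev, day in zip(days, days[1:])
}

# The day with the largest increase over the day before
biggest_jump_day = max(changes, key=changes.get)

print(changes)
print(biggest_jump_day)
```

A large positive change suggests a hashtag that is picking up, while a large negative one suggests an event winding down.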

In [14]:
from pyspark.sql.functions import dayofyear, weekofyear, month
english_df.withColumn('day', dayofyear(english_df.date)).show()

+-------------+----+-------------+-------------------+---+
| timestamp_ms|lang|      hashtag|               date|day|
+-------------+----+-------------+-------------------+---+
|1459492860658|  en|          win|2016-04-01 08:41:00| 92|
|1459492860658|  en|  competition|2016-04-01 08:41:00| 92|
|1459492860658|  en|     giveaway|2016-04-01 08:41:00| 92|
|1459492860665|  en|      5sosfam|2016-04-01 08:41:00| 92|
|1459492860665|  en|  bestfanarmy|2016-04-01 08:41:00| 92|
|1459492860665|  en| iheartawards|2016-04-01 08:41:00| 92|
|1459492861661|  en|birdiesanders|2016-04-01 08:41:01| 92|
|1459492861665|  en|     deals_us|2016-04-01 08:41:01| 92|
|1459492861665|  en|   cellphones|2016-04-01 08:41:01| 92|
|1459492862658|  en| iheartawards|2016-04-01 08:41:02| 92|
|1459492862658|  en|  bestfanarmy|2016-04-01 08:41:02| 92|
|1459492862658|  en|      5sosfam|2016-04-01 08:41:02| 92|
|1459492862661|  en|     msgprays|2016-04-01 08:41:02| 92|
|1459492863665|  en|   5050fusion|2016-04-01 08:41:03| 92|

### TODO (2 points / 50)

Create a dataframe called `daily_hashtag` that includes the columns `month`, `week`, `day` and `hashtag`. Use the `english_df` you made above to start, and make sure you find the appropriate spark dataframe functions to make your life easier. Show the result.

Try to match this view:

```
+-----+----+---+-----------+
|month|week|day|    hashtag|
+-----+----+---+-----------+
|    4|  13| 92|        win|
|    4|  13| 92|competition|
|    4|  13| 92|   giveaway|
|    4|  13| 92|    5sosfam|
|    4|  13| 92|bestfanarmy|
+-----+----+---+-----------+
```

In [15]:
daily_hashtag = (english_df.withColumn('month', month(english_df.date))
                           .withColumn('week', weekofyear(english_df.date))
                           .withColumn('day', dayofyear(english_df.date))
                           .select('month', 'week', 'day', 'hashtag')
                )

daily_hashtag.show(5)

+-----+----+---+-----------+
|month|week|day|    hashtag|
+-----+----+---+-----------+
|    4|  13| 92|        win|
|    4|  13| 92|competition|
|    4|  13| 92|   giveaway|
|    4|  13| 92|    5sosfam|
|    4|  13| 92|bestfanarmy|
+-----+----+---+-----------+
only showing top 5 rows



### TODO (2 points / 50)

Now we want to calculate the number of times a hashtag is used per day. Sort in descending order of daily counts and show the result. Call the resulting dataframe `day_counts`.

Your output should look like this:

```
+---+------------+----+------+
|day|     hashtag|week| count|
+---+------------+----+------+
| 94|iheartawards|  13|190884|
| 94| bestfanarmy|  13|185789|
| 93|iheartawards|  13|124645|
| 93| bestfanarmy|  13|122741|
| 95|iheartawards|  14| 91921|
+---+------------+----+------+
```

<div class="alert alert-info">
<p>Make sure you use `cache()` when you create `day_counts` because we will need it in the steps that follow!</p>
</div>

In [16]:
day_counts = (daily_hashtag.groupby('day', 'hashtag', 'week')
                           .count()
                           .sort('count', ascending = False)
                           .cache()
             )

day_counts.show(5)

+---+------------+----+------+
|day|     hashtag|week| count|
+---+------------+----+------+
| 94|iheartawards|  13|190884|
| 94| bestfanarmy|  13|185789|
| 93|iheartawards|  13|124645|
| 93| bestfanarmy|  13|122741|
| 95|iheartawards|  14| 91921|
+---+------------+----+------+
only showing top 5 rows



### TODO (5 points / 50)

To get an idea of the most popular persistent hashtags, use the `week` column that we kept around to calculate the average number of times per day that a hashtag occurs in a week. Sort in descending order and show the top 20.
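The arithmetic behind this weekly average can be checked by hand on a few rows. The plain-Python sketch below (no Spark) uses daily counts copied from outputs shown in this notebook; the variable names are illustrative.

```python
from collections import defaultdict
from statistics import mean

# (day, hashtag, week, count) rows copied from this notebook's outputs
day_counts_sample = [
    (92, 'iheartawards', 13, 36288),
    (93, 'iheartawards', 13, 124645),
    (94, 'iheartawards', 13, 190884),
    (92, 'bestfanarmy', 13, 35626),
    (93, 'bestfanarmy', 13, 122741),
    (94, 'bestfanarmy', 13, 185789),
]

# Group the daily counts by (hashtag, week)
per_week = defaultdict(list)
for day, hashtag, week, count in day_counts_sample:
    per_week[(hashtag, week)].append(count)

# Average over the days on which the hashtag actually appears
week_avg_sample = {key: mean(counts) for key, counts in per_week.items()}

for key, avg in week_avg_sample.items():
    print(key, avg)
```

The averages divide by the number of days present in the data, not by seven. The dataset starts on day 92, so week 13 contributes only three days, which is why these values agree with the week-13 averages in this notebook's output.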

In [32]:
week_avg = (day_counts.groupby('hashtag', 'week')
                      .mean('count')
                      .sort('avg(count)', ascending=False)
           )

# show() returns None, so call it separately to keep week_avg a DataFrame
week_avg.show(20)

+-----------------+----+------------------+
|          hashtag|week|        avg(count)|
+-----------------+----+------------------+
|     iheartawards|  13|117272.33333333333|
|      bestfanarmy|  13|114718.66666666667|
|       mtvhottest|  29| 52061.42857142857|
|       mtvhottest|  30|           47927.0|
|          5sosfam|  13|41642.333333333336|
|      harmonizers|  13|           40434.0|
|     directioners|  13|30483.666666666668|
|       teenchoice|  19|20068.571428571428|
|    veranomtv2016|  25| 17867.85714285714|
|    veranomtv2016|  26| 17116.14285714286|
|aldub9thmonthsary|  15|16189.666666666666|
|    veranomtv2016|  27| 14476.57142857143|
|       mtvhottest|  31|           14062.0|
|     iheartawards|  14|13457.142857142857|
|    veranomtv2016|  30|12510.285714285714|
|       mtvhottest|  28|12287.714285714286|
|    mtvawardsstar|  23|12191.833333333334|
|      bestfanarmy|  14| 12039.42857142857|
|     ripchristina|  23|           11963.5|
|    altonsterling|  27|        

### Using `Window` functions 

Window functions are another awesome feature of dataframes. They allow users to accomplish complex tasks using very concise and simple code. 

Above, we found the hashtags with the most occurrences on *any* day. Now let's say we want to know the top hashtags for *each* day.

This is a non-trivial thing to compute and requires "windowing" our data. I recommend reading this [window functions article](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html) to get acquainted with the idea. You can think of a window function as a fine-grained and more flexible `groupBy`. 

There are two things we need to define to use window functions:

1. the "window" to use, based on which columns (partitioning) and how the rows should be ordered 
2. the computation to carry out for each windowed group, e.g. a max, an average etc.

Let's see how this works by example. We will define a window specification, `daily_window`, that partitions the data on the `day` column. Within each window, the rows will be ordered by the daily hashtag count that we computed above. Finally, we will use the `rank` function **over** this window to get the ranking of the top hashtags.

In the end, this is a fairly complicated operation achieved in just a few lines of code! (Can you think of how to do this with an RDD?)
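To make the partition, order and rank steps concrete, here is a plain-Python sketch (no Spark) of what a rank over a per-day window computes. The rows are a subset of the daily counts shown in this notebook, and the tie handling (tied counts share a rank, and the next rank skips ahead) is written out explicitly as an assumption about how a standard rank behaves.

```python
from collections import defaultdict

# (day, hashtag, count) rows; a subset of the daily counts in this notebook
rows = [
    (92, 'iheartawards', 36288),
    (92, 'bestfanarmy', 35626),
    (92, 'harmonizers', 15678),
    (93, 'bestfanarmy', 122741),
    (93, 'iheartawards', 124645),
    (93, 'harmonizers', 50782),
]

# Partition: collect the rows of each day together
partitions = defaultdict(list)
for day, hashtag, count in rows:
    partitions[day].append((hashtag, count))

ranked = []
for day in sorted(partitions):
    # Order: highest count first within the partition
    ordered = sorted(partitions[day], key=lambda item: item[1], reverse=True)
    rank = 0
    previous_count = None
    for position, (hashtag, count) in enumerate(ordered, start=1):
        # Tied counts keep the same rank; otherwise the rank is the position
        if count != previous_count:
            rank = position
            previous_count = count
        ranked.append((day, hashtag, count, rank))

for row in ranked:
    print(row)
```

Every input row is kept and simply gains a `rank` value, which is the key difference from a `groupBy` aggregation that collapses each group to one row.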

In [19]:
from pyspark.sql import Window

First, we specify the window's partitioning and ordering:

In [20]:
daily_window = Window.partitionBy('day').orderBy(functions.desc('count'))

The window specification above says that we should partition the data on the `day` column and order the rows within each partition by descending count.

Now we need to define what we want to compute on the windowed data. We will start by just calculating the daily ranking of hashtags, so we can use the helpful built-in `functions.rank()` and sort:

In [21]:
daily_rank = functions.rank() \
                      .over(daily_window) \
                      .alias('rank')

In [22]:
daily_rank

Column<b'RANK() OVER (PARTITION BY day ORDER BY count DESC NULLS LAST UnspecifiedFrame) AS `rank`'>

Now we have the pieces we need to compute the top five hashtags for each day in our data:

In [23]:
(day_counts.select('day', 'count', 'hashtag', daily_rank)
           .filter('rank <= 5')
           .sort('day', 'rank')
           .show(20))

+---+------+------------+----+
|day| count|     hashtag|rank|
+---+------+------------+----+
| 92| 36288|iheartawards|   1|
| 92| 35626| bestfanarmy|   2|
| 92| 15678| harmonizers|   3|
| 92| 13962|     5sosfam|   4|
| 92|  5528|directioners|   5|
| 93|124645|iheartawards|   1|
| 93|122741| bestfanarmy|   2|
| 93| 50782| harmonizers|   3|
| 93| 42458|     5sosfam|   4|
| 93| 28171|directioners|   5|
| 94|190884|iheartawards|   1|
| 94|185789| bestfanarmy|   2|
| 94| 68507|     5sosfam|   3|
| 94| 57752|directioners|   4|
| 94| 54842| harmonizers|   5|
| 95| 91921|iheartawards|   1|
| 95| 83197| bestfanarmy|   2|
| 95| 41305|     5sosfam|   3|
| 95| 20245| harmonizers|   4|
| 95| 18909|wrestlemania|   5|
+---+------+------------+----+
only showing top 20 rows

