# Week 9 lecture notes

This week we'll dig a little deeper into [Apache Spark](http://spark.apache.org/), exploring its DataFrame, SQL, and Streaming APIs.

First we start up Spark the usual way:

In [None]:
import os

In [None]:
os.environ['SPARK_HOME'] = '/usr/local/lib/spark'

In [None]:
import findspark

In [None]:
findspark.init()

In [None]:
from pyspark import SparkContext

In [None]:
spark = SparkContext(appName='test')

In [None]:
spark

## The DataFrame API

Let's start with the DataFrame API.  It's for table-oriented operations on named columns, much like the data frames you might already be used to in R or Python's pandas library.

Note that you can find some introductory docs for this and the SQL API on the [Apache Spark docs page](https://spark.apache.org/docs/latest/sql-programming-guide.html).

The first step is that we load data a little differently.  We'll step away from the bikes this week and look at something else:  social media data from Twitter.

Note that the data in these examples, and more data you can obtain for yourself, came from the GWU Libraries' [Social Feed Manager](https://sfm.library.gwu.edu/) app.  You can log in and use it yourself, though note that access is restricted to campus or VPN connections.

First we obtain a `SQLContext` from our existing `SparkContext`.

In [None]:
from pyspark import SQLContext

In [None]:
sqlc = SQLContext(spark)

In [None]:
sqlc

In [None]:
!wget https://s3.amazonaws.com/2017-dmfa/week-9/solar-eclipse-tweets.csv

In [None]:
!mv solar-eclipse-tweets.csv tweets.csv

In [None]:
!wc -l tweets.csv

In [None]:
!head tweets.csv | csvcut -n

In [None]:
!head -5 tweets.csv | csvlook

The `read.csv()` function on `SQLContext` is very handy.  Take a close look at the parameters.

In [None]:
tweets = sqlc.read.csv("tweets.csv", header=True, inferSchema=True)

In [None]:
tweets.count()

In [None]:
tweets.take(5)

Looks like what we've seen before, yes?  Except that `take()` on a `DataFrame` returns `Row` objects, and a `DataFrame` is not an RDD.

DataFrames **do** have an RDD under the hood, though.

In [None]:
tweets.rdd

In [None]:
tweets.rdd.count()

You can do a little more with a `DataFrame` than you can with an `RDD`:

In [None]:
tweets.columns

This is all well and good, but how well did schema inference work?

In [None]:
tweets.printSchema()

Not very well!  This is not uncommon.  You might have to cast some columns to other types, like in this example:

In [None]:
import pyspark
dir(pyspark.sql.types)

In [None]:
from pyspark.sql.types import DateType

In [None]:
tweets = tweets.withColumn("created_at", tweets["created_at"].cast(DateType()))

In [None]:
tweets.printSchema()

In [None]:
tweets.select('created_at').take(5)

All fixed!

Note that you can define a full schema at load time to avoid this problem.  It would be good if the `inferSchema` option were a little more reliable, but as we'll see in a minute, our data isn't exactly clean.
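To see why a little dirt in the data can derail schema inference, here is a deliberately simplified sketch in plain Python.  It is not Spark's actual algorithm, and the function name `infer_column_type` and the sample values are made up for illustration.  The idea it shows is that a column only gets a numeric type if every non-empty value parses as a number, so a single stray value pushes the whole column back to string.

```python
def infer_column_type(values):
    """Guess 'int', 'float', or 'string' for a list of raw CSV strings.

    Simplified illustration only; Spark's own inference is more involved.
    """
    def fits(cast):
        for v in values:
            if v == "":
                continue  # treat empty cells as missing values
            try:
                cast(v)
            except ValueError:
                return False
        return True

    if fits(int):
        return "int"
    if fits(float):
        return "float"
    return "string"


# Hypothetical column values: one clean column, one with a stray URL in it.
clean = ["120", "4500", "", "17"]
dirty = ["120", "4500", "http://t.co/abc", "17"]

print(infer_column_type(clean))
print(infer_column_type(dirty))
```

One misplaced value, for example from an unescaped comma or newline inside a tweet, is enough to change the inferred type for the entire column.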


### Operations on DataFrames

DataFrames support many of the kinds of data frame operations you're used to, though each works a little differently here.  Use the docs!

In [None]:
tweets.take(2)

In [None]:
tweets.head(2)

In [None]:
tweets.show(2)

In [None]:
tweets.count()

In [None]:
tweets.describe('followers_count').show()

Whoops, looks like we've got some slop in our data.  This might be due to some strange characters in the mix.  You could clean that up in a handy wrangling tool before loading the file...

In [None]:
tweets.select("screen_name", "text").show(5)

In [None]:
tweets.filter("followers_count > 15000") \
    .select("followers_count") \
    .orderBy("followers_count", ascending=False) \
    .show(10)

Whoops, looks like another data type problem:  `followers_count` is not being treated as an integer.  We can fix that, too.

In [None]:
from pyspark.sql.types import IntegerType
tweets = tweets.withColumn("followers_count", tweets["followers_count"].cast(IntegerType()))

In [None]:
tweets.filter("followers_count > 15000") \
    .select("followers_count") \
    .orderBy("followers_count", ascending=False) \
    .show(10)
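The effect of the wrong column type on ordering is easy to reproduce in plain Python.  This sketch assumes the column had been read as text, and the values are made up for illustration.  Text sorts character by character, so `"9"` lands ahead of `"3000000"`, while integers sort by magnitude.

```python
# Hypothetical follower counts as they would look if read as text.
counts = ["9", "15001", "200000", "3000000"]

# Sorting text compares character by character (lexicographic order).
as_text = sorted(counts, reverse=True)

# Casting to int first gives the numeric order we actually want.
as_numbers = sorted((int(c) for c in counts), reverse=True)

print(as_text)     # ['9', '3000000', '200000', '15001']
print(as_numbers)  # [3000000, 200000, 15001, 9]
```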

Who are these popular tweeters?

In [None]:
tweets.filter("followers_count > 5000000").select("screen_name").show(20)

https://twitter.com/people

Yep - that looks about right.

Now that we have that column sorted out:

In [None]:
tweets.describe("followers_count").show()

In [None]:
tweets.orderBy("created_at", ascending=False).select("created_at").show(10)

## Using SQL with DataFrames

All you need to do to get going with SQL is to register your DataFrame as a temporary view, like so:

In [None]:
tweets.createOrReplaceTempView("tweets")

In [None]:
sqlc.sql("SELECT COUNT(*) FROM tweets")

In [None]:
sqlc.sql("SELECT COUNT(*) FROM tweets").show()

In [None]:
sqlc.sql("""
    SELECT followers_count 
    FROM tweets
    ORDER BY followers_count DESC
""").show(10)

In [None]:
sqlc.sql("""
    SELECT screen_name
    FROM tweets
    WHERE followers_count > 5000000
    ORDER BY screen_name
""").show(10)

### Other data types:  JSON

We can load in non-CSV data as well, such as JSON.  Here is a set of tweet data in JSON, the format the tweets were originally collected in.  It's much less likely to have wrangling issues.

In [None]:
!wget https://s3.amazonaws.com/2017-dmfa/week-9/mlb-world-series/9670f3399f774789b7c3e18975d25611_001.json

In [None]:
!mv 9670f3399f774789b7c3e18975d25611_001.json mlb.json

In [None]:
!wc -l mlb.json

In [None]:
!head -2 mlb.json 

JSON data is pretty common these days, and Python makes it easy to work with.  Here's what it looks like from Python:

In [None]:
!head -1 mlb.json > mlb1.json

In [None]:
import json
with open("mlb1.json") as f:  # close the file once the tweet is parsed
    mlb = json.load(f)

In [None]:
mlb['user']['screen_name']

In [None]:
mlb['user']['followers_count']

Okay, that's a tour of one tweet.  Let's look at a lot more.

In [None]:
mlb = sqlc.read.json("mlb.json")

In [None]:
mlb

In [None]:
mlb.count()

In [None]:
mlb.printSchema()

In [None]:
sample = mlb.sample(False, 0.1, 12345)

In [None]:
sample.count()
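The three arguments to `sample()` above are `withReplacement`, `fraction`, and `seed`.  Without replacement, the fraction is a per-row probability rather than an exact size, so the count will only be roughly 10% of the total.  Here is a rough plain-Python sketch of that idea; the function name `bernoulli_sample` is made up, and this is an illustration of the concept rather than Spark's implementation.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # a fixed seed makes the sample repeatable
    return [row for row in rows if rng.random() < fraction]


rows = list(range(10000))  # stand-in for 10,000 tweets

first = bernoulli_sample(rows, 0.1, 12345)
second = bernoulli_sample(rows, 0.1, 12345)

print(len(first))       # close to 1000, but not necessarily exactly 1000
print(first == second)  # same seed, same sample
```

Reusing the same seed is what makes a sample reproducible from one run to the next.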

There is **hierarchy** in JSON structures like tweets.  We can use `.` to address this:

In [None]:
sample.orderBy("user.followers_count", ascending=False).show(10)
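If the dotted path looks like magic, it may help to see the same idea in plain Python.  This is only a sketch: the helper `get_path` and the sample tweet are made up for illustration, and real tweets carry many more fields.  A path like `user.followers_count` just means "look up `user`, then look up `followers_count` inside it".

```python
import json


def get_path(record, dotted):
    """Follow a dotted path such as 'user.followers_count' through nested dicts."""
    value = record
    for key in dotted.split("."):
        value = value[key]
    return value


# A tiny, made-up tweet with the same nesting as the real data.
raw = '{"text": "Go team!", "user": {"screen_name": "example_fan", "followers_count": 42}}'
tweet = json.loads(raw)

print(get_path(tweet, "user.screen_name"))
print(get_path(tweet, "user.followers_count"))
```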

In [None]:
small_sample = mlb.sample(False, 0.01, 12345)

In [None]:
small_sample.count()

In [None]:
small_sample.orderBy("user.followers_count", ascending=False).show(10)

In [None]:
small_sample.rdd.take(1)

In [None]:
small_sample.rdd.flatMap(lambda r: r['text'].split(' ')) \
    .map(lambda t: (t, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .takeOrdered(10, key=lambda pair: -pair[1])

In [None]:
sample.rdd.flatMap(lambda r: r['text'].split(' ')) \
    .map(lambda t: (t, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .takeOrdered(10, key=lambda pair: -pair[1])
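If the chain of RDD calls above is hard to follow, here is the same word count written step by step in plain Python on three made-up tweets.  It runs on one machine with ordinary lists, so it only mirrors the logic of `flatMap`, `map`, `reduceByKey`, and `takeOrdered`, not how Spark distributes the work.

```python
from collections import defaultdict
from itertools import chain

# Made-up tweet texts standing in for r['text'].
texts = ["go cubs go", "cubs win", "go go go"]

# flatMap: split every text and flatten the results into one list of tokens.
tokens = list(chain.from_iterable(t.split(" ") for t in texts))

# map: pair each token with a count of 1.
pairs = [(tok, 1) for tok in tokens]

# reduceByKey: add up the counts for each distinct token.
totals = defaultdict(int)
for tok, n in pairs:
    totals[tok] += n

# takeOrdered: keep the top 2 tokens, largest count first.
top = sorted(totals.items(), key=lambda pair: -pair[1])[:2]

print(top)  # [('go', 5), ('cubs', 2)]
```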

Most importantly, we can register this nested data as a view and query it with SQL, like this:

In [None]:
sample.createOrReplaceTempView("sample")

In [None]:
sqlc.sql("SELECT * FROM sample").take(1)

In [None]:
sqlc.sql("""
    SELECT user.screen_name, user.followers_count AS fc
    FROM sample
    ORDER BY fc DESC
""").show(5)

#### Some test stuff for later

In [None]:
!ls *.json

In [None]:
big_sample = sqlc.read.json("*.json")

In [None]:
big_sample.count()