# Getting started processing Kafka with Spark

Before we set Spark up, we need to tell it where to find the Kafka connector package.  Currently, our notebook images are built against Spark 2.2.  If you're using a different version of Spark, be sure to change `SPARK_VERSION` in the cell below before executing it.

In [1]:
import os
SPARK_VERSION="2.2.0"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:%s pyspark-shell" % SPARK_VERSION
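
If you switch Spark versions often, it can help to build the argument string in one place.  The sketch below is one way to do that; `kafka_submit_args` is a hypothetical helper name, and the default Scala suffix of `2.11` is simply taken from the artifact name in the cell above.  It assumes your Spark distribution was built against that Scala version, so pass a different `scala_version` if yours was not.

```python
import os


def kafka_submit_args(spark_version, scala_version="2.11"):
    """Build a PYSPARK_SUBMIT_ARGS value that pulls in the Kafka source.

    Hypothetical helper: scala_version is an assumption and must match
    the Scala version your Spark distribution was built with.
    """
    package = "org.apache.spark:spark-sql-kafka-0-10_%s:%s" % (
        scala_version,
        spark_version,
    )
    return "--packages %s pyspark-shell" % package


# Produces the same value as the cell above for Spark 2.2.0 and Scala 2.11.
os.environ["PYSPARK_SUBMIT_ARGS"] = kafka_submit_args("2.2.0")
```

As in the original cell, this has to run before the `SparkSession` is created, since the variable is only read when PySpark starts.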

Next up, we'll connect to Spark by establishing a `SparkSession`.

In [2]:
import pyspark

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Social Firehose") \
    .getOrCreate()

We're going to begin by loading the contents of a Kafka topic into a data frame.  Spark data frames are _lazy_: nothing is read until we run an action, and each action runs the query again.  As a result, every action on this data frame will see the messages that are in the topic at that moment.

In [3]:
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "my-cluster-kafka:9091") \
  .option("subscribe", "social-firehose") \
  .load()

We can see that this data frame always has the most recent collection of messages by running the `count()` action on it twice with a short delay in the middle.  Note how many messages are generated in ten seconds:

In [4]:
import time
a = df.count()
time.sleep(10)
b = df.count()
(a, b)

(155212, 155296)
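
To turn those two counts into a rough arrival rate, we can do the arithmetic directly.  This is only a sketch using the numbers from the output above; the result is approximate, because the real interval between the two counts is the ten-second sleep plus however long `count()` itself took to run.

```python
# Counts copied from the output above.
a, b = 155212, 155296
delay_seconds = 10

new_messages = b - a
rate = new_messages / float(delay_seconds)

# prints "84 new messages, about 8.4 per second"
print("%d new messages, about %.1f per second" % (new_messages, rate))
```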

We can inspect the first few messages, but they'll be in a pretty raw format.

In [5]:
df.take(3)

[Row(key=None, value=bytearray(b'{"text": "My dog loves this stuff. #marketing #fail", "userid": "4775376264", "update_id": "00000000000000000001"}'), topic=u'social-firehose', partition=0, offset=0, timestamp=datetime.datetime(2018, 4, 26, 12, 24, 1, 32000), timestampType=0),
 Row(key=None, value=bytearray(b'{"text": "#Charles and #Mary had had her orders, the wind had kept her face averted, and her tongue motionless.", "userid": "0923284533", "update_id": "00000000000000000002"}'), topic=u'social-firehose', partition=0, offset=1, timestamp=datetime.datetime(2018, 4, 26, 12, 24, 1, 167000), timestampType=0),
 Row(key=None, value=bytearray(b'{"text": "When #theevening was as follows. #Mary", "userid": "0269074790", "update_id": "00000000000000000003"}'), topic=u'social-firehose', partition=0, offset=2, timestamp=datetime.datetime(2018, 4, 26, 12, 24, 1, 290000), timestampType=0)]

Now we'll import some functions and types from the Spark library so we can do something more useful with our data set.

In [6]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json
from pyspark.sql.functions import column
from pyspark.sql.types import StringType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField

The first thing we'll do is extract the JSON payloads of the messages; we'll inspect the first ten as a sanity check.

In [7]:
values = df.select(df["value"].cast(StringType()).alias("value"))
values.show(10)

+--------------------+
|               value|
+--------------------+
|{"text": "My dog ...|
|{"text": "#Charle...|
|{"text": "When #t...|
|{"text": "#CHAPTE...|
|{"text": "I do no...|
|{"text": "I was l...|
|{"text": "We are ...|
|{"text": "#Adeiu ...|
|{"text": "The Adm...|
|{"text": "I don't...|
+--------------------+
only showing top 10 rows



The next thing we'll do is impose some structure on the messages by converting the serialized JSON objects into actual records:

1.  First, we'll declare a `StructType` for the structure of our messages (three strings, named `text`, `userid`, and `update_id`),
2.  Next, we'll convert the JSON strings to structures using the `from_json` dataframe function, and
3.  Finally, we'll `SELECT` the fields of the object (renaming `userid` to `user_id` along the way) so we have something that looks like a flat database tuple.
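
For intuition, here is roughly what those three steps do to a single message, written in plain Python rather than Spark.  `to_record` is a hypothetical name used only for this sketch, and the sample payload is the first message shown earlier.  Spark's `from_json` is documented to return null for strings it cannot parse, and the sketch mimics that with `None`; it does not try to reproduce any of Spark's other parsing behavior.

```python
import json

# Step 1: the fields we expect in each message (all treated as strings).
FIELDS = ("text", "userid", "update_id")


def to_record(raw_value):
    """Return (update_id, user_id, text) for one raw Kafka message value."""
    # Step 2: decode the bytes and parse the JSON payload.
    try:
        obj = json.loads(bytes(raw_value).decode("utf-8"))
    except ValueError:
        # Unparseable input: every field comes back empty.
        return (None, None, None)
    if not isinstance(obj, dict):
        return (None, None, None)
    parsed = {name: obj.get(name) for name in FIELDS}
    # Step 3: flatten into a tuple, exposing "userid" as user_id.
    return (parsed["update_id"], parsed["userid"], parsed["text"])


raw = bytearray(
    b'{"text": "My dog loves this stuff. #marketing #fail", '
    b'"userid": "4775376264", "update_id": "00000000000000000001"}'
)
print(to_record(raw))
```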

In [29]:
structure = StructType([StructField(fn, StringType(), True) for fn in "text userid update_id".split()])
records = values.select(from_json(values["value"], structure).alias("json")) \
                .select(column("json.update_id"), column("json.userid").alias("user_id"), column("json.text"))
records.show(10)

+--------------------+----------+--------------------+
|           update_id|   user_id|                text|
+--------------------+----------+--------------------+
|00000000000000000001|4775376264|My dog loves this...|
|00000000000000000002|0923284533|#Charles and #Mar...|
|00000000000000000003|0269074790|When #theevening ...|
|00000000000000000004|4315843244|#CHAPTERXV Mr. #W...|
|00000000000000000005|2511009588|I do not wonder a...|
|00000000000000000006|7167348316|I was looking for...|
|00000000000000000007|5084291355|We are now in the...|
|00000000000000000008|4188418893|#Adeiu my dear gi...|
|00000000000000000009|5242205026|The Admiral has h...|
|00000000000000000010|2887352009|I don't think I c...|
+--------------------+----------+--------------------+
only showing top 10 rows



We can perform database-style aggregations on this data frame, like identifying the users responsible for the most status updates:

In [21]:
user_counts = records.groupBy("user_id").count().orderBy("count", ascending=False)
user_counts.show()

+----------+-----+
|   user_id|count|
+----------+-----+
|0922592934|   91|
|8305094984|   88|
|0208803656|   85|
|6414099675|   85|
|4322814958|   85|
|9493244448|   84|
|0294625135|   84|
|6119887869|   82|
|4425599042|   82|
|5085703212|   82|
|6860711509|   82|
|1719408877|   82|
|0438456890|   82|
|3877766942|   82|
|5021951084|   82|
|1375660794|   81|
|3401507657|   81|
|0703247354|   81|
|2718811439|   81|
|1263931099|   81|
+----------+-----+
only showing top 20 rows



If you run that query several times with a short delay in between, you may get different results, since the data frame will reflect newly arriving messages.  Try it out!

We can also count the number of users who have issued status updates (because of how we're generating the synthetic stream of updates, there is an upper bound on this number):

In [26]:
records.select("user_id").distinct().count()

9881

We can also identify the most prolix users.  You probably have some social media connections who take advantage of every extra bit of character limit; a query like this will show you who they are!

In [28]:
from pyspark.sql.functions import length
user_loquacity = records.select(column("user_id"), length("text").alias("update_len")) \
  .groupBy("user_id") \
  .avg() \
  .orderBy("avg(update_len)", ascending=False)
user_loquacity.show()

+----------+------------------+
|   user_id|   avg(update_len)|
+----------+------------------+
|5387330032|             286.0|
|1220312886|             276.0|
|8114510028|             274.0|
|0870963719|             269.0|
|5774183600|             255.0|
|0636996103|             254.0|
|0127054018|             252.5|
|3368031437|             247.0|
|4876959382|             243.0|
|4928759035|             242.0|
|7123302005|             241.0|
|3159573159|             238.0|
|2933825725|             236.0|
|9078604168|             233.0|
|5497686303|232.66666666666666|
|4193361006|             232.0|
|3772975877|             230.5|
|5975623269|             228.5|
|1711685690|             228.0|
|2245469716|             227.0|
+----------+------------------+
only showing top 20 rows



We can also identify the most popular hashtags in users' updates.  We'll start by turning each update into an array of words.  Then we'll explode each array into multiple rows, so that each row holds a single word.  For example,

```
1, 2, "foo bar blah"
```

would become

```
1, 2, [foo, bar, blah]
```

which would become

```
1, 2, foo
1, 2, bar
1, 2, blah
```

We'll then filter for hashtags (keeping only words starting with `#`) so we can find the most popular!


In [44]:
words = records.select(explode(split("text", " ")).alias("word"))
hashtags = words.filter(column("word").startswith("#"))
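
The split, explode, and filter steps can also be sketched in plain Python, which may make the Spark version easier to follow.  This is only an illustration on the three sample messages shown earlier, not how Spark executes the query; the `Counter` step corresponds to grouping by word and counting.

```python
from collections import Counter

# Text of the first three messages from the topic, as shown earlier.
texts = [
    "My dog loves this stuff. #marketing #fail",
    "#Charles and #Mary had had her orders, the wind had kept her face "
    "averted, and her tongue motionless.",
    "When #theevening was as follows. #Mary",
]

# split + explode: one entry per word, across all updates.
words = [word for text in texts for word in text.split(" ")]

# filter: keep only the words that start with "#".
hashtags = [word for word in words if word.startswith("#")]

# groupBy + count + orderBy: most frequent hashtags first.
hashtag_counts = Counter(hashtags).most_common()
print(hashtag_counts[0])  # prints ('#Mary', 2)
```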

In [45]:
words.show()

+----------+
|      word|
+----------+
|        My|
|       dog|
|     loves|
|      this|
|    stuff.|
|#marketing|
|     #fail|
|  #Charles|
|       and|
|     #Mary|
|       had|
|       had|
|       her|
|   orders,|
|       the|
|      wind|
|       had|
|      kept|
|       her|
|      face|
+----------+
only showing top 20 rows



In [46]:
hashtags.show()

+--------------+
|          word|
+--------------+
|    #marketing|
|         #fail|
|      #Charles|
|         #Mary|
|   #theevening|
|         #Mary|
|    #CHAPTERXV|
|    #Woodhouse|
|         #Mary|
|         #Mary|
|         #Mary|
|    #Woodhouse|
|         #Mary|
|        #Adeiu|
|#SusanL.LETTER|
|      #SEVENTH|
|  #C.LUTTERELL|
| #LesleyCastle|
|     #February|
|         #16th|
+--------------+
only showing top 20 rows



In [48]:
hashtag_counts = hashtags.groupBy("word").count().orderBy("count", ascending=False)
hashtag_counts.show()

+----------+-----+
|      word|count|
+----------+-----+
|    #first| 2413|
|      #one| 2207|
|      #two| 1862|
|#Elizabeth| 1721|
|    #Fanny| 1370|
|#Catherine|  981|
|     #Anne|  870|
|   #Amazon|  811|
| #Crawford|  783|
|     #Emma|  762|
| #Marianne|  748|
|   #Weston|  727|
|     #Jane|  719|
|     #half|  676|
|   #Bennet|  624|
|   #Thomas|  619|
|   #Elinor|  584|
|   #Edward|  574|
|    #Elton|  540|
|   #second|  526|
+----------+-----+
only showing top 20 rows

