# #trendOff
#dsci6007 #final #4OrBust #camelCase

## Table of Contents

- [Introduction](#section1)
- [Aside for Cassandra and Docker](#section2)
- [Kafka](#section3)
- [Arguments to run Programs](#section11)
- [CSV Data](#section4)
- [Kafka Producer](#section10)
- [Stream Kafka Twitter data into Cassandra Data Lake](#section5)
- [Stream Kafka Twitter data into Speed Layer](#section6)
- [Perform Batch Process on Cassandra Data Lake Data](#section7)
- [Serving Layer](#section8)
- [Plot](#section9)
- [Future](#section12)

<a id='section1'></a>

## Introduction

![](Image/de713pg.png)

This notebook outlines what I did for my final project in my Data Engineering course at Galvanize University. The project uses code written in Scala and Spark, a bit of CQL (executed by Spark), and some Python (solely for plotting), and it relies on the big data technologies Kafka, Cassandra and Docker.

The goal of my project was to use different big data technologies to implement a lambda architecture in which someone could compare the Twitter word usage of one user versus another.

My project implements a lambda architecture. A Kafka producer pumps a stream of tweets into Kafka, and two consumer groups read that stream: one for the speed layer and one for the batch layer.

The speed layer ingests the Kafka stream with Spark, combines all the tweets from a given user, and saves them to a table in my Cassandra cluster on Docker. (Usually there are no tweets to combine for a user, since people rarely send two tweets within a minute of each other.)

The batch layer also ingests the Kafka stream with Spark, but saves each tweet and most of its metadata (8 features in total) to a data-lake table in the same Cassandra cluster. The data in this table accumulates until a batch process runs on it: a Spark job reads all the data stored in the table, groups all the tweets for each user, and saves the result to a new table in my Cassandra cluster.

Finally, a serving layer written in Scala with Spark and some functional programming pulls the relevant data for two users from both the speed and batch tables, finds the most frequent words that both users used, and counts how often each user used each of those words. I store these three arrays (the words, the counts for the first user and the counts for the second user) in a text file, from which some Python code reads the data and produces a grouped bar chart of the two users' word counts.
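To make the data flow above concrete, here is a toy, in-memory Python sketch of the same lambda architecture. It is not the project's Spark/Cassandra code: a list and dicts stand in for the Kafka topic and the Cassandra tables, and the class and attribute names are hypothetical. It also assumes, as in a textbook lambda architecture, that speed-layer data is discarded once a batch run has absorbed it, so the serving layer does not count it twice.

```python
from collections import defaultdict


class ToyLambdaPipeline:
    """In-memory stand-in for the Kafka -> (batch, speed) -> serving flow."""

    def __init__(self):
        self.master_dataset = []  # batch layer input: every tweet is kept
        self.speed_view = {}      # speed layer: tweets not yet covered by a batch run
        self.batch_view = {}      # batch layer output: all tweets per user

    def ingest(self, user, tweet):
        # Both "consumer groups" see every message: one appends, one combines.
        self.master_dataset.append((user, tweet))
        if user in self.speed_view:
            self.speed_view[user] += " " + tweet
        else:
            self.speed_view[user] = tweet

    def run_batch(self):
        # Recompute the batch view from scratch over the whole master dataset.
        grouped = defaultdict(list)
        for user, tweet in self.master_dataset:
            grouped[user].append(tweet)
        self.batch_view = {u: " ".join(ts) for u, ts in grouped.items()}
        # Assumption: speed data absorbed by the batch view is discarded.
        self.speed_view = {}

    def serve(self, user):
        # Serving layer: merge the batch and speed views for one user.
        parts = [self.batch_view.get(user, ""), self.speed_view.get(user, "")]
        return " ".join(p for p in parts if p)


pipeline = ToyLambdaPipeline()
pipeline.ingest("alice", "hello world")
pipeline.ingest("bob", "hi there")
pipeline.run_batch()
pipeline.ingest("alice", "hello again")
print(pipeline.serve("alice"))  # hello world hello again
```

The batch view is always rebuilt from the full master dataset, while the speed view only holds what arrived since the last batch run; the serving layer needs both to see everything.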

I was able to run this data pipeline both on my local machine and on an EC2 instance. Most of the runs of this code were done on EC2.

<a id='section2'></a>

## Aside for Cassandra and Docker

I used a Cassandra cluster with 3 nodes on Docker to store all the data for this project. I found a repo that makes running a Cassandra cluster on Docker very easy. It is located [here](https://github.com/pokle/cassandra).

**These are the lines used:**

```bash
docker run -d --name cass1 poklet/cassandra start
docker run -d --name cass2 --link cass1:seed poklet/cassandra start seed
docker run -d --name cass3 --link cass1:seed poklet/cassandra start seed
```


**You can even use CQL on your cluster with:**

```bash
docker run -it --rm --net container:cass1 poklet/cassandra cqlsh
```
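Since the rest of this notebook drives everything through `subprocess`, a sketch like the one below builds the same three `docker run` commands as argument lists. It also builds a `docker inspect` query, which is one way to look up the IP address of `cass1` on Docker's default bridge network for the `cassandra_ip` argument. The helper names are hypothetical, and nothing is executed unless `dry_run=False`.

```python
import subprocess

IMAGE = "poklet/cassandra"


def cluster_commands(n_nodes=3):
    """Argument lists for the docker commands above (seed node cass1 first)."""
    cmds = [["docker", "run", "-d", "--name", "cass1", IMAGE, "start"]]
    for i in range(2, n_nodes + 1):
        cmds.append(["docker", "run", "-d", "--name", "cass%d" % i,
                     "--link", "cass1:seed", IMAGE, "start", "seed"])
    return cmds


def seed_ip_command():
    """docker inspect query for cass1's IP on the default bridge network."""
    return ["docker", "inspect", "-f", "{{.NetworkSettings.IPAddress}}", "cass1"]


def start_cluster(n_nodes=3, dry_run=True):
    """Print (dry run) or execute the commands; return cass1's IP when executed."""
    for cmd in cluster_commands(n_nodes):
        if dry_run:
            print(" ".join(cmd))
        else:
            subprocess.check_call(cmd)
    if not dry_run:
        return subprocess.check_output(seed_ip_command()).decode().strip()
    return None
```

Passing argument lists instead of a single string avoids `shell=True`, so there is no shell quoting to get wrong.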

<a id='section3'></a>

## Kafka

I used Kafka because I wanted to implement a lambda architecture. A lambda architecture needs at least two identical streams of the same data, and since Twitter does not allow multiple simultaneous streams from the same account, I pumped my Twitter stream through Kafka to create two identical streams.
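The reason one Kafka topic can feed both layers is that Kafka keeps a separate offset for each consumer group, so every group independently reads every message. The toy Python model below illustrates that idea with an append-only list and a dict of offsets; it is a conceptual sketch, not the Kafka client API, and the group names are hypothetical.

```python
class ToyTopic:
    """Single-partition append-only log with one read offset per consumer group."""

    def __init__(self):
        self.log = []
        self.offsets = {}  # consumer group name -> next offset to read

    def produce(self, message):
        self.log.append(message)

    def poll(self, group):
        # In this toy, a group with no committed offset starts at the beginning.
        start = self.offsets.get(group, 0)
        batch = self.log[start:]
        self.offsets[group] = len(self.log)  # "commit" everything returned
        return batch


topic = ToyTopic()
topic.produce("tweet 1")
topic.produce("tweet 2")
speed = topic.poll("speed-layer")  # hypothetical group names
batch = topic.poll("batch-layer")
topic.produce("tweet 3")
print(speed, batch, topic.poll("speed-layer"))
# ['tweet 1', 'tweet 2'] ['tweet 1', 'tweet 2'] ['tweet 3']
```

Consumers inside one group split the messages between them; it is only across groups that each reader gets the full stream, which is why the speed and batch layers use different groups.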

I found this [website](http://kafka.apache.org/documentation.html) very useful for learning how to set up and run Zookeeper and Kafka on a local machine or on EC2.

**The line to run Zookeeper:**

```bash
bin/zookeeper-server-start.sh config/zookeeper.properties
```

**The line to run Kafka:**

```bash
bin/kafka-server-start.sh config/server.properties
```



<a id='section11'></a>

## Arguments to run Programs

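The code below sets the arguments that are passed to the `spark-submit` commands in the following sections. I recommend using these inline `spark-submit` calls only for the CSV load, the batch process and the serving layer: the two streaming jobs keep running until they are stopped, so launching them with the blocking `subprocess.call` would tie up the notebook.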

```python
import subprocess

home_dir = "/home/ubuntu"
name1 = "wagener_emily"  # username of the first user you would like to query
name2 = "wagener_emily"  # username of the second user you would like to query
filename = "/home/ubuntu/projectFile/testTwitterInClass"  # location of the output file for the serving layer
# Raw string keeps the backslash that escapes the space for the shell (the calls below use shell=True).
csvFile = r"/home/ubuntu/projectFile/twitterData/Stream-Table\ 1.csv"  # location of the CSV file to load into the data lake
cassandra_ip = "172.17.0.2"  # IP of the Cassandra cluster
kafka_ip = "127.0.0.1"  # IP of the Zookeeper instance that Kafka is using
```
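Because the `spark-submit` calls in this notebook use `shell=True` with string formatting, any path containing a space needs the backslash escape seen in `csvFile`. A sketch of an alternative, assuming the same Spark 1.6.1 install under the home directory, is to build the `spark-submit` argument list directly so that no shell quoting is involved. The helper name `spark_submit_args` is hypothetical.

```python
import os


def spark_submit_args(home_dir, main_class, jar, packages, app_args, master="local[4]"):
    """Build a spark-submit argument list; no shell, so spaces need no escaping."""
    spark_submit = os.path.join(home_dir, "spark-1.6.1-bin-hadoop2.6", "bin", "spark-submit")
    return ([spark_submit, "--class", main_class, "--master", master,
             "--packages", ",".join(packages), os.path.join(home_dir, jar)]
            + list(app_args))


# Batch-layer job with the same class, jar and package as the batch cell.
batch_cmd = spark_submit_args(
    "/home/ubuntu", "SimpleApp", "batch/target/scala-2.10/simpleapp_2.10-1.0.jar",
    ["datastax:spark-cassandra-connector:1.6.0-s_2.10"], ["172.17.0.2"])

# CSV job: the path with a space is passed as one argument, unescaped.
csv_cmd = spark_submit_args(
    "/home/ubuntu", "SimpleApp", "csv/target/scala-2.10/simpleapp_2.10-1.0.jar",
    ["datastax:spark-cassandra-connector:1.6.0-s_2.10", "databricks:spark-csv:1.4.0-s_2.10"],
    ["172.17.0.2", "/home/ubuntu/projectFile/twitterData/Stream-Table 1.csv"])
```

Either list can be passed straight to `subprocess.call(...)` without `shell=True`.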

<a id='section4'></a>

## CSV Data

For this project we had to use two data sources. Besides the Twitter stream, I found a dataset of tweets in CSV format, and I used Spark SQL and Spark DataFrames to read these tweets from the CSV and place them in the Cassandra data-lake table, where they wait for the batch process. I got the data for the CSV [here](http://followthehashtag.com/content/uploads/USA-Geolocated-tweets-free-dataset-Followthehashtag.zip).
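The Spark job that does this load is written in Scala. Purely to illustrate the shape of the transformation, here is a standard-library Python sketch that reads tweets from CSV text and keeps a user column and a tweet column. The column names `Nickname` and `Tweet content` are hypothetical defaults; check the header of the downloaded file and adjust them.

```python
import csv
import io


def csv_to_rows(csv_text, user_col="Nickname", tweet_col="Tweet content"):
    """Yield (user, tweet) pairs from CSV text, skipping rows without a tweet.

    user_col / tweet_col are hypothetical defaults; match them to the real header.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    for record in reader:
        user = (record.get(user_col) or "").strip()
        tweet = (record.get(tweet_col) or "").strip()
        if user and tweet:
            yield user, tweet


sample = 'Nickname,Tweet content,Followers\nalice,"Hello, world",10\nbob,,3\n'
print(list(csv_to_rows(sample)))  # [('alice', 'Hello, world')]
```

The CSV reader handles quoted fields, so commas inside a tweet do not split it; for a file on disk, open it with `newline=""` and pass the file object to `csv.DictReader` instead of a `StringIO`.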

If you use the command below, make sure to set the IP address of the Cassandra cluster and the location of the CSV file.

```python
subprocess.call("%s/spark-1.6.1-bin-hadoop2.6/bin/spark-submit --class 'SimpleApp' --master local[4] --packages datastax:spark-cassandra-connector:1.6.0-s_2.10,databricks:spark-csv:1.4.0-s_2.10 %s/csv/target/scala-2.10/simpleapp_2.10-1.0.jar %s %s" % (home_dir, home_dir, cassandra_ip, csvFile), shell=True)
```

<a id='section10'></a>

## Kafka Producer

I used the code from a repo by [Marcel Krcah](http://marcelkrcah.net) on his GitHub, which can be found [here](https://github.com/mkrcah/scala-kafka-twitter). The code uses Scala to take a Twitter stream, encode the tweets with an Avro schema, and send them as Kafka messages via a Kafka producer. Marcel's code only used the username and tweet, but since I wanted more metadata, I altered his code so I could grab this extra data about each tweet. I also took the liberty of commenting his code to check my own understanding. Marcel's code provides an easy and efficient way to send tweets through a Kafka producer.
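For readers new to Avro, the payload of each Kafka message is the tweet record in Avro's binary encoding. Per the Avro specification, a record is just its fields concatenated in schema order, a `long` is a zig-zag encoded variable-length integer, and a `string` is its UTF-8 byte length (as a `long`) followed by the bytes. The sketch below hand-encodes a record whose fields are all strings, only to show what ends up on the wire; the field names `name` and `text` are hypothetical, and real code should use an Avro library (the consumer jobs in this project pull in Twitter's `bijection-avro` package).

```python
def encode_long(n):
    """Avro long: zig-zag map to a non-negative int, then a 7-bit varint."""
    z = (n << 1) ^ (n >> 63)  # zig-zag for 64-bit signed values
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s):
    """Avro string: UTF-8 byte length as a long, then the bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_record(field_names, record):
    """Avro record: its fields concatenated in schema order (all strings here)."""
    return b"".join(encode_string(record[name]) for name in field_names)


TWEET_FIELDS = ["name", "text"]  # hypothetical schema field order
message = encode_record(TWEET_FIELDS, {"name": "alice", "text": "hi"})
print(message)  # b'\nalice\x04hi'
```

No field names or types appear in the bytes, which is why producer and consumer must agree on the schema, and why I had to package the altered schema for my own consumer code.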

<a id='section5'></a>

## Stream Kafka Twitter data into Cassandra Data Lake

I also used [Marcel Krcah's](http://marcelkrcah.net) Kafka consumer code as a starting point for setting up a Spark application with a DStream that ingests Kafka data. Along the way I realized that, since I was creating my own package, I had to package my altered version of Marcel's Avro schema as a jar. The instructions I used can be found [here](https://avro.apache.org/docs/1.7.7/gettingstartedjava.html). Once the schema was in a jar, I could easily convert the Kafka messages from binary to Avro objects. With a DStream of Avro objects, I could take all the metadata from each tweet and save it into a Cassandra table.
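Converting a Kafka message from binary back into a tweet is the reverse of Avro's binary encoding. The hand-rolled Python sketch below decodes a record made only of string fields, reading zig-zag varints for the lengths. The field names are hypothetical, and in practice a library (here, the jar'ed schema plus `bijection-avro`) does this work.

```python
def decode_long(buf, pos):
    """Read one zig-zag varint starting at pos; return (value, new_pos)."""
    shift = 0
    z = 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:  # no continuation bit: last byte of this number
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos  # undo the zig-zag mapping


def decode_string(buf, pos):
    """Read a length-prefixed UTF-8 string; return (text, new_pos)."""
    length, pos = decode_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_record(field_names, buf):
    """Decode a record of string fields in schema order."""
    record, pos = {}, 0
    for name in field_names:
        record[name], pos = decode_string(buf, pos)
    return record


TWEET_FIELDS = ["name", "text"]  # hypothetical schema field order
print(decode_record(TWEET_FIELDS, b"\x0aalice\x04hi"))  # {'name': 'alice', 'text': 'hi'}
```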

If you use the command below, make sure to set the IP address of the Cassandra cluster and the Zookeeper IP.

```python
subprocess.call("%s/spark-1.6.1-bin-hadoop2.6/bin/spark-submit --class 'net.jacob.SimpleApp' --master local[4] --packages datastax:spark-cassandra-connector:1.6.0-s_2.10,org.apache.spark:spark-streaming-kafka_2.10:1.6.1,com.twitter:bijection-avro_2.10:0.9.2 %s/streamk/target/scala-2.10/simpleapp_2.10-1.0.jar %s %s" % (home_dir, home_dir, cassandra_ip, kafka_ip), shell=True)
```

<a id='section6'></a>

## Stream Kafka Twitter data into Speed Layer

The code for the speed layer is almost identical to the code that saves the Twitter data into the Cassandra data lake. There are two differences: the speed layer keeps only the user and the tweet (not all the extra metadata), and it uses `reduceByKey` on the user to combine each user's tweets before saving the data into another Cassandra table. As mentioned above, there are usually no tweets to combine for a user, since people rarely send two tweets within a minute of each other. Therefore each row in the speed-layer table usually contains only a single tweet.
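In Spark terms, the speed layer maps each tweet to a `(user, tweet)` pair and calls `reduceByKey` with a function that concatenates two tweets, once per micro-batch. The plain-Python sketch below reproduces that reduction on a single micro-batch so the result is easy to inspect; the helper name `reduce_by_key` and the space separator are assumptions.

```python
def reduce_by_key(pairs, func):
    """Plain-Python equivalent of RDD.reduceByKey for one micro-batch."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


def combine_tweets(a, b):
    # Assumed separator. Spark does not guarantee the order in which values
    # are combined across partitions, so tweet order may differ there.
    return a + " " + b


micro_batch = [("alice", "good morning"), ("bob", "hello"), ("alice", "coffee time")]
print(reduce_by_key(micro_batch, combine_tweets))
# {'alice': 'good morning coffee time', 'bob': 'hello'}
```

Because word counts in the serving layer do not depend on tweet order, the non-deterministic concatenation order in Spark is harmless here.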

If you use the command below, make sure to set the IP address of the Cassandra cluster and the Zookeeper IP.

```python
subprocess.call("%s/spark-1.6.1-bin-hadoop2.6/bin/spark-submit --class 'net.jacob.SimpleApp' --master local[4] --packages datastax:spark-cassandra-connector:1.6.0-s_2.10,org.apache.spark:spark-streaming-kafka_2.10:1.6.1,com.twitter:bijection-avro_2.10:0.9.2 %s/speedk/target/scala-2.10/simpleapp_2.10-1.0.jar %s %s" % (home_dir, home_dir, cassandra_ip, kafka_ip), shell=True)
```

<a id='section7'></a>

## Perform Batch Process on Cassandra Data Lake Data

The batch layer performs a process very similar to the speed layer's, but instead of combining a user's tweets from the data currently streaming in, it combines them across all the streaming data that has been collected in the Cassandra data lake. The code reads all the data from the Cassandra data lake via Spark SQL and combines all the tweets for each user into a single string. The Cassandra table that held the previous batch output is then dropped, and a new table is created to store the new batch output.
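The key property of the batch layer is that its output is recomputed from the whole data lake on every run instead of being updated incrementally, which is why the old output table can simply be dropped. A Python sketch of that recompute, assuming data-lake rows are dicts with hypothetical `user` and `tweet` keys plus other metadata (here a hypothetical `lang` field) that the batch view ignores:

```python
from collections import defaultdict


def recompute_batch_view(data_lake_rows):
    """Rebuild the per-user view from every stored row; never reads the old view."""
    grouped = defaultdict(list)
    for row in data_lake_rows:
        grouped[row["user"]].append(row["tweet"])
    return {user: " ".join(tweets) for user, tweets in grouped.items()}


data_lake = [
    {"user": "alice", "tweet": "first tweet", "lang": "en"},  # extra metadata is ignored
    {"user": "bob", "tweet": "hey", "lang": "en"},
]
batch_view = recompute_batch_view(data_lake)
data_lake.append({"user": "alice", "tweet": "second tweet", "lang": "en"})
batch_view = recompute_batch_view(data_lake)  # replaces the previous view wholesale
print(batch_view)  # {'alice': 'first tweet second tweet', 'bob': 'hey'}
```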

If you use the command below, make sure to set the IP address of the Cassandra cluster.

```python
subprocess.call("%s/spark-1.6.1-bin-hadoop2.6/bin/spark-submit --class 'SimpleApp' --master local[4] --packages datastax:spark-cassandra-connector:1.6.0-s_2.10 %s/batch/target/scala-2.10/simpleapp_2.10-1.0.jar %s" % (home_dir, home_dir, cassandra_ip), shell=True)
```


<a id='section8'></a>

## Serving Layer

The serving layer takes two users, finds the words they both used most frequently, and reports how many times each user used each of those words. The program reads the relevant data for each user from the speed table and the batch output table, then finds the most frequent, mutually used words for the two users. Then, for each user, the program counts the number of times that user used each word. Three arrays are produced: one containing the words, one containing the word frequencies for the first user and one containing the word frequencies for the second user. These three arrays are saved in a text file.
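The serving layer itself is Scala with Spark. As an illustration of the word-count logic only, the Python sketch below tokenizes each user's combined text (batch plus speed), keeps words both users used, ranks them by combined count, and formats one `(word,count1,count2)` line per word, the line format the plotting code parses. The lowercase whitespace tokenizer, the combined-count ranking with alphabetical tie-breaking, and `top_n` are assumptions, not necessarily what the original program does.

```python
from collections import Counter


def mutual_word_counts(text1, text2, top_n=10):
    """Return (words, counts1, counts2) for the top_n words both users used."""
    c1 = Counter(text1.lower().split())
    c2 = Counter(text2.lower().split())
    mutual = set(c1) & set(c2)
    # Rank by combined count; break ties alphabetically so output is stable.
    ranked = sorted(mutual, key=lambda w: (-(c1[w] + c2[w]), w))[:top_n]
    return ranked, [c1[w] for w in ranked], [c2[w] for w in ranked]


def to_lines(words, counts1, counts2):
    """Format like Scala tuples, one '(word,n1,n2)' per line."""
    return "\n".join("(%s,%d,%d)" % row for row in zip(words, counts1, counts2))


words, n1, n2 = mutual_word_counts("the cat and the hat", "The dog and the cat", top_n=2)
print(to_lines(words, n1, n2))
# (the,2,2)
# (and,1,1)
```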

If you use the command below, make sure to set the IP address of the Cassandra cluster, the username of the first user, the username of the second user, and the location where you want the output file to be saved.

```python
subprocess.call("%s/spark-1.6.1-bin-hadoop2.6/bin/spark-submit --class 'SimpleApp' --master local[4] --packages datastax:spark-cassandra-connector:1.6.0-s_2.10 %s/serving/target/scala-2.10/simpleapp_2.10-1.0.jar %s %s %s %s" % (home_dir, home_dir, cassandra_ip, name1, name2, filename), shell=True)
```


<a id='section9'></a>

## Plot

This portion of the code reads the output text file from the serving layer and produces a grouped bar chart.

```python
import plotly.plotly as py
import plotly.graph_objs as go
import plotly.tools as tls


def get_lists(filename):
    """Read the serving-layer output, one '(word,count1,count2)' tuple per line."""
    words, name1_counts, name2_counts = [], [], []
    with open(filename) as f:
        for line in f:
            fields = line.strip().replace("(", "").replace(")", "").split(",")
            if fields[0] == "":  # skip blank lines
                continue
            words.append(fields[0])
            name1_counts.append(int(fields[1]))
            name2_counts.append(int(fields[2]))
    return words, name1_counts, name2_counts


def plot_bar(words, name1_count, name2_count):
    """Generate a grouped plotly bar chart from the converted serving-layer data.

    The trace names come from name1 and name2, set in the arguments cell.
    """
    # The code example used for this function can be found here: https://plot.ly/python/bar-charts/
    trace0 = go.Bar(
        x=words,
        y=name1_count,
        name=name1,
        marker=dict(color='rgb(49,130,189)'),
    )
    trace1 = go.Bar(
        x=words,
        y=name2_count,
        name=name2,
        marker=dict(color='rgb(204,204,204)'),
    )

    layout = go.Layout(
        title='%s vs %s Twitter Word Count' % (name1, name2),
        # xaxis=dict(tickangle=-45),
        barmode='group',  # place the two users' bars side by side per word
    )

    fig = go.Figure(data=[trace0, trace1], layout=layout)
    py.iplot(fig, filename='testcase')
```

```python
plot_bar(*get_lists(filename + "/part-00000"))
```

```python
tls.embed("https://plot.ly/~jwbaum91/62/wagener-emily-vs-wagener-emily-twitter-word-count/")
```

<a id='section12'></a>

## Future

- Do the speed layer in Flink

- Do the batch layer in Scalding

- Make the batch layer alternate between two Cassandra tables so the serving layer can still run while the batch layer is updating.
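One way to realize the last idea is a blue/green swap: keep two batch tables, rebuild whichever one is inactive, and flip a pointer only after the rebuild finishes, so readers always see a complete view. The minimal in-memory Python sketch below uses dicts as stand-ins for the two Cassandra tables, and the table names are hypothetical. In a real pipeline the pointer would itself have to live somewhere the serving job can read, such as a small Cassandra table; that part is an assumption.

```python
class AlternatingBatchTables:
    """Two batch tables; the serving layer always reads the active one."""

    def __init__(self):
        self.tables = {"batch_a": {}, "batch_b": {}}  # hypothetical table names
        self.active = "batch_a"

    def inactive(self):
        return "batch_b" if self.active == "batch_a" else "batch_a"

    def rebuild(self, new_view):
        # Write the new batch output into the inactive table, which readers
        # are not using...
        target = self.inactive()
        self.tables[target] = dict(new_view)
        # ...then flip the pointer once the new view is complete.
        self.active = target

    def read(self, user):
        return self.tables[self.active].get(user, "")


store = AlternatingBatchTables()
store.rebuild({"alice": "first tweet"})
print(store.active, store.read("alice"))  # batch_b first tweet
```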

