### Add Spark Streaming Kafka to Jupyter


In [1]:
import kafka.serializer.StringDecoder

### This will take a while. To add Kafka (and S3) to this application, under the covers we ran

```
dse spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0,org.apache.hadoop:hadoop-aws:2.7.2 ...
```

The Jupyter kernel definition file is at the path below. Take a look at it yourself; you can always add custom settings to it:

`~/.local/share/jupyter/kernels/spark-dse-cluster/kernel.json`


In [2]:
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.streaming._
import Ordering.StringOrdering
import scala.util.Try

### A simple case class for the stock trades

In [3]:
case class Trade (
stock_symbol:String,
exchange:String,
trade_timestamp: String,
price: Float,
quantity: Int)

### A case class for stats
  An auxiliary constructor builds a stats block from a single trade.
  
  The munge function is a reduce step: it takes two stats blocks and returns one combined block with the total volume, total price, highest, lowest, first, last, delta over the period, and a list of the trades over the period.

In [4]:
case class Stats (stock_symbol: String,
      volume: Int,
      total_price: Float,
      high: Float,
      low: Float,
      average: Float,
      oldest: Float,
      oldest_timestamp: String,
      newest: Float,
      newest_timestamp: String,
      delta: Float,
      trades: Seq[String]) {

  def this(t:Trade) =
    this(stock_symbol = t.stock_symbol,
      volume       = t.quantity,
      total_price  = t.price * t.quantity, // traded value, so total_price / volume is a volume-weighted average
      high         = t.price,
      low          = t.price,
      oldest       = t.price,
      oldest_timestamp = t.trade_timestamp,
      newest       = t.price,
      newest_timestamp = t.trade_timestamp,
      delta        = 0F,
      average      = t.price,
      trades = Seq(f"${t.quantity}%d@${t.price}%1.2f")
    )

  def munge(r:Stats) = {
    // Pick the older and the newer side once, so delta is computed from the merged values
    val (o, oTs) = if (oldest_timestamp < r.oldest_timestamp) (oldest, oldest_timestamp) else (r.oldest, r.oldest_timestamp)
    val (n, nTs) = if (newest_timestamp > r.newest_timestamp) (newest, newest_timestamp) else (r.newest, r.newest_timestamp)
    Stats(
      stock_symbol  = stock_symbol,
      volume        = volume + r.volume,
      total_price   = total_price + r.total_price,
      high          = high max r.high,
      low           = low min r.low,
      oldest        = o,
      oldest_timestamp = oTs,
      newest        = n,
      newest_timestamp = nTs,
      delta         = n - o,
      average       = (total_price + r.total_price) / (volume + r.volume),
      trades        = trades ++ r.trades)
  }
}
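
The reduce pattern behind `munge` can be tried without Spark. The following is a minimal sketch, not the notebook's `Stats` class: `MiniStats`, `merge`, and `MiniStatsDemo` are hypothetical names, the numbers are made up, and only three of the fields are kept. It shows that combining blocks pairwise gives the same result however the pairs are grouped, which is the property a distributed reduce relies on.

```scala
// Simplified stand-in for Stats (hypothetical): only volume, high, and low.
case class MiniStats(volume: Int, high: Float, low: Float) {
  // Combine two summaries into one, the same shape of operation as munge.
  def merge(r: MiniStats): MiniStats =
    MiniStats(volume + r.volume, high max r.high, low min r.low)
}

object MiniStatsDemo {
  // Three single-trade summaries for one symbol (made-up numbers).
  val perTrade = Seq(
    MiniStats(100, 10.0f, 10.0f),
    MiniStats(50, 12.5f, 12.5f),
    MiniStats(25, 9.0f, 9.0f))

  // Reduce left to right, as a local collection would.
  val leftToRight = perTrade.reduce(_ merge _)

  // Reduce with a different grouping, as partitions on a cluster might.
  val regrouped = perTrade.head merge (perTrade(1) merge perTrade(2))
}
```

Both `leftToRight` and `regrouped` describe the same 175 shares with a high of 12.5 and a low of 9.0.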

In [7]:
%%cql create keyspace if not exists stock with replication = {'class':'SimpleStrategy','replication_factor':1}

In [8]:
%%cql create table if not exists stock.last_10_seconds( stock_symbol text,
volume int,
high float,
low float,
average float,
delta float,
trades list<text>,
primary key (stock_symbol))

In [9]:
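// The batch interval sets how long we collect data before analyzing it as one batch
val batchInterval = Seconds(5)
// The window interval is how far back each windowed reduce looks; it must be a multiple of the batch interval
val windowInterval = Seconds(60)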

In [None]:
sc

 Create a new `StreamingContext`, using the SparkContext and batch interval:

In [11]:
val ssc = new StreamingContext(sc, batchInterval)

 Create a Kafka stream
 
 ### Note: Get the IP address of the host called kafka, and add it to `/etc/hosts` on all 3 nodes.
 
 

In [12]:
 val directKafkaStream = KafkaUtils.createDirectStream[
     String, String, StringDecoder, StringDecoder ](
     ssc, Map("metadata.broker.list" ->"kafka:9092"), Set("Trades"))

### Map it to a DStream of Trades

Use a simple split to turn a string like 'foo|bar|baz' into an array. Then use pattern matching to pull the 5 fields out of the array and create an instance of Trade.

In [16]:
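val trades = directKafkaStream
  .flatMap { case (_, data) => data.split('|') match {
    case Array(ss, ex, dt, p, q) =>
      Seq(Trade(ss, ex, dt, Try(p.toFloat).getOrElse(0F), Try(q.toInt).getOrElse(0)))
    // Drop malformed lines instead of failing the batch with a MatchError
    case _ => Seq.empty[Trade]
  }}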
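
The split-and-match step can be tried on plain strings before wiring it into the stream. This is a minimal sketch under stated assumptions: `ParseSketch` and `parseTrade` are hypothetical names, the object carries its own copy of `Trade` so it stands alone, and the sample line in the usage note is made up. Returning an `Option` is one way to make a line with the wrong number of fields an explicit case.

```scala
import scala.util.Try

object ParseSketch {
  // Local copy of the notebook's Trade so the sketch is self-contained.
  case class Trade(stock_symbol: String, exchange: String,
                   trade_timestamp: String, price: Float, quantity: Int)

  // Hypothetical helper: None unless the line has exactly 5 pipe-separated fields.
  def parseTrade(line: String): Option[Trade] = line.split('|') match {
    case Array(ss, ex, dt, p, q) =>
      // Unparsable numbers fall back to 0, as in the notebook cell.
      Some(Trade(ss, ex, dt, Try(p.toFloat).getOrElse(0F), Try(q.toInt).getOrElse(0)))
    case _ => None
  }
}
```

For example, `ParseSketch.parseTrade("IBM|NYSE|2016-02-25 07:14:44|123.5|100")` yields a `Some` holding the trade, while `ParseSketch.parseTrade("foo|bar|baz")` yields `None`.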

## Reduce the DStream by key over a sliding window

Write the result to Cassandra. Use the Spark UI (you'll
find one at the bottom of the notebook, or use Safari) to look at the DAG. 

1. How many RDDs do you get in each DStream?
2. Why?

In [17]:
trades
      .map(t => (t.stock_symbol, new Stats(t)))
      .reduceByKeyAndWindow( _.munge(_) , windowInterval)
      .map(_._2)
      .saveToCassandra("stock", "last_10_seconds",
                       SomeColumns("stock_symbol", "high", "low", "average", "volume", "delta","trades"))
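
To build intuition for a windowed reduce by key, the idea can be emulated with plain Scala collections. This is a rough sketch, not how Spark implements it: `WindowSketch`, the sample batches, and a window of 2 batches are all assumptions made for illustration (the notebook's 60-second window over 5-second batches would span 12 batches), and unlike a real stream it only produces full windows.

```scala
object WindowSketch {
  // Each inner Seq is one batch of (symbol, quantity) pairs; made-up data.
  val batches: Seq[Seq[(String, Int)]] = Seq(
    Seq("IBM" -> 100, "MSFT" -> 50),
    Seq("IBM" -> 25),
    Seq("MSFT" -> 10, "IBM" -> 5),
    Seq("AAPL" -> 70))

  // Hypothetical window length, counted in batches.
  val batchesPerWindow = 2

  // For each window position, pool its batches and reduce the values per key.
  val windowedVolume: Seq[Map[String, Int]] =
    batches.sliding(batchesPerWindow).toSeq.map { window =>
      window.flatten
        .groupBy(_._1)
        .map { case (symbol, pairs) => symbol -> pairs.map(_._2).reduce(_ + _) }
    }
}
```

The window slides forward one batch at a time, so each batch contributes to `batchesPerWindow` consecutive results. The first window here sums IBM to 125 and MSFT to 50.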

### Start the stream. The streaming job runs in a background thread.

In [23]:
ssc.start()

WARN  2016-02-25 07:14:44,667 org.apache.spark.streaming.StreamingContext: StreamingContext has already been started


### Check the Results

In [21]:
%%cql select * from stock.last_10_seconds where stock_symbol in ('IBM','MSFT','AAPL','GM','F','MMM')

stock_symbol,average,delta,high,low,trades,volume


In [None]:
// Optional: stop the active streaming context but keep the SparkContext alive
StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }

In [None]:
%%cql select * from stock.last_10_seconds

Try this in a terminal:

```
watch -n 5 "echo \"select stock_symbol, volume, average, low, high, delta from stock.last_10_seconds where stock_symbol in ('IBM','MSFT','AAPL','GM','F','MMM');\" | cqlsh node0"
```

In [22]:
%%html <iframe src="/terminals/1" width=1000 height=400></iframe>

In [24]:
val addr = java.net.InetAddress.getByName("node0_ext").getHostAddress
kernel.magics.html(s"""<iframe src="http://$addr:4040/stages" width=1000 height=500></iframe>""")

### Stop the streaming

In [None]:
ssc.stop()