# kafkaReceiveDataPy
This notebook receives data from Kafka on the topic 'test' and stores it in the 'sent_received' table of the Cassandra keyspace 'test_time' (created when the Docker container is built; see cassandra_init.script).

```
CREATE KEYSPACE test_time WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE test_time.sent_received(
 time_sent TEXT,
 time_received TEXT,
PRIMARY KEY (time_sent)
);
```

A message that gives the current time is received every second. 

## Add dependencies

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.1,com.datastax.spark:spark-cassandra-connector_2.11:1.6.0-M2 pyspark-shell'
import time
import json
import datetime
import sys
import shutil
from math import pi, sin, cos, atan2, sqrt, pow

## Load modules and start SparkContext
Note that the SparkContext must be started for the package dependencies to actually be loaded. Two cores are used, since one is needed to run the Kafka receiver.

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
conf = SparkConf() \
    .setAppName("Streaming test") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

## SaveToCassandra function
Takes an RDD of rows and saves it to Cassandra.

In [3]:
def saveToCassandra(rows):
    if not rows.isEmpty(): 
        sqlContext.createDataFrame(rows).write\
        .format("org.apache.spark.sql.cassandra")\
        .mode('append')\
        .options(table="test_geogen", keyspace="test_geogen_ks")\
        .save()
        
def distance_from_coord(lat1, lon1, lat2, lon2):
    D2R = pi / 180
    dlat = (lat1 - lat2) * D2R
    dlon = (lon1 - lon2) * D2R
    a = pow(sin(dlat/2.0), 2) + cos(lat1*D2R) * cos(lat2*D2R) * pow(sin(dlon/2.0), 2)
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    d = 6367 * c
    return d
        
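def updateState(newValues, current_state):
    # Called by updateStateByKey once per key and per batch; newValues holds the
    # (latitude, longitude) tuples received for that key in the current batch.
    if current_state is None:
        try:
            # Start from the first reported position with no distance travelled yet.
            current_state = {"total":0, "latitude":newValues[0][0], "longitude":newValues[0][1]}
        except (IndexError, TypeError):
            # No usable first position: keep the key without state.
            return None
    try:
        for row in newValues:
            # Add the distance (km) from the last known position to the new one.
            current_state['total'] = current_state['total'] + distance_from_coord(current_state['latitude'], current_state['longitude'], row[0], row[1])
            current_state['latitude'] =  row[0]
            current_state['longitude'] = row[1]
    except Exception as e:
        # Keep the error in the state so that it shows up in the printed output.
        current_state['error'] = e
    return current_state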
    
def testUpdater(newValues, current):
    if current is None:
        print("creating")
        current = 1
    print(newValues)
    return current

def isUpdaterCalled(newValues, current):
    print("updater called")
    return 1
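
The `distance_from_coord` helper above is the haversine formula with an Earth radius of 6367 km. As a quick sanity check outside Spark, the sketch below restates it with `math.radians` (the name `haversine_km` is made up for this example) and applies it to two positions that appear in the streaming output further down.

```python
from math import atan2, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6367  # same radius constant as in distance_from_coord


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two points given in degrees."""
    dlat = radians(lat1 - lat2)
    dlon = radians(lon1 - lon2)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return EARTH_RADIUS_KM * c


# Two positions taken from the streaming output of this notebook.
d = haversine_km(45.77818, 4.85675, 45.77994, 4.864805)
print(f"{d:.2f} km")  # roughly 0.65 km
```

A few hundred metres between the two points is consistent with coordinates that differ only in the third decimal place.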

## Create streaming task
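* Receive data from the Kafka 'geoData' topic every five seconds
* Parse each message as JSON and key its (latitude, longitude) position by msisdn
* Keep a running state per msisdn (last position and total distance) with updateStateByKey, and print it on screen. Saving each RDD in the DStream to Cassandra is currently commented out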
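
To make the per-message step concrete, here is a small sketch in plain Python, without Spark or Kafka, of what the streaming job in the next cell does to a single record. The field names are the ones used in that cell; the payload values and the helper names (`parse`, `to_keyed_position`, `format_time_sent`) are invented for illustration.

```python
import datetime
import json

# Hypothetical payload: field names come from the notebook, values are invented.
raw_value = json.dumps({
    "msisdn": 1,
    "timestamp": 1484212615,
    "latitude": 45.77818,
    "longitude": 4.85675,
    "radius": 100,
})

# KafkaUtils.createStream yields (key, value) pairs; the value is the message body.
kafka_record = (None, raw_value)


def parse(record):
    """Decode the JSON body of a (key, value) record."""
    return json.loads(record[1])


def to_keyed_position(message):
    """Key the position by msisdn, as the 'mapped' DStream does."""
    return (message["msisdn"], (message["latitude"], message["longitude"]))


def format_time_sent(message):
    """Format the epoch timestamp the way the commented-out Row mapping does."""
    sent = datetime.datetime.fromtimestamp(message["timestamp"])
    return sent.strftime("%Y-%m-%d %H:%M:%S")


message = parse(kafka_record)
keyed = to_keyed_position(message)
print(keyed)                      # (1, (45.77818, 4.85675))
print(format_time_sent(message))  # depends on the local time zone
```

Keying by `msisdn` is what lets `updateStateByKey` keep one state per subscriber.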

In [4]:
def createContext():
    print("context created")
    #initialStateRDD = sc.parallelize([])
    #shutil.rmtree('checkpoint')


    ssc = StreamingContext(sc, 5)
    kvs = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'geoData': 1})
    data = kvs.map(lambda x: json.loads(x[1]))
    """rows= data.map(lambda x:Row( \
                                msisdn=x['msisdn'], \
                                time_sent=datetime.datetime.fromtimestamp(x['timestamp']).strftime("%Y-%m-%d %H:%M:%S"), \
                                latitude = x['latitude'], \
                                longitude = x['longitude'], \
                                radius = x['radius'] \
                               ))#"""
    mapped = data.map(lambda x:(x['msisdn'],(x['latitude'],x['longitude'])))
    #rows.foreachRDD(saveToCassandra)
    status = mapped.updateStateByKey(updateState)
    status.pprint()
    #mapped.pprint()
    #rows.pprint()
    return ssc
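
`updateStateByKey` calls the update function once per key and per batch, passing the new values for that key and the previous state. It also calls it for keys that received nothing in the batch, and it drops a key when the function returns `None`. The sketch below imitates that contract in plain Python; `run_batches`, `update_state` and `distance_km` are hypothetical stand-ins written for this example, not Spark APIs, and the batches are invented.

```python
from math import atan2, cos, radians, sin, sqrt


def distance_km(lat1, lon1, lat2, lon2):
    """Haversine distance, same formula as distance_from_coord."""
    dlat = radians(lat1 - lat2)
    dlon = radians(lon1 - lon2)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    return 6367 * 2 * atan2(sqrt(a), sqrt(1 - a))


def update_state(new_values, state):
    """Simplified stand-in for updateState: accumulate distance per key."""
    if state is None:
        if not new_values:
            return None
        state = {"total": 0.0, "latitude": new_values[0][0], "longitude": new_values[0][1]}
    for lat, lon in new_values:
        state["total"] += distance_km(state["latitude"], state["longitude"], lat, lon)
        state["latitude"], state["longitude"] = lat, lon
    return state


def run_batches(batches, update):
    """Hypothetical driver: group each batch by key, then call the update
    function once for every key that has state or new values."""
    states = {}
    for batch in batches:
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        for key in set(states) | set(grouped):
            new_state = update(grouped.get(key, []), states.get(key))
            if new_state is None:
                states.pop(key, None)  # returning None drops the key
            else:
                states[key] = new_state
    return states


batches = [
    [(1, (45.77818, 4.85675)), (2, (45.77994, 4.864805))],
    [(1, (45.77994, 4.864805))],
    [],  # empty batch: existing keys are still visited, with no new values
]
final = run_batches(batches, update_state)
print(final[1]["total"])  # distance travelled by key 1, in km
print(final[2]["total"])  # key 2 never moved
```

A key whose positions never change keeps a total of 0.0, which matches the output shown below when every message for a key carries the same coordinates.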

## Start streaming

In [5]:
ssc = StreamingContext.getOrCreate("check15",createContext)
ssc.start()

context created
-------------------------------------------
Time: 2017-01-12 09:16:55
-------------------------------------------
(8, {'latitude': 45.77994000000004, 'total': 0.0, 'longitude': 4.864804999999998})
(2, {'latitude': 45.77818, 'total': 0.0, 'longitude': 4.85675})
(4, {'latitude': 45.77818, 'total': 0.0, 'longitude': 4.85675})
(6, {'latitude': 45.77994000000004, 'total': 0.0, 'longitude': 4.864804999999998})
(1, {'latitude': 45.77818, 'total': 0.0, 'longitude': 4.85675})
(3, {'latitude': 45.77818, 'total': 0.0, 'longitude': 4.85675})
(9, {'latitude': 45.77994000000004, 'total': 0.0, 'longitude': 4.864804999999998})
(5, {'latitude': 45.77994000000004, 'total': 0.0, 'longitude': 4.864804999999998})
(7, {'latitude': 45.77994000000004, 'total': 0.0, 'longitude': 4.864804999999998})

-------------------------------------------
Time: 2017-01-12 09:17:00
-------------------------------------------
(8, {'latitude': 45.77994000000004, 'total': 0.0, 'longitude': 4.864804999999998})
(

## Stop streaming

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=True)

## Get Cassandra table content

In [None]:
data=sqlContext.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="test_geogen", keyspace="test_geogen_ks")\
    .load()
data.show()
sys.getsizeof(data)
rw = data.filter(data.msisdn == 1).filter(data.time_sent > datetime.datetime(2017, 1, 11,14,15,30)).filter(data.latitude > 45.78).collect()

## Get Cassandra table content using SQL

In [None]:
data.registerTempTable("sent_received")
data.printSchema()
data=sqlContext.sql("select * from sent_received")
data.show()

In [None]:
rw