# kafkaReceiveDataPy
This notebook receives data from Kafka on the topic 'test', and stores it in the 'time_test' table of Cassandra (created by cassandra_init.script in startup_script.sh).

```
CREATE KEYSPACE test_time WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE test_time.sent_received(
 time_sent TEXT,
 time_received TEXT,
PRIMARY KEY (time_sent)
);
```

A message that gives the current time is received every second. 

## Add dependencies

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'
import time

## Load modules and start SparkContext
Note that SparkContext must be started to effectively load the package dependencies. Two cores are used, since one is needed for running the Kafka receiver.

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
conf = SparkConf() \
    .setAppName("Streaming test") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

## SaveToCassandra function
Takes a list of tuple (rows) and save to Cassandra 

In [None]:
def saveToCassandra(rows):
    if not rows.isEmpty(): 
        sqlContext.createDataFrame(rows).write\
        .format("org.apache.spark.sql.cassandra")\
        .mode('append')\
        .options(table="sent_received", keyspace="test_time")\
        .save()

## Create streaming task
* Receive data from Kafka 'test' topic every five seconds
* Get stream content, and add receiving time to each message
* Save each RDD in the DStream to Cassandra. Also print on screen

In [None]:
ssc = StreamingContext(sc, 5)
kvs = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test': 1})
data = kvs.map(lambda x: x[1])
rows= data.map(lambda x:Row(time_sent=x,time_received=time.strftime("%Y-%m-%d %H:%M:%S")))
rows.foreachRDD(saveToCassandra)
rows.pprint()

## Start streaming

In [None]:
ssc.start()

## Stop streaming

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=True)

## Get Cassandra table content

In [None]:
data=sqlContext.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="sent_received", keyspace="test_time")\
    .load()
data.show()

## Get Cassandra table content using SQL

In [None]:
data.registerTempTable("sent_received");
data.printSchema()
data=sqlContext.sql("select * from sent_received")
data.show()

# TP

CREATE KEYSPACE test_stocks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};


CREATE TABLE test_stocks.sent_received(time TEXT, exchange TEXT, date TEXT, price TEXT, PRIMARY KEY (time));


In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'
import time

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
conf = SparkConf() \
    .setAppName("Streaming test") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [17]:
def saveToCassandra(rows):
    print('boo')
    if not rows.isEmpty(): 
        sqlContext.createDataFrame(rows).write\
        .format("org.apache.spark.sql.cassandra")\
        .mode('append')\
        .options(table="sent_received", keyspace="test_stocks")\
        .save()
    else:
        print('rien')

In [18]:
def parserowrecue(x):
    l=x.split(',')
    return Row(time=l[0], exchange=l[1], date=l[2], price=l[3])
ssc = StreamingContext(sc, 5)
kvs = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test_stock': 1})
data = kvs.map(lambda x: x[1])
rows= data.map(parserowrecue)
rows.foreachRDD(saveToCassandra)
rows.pprint()

In [19]:
ssc.start()

boo
-------------------------------------------
Time: 2019-04-25 13:01:50
-------------------------------------------
Row(date=u'2009-03-23', exchange=u'NYSE', price=u'4.53', time=u'2019-04-25 12:52:34')
Row(date=u'2009-03-20', exchange=u'NYSE', price=u'5.15', time=u'2019-04-25 12:52:35')
Row(date=u'2009-03-19', exchange=u'NYSE', price=u'5.3', time=u'2019-04-25 12:52:36')
Row(date=u'2009-03-18', exchange=u'NYSE', price=u'4.05', time=u'2019-04-25 12:52:37')
Row(date=u'2009-03-17', exchange=u'NYSE', price=u'3.78', time=u'2019-04-25 12:52:38')
Row(date=u'2009-03-16', exchange=u'NYSE', price=u'4.08', time=u'2019-04-25 12:52:39')
Row(date=u'2009-03-13', exchange=u'NYSE', price=u'3.95', time=u'2019-04-25 12:52:40')
Row(date=u'2009-03-12', exchange=u'NYSE', price=u'3.46', time=u'2019-04-25 12:52:41')
Row(date=u'2009-03-11', exchange=u'NYSE', price=u'3.68', time=u'2019-04-25 12:52:42')
Row(date=u'2009-03-10', exchange=u'NYSE', price=u'3.42', time=u'2019-04-25 12:52:43')
...

boo
--------------

In [20]:
ssc.stop(stopSparkContext=False,stopGraceFully=True)

boo
-------------------------------------------
Time: 2019-04-25 13:02:30
-------------------------------------------
Row(date=u'2006-11-14', exchange=u'NYSE', price=u'70.84', time=u'2019-04-25 13:02:25')
Row(date=u'2006-11-13', exchange=u'NYSE', price=u'70.9', time=u'2019-04-25 13:02:26')
Row(date=u'2006-11-10', exchange=u'NYSE', price=u'70.9', time=u'2019-04-25 13:02:27')
Row(date=u'2006-11-09', exchange=u'NYSE', price=u'71.24', time=u'2019-04-25 13:02:28')

boo
rien
-------------------------------------------
Time: 2019-04-25 13:02:35
-------------------------------------------

boo
rien
-------------------------------------------
Time: 2019-04-25 13:02:40
-------------------------------------------



In [24]:
Row(date='2222-11-22', exchange='XXXX', price='123', time='blabla').time

'blabla'

In [25]:
sqlContext.createDataFrame(Row(date=StringType('2222-11-22'), exchange='XXXX', price='123', time='blabla')).write\
        .format("org.apache.spark.sql.cassandra")\
        .mode('append')\
        .options(table="sent_received", keyspace="test_stocks")\
        .save()

NameError: name 'StringType' is not defined