### Spark and Cassandra

This tutorial shows how to work with Cassandra and Apache Spark. The idea of combining a distributed database (Cassandra) with a distributed processing framework (Spark) is that it lets us work easily with data that is bigger than what our machines (or a single powerful machine) can handle.

#### Activate Python 3.5
The Cassandra Spark connector (version 2.1.1) does not work with the newest version of Apache Spark, and Python 3.6 does not work with this setup either, so we need a workaround: install an older Apache Spark (version 2.1.0), then use Anaconda to create a Python 3.5 environment.
```bash
conda create -n py35 python=3.5 anaconda
source activate py35
```
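
Since the rest of the tutorial assumes Python 3.5, it can help to verify the interpreter before starting `pyspark`. The snippet below is only a minimal sketch: the helper name `is_expected_python` and the constant `EXPECTED_PYTHON` are hypothetical and not part of any library.

```python
import sys

# Version this tutorial assumes (taken from the conda command above).
EXPECTED_PYTHON = (3, 5)


def is_expected_python(version_info=None, expected=EXPECTED_PYTHON):
    """Return True when the major.minor version matches the expected one."""
    if version_info is None:
        version_info = sys.version_info
    return tuple(version_info[:2]) == tuple(expected)


if __name__ == "__main__":
    if not is_expected_python():
        print("Warning: running Python %d.%d, but this tutorial assumes %d.%d"
              % (sys.version_info[0], sys.version_info[1],
                 EXPECTED_PYTHON[0], EXPECTED_PYTHON[1]))
```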

#### Initialize Spark and Cassandra
```bash
pyspark --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host=127.0.0.1
```
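
The two flags above map to ordinary Spark configuration keys: `--packages` corresponds to `spark.jars.packages`, and `--conf` sets `spark.cassandra.connection.host` directly. The sketch below only assembles those settings as a dictionary, assuming the same connector and Scala versions as the command above; the function name `cassandra_spark_conf` is hypothetical.

```python
def cassandra_spark_conf(host="127.0.0.1",
                         connector_version="2.0.1",
                         scala_version="2.11"):
    """Build the Spark settings equivalent to the command-line flags above."""
    package = "datastax:spark-cassandra-connector:{}-s_{}".format(
        connector_version, scala_version)
    return {
        # Same effect as --packages on the command line.
        "spark.jars.packages": package,
        # Cassandra node that the connector contacts first.
        "spark.cassandra.connection.host": host,
    }


conf = cassandra_spark_conf()
for key, value in sorted(conf.items()):
    print("{}={}".format(key, value))
```

In a standalone script, each pair could be passed to `SparkSession.builder.config(key, value)` before calling `getOrCreate()`, provided no Spark session is already running.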
References:
* https://www.youtube.com/watch?v=9lc-OJ9QJO0
* https://www.youtube.com/watch?v=GjNXK1SGDLw
* https://github.com/datastax/spark-cassandra-connector
* https://spark-packages.org/package/datastax/spark-cassandra-connector
* https://stackoverflow.com/questions/34882097/cannot-connect-to-cassandra-from-spark
* https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/
* http://www.learnbymarketing.com/618/pyspark-rdd-basics-examples/
* https://www.datacamp.com/community/tutorials/apache-spark-python#gs.l5EtU_Q
* https://spark.apache.org/docs/latest/programming-guide.html


### Create a Dataframe from Cassandra

In [1]:
df = spark.read.format("org.apache.spark.sql.cassandra").options(table="tb_drive", keyspace="mydb").load()

# Print the DataFrame schema (a DataFrame is a distributed table structure)
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- acc: float (nullable = true)
 |-- image: binary (nullable = true)
 |-- wheel_angle: float (nullable = true)



### Get dataframe size

In [2]:
# Get the size of the dataframe
print('DataFrame size:',df.count())

DataFrame size: 6380


### Convert dataframe to pandas to display nicely

In [3]:
# Caution: toPandas() collects every row to the driver, so you could load more data than your memory can handle.
#df.toPandas().head(5)
df.show(5)

+--------------------+---------+--------------------+-----------+
|                  id|      acc|               image|wheel_angle|
+--------------------+---------+--------------------+-----------+
|7e1ea253-5672-11e...|      1.0|[89 50 4E 47 0D 0...|   0.138223|
|71b9ea67-574f-11e...| 0.302832|[89 50 4E 47 0D 0...|   -0.12854|
|6cf4879f-574f-11e...| 0.680465|[89 50 4E 47 0D 0...|   -0.12854|
|8a2975c3-574f-11e...|0.0510772|[89 50 4E 47 0D 0...|  -0.719196|
|decb8322-574b-11e...|      0.0|[89 50 4E 47 0D 0...|        0.0|
+--------------------+---------+--------------------+-----------+
only showing top 5 rows
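
To get the nicer pandas display without collecting the whole table, one option is to limit the rows on the Spark side first. This is a minimal sketch, assuming `df` is a Spark DataFrame and pandas is installed; the helper name `preview_as_pandas` is hypothetical.

```python
def preview_as_pandas(df, n=5):
    """Return the first n rows of a Spark DataFrame as a pandas DataFrame.

    limit(n) is applied on the Spark side, so only n rows are collected
    to the driver by toPandas().
    """
    if n < 0:
        raise ValueError("n must be non-negative")
    return df.limit(n).toPandas()
```

With the DataFrame from this tutorial, `preview_as_pandas(df, 5)` would replace the commented-out `df.toPandas().head(5)`.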



### Get a batch
Get a random batch from the dataset, sampled without replacement (about 1% of the dataset; the fraction is an expected value, not an exact size).

In [4]:
df_batch = df.sample(False, 0.01)
print('Batch size:',df_batch.count())
df_batch.toPandas().head()

Batch size: 52


| | id | acc | image | wheel_angle |
|---|---|---|---|---|
| 0 | b80dbb46-574b-11e7-9708-989096d72294 | 1.0 | [137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13,... | 0.0 |
| 1 | b93f2634-574b-11e7-9708-989096d72294 | 0.777294 | [137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13,... | 0.176955 |
| 2 | 7c2bccfc-574e-11e7-9708-989096d72294 | 0.0 | [137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13,... | -0.12854 |
| 3 | 69689aa7-574f-11e7-9708-989096d72294 | 0.0 | [137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13,... | -0.109174 |
| 4 | e2f88a5f-574f-11e7-9708-989096d72294 | 0.0 | [137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13,... | 0.0 |
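
The batch above has 52 rows, not exactly 1% of the 6380 rows. Assuming each row is kept independently with probability `fraction` (Bernoulli sampling, which is how sampling without replacement is usually described for Spark), the batch size follows a binomial distribution. The sketch below estimates how much variation to expect; the function name `sample_size_stats` is hypothetical.

```python
import math


def sample_size_stats(n_rows, fraction):
    """Mean and standard deviation of the size of a Bernoulli sample."""
    mean = n_rows * fraction
    std = math.sqrt(n_rows * fraction * (1.0 - fraction))
    return mean, std


# Numbers taken from the outputs above: 6380 rows, fraction 0.01
mean, std = sample_size_stats(6380, 0.01)
print("Expected batch size: %.1f (std %.1f)" % (mean, std))

# Distance of the observed batch (52 rows) from the mean, in std units
z = (52 - mean) / std
print("Observed 52 rows is %.2f std from the mean" % z)
```

Under this assumption a batch of 52 is about 1.5 standard deviations below the expected 63.8, so it is an ordinary outcome. If an exact batch size is needed, `rdd.takeSample(False, num)` returns exactly `num` elements, but as a local list on the driver.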


### Do queries inside the dataframe (Using Spark Dataframe API)

In [5]:
# Count all rows whose wheel angle is different from zero
df.filter(df.wheel_angle != 0).count()

4645

In [6]:
# How many times we did not accelerate fully...
df.filter(df.acc != 1).count()

4377

### Do queries inside the dataframe (Using Spark Sql API)
We can register a DataFrame as a temporary table on the cluster, which lets us query it with the full SQL language.

In [7]:
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView does the same job
df.createOrReplaceTempView("autodrive")
df_filt = spark.sql("SELECT wheel_angle, acc FROM autodrive WHERE wheel_angle BETWEEN 0.1 AND 1.0")
print('Number of instances:',df_filt.count())
df_filt.toPandas().head()

Number of instances: 1536


| | wheel_angle | acc |
|---|---|---|
| 0 | 0.138223 | 1.0 |
| 1 | 0.235052 | 1.0 |
| 2 | 0.147906 | 0.0 |
| 3 | 0.486807 | 0.118857 |
| 4 | 0.254418 | 0.0 |


### Executing stuff on the cluster
So far we have seen how to filter data inside the cluster and gather simple statistics. Now suppose we want to apply some operation to each element of our RDD or DataFrame.

In [8]:
df_zero_angle = df.filter(df.wheel_angle == 0)
print('Zero angle samples:',df_zero_angle.count())

Zero angle samples: 1735


In [17]:
# Function executed on the cluster. Note: this version fails because Spark Row objects are read-only.
def perturb_angles(data):
    print('Type:',type(data.wheel_angle),'Val:',data.wheel_angle)
    data.wheel_angle = data.wheel_angle + 0.01  # raises Exception("Row is read-only")

#df_zero_jitter = df_zero_angle.foreach(perturb_angles)
#map_op = df_zero_angle.rdd.map(perturb_angles)

#new_rdd = df_zero_angle.select('wheel_angle').rdd
new_rdd = df_zero_angle.rdd
map_op = new_rdd.map(perturb_angles).collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 21.0 failed 1 times, most recent failure: Lost task 2.0 in stage 21.0 (TID 122, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-17-6db8396ecfa8>", line 4, in perturb_angles
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1506, in __setattr__
    raise Exception("Row is read-only")
Exception: Row is read-only

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-17-6db8396ecfa8>", line 4, in perturb_angles
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1506, in __setattr__
    raise Exception("Row is read-only")
Exception: Row is read-only

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
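
The error above comes from assigning to a field of a `Row`, which is read-only. Note also that `perturb_angles` has no `return`, so even without the error `map` would yield `None` for every element. One way around both problems is to build a new record instead of mutating the input. The helper below is a minimal sketch that works on plain dictionaries so it can be tried without a cluster; its name `perturb_angle` and the `delta` parameter are illustrative, not taken from the original notebook.

```python
def perturb_angle(record, delta=0.01):
    """Return a copy of record with wheel_angle shifted by delta.

    The input is left untouched; functions passed to map() should
    return a new value rather than modify their argument.
    """
    updated = dict(record)
    updated["wheel_angle"] = record["wheel_angle"] + delta
    return updated


# Local check with a plain dict standing in for a row (made-up values)
sample = {"id": "example-id", "acc": 1.0, "wheel_angle": 0.0}
result = perturb_angle(sample)
print(result["wheel_angle"])
print(sample["wheel_angle"])  # the original record is unchanged
```

On the cluster this could be applied as `df_zero_angle.rdd.map(lambda row: perturb_angle(row.asDict())).collect()`. When the change is a simple column expression, staying in the DataFrame API with `df_zero_angle.withColumn('wheel_angle', df_zero_angle.wheel_angle + 0.01)` avoids the Python round trip altogether.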
