# Cassandra and Spark

This notebook is a playground for using Cassandra with Spark.


## Launch a Cassandra instance

    $ docker run -d --name cassandra_2_1_5 -p9042:9042 cassandra:2.1.5
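
The container needs a little time before Cassandra accepts CQL connections on port 9042. If you run `cqlsh` or the Spark cells too early, they may fail to connect. Below is a minimal sketch for polling the port from Scala before continuing. `waitForPort` is a hypothetical helper, not part of Cassandra or the connector. An open TCP port only shows that the server is listening; it does not show that the keyspace below exists yet.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper: poll a TCP port until it accepts a connection or we give up.
// Returns true as soon as one connection succeeds, false after `attempts` failures.
def waitForPort(host: String, port: Int, attempts: Int = 30, delayMs: Long = 1000L): Boolean = {
  def canConnect(): Boolean = Try {
    val socket = new Socket()
    try socket.connect(new InetSocketAddress(host, port), 500) // 500 ms connect timeout
    finally socket.close()
  }.isSuccess

  // `exists` stops at the first successful attempt
  (1 to attempts).exists { attempt =>
    val ok = canConnect()
    if (!ok && attempt < attempts) Thread.sleep(delayMs) // back off before retrying
    ok
  }
}
```

For example, `waitForPort("localhost", 9042)` waits up to roughly 30 seconds for the port published by the `docker run` command above.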
    

### Populate Cassandra with sample data

First connect to the instance:

    $ docker exec -it cassandra_2_1_5 cqlsh
    
Then execute the following script:

```cql
create keyspace test_ks with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

use test_ks;

create table test_tbl (
    id int,
    english text,
    symbol text,
    primary key (id)
);

insert into test_ks.test_tbl (id, english, symbol) values (1, 'one', 'xxx');

insert into test_ks.test_tbl (id, english, symbol) values (2, 'two', 'yyy');

insert into test_ks.test_tbl (id, english, symbol) values (3, 'three', 'zzz');
```

## Resources

- [Connector documentation](https://github.com/datastax/spark-cassandra-connector/tree/master/doc)
- [Spark RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html)


**NOTE**: The Spark Cassandra Connector is very sensitive to the following:

- Java version
- Scala version
- Spark version
- Cassandra server version

The code in this notebook is known to work with:

- JDK 8
- Scala 2.12.10
- Spark 2.4.4
- Cassandra 2.1.5
- Spark Cassandra Connector 2.4.3
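Mismatched versions are the usual source of trouble, so it can help to print what the kernel actually runs before loading the connector. Below is a minimal sketch that uses only the JDK and the Scala standard library. `javaMajorVersion` and `scalaBinaryVersion` are hypothetical helpers. The Scala binary version matters because the `::` in the `$ivy` imports appends it to the artifact name (for example `spark-sql_2.12`).

```scala
// Hypothetical helper: major Java version from a `java.version` string.
// JDK 8 and earlier report e.g. "1.8.0_242"; JDK 9+ report e.g. "11.0.2" or "17".
def javaMajorVersion(version: String): Int = {
  val parts = version.split("[._-]")
  if (parts(0) == "1") parts(1).toInt else parts(0).toInt
}

// Hypothetical helper: Scala binary version ("2.12.10" -> "2.12"), the suffix
// that `::` adds to artifact names in $ivy / sbt dependency coordinates.
def scalaBinaryVersion(version: String): String =
  version.split('.').take(2).mkString(".")

val javaVersion  = System.getProperty("java.version")
val scalaVersion = scala.util.Properties.versionNumberString

println(s"Java $javaVersion (major ${javaMajorVersion(javaVersion)})")
println(s"Scala $scalaVersion (binary ${scalaBinaryVersion(scalaVersion)})")

// The combination listed above uses JDK 8 and Scala 2.12
if (javaMajorVersion(javaVersion) != 8 || scalaBinaryVersion(scalaVersion) != "2.12")
  println("Warning: runtime differs from the combination known to work")
```

Once the session exists, `spark.version` reports the Spark version as well.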

## Initialize Spark

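In [1]:

```scala
// Fetch Spark SQL and the Spark Cassandra Connector through the kernel's $ivy imports
import $ivy.{
    `org.apache.spark::spark-sql:2.4.4`,
    `com.datastax.spark::spark-cassandra-connector:2.4.3`
}

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.cassandra._   // DataFrame helpers for the Cassandra data source

import com.datastax.spark.connector._     // RDD helpers: cassandraTable, saveToCassandra, ...

// NotebookSparkSession is Almond's notebook-aware SparkSession builder
val spark = NotebookSparkSession.builder
    .master("local[4]")                                      // run locally with 4 threads
    .appName("FunWithCass")
    .config("spark.cassandra.connection.host", "localhost") // the Docker container above
    .getOrCreate()

import spark.implicits._

val sc = spark.sparkContext
sc.setLogLevel("WARN")
```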

```
Loading spark-stubs
Getting spark JARs

log4j:WARN No appenders could be found for logger (org.eclipse.jetty.util.log).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Creating SparkSession

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/04/25 19:18:30 WARN Utils: Your hostname, ouranos resolves to a loopback address: 127.0.1.1; using 192.168.1.12 instead (on interface wlp2s0)
20/04/25 19:18:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/04/25 19:18:31 INFO SparkContext: Running Spark version 2.4.4
20/04/25 19:18:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/04/25 19:18:31 INFO SparkContext: Submitted application: FunWithCass
20/04/25 19:18:31 INFO SecurityManager: Changing view acls to: yannis
20/04/25 19:18:31 INFO SecurityManager: Changing modify acls to: yannis
20/04/25 19:18:31 INFO SecurityManager: Changing view acls groups to: 
20/04/25 19:18:31 INFO SecurityManager: Changing modify acls groups to: 
20/04/25 19:18:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view perm

20/04/25 19:18:32 INFO NettyBlockTransferService: Server created on ouranos.fraglab.net:41223
20/04/25 19:18:32 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/04/25 19:18:32 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ouranos.fraglab.net, 41223, None)
20/04/25 19:18:32 INFO BlockManagerMasterEndpoint: Registering block manager ouranos.fraglab.net:41223 with 4.0 GB RAM, BlockManagerId(driver, ouranos.fraglab.net, 41223, None)
20/04/25 19:18:32 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ouranos.fraglab.net, 41223, None)
20/04/25 19:18:32 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ouranos.fraglab.net, 41223, None)
```

```
import $ivy.$

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
spark: SparkSession = org.apache.spark.sql.SparkSession@4ce5f537
import spark.implicits._
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@786d99d4
```

In [2]:

```scala
val local_df = Seq(
  (1, "one", "1", "I"),
  (2, "two", "2", "II"),
  (3, "three", "3", "III"),
  (4, "four", "4", "IV"),
  (5, "five", "5", "V")
).toDF("id", "english", "arabic", "latin")
```

```
local_df: DataFrame = [id: int, english: string ... 2 more fields]
```

In [3]:

```scala
local_df.printSchema
local_df.show
```

```
root
 |-- id: integer (nullable = false)
 |-- english: string (nullable = true)
 |-- arabic: string (nullable = true)
 |-- latin: string (nullable = true)

+---+-------+------+-----+
| id|english|arabic|latin|
+---+-------+------+-----+
|  1|    one|     1|    I|
|  2|    two|     2|   II|
|  3|  three|     3|  III|
|  4|   four|     4|   IV|
|  5|   five|     5|    V|
+---+-------+------+-----+
```



In [4]:

```scala
val cass_df = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "test_tbl", "keyspace" -> "test_ks"))
  .load()
```

```
cass_df: DataFrame = [id: int, english: string ... 1 more field]
```

In [9]:

```scala
cass_df.printSchema
cass_df.show
```

```
root
 |-- id: integer (nullable = true)
 |-- english: string (nullable = true)
 |-- symbol: string (nullable = true)

+---+-------+------+
| id|english|symbol|
+---+-------+------+
|  1|    one|  null|
|  2|    two|  null|
|  4|   four|  null|
|  5|   five|  null|
|  3|  three|  null|
+---+-------+------+
```

This cell ran after the delete in In [8], so `symbol` is `null` in every row.



In [6]:

```scala
val cass = sc.cassandraTable("test_ks", "test_tbl")
```

```
cass: rdd.CassandraTableScanRDD[CassandraRow] = CassandraTableScanRDD[6] at RDD at CassandraRDD.scala:19
```

In [8]:

```scala
// Remove the `symbol` value from every row of the table; the rows themselves remain
cass.deleteFromCassandra("test_ks", "test_tbl", SomeColumns("symbol"))
```
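`deleteFromCassandra` with `SomeColumns("symbol")` removes only the `symbol` value from each row whose primary key appears in the RDD. If no columns are selected, whole rows are deleted instead. The sketch below prints CQL roughly equivalent to what runs for each row, assuming a hypothetical `deleteCql` helper. The real connector uses prepared statements with quoted identifiers and binds the key values itself.

```scala
// Hypothetical helper: build a CQL DELETE template for the given columns, keyed by
// the primary-key columns. An empty column list deletes the whole row.
def deleteCql(keyspace: String, table: String,
              columns: Seq[String], keyColumns: Seq[String]): String = {
  val target = if (columns.isEmpty) "" else columns.mkString(", ") + " "
  val where  = keyColumns.map(k => s"$k = ?").mkString(" AND ")
  s"DELETE ${target}FROM $keyspace.$table WHERE $where"
}

// Column delete, as with SomeColumns("symbol")
println(deleteCql("test_ks", "test_tbl", Seq("symbol"), Seq("id")))
// prints: DELETE symbol FROM test_ks.test_tbl WHERE id = ?

// Row delete, as when no columns are selected
println(deleteCql("test_ks", "test_tbl", Seq.empty, Seq("id")))
// prints: DELETE FROM test_ks.test_tbl WHERE id = ?
```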

In [22]:

```scala
// Take ids 1 and 2 from the local DataFrame and fetch the matching Cassandra rows
local_df.selectExpr("id").where("id < 3").rdd
    .repartitionByCassandraReplica("test_ks", "test_tbl", 10) // 10 Spark partitions per Cassandra host
    .joinWithCassandraTable("test_ks", "test_tbl")            // look up rows by primary key `id`
    .collect
```

```
res21: Array[(Row, CassandraRow)] = Array(
  ([1], CassandraRow{id: 1, english: one, symbol: null}),
  ([2], CassandraRow{id: 2, english: two, symbol: null})
)
```
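`joinWithCassandraTable` looks up each key of the left-hand RDD in `test_tbl` by primary key instead of scanning the whole table. It returns `(left, CassandraRow)` pairs only for keys that exist, which is inner-join semantics. `repartitionByCassandraReplica` first groups the keys by the replica that owns them. That helps locality on a multi-node cluster, but makes little difference with the single local node used here.

The in-memory sketch below models only the join semantics. It assumes a hypothetical `TestRow` case class and uses a `Map` as a stand-in for the table; it does not talk to Cassandra.

```scala
// Hypothetical model of a test_tbl row; `symbol` is optional because it was deleted
final case class TestRow(id: Int, english: String, symbol: Option[String])

// In-memory stand-in for test_tbl, keyed by the primary key `id`
val testTbl: Map[Int, TestRow] = Map(
  1 -> TestRow(1, "one", None),
  2 -> TestRow(2, "two", None),
  3 -> TestRow(3, "three", None)
)

// Inner-join semantics: each key is looked up directly; keys without a row are dropped
def joinWithTable(keys: Seq[Int], table: Map[Int, TestRow]): Seq[(Int, TestRow)] =
  keys.flatMap(key => table.get(key).map(row => (key, row)))

joinWithTable(Seq(1, 2), testTbl).foreach(println)
// (1,TestRow(1,one,None))
// (2,TestRow(2,two,None))
```

If keys without a match should be kept, the connector also offers `leftJoinWithCassandraTable`. Check that your connector version includes it.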

In [None]:

```scala
// Tuple fields are written to the listed columns in order: id, english, symbol
val collection = sc.parallelize(Seq((4, "four", "IV"), (5, "five", "V")))
collection.saveToCassandra("test_ks", "test_tbl", SomeColumns("id", "english", "symbol"))
```
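Cassandra writes are upserts. Saving a row whose primary key already exists overwrites the columns being written and leaves the other columns untouched; it does not fail as a duplicate. The sketch below models that rule in memory, assuming rows are represented by a hypothetical `Table` alias that maps each `id` to its column values.

```scala
// Hypothetical in-memory table: primary key `id` -> (column name -> value)
type Table = Map[Int, Map[String, String]]

// Upsert: merge each write's columns into the existing row (creating it if absent).
// Columns that are not written keep their previous values.
def upsert(table: Table, writes: Seq[(Int, Map[String, String])]): Table =
  writes.foldLeft(table) { case (acc, (id, cols)) =>
    acc.updated(id, acc.getOrElse(id, Map.empty[String, String]) ++ cols)
  }

// Row 4 already exists without a symbol
val before: Table = Map(4 -> Map("english" -> "four"))

// Mirrors saveToCassandra with SomeColumns("id", "english", "symbol")
val after = upsert(before, Seq(
  4 -> Map("english" -> "four", "symbol" -> "IV"),
  5 -> Map("english" -> "five", "symbol" -> "V")
))

after.toSeq.sortBy(_._1).foreach(println)
```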