In [ ]:
// Trivial binding; presumably a warm-up cell to confirm the Scala kernel evaluates code.
val a: Int = 1

### Kafka / Batch Processing

In [ ]:
// Kafka broker(s) the batch reader connects to. Immutable: nothing reassigns it.
val KAFKA_BOOTSTRAP_SERVERS = "192.168.0.15:9092"

// Batch source options: read the "raw" topic from the earliest offset
// through the latest offset available when the batch query runs.
val kafkaSourceOptions: Map[String, String] =
  Map(
    "kafka.bootstrap.servers" -> KAFKA_BOOTSTRAP_SERVERS,
    "subscribe"               -> "raw",
    "startingOffsets"         -> "earliest",
    "endingOffsets"           -> "latest"
  )

In [ ]:
// Load the "raw" topic once as a static DataFrame and cache it, because the
// following cells query kBatchDF repeatedly. The count() action populates the cache.
val kBatchDF = {
  val reader = sparkSession.read.format("kafka").options(kafkaSourceOptions)
  reader.load().cache()
}

println(kBatchDF.count())

kBatchDF

In [ ]:
// Inspect the fixed schema the Kafka source produces (binary key/value plus metadata).
kBatchDF.printSchema()

In [ ]:
// Preview: expose the binary Kafka key as a string column named "token".
kBatchDF.withColumn("token", $"key".cast("string"))

In [ ]:
// Preview: decode key and value to strings and keep only those two columns.
kBatchDF
  .withColumn("token", $"key".cast("string"))
  .withColumn("json_payload", $"value".cast("string"))
  .select("token", "json_payload")

In [ ]:
import org.apache.spark.sql.{functions => f}

// Parse each Kafka record. The key becomes "token"; the JSON value supplies the
// "ip_token" field and the "tstamp" field, which is cast to a timestamp.
// The result is cached and materialized by count().
val batchDF = {
  val decoded =
    kBatchDF
      .withColumn("token", $"key".cast("string"))
      .withColumn("json_payload", $"value".cast("string"))

  val withFields =
    decoded
      .withColumn("ip_token", f.get_json_object($"json_payload", "$.ip_token"))
      .withColumn("tstamp", f.get_json_object($"json_payload", "$.tstamp").cast("timestamp"))

  withFields.select($"token", $"ip_token", $"tstamp").cache()
}

println(batchDF.count())

batchDF

In [ ]:
// Count clicks per ip_token over 6-second windows that slide every 2 seconds,
// so each event contributes to three overlapping windows.
val clickCountsBatchDF = {
  val perIpAndWindow =
    batchDF.groupBy($"ip_token", window($"tstamp", "6 seconds", "2 seconds"))
  perIpAndWindow.count().cache()
}

println(clickCountsBatchDF.count())

clickCountsBatchDF

In [ ]:
// Show only (ip_token, window) pairs with more than 11 clicks.
clickCountsBatchDF.where($"count" > 11)

In [ ]:
// Inspect the aggregated schema: ip_token, the window struct, and count.
clickCountsBatchDF.printSchema()

In [ ]:
import org.apache.spark.sql.functions.udf

/** Renders a click count as a single-field JSON object, e.g. `{"frequency": 12}`.
  *
  * @param count number of clicks to encode
  * @return the JSON text `{"frequency": <count>}`
  */
def jsonizeFrequency(count: Long): String = {
  val field = "\"frequency\""
  s"""{$field: $count}"""
}

// Wrap jsonizeFrequency as a Spark SQL UDF so it can be applied to a Long column.
val jsonizeFrequencyUDF = udf((count: Long) => jsonizeFrequency(count))

In [ ]:
// Preview windows with more than 11 clicks, reshaped with key = ip_token and
// value = the JSON-encoded click frequency.
clickCountsBatchDF
  .where($"count" > 11)
  .withColumn("key", $"ip_token")
  .withColumn("value", jsonizeFrequencyUDF($"count"))

In [ ]:
// Kafka broker(s) the batch writer connects to. Immutable: nothing reassigns it.
val KAFKA_BOOTSTRAP_SERVERS = "192.168.0.15:9092"

// Sink options for publishing aggregated results to the "analytics" topic.
val kafkaSinkOpts: Map[String, String] =
  Map(
    "kafka.bootstrap.servers" -> KAFKA_BOOTSTRAP_SERVERS,
    "topic"                   -> "analytics"
  )

In [ ]:
// Publish every (ip_token, window) with more than 11 clicks to Kafka.
// Reuses kafkaSinkOpts (previously defined but ignored) so the broker and topic
// are configured in one place, as the streaming sink does.
// Projects down to key/value, the only columns the Kafka sink needs here.
clickCountsBatchDF
  .filter($"count" > 11)
  .withColumn("key", $"ip_token")
  .withColumn("value", jsonizeFrequencyUDF($"count"))
  .select($"key", $"value")
  .write
  .format("kafka")
  .options(kafkaSinkOpts)
  .save()

println("OK")

### Kafka / Structured Streaming

In [ ]:
// Kafka broker(s) the streaming reader connects to. Immutable: nothing reassigns it.
val KAFKA_BOOTSTRAP_SERVERS = "192.168.0.15:9092"

// Streaming source options for the "raw" topic. "latest" means a fresh query
// (one without a checkpoint) only sees records that arrive after it starts.
val kafkaSourceOpts: Map[String, String] =
  Map(
    "kafka.bootstrap.servers" -> KAFKA_BOOTSTRAP_SERVERS,
    "subscribe"               -> "raw",
    "startingOffsets"         -> "latest"
  )

In [ ]:
// Unbounded streaming DataFrame over the Kafka topic configured in kafkaSourceOpts.
// No data is read until a streaming query is started on it.
val kStreamDF = sparkSession.readStream.format("kafka").options(kafkaSourceOpts).load()

In [ ]:
// The streaming Kafka source exposes the same schema as the batch one.
kStreamDF.printSchema()

In [ ]:
import org.apache.spark.sql.{functions => f}

// Same parsing as the batch pipeline (token, ip_token, tstamp), applied to the stream.
val streamDF = {
  val decoded =
    kStreamDF
      .withColumn("token", $"key".cast("string"))
      .withColumn("json_payload", $"value".cast("string"))

  decoded
    .withColumn("ip_token", f.get_json_object($"json_payload", "$.ip_token"))
    .withColumn("tstamp", f.get_json_object($"json_payload", "$.tstamp").cast("timestamp"))
    .select($"token", $"ip_token", $"tstamp")
}

In [ ]:
// Confirm the parsed stream's columns and types.
streamDF.printSchema()

In [ ]:
// Streaming click counts per ip_token over 6s windows sliding every 2s.
// The 10-second watermark on tstamp bounds how late an event may arrive,
// which lets Spark finalize windows and discard their state.
val clickCountsStreamDF = {
  val watermarked = streamDF.withWatermark("tstamp", "10 seconds")
  watermarked
    .groupBy($"ip_token", window($"tstamp", "6 seconds", "2 seconds"))
    .count()
}

In [ ]:
// Inspect the aggregated streaming schema.
clickCountsStreamDF.printSchema()

In [ ]:
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
import org.apache.spark.sql.streaming.OutputMode.Append

In [ ]:
// Kafka broker(s) the streaming writer connects to. Immutable: nothing reassigns it.
val KAFKA_BOOTSTRAP_SERVERS = "192.168.0.15:9092"

// Streaming sink options. Results go to the "analytics" topic; the query's
// progress (offsets and aggregation state) is persisted under checkpointLocation.
val kafkaSinkOpts: Map[String, String] =
  Map(
    "kafka.bootstrap.servers" -> KAFKA_BOOTSTRAP_SERVERS,
    "topic"                   -> "analytics",
    "checkpointLocation"      -> "/tmp/checkpoint"
  )

In [ ]:
// Streaming sink definition (not started yet).
// Every 2 seconds, emit windows whose click count exceeds 11. In Append mode a
// window is emitted once the watermark has passed its end.
// Like the batch sink, only key (ip_token) and value (JSON frequency) are sent to Kafka.
val fraudDetectionStream =
  clickCountsStreamDF
    .filter($"count" > 11)
    .withColumn("key", $"ip_token")
    .withColumn("value", jsonizeFrequencyUDF($"count"))
    .select($"key", $"value")
    .writeStream
    .format("kafka")
    .trigger(ProcessingTime("2 seconds"))
    .outputMode(Append)
    .options(kafkaSinkOpts)

In [ ]:
// Start the streaming query, then block this cell until it terminates (stopped or failed).
val fraudDetectionQuery = fraudDetectionStream.start()
fraudDetectionQuery.awaitTermination()

### Writing a DataFrame / DataSet to Cassandra

In [ ]:
import com.datastax.spark.connector.toDataFrameFunctions

In [ ]:
/** One row of countries.csv. Field order defines the CSV column order, because the
  * read schema is derived from this class via `Encoders.product`.
  *
  * NOTE(review): the ids are `Long` here, but the Cassandra tables declare them `INT`.
  * Confirm that the connector's narrowing conversion is acceptable.
  */
final case class Country(
    country_id: Long,
    continent_id: Long,
    country_name: String
)

For reference, here are the Cassandra keyspace and table definitions that the cells below rely on.

```
CREATE KEYSPACE spark_ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 };

CREATE TABLE spark_ks.spark_countries (
  country_id INT,
  continent_id INT,
  country_name TEXT,
  PRIMARY KEY ((country_id))
);

CREATE TABLE spark_ks.spark_countries_with_github_id (
  github_id TEXT,
  country_id INT,
  continent_id INT,
  country_name TEXT,
  PRIMARY KEY ( (github_id), country_id )
);
```

Here we configure the Cassandra host name used by the Spark application.

In [ ]:
// Host the Spark Cassandra connector should contact.
val CASSANDRA_HOST = "dbserver"

// Apply the connector setting to the running session's configuration.
Map("spark.cassandra.connection.host" -> CASSANDRA_HOST).foreach { case (key, value) =>
  sparkSession.conf.set(key, value)
}

In [ ]:
// Column layout derived from the Country case class (field order = CSV column order).
val countrySchema = Encoders.product[Country].schema

// Typed Dataset of countries. No `header` option is set, so every line of the
// file is treated as data. NOTE(review): confirm the CSV has no header row.
val countriesDS =
  sparkSession
    .read
    .schema(countrySchema)
    .csv("/opt/SparkDatasets/geography/countries.csv")
    .as[Country]

Next, we save the data into Cassandra.

In [ ]:
// Target table for the plain country rows.
val countriesTableOpts = Map("keyspace" -> "spark_ks", "table" -> "spark_countries")

// Append the countries through the Spark Cassandra connector's data source.
countriesDS
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(countriesTableOpts)
  .mode(SaveMode.Append)
  .save()

In [ ]:
// Placeholder: replace with your own GitHub id before running the cells below.
val GH = "your_github_id_goes_here"

In [ ]:
// Preview: tag every country row with the configured GitHub id.
countriesDS
  .withColumn("github_id", lit(GH))

In [ ]:
// Append the GitHub-tagged rows to spark_ks.spark_countries_with_github_id,
// whose partition key is github_id (see the CQL definitions above).
val countriesWithGithubDF = countriesDS.withColumn("github_id", lit(GH))

countriesWithGithubDF
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "spark_ks", "table" -> "spark_countries_with_github_id"))
  .mode(SaveMode.Append)
  .save()