In [1]:
%AddDeps org.apache.spark spark-sql-kafka-0-10_2.11 2.4.0 --transitive

Marking org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 for download
Obtained 12 files


In [2]:
// Obtain the session for this notebook; getOrCreate() hands back an
// already-running session when there is one instead of building a second.
val spark =
  SparkSession
    .builder()
    .appName("Simple Application")
    .getOrCreate()

// For implicit conversions from RDDs to DataFrames
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

spark = org.apache.spark.sql.SparkSession@578b252c


org.apache.spark.sql.SparkSession@578b252c

In [3]:
// Layout of the JSON payload carried in each Kafka message: an event
// timestamp plus the action name. Both fields are nullable.
val schema = StructType(Seq(
  StructField("time", TimestampType, nullable = true),
  StructField("action", StringType, nullable = true)
))

schema = StructType(StructField(time,TimestampType,true), StructField(action,StringType,true))


StructType(StructField(time,TimestampType,true), StructField(action,StringType,true))

In [4]:
// Kafka broker host and the topic that carries the incoming events.
val IP = "localhost"
val TOPIC = "events"

// Streaming source over the Kafka topic.
//  - Native Kafka consumer settings must carry the "kafka." prefix to be
//    forwarded to the consumer; the previously un-prefixed "max.poll.records"
//    never reached it. (To cap the size of a whole micro-batch, Spark's own
//    "maxOffsetsPerTrigger" source option is the relevant knob.)
//  - The Kafka source talks to the brokers only, so the former
//    "zookeeper.connect" option had no effect and is dropped.
val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", s"$IP:9092")
  .option("kafka.max.poll.records", "10")
  .option("subscribe", TOPIC)
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", false)
  .load()

// The record value arrives as binary: cast it to a string, parse it as JSON
// using `schema`, then flatten the parsed struct into top-level columns.
val df = ds1
  .select($"value" cast "string" as "json")
  .select(from_json($"json", schema) as "data")
  .select("data.*")

IP = localhost
TOPIC = events
ds1 = [key: binary, value: binary ... 5 more fields]
df = [time: timestamp, action: string]


[time: timestamp, action: string]

In [5]:
// Count events per action within one-hour windows of the event time.
val streamingCountsDF = {
  val hourlyWindow = window(col("time"), "1 hour")
  df.groupBy(col("action"), hourlyWindow).count()
}

// Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

streamingCountsDF = [action: string, window: struct<start: timestamp, end: timestamp> ... 1 more field]


true

In [16]:
// One shuffle partition is enough here and keeps the shuffles small.
spark.conf.set("spark.sql.shuffle.partitions", "1")

// Start the aggregation in the background, publishing its result as the
// in-memory table "counts" (memory sink: intended for testing only).
// "complete" output mode keeps every count in the table on each update.
val query = streamingCountsDF.writeStream
  .outputMode("complete")
  .queryName("counts")
  .format("memory")
  .start()

query = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7233e1a1


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7233e1a1

In [17]:
// Hourly counts per action, read from the in-memory "counts" table.
// Rows are ordered by the real window end timestamp rather than by the
// formatted "MMM-dd HH:mm" label: sorting on the label is lexicographic and
// would misorder windows across months (e.g. "Aug-01" before "Jul-26").
// Note: `df` here rebinds the name used earlier for the parsed event stream.
val sqlstr = """select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by window.end, action"""
val df = spark.sql(sqlstr)
df.show()

+------+------------+-----+
|action|        time|count|
+------+------------+-----+
| Close|Jul-26 11:00|   11|
|  Open|Jul-26 11:00|  179|
| Close|Jul-26 12:00|    1|
|  Open|Jul-26 12:00|    5|
+------+------------+-----+



sqlstr = select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action
df = [action: string, time: string ... 1 more field]


[action: string, time: string ... 1 more field]

In [18]:
// Total count per action across all windows of the in-memory "counts" table.
// Neither binding is reassigned, so both are declared as vals.
val sqlstr = "select action, sum(count) as total_count from counts group by action order by action"
val df = spark.sql(sqlstr)
df.show()

+------+-----------+
|action|total_count|
+------+-----------+
| Close|         12|
|  Open|        184|
+------+-----------+



sqlstr = select action, sum(count) as total_count from counts group by action order by action
df = [action: string, total_count: bigint]


[action: string, total_count: bigint]

In [19]:
// Finally, stop the streaming query that was started in the background
// (the one feeding the in-memory "counts" table) by calling query.stop().
// NOTE(review): the original note also mentioned a 'Cancel' link on the query
// cell and the cell status switching to TERMINATED; that presumably describes
// a different notebook UI - confirm whether it applies here.
query.stop()

## Sink to Kafka


In [18]:
// Print, one per line, whether the aggregated frame is still a streaming
// frame and what its schema looks like before wiring it to the Kafka sink.
Seq(streamingCountsDF.isStreaming, streamingCountsDF.schema).foreach(println)

true
StructType(StructField(action,StringType,true), StructField(window,StructType(StructField(start,TimestampType,true), StructField(end,TimestampType,true)),false), StructField(count,LongType,false))


In [21]:
import java.util.UUID
import org.apache.spark.sql.functions._
import spark.implicits._

// A fresh, randomly named checkpoint directory is used on every run.
val checkpointLocation = s"/tmp/temporary-${UUID.randomUUID}"

// Publish the running counts to the "events-out" Kafka topic. Each row is
// serialised to a single JSON string in the "value" column (action, count and
// the window end); no "key" column is produced.
// (Alternative kept from the original, casting columns instead of using JSON:
//  .selectExpr("CAST(action AS STRING)", "CAST(count AS STRING)"))
val ds = {
  val jsonValue =
    to_json(struct(col("action"), col("count"), col("window.end"))).as("value")

  streamingCountsDF
    .select(jsonValue)
    .writeStream
    .outputMode("complete")
    .option("checkpointLocation", checkpointLocation)
    .option("topic", "events-out")
    .option("kafka.bootstrap.servers", IP + ":9092")
    .format("kafka")
    .start()
}


checkpointLocation = /tmp/temporary-df6e5550-5551-4324-9e18-144f5c613905
ds = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@30e22d4


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@30e22d4

In [22]:
// Stop the streaming query that writes the counts to the Kafka sink.
ds.stop()