# Acceleration Example

### Imports

In [1]:
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.kafka.common.serialization.{BytesDeserializer, StringDeserializer}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.sql.types.StructType
import collection.JavaConverters.mapAsJavaMapConverter

In [2]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
  
import spark.implicits._

### Create Streaming Context

In [3]:
val ssc = new StreamingContext(sc, Seconds(1))
ssc.remember(Minutes(1))

### Setup Kafka input stream

In [6]:
val consumerParams = Map[String, Object](
  "bootstrap.servers" -> "172.16.238.10:9092",
  "key.deserializer" -> classOf[BytesDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-notebook",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("sample-topic")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, consumerParams)
)

### Expected Input Schema

{"time":1990,"type":"DOTA_COMBATLOG_DAMAGE","value":4,"attackername":"npc_dota_hero_huskar","targetname":"npc_dota_hero_huskar","sourcename":"npc_dota_hero_huskar","targetsourcename":"npc_dota_hero_huskar","attackerhero":true,"targethero":true,"attackerillusion":false,"targetillusion":false,"inflictor":"item_armlet"}

In [7]:
val schema = new StructType().add("time", "long").add("type", "string").add("value", "int")

### Stream Processing

In [8]:
stream.foreachRDD { rdd =>
  spark.read.schema(schema).json(rdd.map(_.value())).createOrReplaceTempView("logs")
  spark.sql("select * FROM logs").toJSON.foreachPartition {
    partition =>

      val producerParams = Map[String, Object](
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "172.16.238.10:9092",
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
      )
 
      val producer = new KafkaProducer[String, String](producerParams.asJava)
      
      partition.foreach { s =>
        if (s != "{}")
            producer.send(new ProducerRecord[String, String]("sample-topic", s))
      }
      
      producer.close()
  }
}

### Start stream

In [12]:
ssc.start()

### Lets see what we really read

In [13]:
%%SQL
select * from logs

+----+----+-----+
|time|type|value|
+----+----+-----+
+----+----+-----+



### Stop stream

In [None]:
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }

#### Verify the contents in Kafka using the console consumer

The following command line tools can help print the contents to the console.
```sh
./bin/kafka-console-consumer.sh --topic sample-topic --bootstrap-server localhost:9092
```

In [12]:
// Subscribe to 1 topic
val df = spark.readStream.option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "sample-topic").load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]

Name: java.lang.IllegalArgumentException
Message: 'path' is not specified
StackTrace:   at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:227)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:227)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.getOrElse(ddl.scala:112)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:226)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)