# Acceleration Example

### Imports

In [None]:
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.kafka.common.serialization.{BytesDeserializer, StringDeserializer}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.sql.types.StructType
import collection.JavaConverters.mapAsJavaMapConverter

### Create Streaming Context

In [None]:
// Streaming context on top of the notebook's existing SparkContext `sc`:
// one-second micro-batches, with each DStream keeping its generated RDDs for
// one minute so recent batches can still be inspected after they are processed.
val ssc = {
  val context = new StreamingContext(sc, Seconds(1))
  context.remember(Minutes(1))
  context
}

### Setup Kafka input stream

In [None]:
// Consumer settings for the direct Kafka stream. Keys and values are both
// decoded as strings so the deserializers agree with the [String, String]
// type parameters of the stream below (a BytesDeserializer key would yield
// org.apache.kafka.common.utils.Bytes objects statically typed as String).
val consumerParams = Map[String, Object](
  "bootstrap.servers" -> "kafka:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-notebook",
  // When this group has no committed offset, start from the oldest retained record.
  "auto.offset.reset" -> "earliest",
  // Kafka does not auto-commit offsets. NOTE(review): nothing in this notebook
  // commits them manually either, so every run re-reads from "earliest".
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Direct stream over the "android" topic; PreferConsistent distributes the
// topic's partitions evenly across the available executors.
val topics = Array("android")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, consumerParams)
)

### Expected Input Schema

In [None]:
// Schema applied when parsing each Kafka message value as JSON: three float
// components x, y, z plus a long timestamp. Fields missing from a message
// are read as null.
val schema = new StructType()
  .add("x", "float")
  .add("y", "float")
  .add("z", "float")
  .add("timestamp", "long")

### Stream Processing

In [None]:
// Per micro-batch:
//   1. parse the record values as JSON (using `schema`) and expose them as the
//      temp view "locations", replacing the previous batch's view;
//   2. if the batch had any records, compute avg(x), avg(y), avg(z) and
//      min(timestamp) and publish the result as JSON to the "acceleration" topic.
stream.foreachRDD { rdd =>
  spark.read.schema(schema).json(rdd.map(_.value())).createOrReplaceTempView("locations")

  // A global aggregate over zero rows still returns one row whose columns are
  // all null, which serializes to "{}". Skip publishing it for empty batches.
  if (!rdd.isEmpty()) {
    val summary = spark.sql("select avg(x), avg(y), avg(z), min(timestamp) from locations")

    // The explicit parameter type selects the Scala-function overload of
    // foreachPartition; an untyped lambda is ambiguous with
    // ForeachPartitionFunction under Scala 2.12.
    summary.toJSON.foreachPartition { partition: Iterator[String] =>
      // Built on the executor, inside the partition closure, because
      // KafkaProducer is not serializable.
      val producerParams = Map[String, Object](
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
      )

      val producer = new KafkaProducer[String, String](producerParams.asJava)
      try {
        partition.foreach { json =>
          producer.send(new ProducerRecord[String, String]("acceleration", json))
        }
      } finally {
        // close() waits for outstanding sends; always run it so a failed send
        // does not leak the producer's threads and sockets.
        producer.close()
      }
    }
  }
}

### Start stream

In [None]:
// Start receiving and processing micro-batches. After this call, no new
// DStream inputs or output operations can be added to `ssc`.
ssc.start()

### Let's see what we actually read (the most recent batch)

In [None]:
%%SQL
SELECT * FROM locations

### Stop stream

In [None]:
// Stop the active StreamingContext, if there is one, but keep the shared
// SparkContext running so the notebook can continue to be used.
for (activeContext <- StreamingContext.getActive()) {
  activeContext.stop(stopSparkContext = false)
}

#### Verify the contents in Kafka using the console consumer

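The following command-line tool prints the records published to the `acceleration` output topic (adjust the broker address to one reachable from where you run the command):
```sh
./bin/kafka-console-consumer --topic acceleration --bootstrap-server kafka:9092 --from-beginning
```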