In [None]:
%%scala
%%spark <> --noUI
import java.net.InetAddress
val driver_host = InetAddress.getLocalHost.getHostAddress
SparkSession.builder()
	.appName("jt1test2")
	.master("k8s://https://kubernetes.default.svc.cluster.local:443")
	.config("spark.kubernetes.container.image", "splicemachine/sm_k8_spark:0.0.4")
	.config("spark.executor.instances", "2")
	.config("spark.submit.deployMode", "cluster")
	.config("spark.driver.extraClassPath", "/opt/spark/conf:/opt/spark/jars/*")
	.config("spark.executor.extraClassPath", "./:/opt/hbase/conf:/opt/splicemachine/lib/*:/opt/spark/jars/*:/opt/hbase/lib/*")
	.config("splice.spark.executor.extraLibraryPath", "/opt/native")
	.config("spark.files", "/opt/spark/conf/hbase-site.xml,/opt/spark/conf/core-site.xml,/opt/spark/conf/hdfs-site.xml,/opt/spark/jars/hbase_sql-2.8.0.1926-cdh5.14.0.jar")
	.config("spark.kubernetes.authenticate.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
	.config("spark.kubernetes.authenticate.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token")
	.config("spark.driver.host", driver_host)
	.config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")


<link rel="stylesheet" href="https://doc.splicemachine.com/zeppelin/css/zepstyles.css" />

# Kafka Queue

In this notebook, a Kafka topic is created and the data producer is started. The producer currently reads data from a CSV file, with each line corresponding to one message. Each message contains comma-separated values that map to various tables in the Splice database.

* There are "Items" that have an ID, Serial Number, CreatedTime and UPCcode.
* There are "ItemFlow" events that occur at high frequency (thousands per second, 600,000 in total in this demo) and are ingested into Splice.
* An event occurs when an Item leaves a Warehouse, arrives at a Store, or is seen at a POS terminal, in a Dressing Room, or at a Door.
* There are multiple warehouses and stores, with location coordinates all in a geographic region east of London.

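To make the message format concrete, here is a minimal sketch of turning one comma-separated line into an "Item". The column order, the field types, and the names `Item` and `parseItem` are assumptions made for illustration; the actual CSV layout is not shown in this notebook.

```scala
import scala.util.Try

// Hypothetical record: field order and types are assumed, not taken from the data file.
case class Item(id: Long, serialNumber: String, createdTime: String, upcCode: String)

// Parse one comma-separated line; returns None if the line does not have
// exactly four fields or the ID is not a valid number.
def parseItem(line: String): Option[Item] =
  line.split(",", -1).map(_.trim) match {
    case Array(id, serial, created, upc) =>
      Try(id.toLong).toOption.map(Item(_, serial, created, upc))
    case _ => None
  }

// Example with a made-up line
println(parseItem("42,SN-001,2017-01-01 00:00:00,0123456789"))
```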
After verifying or setting the values listed below, run the whole notebook by selecting Run in the top toolbar, or run each paragraph individually in the order in which it appears.

* Topic Name
* Zookeeper URL
* Broker URL
* File Name of the data file  

<p class="noteIcon">
Note: This assumes the Kafka server is already running. The ZooKeeper and broker URLs need to be set appropriately in the next paragraph, 'Set Parameters'.
</p>

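Since the notebook assumes the servers are already running, a quick TCP check can catch a wrong host or port before any Kafka call is made. The sketch below is one possible way to do this; the helper names `parseHostPort` and `isReachable` and the address `localhost:9092` are hypothetical. A successful connection only shows that the port is open, not that the service behind it is healthy.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try
import scala.util.control.NonFatal

// Split "host:port" into its parts; None if the port is missing or not a number.
def parseHostPort(address: String): Option[(String, Int)] =
  address.lastIndexOf(':') match {
    case -1  => None
    case idx =>
      Try(address.substring(idx + 1).toInt).toOption
        .map(port => (address.substring(0, idx), port))
  }

// Try to open a TCP connection; true only if it succeeds within timeoutMs.
def isReachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
  val socket = new Socket()
  try {
    socket.connect(new InetSocketAddress(host, port), timeoutMs)
    true
  } catch {
    case NonFatal(_) => false
  } finally {
    socket.close()
  }
}

// Example with a hypothetical address; in this notebook the value would come
// from the broker or ZooKeeper setting (a single host:port entry).
val brokerUp = parseHostPort("localhost:9092").exists { case (h, p) => isReachable(h, p) }
println(s"broker reachable: $brokerUp")
```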


In [None]:
%%scala 
z.put("topicname", "iotdemo")
z.put("zookeeper", "zookeeper-0-node.{FRAMEWORKNAME}.mesos:2182")
z.put("brokers", "kafka-0-node.{FRAMEWORKNAME}.mesos:9092")


Next, we create the Kafka topic. The steps are:
<ul>
<li>Specify the queue parameters, such as session timeout, connection timeout, number of partitions, and replication factor.</li>
<li>Create a ZooKeeper client.</li>
<li>Invoke AdminUtils to create the topic.</li>
</ul>

When this paragraph is run, the topic is created; if the topic already exists, an error is displayed instead.

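If re-running the notebook should not fail on an existing topic, the creation step can be guarded by an existence check. The sketch below shows the idea with a hypothetical helper, `createTopicIfMissing`, which takes the check and the create action as functions so that it does not depend on a particular Kafka version. The commented wiring assumes that `AdminUtils.topicExists` is available in the Kafka version on the classpath. A check followed by a create is not atomic, so another client could still create the topic in between.

```scala
// Hypothetical helper: `exists` and `create` are supplied by the caller.
// Returns true if the topic was created, false if it was already present.
def createTopicIfMissing(topic: String)(exists: String => Boolean,
                                        create: String => Unit): Boolean =
  if (exists(topic)) {
    println(s"Topic '$topic' already exists; skipping creation")
    false
  } else {
    create(topic)
    println(s"Topic '$topic' created")
    true
  }

// Possible wiring in this notebook (assumes zkUtils, topicName, numPartitions,
// replicationFactor and topicConfig from the topic-creation paragraph):
// createTopicIfMissing(topicName)(
//   t => AdminUtils.topicExists(zkUtils, t),
//   t => AdminUtils.createTopic(zkUtils, t, numPartitions, replicationFactor, topicConfig))
```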
In [None]:
%%scala 
import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils

//Properties for zookeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000

//Properties for Kafka Queue
val topicName=z.get("topicname").toString
val numPartitions = 10
val replicationFactor = 1

// Create a ZooKeeper client
val zkUtils = ZkUtils.apply(z.get("zookeeper").toString, sessionTimeoutMs, connectionTimeoutMs,
    false)
    

// Create  topic
val topicConfig = new Properties
AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig)


Before running this paragraph, make sure the `filename` variable in the code below points to the appropriate data file.

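The producer paragraph below paces its sends by pausing after a fixed number of messages. A minimal sketch of that pacing logic, separated from Kafka, follows; the helper name `sendThrottled` and its parameters are hypothetical. With a batch of 1,000 messages and a 500 ms pause, the pause alone caps throughput at about 2,000 messages per second, so the pause length is the value to adjust when a specific rate is required.

```scala
// Hypothetical helper: sends items in batches and pauses after every full batch.
// `send` and `pause` are passed in so the logic can be tried without a broker.
// Returns the number of pauses taken.
def sendThrottled[A](items: Seq[A], batchSize: Int, pauseMs: Long)
                    (send: A => Unit,
                     pause: Long => Unit = ms => Thread.sleep(ms)): Int = {
  var pauses = 0
  items.grouped(batchSize).foreach { batch =>
    batch.foreach(send)
    // Pause only after a full batch, as the counter-based loop in the producer does
    if (batch.size == batchSize) {
      pause(pauseMs)
      pauses += 1
    }
  }
  pauses
}

// Example: 25 messages, batches of 10, with a 100 ms pause after each full batch
val pausesTaken = sendThrottled((1 to 25).map(n => s"message-$n"), 10, 100L)(m => println(m))
println(s"pauses taken: $pausesTaken")
```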
In [None]:
%%scala 
import org.apache.commons.io.IOUtils
import java.net.URL
import java.util.Properties
import java.nio.charset.Charset
import scala.io.Source
import java.io.{FileReader, FileNotFoundException, IOException}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer


//Properties
val brokers = z.get("brokers").toString
val topic = z.get("topicname").toString

// Send messagesPerSec messages, then pause for pauseBetweenMessages milliseconds
val messagesPerSec=1000
val pauseBetweenMessages = 500

//val filename = "https://s3.amazonaws.com/splice-demo/iot/itemflow_small.csv"
val filename = "https://s3.amazonaws.com/splice-demo/iot/itemflow_200k.csv"
//val filename = "https://s3.amazonaws.com/splice-demo/iot/itemflow_600k.csv"
     
//Add properties
val props =new Properties
props.put("bootstrap.servers", brokers)
props.put("acks", "all")
props.put("retries", Integer.valueOf(0))
props.put("batch.size", Integer.valueOf(16384))
props.put("linger.ms", Integer.valueOf(1))
props.put("buffer.memory", Integer.valueOf(33554432))
props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")
    
//Create Kafka producer
val producer = new KafkaProducer[String, String](props)
    
    
//Read data file
val s3fileData = sc.parallelize(
    IOUtils.toString(new URL( filename), Charset.forName("utf8")).split("\n"))
    

//Put each line from the file onto the queue, pausing after each batch of messagesPerSec messages
var i = 0
s3fileData.collect().foreach(line =>  {
        val message =  new ProducerRecord[String, String](topic, null, line)
        producer.send(message)
        i= i+1;
        if (i >= messagesPerSec) {
            i = 0;
            Thread.sleep(pauseBetweenMessages)
         }
   }
)
    
// Flush any buffered records and release the producer's resources
producer.close()

println("DONE")