In [1]:
import os

# Ask PySpark to pull in the Kafka 0.8 streaming connector package.
# NOTE(review): presumably this has to run before the first Spark
# session/context is created in the kernel -- confirm.
os.environ.update({
    'PYSPARK_SUBMIT_ARGS': '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell',
})

In [2]:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext, HiveContext,SparkSession
from pyspark.ml import Pipeline,PipelineModel


In [3]:
# Local-mode session (one worker thread per core) reused by the cells below.
spark = (
    SparkSession.builder
    .appName("test-spark-streaming")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .getOrCreate()
)

In [None]:
# --- Which dataset / model this notebook works on -------------------------
data_source = "windowsos"
tenant_name = "itd"
time_window = "day"
entity_type = "ip"
anomaly_type = "profile"
model_type = "sklearn"
model_name = "isolationforest"

# --- Repository roots, all located under the tenant's base directory ------
BASE_PATH = "/Users/tuhinsharma/Documents/sstech/" + tenant_name
ANOMALY_DATA_REPOSITORY = BASE_PATH + "/models_data/data"
ANOMALY_MODEL_REPOSITORY = BASE_PATH + "/models_data/model"

# --- Path templates; the {placeholders} are filled in with str.format -----
USER_PROFILE_DATA_PATH = (
    ANOMALY_DATA_REPOSITORY
    + "/{data_source}/{entity_type}/{anomaly_type}/{time_window}.json"
)
PROFILE_ANOMALY_MODEL_PATH = (
    ANOMALY_MODEL_REPOSITORY
    + "/{data_source}/{entity_type}/{anomaly_type}/{time_window}/{model_type}/{model_name}"
)

# Concrete location of the profile data for the selection above.
# NOTE(review): anomaly_type is pinned to the literal "profile" here instead of
# the `anomaly_type` variable -- confirm that this is intentional.
data_path = USER_PROFILE_DATA_PATH.format(
    data_source=data_source,
    entity_type=entity_type,
    anomaly_type="profile",
    time_window=time_window,
)




In [None]:
# Fill every placeholder of the model path template from the configuration cell.
model_path = PROFILE_ANOMALY_MODEL_PATH.format(
    data_source=data_source,
    entity_type=entity_type,
    anomaly_type=anomaly_type,
    time_window=time_window,
    model_type=model_type,
    model_name=model_name,
)

In [None]:
import json
def binary_to_dict(x):
    """Decode a JSON document and return its "dns_error_count" value times two.

    Args:
        x: JSON text of an object that contains the key "dns_error_count".

    Returns:
        The value stored under "dns_error_count" multiplied by 2.

    Raises:
        KeyError: if the decoded object has no "dns_error_count" key.
        ValueError: if `x` is not valid JSON (raised by ``json.loads``).
    """
    return json.loads(x)["dns_error_count"] * 2

In [None]:
from pyspark.sql.types import StringType, StructField, StructType

# Streaming context on top of the existing session, using a batch interval of 5.
sc = spark.sparkContext
ssc = StreamingContext(sc, 5)

brokers = "localhost:9092"
topic = "mytesttopic"

# Receiver-less Kafka stream; every record is a (key, value) pair.
kvs = KafkaUtils.createDirectStream(
    ssc,
    [topic],
    {"metadata.broker.list": brokers},
)

# Keep only the message value, then turn it into a number with binary_to_dict.
lines = (
    kvs
    .map(lambda record: record[1])
    .map(binary_to_dict)
)

# Print the first elements of every batch to the cell output.
lines.pprint()

# Start processing; awaitTermination() blocks this cell until the stream stops.
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2019-03-26 22:55:10
-------------------------------------------
3904
3906
3908

-------------------------------------------
Time: 2019-03-26 22:55:15
-------------------------------------------
3910
3912
3914
3916
3918

-------------------------------------------
Time: 2019-03-26 22:55:20
-------------------------------------------
3920
3922
3924
3926
3928



In [None]:
# Checkpoint location handed to StreamingContext.getOrCreate by the driver below.
outputPath = "/tmp/checkpoint"

def getSqlContextInstance(sparkContext):
	"""Return the shared HiveContext, creating it on first use.

	The instance is cached in this module's globals under the key
	'sqlContextSingletonInstance'. `sparkContext` is only used to build the
	HiveContext when no cached instance exists yet; later calls return the
	cached object regardless of the argument.
	"""
	cache = globals()
	key = 'sqlContextSingletonInstance'
	if key in cache:
		return cache[key]
	cache[key] = HiveContext(sparkContext)
	return cache[key]

def createContext():
	"""Build the Kafka -> windowed count -> Hive streaming pipeline.

	The ZooKeeper quorum and the topic name are taken from ``sys.argv[1:]``
	(exactly two values are expected). Each Kafka message value is decoded as
	JSON; the items of its 'data' mapping are summed per key over a window of
	30 with a slide of 5, and every window result is appended to the Hive
	table ``word_count``.

	Returns:
		The configured, not yet started, StreamingContext.

	Raises:
		ValueError: if ``sys.argv[1:]`` does not hold exactly two values.
	"""
	# Row is needed by writeRecord below but is not among the names this file
	# imports from pyspark.sql, so bring it into scope here.
	from pyspark.sql import Row

	# NOTE(review): this constructs a new SparkContext; if another context is
	# already active in the same process this may fail -- confirm usage.
	sc = SparkContext(appName="PythonStreamingKafkaChCount")
	ssc = StreamingContext(sc, 5)
	# The windowed reduce below uses an inverse function, which requires
	# checkpointing, and StreamingContext.getOrCreate can only recover a
	# context whose setup function enabled it.
	ssc.checkpoint(outputPath)

	zkQuorum, topic = sys.argv[1:]
	kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
	# map() hands each record over as ONE (key, value) tuple, so the mapper
	# must accept a single argument and index into it.
	parsed = kvs.map(lambda kv: json.loads(kv[1]))
	channel_dstream = parsed.map(lambda x: x['data'])

	# Incremental window: add the values entering and subtract those leaving.
	count_values_windowed = channel_dstream.flatMap(lambda x: x.items())\
		.reduceByKeyAndWindow(lambda x,y: x+y, lambda x, y:x-y, 30,5)

	def writeRecord(time, rdd):
		"""Append one window's (key, value) pairs to the Hive table word_count.

		Failures are printed and swallowed so that one bad batch does not
		stop the stream.
		"""
		try:
			# Nothing to insert, and a schema cannot be inferred from an empty RDD.
			if rdd.isEmpty():
				return
			hiveContext = getSqlContextInstance(rdd.context)
			rowRdd = rdd.map(lambda x: Row(time=time, key=x[0], value=x[1]))
			print(rowRdd.take(5))
			wordsDataFrame = hiveContext.createDataFrame(rowRdd)
			wordsDataFrame.show()
			wordsDataFrame.registerTempTable("wc")
			hiveContext.sql("INSERT INTO TABLE word_count SELECT time, key, value from wc")
		except Exception as e:
			print(str(e))

	count_values_windowed.foreachRDD(writeRecord)
	return ssc

# Script entry point: expects exactly two arguments, <zk> and <topic>.
if __name__ == "__main__":
	if len(sys.argv) != 3:
		print("Usage: [filename].py <zk> <topic>", file=sys.stderr)
		# sys.exit rather than the interactive-only `exit` helper.
		sys.exit(-1)
	else:
		import shutil

		print("Creating new context")
		# Start from a clean slate. A checkpoint location is normally a
		# directory, which os.remove cannot delete, so handle both cases.
		if os.path.isdir(outputPath):
			shutil.rmtree(outputPath)
		elif os.path.exists(outputPath):
			os.remove(outputPath)

		# Restore from the checkpoint if one exists, otherwise build a new context.
		ssc = StreamingContext.getOrCreate(outputPath, createContext)
		ssc.start()
		ssc.awaitTermination()