# spark-kafka-tableau integration
###### We identify the top 20 trending Twitter hashtags by fetching data from Twitter, producing it to Kafka, consuming it in Spark for cleaning, and sending the cleaned data back to Kafka so that other applications such as Hive and Tableau can be integrated with it.

In [1]:
from pyspark.sql import SparkSession
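# Import only the functions this notebook uses; a wildcard import would shadow Python builtins such as sum and max
from pyspark.sql.functions import from_json, to_json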

#### step 1
* Start the Spark master and slaves
* Start ZooKeeper and the Kafka server

### Starting ZooKeeper
* zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties 

### Activating the Kafka Server
* kafka-server-start.sh $KAFKA_HOME/config/server.properties
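
Before moving on, it can help to confirm that both services are actually listening. The following is a minimal sketch, assuming the default ports used elsewhere in this notebook (2181 for ZooKeeper, 9092 for Kafka); `is_port_open` is a hypothetical helper, not part of Kafka or Spark.

```python
import socket

def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False

if __name__ == "__main__":
    # Assumed default ports: 2181 for ZooKeeper, 9092 for the Kafka broker
    for name, port in [("ZooKeeper", 2181), ("Kafka", 9092)]:
        state = "up" if is_port_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port}: {state}")
```

An open port only shows that something is listening there; it does not prove the broker is healthy.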



#### step 2
* Create a Kafka topic named kafka-hive-spark:
* kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-hive-spark --replication-factor 1 --partitions 2
* Launch a console consumer on that topic:
* kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-hive-spark
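
The Spark job in this notebook parses each message with the schema `id STRING, time STRING, text STRING`, so whatever produces to `kafka-hive-spark` needs to send JSON objects with those three fields. The producer itself is not shown here; the sketch below only illustrates the assumed message shape. `make_message` is a hypothetical helper, and treating `time` as Unix seconds is an assumption based on the later use of `from_unixtime`.

```python
import json

def make_message(tweet_id, epoch_seconds, text):
    """Serialize one tweet as UTF-8 JSON bytes in the shape the Spark job parses."""
    payload = {
        "id": str(tweet_id),
        "time": str(int(epoch_seconds)),  # assumed: Unix time in seconds, sent as a string
        "text": text,
    }
    return json.dumps(payload).encode("utf-8")

# Example payload with made-up values
message = make_message(1, 1600000000, "Learning #Spark and #Kafka")
print(message)
```

Any Kafka producer could send these bytes as the message value.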

In [2]:
spark = SparkSession.builder\
    .appName('kafka-hive-spark')\
    .master('local[2]')\
    .config('spark.sql.shuffle.partitions', 2)\
    .getOrCreate()

In [3]:
df = spark.readStream\
    .format('kafka')\
    .option('kafka.bootstrap.servers', 'localhost:9092')\
    .option('subscribe', 'kafka-hive-spark')\
    .load()

In [4]:
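# Parse each Kafka message as JSON, split the lowercased tweet text on spaces,
# keep only tokens that look like hashtags, and re-serialize (time, hashtag)
# as a JSON string in the 'value' column expected by the Kafka sink.
processing = df.selectExpr("CAST(value AS STRING) AS value")\
    .select(from_json('value', 'id STRING, time STRING, text STRING').alias('x'))\
    .selectExpr(
        "CAST(from_unixtime(x.time, 'HH:mm:ss') AS TIMESTAMP) AS time",
        "explode(split(lower(x.text), ' ')) AS hashtag")\
    .where("length(hashtag) > 1")\
    .where("substring(hashtag, 1, 1) = '#'")\
    .selectExpr("(time, hashtag) AS result")\
    .select(to_json('result').alias('value'))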
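
To make the filtering logic easier to follow, here is a rough plain-Python equivalent of the cell above for a single message. It is only a sketch: `extract_hashtags` is a hypothetical name, the sample tweet is made up, and the time conversion is left out (the raw `time` value is passed through unchanged, whereas the Spark version emits a timestamp).

```python
import json

def extract_hashtags(raw_value):
    """Mimic the Spark transformation for one message; return a list of JSON strings."""
    record = json.loads(raw_value)
    results = []
    # Lowercase, then split on a single space, like split(lower(x.text), ' ')
    for token in record["text"].lower().split(" "):
        # Keep tokens longer than one character that start with '#'
        if len(token) > 1 and token.startswith("#"):
            # Time conversion omitted here: the raw value is passed through
            results.append(json.dumps({"time": record["time"], "hashtag": token}))
    return results

sample = '{"id": "1", "time": "1600000000", "text": "Learning #Spark and #Kafka # today"}'
for line in extract_hashtags(sample):
    print(line)
# prints:
# {"time": "1600000000", "hashtag": "#spark"}
# {"time": "1600000000", "hashtag": "#kafka"}
```

The lone `#` is dropped by the length check. As in the Spark version, punctuation attached to a hashtag (for example `#spark,`) is not stripped. One difference: `json.loads` raises on malformed input, while `from_json` yields null.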

In [5]:
# processing.writeStream\
#     .format('console')\
#     .outputMode('append')\
#     .trigger(processingTime = '8 seconds')\
#     .option('truncate', 'false')\
#     .start()

#### Step 4:
* Create a new topic to receive the cleaned data for storage:
* kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-hive-spark-intermediate --replication-factor 1 --partitions 2
        
    
#### Step 5: 
* Launch a console consumer on the new topic:
* kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-hive-spark-intermediate

In [6]:
processing.writeStream\
    .format('kafka')\
    .outputMode('append')\
    .trigger(processingTime = '8 seconds')\
    .option('kafka.bootstrap.servers', 'localhost:9092')\
    .option('topic', 'kafka-hive-spark-intermediate')\
    .option('checkpointLocation', '/home/sunbeam/Documents/tweepy/checkpoint3')\
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7ff022be3f60>

In [7]:
spark.streams.awaitAnyTermination()


KeyboardInterrupt
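
The notebook stops at writing cleaned `(time, hashtag)` records to `kafka-hive-spark-intermediate`; the top-20 ranking itself is left to the downstream tools. As a rough illustration of what that ranking computes, the sketch below counts hashtags over a handful of made-up cleaned messages. `top_hashtags` is a hypothetical helper and the sample values are invented.

```python
import json
from collections import Counter

def top_hashtags(messages, n=20):
    """Return the n most frequent hashtags as (hashtag, count) pairs."""
    counts = Counter(json.loads(m)["hashtag"] for m in messages)
    return counts.most_common(n)

# Made-up cleaned messages in the shape written to the intermediate topic
cleaned = [
    '{"time": "12:00:01", "hashtag": "#spark"}',
    '{"time": "12:00:02", "hashtag": "#kafka"}',
    '{"time": "12:00:03", "hashtag": "#spark"}',
    '{"time": "12:00:04", "hashtag": "#hive"}',
    '{"time": "12:00:05", "hashtag": "#spark"}',
    '{"time": "12:00:06", "hashtag": "#kafka"}',
]
print(top_hashtags(cleaned, n=2))
# prints: [('#spark', 3), ('#kafka', 2)]
```

A real deployment would run this aggregation over a time window in Hive or a similar store rather than over an in-memory list.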

