<a href="https://colab.research.google.com/github/ekaratnida/Data_Streaming_and_Realtime_Analytics/blob/main/Week6_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Show the runtime's Python version (the recorded run used 3.7.12).
# NOTE(review): PySpark 2.4.x is not expected to work on Python 3.8+ — confirm the runtime.
!python --version

Python 3.7.12


In [None]:
# Kafka client used by publishToKafkaOutput() to send results back to Kafka.
# TODO(review): pin a version (e.g. `%pip install -q confluent-kafka==<version>`) for reproducibility.
!pip install confluent-kafka

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.8-bin-hadoop2.7.tgz
!pip install findspark

In [3]:
# Spark Streaming Kafka 0.8 assembly jar (Scala 2.11, Spark 2.4.8); it is passed to
# PySpark with --jars via PYSPARK_SUBMIT_ARGS in the next cell so KafkaUtils works.
!wget "https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.4.8/spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar"

--2021-09-25 01:54:06--  https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.4.8/spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12002039 (11M) [application/java-archive]
Saving to: ‘spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar’


2021-09-25 01:54:06 (92.0 MB/s) - ‘spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar’ saved [12002039/12002039]



In [4]:
import os

# Point PySpark at the JDK and the Spark distribution installed above, and add
# the Kafka 0.8 assembly jar to the spark-submit arguments via --jars.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.8-bin-hadoop2.7",
    "PYSPARK_SUBMIT_ARGS": "--jars /content/spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar pyspark-shell",
})

In [5]:
import findspark
# Locate the Spark installation (presumably via the SPARK_HOME set in the previous
# cell — confirm) so that `import pyspark` works in this kernel.
findspark.init()

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random
import pyspark
import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from uuid import uuid1
import time

import os

# Kafka connection settings. Environment variables override the defaults so the
# notebook can target another broker/topic without editing code.
kafka_topic_name = os.environ.get("KAFKA_INPUT_TOPIC", "stream-expresso-input")
kafka_bootstrap_servers = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS",
    'ec2-13-229-46-113.ap-southeast-1.compute.amazonaws.com:9092',
)

In [58]:
# Micro-batch interval for the streaming job, in seconds.
BATCH_INTERVAL_SECONDS = 1

# getOrCreate() reuses a live SparkContext when this cell is re-run; calling
# SparkContext() a second time in the same kernel raises instead.
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

In [59]:
# Driver-side state shared by the streaming callback (echo) and the TF-IDF helpers.
# NOTE(review): broadcast variables are meant to be read-only; here `.value` is
# mutated in place, which only works because echo() runs on the driver (it uses
# rdd.collect()). These effectively act as driver globals — confirm no executor reads them.
tf_idf_table = list()
df_table = dict()

# One entry per page: {"tf": {...}, "idf": {...}}. Starts with one empty slot that
# echo() fills (at index doc_count.value) when a page marker arrives.
tf_idf_broadcast = sc.broadcast(tf_idf_table)
tf_idf_broadcast.value.append(dict())
# Word -> count, updated by echo() for every non-marker word it sees.
df_broadcast = sc.broadcast(df_table)

# Number of completed pages; also the index of the page slot being filled.
doc_count = sc.accumulator(0)

print(tf_idf_broadcast.value)
print(df_broadcast.value)

[{}]
{}


# Calculate TF

In [60]:
import math
def calculate_tf(data_dict):
    """Return the term frequency of every word in one page.

    Args:
        data_dict: mapping word -> raw occurrence count for the page.

    Returns:
        A new dict mapping each word to ``count / total_count``, in the same key
        order. If the counts sum to zero (e.g. an empty page) every word gets
        0.0 instead of raising ZeroDivisionError, so an empty page yields ``{}``.
    """
    # Accumulate by hand: the notebook's `from pyspark.sql.functions import *`
    # may shadow the builtin `sum` with PySpark's column aggregate.
    total = 0
    for count in data_dict.values():
        total += count
    if not total:
        return {word: 0.0 for word in data_dict}
    return {word: count / total for word, count in data_dict.items()}



# Calculate IDF

In [61]:
def calculate_idf(docs=None):
  """Attach a smoothed IDF table to every page, in place.

  For each page dict ``doc`` holding a ``"tf"`` mapping (word -> term
  frequency), sets ``doc["idf"]`` to
  ``{word: log10((1 + N) / (1 + df(word))) + 1 for word in doc["tf"]}``,
  where N is ``len(docs)`` and df(word) is the number of pages whose
  ``"tf"`` contains the word.

  Args:
    docs: list of page dicts to update. Defaults to
      ``tf_idf_broadcast.value``, which is what the zero-argument call in
      the streaming callback uses.
  """
  if docs is None:
    docs = tf_idf_broadcast.value
  N = len(docs)

  # Document frequency in a single pass: dict keys are unique, so each page
  # contributes at most 1 per word. This replaces rescanning every page once
  # per vocabulary word.
  word_appear_count = {}
  for doc in docs:
    for word in doc["tf"]:
      word_appear_count[word] = word_appear_count.get(word, 0) + 1

  for doc in docs:
    doc["idf"] = {
      word: math.log10((1 + N) / (1 + word_appear_count[word])) + 1
      for word in doc["tf"]
    }

# Publish the most important words to the consumer

In [62]:
from confluent_kafka import Producer

def delivery_report(err, msg):
    """Producer delivery callback: print the outcome for one message.

    When ``err`` is not None the failure is printed; otherwise the topic and
    partition reported by ``msg`` are printed.
    """
    if err is None:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
        return
    print('Message delivery failed: {}'.format(err))


def publishToKafkaOutput(json):
  """Send one payload to the "stream-expresso-output" topic and wait for it.

  ``json`` is the message value (the serialized result string; the name
  shadows the json module only inside this function). A new Producer is
  built per call; flush() blocks until queued messages are delivered,
  serving the delivery_report callback along the way.
  """
  output_topic = "stream-expresso-output"
  producer_config = {
    'bootstrap.servers': kafka_bootstrap_servers,
    'message.max.bytes': '2048576',
  }

  producer = Producer(producer_config)
  producer.poll(0)
  producer.produce(output_topic, value=json, callback=delivery_report)
  producer.flush()


# Process TF-IDF

In [63]:
import json
def convertToJson(list):
  """Serialize per-page results as an indented JSON object.

  Element i (0-based) of the input becomes the value of key ``"page<i+1>"``,
  in input order, and the object is dumped with ``indent=4``.
  """
  pages = {f"page{number}": page for number, page in enumerate(list, start=1)}
  return json.dumps(pages, indent = 4)

def calculateTFIDFAndGetTop(dict, top):
  """Return the ``top`` highest-scoring words of one page by TF-IDF.

  Args:
    dict: page entry with ``"tf"`` and ``"idf"`` mappings (word -> float).
      The parameter name shadows the builtin; it is kept for compatibility.
    top: maximum number of words to keep (``None`` keeps all).

  Returns:
    A dict word -> tf * idf, restricted to words present in both tables,
    ordered by descending score. Ties keep the order the words have in ``"tf"``.
  """
  tf = dict['tf']
  idf = dict['idf']
  # Direct lookup in the IDF table instead of scanning all IDF entries for
  # every TF word (previously O(|tf| * |idf|)).
  scores = {word: tf_value * idf[word] for word, tf_value in tf.items() if word in idf}
  # sorted() is stable even with reverse=True, so equal scores keep tf order.
  ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)[:top]
  return {word: score for word, score in ranked}

def is_new_page(data):
  """True when the token is the page-break marker '#####'."""
  page_break = '#####'
  return page_break == data

def is_end_of_the_book(data):
  """True when the token is the end-of-book marker '@@@@@@@'."""
  book_end = '@@@@@@@'
  return book_end == data

# Word counts of the page currently being streamed. Kept at module level, not
# local to echo(), so a page whose words arrive over several micro-batches is
# not truncated when one echo() call returns.
current_page_counts = {}


def _close_page(page_counts):
  """Store the TF of the finished page in the current slot and open the next slot."""
  tf_idf_dict = {}
  # An empty page (e.g. two markers in a row) gets an empty TF instead of
  # making calculate_tf divide by a zero total.
  tf_idf_dict["tf"] = calculate_tf(page_counts) if page_counts else {}
  tf_idf_dict["idf"] = {}

  tf_idf_broadcast.value[doc_count.value] = tf_idf_dict
  doc_count.add(1)

  tf_idf_broadcast.value.append(dict())
  # Start the next page from zero; otherwise each page's TF would also include
  # the words of every earlier page.
  page_counts.clear()


def _count_word(page_counts, word, count):
  """Add ``count`` occurrences of ``word`` to the current page and to the global totals."""
  page_counts[word] = page_counts.get(word, 0) + count
  # Accumulate rather than overwrite when the word first appears in a new page.
  df_broadcast.value[word] = df_broadcast.value.get(word, 0) + count


def _publish_book():
  """Finish the book: compute IDF, take each page's top TF-IDF word, publish as JSON."""
  # Drop the empty slot opened after the last page marker.
  tf_idf_broadcast.value.pop()
  calculate_idf()

  dict_rdd = sc.parallelize(tf_idf_broadcast.value)
  top_words = dict_rdd.map(lambda d: dict(calculateTFIDFAndGetTop(d, 1)))

  payload = convertToJson(top_words.collect())

  publishToKafkaOutput(payload)
  print(tf_idf_broadcast.value)
  print(payload)


def echo(time, rdd):
  """foreachRDD callback: fold one micro-batch of (word, count) pairs into the book state.

  The batch is collected to the driver. A '#####' token closes the current
  page (its TF is stored in ``tf_idf_broadcast``); a '@@@@@@@' token ends the
  book, computing IDF and publishing each page's top TF-IDF word to Kafka.
  Every other token is counted toward the current page.

  Args:
    time: batch time passed by Spark Streaming (unused).
    rdd: RDD of ``(word, count)`` tuples for this batch.
  """
  datas = rdd.collect()
  if len(datas) > 0:
    print(f'Page #: {doc_count.value + 1}')

  for data in datas:
    print(data)
    word = data[0]
    if is_new_page(word):
      _close_page(current_page_counts)
    elif is_end_of_the_book(word):
      _publish_book()
    else:
      _count_word(current_page_counts, word, data[1])


In [64]:
# A single direct (receiver-less) Kafka 0.8 stream on the input topic.
# Only one stream is created on purpose: each KafkaUtils.create*Stream call
# registers a new input stream with `ssc` even if the Python variable is later
# overwritten, so the former dead createStream()/createDirectStream() calls would
# also run. createStream() also expects a ZooKeeper quorum, not a Kafka broker.
kvs = KafkaUtils.createDirectStream(ssc, [kafka_topic_name], {
                        'bootstrap.servers':kafka_bootstrap_servers,
                        'group.id':'test-group',
                        'auto.offset.reset':'largest'})

In [65]:
# How long to let the streaming job run before shutting it down, in seconds.
STREAM_TIMEOUT_SECONDS = 60

# Kafka records arrive as (key, value) pairs; keep the message text.
lines = kvs.map(lambda x: x[1])

# Tokenize on runs of whitespace: split(' ') produced empty-string "words" for
# repeated spaces and left tabs/newlines attached to tokens (so e.g. "#####\n"
# would not be recognized as a page marker).
counts1 = lines.flatMap(lambda line: line.split()).map(lambda word: (word, 1))
counts1.foreachRDD(echo)
counts1.pprint()

ssc.start()

ssc.awaitTerminationOrTimeout(STREAM_TIMEOUT_SECONDS)
# ssc.stop() stops the SparkContext too by default; sc.stop() is a safety net.
ssc.stop()
sc.stop()

-------------------------------------------
Time: 2021-09-25 03:21:17
-------------------------------------------

-------------------------------------------
Time: 2021-09-25 03:21:18
-------------------------------------------

-------------------------------------------
Time: 2021-09-25 03:21:19
-------------------------------------------

-------------------------------------------
Time: 2021-09-25 03:21:20
-------------------------------------------

-------------------------------------------
Time: 2021-09-25 03:21:21
-------------------------------------------

-------------------------------------------
Time: 2021-09-25 03:21:22
-------------------------------------------

-------------------------------------------
Time: 2021-09-25 03:21:23
-------------------------------------------

-------------------------------------------
Time: 2021-09-25 03:21:24
-------------------------------------------

-------------------------------------------
Time: 2021-09-25 03:21:25
----------

In [41]:
# Cleanup cell for an interrupted run: stop the streaming context, then the SparkContext.
for context in (ssc, sc):
    context.stop()