# Popularity Analysis of K-Drama on Live Twitter Data by Tweet Language, Using Spark Streaming

Import SparkContext and StreamingContext from the PySpark library.

In [None]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns

Create a SparkContext with the app name 'StreamingTwitterAnalysisKDrama'.
Set the log level to ERROR, so that INFO- and WARN-level log messages are not printed.
Create the Spark StreamingContext; 10 is the batch interval in seconds.

In [None]:
# SparkContext running in local mode on all available cores ('local[*]').
sc = SparkContext(appName='StreamingTwitterAnalysisKDrama', master='local[*]')
# Suppress INFO and WARN log output; only ERROR-level messages are shown.
sc.setLogLevel('ERROR')
# Streaming context on top of sc with a 10-second batch interval.
ssc = StreamingContext(sc, 10)
# SparkSession sharing the same context; passed to process() for the
# DataFrame / SQL work.
spark = SparkSession(sc)

Connect to the socket broker using `ssc`.


In [None]:
# Input DStream of text lines read from a TCP socket on 127.0.0.1, port 9880.
socket_stream = ssc.socketTextStream('127.0.0.1', 9880)

The parameter of the `window` function sets the window length: all analysis is done on the tweets received during the last 60 seconds.

In [None]:
# Each RDD of `lines` covers the most recent 60 seconds of input instead of a
# single 10-second batch. No slide duration is passed to window().
lines = socket_stream.window(60)

## Process the Stream:


In [None]:
# Lookup table from language code to display name. Each incoming line is
# looked up here as a key; codes that are absent map to None in mapping() and
# are filtered out before plotting.
# NOTE(review): the codes are presumably Twitter's tweet-language codes --
# confirm against whatever the socket producer actually sends.
lang_dict = {'am': 'Amharic',
 'de': 'German',
 'ml': 'Malayalam',
 'sk': 'Slovak',
 'ar': 'Arabic',
 'el': 'Greek',
 'dv': 'Maldivian',
 'sl': 'Slovenian',
 'hy': 'Armenian',
 'gu': 'Gujarati',
 'mr': 'Marathi',
 'ckb': 'Sorani Kurdish',
 'eu': 'Basque',
 'ht': 'Haitian Creole',
 'ne': 'Nepali',
 'es': 'Spanish',
 'bn': 'Bengali',
 'iw': 'Hebrew',
 'no': 'Norwegian',
 'sv': 'Swedish',
 'bs': 'Bosnian',
 'hi': 'Hindi',
 'or': 'Oriya',
 'tl': 'Tagalog',
 'bg': 'Bulgarian',
 'hi-Latn': 'Latinized Hindi',
 'pa': 'Panjabi',
 'ta': 'Tamil',
 'my': 'Burmese',
 'hu': 'Hungarian',
 'ps': 'Pashto',
 'te': 'Telugu',
 'hr': 'Croatian',
 'is': 'Icelandic',
 'fa': 'Persian',
 'th': 'Thai',
 'ca': 'Catalan',
 'in': 'Indonesian',
 'pl': 'Polish',
 'bo': 'Tibetan',
 'cs': 'Czech',
 'it': 'Italian',
 'pt': 'Portuguese',
 'zh-TW': 'Traditional Chinese',
 'da': 'Danish',
 'ja': 'Japanese',
 'ro': 'Romanian',
 'tr': 'Turkish',
 'nl': 'Dutch',
 'kn': 'Kannada',
 'ru': 'Russian',
 'uk': 'Ukrainian',
 'en': 'English',
 'km': 'Khmer',
 'sr': 'Serbian',
 'ur': 'Urdu',
 'et': 'Estonian',
 'ko': 'Korean',
 'zh-CN': 'Simplified Chinese',
 'ug': 'Uyghur',
 'fi': 'Finnish',
 'lo': 'Lao',
 'sd': 'Sindhi',
 'vi': 'Vietnamese',
 'fr': 'French',
 'lv': 'Latvian',
 'si': 'Sinhala',
 'cy': 'Welsh',
 'ka': 'Georgian',
 'lt': 'Lithuanian'}

In [None]:
def mapping(l, lang_dict):
    """Return the display name for language code *l*.

    Args:
        l: Language code to look up.
        lang_dict: Mapping from language code to display name.

    Returns:
        The value stored under *l*, or None when *l* is not a key of
        *lang_dict*.
    """
    return lang_dict.get(l)

In [None]:
# Look each incoming line up in lang_dict as a language code; lines that are
# not a known code become None.
langs = lines.map(lambda l: mapping(l, lang_dict))

In [None]:
# Drop the None entries, i.e. lines whose code was not found in lang_dict.
langs = langs.filter(lambda x: x is not None)

In [None]:
# Use seaborn's 'whitegrid' style for the plots drawn below.
sns.set(style='whitegrid')

## Process and Visualize the Data

In [None]:
from pyspark.sql import Row
def process(spark, rdd, sns, plt):
    """Chart the ten most frequent languages found in one RDD of the stream.

    Args:
        spark: SparkSession used to build the DataFrame and run the SQL query.
        rdd: RDD whose elements are language names, one per tweet.
        sns: The seaborn module (or a compatible object) used for the bar plot.
        plt: The matplotlib.pyplot module (or compatible) used to show it.

    Returns:
        None. As a side effect the temp view 'langs' is (re)registered and a
        horizontal bar chart is displayed. An empty RDD does nothing.
    """
    # Nothing to count or draw for an empty RDD.
    if rdd.isEmpty():
        return

    # One Row(lang=...) per element so Spark can infer a one-column schema.
    lang_rows = rdd.map(lambda c: Row(lang=c))
    spark.createDataFrame(lang_rows).createOrReplaceTempView('langs')

    # Count tweets per language and keep the ten largest groups.
    top_langs = spark.sql('select lang, count(*) as tweets_count \
                        from langs \
                        group by lang \
                        order by tweets_count desc \
                        limit 10')

    # Only the ten aggregated rows are collected to the driver for plotting.
    sns.barplot(x='tweets_count', y='lang', data=top_langs.toPandas())
    plt.show()

In [None]:
# Register process() to run on every RDD produced by the `langs` stream.
langs.foreachRDD(lambda rdd: process(spark, rdd, sns, plt))

## Starting Spark Streaming

In [None]:
# Start receiving data and running the operations registered on the streams.
ssc.start()

In [None]:
# Block here until the streaming computation is stopped or fails; no timeout
# is given, so in a notebook this cell runs until it is interrupted.
ssc.awaitTermination()

In [None]:
# Stop the streaming computation. Called without arguments, PySpark's
# StreamingContext.stop() also stops the underlying SparkContext.
ssc.stop()

In [None]:
# Shut down the SparkContext.
# NOTE(review): likely redundant after ssc.stop(), whose default also stops
# the SparkContext -- confirm before removing.
sc.stop()