In [None]:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
import io
import requests

In [None]:
def getSparkSessionInstance(sparkConf):
    """Return the module-wide SparkSession, building it on first use.

    The session is cached in this module's globals under the key
    'sparkSessionSingletonInstance'. ``sparkConf`` is only consulted when
    the cached entry does not exist yet; later calls return the cached
    session regardless of the configuration passed in.
    """
    module_scope = globals()
    cache_key = 'sparkSessionSingletonInstance'
    if cache_key in module_scope:
        return module_scope[cache_key]

    builder = SparkSession.builder.config(conf=sparkConf)
    module_scope[cache_key] = builder.getOrCreate()
    return module_scope[cache_key]


# Set up the Spark and streaming contexts when run as a script or notebook.
if __name__ == "__main__":
    # Reuse the already-running SparkContext if there is one, else create it.
    sc = SparkContext.getOrCreate()
    # Second argument is the batch interval in seconds.
    ssc = StreamingContext(sc, 5)

In [None]:
# Input DStream of newline-delimited text read from the TCP source at this host and port.
socket_stream = ssc.socketTextStream("192.1.1.1", 5559)

In [None]:
# Windowed view of the socket stream with a 60-second window duration;
# no slide duration is given, so the stream's default is used.
lines = socket_stream.window(60)

In [None]:
# Split every received record on newlines so each element is a single line.
tweets = lines.flatMap(lambda line: line.split("\n"))

In [None]:
# Convert each RDD of the tweets DStream to a DataFrame and run SQL queries on it
def process(time, rdd):
    """Count tweets per language for one batch and send the top 15 to the dashboard.

    Every record in ``rdd`` is expected to be a ``||``-separated string
    holding the five fields listed in ``header``. Records with a different
    number of fields are dropped, and a batch with no usable record is
    skipped, because a DataFrame schema cannot be inferred from an empty RDD.

    Args:
        time: Batch time passed in by ``foreachRDD``; only printed.
        rdd: RDD of raw text records for the current batch.

    Returns:
        None.

    Failures are caught so that one bad batch does not stop the streaming
    job, but the traceback is printed instead of being silently discarded.
    """
    print("========= %s =========" % str(time))
    header = ["ScreenName", "FollowersCount", "Language", "TweetsCount", "CountryCode"]
    field_count = len(header)
    try:
        # Keep only records that split into exactly one value per column.
        rowRdd = rdd.map(lambda row: row.split("||")) \
            .filter(lambda fields: len(fields) == field_count)
        if rowRdd.isEmpty():
            print("No well-formed tweets in this batch; skipping.")
            return

        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        jsonDataFrame = spark.createDataFrame(rowRdd, header)
        # All fields arrive as strings; cast the numeric ones.
        jsonDataFrame = jsonDataFrame.withColumn(
            "FollowersCount", jsonDataFrame["FollowersCount"].cast("Int"))
        jsonDataFrame = jsonDataFrame.withColumn(
            "TweetsCount", jsonDataFrame["TweetsCount"].cast("Int"))

        # Creates a temporary view using the DataFrame.
        jsonDataFrame.createOrReplaceTempView("tweets")
        LangDF = spark.sql("select Language,TweetsCount  from tweets")

        # Top languages in terms of tweet volume: number of records with a
        # non-null TweetsCount per language.
        data = LangDF.groupby('Language').agg({'TweetsCount': 'count'})
        # Rename the generated aggregate column; a plain loop over zip()
        # works on both Python 2 and 3 (no builtin reduce/xrange needed).
        TweetVolumedf = data
        for old_name, new_name in zip(data.schema.names, ["Language", "TweetCount"]):
            TweetVolumedf = TweetVolumedf.withColumnRenamed(old_name, new_name)

        TweetVolumedf.createOrReplaceTempView("tweetscnt")
        vol_df = spark.sql(
            "select Language, TweetCount from tweetscnt order by TweetCount desc limit 15")
        vol_df.show()
        send_df_to_dashboard(vol_df)
    except Exception:
        # Best effort: keep the stream alive, but make the failure visible.
        import traceback
        traceback.print_exc()

        

In [None]:
def send_df_to_dashboard(df, url='http://192.1.1.1:5001/updateData', timeout=10):
    """Post the languages and tweet counts held in ``df`` to the dashboard.

    Args:
        df: DataFrame with ``Language`` and ``TweetCount`` columns.
        url: REST endpoint that receives the update. Defaults to the
            address that was previously hard-coded.
        timeout: Seconds to wait for the dashboard, so that an unreachable
            dashboard cannot block the calling batch indefinitely.

    Returns:
        None. A non-success HTTP status is reported with a printed message.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection error, timeout, ...).
    """
    # Collect once: two separate collect() calls would evaluate the query
    # twice, and labels and counts could then come from differently ordered
    # results when counts tie.
    rows = df.select("Language", "TweetCount").collect()
    # extract the languages from the rows and convert them into an array
    top_tags = [str(row.Language) for row in rows]
    # extract the counts from the rows and convert them into an array
    tags_count = [row.TweetCount for row in rows]
    # initialize and send the data through REST API
    request_data = {'label': str(top_tags), 'data': str(tags_count)}
    response = requests.post(url, data=request_data, timeout=timeout)
    if not response.ok:
        print("Dashboard update failed: HTTP %s" % response.status_code)

In [None]:
# Register process() to be called with each RDD generated by the tweets stream.
tweets.foreachRDD(process)

In [None]:
# Start the streaming computation; the stream operations registered above begin executing.
ssc.start()

In [None]:
# Block here until the streaming computation is stopped or fails.
ssc.awaitTermination()