### PART 2 - Process the Twitter Data in Real time using Spark Streaming & Kafka Consumer

In [1]:
import sys # used to exit
import json, re
import datetime, time

from __future__ import print_function

In [2]:
# Spark Streaming Libs
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [13]:
# Create a Spark Context
sc = SparkContext(appName="Group2_RTD_SparkStreaming")

In [14]:
# Create a Streaming Context
sc.setLogLevel("WARN")
ssc = StreamingContext(sc,10)

#### Setup Kafka Stream (Consumer) to get the tweets from the topic 
Note: Here we use the broker server from Cloud Lab.

In [15]:
# Create a KafkaStream
kafkaStream = KafkaUtils.createStream(ssc, 'ip-20-0-32-4.ap-south-1.compute.internal:2181', 'spark-streaming', {'rtd2analysis':1})

In [16]:
ssc.checkpoint("checkpoint")

#### Lets create a function to filter stop words & emojis from the twitter text

In [17]:
import nltk
from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))

def cleanTweet(tweet):
    less_stopwords = ' '.join([word for word in tweet.split() if word not in stop_words])
    return ' '.join(re.sub("(@[0-9*#+/]+)|([^0-9A-Za-z@:/._ \t])", " ", less_stopwords).split())

In [18]:
# Func to add all values
def updateFunction(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

In [19]:
# parse the tweet data
parsed = kafkaStream.map(lambda v: json.loads(v[1].encode("ascii", "ignore")))

### 1. Find Influential people in twitter:
Find top 10 Influential personalities from the twitter across the globe. (In an interval of 10 seconds).

In [20]:
# Get the user & their followers count
follower_counts = parsed.map(lambda tweet: (tweet['user']['screen_name'],tweet['user']['followers_count'])).reduceByKey(lambda x,y: y)

# Print Top 10 users with most followers in last 10 secs
sorted_FollowerCounts = follower_counts.transform(lambda foo:foo.sortBy(lambda x:x[1],ascending=False))
sorted_FollowerCounts.pprint(num=10)

### 2. Get trending Topics in twitter right now:
Find the word from the tweet which is occurring most of the times in the whole tweets corpus.
Top 10 trending topic (In an interval of 10 seconds).

Note: To find the word from the tweet which is occurring most of the times in the whole tweets corpus, we have to cumulate the counts all time. 

In [21]:
# Get text part if the tweet & clean the same
texts = parsed.map(lambda tweet: cleanTweet(tweet['text']))

# Split the text into words
# For simplicity lets select only words with length less or equal to 20 and greater than 5.
text_counts = texts.flatMap(lambda text: text.split(" ")).map(lambda word:(word,len(word))).filter(lambda t: t[1] <= 20 and t[1] > 5).reduceByKey(lambda x,y: x + y)

# Calculate cumulative counts for words
runningWordCounts = text_counts.updateStateByKey(updateFunction)

# Sort & Order
sorted_WordCounts = runningWordCounts.transform(lambda foo:foo.sortBy(lambda x:x[0]).sortBy(lambda x:x[1],ascending=False))

# print the top 10 Count
sorted_WordCounts.pprint(num=10)

In [22]:
try:
    ssc.start()
    ssc.awaitTerminationOrTimeout(1000)
    ssc.stop(stopGraceFully=True,stopSparkContext=True)
    
except KeyboardInterrupt:
    ssc.stop(stopGraceFully=True,stopSparkContext=True)


-------------------------------------------
Time: 2018-08-26 02:47:30
-------------------------------------------
(u'radioyskl', 81794)
(u'fhay_cotton_h', 24502)
(u'varelacl', 14097)
(u'tingilye', 13319)
(u'ThinkSmart2011', 10804)
(u'DeppCulture', 9446)
(u'fwckook', 7226)
(u'iAmJCotti', 6055)
(u'millarly', 5986)
(u'Havanaxeri', 5712)
...

-------------------------------------------
Time: 2018-08-26 02:47:30
-------------------------------------------
(u'FFArtistShawn', 65)
(u'PAPASA', 48)
(u'SelenaGomez', 44)
(u'really', 36)
(u'@BTS_twt', 32)
(u'@YouTube', 32)
(u'@JudicialWatch:', 30)
(u'@ProjetoCabraRE', 30)
(u'McCain', 30)
(u'LarissaManoela', 28)
...

-------------------------------------------
Time: 2018-08-26 02:47:40
-------------------------------------------
(u'Moginiki', 45344)
(u'Tt3b1', 35084)
(u'sueatia', 34290)
(u'numsaeng13', 30710)
(u'Euraridade', 26251)
(u'tssuec', 24696)
(u'peeprlxrd', 17866)
(u'unseen1_unseen', 13264)
(u'morfi', 12818)
(u'worthybieber', 12452)
...

---

-------------------------------------------
Time: 2018-08-26 02:49:30
-------------------------------------------
(u's_5022s', 15991)
(u'npnp333', 13604)
(u'langernutrition', 12404)
(u'JONATASFFC_00', 12239)
(u'danlan_jp', 11978)
(u'taki7', 11940)
(u'SEXYSLARRY', 8757)
(u'Jeje_Jones', 8679)
(u'mcutholland', 6815)
(u'LibertyLynx', 6810)
...

-------------------------------------------
Time: 2018-08-26 02:49:30
-------------------------------------------
(u'McCain', 558)
(u'@BTS_twt', 552)
(u'FFArtistShawn', 455)
(u'people', 306)
(u'iHeartRadioMMVAs', 304)
(u'180825', 288)
(u'@realDonaldTrump', 256)
(u'BTSLoveYourselfTour', 247)
(u'@ProjetoCabraRE', 240)
(u'YTUnfreezeIDOL', 238)
...

-------------------------------------------
Time: 2018-08-26 02:49:40
-------------------------------------------
(u'monst_campaign', 565462)
(u'sexandtw_', 57426)
(u's_5022s', 15991)
(u'VoteHarryLane', 15170)
(u'_little_old_me', 15060)
(u'rxbelzjm', 11103)
(u'JSandlinWriter', 9328)
(u'OT7uwu', 8667)
(u'bria

-------------------------------------------
Time: 2018-08-26 02:51:30
-------------------------------------------
(u'mankai_company', 488459)
(u'jjjyyy16', 38720)
(u'marthakorrine', 25287)
(u'sectest9', 20101)
(u'ahmadriyadh20', 14660)
(u'Tr_q81', 12982)
(u'eduard0gil', 11047)
(u'a_pegacao', 10428)
(u'jintaepiphany', 7114)
(u'kana_ogawa', 6845)
...

-------------------------------------------
Time: 2018-08-26 02:51:30
-------------------------------------------
(u'@BTS_twt', 1128)
(u'McCain', 996)
(u'FFArtistShawn', 676)
(u'YTUnfreezeIDOL', 616)
(u'people', 546)
(u'@realDonaldTrump', 544)
(u'iHeartRadioMMVAs', 544)
(u'180825', 540)
(u'@MostRequestLive', 480)
(u'BTSLoveYourselfTour', 437)
...

-------------------------------------------
Time: 2018-08-26 02:51:40
-------------------------------------------
(u'DiscoverSelf', 63265)
(u'stephpadalecki', 19438)
(u'antwanstaley', 17447)
(u'zora107', 13194)
(u'murilloamaral', 10490)
(u'jimd22551', 8964)
(u'irwinlogia', 7991)
(u'JRFegan', 7788)

-------------------------------------------
Time: 2018-08-26 02:53:30
-------------------------------------------
(u'TelkomCare', 272508)
(u'MardaniAliSera', 106096)
(u'maryrock180457', 56033)
(u'sergiooliveiram', 48616)
(u'WesterosHistory', 15579)
(u'mdwehbie', 12937)
(u'4annegs', 10504)
(u'sweet2spicey12', 10072)
(u'YUVAMARWADI', 9027)
(u'kero__bot', 8913)
...

-------------------------------------------
Time: 2018-08-26 02:53:30
-------------------------------------------
(u'@BTS_twt', 1672)
(u'McCain', 1386)
(u'FFArtistShawn', 884)
(u'YTUnfreezeIDOL', 826)
(u'180825', 816)
(u'iHeartRadioMMVAs', 768)
(u'@realDonaldTrump', 752)
(u'BTSLoveYourselfTour', 741)
(u'people', 714)
(u'@MostRequestLive', 608)
...

-------------------------------------------
Time: 2018-08-26 02:53:40
-------------------------------------------
(u'SBS8news', 441610)
(u'Ali_Alshobaili', 176922)
(u'MinutoAlviverde', 49607)
(u'ARenLosDeportes', 44978)
(u'RCdeWinter', 30110)
(u'UnKbv2', 26746)
(u'jclockyer', 20460)