# Finding Influential People in Live Twitter Data Using Spark Streaming

Import `SparkContext` and `StreamingContext` from the PySpark library.

In [None]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import json

Create a `SparkContext` (`sc`).<br>
Create the Spark Streaming context from `sc`. The second argument, `10`, is the batch interval in seconds.<br>
The analysis runs once every 10 seconds.

In [None]:
sc = SparkContext()

ssc = StreamingContext(sc, 10)

Connect to the socket broker using `ssc` (the Spark Streaming context).<br>
Host: 127.0.0.1 (localhost), port: 6782

In [None]:
socket_stream = ssc.socketTextStream("127.0.0.1", 6782)
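
To try the notebook without a live Twitter feed, something has to listen on 127.0.0.1:6782 and write one JSON tweet per line, because `socketTextStream` connects as a client and reads newline-delimited text. The following is a minimal sketch of such a feeder, not the broker used by the original author. The names `encode_tweets`, `serve_once`, and `SAMPLE_TWEETS` are hypothetical, and the sample records are made up.

```python
import json
import socket


def encode_tweets(tweets):
    """Serialise tweet dicts as newline-delimited JSON bytes.

    socketTextStream splits its input on newlines, so each tweet
    must occupy exactly one line.
    """
    return b"".join(json.dumps(t).encode("utf-8") + b"\n" for t in tweets)


def serve_once(tweets, host="127.0.0.1", port=6782):
    """Accept a single client and send it every tweet, then close."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    try:
        conn, _addr = server.accept()  # blocks until Spark connects
        try:
            conn.sendall(encode_tweets(tweets))
        finally:
            conn.close()
    finally:
        server.close()


# Made-up sample records; real tweets carry many more fields.
SAMPLE_TWEETS = [
    {"text": "hello #spark", "user": {"screen_name": "alice", "followers_count": 120}},
    {"text": "streaming!", "user": {"screen_name": "bob", "followers_count": 4500}},
]
```

Under these assumptions, `serve_once(SAMPLE_TWEETS)` would be run in a separate process before the streaming context is started, so that the listener exists when Spark tries to connect.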

The argument to `window()` sets the window length in seconds. Each analysis covers the tweets received during the last 30 seconds.

In [None]:
lines = socket_stream.window(30)
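
With a 10-second batch interval, a 30-second window means each computation sees the three most recent batches, and the window slides forward by one batch at a time. The plain-Python sketch below imitates that behaviour on lists so it can be run without Spark. `sliding_windows` is a hypothetical helper, not a Spark API, and the batch contents are placeholders.

```python
from collections import deque


def sliding_windows(batches, window_length, batch_interval):
    """Yield, after each batch, all items visible in the current window."""
    if window_length % batch_interval != 0:
        # Spark likewise requires the window to be a multiple of the batch interval.
        raise ValueError("window length must be a multiple of the batch interval")
    size = window_length // batch_interval
    recent = deque(maxlen=size)  # the oldest batch falls out automatically
    for batch in batches:
        recent.append(batch)
        yield [item for b in recent for item in b]


# Four consecutive 10-second batches of placeholder tweets.
batches = [["t1"], ["t2", "t3"], ["t4"], ["t5"]]
windows = list(sliding_windows(batches, window_length=30, batch_interval=10))
```

In the last window, `t1` is no longer visible because its batch is more than 30 seconds old.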

### Process the Stream:
1. Receive the tweet messages, stored in `lines`. **Input DStream**
2. Split each message into words. **transformation: flatMap**
3. Keep only the words that start with a hashtag (#). **transformation: filter**
4. Convert the words to lowercase. **transformation: map**
5. Map each tag to (word, 1). **transformation: map**
6. Reduce by key to count the occurrences of each hashtag. **transformation: reduceByKey**. The resulting `hashtags` is the **output DStream**
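
The six steps above can be sketched in plain Python, with each line standing in for the DStream transformation named in the comment. This is an illustrative analogue that runs on a list of strings rather than on a DStream. The function name `count_hashtags` is hypothetical.

```python
from collections import Counter


def count_hashtags(lines):
    """Count hashtags in an iterable of tweet texts."""
    words = (w for line in lines for w in line.split(" "))  # flatMap
    tags = (w for w in words if w.startswith("#"))          # filter
    lowered = (t.lower() for t in tags)                     # map
    pairs = ((t, 1) for t in lowered)                       # map to (word, 1)
    counts = Counter()
    for tag, n in pairs:                                    # reduceByKey with a + b
        counts[tag] += n
    return dict(counts)


hashtags = count_hashtags(["Love #Spark and #python", "#spark rocks"])
```

On a DStream, the same chain would be written with `flatMap`, `filter`, `map`, and `reduceByKey` applied to `lines`.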

## Find Influential People on Twitter:

Hint: for simplicity, assume that a person's influence is directly proportional to their follower count.
Find the top 10 most influential personalities on Twitter across the globe (in an interval of 10 seconds).

In [None]:
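def follower_count(tweet):
    """Return the tweeting user's follower count, or 0 if unavailable."""
    try:
        json_tweet = json.loads(tweet)
    except ValueError:  # skip lines that are not valid JSON
        return 0
    if 'user' in json_tweet:
        if 'followers_count' in json_tweet['user']:
            return json_tweet['user']['followers_count']
    return 0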

In [None]:
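def user_screen_name(tweet):
    """Return the tweeting user's screen name, or 'unknown' if unavailable."""
    try:
        json_tweet = json.loads(tweet)
    except ValueError:  # skip lines that are not valid JSON
        return 'unknown'
    if 'user' in json_tweet:
        if 'screen_name' in json_tweet['user']:
            return json_tweet['user']['screen_name']
    return 'unknown'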

In [None]:
Influential_people = lines.map( lambda text: (user_screen_name(text), follower_count(text)))
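
The mapping above parses every tweet twice, once per helper. One possible alternative, shown here as a sketch, is a single function that parses the JSON once and returns the whole `(screen_name, followers_count)` pair. The name `screen_name_and_followers` is hypothetical, and the fallback values mirror the defaults used by the two helpers.

```python
import json


def screen_name_and_followers(tweet):
    """Parse a tweet once and return (screen_name, followers_count)."""
    try:
        user = json.loads(tweet).get("user") or {}
        return (user.get("screen_name", "unknown"),
                user.get("followers_count", 0))
    except (ValueError, TypeError, AttributeError):
        # Not JSON, or JSON that is not shaped like a tweet object.
        return ("unknown", 0)


example = screen_name_and_followers(
    '{"text": "hi", "user": {"screen_name": "alice", "followers_count": 120}}'
)
```

Assuming this helper, the mapping step could be written as `lines.map(screen_name_and_followers)`.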

In [None]:
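# Keep the largest follower count per user; summing would inflate users who tweet more than once.
Influential_people = Influential_people.reduceByKey(lambda a, b: max(a, b))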

Sort the influential people by follower count in decreasing order.

In [None]:
Influential_people_sorted_dstream = Influential_people.transform(
    lambda rdd: rdd.sortBy(lambda x: (-x[1], x[0].lower())))  # count descending, then name

Print the final analysis: the most influential people in the streaming Twitter data. `pprint(10)` prints the first 10 records of each batch.

In [None]:
Influential_people_sorted_dstream.pprint(10)
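
To check the ranking logic without a running stream, the per-window computation can be imitated on a plain list of `(screen_name, followers_count)` pairs. The sketch below makes two assumptions of its own: it keeps the largest follower count seen per user, and it breaks ties alphabetically by name. `top_influencers` and the sample data are hypothetical.

```python
def top_influencers(pairs, n=10):
    """Rank (screen_name, followers_count) pairs, highest count first."""
    best = {}
    for name, followers in pairs:
        # Keep one value per user: the largest follower count seen.
        best[name] = max(followers, best.get(name, 0))
    # Sort by count descending, then by name so that ties are deterministic.
    ranked = sorted(best.items(), key=lambda kv: (-kv[1], kv[0].lower()))
    return ranked[:n]


sample = [("bob", 4500), ("alice", 120), ("bob", 4510), ("carol", 4510)]
ranking = top_influencers(sample, n=2)
```

A single composite sort key is used here because it does not rely on a second sort preserving the order of the first.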

### Starting the Spark Streaming:
None of the Spark Streaming code written so far executes until we start the `ssc`.<br>
`ssc.start()` starts the Spark Streaming context and triggers execution of everything defined above.<br>
Only then does Spark build the lineage and DAG for each batch and run the lazily defined sequence of transformations.


In [None]:
ssc.start()

`awaitTermination()` blocks the calling process until the streaming context stops, which keeps the job alive.<br>
It returns once the context is stopped with `ssc.stop()` or fails with an error.<br>
Killing this Python process also ends the Spark Streaming job.

In [None]:
ssc.awaitTermination()
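
Because `awaitTermination()` blocks indefinitely, it can be inconvenient in a notebook. One possible alternative, sketched below, runs the job for a bounded time using `awaitTerminationOrTimeout()` and then calls `stop()` with a graceful shutdown. The helper name `run_for` is hypothetical. It would be used instead of the `ssc.start()` and `ssc.awaitTermination()` cells, not in addition to them.

```python
def run_for(ssc, seconds):
    """Start a streaming context and stop it after at most `seconds`.

    Returns True if the context terminated on its own before the
    timeout, False if this helper had to stop it.
    """
    ssc.start()
    finished = ssc.awaitTerminationOrTimeout(seconds)
    if not finished:
        # Let in-flight batches complete, then shut down Spark as well.
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
    return finished


# Example, assuming `ssc` is the StreamingContext created earlier:
# run_for(ssc, 60)
```

Note that a stopped `StreamingContext` cannot be restarted, so a new context has to be created before running the pipeline again.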