The following thread runs on the server. Broadly, I try to accomplish the following sub-tasks.

1. Mount Drive for the Spark installation (refer to the Google Colab docs).
2. Install Spark and test it using simple examples.
3. Connect to the client and create a Spark data stream ([Spark Streaming](https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html)).
4. Map the data preprocessing, cleaning and sentiment analysis functions onto the data stream.
    Text preprocessing and cleaning includes:
            4.1. Extract emojis and decode them
            4.2. Remove punctuation
            4.3. Remove URLs
            4.4. Remove RT, normalize negation
            4.5. Remove more than 2 repeated characters (remaining)
            4.6. Remove stop words
            4.7. Decode abbreviations
5. Normalize/lemmatize words.
6. Execute the mapping and display real-time sentiment.
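Steps 4.4 and 4.5 are listed above but do not appear in the cleaning code further down. A minimal sketch of how they could look is given here, using only the standard `re` module. The function names and the tiny `NEGATIONS` map are hypothetical and only for illustration; they are not part of the original notebook.

```python
import re

# Hypothetical, deliberately tiny negation map; a real list would be longer.
NEGATIONS = {"can't": "can not", "won't": "will not",
             "don't": "do not", "isn't": "is not"}

def strip_retweet_marker(text):
    """Drop a leading 'RT' or 'RT @user:' retweet prefix."""
    return re.sub(r"^RT\s+(@\w+:?\s*)?", "", text)

def normalize_negation(text):
    """Expand contracted negations so 'not' survives punctuation removal."""
    return " ".join(NEGATIONS.get(word.lower(), word) for word in text.split())

def squeeze_repeats(text):
    """Collapse runs of 3+ identical characters down to 2 ('soooo' -> 'soo')."""
    return re.sub(r"(.)\1{2,}", r"\1\1", text)

def extra_cleaning(text):
    """Apply the three steps in order: RT prefix, negation, repeated characters."""
    return squeeze_repeats(normalize_negation(strip_retweet_marker(text)))

print(extra_cleaning("RT @tim: I can't waaaait for the new iPhone!!!"))
# I can not waait for the new iPhone!!
```

If used, these steps would probably need to run before punctuation is removed, since both the `@` of the retweet prefix and the apostrophe in the contractions are punctuation.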

Let's go!

- Mount Drive 

In [0]:
from google.colab import drive 
drive.mount('/content/drive/')

Mounted at /content/drive/


- Install Hadoop/Spark

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark

- Set env variables

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"
import findspark
findspark.init()


- Check whether pyspark works as expected using simple examples

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("temp2")
sc = SparkContext.getOrCreate(conf=conf)

def cal_ave(a, b):
  # Element-wise sum of two [sum, count] pairs
  return (a[0] + b[0], a[1] + b[1])

def map_nonzero(no):
  # Neutral values (0) add nothing to the sum or the count
  if float(no) == 0:
    return ('apple', [0, 0])
  else:
    return ('apple', [no, 1])

# Average of the non-zero values only
a = sc.parallelize([0.0, 0.00, 0.0, -0.1])
a = a.map(map_nonzero)
a = a.reduceByKey(cal_ave)
a = a.map(lambda x: (x[0], x[1][0] / x[1][1]) if x[1][1] != 0 else (x[0], 0))
print(a.take(10))

[('apple', -0.1)]
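As a cross-check, the same "average of the non-zero values" can be reproduced in plain Python without Spark. This is only a sketch that mirrors the two helper functions from the cell above; `nonzero_average` is a hypothetical name introduced here, and `functools.reduce` stands in for `reduceByKey` on a single key.

```python
from functools import reduce

def map_nonzero(score):
    # Neutral scores contribute nothing to either the sum or the count.
    return ("apple", [0, 0]) if float(score) == 0 else ("apple", [score, 1])

def cal_ave(a, b):
    # Element-wise sum of two [running_sum, running_count] pairs.
    return [a[0] + b[0], a[1] + b[1]]

def nonzero_average(scores):
    # Map every score to a [sum, count] pair, then fold the pairs together.
    pairs = [map_nonzero(s)[1] for s in scores]
    total, count = reduce(cal_ave, pairs, [0, 0])
    # Guard against a batch that contains only neutral scores.
    return total / count if count else 0

print(nonzero_average([0.0, 0.00, 0.0, -0.1]))  # -0.1
```

The three zeros are ignored, so the single non-zero value is also the average, which matches the Spark result.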


The following cell just makes sure that no other Spark context is still running and stops it if there is one. It is not the best way to code this, but I'm just trying to get it to work!

In [0]:
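# Stop any running streaming context first; ssc.stop() also stops its SparkContext
if 'ssc' in globals(): ssc.stop()
sc.stop()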

*   The streaming data is a list of tweets. Each batch is 2 seconds long and corresponds to one RDD, so the data stream (a DStream) can be thought of as a sequence of RDDs. When a function is mapped onto the data stream, it is applied to each RDD.
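The batching idea can be modelled in plain Python, which may help when reasoning about what a mapped function sees. This is a rough sketch, not how Spark is implemented: `to_batches` and `map_stream` are hypothetical helpers, arrival times are given in seconds, and each inner list plays the role of one RDD.

```python
from collections import defaultdict

BATCH_SECONDS = 2  # assumed batch interval, in seconds

def to_batches(records, batch_seconds=BATCH_SECONDS):
    """Group (arrival_time_in_seconds, text) records into fixed-length batches."""
    batches = defaultdict(list)
    for arrival, text in records:
        batches[int(arrival // batch_seconds)].append(text)
    # Return the batches in time order; each inner list stands in for one RDD.
    return [batches[key] for key in sorted(batches)]

def map_stream(func, batches):
    """Apply func to every element of every batch, keeping batch boundaries."""
    return [[func(item) for item in batch] for batch in batches]

records = [(0.3, "apple is great"), (1.9, "apple again"), (2.1, "new apple phone")]
batches = to_batches(records)
print([len(batch) for batch in batches])  # [2, 1]
print(map_stream(str.upper, batches))
# [['APPLE IS GREAT', 'APPLE AGAIN'], ['NEW APPLE PHONE']]
```

One difference worth noting: this model simply skips intervals with no data, whereas Spark Streaming still produces an (empty) RDD for every interval, which is why empty time blocks show up in the printed output.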





In [0]:
import emoji
import nltk
from textblob import TextBlob
nltk.download('stopwords')
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer

from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests
import re 
import string
import json


#What are we tracking :
query = 'apple'



# Spark and streaming setup
conf = SparkConf()  # create the Spark configuration
conf.setAppName("NIK_TWITTERSTREAM")
sc = SparkContext(conf=conf)  # create the Spark context with the above configuration
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)  # create the streaming context with a batch interval of 2 seconds
ssc.checkpoint("checkpoint_TwitterApp")  # set a checkpoint directory to allow RDD recovery
dataStream = ssc.socketTextStream('b0eee369c1ec', 9205)  # read text lines from host b0eee369c1ec, port 9205



# Expand Twitter slang and abbreviations (dictionary sourced from Webopedia)

def get_abbreviation (text):
  
  # Load the abbreviation -> expansion dictionary
  with open('/content/twitter_abb_dic.json') as fp : 
    abb_dic = json.load(fp)
  
  # Replace each known abbreviation; leave every other word unchanged
  new_text = []
  for word in text.strip().split():
    if word in abb_dic:
      new_text.append(abb_dic[word])
    else:
      new_text.append(word)
  
  return(' '.join(new_text))    

# sentiment    

def get_sentiments (tweet):
    return(TextBlob(tweet).sentiment.polarity)


  
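def clean_tweet (tweet) :
  
  # Clean a tweet so that it is ready for sentiment scoring
  
  # Replace each emoji with its text name, e.g. ':thumbs_up:' -> ' thumbs up'
  tweet = ''.join([' '+emoji.demojize(char).replace('_', ' ').replace(':', '') if char in emoji.UNICODE_EMOJI else char for char in tweet])
  # Drop URLs and @mentions before punctuation is stripped, while '@' is still present
  tweet = ' '.join([word for word in tweet.split() if ('http' not in word) and (word[0] != '@')])
  # Remove punctuation
  table = str.maketrans('', '', string.punctuation)
  tweet = tweet.translate(table)
  # Remove stop words (build the set once instead of once per word)
  stop_words = set(stopwords.words("english"))
  tweet = ' '.join([word for word in tweet.split() if word not in stop_words])
  tweet = get_abbreviation(tweet)
  # Stem word by word; PorterStemmer.stem expects a single word, not a sentence
  ps    = PorterStemmer()
  tweet = ' '.join([ps.stem(word) for word in tweet.split()])
  
  return(tweet)  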
  
  
def cal_ave (a, b):
  return (a[0]+b[0], a[1]+b[1])

def map_nonzero (no):
  if float(no) == 0 : 
    return (('apple', [0, 0]))
  else:
    return (('apple', [no, 1]))  
  
#print tweet count in each batch (of 2 seconds)
dataStream.count().pprint()

#print tweets within a batch
dataStream.pprint()

#clean tweets
clean_tweets = dataStream.map(clean_tweet)
#clean_tweets.pprint()

#Extract sentiment 
sentiments = clean_tweets.map(get_sentiments)

#We are interested in Non-neutral sentiment 
sentiments = sentiments.map(map_nonzero)

#Average out sentiment over a batch
sentiments = sentiments.reduceByKey(cal_ave)
sentiments = sentiments.map(lambda x : (x[0], x[1][0]/x[1][1]) if x[1][1] != 0 else (x[0], 0))
#sentiments = sentiments.reduceByKey(lambda x, y : x+y)

#print sentiment ( on scale of -1 to 1 , 1 being extremely positive , -1 being extremely negative)
sentiments.pprint()


try : 
  # start the streaming computation
  ssc.start()
  # wait for the streaming to finish
  ssc.awaitTermination()
except KeyboardInterrupt:
  # stop the streaming context first; this also stops the underlying SparkContext
  ssc.stop(stopSparkContext=True)
  raise



[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
-------------------------------------------
Time: 2019-08-03 23:57:06
-------------------------------------------

-------------------------------------------
Time: 2019-08-03 23:57:06
-------------------------------------------

-------------------------------------------
Time: 2019-08-03 23:57:06
-------------------------------------------

-------------------------------------------
Time: 2019-08-03 23:57:08
-------------------------------------------

-------------------------------------------
Time: 2019-08-03 23:57:08
-------------------------------------------

-------------------------------------------
Time: 2019-08-03 23:57:08
-------------------------------------------

-------------------------------------------
Time: 2019-08-03 23:57:10
-------------------------------------------
19

-------------------------------------------
Time: 2019-08-03 23:57:10
---

KeyboardInterrupt: ignored