# Consumer for Twitter API

## Imports

In [None]:
from confluent_kafka import Producer  
!pip install vaderSentiment
!pip install nltk
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
import re
from pyspark.ml.classification import LogisticRegressionModel
import pandas as pd
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline



[nltk_data] Downloading package stopwords to /home/linuxu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


## Consumer Configuration

In [None]:
from confluent_kafka import Consumer

# Settings handed to the confluent_kafka Consumer constructor below.
conf = {
    'bootstrap.servers': "localhost:8080",
    'group.id': "foo",
    'auto.offset.reset': 'smallest',
    'session.timeout.ms': 10000,
}

# Create the consumer from the configuration above.
consumer = Consumer(conf)

## Processing functions for incoming tweets

In [None]:
def remove_HTML(text):
    """
    Return `text` with every HTML tag stripped out.

    A tag is any shortest `<...>` span on a single line; the text between
    tags is left untouched.
    """
    return re.sub(r'<.*?>', r'', text)

def remove_URL(text):
    """
    Return `text` with every URL stripped out.

    A URL is anything starting with http://, https:// or www. and running
    up to the next whitespace character.
    """
    url_pattern = r'https?://\S+|www\.\S+'
    return re.sub(url_pattern, r'', text)

def remove_emojis(text):
    """
    Inputs a string and outputs a string free of any emojis

    Only symbol/pictograph code-point blocks are stripped. A single wide
    range from U+24C2 to U+1F251 would also swallow CJK ideographs, kana,
    Hangul and every other script located between those two code points,
    so that span is listed here as its individual symbol blocks instead.
    """
    emoji = re.compile("["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
        u"\U00002702-\U000027B0"  # dingbats
        u"\U000024C2"             # circled letter M
        u"\U000025A0-\U000025FF"  # geometric shapes
        u"\U00002600-\U000026FF"  # miscellaneous symbols
        u"\U00002B00-\U00002BFF"  # miscellaneous symbols and arrows
        u"\U0000FE0F"             # variation selector-16 (emoji presentation)
        u"\U0001F000-\U0001F251"  # tiles, cards, enclosed alphanumerics/ideographs
    "]+", flags=re.UNICODE)

    return emoji.sub(r'', text)

def remove_punctuations(text):
    """
    Return `text` with punctuation stripped out.

    Every character that is neither a word character (letters, digits,
    underscore) nor whitespace is deleted.
    """
    return re.sub(r'[^\w\s]', r'', text)

In [None]:
# set of all stopwords (built once; membership tests on a set are O(1))
stop = set(stopwords.words('english'))
# exclude "not" so it stays in the text -- presumably because negation
# matters for the sentiment scoring done later; discard() does not fail
# if the word is ever missing from the corpus list.
stop.discard('not')

def remove_stop_words(text):
    """
    inputs a text string and outputs a string without any stopwords

    The comparison is case-insensitive: the stopword list is lowercase, so
    each word is lowercased for the lookup only. Words that are kept retain
    their original casing, and the result is re-joined with single spaces.
    """
    return " ".join(word for word in text.split() if word.lower() not in stop)

In [None]:
def clean_text(text):
    """
    Clean a raw tweet string.
    -------------------------------------
    Applies, in this order, the removal of
    1) html-tags
    2) urls
    3) punctuations
    4) stopwords
    and returns the resulting string.
    """
    # NOTE(review): remove_emojis is not part of this chain; emoji are neither
    # word characters nor whitespace, so remove_punctuations drops them anyway.
    cleaning_steps = (
        remove_HTML,
        remove_URL,
        remove_punctuations,
        remove_stop_words,
    )
    for step in cleaning_steps:
        text = step(text)

    return text

## Preparation for model

In [None]:
# Load the saved logistic-regression model and build the feature-extraction
# pipeline whose "features" column it will be applied to.
path = "/home/model2"
newLR = LogisticRegressionModel.load(path)

# Stage 1: split the "tweet" column into a "words" column.
tokenizer = Tokenizer(inputCol="tweet", outputCol="words")

# Stages 2 and 3: TF-IDF (hashed term frequencies, then IDF weighting).
hashtf = HashingTF(
    numFeatures=2**16,
    inputCol="words",
    outputCol='tf',
)
idf = IDF(
    inputCol='tf',
    outputCol="features",
    minDocFreq=5,  # minDocFreq: remove sparse terms
)

# The complete pipeline: the three stages above, in order.
pipeline = Pipeline(stages=[tokenizer, hashtf, idf])

In [None]:
# `sc` is not defined in the visible cells -- assumes the notebook/PySpark
# environment provides the SparkContext under that name.
sqlContext = SQLContext(sc)

# Read the cleaned training CSV (header row, inferred column types).
file_path = '/home/cleaned_train.csv'
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
    .load(file_path)
)

# Fixed seed keeps the 98/1/1 split reproducible between runs.
train_set, val_set, test_set = df.randomSplit([0.98, 0.01, 0.01], seed=2000)

# Fit the feature pipeline on the training split; the fitted pipeline is
# what model_predict applies to incoming tweets.
pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)

In [None]:
def model_predict(test_tweet_):
    """
    Score a DataFrame of tweets with the loaded model.

    The input is first run through the fitted feature pipeline
    (`pipelineFit`), then through the logistic-regression model (`newLR`).
    Returns the DataFrame produced by the model's transform().
    """
    return newLR.transform(pipelineFit.transform(test_tweet_))

## Consumer Loop

The Loop:

1.   The consumer receives the tweets and pulls the text from them.
2.   The text is cleaned using the processing functions, which return the clean text.
3. Using the **VADER lexicon library**, pull the compound score for the given tweet and classify it as POSITIVE or NEGATIVE. We ignore the NEUTRAL label because we want to extract the emotions from the text. See the cell below for more information on the VADER library.
4. Model prediction - get the model prediction for the given tweet:
    1.   1 - POSITIVE
    2.   0 - NEGATIVE
5. If the model prediction and the VADER classification agree, the tweet gets the matching label. If they disagree, the tweet label is UNSPECIFIED. If the VADER classification is NEUTRAL, the label is whatever the model predicted.
6. Positive tweet ratio calculation - we want the percentage of positive tweets out of all the tweets received by the consumer (except the unspecified tweets).






### VADER 
VADER is a lexicon and a rule-based sentiment analysis tool for social media text. The lexicon has been built manually, by aggregating ratings coming from 10 human annotators.
Its precision should be higher than the resources created automatically. Moreover, being specifically tuned for social media, it also covers emojis and abbreviations (e.g., “lmao”, “lol”) that other dictionaries normally don’t. 

The compound score is computed by summing the valence scores of each word in the lexicon, adjusted according to the rules, and then normalized to be between -1 (most extreme negative) and +1 (most extreme positive). This is the most useful metric if you want a single unidimensional measure of sentiment for a given sentence.

It is also useful for researchers who would like to set standardized thresholds for classifying sentences as either positive, neutral, or negative. Typical threshold values are:

1.   positive sentiment: compound score >= 0.05
2.   neutral sentiment: (compound score > -0.05) and (compound score < 0.05)
3.   negative sentiment: compound score <= -0.05

The compound score is the one most commonly used for sentiment analysis by most researchers, including the authors.

[VADER](https://github.com/cjhutto/vaderSentiment)

In [None]:
running = True  # loop flag read by basic_consume_loop on every iteration
analyzer = SentimentIntensityAnalyzer()
posRatio = 0


def _label_tweet(prediction, compound):
    """
    Merge the model prediction and the VADER compound score into one label.

    prediction: model output, 1.0 for POSITIVE and 0.0 for NEGATIVE.
    compound:   VADER compound score; >= 0.05 is positive, <= -0.05 is
                negative, anything in between is neutral.

    Returns "POSITIVE", "NEGATIVE" or "UNSPECIFIED".
    """
    modelPositive = prediction == 1.0
    if -0.05 < compound < 0.05:
        # VADER is neutral, so the model decides on its own.
        return "POSITIVE" if modelPositive else "NEGATIVE"
    vaderPositive = compound >= 0.05
    if modelPositive == vaderPositive:
        return "POSITIVE" if modelPositive else "NEGATIVE"
    # Model and VADER disagree.
    return "UNSPECIFIED"


def basic_consume_loop(consumer, topics):
    """
    Consume tweets from Kafka, label each one and track the positive ratio.

    consumer: Kafka consumer to read from; it is always closed on exit.
    topics:   list of topic names to subscribe to.

    Every message is decoded as UTF-8, cleaned with clean_text, scored by
    VADER and by the model, and labelled POSITIVE / NEGATIVE / UNSPECIFIED.
    The loop stops when the module-level `running` flag is false or when
    poll() returns no message within its 2-second timeout.

    Returns the share of POSITIVE tweets among all tweets that were not
    UNSPECIFIED (0 when there is no such tweet).
    Raises KafkaException for any Kafka error other than end-of-partition.
    """
    # Function-scope imports: these names are needed on the error path but are
    # not imported by the notebook cells visible above this one.
    import sys
    from confluent_kafka import KafkaError, KafkaException

    posTweetSum = 0         # tweets labelled POSITIVE
    tweetCount = 0          # tweets processed so far
    unspecifiedCounter = 0  # tweets on which model and VADER disagree
    # Assigned here so the final `return` is valid even when no tweet arrives;
    # this name is local to the function, the module-level posRatio is not read.
    posRatio = 0
    try:
        consumer.subscribe(topics) #consumer subscribe to kafka topic
        print("start")#debug
        while running:
            msg = consumer.poll(timeout=2.0) #poll the msg from kafka
            if msg is None:
                # Nothing arrived within the timeout: stop consuming.
                break

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                    continue
                raise KafkaException(error)

            tweetCount += 1
            print(20*'=') #debugging
            print('Tweet Number : ', tweetCount)

            cleanTweet = clean_text(msg.value().decode('utf-8'))
            vs = analyzer.polarity_scores(cleanTweet) #pull compound score from VADER
            print(f"Clean Tweet :  {cleanTweet}")

            test_tweet_ = sqlContext.createDataFrame(pd.DataFrame({'tweet': [cleanTweet]}))
            # first() is a Spark action, so fetch the prediction exactly once.
            prediction = model_predict(test_tweet_).first()['prediction'] #Model Prediction
            print('model prediction : ', prediction)

            label = _label_tweet(prediction, vs['compound'])
            if label == "POSITIVE":
                posTweetSum += 1
            elif label == "UNSPECIFIED":
                unspecifiedCounter += 1
            print(f"Tweet Sentimental = {label}")

            # UNSPECIFIED tweets are left out of the ratio's denominator.
            ratedTweets = tweetCount - unspecifiedCounter
            posRatio = posTweetSum / ratedTweets if ratedTweets > 0 else 0

            print(f"Positive Ratio  = {posRatio:.2f}")
            print(20*'=') #debugging

    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
    return posRatio
    
def shutdown():
    """
    Ask the consumer loop to stop by clearing the module-level `running` flag.

    The flag is checked at the top of every loop iteration, so the loop ends
    after the message currently being processed. `global` is required here:
    a plain assignment would only create a local variable and leave the
    flag the loop reads unchanged.
    """
    global running
    running = False

In [None]:
# Consume the 'twitter' topic until the loop stops and keep the final
# positive-tweet ratio for the decision cell below.
posRatio = basic_consume_loop(consumer, ['twitter'])

## Ratio decision
If more than 50% of the tweets are positive, then the tweets on that subject at that time are mostly positive. Likewise, if fewer than 50% are positive they are mostly negative, and exactly 50% is reported as neutral.

In [None]:
# Report the overall verdict: above one half is positive, below one half is
# negative, and exactly one half is reported as neutral.
print(
    "Based on the given subject , most of the tweets defined as positive"
    if posRatio > 0.5
    else "Based on the given subject , most of the tweets defined as negative"
    if posRatio < 0.5
    else "Based on the given subject , most of the tweets defined as neutral"
)