### Kafka & Spark Streaming 

In [1]:
import os  
# Ask PySpark to pull in the Kafka 0.8 streaming connector (Scala 2.11 build,
# version 2.3.0) when it starts. This is set before the pyspark imports below
# so the variable is already in the environment when the SparkContext is
# created. NOTE(review): the connector version should match the installed
# Spark version -- confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'  

from pyspark import SparkContext  
from pyspark.streaming import StreamingContext  
from pyspark.streaming.kafka import KafkaUtils 
import happybase
from bs4 import BeautifulSoup
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import avro.schema
import io, random
from avro.io import DatumReader
from textblob import TextBlob

#### Clean a tweet

In [2]:
def clean_tweet(text):
    """Normalise a raw tweet into a space-separated string of content words.

    The text is parsed with BeautifulSoup to drop markup, @mentions and
    http(s) URLs are removed, every non-letter character is replaced by a
    space, and the result is lower-cased and tokenised. English stop words
    and the standalone retweet marker ``rt`` are then discarded.

    Args:
        text: Raw tweet text.

    Returns:
        The remaining tokens joined by single spaces (may be empty).
    """
    mention_pat = r'@[A-Za-z0-9]+'
    url_pat = r'https?://[A-Za-z0-9./]+'
    combined_pat = r'|'.join((mention_pat, url_pat))

    souped = BeautifulSoup(text, 'lxml').get_text()
    stripped = re.sub(combined_pat, '', souped)

    try:
        clean = stripped.decode("utf-8-sig").replace(u"\ufffd", "?")
    except (AttributeError, UnicodeError):
        # Text without a usable .decode() (e.g. an already-decoded str) or
        # text that cannot be decoded is used as-is.
        clean = stripped

    letters_only = re.sub("[^a-zA-Z]", " ", clean)
    words = word_tokenize(letters_only.lower())

    # Drop "rt" only as a whole token. Removing the substring from the full
    # string would also damage ordinary words such as "party" or "heart".
    excluded = set(stopwords.words('english'))
    excluded.add("rt")

    filtered_words = [w for w in words if w not in excluded]

    return " ".join(filtered_words).strip()

#### Decode a tweet (Avro)

In [3]:
def decode_tweet(msg):
    """Decode one Avro binary-encoded message using the ``twitter.avsc`` schema.

    Args:
        msg: The raw message bytes.

    Returns:
        The datum read from ``msg`` according to the schema.
    """
    schema_path = "twitter.avsc"

    # Read the schema inside a context manager so the file handle is closed
    # promptly instead of being left for the garbage collector.
    with open(schema_path) as schema_file:
        schema = avro.schema.Parse(schema_file.read())

    decoder = avro.io.BinaryDecoder(io.BytesIO(msg))
    reader = avro.io.DatumReader(schema)

    return reader.read(decoder)

#### Save a tweet to HBase

In [4]:
server = "localhost"
table_name = "tweets"


def save_tweet(tweet):
    """Persist a classified tweet to HBase and update per-word counters.

    For a tweet whose ``target`` is ``'positive'`` or ``'negative'``, the
    ``info:counter`` column of each word of ``cleaned_text`` is incremented
    in ``pos_counter`` or ``neg_counter`` respectively. Tweets with any other
    target update no counter. The tweet itself is always written to the
    table named by ``table_name``, keyed by ``str(tweet["id"])``.

    Args:
        tweet: Mapping with the keys ``id``, ``text``, ``cleaned_text`` and
            ``target``.
    """
    connection = happybase.Connection(server)
    try:
        tweets_table = connection.table(table_name)

        # The target is the same for every word, so pick the counter table
        # once rather than re-testing it on each loop iteration.
        target = tweet['target']
        if target == 'positive':
            counter_table = connection.table('pos_counter')
        elif target == 'negative':
            counter_table = connection.table('neg_counter')
        else:
            counter_table = None

        if counter_table is not None:
            for w in word_tokenize(tweet['cleaned_text']):
                counter_table.counter_inc(w, b'info:counter', value=1)

        tweets_table.put(str(tweet["id"]), {b'tweet:text': tweet["text"], b'tweet:cleaned_text': tweet["cleaned_text"], b'tweet:target': tweet["target"]})
    finally:
        # A connection is opened per call; close it so connections do not
        # pile up while the stream runs.
        connection.close()

#### Classify a tweet

In [5]:
def predict_tweet(text):
    """Label ``text`` by the sign of its TextBlob sentiment polarity.

    Returns:
        ``'positive'`` when the polarity is above zero, ``'neutral'`` when it
        is exactly zero, and ``'negative'`` otherwise.
    """
    polarity = TextBlob(text).sentiment.polarity

    if polarity > 0:
        return 'positive'
    if polarity == 0:
        return 'neutral'
    return 'negative'

In [6]:
def process_tweets(tweets):
    """Clean, classify and store every tweet of one batch.

    Args:
        tweets: A batch object exposing ``isEmpty()`` and ``collect()``; each
            collected record is indexed by the keys ``text``,
            ``cleaned_text`` and ``target``.
    """
    if not tweets.isEmpty():
        for record in tweets.collect():
            cleaned = clean_tweet(record["text"])
            record["cleaned_text"] = cleaned
            record["target"] = predict_tweet(cleaned)
            save_tweet(record)

In [None]:
# Driver cell: wire the Kafka topic into Spark Streaming and process each
# batch with process_tweets.
batch_duration = 10 # 10 seconds
topic = 'twitter' # kafka topic

# Local Spark context using all available cores; only errors are logged.
sc = SparkContext("local[*]", "SentimentAnalysisWithSpark")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, batch_duration) 
# Receiver-based Kafka stream. Per the KafkaUtils.createStream signature the
# positional arguments are the ZooKeeper quorum, the consumer group id and a
# {topic: partitions-to-consume} dict. Message values are decoded from Avro
# by decode_tweet.
kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {topic: 1}, valueDecoder=decode_tweet)

# Keep only element 1 of each stream item (presumably the decoded value of a
# (key, value) pair -- confirm against the KafkaUtils documentation).
parsed = kafkaStream.map(lambda t: t[1])
parsed.foreachRDD(process_tweets)

# Start the streaming job and block until it is stopped or fails.
ssc.start()  
ssc.awaitTermination()

  ' that document to Beautiful Soup.' % decoded_markup
  ' that document to Beautiful Soup.' % decoded_markup
