# Classifying the #cassandra Twitter stream with Spark Streaming

In [1]:
import json
import time

import dateutil.parser
import numpy as np
import twitter
from pyspark.ml.feature import HashingTF, IDF, RegexTokenizer
from pyspark.mllib.classification import LogisticRegressionModel
from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row
from pyspark.sql import functions as funs
from pyspark.streaming import StreamingContext

In [3]:
def clean_dataframe(input_df):
    """Normalise the ``text`` column of a tweet DataFrame.

    Two passes are applied to ``text``:

    1. Remove noise (the characters ^ ? ! $ ( ) " ' : . and #, single-character
       words, the literal "RT", @mentions, http links and digit runs), then
       lower-case the result.
    2. Replace each of + _ - , and every whitespace run with one space, then
       trim leading/trailing whitespace.

    Every other column of ``input_df`` (for example ``id`` or ``hashtags``) is
    carried through unchanged, so callers that need extra columns downstream
    do not lose them. For an input with columns (text, id) the output columns
    are (text, id).

    :param input_df: DataFrame with at least a ``text`` column.
    :return: DataFrame with the cleaned ``text`` column first, followed by the
        remaining input columns in their original order.
    """
    noise_pattern = '(\^|\?|\!|\$|\(|\)|\\b\\w{1}\\b\\s?|\"|\'|RT|:|\.|@[^\s]+|http[^\s]+|#|[0-9]+)'
    separator_pattern = '(\+|_|-|\,|\s+)'

    # Keep all non-text columns instead of hard-coding 'id'; a hard-coded
    # projection silently drops columns such as 'hashtags'.
    passthrough = [name for name in input_df.columns if name != 'text']

    without_noise = funs.regexp_replace(input_df.text, noise_pattern, '')
    onlywords_df = input_df.select(funs.lower(without_noise).alias('text'), *passthrough)

    single_spaced = funs.regexp_replace(onlywords_df.text, separator_pattern, ' ')
    return onlywords_df.select(funs.trim(single_spaced).alias('text'), *passthrough)

In [5]:
def tokenization(input_df):
    """Return ``input_df`` with a ``words`` column tokenized from ``text``.

    Uses a RegexTokenizer with its default settings.
    """
    return RegexTokenizer(inputCol="text", outputCol="words").transform(input_df)

In [6]:
class Tweet(dict):
    """Flat dict built from one raw message of the Twitter streaming API.

    The resulting mapping always has the keys ``timestamp``, ``text``,
    ``hashtags``, ``urls``, ``id``, ``screen_name``, ``user_description`` and
    ``user_id``.

    * English tweets (``lang == 'en'``) are copied field by field; the
      ``created_at`` value is parsed and stored as a naive ISO-8601 string.
    * Anything else (another language, a message without a ``lang`` key, or
      ``None``/empty input) becomes a placeholder whose text is ``'No TEXT'``.
      ``id`` and ``screen_name`` are copied when the message carries them and
      default to ``''`` otherwise.

    :param tweet_in: dict-like message from the stream, or ``None``.
    """

    def __init__(self, tweet_in):
        super(Tweet, self).__init__()
        # .get() so that messages lacking 'lang' fall through to the
        # placeholder branch instead of raising KeyError.
        if tweet_in and tweet_in.get('lang') == 'en':
            entities = tweet_in['entities']
            user = tweet_in['user']
            created = dateutil.parser.parse(tweet_in[u'created_at'])
            # Drop the timezone so the stored timestamp is a naive ISO string.
            self['timestamp'] = created.replace(tzinfo=None).isoformat()
            self['text'] = tweet_in['text']
            self['hashtags'] = [tag['text'] for tag in entities['hashtags']]
            self['urls'] = list(entities['urls'])
            self['id'] = tweet_in['id']
            # Kept as text (not utf-8 bytes) so both branches store the same
            # type and the dict stays JSON-serialisable.
            self['screen_name'] = user['screen_name']
            self['user_description'] = user['description']
            self['user_id'] = user['id']
        else:
            message = tweet_in or {}
            user = message.get('user') or {}
            self['timestamp'] = ''
            self['text'] = 'No TEXT'
            self['hashtags'] = []
            self['urls'] = []
            self['id'] = message.get('id', '')
            self['screen_name'] = user.get('screen_name', '')
            self['user_description'] = ''
            self['user_id'] = ''


## Fill in your Twitter API credentials below

In [7]:
def connect_twitter():
    """Build and return a ``twitter.TwitterStream`` authenticated via OAuth.

    The four "..." placeholders must be replaced with real credentials.
    """
    credentials = twitter.OAuth(
        token="...",
        token_secret="...",
        consumer_key="...",
        consumer_secret="..."
    )
    return twitter.TwitterStream(auth=credentials)


In [None]:
def combine(sv1,sv2):
    """Concatenate two sparse vectors into one longer sparse vector.

    The result has size ``sv1.size + sv2.size``. Entries of ``sv1`` keep their
    indices; entries of ``sv2`` are shifted by ``sv1.size`` so they follow the
    first vector's index range.

    :param sv1: sparse vector exposing ``size``, ``indices`` and ``values``.
    :param sv2: sparse vector exposing ``size``, ``indices`` and ``values``.
    :return: a ``SparseVector`` holding the entries of both inputs.
    """
    offset = sv1.size
    # Shift the second vector's indices past the end of the first vector.
    shifted_indices = np.asarray(sv2.indices) + offset
    joined_indices = np.concatenate([sv1.indices, shifted_indices])
    joined_values = np.concatenate([sv1.values, sv2.values])
    return SparseVector(offset + sv2.size, joined_indices, joined_values)

# Smoke test for combine(): a size-2 and a size-3 vector should join into a
# size-5 vector with non-zero entries at indices 0, 2 and 4.
s1 = SparseVector(2, [0], [1])
s2 = SparseVector(3, [0, 2], [1, 1])
print(combine(s1, s2))

In [None]:
def lower_word(hashtags):
    """Lower-case every hashtag and join them into one space-separated string."""
    return ' '.join(tag.lower() for tag in hashtags)

In [None]:
def feature_extraction_with_hash(input_df,number_hash,number_words):
    """Build one hashed feature vector per row from its hashtags and its text.

    Steps:
      1. Collapse the ``hashtags`` list of each row into a lower-cased,
         space-separated string.
      2. Tokenize ``text`` into ``words`` and the hashtag string into ``tags``.
      3. Hash ``tags`` into ``number_hash`` buckets and ``words`` into
         ``number_words`` buckets.
      4. Join both hashed vectors with ``combine`` (tags first, then words).

    :param input_df: DataFrame with ``hashtags``, ``text`` and ``id`` columns.
    :param number_hash: number of hash buckets for the hashtag tokens.
    :param number_words: number of hash buckets for the text tokens.
    :return: DataFrame with columns (features, text, hashtags, id).
    """
    def flatten_tags(row):
        return [lower_word(row['hashtags']), row['text'], row['id']]

    def join_vectors(row):
        joined = combine(row['features_tags'], row['features_words'])
        return [joined, row['text'], row['hashtags'], row['id']]

    flat_df = input_df.rdd.map(flatten_tags).toDF(['hashtags', 'text', 'id'])

    # Tokenize the text first, then the hashtag string.
    with_words = RegexTokenizer(inputCol="text", outputCol="words").transform(flat_df)
    with_tags = RegexTokenizer(inputCol='hashtags', outputCol='tags').transform(with_words)

    # Hash the tags first, then the words.
    tag_hasher = HashingTF(inputCol="tags", outputCol="features_tags", numFeatures=number_hash)
    word_hasher = HashingTF(inputCol="words", outputCol="features_words", numFeatures=number_words)
    hashed = word_hasher.transform(tag_hasher.transform(with_tags))

    # Merge the two sparse vectors into a single 'features' column.
    return hashed.rdd.map(join_vectors).toDF(['features', 'text', 'hashtags', 'id'])

In [8]:
def get_next_tweet(twitter_stream):
    """Read the next usable message tracked under "#cassandra" as a JSON string.

    Empty messages and messages containing a 'delete' key are skipped. The
    first remaining message is wrapped in ``Tweet``, printed, and returned
    serialised with ``json.dumps``.
    """
    # Alternative source: twitter_stream.statuses.sample(block=True)
    stream = twitter_stream.statuses.filter(track="#cassandra")
    while True:
        tweet_in = next(stream)
        if tweet_in and 'delete' not in tweet_in:
            break
    tweet_parsed = Tweet(tweet_in)
    print(tweet_parsed)
    return json.dumps(tweet_parsed)

In [9]:
from pyspark.sql import functions as funs
# Disabled earlier version of the per-batch step. It is wrapped in a bare
# string literal, so `process` is never defined or executed.
'''
def process(time,rdd):
  input_df = (rdd.map(lambda json_str:(json.loads(json_str))).map(lambda r:Row(r['text'],r['id']))).toDF(['text','id'])
  cleaned_df = clean_dataframe(input_df)
  #cleaned_df.show()
  tokenized = tokenization(cleaned_df)
  #tokenized.show()
  hashing_tf_model = HashingTF(inputCol="words", outputCol="features", numFeatures=1000)
  tfidf = hashing_tf_model.transform(tokenized)

  #idf = IDF(inputCol="rawFeatures", outputCol="features")
  #idf_model = idf.fit(hashed_tf_df)
  #tfidf = idf_model.transform(hashed_tf_df)
  #tfidf.show()

  lr_model = LogisticRegressionModel.load(sc,'/FileStore/tables/92zrseiu1471850178155/lr_model2.mdl')
  lr_model.clearThreshold()
  prediction = tfidf.rdd.map(lambda lp: (float(lr_model.predict(lp['features'])),lp['text']))
  print 'Prediction:',prediction.take(1)
  return prediction
'''

def getFeatures(time,rdd):
    """Turn an RDD of tweet JSON strings into an RDD of rows with hashed text features.

    Each JSON string is parsed and reduced to (text, id); the text is cleaned,
    tokenized and hashed into a 1000-bucket ``features`` column. The resulting
    DataFrame is shown before its RDD is returned.

    :param time: batch time passed by ``DStream.transform``; not used.
    :param rdd: RDD of JSON strings as produced by ``get_next_tweet``.
    :return: RDD of the feature DataFrame's rows.
    """
    parsed = rdd.map(json.loads)
    text_and_id = parsed.map(lambda r: Row(r['text'], r['id']))
    input_df = text_and_id.toDF(['text', 'id'])

    tokenized = tokenization(clean_dataframe(input_df))
    hasher = HashingTF(inputCol="words", outputCol="features", numFeatures=1000)
    tfidf = hasher.transform(tokenized)
    tfidf.show()

    return tfidf.rdd

def getFeaturesWithTags(time,rdd):
    """Turn an RDD of tweet JSON strings into an RDD of rows with text+hashtag features.

    Each JSON string is parsed and reduced to (text, id, hashtags); the frame
    is cleaned and handed to ``feature_extraction_with_hash`` with 400 hashtag
    buckets and 1900 word buckets. The resulting DataFrame is shown before its
    RDD is returned.

    :param time: batch time passed by ``DStream.transform``; not used.
    :param rdd: RDD of JSON strings as produced by ``get_next_tweet``.
    :return: RDD of the feature DataFrame's rows.
    """
    # NOTE(review): feature_extraction_with_hash reads a 'hashtags' column;
    # confirm clean_dataframe keeps that column in its output.
    parsed = rdd.map(json.loads)
    tweet_rows = parsed.map(lambda r: Row(r['text'], r['id'], r['hashtags']))
    input_df = tweet_rows.toDF(['text', 'id', 'hashtags'])

    features = feature_extraction_with_hash(clean_dataframe(input_df), 400, 1900)
    features.show()
    return features.rdd
  
def process_rdd_queue(twitter_stream,lr_model,num_tweets=1):
    """Queue tweets from the stream and wire up the scoring pipeline.

    Reads ``num_tweets`` tweets with ``get_next_tweet`` (one call per tweet),
    wraps each one in its own single-element RDD, and feeds the queue of RDDs
    to a queue-backed DStream. Each batch is converted to features with
    ``getFeaturesWithTags`` and every row is printed as
    ``(lr_model.predict(features), text)``.

    Uses the module-level ``ssc`` streaming context. Nothing is processed
    until ``ssc.start()`` is called by the caller.

    :param twitter_stream: authenticated stream as returned by ``connect_twitter``.
    :param lr_model: model whose ``predict`` is applied to each ``features`` vector.
    :param num_tweets: how many tweets to read before returning (default 1).
    """
    # One RDD per tweet, pushed through a QueueInputDStream.
    rddQueue = [
        ssc.sparkContext.parallelize([get_next_tweet(twitter_stream)], 5)
        for _ in range(num_tweets)
    ]

    features = ssc.queueStream(rddQueue).transform(getFeaturesWithTags)
    # Identity pass-through kept from the original pipeline.
    features = features.transform(lambda _, rdd: rdd)
    scored = features.map(lambda row: (lr_model.predict(row['features']), row['text']))
    scored.pprint()


## Change the file path below to the location of your trained model

In [10]:

# Driver cell: build the streaming context, load the model, wire the
# pipeline, then run it for ten seconds.
# NOTE(review): `sc` is not defined in this file; presumably the notebook
# environment provides the SparkContext - confirm.
ssc = StreamingContext(sc, 1)
# Open the authenticated Twitter stream.
twitter_stream = connect_twitter()
# Location of the saved logistic regression model.
filename_model = '/FileStore/tables/oaohfspx1472031253448/trained_model.lrm'
lr_model = LogisticRegressionModel.load(sc,filename_model)
# Per the MLlib API, clearing the threshold makes predict() return raw
# scores rather than 0/1 labels.
lr_model.clearThreshold()
# Reads the tweet(s) and registers the DStream operations; must happen
# before ssc.start().
process_rdd_queue(twitter_stream,lr_model)
ssc.start()
# Give the queued batches time to run before the next cell stops the context.
time.sleep(10)


In [11]:
# Shut down the streaming context together with the underlying SparkContext;
# stopGraceFully is PySpark's own spelling of this keyword.
ssc.stop(stopSparkContext=True, stopGraceFully=True)