In [1]:
import threading
import Queue
import time
import json
import re
import string
import numpy as np
from pyspark.streaming import StreamingContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer 

In [2]:
BATCH_INTERVAL = 10   # How frequently to update (seconds)
WINDOWS_LENGTH = 60   # the duration of the window
SLIDING_INTERVAL = 20 # the interval at which the window operation is performed

In [3]:
def get_json(myjson):
  try:
    json_object = json.loads(myjson)
  except ValueError, e:
    return False
  return json_object

In [4]:
def get_coord2(post):
    coord = tuple()
    try:
        if post['coordinates'] == None:
            coord = post['place']['bounding_box']['coordinates']
            coord = reduce(lambda agg, nxt: [agg[0] + nxt[0], agg[1] + nxt[1]], coord[0])
            coord = tuple(map(lambda t: t / 4.0, coord))
        else:
            coord = tuple(post['coordinates']['coordinates'])
    except TypeError:
        #print ('error get_coord')
        coord=(0,0)
    return coord

In [5]:
def doc2vec(document):
    doc_vec = np.zeros(100)
    tot_words = 0

    for word in document:
        try:
            vec = np.array(lookup_bd.value.get(word))
            if vec!= None:
                doc_vec +=  vec
                tot_words += 1
        except:
            continue

    #return(tot_words)

    return doc_vec / float(tot_words)

In [6]:
remove_spl_char_regex = re.compile('[%s]' % re.escape(string.punctuation)) # regex to remove special characters
stopwords=[u'rt', u're', u'i', u'me', u'my', u'myself', u'we', u'our', u'ours', u'ourselves', u'you', u'your', u'yours', u'yourself', u'yourselves', u'he', u'him', u'his', u'himself', u'she', u'her', u'hers', u'herself', u'it', u'its', u'itself', u'they', u'them', u'their', u'theirs', u'themselves', u'what', u'which', u'who', u'whom', u'this', u'that', u'these', u'those', u'am', u'is', u'are', u'was', u'were', u'be', u'been', u'being', u'have', u'has', u'had', u'having', u'do', u'does', u'did', u'doing', u'a', u'an', u'the', u'and', u'but', u'if', u'or', u'because', u'as', u'until', u'while', u'of', u'at', u'by', u'for', u'with', u'about', u'against', u'between', u'into', u'through', u'during', u'before', u'after', u'above', u'below', u'to', u'from', u'up', u'down', u'in', u'out', u'on', u'off', u'over', u'under', u'again', u'further', u'then', u'once', u'here', u'there', u'when', u'where', u'why', u'how', u'all', u'any', u'both', u'each', u'few', u'more', u'most', u'other', u'some', u'such', u'no', u'nor', u'not', u'only', u'own', u'same', u'so', u'than', u'too', u'very', u's', u't', u'can', u'will', u'just', u'don', u'should', u'now']

def tokenize(text):
    tokens = []
    text = text.encode('ascii', 'ignore') #to decode
    text=re.sub('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*(),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text) # to replace url with ''
    text = remove_spl_char_regex.sub(" ",text)  # Remove special characters
    text=text.lower()


    for word in text.split():
        if word not in stopwords \
            and word not in string.punctuation \
            and len(word)>1 \
            and word != '``':
                tokens.append(word)
    return tokens

In [7]:
#To setup the streaming data
ssc = StreamingContext(sc, BATCH_INTERVAL)
#ssc.checkpoint("checkpoint")

# Create a DStream that will connect to hostname:port, like localhost:9999
dstream = ssc.socketTextStream("localhost", 8889)
dstreamwin=dstream.window(WINDOWS_LENGTH, SLIDING_INTERVAL)

dstream_tweets=dstream.map(lambda post: get_json(post))\
     .filter(lambda post: post != False)\
     .filter(lambda post: 'created_at' in post)\
     .map(lambda post: (get_coord2(post)[0],get_coord2(post)[1],post["text"]))\
     .filter(lambda tpl: tpl[0] != 0)\
     .filter(lambda tpl: tpl[2] != '')\
     .map(lambda tpl: (tpl[0],tpl[1],tokenize(tpl[2])))\
     .map(lambda tpl:(tpl[0],tpl[1],tpl[2],doc2vec(tpl[2])))
    
dstream_tweets.pprint()

In [8]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2016-10-27 12:25:40
-------------------------------------------
(-105.14858050000001, 20.697052499999998, ['important', 'conversation', 'include', 'sexual', 'intimacy', 'vicecanada'], array([ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan]))
(-76.48447368, 44.23345825, ['ice', 'wolves', 'girls', 'getting', 'ready', 'go', 'ice', 'grind

KeyboardInterrupt: 