In [1]:
#    Spark
from pyspark import SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json
#    PyMongo
import pymongo
#    Time
import datetime

## Preparing Database

In [2]:
# Connection string for the local MongoDB server; its path component ("query2")
# is the database name that parse_uri() extracts below.
MONGODB_URI = "mongodb://localhost:27017/query2"
# Name of the collection the streaming job appends its predictions to.
tweets_collection = "predictions"

client = pymongo.MongoClient(MONGODB_URI)
# Parse the URI so the database name does not have to be repeated by hand.
parsed_dburi = pymongo.uri_parser.parse_uri(MONGODB_URI)
#       Database
db = client[parsed_dburi['database']]

#       Collection
tweets_col = db[tweets_collection]
#       Index
# Disabled: unique index on the tweet text. It is written against `tweets`,
# which is not a collection handle in this notebook (`tweets_col` is).
#tweets.create_index("tweet_text", unique=True)
# NOTE(review): the streaming job below never writes a "created_at" field (its
# update_many call is commented out) - confirm this index is still wanted.
tweets_col.create_index("created_at")

u'created_at_1'

## Create Streaming Context

In [3]:
# ZooKeeper quorum and Kafka topic the receiver connects to.
zkQuorum = "localhost:2181"
topic = "twitter-stream"
# Batch duration handed to StreamingContext (seconds, per the PySpark API).
batch_interval = 1
# Window settings derived from the batch interval; no cell shown here uses them yet.
window_length = 15 * batch_interval
frequency = 6 * batch_interval
# Do not rely on a kernel-injected `sc`: reuse the active SparkContext when one
# exists, otherwise create it, so the notebook survives "Restart & Run All".
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, batch_interval)

# Receiver-based Kafka stream for consumer group "spark-streaming-consumer";
# the dict maps the topic name to the value 1 passed to createStream.
tweets = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

## Parse the inbound message as json

In [4]:
# Decode element 1 of every Kafka record as JSON (presumably records are
# (key, message) pairs, so v[1] is the message payload - confirm against the producer).
parsed = tweets.map(lambda v: json.loads(v[1]))
# Disabled experiments: keep only geotagged tweets / tweets placed in the United States.
#parsed = parsed.filter(lambda tweet: tweet['geo']['coordinates'] != None)
#parsed.filter(lambda tweet: tweet['metadata']['place']['country'] == 'United States')
# Print the number of parsed tweets for every batch.
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()

## Extract Text from each tweet

In [5]:
# Keep only the value stored under the 'text' key of each parsed tweet.
# NOTE(review): a message without a 'text' key would raise KeyError inside the
# stream - confirm the producer only publishes regular tweets.
text_dstream = parsed.map(lambda tweet: tweet['text'])
#text_dstream.pprint()

## Extract time from each tweet (edit)

In [None]:
# Value stored under the 'created_at' key of each parsed tweet; no later cell
# shown in this notebook consumes this stream yet.
time_dstream = parsed.map(lambda tweet: tweet['created_at'])

## Filter tweets by location (edit)

In [None]:
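# TODO: location filter not implemented yet. Candidate bounding box, presumably
# (west lon, south lat, east lon, north lat) of the contiguous United States:
#-125.0011, 24.9493, -66.9326, 49.5904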

## Process the extracted text

In [6]:
import nltk
import csv
import string
import re
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords

In [7]:
def filter_out_unicode(tweet):
    """Return ``tweet`` as a native ``str``, dropping characters that cannot be kept.

    ``str(tweet)`` is tried first. When that raises ``UnicodeEncodeError``
    (Python 2 with non-ASCII text, or an object whose ``__str__`` raises it),
    the non-ASCII characters are removed instead of leaving the result
    undefined.

    Args:
        tweet: the text to convert; on the fallback path it must provide
            ``encode`` (i.e. be string-like).

    Returns:
        str: the converted text, ASCII-only when the fallback path was taken.
    """
    try:
        return str(tweet)
    except UnicodeEncodeError:
        # The original swallowed the error and then returned an unbound local
        # (UnboundLocalError). Strip what cannot be represented instead.
        ascii_bytes = tweet.encode('ascii', 'ignore')
        # decode + str() yields a native str on both Python 2 and Python 3.
        return str(ascii_bytes.decode('ascii'))

def expand_around_chars(text, characters):
    """Surround every occurrence of each listed character with single spaces.

    The characters are processed one after another, each replacement working
    on the result of the previous one.

    Args:
        text: the string to pad.
        characters: iterable of characters to isolate.

    Returns:
        The padded string.
    """
    expanded = text
    for symbol in characters:
        padded_symbol = ''.join((' ', symbol, ' '))
        expanded = expanded.replace(symbol, padded_symbol)
    return expanded

def strip_quotations_newline(text):
    """Normalise whitespace and strip URLs, @mentions, digits and non-ASCII characters.

    Steps, in order: collapse every run of whitespace (newlines included) into
    one space, drop one literal punctuation sequence, remove URLs, remove
    ``@name`` mentions, remove digit runs, remove non-ASCII characters.

    Args:
        text: raw tweet text.

    Returns:
        str: the cleaned, ASCII-only text. Removed spans are not re-collapsed,
        so consecutive spaces may remain.
    """
    clean_tweet = ' '.join(text.split())
    # The text is deliberately NOT encoded to UTF-8 bytes here: on Python 3 the
    # str arguments passed to replace()/re.sub() below raise TypeError against
    # bytes. Non-ASCII characters are removed by the last substitution anyway.
    # NOTE(review): this removes only the exact 3-character sequence
    # double-quote, comma, single-quote - confirm that is what was intended.
    clean_tweet = clean_tweet.replace('",\'', '')
    # URLs (http/https, www., or bare domain followed by a slash).
    clean_tweet = re.sub(r'''(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’]))''', "", clean_tweet)
    # @mentions made of ASCII letters and digits.
    clean_tweet = re.sub(r'''(@[A-Za-z0-9]+)''', "", clean_tweet)
    # Runs of digits.
    clean_tweet = re.sub("([0-9]+)", "", clean_tweet)
    # Anything outside the ASCII range.
    clean_tweet = re.sub(r'[^\x00-\x7F]+', '', clean_tweet)
    # Only ASCII is left, so this is a no-op on Python 3 and keeps the native
    # (byte) str return type on Python 2.
    return str(clean_tweet)

def split_text(text):
    """Clean a tweet and split it into lower-case tokens longer than two characters.

    The text is first cleaned with ``strip_quotations_newline``, then every
    punctuation character in the list below is padded with spaces via
    ``expand_around_chars`` so it becomes its own token, and finally the text
    is split on single spaces. Tokens of length two or less (which includes
    the isolated punctuation and empty strings) are discarded.

    Args:
        text: raw tweet text.

    Returns:
        list: the surviving tokens, lower-cased, in their original order.
    """
    text = strip_quotations_newline(text)
    # The backslash is escaped explicitly: the original spelling relied on an
    # invalid escape sequence, which newer Python versions warn about. The
    # resulting string is unchanged.
    text = expand_around_chars(text, '\\/".,()&[]{}:;!-_\'')
    return [token.lower() for token in text.split(' ') if len(token) > 2]

In [8]:
# Clean every tweet with split_text() and re-join the surviving tokens with single spaces.
mess = text_dstream.map(lambda text: ' '.join(split_text(text)))

In [1]:
# Display the cleaned-text DStream object. This must run after the cell that
# defines `mess`; the NameError recorded below presumably comes from running
# this cell first on a fresh kernel (note its execution count of 1).
mess

NameError: name 'mess' is not defined

## Load feature extraction pipeline

In [9]:
from pyspark.mllib.classification import SVMModel

In [10]:
from pyspark.ml import PipelineModel

In [11]:
from pyspark.ml import linalg as ml_linalg

def as_mllib(v):
    """Convert a ``pyspark.ml`` vector into the matching ``pyspark.mllib`` vector.

    Args:
        v: an ``ml_linalg.SparseVector`` or ``ml_linalg.DenseVector``.

    Returns:
        The vector rebuilt through ``MLLibVectors.sparse`` (from size, indices
        and values) or ``MLLibVectors.dense`` (from ``toArray()``).

    Raises:
        TypeError: if ``v`` is neither of the two supported vector classes.
    """
    if isinstance(v, ml_linalg.SparseVector):
        return MLLibVectors.sparse(v.size, v.indices, v.values)
    if isinstance(v, ml_linalg.DenseVector):
        return MLLibVectors.dense(v.toArray())
    raise TypeError("Unsupported type: {0}".format(type(v)))

## Spark Streaming: classify each batch and store the predictions

In [12]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession, DataFrameWriter
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col, monotonically_increasing_id
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.linalg import Vector as MLVector, Vectors as MLVectors
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors
from pyspark.mllib.regression import LabeledPoint

def getSparkSessionInstance(sparkConf):
    """Return the SparkSession cached in module globals, creating it on first use.

    Args:
        sparkConf: configuration passed to the session builder when the
            session has to be created; ignored once a session is cached.

    Returns:
        The session stored under the ``sparkSessionSingletonInstance`` global.
    """
    cache_key = 'sparkSessionSingletonInstance'
    module_globals = globals()
    if cache_key not in module_globals:
        builder = SparkSession.builder.config(conf=sparkConf)
        module_globals[cache_key] = builder.getOrCreate()
    return module_globals[cache_key]

# Alias: the streaming code below refers to the cleaned tweet text stream as `words`.
words = mess
# Per-batch handler: convert each RDD of the words DStream to a DataFrame,
# score it with the saved models and append the predictions to MongoDB.
def _load_models_once(spark_context):
    """Load the saved feature pipeline and SVM model on first use and cache them in globals."""
    module_globals = globals()
    if '_tweet_models_cache' not in module_globals:
        module_globals['_tweet_models_cache'] = (
            PipelineModel.load("target/tmp/pythonHashingTFModel_new"),
            SVMModel.load(spark_context, "target/tmp/pythonSVMWithSGDModel_new"),
        )
    return module_globals['_tweet_models_cache']


def process(time, rdd):
    """Classify one micro-batch of cleaned tweet text and append the results to MongoDB.

    Args:
        time: batch time supplied by ``foreachRDD``; not used.
        rdd: RDD of cleaned tweet strings for this batch.

    Returns:
        None. Side effects: registers the batch as the temporary view
        ``tweets``, appends a (prediction, tweet_text) DataFrame to the
        ``predictions`` collection of database ``query2`` on localhost:27017,
        and prints up to 40 rows of it.
    """
    # Nothing to do for a missing or empty batch. The None test comes first so
    # that isEmpty() is never called on None.
    if rdd is None or rdd.isEmpty():
        return

    # Get the singleton instance of SparkSession
    spark = getSparkSessionInstance(rdd.context.getConf())

    # Convert RDD[String] to RDD[Row] to DataFrame
    rowRdd = rdd.map(lambda w: Row(tweet_text=w))
    wordsDataFrame = spark.createDataFrame(rowRdd)

    # Keep the batch queryable through SQL under the name "tweets".
    wordsDataFrame.createOrReplaceTempView("tweets")

    # The models are read from disk once and reused, instead of being
    # reloaded for every batch.
    pipeModel, sameModel = _load_models_once(rdd.context)

    # Extract features; the pipeline output keeps the tweet_text column next
    # to the generated "features" column.
    tfidfData = pipeModel.transform(wordsDataFrame)

    # Score each row while its text is still attached. The previous approach
    # scored the features separately and re-paired them with the text by
    # joining on monotonically_increasing_id(), whose values are not
    # guaranteed to line up between two different DataFrames.
    scored = tfidfData.select("tweet_text", "features").rdd.map(
        lambda row: (sameModel.predict(as_mllib(row[1])), row[0]))
    # Same column names and order as before: prediction, tweet_text.
    text_preds = spark.createDataFrame(scored, ["prediction", "tweet_text"])

    ## Running not removing duplicates though
    #text_preds.dropDuplicates(['tweet_text'])

    text_preds.write.format("com.stratio.datasource.mongodb").mode('append').options(host='localhost:27017',database='query2', collection='predictions').save()
    #tweets_col.update_many({}, { '$set' : { "created_at" : datetime.datetime.utcnow() } }, True, True)

    #n_0 = text_preds.filter(text_preds.prediction != 1).count()
    text_preds.show(40, truncate=False)

# Run process() on every batch of cleaned tweet text, then start the stream.
words.foreachRDD(process)
ssc.start()
# Block this cell for at most the given timeout (seconds, per the PySpark API).
# NOTE(review): nothing here calls ssc.stop(), so the streaming job presumably
# keeps running in the background after the timeout - confirm that is intended.
ssc.awaitTermination(timeout=60)


-------------------------------------------
Time: 2017-10-04 13:55:56
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 13:55:57
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 13:55:58
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 13:55:59
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 13:56:00
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 13:56:01
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 13:56:02
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 13:56:03
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 13:56:04
----------

In [None]:
# `spark` is otherwise only a local variable inside process(); fetch the active
# session explicitly (getOrCreate returns the existing one when there is one)
# so this cell does not depend on a kernel-injected global.
spark = SparkSession.builder.getOrCreate()
# Read the stored predictions back from MongoDB and display them untruncated.
dfView = spark.read.format('com.stratio.datasource.mongodb').options(host='localhost:27017', database='query2', collection='predictions').load()
dfView.show(truncate=False)

In [None]:
# Number of stored documents whose "prediction" field equals 1.
# NOTE(review): Collection.count() is deprecated in newer PyMongo releases in
# favour of count_documents() - check the installed version before switching.
tweets_col.count({"prediction" : 1})

In [None]:
# Destructive: the empty filter matches every document, so this removes all
# stored predictions from the collection behind tweets_col.
tweets_col.delete_many({})