In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('twitter-sentiment').getOrCreate()
spark.sparkContext.addPyFile("/home/jovyan/work/sentiment_model.py")
print("Spark context started")

Spark context started


In [2]:
spark.sparkContext.addPyFile?

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType

schema = StructType([
    StructField("target", IntegerType(), True),
    StructField("id", LongType(), True),
    StructField("raw_timestamp", StringType(), True),
    StructField("query_status", StringType(), True),
    StructField("author", StringType(), True),
    StructField("tweet", StringType(), True)
])
    
data_path = "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

raw_sentiment = spark.read.csv(data_path,header=False,schema=schema) \
    .selectExpr("(case when target=4 then 1 else 0 end) as target","tweet")



raw_sentiment.groupBy("target").count().show()

+------+------+
|target| count|
+------+------+
|     1|800000|
|     0|800000|
+------+------+



In [4]:
import pickle as pkl

def read_model(model_path):
    with open(model_path,'rb') as buffer:
        return pkl.load(buffer)

model_path = "/home/jovyan/models/tweet_sentiment.mdl"

model_object = read_model(model_path)
model_object

Pipeline(steps=[('tfidf',
                 TfidfVectorizer(max_df=0.75,
                                 preprocessor=<function preprocessor at 0x7f83114cb790>,
                                 tokenizer=<function tokenizer at 0x7f830b896040>)),
                ('clf', RandomForestClassifier(max_depth=8))])

In [5]:
# raw_sentiment.summary()
# raw_sentiment.columns
# raw_sentiment.dtypes
raw_sentiment.printSchema()

root
 |-- target: integer (nullable = false)
 |-- tweet: string (nullable = true)



In [6]:
# raw_sentiment.rdd?
raw_sentiment.rdd.mapPartitions?

In [7]:
model_object_broadcast = spark.sparkContext.broadcast(model_object)
# model_object_broadcast?
# model_object_broadcast.value

In [8]:
model_object_broadcast.value

Pipeline(steps=[('tfidf',
                 TfidfVectorizer(max_df=0.75,
                                 preprocessor=<function preprocessor at 0x7f83114cb790>,
                                 tokenizer=<function tokenizer at 0x7f830b896040>)),
                ('clf', RandomForestClassifier(max_depth=8))])

In [9]:
model_object_broadcast = spark.sparkContext.broadcast(model_object)

def block_iterator(iterator, size):
    bucket = list()
    for e in iterator:
        bucket.append(e)
        if len(bucket) >= size:
            yield bucket
            bucket = list()
    if bucket:
        yield bucket

def block_classify(iterator):
    model = model_object_broadcast.value
    for features in block_iterator(iterator, 10000):
        import pandas as pd
        import json
        features_df = pd.DataFrame(features, columns=["target","text"])
        pred = model.predict_proba(features_df["text"])
        res_df = features_df
        res_df["proba"] = pred[:,1]
        for e in json.loads(res_df.to_json(orient='records')):
            yield e

predicted_df = raw_sentiment.rdd.mapPartitions(block_classify).toDF()

predicted_df.show()



+------------+------+--------------------+
|       proba|target|                text|
+------------+------+--------------------+
|0.5326245799|     0|@switchfoot http:...|
|0.4810538855|     0|is upset that he ...|
|0.5013524962|     0|@Kenichan I dived...|
|0.4833376119|     0|my whole body fee...|
|0.4949966852|     0|@nationwideclass ...|
|0.4947119967|     0|@Kwesidei not the...|
|0.4906158438|     0|         Need a hug |
|0.5147333489|     0|@LOLTrish hey  lo...|
|0.4775261167|     0|@Tatiana_K nope t...|
|0.5013524962|     0|@twittera que me ...|
|0.5013524962|     0|spring break in p...|
|0.5013524962|     0|I just re-pierced...|
|0.4892071449|     0|@caregiving I cou...|
|0.4580293207|     0|@octolinz16 It it...|
|0.4567889877|     0|@smarrison i woul...|
|0.4551137473|     0|@iamjazzyfizzle I...|
|0.4678241956|     0|Hollis' death sce...|
|0.4989632523|     0|about to file taxes |
|0.5158630215|     0|@LettyA ahh ive a...|
|0.5063556572|     0|@FakerPattyPattz ...|
+----------

In [None]:
# predicted_df?
# predicted_df.columns
# predicted_df.dtypes
# predicted_df.printSchema()
# print((predicted_df.count(), len(predicted_df.columns)))

In [10]:
spark.stop()