In [1]:
import re
import pymongo
import findspark 
findspark.init()
import pyspark
from pyspark.sql.functions import col, udf
import pymongo_spark
pymongo_spark.activate()
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, CountVectorizer, IDFModel, StopWordsRemover
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier, RandomForestClassificationModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, ParamGridBuilder, TrainValidationSplit

In [2]:
# Local Spark session (master "local[4]") reused by every cell that follows.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local[4]")
    .appName('Testing Spark Mongo')
    .getOrCreate()
)

In [5]:
# Load the captured tweet stream into a DataFrame with a single string column 'Text'.
# NOTE(review): the printed schema contains only 'Text', so the lines appear not to
# parse as JSON objects and are instead captured through the corrupt-record column
# that is renamed to 'Text' here -- confirm this is the intended way to read the file.
path = '../../kafka_files/twitterstream_0.jsonl'
df = spark.read.json(path, columnNameOfCorruptRecord='Text')
df.printSchema()
# Show one untruncated row to inspect the raw text format.
df.show(1, truncate=False)

root
 |-- Text: string (nullable = true)

+----------------------------------------------------------------------------------------------------------------------------+
|Text                                                                                                                        |
+----------------------------------------------------------------------------------------------------------------------------+
|"\"Weekly Specials: Worn 2017 Air Jordan IV Retro Motorsport Sz 11 w/ Box In Good Wearable\\u2026 https://t.co/cgdDiIKaDR\""|
+----------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



In [6]:
def filter_ads(text):
    """Return True when a tweet should be kept, i.e. it contains no 'https' link.

    Args:
        text: Raw tweet text, or None when the column value is null.

    Returns:
        bool: False for null text or text containing 'https', True otherwise.
    """
    # The 'Text' column is nullable; a null would make the `in` test raise
    # TypeError, so treat missing text as "drop this row".
    return text is not None and 'https' not in text
# Expose filter_ads as a Spark UDF returning a boolean so it can be used as a row filter.
ads_filter = udf(filter_ads, BooleanType())

In [7]:
# Keep only the rows accepted by the ad filter, then preview them.
ads_free = df.where(ads_filter(df['Text']))
ads_free.show()

+--------------------+
|                Text|
+--------------------+
|"\"@AmateurHookup...|
|"\"i can't even e...|
|"\"@McHavy @StayC...|
+--------------------+



In [8]:
def preprocess(text):
    """Split tweet text into lower-case tokens made only of ASCII letters.

    Every character outside a-z / A-Z is replaced by a space before splitting,
    so digits, punctuation and non-ASCII characters act as separators.

    Args:
        text: Raw tweet text, or None when the column value is null.

    Returns:
        list[str]: The tokens in order of appearance; empty for null or
        letter-free input.
    """
    # A null column value would make re.sub raise TypeError.
    if text is None:
        return []
    return re.sub("[^a-zA-Z]", " ", text).lower().split()
# Register the tokenizer as a UDF returning an array of strings, then add its
# output to the ad-free tweets as the 'Words' column and preview the result.
pp_udf = udf(preprocess, ArrayType(StringType()))
words = ads_free.withColumn('Words', pp_udf(ads_free['Text']))
words.show()

+--------------------+--------------------+
|                Text|               Words|
+--------------------+--------------------+
|"\"@AmateurHookup...|[amateurhookup, f...|
|"\"i can't even e...|[i, can, t, even,...|
|"\"@McHavy @StayC...|[mchavy, staycare...|
+--------------------+--------------------+



In [9]:
# Strip stop words: read tokens from 'Words', write the remainder to 'filtered'.
remover = StopWordsRemover().setInputCol("Words").setOutputCol("filtered")
removed = remover.transform(words)
removed.show()


+--------------------+--------------------+--------------------+
|                Text|               Words|            filtered|
+--------------------+--------------------+--------------------+
|"\"@AmateurHookup...|[amateurhookup, f...|[amateurhookup, f...|
|"\"i can't even e...|[i, can, t, even,...|[even, explain, s...|
|"\"@McHavy @StayC...|[mchavy, staycare...|[mchavy, staycare...|
+--------------------+--------------------+--------------------+



In [12]:
# Path template for the saved pipeline artefacts; the '{}' placeholder is filled
# with the artefact name ('hf', 'idfmodel', 'rf') in the cells below.
params_path = '../../tmp/{}'

In [13]:
# Load the saved HashingTF stage and apply it to the stop-word-filtered tweets.
# NOTE(review): the recorded run of this cell failed because no metadata existed
# under the resolved 'hf' path -- the stage has to be saved there before this
# cell can succeed. Later cells depend on `featureized`.
hf_path = params_path.format('hf')
hashingTF = HashingTF.load(hf_path)
featureized = hashingTF.transform(removed)
featureized.show()

Py4JJavaError: An error occurred while calling o183.load.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/ubuntu/capstone/tmp/hf/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1332)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1326)
	at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1367)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1366)
	at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:379)
	at org.apache.spark.ml.util.DefaultParamsReader.load(ReadWrite.scala:322)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [67]:
# Load the fitted IDF model and apply it to the hashed term-frequency vectors.
# The displayed output shows the added 'features' column next to 'rawFeatures'.
idf_path = params_path.format('idfmodel')
idfmodel = IDFModel.load(idf_path)
result = idfmodel.transform(featureized)
result.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                Text|               Words|            filtered|         rawFeatures|            features|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|"\"@AmateurHookup...|[amateurhookup, f...|[amateurhookup, f...|(200,[16,33,61],[...|(200,[16,33,61],[...|
|"\"i can't even e...|[i, can, t, even,...|[even, explain, s...|(200,[2,6,25,60,7...|(200,[2,6,25,60,7...|
|"\"@McHavy @StayC...|[mchavy, staycare...|[mchavy, staycare...|(200,[60,73,74,77...|(200,[60,73,74,77...|
+--------------------+--------------------+--------------------+--------------------+--------------------+



In [77]:
# Load the trained random-forest classification model and score the TF-IDF rows.
rf_path = params_path.format('rf')
rf = RandomForestClassificationModel.load(rf_path)
prediction = rf.transform(result)
# Show only the model outputs; the input columns are unchanged.
prediction.select('rawPrediction', 'prediction').show()

+--------------------+----------+
|       rawPrediction|prediction|
+--------------------+----------+
|[50.9842332206184...|       0.0|
|[55.7356588277157...|       0.0|
|[50.8181693836755...|       0.0|
+--------------------+----------+



In [87]:
# Persist the scored tweets as JSON.
# The writer's default mode raises if the target already exists, which makes this
# cell fail on every re-run; overwrite explicitly so the notebook can be re-executed.
# NOTE(review): the saved models are loaded from '../../tmp/', while this writes
# to '../tmp/' -- confirm the output location is intended.
path_to_save = '../tmp/twitterstream_test_prediction.json'
prediction.write.mode('overwrite').json(path_to_save)


In [88]:
# Read the saved predictions back to check the round trip.
# The printed schema shows the vector columns ('features', 'rawFeatures',
# 'probability') come back as plain structs rather than ML vectors.
test = spark.read.json(path_to_save)
test.printSchema()
test.show()

root
 |-- Text: string (nullable = true)
 |-- Words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: struct (nullable = true)
 |    |-- indices: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- size: long (nullable = true)
 |    |-- type: long (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- prediction: double (nullable = true)
 |-- probability: struct (nullable = true)
 |    |-- type: long (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- rawFeatures: struct (nullable = true)
 |    |-- indices: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- size: long (nullable = true)
 |    |-- type: long (nullable = true)
 |    |-- values: array (nullable = true)
 |   