In [1]:
from pyspark.sql import SparkSession

# Get (or lazily create) the notebook's Spark session, registered under the
# application name 'word2vec'.
spark = (
    SparkSession.builder
    .appName('word2vec')
    .getOrCreate()
)

In [2]:
import os.path
import zipfile

def extract_if_missing(fileName):
    """Unpack ``<fileName>.zip`` into the working directory if needed.

    Nothing happens when ``fileName`` already exists, which keeps the cell
    cheap to re-run.

    Args:
        fileName: Path of the extracted data file; the archive is expected
            next to it under the same name with a ``.zip`` suffix.

    Raises:
        FileNotFoundError: If the data file is missing and so is its archive.
    """
    if os.path.exists(fileName):
        return
    with zipfile.ZipFile(fileName + ".zip", "r") as zip_ref:
        zip_ref.extractall(".")


# The three data files used by this notebook.
for dataFile in ("unlabeledTrainData.tsv", "labeledTrainData.tsv", "testData.tsv"):
    extract_if_missing(dataFile)

In [3]:
from html.parser import HTMLParser

class MLStripper(HTMLParser):
    """HTML parser that keeps only the text found between tags.

    Feed markup with ``feed()`` and read the collected text back with
    ``get_data()``. Character references are converted to the characters
    they stand for.
    """

    def __init__(self):
        # Let HTMLParser initialise all of its own state instead of
        # hand-rolling it via reset(); this stays correct if the base class
        # grows additional attributes.
        super().__init__(convert_charrefs=True)
        # Text chunks in the order the parser reported them.
        self.fed = []

    def handle_data(self, d):
        """Record one chunk of text reported by the parser."""
        self.fed.append(d)

    def get_data(self):
        """Return every collected text chunk, joined by a single space.

        The parser is closed first: with ``convert_charrefs`` enabled it may
        hold back trailing text that contains an unterminated ``&`` (for
        example ``AT&T``) until ``close()`` is called, and that text would
        otherwise be missing from the result.
        """
        self.close()
        return ' '.join(self.fed)

In [4]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

class StripHtmlTags(Transformer, HasInputCol, HasOutputCol):
    """Transformer that copies a string column with its HTML tags removed.

    The text of ``inputCol`` is run through ``MLStripper`` and the result is
    written to ``outputCol`` as a string column.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set the params given as keyword arguments and return ``self``."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with the stripped text added as ``outputCol``."""
        def strip_tags(html):
            # Null cells reach the UDF as None; feeding None to the parser
            # would raise, so propagate the null instead.
            if html is None:
                return None
            s = MLStripper()
            s.feed(html)
            # Flush text the parser may still be buffering (e.g. a trailing
            # "AT&T"), which feed() alone does not hand to handle_data().
            s.close()
            return s.get_data()

        t = StringType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(strip_tags, t)(in_col))

In [5]:
# Copied from https://stackoverflow.com/a/32337101/512251
import nltk

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class NLTKWordPunctTokenizer(Transformer, HasInputCol, HasOutputCol):
    """Transformer that tokenizes a string column with NLTK's wordpunct tokenizer.

    Tokens whose lower-cased form is contained in ``stopwords`` are dropped.
    The remaining tokens are written to ``outputCol`` as an array of strings.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super().__init__()
        self.stopwords = Param(self, "stopwords", "")
        # With no stop words configured, nothing is filtered out.
        self._setDefault(stopwords=set())
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        """Set the params given as keyword arguments and return ``self``."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        """Set the collection of stop words and return ``self``."""
        # Go through _set like setParams does instead of writing to the
        # param map directly.
        return self._set(stopwords=value)

    def getStopwords(self):
        """Return the configured stop words, or the default if unset."""
        return self.getOrDefault(self.stopwords)

    def _transform(self, dataset):
        """Return ``dataset`` with the token array added as ``outputCol``."""
        # An explicit stopwords=None ends up stored as None; treat it as
        # "no stop words". Freezing into a set also gives constant-time
        # membership tests inside the UDF.
        stopwords = frozenset(self.getStopwords() or ())

        def f(s):
            # Null cells reach the UDF as None; the tokenizer cannot handle
            # that, so propagate the null instead.
            if s is None:
                return None
            tokens = nltk.tokenize.wordpunct_tokenize(s)
            return [t for t in tokens if t.lower() not in stopwords]

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

In [6]:
from pyspark.ml import Pipeline

# Name of the column that holds the raw review text.
colName = 'review'

# Stage 1 removes the HTML markup; stage 2 splits the cleaned text into tokens.
stripper = StripHtmlTags(
    inputCol=colName,
    outputCol='strippedReview',
)
tokenizer = NLTKWordPunctTokenizer(
    inputCol='strippedReview',
    outputCol='tokens',
)
tokenizationPipeline = Pipeline(stages=[stripper, tokenizer])


In [7]:
# Read the labeled reviews: tab-separated, first line is the header, column
# types are inferred by Spark.
supervisedTrain = spark.read.csv(
    './labeledTrainData.tsv',
    sep='\t',
    header='true',
    inferSchema='true',
)
supervisedTrain.printSchema()
supervisedTrain.show()
# Number of reviews per sentiment label.
supervisedTrain.groupby('sentiment').count().show()

root
 |-- id: string (nullable = true)
 |-- sentiment: integer (nullable = true)
 |-- review: string (nullable = true)

+-------+---------+--------------------+
|     id|sentiment|              review|
+-------+---------+--------------------+
| 5814_8|        1|With all this stu...|
| 2381_9|        1|"The Classic War ...|
| 7759_3|        0|The film starts w...|
| 3630_4|        0|It must be assume...|
| 9495_8|        1|Superbly trashy a...|
| 8196_8|        1|I dont know why p...|
| 7166_2|        0|This movie could ...|
|10633_1|        0|I watched this vi...|
|  319_1|        0|A friend of mine ...|
|8713_10|        1|<br /><br />This ...|
| 2486_3|        0|What happens when...|
|6811_10|        1|Although I genera...|
|11744_9|        1|"Mr. Harvey Light...|
| 7369_1|        0|I had a feeling t...|
|12081_1|        0|note to George Li...|
| 3561_4|        0|Stephen King adap...|
| 4489_1|        0|`The Matrix' was ...|
| 3951_2|        0|Ulli Lommel's 198...|
|3304_10|        1|

We will use both the labeled (supervised) and the unlabeled (unsupervised) reviews to train the word embeddings.

In [9]:
from pyspark.ml.feature import Word2Vec, Word2VecModel

word2VecModelPath = 'word2vec.model'
word2VecModel = None

if os.path.exists(word2VecModelPath):
    # A model was saved by an earlier run: load it instead of retraining.
    word2VecModel = Word2VecModel.load(word2VecModelPath)
else:
    # Train on the review text of both the unlabeled and the labeled set.
    unsupervisedTrain = spark.read.csv(
        './unlabeledTrainData.tsv',
        sep='\t',
        header='true',
        inferSchema='true',
    )
    unsupervisedDf = unsupervisedTrain.select('review')
    supervisedDf = supervisedTrain.select('review')
    df = unsupervisedDf.union(supervisedDf)

    # Tokenize, then keep only the columns Word2Vec needs.
    df = tokenizationPipeline.fit(df).transform(df)
    df = df.drop('review', 'strippedReview')

    word2Vec = Word2Vec(
        vectorSize=200,
        seed=42,
        inputCol="tokens",
        outputCol="wordVectors",
    )
    word2VecModel = word2Vec.fit(df)
    # Persist the fitted model so later runs take the load branch above.
    word2VecModel.save(word2VecModelPath)

In [10]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Full pipeline: raw review -> tokens -> word vectors -> random forest.
rf = RandomForestClassifier(labelCol='sentiment', featuresCol='wordVectors')
classificationPipeline = Pipeline(stages=[tokenizationPipeline, word2VecModel, rf])

# NOTE(review): the recorded run of this cell aborted with
# java.lang.OutOfMemoryError while building the forest (DTStatsAggregator).
# numTrees=30 / maxDepth=30 over 10 folds is demanding; the values are kept
# as-is here -- consider a smaller maxDepth or more driver memory.
grid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [30])
    .addGrid(rf.maxDepth, [30])
    .build()
)

# One evaluator serves both the cross-validation and the final scoring.
evaluator = BinaryClassificationEvaluator(labelCol='sentiment')
cv = CrossValidator(estimator=classificationPipeline,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    numFolds=10)

model = cv.fit(supervisedTrain)

# Keep `supervisedTrain` untouched so this cell can be re-run: transforming
# an already-transformed frame would collide with the pipeline's output
# columns.
supervisedTrainPredictions = model.transform(supervisedTrain)

# Scored on the same rows the model was fitted on, so this is a training-set
# metric rather than a held-out estimate.
r = evaluator.evaluate(supervisedTrainPredictions)
r

Py4JJavaError: An error occurred while calling o216.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 41.0 failed 1 times, most recent failure: Lost task 1.0 in stage 41.0 (TID 408, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$$anonfun$12$$anonfun$13.apply(RandomForest.scala:541)
	at org.apache.spark.ml.tree.impl.RandomForest$$anonfun$12$$anonfun$13.apply(RandomForest.scala:537)
	at scala.Array$.tabulate(Array.scala:331)
	at org.apache.spark.ml.tree.impl.RandomForest$$anonfun$12.apply(RandomForest.scala:537)
	at org.apache.spark.ml.tree.impl.RandomForest$$anonfun$12.apply(RandomForest.scala:534)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:743)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:742)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:742)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:563)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:198)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:139)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:45)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$$anonfun$12$$anonfun$13.apply(RandomForest.scala:541)
	at org.apache.spark.ml.tree.impl.RandomForest$$anonfun$12$$anonfun$13.apply(RandomForest.scala:537)
	at scala.Array$.tabulate(Array.scala:331)
	at org.apache.spark.ml.tree.impl.RandomForest$$anonfun$12.apply(RandomForest.scala:537)
	at org.apache.spark.ml.tree.impl.RandomForest$$anonfun$12.apply(RandomForest.scala:534)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 42682)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.6/socketserver.py", line 696, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pyspark/serializers.py", line 685, in read_int
    raise EOFError
EOFError
----------------------------------------


In [None]:
# Read the test reviews: tab-separated, first line is the header, column
# types are inferred by Spark.
test = spark.read.csv(
    './testData.tsv',
    sep='\t',
    header='true',
    inferSchema='true',
)
test.printSchema()
test.show()

In [None]:
# Run the test reviews through the fitted cross-validated pipeline.
# NOTE: this rebinds `test` to the transformed frame, so re-running this cell
# without re-running the load cell transforms already-transformed data.
test = model.transform(test)

In [None]:
# Preview the first rows of the current `test` frame.
test.show()

In [None]:
import os.path
from pyspark.sql.types import IntegerType

filePath = 'prediction.csv'

# Only write when nothing exists at the target path yet.
if not os.path.exists(filePath):
    # Keep the id, expose the prediction as an integer `sentiment` column,
    # and collapse to a single partition before writing.
    submission = (
        test.select('id', 'prediction')
        .coalesce(1)
        .withColumn('sentiment', test['prediction'].cast(IntegerType()))
        .drop('prediction')
    )
    submission.write.csv(filePath, header='true')