In [1]:
import nltk
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class NLTKWordPunctTokenizer(Transformer, HasInputCol, HasOutputCol):
    """Spark ML ``Transformer`` that tokenizes a string column with NLTK.

    Each value of the input column is split with
    ``nltk.tokenize.wordpunct_tokenize``. Tokens whose lower-cased form is in
    the ``stopwords`` param are dropped, and the surviving tokens (original
    casing kept) are written to the output column as an array of strings.

    :param inputCol: name of the column holding the text to tokenize.
    :param outputCol: name of the array-of-strings column to add.
    :param stopwords: collection of words to discard. Tokens are lower-cased
        before the lookup, so entries should be lower-case. Defaults to an
        empty set (nothing is removed).
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(NLTKWordPunctTokenizer, self).__init__()
        self.stopwords = Param(
            self, "stopwords",
            "words to drop from the output; tokens are lower-cased before lookup")
        self._setDefault(stopwords=set())
        # Forward the keyword arguments captured by @keyword_only.
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        """Set any of ``inputCol``, ``outputCol`` and ``stopwords``; returns ``self``."""
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        """Set the ``stopwords`` param and return ``self`` for chaining."""
        # Go through _set (as setParams does) rather than writing to the
        # private param map directly.
        return self._set(stopwords=value)

    def getStopwords(self):
        """Return the configured stop words, or the default if none were set."""
        return self.getOrDefault(self.stopwords)

    def _transform(self, dataset):
        """Return ``dataset`` with the tokenized, stop-word-filtered output column added."""
        # An explicit stopwords=None may be stored as the param value; treat
        # it as "no stop words". The frozenset keeps the membership test
        # cheap even when a list was supplied.
        stopwords = frozenset(self.getStopwords() or ())

        def tokenize(text):
            # Null cells reach a Python UDF as None; pass them through as
            # null instead of failing the whole job inside NLTK.
            if text is None:
                return None
            tokens = nltk.tokenize.wordpunct_tokenize(text)
            return [tok for tok in tokens if tok.lower() not in stopwords]

        tokenize_udf = udf(tokenize, ArrayType(StringType()))
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(self.getOutputCol(), tokenize_udf(in_col))

In [3]:
# Echo the SparkContext to confirm one is live. `sc` is not created in the
# cells shown here — presumably the PySpark kernel/shell injects it; confirm.
sc

<pyspark.context.SparkContext at 0x104a7c0d0>

In [4]:
# Three hand-written (label, sentence) rows to smoke-test the tokenizer.
sentenceDataFrame = sqlContext.createDataFrame(
    [
        (0, "Hi I heard about Spark"),
        (0, "I wish Java could use case classes"),
        (1, "Logistic regression models are neat"),
    ],
    schema=['label', 'sentence'],
)

# Filter with NLTK's English stop-word list, held as a set.
tokenizer = NLTKWordPunctTokenizer(
    stopwords=set(nltk.corpus.stopwords.words('english')),
    inputCol="sentence",
    outputCol="words",
)

tokenizer.transform(sentenceDataFrame).show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|    0|Hi I heard about ...|  [Hi, heard, Spark]|
|    0|I wish Java could...|[wish, Java, coul...|
|    1|Logistic regressi...|[Logistic, regres...|
+-----+--------------------+--------------------+



In [5]:
import loadFilesPartial as lfp

# Load labelled reviews from the training directory. The meaning of the
# second argument (10) is defined by loadLabeled — presumably a per-class
# limit, since 20 items come back; confirm against that module.
data, Y = lfp.loadLabeled("./data/train", 10)
# Parenthesised call form: prints the same value on Python 2 and is also
# valid syntax on Python 3 (the bare `print x` statement is not).
print(len(data))

20


In [6]:
import numpy as np

In [7]:
# Pair each review text with its label and load the pairs into a two-column
# DataFrame. `.item()` is called on every element of Y — presumably these are
# numpy scalars being converted to plain Python numbers; confirm.
labeledData = zip(data, (label.item() for label in Y))
labeledDataRdd = sc.parallelize(labeledData)
df = sqlContext.createDataFrame(labeledDataRdd, schema=['review', 'label'])

In [8]:
# Print the review/label DataFrame as a text table to eyeball the loaded rows.
df.show()

+--------------------+-----+
|              review|label|
+--------------------+-----+
|Bromwell High is ...|  1.0|
|Homelessness (or ...|  1.0|
|Brilliant over-ac...|  1.0|
|This is easily th...|  1.0|
|This is not the t...|  1.0|
|This isn't the co...|  1.0|
|Yes its an art......|  1.0|
|In this "critical...|  1.0|
|THE NIGHT LISTENE...|  1.0|
|You know, Robin W...|  1.0|
|Story of a man wh...|  0.0|
|Airport '77 start...|  0.0|
|This film lacked ...|  0.0|
|Sorry everyone,,,...|  0.0|
|When I was little...|  0.0|
|"It appears that ...|  0.0|
|The second attemp...|  0.0|
|I don't know who ...|  0.0|
|This film is medi...|  0.0|
|The film is bad. ...|  0.0|
+--------------------+-----+



In [9]:
# Tokenizer for the "review" column: output goes to "words", filtered with
# NLTK's English stop-word list.
tokenizer = NLTKWordPunctTokenizer(
    stopwords=set(nltk.corpus.stopwords.words('english')),
    outputCol="words",
    inputCol="review",
)

In [10]:
# Apply the tokenizer to the reviews and print the result, which now carries
# the extra "words" column.
tokenizer.transform(df).show()

+--------------------+-----+--------------------+
|              review|label|               words|
+--------------------+-----+--------------------+
|Bromwell High is ...|  1.0|[Bromwell, High, ...|
|Homelessness (or ...|  1.0|[Homelessness, (,...|
|Brilliant over-ac...|  1.0|[Brilliant, -, ac...|
|This is easily th...|  1.0|[easily, underrat...|
|This is not the t...|  1.0|[typical, Mel, Br...|
|This isn't the co...|  1.0|[isn, ', comedic,...|
|Yes its an art......|  1.0|[Yes, art, ..., s...|
|In this "critical...|  1.0|[", critically, a...|
|THE NIGHT LISTENE...|  1.0|[NIGHT, LISTENER,...|
|You know, Robin W...|  1.0|[know, ,, Robin, ...|
|Story of a man wh...|  0.0|[Story, man, unna...|
|Airport '77 start...|  0.0|[Airport, ', 77, ...|
|This film lacked ...|  0.0|[film, lacked, so...|
|Sorry everyone,,,...|  0.0|[Sorry, everyone,...|
|When I was little...|  0.0|[little, parents,...|
|"It appears that ...|  0.0|[", appears, many...|
|The second attemp...|  0.0|[second, attempt,...|
