In [1]:
import os
import sys
# Locate the Spark installation.  An already-exported SPARK_HOME takes
# precedence; the literal below is only a fallback (the original author's
# local install), so the notebook no longer clobbers the caller's environment.
DEFAULT_SPARK_HOME = '/Users/liang/Downloads/spark-1.4.1-bin-hadoop2.6/'
spark_home = os.environ.setdefault('SPARK_HOME', DEFAULT_SPARK_HOME)
if not spark_home:
    # Reachable when SPARK_HOME is exported but empty.
    raise ValueError('SPARK_HOME environment variable is not set')

# Make PySpark and its bundled py4j importable from this kernel.
# NOTE: the py4j archive name is version-specific (0.8.2.1 here); adjust it
# if SPARK_HOME points at a different Spark release.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

# Run PySpark's shell bootstrap in this namespace; per the banner it prints,
# this makes `sc` (SparkContext) and `sqlContext` (HiveContext) available.
# `execfile` exists only on Python 2, which is what this kernel runs.
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.7.10 (default, May 28 2015 17:04:42)
SparkContext available as sc, HiveContext available as sqlContext.


In [2]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [3]:
# Build the labeled training set: one (id, text, label) record per document.
LabeledDocument = Row("id", "text", "label")
labeled_examples = [
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0),
]
# Distribute the tuples, wrap each one in a named Row, then convert the RDD
# into a DataFrame.
labeled_rdd = sc.parallelize(labeled_examples).map(
    lambda fields: LabeledDocument(*fields))
training = labeled_rdd.toDF()
# Display the rows (small enough here to collect to the driver).
training.collect()

[Row(id=0, text=u'a b c d e spark', label=1.0),
 Row(id=1, text=u'b d', label=0.0),
 Row(id=2, text=u'spark f g h', label=1.0),
 Row(id=3, text=u'hadoop mapreduce', label=0.0),
 Row(id=4, text=u'b spark who', label=1.0),
 Row(id=5, text=u'g d a y', label=0.0),
 Row(id=6, text=u'spark fly', label=1.0),
 Row(id=7, text=u'was mapreduce', label=0.0),
 Row(id=8, text=u'e spark program', label=1.0),
 Row(id=9, text=u'a e c l', label=0.0),
 Row(id=10, text=u'spark compile', label=1.0),
 Row(id=11, text=u'hadoop software', label=0.0)]

In [4]:
# Assemble an ML pipeline of three stages: tokenizer -> hashingTF -> lr.
# The tokenizer reads the "text" column and writes "words".
tokenizer = Tokenizer(outputCol="words", inputCol="text")
# The hashing stage consumes whatever column the tokenizer emits.
hashingTF = HashingTF(outputCol="features",
                      inputCol=tokenizer.getOutputCol())
# Logistic regression capped at 10 iterations; regParam is left at its
# default here and searched over by the parameter grid further down.
lr = LogisticRegression(maxIter=10)
pipeline_stages = [tokenizer, hashingTF, lr]
pipeline = Pipeline(stages=pipeline_stages)

In [5]:
# Describe the hyper-parameter search space: 3 feature-vector sizes times
# 2 regularization strengths, i.e. 6 candidate parameter maps.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(hashingTF.numFeatures, [10, 100, 1000])
grid_builder.addGrid(lr.regParam, [0.1, 0.01])
paramGrid = grid_builder.build()

In [6]:
# Cross-validate the whole pipeline over the parameter grid and keep the
# best-scoring configuration.
binary_evaluator = BinaryClassificationEvaluator()
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,
    numFolds=2,  # kept small for this toy data set; use 3+ folds in practice
)
cvModel = crossval.fit(training)


In [7]:
# Build the unlabeled documents to score: (id, text) only, no label column.
Document = Row("id", "text")
unlabeled_examples = [
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop"),
]
unlabeled_rdd = sc.parallelize(unlabeled_examples).map(
    lambda fields: Document(*fields))
test = unlabeled_rdd.toDF()

# Score the test documents with the cross-validated model and show only the
# columns of interest.
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "prediction")
scored_rows = selected.collect()
for row in scored_rows:
    print(row)

# Shut down the SparkContext now that the notebook is finished with it.
sc.stop()

Row(id=4, text=u'spark i j k', prediction=1.0)
Row(id=5, text=u'l m n', prediction=0.0)
Row(id=6, text=u'spark hadoop spark', prediction=1.0)
Row(id=7, text=u'apache hadoop', prediction=0.0)
