In [11]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
#from pyspark.ml.feature import Tokenizer
#from pyspark.ml.feature import StopWordsRemover
#from pyspark.ml.feature import HashingTF
#from pyspark.ml.feature import IDF
from pyspark.ml.feature import SQLTransformer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.sql.types import *

In [12]:
# Input CSV files and output folder.
# NOTE: `trainingData` and `unlabeledData` hold the CSV *paths* here; the
# load cells further down rebind these same names to the loaded DataFrames.
trainingData = "data/Ex51/data/trainingData.csv"
unlabeledData = "data/Ex51/data/unlabeledData.csv"
outputPath = "res_ex51v2/"

In [13]:
# *************************
# Training step
# *************************

# Load the labeled training set from CSV: column names come from the header
# row and column types are inferred from the data.
# NOTE(review): this rebinds `trainingData` from the input path to the
# DataFrame, so re-running this cell on its own fails; a separate path
# variable would make it idempotent.
trainingData = spark.read.csv(trainingData,
                              header=True,
                              inferSchema=True)

In [14]:
# Inspect the inferred schema, then the first rows (20 rows, truncated cells).
trainingData.printSchema()
trainingData.show(n=20, truncate=True)

root
 |-- label: integer (nullable = true)
 |-- text: string (nullable = true)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    1|The Spark system ...|
|    1|Spark is a new di...|
|    0|Turin is a beauti...|
|    0|Turin is in the n...|
+-----+--------------------+



In [15]:
# Define a Python function that returns the number of words occurring in the input string
def countWords(text):
    """Return the number of whitespace-separated words in ``text``.

    Registered below as the ``countWords`` SQL UDF (IntegerType).
    Runs of spaces, tabs and leading/trailing whitespace do not produce
    empty "words", so "" counts as 0. A NULL (``None``) text also counts
    as 0 words instead of raising AttributeError in the Spark Python worker.
    """
    if text is None:
        return 0
    # split() with no separator collapses any whitespace run and drops edges.
    return len(text.split())

In [16]:
# Expose countWords to Spark SQL under the name "countWords"; declaring
# IntegerType tells Spark which column type the UDF produces.
spark.udf.register(name="countWords", f=countWords, returnType=IntegerType())

<function __main__.countWords(text)>

In [17]:
# Define a Python function that checks whether the input string contains the word "Spark"
def containsSpark(text):
    """Return True if the substring "Spark" occurs anywhere in ``text``.

    Registered below as the ``containsSpark`` SQL UDF (BooleanType).
    The test is a case-sensitive substring match, so "SparkSQL" matches
    while "spark" does not. A NULL (``None``) text yields False instead of
    raising AttributeError in the Spark Python worker.
    """
    if text is None:
        return False
    return "Spark" in text

In [18]:
# Expose containsSpark to Spark SQL under the name "containsSpark";
# declaring BooleanType tells Spark which column type the UDF produces.
spark.udf.register(name="containsSpark", f=containsSpark, returnType=BooleanType())

<function __main__.containsSpark(text)>

In [19]:
# Create an SQLTransformer that appends two derived columns to its input:
# - numLines:  result of the countWords UDF on `text` (despite the name,
#              this is a word count, not a line count)
# - SparkWord: result of the containsSpark UDF on `text`
# __THIS__ is SQLTransformer's placeholder for the input DataFrame.
sqlTrans = SQLTransformer().setStatement("SELECT *,\n"
                                         "countWords(text) AS numLines,\n"
                                         "containsSpark(text) AS SparkWord\n"
                                         "FROM __THIS__")

In [20]:
# Pack "numLines" and "SparkWord" into the single vector column "features"
# that the classifier consumes (the boolean SparkWord ends up as 1.0/0.0).
assembler = (VectorAssembler()
             .setInputCols(["numLines", "SparkWord"])
             .setOutputCol("features"))

In [21]:
# Logistic-regression classifier configured through constructor keywords
# (equivalent to calling setMaxIter / setRegParam): at most 10 iterations
# and a regularization parameter of 0.01.
lr = LogisticRegression(maxIter=10, regParam=0.01)

In [22]:
# Define the pipeline that builds the logistic regression model on the
# training data. It is composed of three stages, applied in order:
# - sqlTrans:  SQLTransformer adding the numLines and SparkWord columns
# - assembler: VectorAssembler packing them into the "features" vector
# - lr:        logistic regression model generation
pipeline = Pipeline(stages=[sqlTrans, assembler, lr])

In [24]:
# Fit all pipeline stages on the training data; the result is a fitted
# pipeline model that can be used as a transformer.
classificationModel = pipeline.fit(dataset=trainingData)

# The fitted model can now predict the class label of new, unlabeled data
# (see the prediction step).

In [25]:
# *************************
# Prediction step
# *************************
# Load the unlabeled documents from CSV: column names come from the header
# row and column types are inferred from the data.
# NOTE(review): this rebinds `unlabeledData` from the input path to the
# DataFrame, so re-running this cell on its own fails.
unlabeledData = spark.read.csv(unlabeledData, header=True, inferSchema=True)

In [26]:
# Inspect the schema and the first rows of the unlabeled documents.
unlabeledData.printSchema()
unlabeledData.show(n=20, truncate=True)

root
 |-- label: string (nullable = true)
 |-- text: string (nullable = true)

+-----+--------------------+
|label|                text|
+-----+--------------------+
| null|Spark performs be...|
| null|Comparison betwee...|
| null|Turin is in Piedmont|
+-----+--------------------+



In [27]:
# Make predictions on the unlabeled documents with Transformer.transform().
# The fitted pipeline model runs every stage in order: the SQLTransformer
# derives numLines/SparkWord from `text`, the VectorAssembler builds
# `features`, and the logistic regression model reads `features` to add
# the rawPrediction, probability and prediction columns.
predictionsDF = classificationModel.transform(unlabeledData)

In [28]:
# Inspect the full prediction output: the input columns, derived features
# and model outputs.
predictionsDF.printSchema()
predictionsDF.show(n=20, truncate=True)

root
 |-- label: string (nullable = true)
 |-- text: string (nullable = true)
 |-- numLines: integer (nullable = true)
 |-- SparkWord: boolean (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

+-----+--------------------+--------+---------+---------+--------------------+--------------------+----------+
|label|                text|numLines|SparkWord| features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------+---------+---------+--------------------+--------------------+----------+
| null|Spark performs be...|       5|     true|[5.0,1.0]|[-3.1272480248757...|[0.04199718899423...|       1.0|
| null|Comparison betwee...|       5|     true|[5.0,1.0]|[-3.1272480248757...|[0.04199718899423...|       1.0|
| null|Turin is in Piedmont|       4|    false|[4.0,0.0]|[3.19966999960023...|[0.96082185681571...|       0.0|
+

In [29]:
# The returned DataFrame has the following schema (attributes)
# |-- label: string (nullable = true)
# |-- text: string (nullable = true)
# |-- numLines: integer (nullable = true)
# |-- SparkWord: boolean (nullable = true)
# |-- features: vector (nullable = true)
# |-- rawPrediction: vector (nullable = true)
# |-- probability: vector (nullable = true)
# |-- prediction: double (nullable = false)

# Keep only the original text attribute and the predicted class of each record
predictions = predictionsDF.select("text", "prediction")

In [30]:
# Show the (text, prediction) pairs with full, untruncated text.
predictions.printSchema()
predictions.show(n=20, truncate=False)

root
 |-- text: string (nullable = true)
 |-- prediction: double (nullable = false)

+-----------------------------------+----------+
|text                               |prediction|
+-----------------------------------+----------+
|Spark performs better than Hadoop  |1.0       |
|Comparison between Spark and Hadoop|1.0       |
|Turin is in Piedmont               |0.0       |
+-----------------------------------+----------+



In [31]:
# Save the (text, prediction) result as CSV files with a header row.
# mode="overwrite" replaces any output from a previous run; with the default
# save mode a second Restart & Run All fails because outputPath already exists.
predictions.write.csv(outputPath, mode="overwrite", header="true")