# Start a Spark session 

In [1]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if one exists; otherwise create one named 'DT_1'.
spark = (
    SparkSession.builder
    .appName('DT_1')
    .getOrCreate()
)

# Load data from HDFS

In [2]:
# Read the tab-separated training file from HDFS; the first line holds the column names.
# No schema is supplied here, so the columns are cast explicitly further down.
row_df = (
    spark.read.format("csv")
    .options(header="true", delimiter="\t")
    .load("hdfs://mycluster/user/dtree/data/train.tsv")
)

# Data preprocessing

In [14]:
#set up UDF Function
from pyspark.sql.functions import udf

def replace_question(x):
    """Return "0" when x is the missing-value marker "?"; otherwise return x unchanged."""
    if x == "?":
        return "0"
    return x

# Wrap replace_question as a DataFrame UDF so it can be applied to Column expressions.
# Note: this rebinds the name, so from here on `replace_question` is the UDF,
# not the plain Python function defined above.
replace_question=udf(replace_question)

#####   the column names in row_df
'url',
 'urlid',
 'boilerplate',
 'alchemy_category',
 'alchemy_category_score',
 'avglinksize',
 'commonlinkratio_1',
 'commonlinkratio_2',
 'commonlinkratio_3',
 'commonlinkratio_4',
 'compression_ratio',
 'embed_ratio',
 'framebased',
 'frameTagRatio',
 'hasDomainLink',
 'html_ratio',
 'image_ratio',
 'is_news',
 'lengthyLinkDomain',
 'linkwordscore',
 'news_front_page',
 'non_markup_alphanum_characters',
 'numberOfLinks',
 'numwords_in_url',
 'parametrizedLinkRatio',
 'spelling_errors_ratio',
 'label'
 
 categorical data --> alchemy_category
 
 numerical data -->alchemy_category_score~spelling_errors_ratio

In [15]:
from pyspark.sql.functions import col 

# Keep the two string columns untouched. For every column from index 4 onward,
# replace "?" with "0" through the UDF and cast the result from string to double,
# keeping the original column name.
df = row_df.select(
    'url',
    'alchemy_category',
    *[
        replace_question(col(name)).cast("double").alias(name)
        for name in row_df.columns[4:]
    ]
)

# Split data into training set & test set

In [16]:
# 70% / 30% split. A fixed seed keeps the split (and therefore every metric
# reported further down) the same across kernel restarts.
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)

In [62]:
%%html
<img src="spark_pipeline.PNG" width="450" height="200">

# Build a pipeline

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Index the categorical column. The instance is bound to its own name so it does
# not shadow the imported StringIndexer class; shadowing it made this cell fail
# with "'StringIndexer' object is not callable" when it was run a second time.
# NOTE(review): handleInvalid is left at its default; if a category can appear
# only in the test split, consider handleInvalid="keep" -- confirm against the data.
category_indexer = StringIndexer(
                    inputCol='alchemy_category',
                    outputCol='alchemy_category_Index')

# One-hot encode the category index; dropLast=False keeps one slot per category.
encoder = OneHotEncoderEstimator(dropLast=False,
                       inputCols=["alchemy_category_Index"],
                       outputCols=["alchemy_category_IndexVec"])

# Features = encoded category + the columns from index 4 up to, but not
# including, the last column ('label').
assemblerInputs = ['alchemy_category_IndexVec'] + row_df.columns[4:-1]
assembler = VectorAssembler(
            inputCols=assemblerInputs,
            outputCol="features")

dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Candidate parameter sets: 2 (impurity) * 3 (depth) * 3 (bins) = 18 models.
paramGrid = ParamGridBuilder()\
    .addGrid(dt.impurity, ["gini", "entropy"])\
    .addGrid(dt.maxDepth, [5, 10, 15])\
    .addGrid(dt.maxBins, [10, 15, 20])\
    .build()

# Score candidates by area under the ROC curve.
evaluator = BinaryClassificationEvaluator(
                rawPredictionCol="rawPrediction",
                labelCol="label",
                metricName="areaUnderROC")

# numFolds=3 -> in each round two thirds of the training set are used for
# fitting and the remaining third for validation.
# Every candidate is validated three times, so 18 * 3 = 54 fits in total.
# The seed fixes the fold assignment so the selected model is reproducible.
cv = CrossValidator(estimator=dt, evaluator=evaluator,
                    estimatorParamMaps=paramGrid, numFolds=3, seed=42)

cv_pipeline = Pipeline(stages=[category_indexer, encoder, assembler, cv])

# Create a pipeline model

In [22]:
# Fit every pipeline stage on the training set. The last stage is the
# CrossValidator, so this call also runs the parameter-grid search.
cv_pipelineModel=cv_pipeline.fit(train_df)

In [26]:
# Check what kind of object fit() returned.
type(cv_pipelineModel)

pyspark.ml.pipeline.PipelineModel

In [20]:
# List the stages of the pipeline definition (the unfitted estimators).
# The final CrossValidator stage is the one that picks the best model during fit().
cv_pipeline.getStages()

[StringIndexer_4354a6f57209c862fd95,
 OneHotEncoderEstimator_4e6bb53284d55eb85583,
 VectorAssembler_4b4ea7be58d97270b8c2,
 CrossValidator_443e9b76dee01c5d26ee]

# Check the best model 

In [57]:
# The CrossValidator is the final stage of the fitted pipeline; take the model it selected.
bestModel = cv_pipelineModel.stages[-1].bestModel

In [58]:
# Show a summary of the selected decision tree.
bestModel

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_413ab7a66b5b2713b18f) of depth 15 with 1377 nodes

In [61]:
# Inspect the parameter values of the best model.
bestModel.extractParamMap()

{Param(parent='DecisionTreeClassifier_413ab7a66b5b2713b18f', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False,
 Param(parent='DecisionTreeClassifier_413ab7a66b5b2713b18f', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10,
 Param(parent='DecisionTreeClassifier_413ab7a66b5b2713b18f', name='featuresCol', doc='features column name'): 'features',
 Param(parent='DecisionTreeClassifier_413ab7a66b5b2713b18f', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'gini',
 Param(parent='DecisionTreeClassifier_413ab7a66b5b2713b18f', name='l

# Evaluate the model with AUC

In [25]:
# Run the fitted pipeline on the held-out test set; transform() returns a new
# DataFrame with the prediction columns appended.
predictions = cv_pipelineModel.transform(test_df)
# Score the predictions with the evaluator configured for areaUnderROC.
auc = evaluator.evaluate(predictions)
auc

0.6678613522590456

# Save and load the model

In [27]:
# Persist the fitted pipeline to HDFS. overwrite() lets the notebook be re-run:
# a plain save() fails when the target path already exists.
cv_pipelineModel.write().overwrite().save("hdfs://mycluster/user/oracle/dt/model")

In [28]:
from pyspark.ml import Pipeline, PipelineModel

# What was saved is a fitted pipeline (a PipelineModel), so it must be read back
# with PipelineModel.load. Pipeline.load is the reader for unfitted pipelines and
# does not return an object that can transform() data.
reloaded_cv_model = PipelineModel.load("hdfs://mycluster/user/oracle/dt/model")

In [29]:
# Show the object that was restored from HDFS.
reloaded_cv_model

PipelineModel_47dc9380f9f430fb2e42