# Table of Contents  
[**I. Mock data and model**](#Mock-data-and-model)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[I. example data](#example-data)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[II. pipeline](#pipeline)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[III. create PipelineModel](#create-PipelineModel)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[IV. apply the model](#apply-the-model)  
[**II. pipeline utility**](#pipeline-utility)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[I. getallstages():  check Pipeline and PipelineModel](#getallstages%28%29:--check-Pipeline-and-PipelineModel)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[I. check a stage](#check-a-stage)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[II. update a stage](#update-a-stage)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[II. getCode(): generate the code for creating the  ml object](#getCode%28%29:-generate-the-code-for-creating-the--ml-object)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[III. convert  transformer to estimator](#convert--transformer-to-estimator)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[I. model_to_estimator()](#model_to_estimator%28%29)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[II. pm_to_p(): covert PipelineModel to Pipeline](#pm_to_p%28%29:-covert-PipelineModel-to-Pipeline)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[IV. Other function](#Other-function)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[I. flatenStages()](#flatenStages%28%29)  
[**III. LogisticRegressionModel utility**](#LogisticRegressionModel-utility)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[I. extract_feature_name()](#extract_feature_name%28%29)  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[II. feature_importance()](#feature_importance%28%29)  

In [1]:
# Uncomment to stop an already-running session before a new one is created in the next cell.
#spark.stop()

In [2]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if one exists; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Mock data and model

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.feature import SQLTransformer
from pyspark.ml.feature import RFormula

In [4]:
import pandas as pd
# Pandas display options for the tables shown in this notebook.
# max_colwidth=None turns off cell truncation so long values (e.g. the SQL
# statement of a SQLTransformer) are shown in full. The earlier setting of 100
# was immediately overridden by this one, so it has been removed.
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500000)
pd.set_option('display.max_columns', 5000)

### example data

In [5]:
# Toy training set: row id, a category that may be missing (None),
# a free-text column and a numeric label.
training_columns = ["id", 'category', "text", "label"]
training_rows = [
    (0, 'y', "a b c d e spark", 1.0),
    (1, 'y', "b d", 0.0),
    (2, None, "spark f g h", 1.0),
    (3, 'n', "hadoop mapreduce", 0.0),
]
training = spark.createDataFrame(training_rows, training_columns)

### pipeline

In [6]:
# Process the 'category' column: add a category_fillNA column in which a
# NULL category is replaced by 'unknown'. __THIS__ is the SQLTransformer
# placeholder for the input DataFrame.
category_process=SQLTransformer(statement="""select *, coalesce(category, 'unknown') category_fillNA 
                                            from __THIS__ """)

In [7]:
# text_process: a nested pipeline for the 'text' column.
# Tokenize the text into words, then hash the words into a 16-feature vector.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(
    inputCol=tokenizer.getOutputCol(),
    outputCol="text_vector",
    numFeatures=16,
)
text_process = Pipeline(stages=[tokenizer, hashingTF])

In [8]:
# Assemble the 'features' column from the filled category and the hashed text vector.
features_assemble = RFormula(
    formula="~category_fillNA+text_vector",
    featuresCol='features',
    handleInvalid='keep',
)

In [9]:
# Final estimator of the pipeline.
lr = LogisticRegression(
    maxIter=5,
    regParam=0.001,
)

In [10]:
# Put everything together into one pipeline.
# Note that text_process is itself a Pipeline, i.e. a nested stage.
pipeline = Pipeline(
    stages=[category_process, text_process, features_assemble, lr]
)

### create PipelineModel

In [11]:
# Fit the pipeline on the training data; the result is a PipelineModel.
model = pipeline.fit(training)

### apply the model

In [12]:
# Apply the fitted pipeline to the training data; the result is reused
# by the LogisticRegressionModel utilities further down.
training_pred = model.transform(training)

In [13]:
# Presumably only needed when spark_ml_utils is not installed as a package:
# uncomment and adjust the path so that the import in the next cell works.
#import sys
#sys.path.insert(0, '/home/c07520/work/Users/c07520/sparkEXample/create_package/base_spark_ML_utils/')

# pipeline utility

In [14]:
import spark_ml_utils.pipeline_util as pu


### getallstages():  check Pipeline and PipelineModel
In practice, a Pipeline or PipelineModel can contain many stages, some of them nested. The getallstages() function lists all of the stages in one table for easy checking.

In [15]:
# Native getStages(): lists only the top-level stages by uid, so the nested
# text pipeline shows up as a single opaque "Pipeline_..." entry.
pipeline.getStages()

[SQLTransformer_61223a998f0c,
 Pipeline_2458ce200fec,
 RFormula_7f1108f03d1d,
 LogisticRegression_6ff8c810fb72]

In [16]:
# The second argument is the variable name used to build the accessor
# expressions shown in the first column of the result table.
pu.getallstages(pipeline, 'pipeline')

This is a Pipeline 


Unnamed: 0,estimator,estimator_name,inputcol,outputcol,other_parameters
0,pipeline.getStages()[0],SQLTransformer,,,"""statement=\nselect *, coalesce(category, 'unknown') category_fillNA \n from __THIS__ """
1,pipeline.getStages()[1].getStages()[0],Tokenizer,text,words,
2,pipeline.getStages()[1].getStages()[1],HashingTF,words,text_vector,
3,pipeline.getStages()[2],RFormula,,features,number of inputCol in formula: 2
4,pipeline.getStages()[3],LogisticRegression,,,


In [17]:
# The same call works for a PipelineModel.
pu.getallstages(model, 'model')

This is a PipelineModel 


Unnamed: 0,transformer,transformer_name,inputcol,outputcol,other_parameters
0,model.stages[0],SQLTransformer,,,"""statement=\nselect *, coalesce(category, 'unknown') category_fillNA \n from __THIS__ """
1,model.stages[1].stages[0],Tokenizer,text,words,
2,model.stages[1].stages[1],HashingTF,words,text_vector,
3,model.stages[2],RFormulaModel,,features,number of inputCol in formula: 2
4,model.stages[3],LogisticRegressionModel,,,"labelCol : label, elasticNetParam : 0.0, regParam : 0.001"


#### check a stage

In [18]:
# Stage [1] is the nested text pipeline; its stage [1] is the HashingTF.
type(pipeline.getStages()[1].getStages()[1])

pyspark.ml.feature.HashingTF

In [19]:
# Read a single parameter of that stage through its getter.
pipeline.getStages()[1].getStages()[1].getNumFeatures()

16

In [43]:
# Dump all parameters of the stage as a {Param: value} map.
pipeline.getStages()[1].getStages()[1].extractParamMap()

{Param(parent='HashingTF_40b6726a435c', name='outputCol', doc='output column name.'): 'text_vector',
 Param(parent='HashingTF_40b6726a435c', name='numFeatures', doc='number of features.'): 16,
 Param(parent='HashingTF_40b6726a435c', name='binary', doc='If True, all non zero counts are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts. Default False.'): False,
 Param(parent='HashingTF_40b6726a435c', name='inputCol', doc='input column name.'): 'words'}

#### update a stage

In [20]:
# Work on a copy so that the original `pipeline` is left unchanged
# (it still reports numFeatures=16 in the getCode() output further down).
pipeline_update = pipeline.copy()

In [21]:
# Change a parameter of the nested HashingTF stage in the copy.
# The setter returns the stage itself, hence the HashingTF repr shown as output.
pipeline_update.getStages()[1].getStages()[1].setNumFeatures(256)

HashingTF_40b6726a435c

In [22]:
# Confirm that the copy now carries the new value.
pipeline_update.getStages()[1].getStages()[1].getNumFeatures()

256

### getCode(): generate the code for creating the  ml object
getCode() can be used to check the detailed parameters of an ML object; its output is easier to read than that of the extractParamMap() method. In addition, a modified object can be created by editing the generated code directly and then running it.

In [23]:
# pstr is a string, identical to the following printout, that contains all
# the code needed to create the pipeline under the name 'pipeline2'.
pstr = pu.getCode(pipeline, 'pipeline2')

from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import LogisticRegression

pipeline2=Pipeline(stages=[
########################################stage0
SQLTransformer(statement="""select *, coalesce(category, 'unknown') category_fillNA 
                                            from __THIS__ """)

,########################################stage1
Tokenizer(outputCol="words",inputCol="text")

,########################################stage2
HashingTF(numFeatures=16,outputCol="text_vector",inputCol="words")

,########################################stage3
RFormula(featuresCol="features",handleInvalid="keep",formula="~category_fillNA+text_vector")

,########################################stage4
LogisticRegression(maxIter=5,regParam=0.001)
])


In [24]:
# Run the generated code; this defines `pipeline2` in the notebook namespace.
# NOTE: exec() is acceptable here only because pstr was generated locally by
# getCode() in the previous cell; never exec() a string from an untrusted source.
exec(pstr)

In [25]:
# pipeline2 contains the same stages as pipeline, although it is flattened
# (the nested text pipeline has been expanded into its two stages).
pu.getallstages(pipeline2, 'pipeline2')

This is a Pipeline 


Unnamed: 0,estimator,estimator_name,inputcol,outputcol,other_parameters
0,pipeline2.getStages()[0],SQLTransformer,,,"""statement=\nselect *, coalesce(category, 'unknown') category_fillNA \n from __THIS__ """
1,pipeline2.getStages()[1],Tokenizer,text,words,
2,pipeline2.getStages()[2],HashingTF,words,text_vector,
3,pipeline2.getStages()[3],RFormula,,features,number of inputCol in formula: 2
4,pipeline2.getStages()[4],LogisticRegression,,,


In [26]:
# For a PipelineModel, getCode() returns the code for its corresponding Pipeline.
# The returned string is assigned to `_` so it is not echoed a second time.
_ = pu.getCode(model, 'pipeline3')

from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import LogisticRegression

pipeline3=Pipeline(stages=[
########################################stage0
SQLTransformer(statement="""select *, coalesce(category, 'unknown') category_fillNA 
                                            from __THIS__ """)

,########################################stage1
Tokenizer(outputCol="words",inputCol="text")

,########################################stage2
HashingTF(numFeatures=16,outputCol="text_vector",inputCol="words")

,########################################stage3
RFormula(featuresCol="features",handleInvalid="keep",formula="~category_fillNA+text_vector")

,########################################stage4
LogisticRegression(maxIter=5,regParam=0.001)
])


In [27]:
# getCode() is shown here on a single stage (an RFormula estimator)
# instead of a whole pipeline.
_ = pu.getCode(pipeline.getStages()[2], 'obj')

from pyspark.ml.feature import RFormula

obj=RFormula(featuresCol="features",handleInvalid="keep",formula="~category_fillNA+text_vector")


In [28]:
# Same call on a stage taken from the fitted PipelineModel (the HashingTF).
_ = pu.getCode(model.stages[1].stages[1], 'obj')

from pyspark.ml.feature import HashingTF

obj=HashingTF(numFeatures=16,outputCol="text_vector",inputCol="words")


### convert  transformer to estimator

#### model_to_estimator()

In [29]:
# The last stage of the fitted pipeline is the LogisticRegressionModel.
lrm = model.stages[3]

In [30]:
# Show the fitted model (uid, number of classes, number of features).
lrm

LogisticRegressionModel: uid = LogisticRegression_6ff8c810fb72, numClasses = 2, numFeatures = 19

In [31]:
lr=pu.model_to_estimator(lrm)

In [32]:
lr

LogisticRegression_47e73eca2d76

#### pm_to_p(): covert PipelineModel to Pipeline

In [33]:
# Convert the fitted PipelineModel back into a Pipeline.
pipeline4 = pu.pm_to_p(model)

In [34]:
# Inspect the converted pipeline: every stage is an estimator/transformer again.
pu.getallstages(pipeline4, 'pipeline4')

This is a Pipeline 


Unnamed: 0,estimator,estimator_name,inputcol,outputcol,other_parameters
0,pipeline4.getStages()[0],SQLTransformer,,,"""statement=\nselect *, coalesce(category, 'unknown') category_fillNA \n from __THIS__ """
1,pipeline4.getStages()[1],Tokenizer,text,words,
2,pipeline4.getStages()[2],HashingTF,words,text_vector,
3,pipeline4.getStages()[3],RFormula,,features,number of inputCol in formula: 2
4,pipeline4.getStages()[4],LogisticRegression,,,


One application of pm_to_p() is that, after model training, **only the PipelineModel**, not the Pipeline, **needs to be persisted**, because the PipelineModel can be converted back into a Pipeline.

### Other function

#### flatenStages()

In [35]:
# Top-level stages only: the nested text pipeline appears as one PipelineModel.
model.stages

[SQLTransformer_61223a998f0c,
 PipelineModel_0a71bf5fd76d,
 RFormula_7f1108f03d1d,
 LogisticRegressionModel: uid = LogisticRegression_6ff8c810fb72, numClasses = 2, numFeatures = 19]

In [36]:
# Note the difference: the 2nd stage (a PipelineModel) is flattened into its two stages.
pu.flatenStages(model.stages)

[SQLTransformer_61223a998f0c,
 Tokenizer_b05f6f890af3,
 HashingTF_40b6726a435c,
 RFormula_7f1108f03d1d,
 LogisticRegressionModel: uid = LogisticRegression_6ff8c810fb72, numClasses = 2, numFeatures = 19]

In [37]:
# Confirm the class of lrm before passing it to the LogisticRegressionModel utilities below.
type(lrm)

pyspark.ml.classification.LogisticRegressionModel

# LogisticRegressionModel utility

In [38]:
import spark_ml_utils.LogisticRegressionModel_util as lu

### extract_feature_name()
LogisticRegressionModel does not store the feature names. extract_feature_name() is called in feature_importance() to extract the feature names from the DataFrame schema. In addition, when stat=True, it also returns the mean and standard deviation of each feature.

In [39]:
# Feature names are read from the 'features' column of the transformed DataFrame;
# stat=True adds the per-feature statistics columns to the result.
lu.extract_feature_name(training_pred, 'features', stat=True)

Unnamed: 0,feature_index,feature_name,N,mean,std
0,0,category_fillNA_y,4,0.5,0.57735
1,1,category_fillNA_n,4,0.25,0.5
2,2,category_fillNA_unknown,4,0.25,0.5
3,3,text_vector_0,4,0.0,0.0
4,4,text_vector_1,4,1.25,0.957427
5,5,text_vector_2,4,0.5,0.57735
6,6,text_vector_3,4,0.0,0.0
7,7,text_vector_4,4,0.0,0.0
8,8,text_vector_5,4,0.0,0.0
9,9,text_vector_6,4,0.75,0.957427


### feature_importance()
Extract the feature coefficients and the feature importance. See the docstring of feature_importance() for the definition of feature importance.

In [40]:
# Coefficient and importance table for the fitted model.
# nonzero_only=True presumably drops features whose coefficient is zero -- see the docstring.
lu.feature_importance(
    lrm_model=lrm,
    trainDF=training_pred,
    trainFeatures='features',
    nonzero_only=True,
)

Unnamed: 0,feature_index,feature_name,coef,mean,std,std_coef,feature_importance
0,5,text_vector_2,3.03449,0.5,0.57735,1.751964,1.751964
1,13,text_vector_10,2.912984,0.25,0.5,1.456492,1.456492
2,0,category_fillNA_y,-2.486956,0.5,0.57735,-1.435845,1.435845
3,1,category_fillNA_n,-1.446623,0.25,0.5,-0.723312,0.723312
4,12,text_vector_9,-1.446623,0.25,0.5,-0.723312,0.723312
5,16,text_vector_13,-1.446623,0.25,0.5,-0.723312,0.723312
6,2,category_fillNA_unknown,1.133003,0.25,0.5,0.566501,0.566501
7,11,text_vector_8,1.133003,0.25,0.5,0.566501,0.566501
8,4,text_vector_1,0.508104,1.25,0.957427,0.486473,0.486473
9,9,text_vector_6,-0.109898,0.75,0.957427,-0.105219,0.105219
