## Pipeline
In machine learning it is common to run a sequence of algorithms to process and learn from data. For example, to build an NLP model that classifies items, you first clean the text (lowercase it, remove stop words, tokenize it, and so on) and then train a classifier. That sequence is already a pipeline, but so far we have run it step by step.

>A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. For Transformer stages, the transform() method is called on the DataFrame. For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the DataFrame.
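
The fit/transform contract described in the quote can be sketched in plain Python, without Spark. This is a simplified illustration, not Spark's implementation: the class names `Lowercase`, `VocabularyEstimator`, `VocabularyEncoder`, `ToyPipeline` and `ToyPipelineModel` are hypothetical, and a list of strings stands in for the DataFrame.

```python
class Lowercase:
    """Transformer: only has transform(), there is nothing to learn."""

    def transform(self, rows):
        return [row.lower() for row in rows]


class VocabularyEncoder:
    """Transformer produced by fitting VocabularyEstimator."""

    def __init__(self, vocab):
        self.index = {word: i for i, word in enumerate(vocab)}

    def transform(self, rows):
        # Words not seen during fit() are dropped
        return [[self.index[w] for w in row.split() if w in self.index]
                for row in rows]


class VocabularyEstimator:
    """Estimator: fit() learns from the data and returns a Transformer."""

    def fit(self, rows):
        vocab = sorted({word for row in rows for word in row.split()})
        return VocabularyEncoder(vocab)


class ToyPipelineModel:
    """Fitted pipeline: every stage is now a Transformer."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    """Runs the stages in order, fitting Estimators along the way."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)   # Estimator -> Transformer
            rows = stage.transform(rows)  # pass transformed data onward
            fitted.append(stage)
        return ToyPipelineModel(fitted)


model = ToyPipeline([Lowercase(), VocabularyEstimator()]).fit(
    ["Red Shoes", "Blue Shoes"])
encoded = model.transform(["RED shoes", "green shoes"])
print(encoded)
```

The fitted model reuses the vocabulary learned from the training rows, so new rows are encoded consistently and the unseen word "green" is simply skipped.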



In [None]:
# Because a SparkContext is already loaded, we can reuse it: getOrCreate() returns the existing Spark session or creates one
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
df_data = spark.read\
    .option("header", "true")\
    .option("mode", "DROPMALFORMED")\
    .option("delimiter", '^')\
    .csv('small_data.csv') 

In [None]:
df_data = df_data.dropna(subset=['title'])

In [None]:
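import ast
from pyspark.sql.functions import udf  # helper for user-defined functions (UDFs)
# Parse the categories string without executing arbitrary code, then keep the first root category
choose_only_first_root_category = udf(lambda x: ast.literal_eval(str(x))[0][0])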

In [None]:
df_data = df_data.withColumn('root_category', choose_only_first_root_category(df_data.categories))

In [None]:
train, test = df_data.randomSplit([0.8, 0.2], seed=12345)

In [None]:
from pyspark.sql.functions import col, lower
train_clean = train.withColumn('lower_sentence', lower(col('title')))
test_clear = test.withColumn('lower_sentence', lower(col('title')))

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer
from pyspark.ml import Pipeline # import pipeline object

In [None]:
stringIndexer = StringIndexer(inputCol="root_category", outputCol="indexed", handleInvalid='error')

In [None]:
stringIndexer_model = stringIndexer.fit(train_clean)

In [None]:
train_clean = stringIndexer_model.transform(train_clean)

In [None]:
test_clear = stringIndexer_model.transform(test_clear)

In [None]:
from pyspark.ml.classification import NaiveBayes # import Naive Bayes

In [None]:
tokenizer = Tokenizer(inputCol="lower_sentence", outputCol="words_tokenizer_pipeline")
remover = StopWordsRemover(inputCol="words_tokenizer_pipeline", outputCol="filtered_pipeline")
cv = CountVectorizer(inputCol="filtered_pipeline", outputCol="features_pipeline")
nb = NaiveBayes(modelType="multinomial", featuresCol="features_pipeline", labelCol="indexed")

In [None]:
pipeline = Pipeline(stages=[tokenizer, remover, cv, nb])

In [None]:
model = pipeline.fit(train_clean)

In [None]:
prediction = model.transform(test_clear)

In [None]:
prediction.select('prediction', 'indexed').show()

In [None]:
prediction.createOrReplaceTempView("prediction")

In [None]:
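# Accuracy: number of rows where the prediction equals the true label, divided by the total number of rows
df_sql = spark.sql("select (select count(indexed) from prediction where indexed = prediction) / count(*) from prediction")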

In [None]:
df_sql.show()
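
The SQL query above computes the accuracy of the model: the share of test rows whose `prediction` equals the `indexed` label. A plain-Python sketch of the same arithmetic follows. The `accuracy` helper is hypothetical and the sample pairs are made up for illustration only; they are not results from this notebook.

```python
def accuracy(pairs):
    """Return the fraction of (prediction, label) pairs that match."""
    pairs = list(pairs)
    if not pairs:
        raise ValueError("no rows to score")
    correct = sum(1 for prediction, label in pairs if prediction == label)
    return correct / len(pairs)


# Hypothetical (prediction, indexed) values, not real model output
sample = [(0.0, 0.0), (1.0, 1.0), (2.0, 1.0), (0.0, 0.0)]
score = accuracy(sample)
print(score)
```

In Spark itself, `MulticlassClassificationEvaluator` from `pyspark.ml.evaluation` with `labelCol="indexed"`, `predictionCol="prediction"` and `metricName="accuracy"` should report the same metric without a temporary view.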