# What is Spark MLlib?

Apache Spark's Machine Learning Library (MLlib) is designed for simplicity, scalability, and easy integration with other tools. Because it inherits Spark's scalability, language support, and speed, data scientists can focus on their data problems and models rather than on the complexities of distributed data, such as infrastructure and configuration.

Built on top of Spark, MLlib provides common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, along with the underlying optimization primitives. It integrates with other Spark components such as Spark SQL, Spark Streaming, and DataFrames, and it is installed in the Databricks runtime.

The library is usable from Java, Scala, and Python as part of Spark applications, so you can include it in complete workflows: preprocessing and munging data, training models, and making predictions at scale. Models trained in MLlib can also be used to make predictions in Structured Streaming.

(https://databricks.com/glossary/what-is-machine-learning-library)

# Loading Dataset

In [0]:
data = spark.sql('SELECT * FROM employee')
data.display()

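| satisfaction_level | last_evaluation | number_project | average_montly_hours | time_spend_company | Work_accident | promotion_last_5years | department | salary | left |
|---|---|---|---|---|---|---|---|---|---|
| 0.38 | 0.53 | 2 | 157.0 | 3 | 0 | 0 |  | low | 1 |
| 0.8 | 0.86 | 5 | 262.0 | 6 | 0 | 0 | sales |  | 1 |
| 0.11 | 0.88 | 7 | 272.0 | 4 | 0 | 0 | sales |  | 1 |
| 0.72 |  | 5 |  | 5 | 0 | 0 | sales | low | 1 |
| 0.37 | 0.52 | 2 | 159.0 | 3 | 0 | 0 | sales | low | 1 |
| 0.41 | 0.5 | 2 | 153.0 | 3 | 0 | 0 | sales | low | 1 |
| 0.1 | 0.77 | 6 | 247.0 | 4 | 0 | 0 | sales | low | 1 |
| 0.92 | 0.85 | 5 | 259.0 | 5 | 0 | 0 | sales | low | 1 |
| 0.89 | 1.0 | 5 | 224.0 | 5 | 0 | 0 | sales | low | 1 |
| 0.42 | 0.53 | 2 | 142.0 | 3 | 0 | 0 | sales | low | 1 |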


In [0]:
from pyspark.sql.functions import isnan, when, count, col
# count the NaN or null values in each column
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()
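
As a hedged illustration of what the expression above computes, the following plain-Python sketch counts, per column, the values that are either `None` or NaN. The `rows` sample and the helper name `count_missing` are hypothetical and not part of the notebook; Spark evaluates the equivalent logic across the cluster.

```python
import math

def count_missing(rows, columns):
    """Count, per column, the values that are None or a float NaN."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)
            if value is None or (isinstance(value, float) and math.isnan(value)):
                counts[c] += 1
    return counts

# Hypothetical sample mirroring a few columns of the employee table
rows = [
    {"satisfaction_level": 0.38, "department": None, "salary": "low"},
    {"satisfaction_level": 0.80, "department": "sales", "salary": None},
    {"satisfaction_level": float("nan"), "department": "sales", "salary": "low"},
]
missing = count_missing(rows, ["satisfaction_level", "department", "salary"])
print(missing)
```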

In [0]:
# drop every row that has a missing value in at least one column
data = data.na.drop("any")
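
The difference between the `"any"` and `"all"` modes of `na.drop` can be sketched in plain Python. This is only a simplified model: the helper `drop_nulls` and the sample rows are hypothetical, and the sketch treats only `None` as missing.

```python
def drop_nulls(rows, how="any"):
    """Filter rows like na.drop(how): 'any' drops a row with at least one
    None, 'all' drops a row only when every value is None."""
    kept = []
    for row in rows:
        nulls = [value is None for value in row.values()]
        should_drop = any(nulls) if how == "any" else all(nulls)
        if not should_drop:
            kept.append(row)
    return kept

# Hypothetical rows: one complete, one partially missing, one fully missing
rows = [
    {"last_evaluation": 0.53, "salary": "low"},
    {"last_evaluation": None, "salary": "low"},
    {"last_evaluation": None, "salary": None},
]
kept_any = drop_nulls(rows, how="any")  # only the complete row survives
kept_all = drop_nulls(rows, how="all")  # only the fully missing row is dropped
print(len(kept_any), len(kept_all))
```

Dropping with `"any"` is the stricter choice: it guarantees complete rows for the later stages, at the cost of discarding rows that are only partially missing.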

In [0]:
# re-run the check: every count should now be zero
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

In [0]:
data.printSchema()

# Dealing with Categorical Features in PySpark

In [0]:
# only the classes used below are imported
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql.types import StringType

In [0]:
# numeric features: every non-string column (note that this list also contains the label column 'left')
numeric_features = [f.name for f in data.schema.fields if not isinstance(f.dataType, StringType)]
print(numeric_features)

In [0]:
# categorical features: every string column
categorical_features = [f.name for f in data.schema.fields if isinstance(f.dataType, StringType)]
print(categorical_features)

In [0]:
# string indexer for department: maps each department name to a numeric index
department_indexer = StringIndexer(inputCol='department', outputCol='departmentIndex')
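
To make the indexing step concrete, here is a minimal plain-Python sketch of the default `StringIndexer` ordering, in which the most frequent label receives index 0.0. The helper `fit_string_index`, the sample department names, and the alphabetical tie-breaking are assumptions of this sketch, not code from the notebook.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent label first.
    Ties are broken alphabetically (an assumption of this sketch)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical department column
departments = ["sales", "sales", "technical", "hr", "sales", "technical"]
mapping = fit_string_index(departments)
indexed = [mapping[d] for d in departments]
print(mapping)
print(indexed)
```

The mapping is learned from the data passed to `fit`, which is why the indexer has to be fitted before it can transform anything.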

In [0]:
# one hot encoder for department: turns the numeric index into a binary vector
department_encoder = OneHotEncoder(inputCol='departmentIndex', outputCol='departmentVec')
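
The following sketch shows, in plain Python, how a category index becomes a binary vector. It assumes the encoder's default of dropping the last category, so `k` categories yield a vector of length `k - 1` and the last category is encoded as all zeros. The helper `one_hot` is hypothetical, and it returns a dense list, whereas Spark stores the result as a sparse vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a dense list of 0.0/1.0 values."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    position = int(index)
    if position < size:
        vector[position] = 1.0
    return vector

vec_first = one_hot(0.0, 3)                   # first of three categories
vec_last = one_hot(2.0, 3)                    # last category: all zeros
vec_full = one_hot(2.0, 3, drop_last=False)   # keep every category
print(vec_first, vec_last, vec_full)
```

Dropping one category avoids a redundant column, because the last category is already implied when all other entries are zero.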

In [0]:
# string indexer for salary
salary_indexer = StringIndexer(inputCol='salary', outputCol='salaryIndex')

In [0]:
# one hot encoder for salary
salary_encoder = OneHotEncoder(inputCol='salaryIndex', outputCol='salaryVec')

# Vector Assembler

In [0]:
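# vector assembler: merges the numeric columns and the one-hot vectors into one 'features' column
# ('average_montly_hours' is spelled exactly as in the table)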
assembler = VectorAssembler(inputCols=['satisfaction_level', 'last_evaluation', 'number_project', 
                                      'average_montly_hours', 'time_spend_company', 'Work_accident',
                                      'promotion_last_5years', 'departmentVec', 'salaryVec'],
                            outputCol='features')
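
Conceptually, the assembler concatenates scalar columns and vector columns into one flat feature vector, in the order given by `inputCols`. A minimal sketch in plain Python follows; the helper `assemble` and the sample row are hypothetical.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column
        else:
            features.append(float(value))             # scalar column
    return features

# Hypothetical row after indexing and one-hot encoding
row = {
    "satisfaction_level": 0.38,
    "number_project": 2,
    "departmentVec": [1.0, 0.0],
    "salaryVec": [0.0, 1.0],
}
features = assemble(row, ["satisfaction_level", "number_project",
                          "departmentVec", "salaryVec"])
print(features)
```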

# Logistic Regression

In [0]:
# import the model and configure it to predict the 'left' label from 'features'
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features', labelCol='left')
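
For intuition, a binary logistic regression model scores a feature vector with a weighted sum, squashes it through the sigmoid function, and predicts class 1 when the probability exceeds a threshold (0.5 by default). The sketch below illustrates only the prediction step; the weights and intercept are made-up values, not coefficients learned from the employee data.

```python
import math

def predict_proba(features, weights, intercept):
    """Probability of class 1: sigmoid of the weighted sum of the features."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))

def predict(features, weights, intercept, threshold=0.5):
    """Predict 1.0 when the probability exceeds the threshold, else 0.0."""
    return 1.0 if predict_proba(features, weights, intercept) > threshold else 0.0

# Hypothetical coefficients for two features
weights = [-4.0, 0.5]
intercept = 0.5

p_low = predict_proba([0.1, 1.0], weights, intercept)   # low first feature
p_high = predict_proba([0.9, 1.0], weights, intercept)  # high first feature
print(p_low, predict([0.1, 1.0], weights, intercept))
print(p_high, predict([0.9, 1.0], weights, intercept))
```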

# What is a Pipeline?

A machine learning pipeline is a way to codify and automate the workflow it takes to produce a machine learning model. Machine learning pipelines consist of multiple sequential steps that do everything from data extraction and preprocessing to model training and deployment.

For data science teams, the production pipeline should be the central product. It encapsulates the learned best practices for producing a machine learning model for the organization's use case and allows the team to execute at scale. Whether you are maintaining multiple models in production or supporting a single model that needs to be updated frequently, an end-to-end machine learning pipeline is a must.

(https://valohai.com/machine-learning-pipeline/)

In [0]:
from pyspark.ml import Pipeline

In [0]:
pipeline = Pipeline(stages = [department_indexer, salary_indexer,
                              department_encoder, salary_encoder,
                              assembler, lr])
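
The way a pipeline chains its stages can be sketched with toy classes. During `fit`, each estimator stage learns from the data produced by the stages before it, and the result is a fitted model that replays the same transformations on new data. All class names below (`ToyPipeline`, `MeanCenterer`, `Shifter`) are hypothetical stand-ins operating on plain lists, not Spark classes.

```python
class Shifter:
    """Toy transformer: adds a fixed offset to every value."""
    def __init__(self, offset):
        self.offset = offset

    def transform(self, data):
        return [x + self.offset for x in data]

class MeanCenterer:
    """Toy estimator: learns the mean in fit() and returns a fitted Shifter."""
    def fit(self, data):
        mean = sum(data) / len(data)
        return Shifter(-mean)

class ToyPipelineModel:
    """Fitted pipeline: applies every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

class ToyPipeline:
    """Fits estimators in order, feeding each stage the output of the previous one."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            data = model.transform(data)
            fitted.append(model)
        return ToyPipelineModel(fitted)

# Fit on "training" values, then apply the learned steps to a new value
model = ToyPipeline([Shifter(10.0), MeanCenterer()]).fit([1.0, 2.0, 3.0])
output = model.transform([4.0])
print(output)
```

The point of the design is that statistics learned on the training data (here, the mean) are reused unchanged when the fitted pipeline transforms unseen data.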

In [0]:
# split data into roughly 70% train and 30% test (fixed seed so the split is reproducible)
train, test = data.randomSplit([0.7, 0.3], seed=42)
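
A weighted random split assigns each row independently, so the resulting sizes only approximate the requested proportions. The sketch below is a simplified plain-Python model of that idea, not Spark's actual sampling code; the helper `random_split` and its `seed` handling are hypothetical.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by a random draw against the
    cumulative, normalized weights."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds = []
    cumulative = 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # the last split also catches draws affected by float rounding
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))
```

Passing a seed makes the assignment repeatable, which helps when comparing metrics between runs.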

In [0]:
# fit the pipeline
pipeline_fit = pipeline.fit(train)

In [0]:
# apply the fitted pipeline to the held-out test set
results = pipeline_fit.transform(test)

In [0]:
results.head(1)

In [0]:
results.select('prediction', 'left').show()
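
Comparing the two columns row by row amounts to building a confusion matrix. As a rough sketch in plain Python, with made-up `(prediction, label)` pairs rather than output from this notebook, and a hypothetical helper `confusion_counts`:

```python
def confusion_counts(pairs):
    """Count true/false positives and negatives from (prediction, label) pairs."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for prediction, label in pairs:
        if prediction == 1 and label == 1:
            counts["tp"] += 1
        elif prediction == 1 and label == 0:
            counts["fp"] += 1
        elif prediction == 0 and label == 0:
            counts["tn"] += 1
        else:
            counts["fn"] += 1
    return counts

# Hypothetical (prediction, left) pairs
pairs = [(1.0, 1), (0.0, 1), (0.0, 0), (1.0, 0), (0.0, 0)]
counts = confusion_counts(pairs)
accuracy = (counts["tp"] + counts["tn"]) / len(pairs)
print(counts, accuracy)
```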

# Model Evaluator

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [0]:
evaluator = BinaryClassificationEvaluator(labelCol='left')

In [0]:
# the default metric is the area under the ROC curve (areaUnderROC)
evaluator.evaluate(results)
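
The area under the ROC curve can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as one half. The sketch below computes it by direct pairwise comparison on made-up scores; it illustrates the metric's meaning and is not how Spark computes it internally. The helper name `area_under_roc` is hypothetical.

```python
def area_under_roc(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly;
    ties count as half a correct pair."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical model scores and true labels
scores = [0.9, 0.8, 0.35, 0.6, 0.2]
labels = [1, 1, 1, 0, 0]
auc = area_under_roc(scores, labels)
print(auc)
```

A value of 1.0 means every positive is ranked above every negative, while 0.5 corresponds to random ranking.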