# Introduction to Spark MLlib in Python
<hr>

In [None]:
# Installation of PySpark package
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 40 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 47.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=3fdefdc38522dbecd55584f848d565f73c6ac5b4ced95014feacafd2ce0010c1
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
# Start (or reuse) a Spark session named "Spark MLLib" for this notebook
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Spark MLLib")
    .getOrCreate()
)
spark

In [None]:
# Load the churn dataset from CSV: the first row supplies the column names
# and Spark infers each column's type from the data.
spark_df = spark.read.csv(
    "churn.csv",
    header=True,
    inferSchema=True,
)
spark_df.show()

+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602| Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|     Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|     Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|     Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93

In [None]:
# Dataset schema: print each column's name, inferred type and nullability
spark_df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [None]:
# Count missing values per column.
# Every column is checked for NULL. NaN is only checked on float/double
# columns, because isnan() is a floating-point test; applying it to string or
# integer columns would depend on Spark implicitly casting them to double.
from pyspark.sql.functions import when, isnan, col, count

# Names of the columns whose Spark type can actually hold NaN.
float_columns = {name for name, dtype in spark_df.dtypes if dtype in ("float", "double")}


def _is_missing(name):
    """Return a boolean Column that is true where column `name` is NULL (or NaN for float/double columns)."""
    missing = col(name).isNull()
    if name in float_columns:
        missing = missing | isnan(col(name))
    return missing


# One output column per input column, holding the number of missing values found.
spark_df.select([count(when(_is_missing(c), c)).alias(c) for c in spark_df.columns]).show()

+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname|CreditScore|Geography|Gender|Age|Tenure|Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+
|        0|         0|      0|          0|        0|     0|  0|     0|      0|            0|        0|             0|              0|     0|
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+



Since there are no null values, we can move forward.

In [None]:
# Build the assembler that packs the selected input columns into a single
# vector column named "Features".
from pyspark.ml.feature import VectorAssembler

feature_columns = [
    "CreditScore",
    "Age",
    "Tenure",
    "Balance",
    "NumOfProducts",
    "HasCrCard",
    "IsActiveMember",
    "EstimatedSalary",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="Features")

In [None]:
# Transforming the features: apply the assembler to the full dataset, which
# yields a new DataFrame with the original columns plus the "Features" vector.
vector_data = assembler.transform(spark_df)

In [None]:
# Preview the assembled data, including the new "Features" column
vector_data.show()

+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|            Features|
+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+
|        1|  15634602| Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|[619.0,42.0,2.0,0...|
|        2|  15647311|     Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|[608.0,41.0,1.0,8...|
|        3|  15619304|     Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|[502.0,42.0,8.0,1...|
|   

In [None]:
# Keep only what the model needs: the assembled feature vector and the label
model_columns = ("Features", "Exited")
finalized_data = vector_data.select(*model_columns)
finalized_data.show()

+--------------------+------+
|            Features|Exited|
+--------------------+------+
|[619.0,42.0,2.0,0...|     1|
|[608.0,41.0,1.0,8...|     0|
|[502.0,42.0,8.0,1...|     1|
|[699.0,39.0,1.0,0...|     0|
|[850.0,43.0,2.0,1...|     0|
|[645.0,44.0,8.0,1...|     1|
|[822.0,50.0,7.0,0...|     0|
|[376.0,29.0,4.0,1...|     1|
|[501.0,44.0,4.0,1...|     0|
|[684.0,27.0,2.0,1...|     0|
|[528.0,31.0,6.0,1...|     0|
|[497.0,24.0,3.0,0...|     0|
|[476.0,34.0,10.0,...|     0|
|[549.0,25.0,5.0,0...|     0|
|[635.0,35.0,7.0,0...|     0|
|[616.0,45.0,3.0,1...|     0|
|[653.0,58.0,1.0,1...|     1|
|[549.0,24.0,9.0,0...|     0|
|[587.0,45.0,6.0,0...|     0|
|[726.0,24.0,6.0,0...|     0|
+--------------------+------+
only showing top 20 rows



In [None]:
# Splitting data into training (75%) and testing (25%) sets.
# A fixed seed is passed so that re-running the notebook samples the split the
# same way instead of producing different train/test rows on every run.
SPLIT_SEED = 42
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

In [None]:
# Fit a linear regression on the training split, using the assembled vector
# as input and "Exited" as the target.
# NOTE(review): "Exited" only takes the values 0 and 1 in the rows shown, so a
# classifier (e.g. logistic regression) may be a better fit than a regressor -
# confirm the intent before relying on these predictions.
from pyspark.ml.regression import LinearRegression

model = LinearRegression(
    featuresCol="Features",
    labelCol="Exited",
)
trained_model = model.fit(train_data)

In [None]:
# Coefficients of the fitted model (8 values are shown, matching the 8 columns
# passed to the assembler)
trained_model.coefficients

DenseVector([-0.0001, 0.0116, -0.0007, 0.0, -0.0031, 0.0015, -0.1504, 0.0])

In [None]:
# Intercept term of the fitted model
trained_model.intercept

-0.14786699396392156

In [None]:
# Predicting: apply the fitted model to the held-out test split; the result
# keeps "Features" and "Exited" and adds a "prediction" column.
Pred_results = trained_model.transform(test_data)

In [None]:
# Preview the predictions next to the true "Exited" label
Pred_results.show()

+--------------------+------+--------------------+
|            Features|Exited|          prediction|
+--------------------+------+--------------------+
|(8,[0,1,4,7],[502...|     0|  0.3217499402142587|
|(8,[0,1,4,7],[624...|     0|  0.2149798867234517|
|(8,[0,1,4,7],[626...|     0| 0.16190835110539165|
|(8,[0,1,4,7],[793...|     0|  0.1580625182516514|
|(8,[0,1,4,7],[794...|     0| 0.15502621513475862|
|[350.0,39.0,0.0,1...|     1|  0.3412887707194123|
|[350.0,54.0,1.0,1...|     1|  0.4014884613536621|
|[365.0,30.0,0.0,1...|     1| 0.24946605111174464|
|[401.0,48.0,8.0,1...|     1| 0.45446465226010735|
|[404.0,54.0,4.0,1...|     1|  0.5185769341045271|
|[408.0,40.0,3.0,0...|     0| 0.27127781510966065|
|[410.0,35.0,7.0,1...|     0| 0.14183078620467227|
|[411.0,29.0,0.0,5...|     0| 0.03221900483739468|
|[412.0,29.0,5.0,0...|     0| 0.13473402435737958|
|[415.0,32.0,5.0,1...|     0| 0.12012639003361364|
|[418.0,46.0,9.0,0...|     1|  0.1865553465359574|
|[421.0,34.0,6.0,9...|     0| 0