### Examples of PySpark ML: Linear Regression

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create the Spark session for this notebook; if a session is already active,
# getOrCreate() returns it instead of building a new one.
# The app name is the label this application is shown under (e.g. in the Spark UI),
# so it should describe this notebook.
spark = SparkSession.builder.appName('LinearRegressionExample').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/03 17:41:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/03 17:41:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/07/03 17:41:08 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/07/03 17:41:08 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/07/03 17:41:08 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


In [3]:
# Display the active SparkSession as a quick sanity check that the session was created.
spark

In [4]:
# Read the dataset: the first CSV row holds the column names, and column types
# are inferred from the data (Age/Experience/Salary come back as integers).
training = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('pyspark4.csv')
)

In [5]:
# Bare DataFrame repr: lists the column names and inferred types (rows are not shown).
training

DataFrame[Name: string, Age: int, Experience: int, Salary: int]

In [6]:
# Print the training rows as a text table.
training.show()

+------+---+----------+------+
|  Name|Age|Experience|Salary|
+------+---+----------+------+
|  Titi| 31|        10| 30000|
| Doyin| 30|         8| 25000|
| Sunny| 29|         4| 20000|
|  Paul| 24|         3| 20000|
| Aisha| 21|         1| 15000|
|Tinubu| 23|         2| 18000|
+------+---+----------+------+



In [7]:
# Print the schema tree; the integer types confirm inferSchema parsed Age,
# Experience and Salary as numbers (required for the numeric features/label below).
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [8]:
# List the column names available for feature selection.
training.columns

['Name', 'Age', 'Experience', 'Salary']

In [9]:
# Feature engineering plan: combine [Age, Experience] into one new vector column,
# which becomes the independent (input) features for the regression on Salary.

In [10]:
from pyspark.ml.feature import VectorAssembler

# Spark ML regressors read a single features column, so the numeric predictors
# are packed (in this order) into one vector column named "Independent Features".
feature_cols = ["Age", "Experience"]
featureassembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="Independent Features",
)

In [11]:
# Return a new DataFrame: every original column plus the assembled feature vector.
output = featureassembler.transform(dataset=training)

In [12]:
# Inspect the new vector column next to the original columns it was built from.
output.show()

+------+---+----------+------+--------------------+
|  Name|Age|Experience|Salary|Independent Features|
+------+---+----------+------+--------------------+
|  Titi| 31|        10| 30000|         [31.0,10.0]|
| Doyin| 30|         8| 25000|          [30.0,8.0]|
| Sunny| 29|         4| 20000|          [29.0,4.0]|
|  Paul| 24|         3| 20000|          [24.0,3.0]|
| Aisha| 21|         1| 15000|          [21.0,1.0]|
|Tinubu| 23|         2| 18000|          [23.0,2.0]|
+------+---+----------+------+--------------------+



In [13]:
# Confirm the assembled column was appended to the original columns.
output.columns

['Name', 'Age', 'Experience', 'Salary', 'Independent Features']

In [14]:
# Keep only what the regressor needs: the assembled feature vector and the label.
# The feature column name comes from the assembler itself, so renaming its
# outputCol cannot silently break this select.
finalized_data = output.select(featureassembler.getOutputCol(), "Salary")

In [15]:
# Show the (features, label) pairs that will be fed to the regression.
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [16]:
from pyspark.ml.regression import LinearRegression

# Fixed seed so the train/test split, and every coefficient and metric derived
# from it, is reproducible on Restart & Run All.
SEED = 42

## Train/test split (approximately 75% / 25%)
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=SEED)
# The dataset has only a handful of rows, so a random split can leave one side
# empty; fail loudly instead of fitting or evaluating on nothing.
assert train_data.count() > 0 and test_data.count() > 0, (
    "randomSplit produced an empty train or test set; try a different SEED"
)

# Keep the estimator and the fitted model under separate names. Downstream cells
# use `regressor` for the fitted model (coefficients, intercept, evaluate).
lr_estimator = LinearRegression(
    featuresCol=featureassembler.getOutputCol(),
    labelCol='Salary',
)
regressor = lr_estimator.fit(train_data)

23/07/03 17:41:20 WARN Instrumentation: [d0ba538f] regParam is zero, which might cause numerical instability and overfitting.
23/07/03 17:41:20 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/07/03 17:41:20 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/07/03 17:41:20 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [17]:
### Coefficients: one per feature, in the VectorAssembler inputCols order [Age, Experience]
regressor.coefficients

DenseVector([-79.3651, 1693.1217])

In [18]:
### Intercept: the model's predicted Salary when Age and Experience are both 0
regressor.intercept

15714.285714285747

In [19]:
# Evaluate the fitted model on the held-out split. The returned summary bundles
# the per-row predictions with regression metrics (MAE, MSE, ...).
pred_results = regressor.evaluate(dataset=test_data)

In [20]:
# Predicted vs. actual Salary for each test row.
pred_results.predictions.show()

+--------------------+------+------------------+
|Independent Features|Salary|        prediction|
+--------------------+------+------------------+
|          [23.0,2.0]| 18000|17275.132275132273|
|          [30.0,8.0]| 25000| 26878.30687830688|
+--------------------+------+------------------+



23/07/03 17:41:23 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [21]:
# Test-set error metrics as a tuple: (mean absolute error, mean squared error).
# MAE is in Salary units; MSE is in squared Salary units.
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(1301.587301587304, 2026734.9738249276)