In [1]:
!pip install pyspark



In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Always start pyspark session before working with spark
spark = SparkSession.builder.appName('Practice').getOrCreate()

In [4]:
spark

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
file_path = '/content/drive/My Drive/sample_dataset/Orders.csv'
df_pyspark = spark.read.csv(file_path)

In [7]:
df_pyspark.show()

+--------+-----------+------------+----------+------------+
|     _c0|        _c1|         _c2|       _c3|         _c4|
+--------+-----------+------------+----------+------------+
|order_id|customer_id|warehouse_id|order_date|shipper_date|
|     789|       3731|        8118|01/01/2019|  01/04/2019|
|     790|       3486|        8118|  1/1/2019|    1/4/2019|
|     791|       2623|        8118|  1/1/2019|    1/4/2019|
|     792|       9869|        8118|  1/1/2019|    1/4/2019|
|     793|       6866|        8118|  1/1/2019|    1/4/2019|
|     794|       8055|        8118|  1/1/2019|    1/4/2019|
|     795|       1152|        8118|  1/1/2019|    1/4/2019|
|     796|       5765|        8118|  1/1/2019|    1/4/2019|
|     797|       6709|        8118|  1/1/2019|    1/4/2019|
|     798|       4866|        2666|  1/1/2019|    1/4/2019|
|     799|       4515|        2666|  1/1/2019|    1/4/2019|
|     800|       9618|        2666|  1/1/2019|    1/4/2019|
|     801|       2337|        2666|  1/1

In [8]:
# To fix the headers
df_pyspark = spark.read.option('header', 'true').csv(file_path)

In [9]:
df_pyspark.show()

+--------+-----------+------------+----------+------------+
|order_id|customer_id|warehouse_id|order_date|shipper_date|
+--------+-----------+------------+----------+------------+
|     789|       3731|        8118|01/01/2019|  01/04/2019|
|     790|       3486|        8118|  1/1/2019|    1/4/2019|
|     791|       2623|        8118|  1/1/2019|    1/4/2019|
|     792|       9869|        8118|  1/1/2019|    1/4/2019|
|     793|       6866|        8118|  1/1/2019|    1/4/2019|
|     794|       8055|        8118|  1/1/2019|    1/4/2019|
|     795|       1152|        8118|  1/1/2019|    1/4/2019|
|     796|       5765|        8118|  1/1/2019|    1/4/2019|
|     797|       6709|        8118|  1/1/2019|    1/4/2019|
|     798|       4866|        2666|  1/1/2019|    1/4/2019|
|     799|       4515|        2666|  1/1/2019|    1/4/2019|
|     800|       9618|        2666|  1/1/2019|    1/4/2019|
|     801|       2337|        2666|  1/1/2019|    1/4/2019|
|     802|       1166|        2666|  1/1

In [10]:
type(df_pyspark)

In [11]:
df_pyspark.head(3)

[Row(order_id='789', customer_id='3731', warehouse_id='8118', order_date='01/01/2019', shipper_date='01/04/2019'),
 Row(order_id='790', customer_id='3486', warehouse_id='8118', order_date='1/1/2019', shipper_date='1/4/2019'),
 Row(order_id='791', customer_id='2623', warehouse_id='8118', order_date='1/1/2019', shipper_date='1/4/2019')]

In [12]:
# More information regarding my columns
df_pyspark.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- warehouse_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- shipper_date: string (nullable = true)



### 2. Pyspark DataFrames- Part 1

In [13]:
#  read the dataset
# inferSchema - If not used it all columns will be of string datatype.
df_pyspark = spark.read.option('header','true').csv(file_path, inferSchema = True)

In [14]:
# Check the datatypes of the column (Schema)
df_pyspark.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- warehouse_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- shipper_date: string (nullable = true)



In [15]:
# Best way to read csv file in pyspark
df_pyspark = spark.read.csv(file_path, header = True, inferSchema = True)
df_pyspark.show()

+--------+-----------+------------+----------+------------+
|order_id|customer_id|warehouse_id|order_date|shipper_date|
+--------+-----------+------------+----------+------------+
|     789|       3731|        8118|01/01/2019|  01/04/2019|
|     790|       3486|        8118|  1/1/2019|    1/4/2019|
|     791|       2623|        8118|  1/1/2019|    1/4/2019|
|     792|       9869|        8118|  1/1/2019|    1/4/2019|
|     793|       6866|        8118|  1/1/2019|    1/4/2019|
|     794|       8055|        8118|  1/1/2019|    1/4/2019|
|     795|       1152|        8118|  1/1/2019|    1/4/2019|
|     796|       5765|        8118|  1/1/2019|    1/4/2019|
|     797|       6709|        8118|  1/1/2019|    1/4/2019|
|     798|       4866|        2666|  1/1/2019|    1/4/2019|
|     799|       4515|        2666|  1/1/2019|    1/4/2019|
|     800|       9618|        2666|  1/1/2019|    1/4/2019|
|     801|       2337|        2666|  1/1/2019|    1/4/2019|
|     802|       1166|        2666|  1/1

In [16]:
# check the datatype of the dataframe
type(df_pyspark)

#### Dataframe - Dataframe is data structure, because inside this we can perform various kinds of operations.

## Selecting columns and Indexing

In [17]:
# checking column names
df_pyspark.columns

['order_id', 'customer_id', 'warehouse_id', 'order_date', 'shipper_date']

In [18]:
# checking first few columns
df_pyspark.head(3)

[Row(order_id=789, customer_id=3731, warehouse_id=8118, order_date='01/01/2019', shipper_date='01/04/2019'),
 Row(order_id=790, customer_id=3486, warehouse_id=8118, order_date='1/1/2019', shipper_date='1/4/2019'),
 Row(order_id=791, customer_id=2623, warehouse_id=8118, order_date='1/1/2019', shipper_date='1/4/2019')]

In [19]:
# Select one specific column
df_pyspark.select('order_id').show()  # for 1 column
# type(df_pyspark.select('order_id'))   # datatype of 1 column


+--------+
|order_id|
+--------+
|     789|
|     790|
|     791|
|     792|
|     793|
|     794|
|     795|
|     796|
|     797|
|     798|
|     799|
|     800|
|     801|
|     802|
|     803|
|     804|
|     805|
|     806|
|     807|
|     808|
+--------+
only showing top 20 rows



In [20]:
# to pickup multiple columns
df_pyspark.select(['order_id','customer_id']).show()

+--------+-----------+
|order_id|customer_id|
+--------+-----------+
|     789|       3731|
|     790|       3486|
|     791|       2623|
|     792|       9869|
|     793|       6866|
|     794|       8055|
|     795|       1152|
|     796|       5765|
|     797|       6709|
|     798|       4866|
|     799|       4515|
|     800|       9618|
|     801|       2337|
|     802|       1166|
|     803|       4376|
|     804|       9832|
|     805|       6046|
|     806|       6046|
|     807|       3710|
|     808|       7025|
+--------+-----------+
only showing top 20 rows



In [22]:
# df_pyspark['order_id'].show()
# this wont work. we need to use select function

In [23]:
# datatypes of columns
df_pyspark.dtypes

[('order_id', 'int'),
 ('customer_id', 'int'),
 ('warehouse_id', 'int'),
 ('order_date', 'string'),
 ('shipper_date', 'string')]

In [24]:
# Check Describe option: similar to pandas
df_pyspark.describe().show()

+-------+-----------------+-----------------+------------------+----------+------------+
|summary|         order_id|      customer_id|      warehouse_id|order_date|shipper_date|
+-------+-----------------+-----------------+------------------+----------+------------+
|  count|             9999|             9999|              9999|      9999|        9999|
|   mean|           5788.0|5470.125812581258| 5556.591559155916|      NULL|        NULL|
| stddev|2886.607004772212|2601.663953249516|2532.7890496578652|      NULL|        NULL|
|    min|              789|             1001|              1543|01/01/2019|  01/01/2020|
|    max|            10787|             9999|              9080|  9/9/2019|    9/9/2019|
+-------+-----------------+-----------------+------------------+----------+------------+



In [25]:
# Adding columns in pyspark dataframe
# .show() - is just to show the df
df_pyspark.withColumn('new_order_id', df_pyspark['order_id']+1).show(10)

+--------+-----------+------------+----------+------------+------------+
|order_id|customer_id|warehouse_id|order_date|shipper_date|new_order_id|
+--------+-----------+------------+----------+------------+------------+
|     789|       3731|        8118|01/01/2019|  01/04/2019|         790|
|     790|       3486|        8118|  1/1/2019|    1/4/2019|         791|
|     791|       2623|        8118|  1/1/2019|    1/4/2019|         792|
|     792|       9869|        8118|  1/1/2019|    1/4/2019|         793|
|     793|       6866|        8118|  1/1/2019|    1/4/2019|         794|
|     794|       8055|        8118|  1/1/2019|    1/4/2019|         795|
|     795|       1152|        8118|  1/1/2019|    1/4/2019|         796|
|     796|       5765|        8118|  1/1/2019|    1/4/2019|         797|
|     797|       6709|        8118|  1/1/2019|    1/4/2019|         798|
|     798|       4866|        2666|  1/1/2019|    1/4/2019|         799|
+--------+-----------+------------+----------+-----

In [26]:
# To drop the columns
df_pyspark.drop('new_order_id').show()

+--------+-----------+------------+----------+------------+
|order_id|customer_id|warehouse_id|order_date|shipper_date|
+--------+-----------+------------+----------+------------+
|     789|       3731|        8118|01/01/2019|  01/04/2019|
|     790|       3486|        8118|  1/1/2019|    1/4/2019|
|     791|       2623|        8118|  1/1/2019|    1/4/2019|
|     792|       9869|        8118|  1/1/2019|    1/4/2019|
|     793|       6866|        8118|  1/1/2019|    1/4/2019|
|     794|       8055|        8118|  1/1/2019|    1/4/2019|
|     795|       1152|        8118|  1/1/2019|    1/4/2019|
|     796|       5765|        8118|  1/1/2019|    1/4/2019|
|     797|       6709|        8118|  1/1/2019|    1/4/2019|
|     798|       4866|        2666|  1/1/2019|    1/4/2019|
|     799|       4515|        2666|  1/1/2019|    1/4/2019|
|     800|       9618|        2666|  1/1/2019|    1/4/2019|
|     801|       2337|        2666|  1/1/2019|    1/4/2019|
|     802|       1166|        2666|  1/1

In [27]:
# Renaming columns
df_pyspark.withColumnRenamed('order_id','new_order_id').show(1)

+------------+-----------+------------+----------+------------+
|new_order_id|customer_id|warehouse_id|order_date|shipper_date|
+------------+-----------+------------+----------+------------+
|         789|       3731|        8118|01/01/2019|  01/04/2019|
+------------+-----------+------------+----------+------------+
only showing top 1 row



## 3. Pyspark Handling Missing Values
- Dropping Columns
- Dropping Rows
- Various Parameter in Dropping functionalities
- Handling Missing values by Mean, Median and Mode

In [28]:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appname('Practice').getOrCreate()

In [29]:
file_path = '/content/drive/My Drive/sample_dataset/test2.csv'
df_pyspark = spark.read.csv(file_path, header = True, inferSchema = True)
df_pyspark.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [30]:
# Drop the column
df_pyspark.drop('Name').show(3)

+---+----------+------+
|age|Experience|Salary|
+---+----------+------+
| 31|        10| 30000|
| 30|         8| 25000|
| 29|         4| 20000|
+---+----------+------+
only showing top 3 rows



### Dropping the specific rows

In [31]:
# 1. Here: how = 'any' . which drop a row if it contains any nulls. Default
df_pyspark.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [32]:
# 2. how = all. This will drop the row only if the row is completely null.
df_pyspark.na.drop(how = 'all').show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [33]:
# 3. thresh = threshold value.
# Here we set a threshold value = 2, and it will delete the row only if there are atleast 2 null values in n a row
df_pyspark.na.drop(how = 'any', thresh = 2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
+---------+----+----------+------+



In [34]:
# 4. subset: Just like in Pandas we pass the column name and if that column contains null values then those records will be deleted
df_pyspark.na.drop(how = 'any', subset = ['Experience']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     NULL| 34|        10| 38000|
+---------+---+----------+------+



In [35]:
# Fill the Missing values
df_pyspark.na.fill('Missing Values').show() # This will fill all the null values with Missing Values

+--------------+----+----------+------+
|          Name| age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|     Sudhanshu|  30|         8| 25000|
|         Sunny|  29|         4| 20000|
|          Paul|  24|         3| 20000|
|        Harsha|  21|         1| 15000|
|       Shubham|  23|         2| 18000|
|        Mahesh|NULL|      NULL| 40000|
|Missing Values|  34|        10| 38000|
|Missing Values|  36|      NULL|  NULL|
+--------------+----+----------+------+



In [36]:
# When we want to fill missing values in specific column, we pass the column
df_pyspark.na.fill('Missing Values', 'Experience').show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [37]:
# We can also give multiple columns in a list
df_pyspark.na.fill('Missing Values', ['Name', 'age']).show()

+--------------+----+----------+------+
|          Name| age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|     Sudhanshu|  30|         8| 25000|
|         Sunny|  29|         4| 20000|
|          Paul|  24|         3| 20000|
|        Harsha|  21|         1| 15000|
|       Shubham|  23|         2| 18000|
|        Mahesh|NULL|      NULL| 40000|
|Missing Values|  34|        10| 38000|
|Missing Values|  36|      NULL|  NULL|
+--------------+----+----------+------+



In [38]:
# Filling null values with mean, median and mode.
from pyspark.ml.feature import Imputer

In [39]:
imputer = Imputer(inputCols= ['age', 'Experience', 'Salary'],
                  outputCols = ['{}_imputed'.format(c) for c in ['age','Experience', 'Salary']]
                  ).setStrategy('mode')
# we just have to change setStrategy value to mean, median and mode

In [40]:
# Add imputation cols to df
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|NULL|      NULL| 40000|         21|                10|         40000|
|     NULL|  34|        10| 38000|         34|                10|         38000|
|     NULL|  36|      NULL|  NULL|         36|                10|         20000|
+---------+----+----------+-

## 4. Filter Operations

We will be retrieving records based on some conditions
- &, |, ==
- ~ (Not)

In [41]:
file_path = '/content/drive/My Drive/sample_dataset/test1.csv'
df_pyspark = spark.read.csv(file_path, header = True, inferSchema = True)
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [42]:
### Salary of the people less than or equal to 20000
df_pyspark.filter('Salary <= 20000').show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [43]:
### Selecting only specific columns
df_pyspark.filter('Salary <= 20000').select(['Name', 'Salary']).show()

+-------+------+
|   Name|Salary|
+-------+------+
|  Sunny| 20000|
|   Paul| 20000|
| Harsha| 15000|
|Shubham| 18000|
+-------+------+



In [44]:
# Another way to select specific condition
df_pyspark.filter(df_pyspark['Salary']<= 20000).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [45]:
# Using ~ (Not): Inverse condition or Inverse filter operation
df_pyspark.filter(~(df_pyspark['Salary']<=20000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
+---------+---+----------+------+



In [46]:
# Putting multiple conditions. Using &
df_pyspark.filter((df_pyspark['Salary']<= 20000) &
                  (df_pyspark['Salary'] >= 15000)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [47]:
# Putting multiple conditions. Using or (|)
df_pyspark.filter((df_pyspark['Salary']<= 20000) |
                  (df_pyspark['Salary'] >= 15000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



## 4. Pyspark GroupBy and Aggregate functions

In [48]:
df_pyspark = spark.read.csv('/content/drive/My Drive/sample_dataset/test3.csv',header = True, inferSchema=True)

In [49]:
df_pyspark.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [50]:

# checking datatypes of columns
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [51]:
# Groupby
# Grouped to find maximum salary
df_pyspark.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [52]:
# Group by Departments which gives maximum salary
df_pyspark.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [53]:
# Group by Departments which gives maximum salary on average
df_pyspark.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [54]:
# How many people woking in the department
df_pyspark.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [55]:
# Who's getting the max salary
df_pyspark.groupBy('Name').max().show()

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+



In [56]:
df_pyspark.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



### Example of Pyspark ML

In [57]:
# Read the dataset
training = spark.read.csv('/content/drive/My Drive/sample_dataset/test1.csv', inferSchema = True, header = True)

In [58]:
training.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [59]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [60]:
training.columns

['Name', 'age', 'Experience', 'Salary']

VectorAssembler: It will make sure that I have my features grouped like this [age, Experience], and we will trat this as new feature and It will be an independant feature.
- [age, Experience]--->new feature--->Independent feature

In [61]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=['age', 'Experience'], outputCol = 'Independent Feature')

In [62]:
output = featureassembler.transform(training)

In [63]:
output.show()

+---------+---+----------+------+-------------------+
|     Name|age|Experience|Salary|Independent Feature|
+---------+---+----------+------+-------------------+
|    Krish| 31|        10| 30000|        [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|         [30.0,8.0]|
|    Sunny| 29|         4| 20000|         [29.0,4.0]|
|     Paul| 24|         3| 20000|         [24.0,3.0]|
|   Harsha| 21|         1| 15000|         [21.0,1.0]|
|  Shubham| 23|         2| 18000|         [23.0,2.0]|
+---------+---+----------+------+-------------------+



In [64]:
finalized_data = output.select('Independent Feature', 'Salary')

In [65]:
finalized_data.show()

+-------------------+------+
|Independent Feature|Salary|
+-------------------+------+
|        [31.0,10.0]| 30000|
|         [30.0,8.0]| 25000|
|         [29.0,4.0]| 20000|
|         [24.0,3.0]| 20000|
|         [21.0,1.0]| 15000|
|         [23.0,2.0]| 18000|
+-------------------+------+



In [66]:
from pyspark.ml.regression import LinearRegression
## train test split
train_df, test_df = finalized_data.randomSplit([0.75,0.25])
regressor = LinearRegression(featuresCol = 'Independent Feature',labelCol='Salary')
regressor = regressor.fit(train_df)

In [67]:
regressor.coefficients

DenseVector([-323.2867, 1696.8066])

In [68]:
regressor.intercept

22295.299605311826

In [69]:
pred = regressor.evaluate(test_df)

In [70]:
pred.predictions.show()

+-------------------+------+------------------+
|Independent Feature|Salary|        prediction|
+-------------------+------+------------------+
|         [21.0,1.0]| 15000|17203.085755292585|
+-------------------+------+------------------+



In [71]:
pred.meanSquaredError, pred.meanAbsoluteError

(4853586.845173097, 2203.0857552925845)