# PySpark:

PySpark is the Python API for Apache Spark. It is often used for
large-scale data processing and machine learning.
Using Spark from Python is done through the PySpark library:
PySpark can handle very large datasets (e.g. 128 GB or more) by distributing the work across a cluster.

In [1]:
!pip install pyspark





In [2]:
import pyspark

In [3]:
import pandas as pd
pd.read_csv('PySpark1.csv')

Unnamed: 0,Name,Age
0,Obarai,25
1,Vasanth,26
2,Pinku,27


In [4]:
#Start Spark Session

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark=SparkSession.builder.appName('Practise').getOrCreate()

In [7]:
spark

In [8]:
# Read the CSV with no options: Spark treats the header line as data, names the
# columns _c0, _c1, ... and reads every column as string (see the output below).
df_pyspark=spark.read.csv('PySpark1.csv')

In [9]:
df_pyspark

DataFrame[_c0: string, _c1: string]

In [10]:
df_pyspark.show()

+-------+---+
|    _c0|_c1|
+-------+---+
|   Name|Age|
| Obarai| 25|
|Vasanth| 26|
|  Pinku| 27|
+-------+---+



In [11]:
# header='true' uses the first CSV line as column names; without inferSchema the
# columns are still read as strings (see printSchema below).
df_pyspark=spark.read.option('header','true').csv('PySpark1.csv')

In [12]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [13]:
df_pyspark.head(3)

[Row(Name='Obarai', Age='25'),
 Row(Name='Vasanth', Age='26'),
 Row(Name='Pinku', Age='27')]

In [14]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)



# Build the PySpark Session

In [15]:
from pyspark.sql import SparkSession

In [16]:
spark=SparkSession.builder.appName('Practise').getOrCreate()

In [17]:
spark

# Read the DataSet

In [18]:
#Read the DataSet
spark.read.option('header','true').csv('PySpark2.csv')

DataFrame[Name: string, age: string, Experience: string]

In [19]:
df_pyspark=spark.read.option('header','true').csv('PySpark2.csv',inferSchema=True)

In [20]:
##Check the schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [21]:
# Option 2: pass header/inferSchema as keyword arguments to csv() instead of .option()
df_pyspark=spark.read.csv('PySpark2.csv',header=True,inferSchema=True)

In [22]:
## Display the rows (printSchema() in the next cell shows the schema)
df_pyspark.show()

+-----+---+----------+
| Name|age|Experience|
+-----+---+----------+
|Pinku| 25|         5|
|Tinku| 27|         7|
|Sunny| 28|         8|
+-----+---+----------+



In [23]:
##Check the schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [24]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [25]:
#Selecting Columns

In [26]:
df_pyspark.columns

['Name', 'age', 'Experience']

In [27]:
df_pyspark.head()

Row(Name='Pinku', age=25, Experience=5)

In [28]:
df_pyspark.head(3)

[Row(Name='Pinku', age=25, Experience=5),
 Row(Name='Tinku', age=27, Experience=7),
 Row(Name='Sunny', age=28, Experience=8)]

In [29]:
#Selecting columns and perform operations

In [30]:
df_pyspark.show()

+-----+---+----------+
| Name|age|Experience|
+-----+---+----------+
|Pinku| 25|         5|
|Tinku| 27|         7|
|Sunny| 28|         8|
+-----+---+----------+



In [31]:
df_pyspark.select('Name').show()

+-----+
| Name|
+-----+
|Pinku|
|Tinku|
|Sunny|
+-----+



In [32]:
type(df_pyspark.select('Name'))

pyspark.sql.dataframe.DataFrame

In [33]:
df_pyspark.select(['Name','Experience']).show() #Select two columns

+-----+----------+
| Name|Experience|
+-----+----------+
|Pinku|         5|
|Tinku|         7|
|Sunny|         8|
+-----+----------+



In [34]:
df_pyspark['Name'] # a Column expression, not the column's data


Column<'Name'>

In [35]:
df_pyspark.dtypes # list of (column name, data type) pairs

[('Name', 'string'), ('age', 'int'), ('Experience', 'int')]

In [36]:
#Describe option similar to pandas

In [37]:
df_pyspark.describe().show()

+-------+-----+------------------+------------------+
|summary| Name|               age|        Experience|
+-------+-----+------------------+------------------+
|  count|    3|                 3|                 3|
|   mean| NULL|26.666666666666668| 6.666666666666667|
| stddev| NULL|1.5275252316519468|1.5275252316519468|
|    min|Pinku|                25|                 5|
|    max|Tinku|                28|                 8|
+-------+-----+------------------+------------------+



# Adding Columns in Data Frame

In [38]:
# withColumn returns a new DataFrame (only its repr is shown here); df_pyspark is
# unchanged until the result is assigned back, as the next cell does.
df_pyspark.withColumn('Experience After 2 years',df_pyspark['Experience']+2)

DataFrame[Name: string, age: int, Experience: int, Experience After 2 years: int]

In [39]:
df_pyspark=df_pyspark.withColumn('Experience After 2 years',df_pyspark['Experience']+2)

In [40]:
df_pyspark.show()

+-----+---+----------+------------------------+
| Name|age|Experience|Experience After 2 years|
+-----+---+----------+------------------------+
|Pinku| 25|         5|                       7|
|Tinku| 27|         7|                       9|
|Sunny| 28|         8|                      10|
+-----+---+----------+------------------------+



# Dropping Columns in Data Frame

In [43]:
# drop() returns a new DataFrame; without reassignment df_pyspark is unchanged.
df_pyspark.drop('Experience After 2 years')

DataFrame[Name: string, age: int, Experience: int]

In [45]:
df_pyspark=df_pyspark.drop('Experience After 2 years')

In [46]:
df_pyspark.show()

+-----+---+----------+
| Name|age|Experience|
+-----+---+----------+
|Pinku| 25|         5|
|Tinku| 27|         7|
|Sunny| 28|         8|
+-----+---+----------+



# Rename the columns

In [50]:
df_pyspark.withColumnRenamed('Name','New Name').show()

+--------+---+----------+
|New Name|age|Experience|
+--------+---+----------+
|   Pinku| 25|         5|
|   Tinku| 27|         7|
|   Sunny| 28|         8|
+--------+---+----------+



# Handling Missing Values in PySpark

- Dropping columns

- Dropping rows

- Various parameters of the drop functionality

- Handling missing values with the mean

In [51]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Practise').getOrCreate()

In [None]:
#Read The Data Set

In [55]:
df_pyspark=spark.read.csv('PySpark3.csv',header=True,inferSchema=True)

In [56]:
df_pyspark.show()

+---------+----+---------+------+
|     Name| age|Experiene|Salary|
+---------+----+---------+------+
|   Krish |  31|       10| 30000|
|Sudhanshu|  30|        8| 25000|
|    Sunny|  29|        4| 20000|
|     Paul|  24|        3| 20000|
|   Harsha|  21|        1| 15000|
|  Shubham|  23|        2| 18000|
|   Mahesh|NULL|     NULL| 40000|
|     NULL|  34|       10| 38000|
|     NULL|  36|     NULL|  NULL|
+---------+----+---------+------+



# Drop Columns with Missing Values

In [58]:
# drop() returns a new DataFrame; df_pyspark itself still has Name (see next cell).
df_pyspark.drop('Name').show()

+----+---------+------+
| age|Experiene|Salary|
+----+---------+------+
|  31|       10| 30000|
|  30|        8| 25000|
|  29|        4| 20000|
|  24|        3| 20000|
|  21|        1| 15000|
|  23|        2| 18000|
|NULL|     NULL| 40000|
|  34|       10| 38000|
|  36|     NULL|  NULL|
+----+---------+------+



In [59]:
df_pyspark.show()

+---------+----+---------+------+
|     Name| age|Experiene|Salary|
+---------+----+---------+------+
|   Krish |  31|       10| 30000|
|Sudhanshu|  30|        8| 25000|
|    Sunny|  29|        4| 20000|
|     Paul|  24|        3| 20000|
|   Harsha|  21|        1| 15000|
|  Shubham|  23|        2| 18000|
|   Mahesh|NULL|     NULL| 40000|
|     NULL|  34|       10| 38000|
|     NULL|  36|     NULL|  NULL|
+---------+----+---------+------+



In [60]:
#Drop Null values

In [61]:
# na.drop() with default arguments removes every row containing at least one null
df_pyspark.na.drop().show()

+---------+---+---------+------+
|     Name|age|Experiene|Salary|
+---------+---+---------+------+
|   Krish | 31|       10| 30000|
|Sudhanshu| 30|        8| 25000|
|    Sunny| 29|        4| 20000|
|     Paul| 24|        3| 20000|
|   Harsha| 21|        1| 15000|
|  Shubham| 23|        2| 18000|
+---------+---+---------+------+



In [62]:
## how: "any" (default) drops a row containing any null; "all" drops a row only if all its values are null

In [69]:
# how="all": drop a row only if every column is null -- no row qualifies here,
# so all nine rows are kept.
df_pyspark.na.drop(how="all").show()

+---------+----+---------+------+
|     Name| age|Experiene|Salary|
+---------+----+---------+------+
|   Krish |  31|       10| 30000|
|Sudhanshu|  30|        8| 25000|
|    Sunny|  29|        4| 20000|
|     Paul|  24|        3| 20000|
|   Harsha|  21|        1| 15000|
|  Shubham|  23|        2| 18000|
|   Mahesh|NULL|     NULL| 40000|
|     NULL|  34|       10| 38000|
|     NULL|  36|     NULL|  NULL|
+---------+----+---------+------+



In [66]:
##Threshold

In [71]:
# thresh=2: keep only rows with at least two non-null values (when thresh is set it
# overrides how); the row whose only non-null value is age 36 is dropped.
df_pyspark.na.drop(how="any",thresh=2).show()

+---------+----+---------+------+
|     Name| age|Experiene|Salary|
+---------+----+---------+------+
|   Krish |  31|       10| 30000|
|Sudhanshu|  30|        8| 25000|
|    Sunny|  29|        4| 20000|
|     Paul|  24|        3| 20000|
|   Harsha|  21|        1| 15000|
|  Shubham|  23|        2| 18000|
|   Mahesh|NULL|     NULL| 40000|
|     NULL|  34|       10| 38000|
+---------+----+---------+------+



In [72]:
# thresh=1: a row is dropped only if it has no non-null value, so nothing is removed.
df_pyspark.na.drop(how="any",thresh=1).show()

+---------+----+---------+------+
|     Name| age|Experiene|Salary|
+---------+----+---------+------+
|   Krish |  31|       10| 30000|
|Sudhanshu|  30|        8| 25000|
|    Sunny|  29|        4| 20000|
|     Paul|  24|        3| 20000|
|   Harsha|  21|        1| 15000|
|  Shubham|  23|        2| 18000|
|   Mahesh|NULL|     NULL| 40000|
|     NULL|  34|       10| 38000|
|     NULL|  36|     NULL|  NULL|
+---------+----+---------+------+



In [73]:
# thresh=3: Mahesh's row (only two non-null values) is now dropped too.
df_pyspark.na.drop(how="any",thresh=3).show()

+---------+---+---------+------+
|     Name|age|Experiene|Salary|
+---------+---+---------+------+
|   Krish | 31|       10| 30000|
|Sudhanshu| 30|        8| 25000|
|    Sunny| 29|        4| 20000|
|     Paul| 24|        3| 20000|
|   Harsha| 21|        1| 15000|
|  Shubham| 23|        2| 18000|
|     NULL| 34|       10| 38000|
+---------+---+---------+------+



In [None]:
##SubSet

In [77]:
# Drop only rows whose age is null; nulls in other columns are kept (e.g. the
# NULL/36/NULL/NULL row survives). The schema column is 'age' -- 'Age' only
# matched because Spark resolves column names case-insensitively by default.
df_pyspark.na.drop(how="any",subset=['age']).show()

+---------+---+---------+------+
|     Name|age|Experiene|Salary|
+---------+---+---------+------+
|   Krish | 31|       10| 30000|
|Sudhanshu| 30|        8| 25000|
|    Sunny| 29|        4| 20000|
|     Paul| 24|        3| 20000|
|   Harsha| 21|        1| 15000|
|  Shubham| 23|        2| 18000|
|     NULL| 34|       10| 38000|
|     NULL| 36|     NULL|  NULL|
+---------+---+---------+------+



In [76]:
# Drop only rows whose Experiene is null (Mahesh's row and the NULL/36 row).
df_pyspark.na.drop(how="any",subset=['Experiene']).show() 

+---------+---+---------+------+
|     Name|age|Experiene|Salary|
+---------+---+---------+------+
|   Krish | 31|       10| 30000|
|Sudhanshu| 30|        8| 25000|
|    Sunny| 29|        4| 20000|
|     Paul| 24|        3| 20000|
|   Harsha| 21|        1| 15000|
|  Shubham| 23|        2| 18000|
|     NULL| 34|       10| 38000|
+---------+---+---------+------+



# Fill The Missing Values

In [79]:
df_pyspark.na.fill('Missing Vlaues').show()

+--------------+----+---------+------+
|          Name| age|Experiene|Salary|
+--------------+----+---------+------+
|        Krish |  31|       10| 30000|
|     Sudhanshu|  30|        8| 25000|
|         Sunny|  29|        4| 20000|
|          Paul|  24|        3| 20000|
|        Harsha|  21|        1| 15000|
|       Shubham|  23|        2| 18000|
|        Mahesh|NULL|     NULL| 40000|
|Missing Vlaues|  34|       10| 38000|
|Missing Vlaues|  36|     NULL|  NULL|
+--------------+----+---------+------+



In [81]:
# Filling with a string only touches string columns: Experiene and age are integers,
# so this call leaves their NULLs unchanged (see output). Use a numeric value,
# e.g. na.fill(0, ['Experiene','age']), to actually fill them.
df_pyspark.na.fill('Missing Values',['Experiene','age']).show()

+---------+----+---------+------+
|     Name| age|Experiene|Salary|
+---------+----+---------+------+
|   Krish |  31|       10| 30000|
|Sudhanshu|  30|        8| 25000|
|    Sunny|  29|        4| 20000|
|     Paul|  24|        3| 20000|
|   Harsha|  21|        1| 15000|
|  Shubham|  23|        2| 18000|
|   Mahesh|NULL|     NULL| 40000|
|     NULL|  34|       10| 38000|
|     NULL|  36|     NULL|  NULL|
+---------+----+---------+------+



In [82]:
df_pyspark.show()

+---------+----+---------+------+
|     Name| age|Experiene|Salary|
+---------+----+---------+------+
|   Krish |  31|       10| 30000|
|Sudhanshu|  30|        8| 25000|
|    Sunny|  29|        4| 20000|
|     Paul|  24|        3| 20000|
|   Harsha|  21|        1| 15000|
|  Shubham|  23|        2| 18000|
|   Mahesh|NULL|     NULL| 40000|
|     NULL|  34|       10| 38000|
|     NULL|  36|     NULL|  NULL|
+---------+----+---------+------+



In [84]:
# Impute missing values in age, Experiene and Salary with a column statistic (mean, then median)

In [101]:
from pyspark.ml.feature import Imputer

# Impute nulls in the numeric columns with each column's mean; the results are
# written to new "<column>_imputed" columns alongside the originals.
impute_cols = ['age', 'Experiene', 'Salary']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f"{name}_imputed" for name in impute_cols],
).setStrategy("mean")

In [105]:
# Add imputation cols to df
# NOTE(review): by execution count this cell (In [105]) ran after In [103] rebound
# `imputer` with strategy "median", so the table below shows median imputation,
# not mean. For example, the mean of the non-null Salary values is 25750, yet the
# imputed Salary is 20000. Re-run In [101] before this cell to see mean imputation.
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+----+---------+------+-----------+-----------------+--------------+
|     Name| age|Experiene|Salary|age_imputed|Experiene_imputed|Salary_imputed|
+---------+----+---------+------+-----------+-----------------+--------------+
|   Krish |  31|       10| 30000|         31|               10|         30000|
|Sudhanshu|  30|        8| 25000|         30|                8|         25000|
|    Sunny|  29|        4| 20000|         29|                4|         20000|
|     Paul|  24|        3| 20000|         24|                3|         20000|
|   Harsha|  21|        1| 15000|         21|                1|         15000|
|  Shubham|  23|        2| 18000|         23|                2|         18000|
|   Mahesh|NULL|     NULL| 40000|         29|                4|         40000|
|     NULL|  34|       10| 38000|         34|               10|         38000|
|     NULL|  36|     NULL|  NULL|         36|                4|         20000|
+---------+----+---------+------+-----------+-------

In [103]:
from pyspark.ml.feature import Imputer

# Same imputation as before, but nulls are replaced with each column's median;
# outputs again go to new "<column>_imputed" columns.
impute_cols = ['age', 'Experiene', 'Salary']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f"{name}_imputed" for name in impute_cols],
).setStrategy("median")

In [106]:
# Add imputation cols to df (uses the median-strategy imputer defined in the previous cell)
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+----+---------+------+-----------+-----------------+--------------+
|     Name| age|Experiene|Salary|age_imputed|Experiene_imputed|Salary_imputed|
+---------+----+---------+------+-----------+-----------------+--------------+
|   Krish |  31|       10| 30000|         31|               10|         30000|
|Sudhanshu|  30|        8| 25000|         30|                8|         25000|
|    Sunny|  29|        4| 20000|         29|                4|         20000|
|     Paul|  24|        3| 20000|         24|                3|         20000|
|   Harsha|  21|        1| 15000|         21|                1|         15000|
|  Shubham|  23|        2| 18000|         23|                2|         18000|
|   Mahesh|NULL|     NULL| 40000|         29|                4|         40000|
|     NULL|  34|       10| 38000|         34|               10|         38000|
|     NULL|  36|     NULL|  NULL|         36|                4|         20000|
+---------+----+---------+------+-----------+-------

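# PySpark DataFrames
Filter operations

Combine conditions with `&` (and) and `|` (or); compare with `==` (equality)

Negate a condition with `~` (not)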

In [109]:
from pyspark.sql import SparkSession

In [110]:
spark=SparkSession.builder.appName('dataframe').getOrCreate()

In [116]:
df_pyspark=spark.read.csv('PySpark3.csv',header=True,inferSchema=True)
df_pyspark.show()

+---------+----+---------+------+
|     Name| age|Experiene|Salary|
+---------+----+---------+------+
|   Krish |  31|       10| 30000|
|Sudhanshu|  30|        8| 25000|
|    Sunny|  29|        4| 20000|
|     Paul|  24|        3| 20000|
|   Harsha|  21|        1| 15000|
|  Shubham|  23|        2| 18000|
|   Mahesh|NULL|     NULL| 40000|
|     NULL|  34|       10| 38000|
|     NULL|  36|     NULL|  NULL|
+---------+----+---------+------+



# Filter Operation

In [118]:
### Salary of the people less than or equal to 20000
df_pyspark.filter("Salary<=20000").show()

+-------+---+---------+------+
|   Name|age|Experiene|Salary|
+-------+---+---------+------+
|  Sunny| 29|        4| 20000|
|   Paul| 24|        3| 20000|
| Harsha| 21|        1| 15000|
|Shubham| 23|        2| 18000|
+-------+---+---------+------+



In [119]:
df_pyspark.filter("Salary<=20000").select(['Name','age']).show()

+-------+---+
|   Name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [120]:
df_pyspark.filter(df_pyspark['Salary']<=20000).show()

+-------+---+---------+------+
|   Name|age|Experiene|Salary|
+-------+---+---------+------+
|  Sunny| 29|        4| 20000|
|   Paul| 24|        3| 20000|
| Harsha| 21|        1| 15000|
|Shubham| 23|        2| 18000|
+-------+---+---------+------+



In [121]:
# NOTE: with `|` this predicate is true for every non-null Salary (any value is either
# <= 20000 or >= 15000), so only the row with a NULL Salary is filtered out.
# Use `&` instead to keep salaries between 15000 and 20000.
df_pyspark.filter((df_pyspark['Salary']<=20000) | 
                  (df_pyspark['Salary']>=15000)).show()

+---------+----+---------+------+
|     Name| age|Experiene|Salary|
+---------+----+---------+------+
|   Krish |  31|       10| 30000|
|Sudhanshu|  30|        8| 25000|
|    Sunny|  29|        4| 20000|
|     Paul|  24|        3| 20000|
|   Harsha|  21|        1| 15000|
|  Shubham|  23|        2| 18000|
|   Mahesh|NULL|     NULL| 40000|
|     NULL|  34|       10| 38000|
+---------+----+---------+------+



In [122]:
# `~` negates the condition, i.e. Salary > 20000. The row with a NULL Salary is also
# excluded: NOT (NULL <= 20000) is still NULL, and filter drops NULL results.
df_pyspark.filter(~(df_pyspark['Salary']<=20000)).show()

+---------+----+---------+------+
|     Name| age|Experiene|Salary|
+---------+----+---------+------+
|   Krish |  31|       10| 30000|
|Sudhanshu|  30|        8| 25000|
|   Mahesh|NULL|     NULL| 40000|
|     NULL|  34|       10| 38000|
+---------+----+---------+------+



# PySpark GroupBy & Aggregate Functions

In [123]:
from pyspark.sql import SparkSession

In [124]:
spark=SparkSession.builder.appName('Agg').getOrCreate()

In [125]:
spark

In [126]:
df_pyspark=spark.read.csv('PySpark4.csv',header=True,inferSchema=True)

In [127]:
df_pyspark.show()

+---------+------------+------+
|     Name| Departments|Salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [128]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [130]:
## Groupby
### Total salary per person (sum() aggregates every numeric column per group)
df_pyspark.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(Salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [131]:
df_pyspark.groupBy('Name').avg().show()

+---------+------------------+
|     Name|       avg(Salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



In [133]:
### Group by Departments: total salary per department
df_pyspark.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(Salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [134]:
df_pyspark.groupBy('Departments').mean().show() # mean() is an alias of avg()

+------------+-----------+
| Departments|avg(Salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [135]:
df_pyspark.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [136]:
df_pyspark.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



# Example Of PySpark ML

In [138]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Missing').getOrCreate()

In [145]:
## Read The dataset
training = spark.read.csv('test1.csv',header=True,inferSchema=True)

In [146]:
training.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [147]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [148]:
training.columns

['Name', 'age', 'Experience', 'Salary']

In [149]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=["age","Experience"],outputCol="Independent Features")

In [150]:
output=featureassembler.transform(training)

In [152]:
output.show()


+---------+---+----------+------+--------------------+
|     Name|age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [153]:
output.columns

['Name', 'age', 'Experience', 'Salary', 'Independent Features']

In [154]:
finalized_data=output.select("Independent Features","Salary")

In [169]:
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [170]:
from pyspark.ml.regression import LinearRegression
##train test split
# Seed the split so the train/test rows, coefficients and metrics are reproducible
# across runs (the outputs below were recorded from an unseeded run). With only six
# rows, the recorded run left just two rows for testing.
train_data,test_data=finalized_data.randomSplit([0.75,0.25],seed=42)
regressor=LinearRegression(featuresCol='Independent Features', labelCol='Salary')
regressor=regressor.fit(train_data)

In [171]:
### Coefficients
regressor.coefficients

DenseVector([0.0, 1589.7436])

In [172]:
### Intercepts
regressor.intercept

13358.974358973992

In [173]:
### Prediction
pred_results=regressor.evaluate(test_data)

In [174]:
pred_results.predictions.show()

+--------------------+------+------------------+
|Independent Features|Salary|        prediction|
+--------------------+------+------------------+
|          [23.0,2.0]| 18000|16538.461538461514|
|          [24.0,3.0]| 20000|18128.205128205107|
+--------------------+------+------------------+



In [175]:
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(1666.6666666666897, 2819855.358316973)

# Linear Regression with PySpark

In [198]:
# File location and type
# NOTE(review): absolute, user-specific path; the other cells read their CSVs
# relative to the notebook's working directory.
file_location = "/Users/vasan/tipspyspark.csv"
file_type = "csv"

# Use file_type as the reader format so the variable actually takes effect.
# header/inferSchema are CSV options; other formats generally ignore them.
df = spark.read.load(file_location, format=file_type, header=True, inferSchema=True)
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [199]:
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [200]:
df.columns

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [182]:

### Handling Categorical Features
from pyspark.ml.feature import StringIndexer

In [201]:

df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [202]:
# StringIndexer maps each category to a numeric index. With the default ordering
# (stringOrderType='frequencyDesc') the most frequent label gets 0.0; in the output
# Male -> 0.0 and Female -> 1.0.
indexer=StringIndexer(inputCol="sex",outputCol="sex_indexed")
df_r=indexer.fit(df).transform(df)
df_r.show()

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne

In [203]:
# Index several categorical columns at once using inputCols/outputCols.
# NOTE(review): "time_index" breaks the "_indexed" naming of the other outputs; the
# VectorAssembler cell below refers to it by this name, so rename both together.
indexer=StringIndexer(inputCols=["smoker","day","time"],outputCols=["smoker_indexed","day_indexed",
                                                                  "time_index"])
df_r=indexer.fit(df_r).transform(df_r)
df_r.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_index|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|       0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|       0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|       0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|       0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|       0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|           0.0|        1.0|       0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|       0.0|
|     26.88|3.12|  M

In [204]:

df_r.columns
     

['total_bill',
 'tip',
 'sex',
 'smoker',
 'day',
 'time',
 'size',
 'sex_indexed',
 'smoker_indexed',
 'day_indexed',
 'time_index']

In [205]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['tip','size','sex_indexed','smoker_indexed','day_indexed',
                          'time_index'],outputCol="Independent Features")
output=featureassembler.transform(df_r)

In [206]:
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|[1.01,2.0,1.0,0.0...|
|[1.66,3.0,0.0,0.0...|
|[3.5,3.0,0.0,0.0,...|
|[3.31,2.0,0.0,0.0...|
|[3.61,4.0,1.0,0.0...|
|[4.71,4.0,0.0,0.0...|
|[2.0,2.0,0.0,0.0,...|
|[3.12,4.0,0.0,0.0...|
|[1.96,2.0,0.0,0.0...|
|[3.23,2.0,0.0,0.0...|
|[1.71,2.0,0.0,0.0...|
|[5.0,4.0,1.0,0.0,...|
|[1.57,2.0,0.0,0.0...|
|[3.0,4.0,0.0,0.0,...|
|[3.02,2.0,1.0,0.0...|
|[3.92,2.0,0.0,0.0...|
|[1.67,3.0,1.0,0.0...|
|[3.71,3.0,0.0,0.0...|
|[3.5,3.0,1.0,0.0,...|
|(6,[0,1],[3.35,3.0])|
+--------------------+
only showing top 20 rows



In [207]:

output.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+----------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_index|Independent Features|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+----------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|       0.0|[1.01,2.0,1.0,0.0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|       0.0|[1.66,3.0,0.0,0.0...|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|       0.0|[3.5,3.0,0.0,0.0,...|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|       0.0|[3.31,2.0,0.0,0.0...|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|       0.0|[3.61,4.0,1.0,0.0...|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4| 

In [208]:
finalized_data=output.select("Independent Features","total_bill")

In [193]:
# NOTE(review): stale output. This cell ran as In [193], before In [208] redefined
# finalized_data from the tips data, so the table below comes from the earlier
# salary example rather than ("Independent Features", "total_bill"). Re-run to refresh.
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [210]:
from pyspark.ml.regression import LinearRegression
##train test split
# Seed the split so the train/test rows, coefficients and metrics are reproducible
# across runs (the outputs below were recorded from an unseeded run).
train_data,test_data=finalized_data.randomSplit([0.75,0.25],seed=42)
regressor=LinearRegression(featuresCol='Independent Features', labelCol='total_bill')
regressor=regressor.fit(train_data)
     

In [212]:
regressor.coefficients

DenseVector([2.6152, 3.6633, -0.6616, 1.7421, -0.0792, -1.486])

In [213]:
regressor.intercept

2.4529241393132795

In [216]:
### Predictions
pred_results=regressor.evaluate(test_data)

In [217]:
## Final comparison
pred_results.predictions.show()

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.25,2.0])|     10.07|13.048557535644939|
|(6,[0,1],[1.75,2.0])|     17.82| 14.35617147427661|
| (6,[0,1],[3.6,3.0])|     24.06|22.857642322090037|
|(6,[0,1],[7.58,4.0])|     39.42|36.929548548474386|
| (6,[0,1],[9.0,4.0])|     48.33| 40.64317213418833|
|[1.0,1.0,1.0,1.0,...|      3.07| 9.811990486537056|
|[1.5,2.0,0.0,1.0,...|     15.69|15.365245482929115|
|[1.56,2.0,0.0,0.0...|      9.94|13.780035589239283|
|[1.57,2.0,0.0,0.0...|     15.42|13.806187868011918|
|[1.61,2.0,1.0,1.0...|     10.59|15.070578766543935|
|[1.66,3.0,0.0,0.0...|     10.34|17.704857651841856|
|[1.71,2.0,0.0,0.0...|     10.27|14.172319770828786|
|[1.73,2.0,0.0,0.0...|      9.78| 12.65936457912624|
|[1.83,1.0,1.0,0.0...|     10.07| 8.596003720734892|
|[1.92,1.0,0.0,1.0...|      8.58|11.155839578898373|
|[2.0,2.0,0.0,0.0,...|      8.77|14.9307358552

In [218]:
### Performance metrics: R^2, mean absolute error, mean squared error
pred_results.r2,pred_results.meanAbsoluteError,pred_results.meanSquaredError

(0.5944799922474994, 5.149157217970128, 49.322048740625135)