#### Pyspark Basics Intro

In [2]:
import spark
import pandas as pd

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('practice').getOrCreate()
spark

In [5]:
df_pandas = pd.read_csv('demo.csv')
df_pandas

Unnamed: 0,name,age,experience,salary
0,ben,23.0,8.0,30000.0
1,tom,55.0,20.0,1500.0
2,bill,16.0,2.0,100000.0
3,mars,39.0,15.0,5000.0
4,miles,25.0,9.0,12000.0
5,adore,42.0,18.0,25000.0
6,shiloh,,,40000.0
7,,34.0,10.0,
8,,36.0,,


In [6]:
df_pyspark = spark.read.csv('demo.csv')

In [7]:
df_pyspark.show()

+------+----+----------+------+
|   _c0| _c1|       _c2|   _c3|
+------+----+----------+------+
|  name| age|experience|salary|
|   ben|  23|         8| 30000|
|   tom|  55|        20|  1500|
|  bill|  16|         2|100000|
|  mars|  39|        15|  5000|
| miles|  25|         9| 12000|
| adore|  42|        18| 25000|
|shiloh|null|      null| 40000|
|  null|  34|        10|  null|
|  null|  36|      null|  null|
+------+----+----------+------+



In [8]:
spark.read.option('header','true').csv('demo.csv')

DataFrame[name: string, age: string, experience: string, salary: string]

In [9]:
# considers first row as header
df_pyspark = spark.read.option('header','true').csv('demo.csv')

In [10]:
df_pyspark.show()

+------+----+----------+------+
|  name| age|experience|salary|
+------+----+----------+------+
|   ben|  23|         8| 30000|
|   tom|  55|        20|  1500|
|  bill|  16|         2|100000|
|  mars|  39|        15|  5000|
| miles|  25|         9| 12000|
| adore|  42|        18| 25000|
|shiloh|null|      null| 40000|
|  null|  34|        10|  null|
|  null|  36|      null|  null|
+------+----+----------+------+



In [11]:
type(df_pandas)

pandas.core.frame.DataFrame

In [12]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [13]:
df_pyspark.head()

Row(name='ben', age='23', experience='8', salary='30000')

In [14]:
df_pyspark.head(3)

[Row(name='ben', age='23', experience='8', salary='30000'),
 Row(name='tom', age='55', experience='20', salary='1500'),
 Row(name='bill', age='16', experience='2', salary='100000')]

# Part 1 - Dataframes

In [15]:
# similarto df.info()
#everything is a string
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- experience: string (nullable = true)
 |-- salary: string (nullable = true)



In [16]:
# auto detect dtype
df_pyspark = spark.read.option('header','true').csv('demo.csv', inferSchema= True)

In [17]:
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [18]:
# alternate method
df_pyspark = spark.read.csv('demo.csv', header= True, inferSchema= True)
df_pyspark.show()

+------+----+----------+------+
|  name| age|experience|salary|
+------+----+----------+------+
|   ben|  23|         8| 30000|
|   tom|  55|        20|  1500|
|  bill|  16|         2|100000|
|  mars|  39|        15|  5000|
| miles|  25|         9| 12000|
| adore|  42|        18| 25000|
|shiloh|null|      null| 40000|
|  null|  34|        10|  null|
|  null|  36|      null|  null|
+------+----+----------+------+



In [19]:
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [20]:
df_pyspark.columns

['name', 'age', 'experience', 'salary']

In [21]:
# return type is dataframe
df_pyspark.select('name')

DataFrame[name: string]

In [22]:
type(df_pyspark.select('name'))

pyspark.sql.dataframe.DataFrame

In [23]:
df_pyspark.select('name').show()

+------+
|  name|
+------+
|   ben|
|   tom|
|  bill|
|  mars|
| miles|
| adore|
|shiloh|
|  null|
|  null|
+------+



In [24]:
df_pyspark.select(['name','experience'])

DataFrame[name: string, experience: int]

In [25]:
df_pyspark.select(['experience','name']).show()

+----------+------+
|experience|  name|
+----------+------+
|         8|   ben|
|        20|   tom|
|         2|  bill|
|        15|  mars|
|         9| miles|
|        18| adore|
|      null|shiloh|
|        10|  null|
|      null|  null|
+----------+------+



In [26]:
df_pyspark.dtypes

[('name', 'string'), ('age', 'int'), ('experience', 'int'), ('salary', 'int')]

In [27]:
df_pyspark.describe()

DataFrame[summary: string, name: string, age: string, experience: string, salary: string]

In [28]:
df_pyspark.describe().show()

+-------+-----+------------------+------------------+-----------------+
|summary| name|               age|        experience|           salary|
+-------+-----+------------------+------------------+-----------------+
|  count|    7|                 8|                 7|                7|
|   mean| null|             33.75|11.714285714285714|          30500.0|
| stddev| null|12.302729081677075|  6.29058253037257|33626.62635472075|
|    min|adore|                16|                 2|             1500|
|    max|  tom|                55|                20|           100000|
+-------+-----+------------------+------------------+-----------------+



In [29]:
# add column
df_pyspark = df_pyspark.withColumn('experience after 2 years', df_pyspark['experience']+2)
df_pyspark

DataFrame[name: string, age: int, experience: int, salary: int, experience after 2 years: int]

In [30]:
df_pyspark.show()

+------+----+----------+------+------------------------+
|  name| age|experience|salary|experience after 2 years|
+------+----+----------+------+------------------------+
|   ben|  23|         8| 30000|                      10|
|   tom|  55|        20|  1500|                      22|
|  bill|  16|         2|100000|                       4|
|  mars|  39|        15|  5000|                      17|
| miles|  25|         9| 12000|                      11|
| adore|  42|        18| 25000|                      20|
|shiloh|null|      null| 40000|                    null|
|  null|  34|        10|  null|                      12|
|  null|  36|      null|  null|                    null|
+------+----+----------+------+------------------------+



In [31]:
# drop col
df_pyspark.drop('experience after 2 years')

DataFrame[name: string, age: int, experience: int, salary: int]

In [32]:
df_pyspark.show()

+------+----+----------+------+------------------------+
|  name| age|experience|salary|experience after 2 years|
+------+----+----------+------+------------------------+
|   ben|  23|         8| 30000|                      10|
|   tom|  55|        20|  1500|                      22|
|  bill|  16|         2|100000|                       4|
|  mars|  39|        15|  5000|                      17|
| miles|  25|         9| 12000|                      11|
| adore|  42|        18| 25000|                      20|
|shiloh|null|      null| 40000|                    null|
|  null|  34|        10|  null|                      12|
|  null|  36|      null|  null|                    null|
+------+----+----------+------+------------------------+



In [33]:
# drop column
df_pyspark = df_pyspark.drop('experience after 2 years')
df_pyspark.show()

+------+----+----------+------+
|  name| age|experience|salary|
+------+----+----------+------+
|   ben|  23|         8| 30000|
|   tom|  55|        20|  1500|
|  bill|  16|         2|100000|
|  mars|  39|        15|  5000|
| miles|  25|         9| 12000|
| adore|  42|        18| 25000|
|shiloh|null|      null| 40000|
|  null|  34|        10|  null|
|  null|  36|      null|  null|
+------+----+----------+------+



In [34]:
# rename col
df_pyspark = df_pyspark.withColumnRenamed('name','first name')
df_pyspark.show()

+----------+----+----------+------+
|first name| age|experience|salary|
+----------+----+----------+------+
|       ben|  23|         8| 30000|
|       tom|  55|        20|  1500|
|      bill|  16|         2|100000|
|      mars|  39|        15|  5000|
|     miles|  25|         9| 12000|
|     adore|  42|        18| 25000|
|    shiloh|null|      null| 40000|
|      null|  34|        10|  null|
|      null|  36|      null|  null|
+----------+----+----------+------+



## Handling missing values

#### Drop Missing Values

In [35]:
# drop any row having a null
df_pyspark.na.drop().show()

+----------+---+----------+------+
|first name|age|experience|salary|
+----------+---+----------+------+
|       ben| 23|         8| 30000|
|       tom| 55|        20|  1500|
|      bill| 16|         2|100000|
|      mars| 39|        15|  5000|
|     miles| 25|         9| 12000|
|     adore| 42|        18| 25000|
+----------+---+----------+------+



In [36]:
# use to drop all rows that are null in every column
#df_pyspark.na.drop(how='all').show()

In [37]:
# drop any row that has a null in it
df_pyspark.na.drop(how='any').show()

+----------+---+----------+------+
|first name|age|experience|salary|
+----------+---+----------+------+
|       ben| 23|         8| 30000|
|       tom| 55|        20|  1500|
|      bill| 16|         2|100000|
|      mars| 39|        15|  5000|
|     miles| 25|         9| 12000|
|     adore| 42|        18| 25000|
+----------+---+----------+------+



In [38]:
df_pyspark.show()

+----------+----+----------+------+
|first name| age|experience|salary|
+----------+----+----------+------+
|       ben|  23|         8| 30000|
|       tom|  55|        20|  1500|
|      bill|  16|         2|100000|
|      mars|  39|        15|  5000|
|     miles|  25|         9| 12000|
|     adore|  42|        18| 25000|
|    shiloh|null|      null| 40000|
|      null|  34|        10|  null|
|      null|  36|      null|  null|
+----------+----+----------+------+



In [39]:
# delete rows where Non null values are < thresholg
# we want to keep rows with atleast 2 non null values
df_pyspark.na.drop(how='any', thresh=2).show()


+----------+----+----------+------+
|first name| age|experience|salary|
+----------+----+----------+------+
|       ben|  23|         8| 30000|
|       tom|  55|        20|  1500|
|      bill|  16|         2|100000|
|      mars|  39|        15|  5000|
|     miles|  25|         9| 12000|
|     adore|  42|        18| 25000|
|    shiloh|null|      null| 40000|
|      null|  34|        10|  null|
+----------+----+----------+------+



In [40]:
df_pyspark.na.drop(how='any', thresh=1).show()


+----------+----+----------+------+
|first name| age|experience|salary|
+----------+----+----------+------+
|       ben|  23|         8| 30000|
|       tom|  55|        20|  1500|
|      bill|  16|         2|100000|
|      mars|  39|        15|  5000|
|     miles|  25|         9| 12000|
|     adore|  42|        18| 25000|
|    shiloh|null|      null| 40000|
|      null|  34|        10|  null|
|      null|  36|      null|  null|
+----------+----+----------+------+



In [41]:
# drop null values from a specific column
df_pyspark.na.drop(how='any', subset= ['experience']).show()

+----------+---+----------+------+
|first name|age|experience|salary|
+----------+---+----------+------+
|       ben| 23|         8| 30000|
|       tom| 55|        20|  1500|
|      bill| 16|         2|100000|
|      mars| 39|        15|  5000|
|     miles| 25|         9| 12000|
|     adore| 42|        18| 25000|
|      null| 34|        10|  null|
+----------+---+----------+------+



#### Fillin missing values

<H8> FILL

In [42]:
# replace all null values where col is string
# only use for strings
df_pyspark.na.fill('n/a').show()

+----------+----+----------+------+
|first name| age|experience|salary|
+----------+----+----------+------+
|       ben|  23|         8| 30000|
|       tom|  55|        20|  1500|
|      bill|  16|         2|100000|
|      mars|  39|        15|  5000|
|     miles|  25|         9| 12000|
|     adore|  42|        18| 25000|
|    shiloh|null|      null| 40000|
|       n/a|  34|        10|  null|
|       n/a|  36|      null|  null|
+----------+----+----------+------+



In [43]:
# fill single col
df_pyspark.na.fill('22', 'experience').show()

+----------+----+----------+------+
|first name| age|experience|salary|
+----------+----+----------+------+
|       ben|  23|         8| 30000|
|       tom|  55|        20|  1500|
|      bill|  16|         2|100000|
|      mars|  39|        15|  5000|
|     miles|  25|         9| 12000|
|     adore|  42|        18| 25000|
|    shiloh|null|      null| 40000|
|      null|  34|        10|  null|
|      null|  36|      null|  null|
+----------+----+----------+------+



<H8> IMPUTER

In [44]:
from pyspark.ml.feature import Imputer

In [45]:
# strategy = mean, median, mode
imputer = Imputer(
    inputCols=['age','experience','salary'],
    outputCols = ['{}_imputed'.format(c) for c in ['age','experience','salary']]
    ).setStrategy('mean')

In [46]:
imputer.fit(df_pyspark).transform(df_pyspark).show()

+----------+----+----------+------+-----------+------------------+--------------+
|first name| age|experience|salary|age_imputed|experience_imputed|salary_imputed|
+----------+----+----------+------+-----------+------------------+--------------+
|       ben|  23|         8| 30000|         23|                 8|         30000|
|       tom|  55|        20|  1500|         55|                20|          1500|
|      bill|  16|         2|100000|         16|                 2|        100000|
|      mars|  39|        15|  5000|         39|                15|          5000|
|     miles|  25|         9| 12000|         25|                 9|         12000|
|     adore|  42|        18| 25000|         42|                18|         25000|
|    shiloh|null|      null| 40000|         33|                11|         40000|
|      null|  34|        10|  null|         34|                10|         30500|
|      null|  36|      null|  null|         36|                11|         30500|
+----------+----

## Filter operations

In [47]:
df_pyspark = spark.read.csv('demo2.csv',header=True, inferSchema= True)
df_pyspark.show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
|   ben| 23|         8| 30000|
|   tom| 55|        20|  1500|
|  bill| 16|         2|100000|
|  mars| 39|        15|  5000|
| miles| 25|         9| 12000|
| adore| 42|        18| 25000|
|shiloh| 34|        10| 40000|
+------+---+----------+------+



In [48]:
df_pyspark.filter('salary>=15000').show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
|   ben| 23|         8| 30000|
|  bill| 16|         2|100000|
| adore| 42|        18| 25000|
|shiloh| 34|        10| 40000|
+------+---+----------+------+



In [49]:
df_pyspark.filter('salary>=15000').select(['name','age']).show()

+------+---+
|  name|age|
+------+---+
|   ben| 23|
|  bill| 16|
| adore| 42|
|shiloh| 34|
+------+---+



In [50]:
df_pyspark.filter(df_pyspark['salary'] >= 15000).show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
|   ben| 23|         8| 30000|
|  bill| 16|         2|100000|
| adore| 42|        18| 25000|
|shiloh| 34|        10| 40000|
+------+---+----------+------+



In [51]:
# multiple conditions
df_pyspark.filter((df_pyspark['salary'] >= 15000) & 
                  (df_pyspark['salary'] <= 27000)).show()



+-----+---+----------+------+
| name|age|experience|salary|
+-----+---+----------+------+
|adore| 42|        18| 25000|
+-----+---+----------+------+



In [52]:
df_pyspark.filter((df_pyspark['salary'] >= 15000) | (df_pyspark['salary'] >= 15000)).show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
|   ben| 23|         8| 30000|
|  bill| 16|         2|100000|
| adore| 42|        18| 25000|
|shiloh| 34|        10| 40000|
+------+---+----------+------+



In [53]:
# not (~)
df_pyspark.filter(~(df_pyspark['salary'] >= 15000)).show()

+-----+---+----------+------+
| name|age|experience|salary|
+-----+---+----------+------+
|  tom| 55|        20|  1500|
| mars| 39|        15|  5000|
|miles| 25|         9| 12000|
+-----+---+----------+------+



## Groupby & Aggregate functions

In [54]:
df_pyspark = spark.read.csv('demo3.csv', header= True, inferSchema= True)
df_pyspark.show()

+------+--------------+------+
|  name|    department|salary|
+------+--------------+------+
| miles|  data science| 30000|
|shiloh|cloud engineer| 85000|
| adore|  data science|100000|
|shiloh|           iot|  5000|
| miles|           iot| 12000|
| adore|  data science| 25000|
|shiloh|cloud engineer| 40000|
+------+--------------+------+



In [55]:
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)



In [56]:
#grou
df_pyspark.groupby(df_pyspark['name']).sum()    

DataFrame[name: string, sum(salary): bigint]

In [57]:
df_pyspark.groupby(df_pyspark['name']).sum().show()

+------+-----------+
|  name|sum(salary)|
+------+-----------+
| adore|     125000|
|shiloh|     130000|
| miles|      42000|
+------+-----------+



In [58]:
df_pyspark.groupBy(df_pyspark['name']).max().show()

+------+-----------+
|  name|max(salary)|
+------+-----------+
| adore|     100000|
|shiloh|      85000|
| miles|      30000|
+------+-----------+



In [59]:
df_pyspark.groupBy(df_pyspark['department']).avg().show()

+--------------+------------------+
|    department|       avg(salary)|
+--------------+------------------+
|  data science|51666.666666666664|
|           iot|            8500.0|
|cloud engineer|           62500.0|
+--------------+------------------+



In [60]:
df_pyspark.groupBy(df_pyspark['department']).count().show()

+--------------+-----+
|    department|count|
+--------------+-----+
|  data science|    3|
|           iot|    2|
|cloud engineer|    2|
+--------------+-----+



In [61]:
df_pyspark.agg({'salary' : 'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|     297000|
+-----------+



## Mlib

In [63]:
# predict salary using age & experience
df_pyspark = spark.read.csv('demo4.csv', header= True, inferSchema= True)
df_pyspark.show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
|   ben| 31|        10| 30000|
|   tom| 30|         8| 25000|
|  bill| 29|         4| 20000|
|  mars| 24|         3| 20000|
| adore| 21|         1| 15000|
|shiloh| 23|         2| 18000|
+------+---+----------+------+



In [65]:
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [66]:
df_pyspark.columns

['name', 'age', 'experience', 'salary']

In [None]:
#independent features
# age, experience


In [67]:
# group independent features into 1 col
from pyspark.ml.feature import VectorAssembler

In [68]:
featureassembler = VectorAssembler(inputCols=['age','experience'], outputCol='independent features')

In [69]:
output = featureassembler.transform(df_pyspark)

In [70]:
output.show()

+------+---+----------+------+--------------------+
|  name|age|experience|salary|independent features|
+------+---+----------+------+--------------------+
|   ben| 31|        10| 30000|         [31.0,10.0]|
|   tom| 30|         8| 25000|          [30.0,8.0]|
|  bill| 29|         4| 20000|          [29.0,4.0]|
|  mars| 24|         3| 20000|          [24.0,3.0]|
| adore| 21|         1| 15000|          [21.0,1.0]|
|shiloh| 23|         2| 18000|          [23.0,2.0]|
+------+---+----------+------+--------------------+



In [71]:
finalised_data = output.select('independent features','salary')
finalised_data.show()

+--------------------+------+
|independent features|salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



#### Split Dataset

In [75]:
#train test split
train_data, test_data = finalised_data.randomSplit([0.75,0.25])

#### Model

In [None]:
from pyspark.ml.regression import LinearRegression

In [76]:
regressor = LinearRegression(featuresCol= 'independent features', labelCol= 'salary')
regressor = regressor.fit(train_data)

In [78]:
regressor.coefficients

DenseVector([-714.2857, 3485.7143])

In [79]:
regressor.intercept

26857.142857139796

#### Predict

In [80]:
pred_results = regressor.evaluate(test_data)

In [81]:
pred_results.predictions.show()

+--------------------+------+-----------------+
|independent features|salary|       prediction|
+--------------------+------+-----------------+
|          [30.0,8.0]| 25000|33314.28571428435|
|         [31.0,10.0]| 30000|39571.42857142653|
+--------------------+------+-----------------+



In [82]:
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(8942.857142855439, 80369795.91833645)