# Data Wrangling With PySpark

#### Reading Data

In this notebook I will show you how to do data wrangling with PySpark. The dataset describes a set of cars, their characteristics, and their sale prices.

Database File Link: https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-DA0101EN-SkillsNetwork/labs/Data%20files/auto.csv

In [0]:
# DBFS location of the uploaded copy of the auto dataset.
filename = "dbfs:/FileStore/shared_uploads/jonathan.sales@aluno.faculdadeimpacta.com.br/auto-6.csv"
# The file has no header row, so every column is loaded under Spark's
# positional default name (_c0, _c1, ...).
csv_reader = spark.read.format('csv').option('header', 'false')
df = csv_reader.load(filename)

In [0]:
# Peek at the first record to see how the raw columns were parsed.
first_row = df.first()
display(first_row)

Row(_c0='3', _c1='?', _c2='alfa-romero', _c3='gas', _c4='std', _c5='two', _c6='convertible', _c7='rwd', _c8='front', _c9='88.6', _c10='168.8', _c11='64.1', _c12='48.8', _c13='2548', _c14='dohc', _c15='four', _c16='130', _c17='mpfi', _c18='3.47', _c19='2.68', _c20='9.0', _c21='111', _c22='5000', _c23='21', _c24='27', _c25='13495')

#### Renaming Columns

In [0]:
from pyspark.sql.functions import *
# Descriptive column names in file order: the name at position i replaces
# the positional default `_c{i}` assigned when the CSV was read.
column_names = [
    "symboling",
    "normalized_losses",
    "make",
    "fuel_type",
    "aspiration",
    "num_of_doors",
    "body_style",
    "drive_wheels",
    "engine_location",
    "wheel_base",
    "length",
    "width",
    "height",
    "curb_weight",
    "engine_type",
    "num_of_cylinders",
    "engine_size",
    "fuel_system",
    "bore",
    "stroke",
    "compression_ratio",
    "horsepower",
    "peak_rpm",
    "city_mpg",
    "highway_mpg",
    "price",
]
for position, new_name in enumerate(column_names):
    df = df.withColumnRenamed(f'_c{position}', new_name)

#### Replace ? Values To Null

The dataset uses '?' to mark missing values; we need to replace these with null values.

In [0]:
# Columns in which the '?' placeholder is turned into a real null.
placeholder_columns = (
    'symboling',
    'normalized_losses',
    'make',
    'fuel_type',
    'aspiration',
    'num_of_doors',
    'body_style',
    'drive_wheels',
    'engine_location',
    'wheel_base',
    'length',
    'width',
    'height',
    'curb_weight',
    'engine_type',
    'num_of_cylinders',
    'engine_size',
    'fuel_system',
    'bore',
    'stroke',
    'compression_ratio',
    'horsepower',
    'peak_rpm',
    'city_mpg',
    'highway_mpg',
    'price',
)
for column_name in placeholder_columns:
    current = df[column_name]
    # Keep every value as-is except the literal '?', which becomes null.
    df = df.withColumn(column_name, when(current == '?', None).otherwise(current))

In [0]:
# Re-check the first record now that '?' placeholders are nulls.
first_clean_row = df.first()
display(first_clean_row)

Row(symboling='3', normalized_losses=None, make='alfa-romero', fuel_type='gas', aspiration='std', num_of_doors='two', body_style='convertible', drive_wheels='rwd', engine_location='front', wheel_base='88.6', length='168.8', width='64.1', height='48.8', curb_weight='2548', engine_type='dohc', num_of_cylinders='four', engine_size='130', fuel_system='mpfi', bore='3.47', stroke='2.68', compression_ratio='9.0', horsepower='111', peak_rpm='5000', city_mpg='21', highway_mpg='27', price='13495')

#### Transforming Null Data

There are several ways to handle null data. Here I will impute numeric variables with their average and categorical variables with their most frequent value.

In [0]:
# Show the mean of normalized_losses (column aliased as AVG_Normalized).
normalized_avg = df.select(expr('AVG(normalized_losses) AS AVG_Normalized'))
normalized_avg.show()
# Impute the missing normalized_losses with that mean; the column still
# holds strings at this point, hence the string literal.
df2 = df.fillna('122', subset=['normalized_losses'])

+--------------+
|AVG_Normalized|
+--------------+
|         122.0|
+--------------+



In [0]:
# Calculate AVG
bore_avg = df.select(expr('AVG(bore) AS AVG_bore'))
bore_avg.show()
# Replace na Values with the computed AVG rather than a hand-copied literal
# (the previous '3.32' was a truncation of the mean, not a rounding of it).
# The mean is formatted to 2 decimals as a string because `bore` is still a
# string column here and, per the PySpark docs, na.fill skips subset columns
# whose type does not match the fill value.
bore_mean = bore_avg.first()['AVG_bore']
# AVG is null when the column has no non-null values; in that case there is
# nothing to impute with, so the frame is passed through unchanged.
df3 = df2 if bore_mean is None else df2.na.fill(f'{bore_mean:.2f}', subset=['bore'])

+------------------+
|          AVG_bore|
+------------------+
|3.3297512437810957|
+------------------+



In [0]:
# Calculate AVG
stroke_avg = df.select(expr('AVG(stroke) AS AVG_stroke'))
stroke_avg.show()
# Replace na Values with the computed AVG rather than a hand-copied literal
# (the previous '3.25' was a truncation of the mean, not a rounding of it).
# The mean is formatted to 2 decimals as a string because `stroke` is still a
# string column here and, per the PySpark docs, na.fill skips subset columns
# whose type does not match the fill value.
stroke_mean = stroke_avg.first()['AVG_stroke']
# AVG is null when the column has no non-null values; in that case there is
# nothing to impute with, so the frame is passed through unchanged.
df4 = df3 if stroke_mean is None else df3.na.fill(f'{stroke_mean:.2f}', subset=['stroke'])

+------------------+
|        AVG_stroke|
+------------------+
|3.2554228855721337|
+------------------+



In [0]:
# Calculate AVG
horsepower_avg = df.select(expr('AVG(horsepower) AS AVG_horsepower'))
horsepower_avg.show()
# Replace na Values with the computed AVG rather than a hand-copied literal
# (the previous '104.25' was a truncation of the mean, not a rounding of it).
# The mean is formatted to 2 decimals as a string because `horsepower` is
# still a string column here and, per the PySpark docs, na.fill skips subset
# columns whose type does not match the fill value.
horsepower_mean = horsepower_avg.first()['AVG_horsepower']
# AVG is null when the column has no non-null values; in that case there is
# nothing to impute with, so the frame is passed through unchanged.
df5 = df4 if horsepower_mean is None else df4.na.fill(f'{horsepower_mean:.2f}', subset=['horsepower'])

+------------------+
|    AVG_horsepower|
+------------------+
|104.25615763546799|
+------------------+



In [0]:
# Calculate AVG
peak_rpm_avg = df.select(expr('AVG(peak_rpm) AS AVG_peak_rpm'))
peak_rpm_avg.show()
# Replace na Values with the computed AVG rather than a hand-copied literal
# (the previous '5125.36' was a truncation of the mean, not a rounding of it).
# The mean is formatted to 2 decimals as a string because `peak_rpm` is still
# a string column here and, per the PySpark docs, na.fill skips subset columns
# whose type does not match the fill value.
peak_rpm_mean = peak_rpm_avg.first()['AVG_peak_rpm']
# AVG is null when the column has no non-null values; in that case there is
# nothing to impute with, so the frame is passed through unchanged.
df6 = df5 if peak_rpm_mean is None else df5.na.fill(f'{peak_rpm_mean:.2f}', subset=['peak_rpm'])

+-----------------+
|     AVG_peak_rpm|
+-----------------+
|5125.369458128079|
+-----------------+



In [0]:
# Count the distinct non-null categories of num_of_doors. The column is
# categorical, so no average applies (the alias is kept as-is so the printed
# header does not change).
df.select(expr('COUNT(DISTINCT(num_of_doors)) AS AVG_num_of_doors')).show()
# No imputation here. This cell used to fill num_of_doors with '5125.36' (the
# peak_rpm mean, copied by mistake), which left no nulls for the
# most-frequent-value fill in the next step and put a bogus door count into
# the final dataset. df7 is kept as a plain alias so later cells still resolve.
df7 = df6

+----------------+
|AVG_num_of_doors|
+----------------+
|               2|
+----------------+



In [0]:
# Show how often each num_of_doors value occurs to find the most frequent one.
door_counts = df.groupBy('num_of_doors').count()
door_counts.show()
# Fill any null num_of_doors with the most frequent category.
df8 = df7.fillna({'num_of_doors': 'four'})

+------------+-----+
|num_of_doors|count|
+------------+-----+
|         two|   89|
|        null|    2|
|        four|  114|
+------------+-----+



In [0]:
# Dropping NA values: remove the rows whose price is null.
df9 = df8.dropna(how='all', subset=['price'])

#### Defining Schema

In [0]:
# Continue under a descriptive name; df9 is the frame produced by the
# preceding imputation and drop steps.
dataset = df9

In [0]:
# Inspect the current column types before casting them.
dataset.printSchema()

root
 |-- symboling: string (nullable = true)
 |-- normalized_losses: string (nullable = false)
 |-- make: string (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- aspiration: string (nullable = true)
 |-- num_of_doors: string (nullable = false)
 |-- body_style: string (nullable = true)
 |-- drive_wheels: string (nullable = true)
 |-- engine_location: string (nullable = true)
 |-- wheel_base: string (nullable = true)
 |-- length: string (nullable = true)
 |-- width: string (nullable = true)
 |-- height: string (nullable = true)
 |-- curb_weight: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- num_of_cylinders: string (nullable = true)
 |-- engine_size: string (nullable = true)
 |-- fuel_system: string (nullable = true)
 |-- bore: string (nullable = false)
 |-- stroke: string (nullable = false)
 |-- compression_ratio: string (nullable = true)
 |-- horsepower: string (nullable = false)
 |-- peak_rpm: string (nullable = false)
 |-- city_mpg: string (

In [0]:
# Target Spark SQL type for each numeric column; columns not listed here
# are left untouched.
numeric_types = {
    "symboling": 'int',
    "normalized_losses": 'int',
    "wheel_base": 'float',
    "length": 'float',
    "width": 'float',
    "height": 'float',
    "curb_weight": 'int',
    "engine_size": 'int',
    "bore": 'float',
    "stroke": 'float',
    "compression_ratio": 'float',
    "horsepower": 'int',
    "peak_rpm": 'int',
    "city_mpg": 'int',
    "highway_mpg": 'int',
    "price": 'float',
}
for column_name, spark_type in numeric_types.items():
    # Overwrite the column in place with its cast counterpart.
    dataset = dataset.withColumn(column_name, col(column_name).cast(spark_type))

In [0]:
# Print the schema again to confirm the casts were applied.
dataset.printSchema()

root
 |-- symboling: integer (nullable = true)
 |-- normalized_losses: integer (nullable = true)
 |-- make: string (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- aspiration: string (nullable = true)
 |-- num_of_doors: string (nullable = false)
 |-- body_style: string (nullable = true)
 |-- drive_wheels: string (nullable = true)
 |-- engine_location: string (nullable = true)
 |-- wheel_base: float (nullable = true)
 |-- length: float (nullable = true)
 |-- width: float (nullable = true)
 |-- height: float (nullable = true)
 |-- curb_weight: integer (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- num_of_cylinders: string (nullable = true)
 |-- engine_size: integer (nullable = true)
 |-- fuel_system: string (nullable = true)
 |-- bore: float (nullable = true)
 |-- stroke: float (nullable = true)
 |-- compression_ratio: float (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- peak_rpm: integer (nullable = true)
 |-- city_mpg: integer (nulla

#### Transforming MPG To L/100 km

In [0]:
# Fuel consumption in L/100 km derived from city_mpg (235 / mpg), rounded
# to 2 decimals.
litres_per_100km = expr("ROUND(235/city_mpg,2) AS city_L_100km")
# Add the converted column, then remove the mpg column it was derived from.
dataset = dataset.withColumn('city_L_100km', litres_per_100km).drop('city_mpg')

#### Final Dataset

In [0]:
# Render the final, cleaned dataset as a table.
# NOTE(review): `.display()` looks like the Databricks notebook helper — confirm it exists in the target runtime.
dataset.display()

symboling,normalized_losses,make,fuel_type,aspiration,num_of_doors,body_style,drive_wheels,engine_location,wheel_base,length,width,height,curb_weight,engine_type,num_of_cylinders,engine_size,fuel_system,bore,stroke,compression_ratio,horsepower,peak_rpm,highway_mpg,price,city_L_100km
3,122,alfa-romero,gas,std,two,convertible,rwd,front,88.6,168.8,64.1,48.8,2548,dohc,four,130,mpfi,3.47,2.68,9.0,111,5000,27,13495.0,11.19
3,122,alfa-romero,gas,std,two,convertible,rwd,front,88.6,168.8,64.1,48.8,2548,dohc,four,130,mpfi,3.47,2.68,9.0,111,5000,27,16500.0,11.19
1,122,alfa-romero,gas,std,two,hatchback,rwd,front,94.5,171.2,65.5,52.4,2823,ohcv,six,152,mpfi,2.68,3.47,9.0,154,5000,26,16500.0,12.37
2,164,audi,gas,std,four,sedan,fwd,front,99.8,176.6,66.2,54.3,2337,ohc,four,109,mpfi,3.19,3.4,10.0,102,5500,30,13950.0,9.79
2,164,audi,gas,std,four,sedan,4wd,front,99.4,176.6,66.4,54.3,2824,ohc,five,136,mpfi,3.19,3.4,8.0,115,5500,22,17450.0,13.06
2,122,audi,gas,std,two,sedan,fwd,front,99.8,177.3,66.3,53.1,2507,ohc,five,136,mpfi,3.19,3.4,8.5,110,5500,25,15250.0,12.37
1,158,audi,gas,std,four,sedan,fwd,front,105.8,192.7,71.4,55.7,2844,ohc,five,136,mpfi,3.19,3.4,8.5,110,5500,25,17710.0,12.37
1,122,audi,gas,std,four,wagon,fwd,front,105.8,192.7,71.4,55.7,2954,ohc,five,136,mpfi,3.19,3.4,8.5,110,5500,25,18920.0,12.37
1,158,audi,gas,turbo,four,sedan,fwd,front,105.8,192.7,71.4,55.9,3086,ohc,five,131,mpfi,3.13,3.4,8.3,140,5500,20,23875.0,13.82
2,192,bmw,gas,std,two,sedan,rwd,front,101.2,176.8,64.8,54.3,2395,ohc,four,108,mpfi,3.5,2.8,8.8,101,5800,29,16430.0,10.22
