In [None]:
#Modeling with pyspark

In [None]:
import pyspark
from pyspark import SparkContext
from datetime import datetime

In [9]:
dirty_data = spark.createDataFrame([
          (1,'Porsche','Boxster S','Turbo',2.5,4,22,None)
        , (2,'Aston Martin','Vanquish','Aspirated',6.0,12,16,None)
        , (3,'Porsche','911 Carrera 4S Cabriolet','Turbo',3.0,6,24,None)
        , (3,'General Motors','SPARK ACTIV','Aspirated',1.4,None,32,None)
        , (5,'BMW','COOPER S HARDTOP 2 DOOR','Turbo',2.0,4,26,None)
        , (6,'BMW','330i','Turbo',2.0,None,27,None)
        , (7,'BMW','440i Coupe','Turbo',3.0,6,23,None)
        , (8,'BMW','440i Coupe','Turbo',3.0,6,23,None)
        , (9,'Mercedes-Benz',None,None,None,None,27,None)
        , (10,'Mercedes-Benz','CLS 550','Turbo',4.7,8,21,79231)
        , (11,'Volkswagen','GTI','Turbo',2.0,4,None,None)
        , (12,'Ford Motor Company','FUSION AWD','Turbo',2.7,6,20,None)
        , (13,'Nissan','Q50 AWD RED SPORT','Turbo',3.0,6,22,None)
        , (14,'Nissan','Q70 AWD','Aspirated',5.6,8,18,None)
        , (15,'Kia','Stinger RWD','Turbo',2.0,4,25,None)
        , (16,'Toyota','CAMRY HYBRID LE','Aspirated',2.5,4,46,None)
        , (16,'Toyota','CAMRY HYBRID LE','Aspirated',2.5,4,46,None)
        , (18,'FCA US LLC','300','Aspirated',3.6,6,23,None)
        , (19,'Hyundai','G80 AWD','Turbo',3.3,6,20,None)
        , (20,'Hyundai','G80 AWD','Turbo',3.3,6,20,None)
        , (21,'BMW','X5 M','Turbo',4.4,8,18,121231)
        , (22,'GE','K1500 SUBURBAN 4WD','Aspirated',5.3,8,18,None)
    ], ['Id','Manufacturer','Model','EngineType','Displacement',
        'Cylinders','FuelEconomy','MSRP'])

In [11]:
#Checking for duplicates
print("Total records:", dirty_data.count())
print("Complete duplicates", dirty_data.distinct().count())
print("duplicate records are ") 
(dirty_data
 .groupBy(dirty_data.columns)
 .count()
 .filter('count>1')
 .show())
full_removed = dirty_data.drop_duplicates()
print("Updated count after removing duplicates", full_removed.count())


Total records: 22
Complete duplicates 21
duplicate records are 
+---+------------+---------------+----------+------------+---------+-----------+----+-----+
| Id|Manufacturer|          Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|count|
+---+------------+---------------+----------+------------+---------+-----------+----+-----+
| 16|      Toyota|CAMRY HYBRID LE| Aspirated|         2.5|        4|         46|null|    2|
+---+------------+---------------+----------+------------+---------+-----------+----+-----+

Updated count after removing duplicates 21


In [15]:
# Checking if there are duplicates where id's are different
(full_removed
.groupBy([col for col in full_removed.columns if col != "Id"])
.count()
.filter('count>1')
 .show()
)

no_ids = full_removed.select([col for col in full_removed.columns if col != "Id"])
no_ids.count(), no_ids.distinct().count()
id_removed = full_removed.drop_duplicates(
    subset=[col for col in full_removed.columns if col != "Id"])
print("After dropping duplicates w.r.t id feild, new count is", id_removed.count())

+------------+----------+----------+------------+---------+-----------+----+-----+
|Manufacturer|     Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|count|
+------------+----------+----------+------------+---------+-----------+----+-----+
|         BMW|440i Coupe|     Turbo|         3.0|        6|         23|null|    2|
|     Hyundai|   G80 AWD|     Turbo|         3.3|        6|         20|null|    2|
+------------+----------+----------+------------+---------+-----------+----+-----+

After dropping duplicates w.r.t id feild, new count is 19


In [17]:
import pyspark.sql.functions as fn
id_removed.agg(
fn.count("Id").alias("Count of IDS"),
fn.countDistinct("Id").alias("Count of Distinct IDs")).show()

+------------+---------------------+
|Count of IDS|Count of Distinct IDs|
+------------+---------------------+
|          19|                   18|
+------------+---------------------+



In [18]:
#finding duplicate id
(id_removed.groupBy("Id").count().filter('count>1').show())

+---+-----+
| Id|count|
+---+-----+
|  3|    2|
+---+-----+



In [21]:
id_removed.filter(id_removed.Id==3).show()

+---+--------------+--------------------+----------+------------+---------+-----------+----+
| Id|  Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+--------------+--------------------+----------+------------+---------+-----------+----+
|  3|General Motors|         SPARK ACTIV| Aspirated|         1.4|     null|         32|null|
|  3|       Porsche|911 Carrera 4S Ca...|     Turbo|         3.0|        6|         24|null|
+---+--------------+--------------------+----------+------------+---------+-----------+----+



In [26]:
#creating new ids
new_id = (
    id_removed
    .select(
        [fn.monotonically_increasing_id().alias('Id')] + 
        [col for col in id_removed.columns if col != 'Id'])
)
new_id.show()

+-------------+------------------+--------------------+----------+------------+---------+-----------+------+
|           Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|  MSRP|
+-------------+------------------+--------------------+----------+------------+---------+-----------+------+
|   8589934592|    General Motors|         SPARK ACTIV| Aspirated|         1.4|     null|         32|  null|
| 188978561024|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|         21| 79231|
| 197568495616|     Mercedes-Benz|                null|      null|        null|     null|         27|  null|
| 206158430208|Ford Motor Company|          FUSION AWD|     Turbo|         2.7|        6|         20|  null|
| 438086664192|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|         26|  null|
| 523986010112|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|         16|  null|
| 721554505728|    

In [31]:
# Finding missing observations
(
    spark.createDataFrame(
        new_id
        .rdd
        .map(
           lambda row: (
                 row['Id']
               , sum([c == None for c in row])
           )
        )
        .filter(lambda el: el[1] > 1)
        ,['Id', 'CountMissing']
    )
    .orderBy('CountMissing', ascending=False)
    .show()
)


merc_out = new_id.dropna(thresh=4)
for k, v in sorted(
    merc_out
        .agg(*[
               (1 - (fn.count(c) / fn.count('*')))
                    .alias(c + '_miss')
               for c in merc_out.columns
           ])
        .collect()[0]
        .asDict()
        .items()
    , key=lambda el: el[1]
    , reverse=True
):
    print(k, v)

+------------+------------+
|          Id|CountMissing|
+------------+------------+
|197568495616|           5|
|  8589934592|           2|
|721554505728|           2|
|919123001344|           2|
+------------+------------+

MSRP_miss 0.8888888888888888
Cylinders_miss 0.11111111111111116
FuelEconomy_miss 0.05555555555555558
Id_miss 0.0
Manufacturer_miss 0.0
Model_miss 0.0
EngineType_miss 0.0
Displacement_miss 0.0


In [32]:
# as column msrp is missing most of the values, we're removing it 
no_MSRP = merc_out.select([col for col in new_id.columns if col != 'MSRP'])
no_MSRP.show()

+-------------+------------------+--------------------+----------+------------+---------+-----------+
|           Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|
+-------------+------------------+--------------------+----------+------------+---------+-----------+
|   8589934592|    General Motors|         SPARK ACTIV| Aspirated|         1.4|     null|         32|
| 188978561024|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|         21|
| 206158430208|Ford Motor Company|          FUSION AWD|     Turbo|         2.7|        6|         20|
| 438086664192|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|         26|
| 523986010112|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|         16|
| 721554505728|        Volkswagen|                 GTI|     Turbo|         2.0|        4|       null|
| 764504178688|               Kia|         Stinger RWD|     Turbo|         2.0|   

In [33]:
multipliers = (
    no_MSRP
    .agg(
          fn.mean(
              fn.col('FuelEconomy') / 
              (
                  fn.col('Displacement') * fn.col('Cylinders')
              )
          ).alias('FuelEconomy')
        , fn.mean(
            fn.col('Cylinders') / 
            fn.col('Displacement')
        ).alias('Cylinders')
    )
).toPandas().to_dict('records')[0]


imputed = (
    no_MSRP
    .withColumn('FuelEconomy', fn.col('FuelEconomy') / fn.col('Displacement') / fn.col('Cylinders'))
    .withColumn('Cylinders', fn.col('Cylinders') / fn.col('Displacement'))
    .fillna(multipliers)
    .withColumn('Cylinders', (fn.col('Cylinders') * fn.col('Displacement')).cast('integer'))
    .withColumn('FuelEconomy', fn.col('FuelEconomy') * fn.col('Displacement') * fn.col('Cylinders'))
)

imputed.show()

+-------------+------------------+--------------------+----------+------------+---------+------------------+
|           Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|       FuelEconomy|
+-------------+------------------+--------------------+----------+------------+---------+------------------+
|   8589934592|    General Motors|         SPARK ACTIV| Aspirated|         1.4|        2| 4.188095813540793|
| 188978561024|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|              21.0|
| 206158430208|Ford Motor Company|          FUSION AWD|     Turbo|         2.7|        5|16.666666666666668|
| 438086664192|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|              26.0|
| 523986010112|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|              16.0|
| 721554505728|        Volkswagen|                 GTI|     Turbo|         2.0|        4|11.965988038687978|
| 764504178688|    