# Installation

In [1]:
!pip install pyspark

# Import SparkSession
from pyspark.sql import SparkSession

# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Check Spark Session Information
spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m13.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=87007e31e5328a2c6b269cf68acd437e03319ef62010b591c9579d8d0fd4b749
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

# Some data

In [2]:
dirty_data = spark.createDataFrame([
          (1,'Porsche','Boxster S','Turbo',2.5,4,22,None)
        , (2,'Aston Martin','Vanquish','Aspirated',6.0,12,16,None)
        , (3,'Porsche','911 Carrera 4S Cabriolet','Turbo',3.0,6,24,None)
        , (3,'General Motors','SPARK ACTIV','Aspirated',1.4,None,32,None)
        , (5,'BMW','COOPER S HARDTOP 2 DOOR','Turbo',2.0,4,26,None)
        , (6,'BMW','330i','Turbo',2.0,None,27,None)
        , (7,'BMW','440i Coupe','Turbo',3.0,6,23,None)
        , (8,'BMW','440i Coupe','Turbo',3.0,6,23,None)
        , (9,'Mercedes-Benz',None,None,None,None,27,None)
        , (10,'Mercedes-Benz','CLS 550','Turbo',4.7,8,21,79231)
        , (11,'Volkswagen','GTI','Turbo',2.0,4,None,None)
        , (12,'Ford Motor Company','FUSION AWD','Turbo',2.7,6,20,None)
        , (13,'Nissan','Q50 AWD RED SPORT','Turbo',3.0,6,22,None)
        , (14,'Nissan','Q70 AWD','Aspirated',5.6,8,18,None)
        , (15,'Kia','Stinger RWD','Turbo',2.0,4,25,None)
        , (16,'Toyota','CAMRY HYBRID LE','Aspirated',2.5,4,46,None)
        , (16,'Toyota','CAMRY HYBRID LE','Aspirated',2.5,4,46,None)
        , (18,'FCA US LLC','300','Aspirated',3.6,6,23,None)
        , (19,'Hyundai','G80 AWD','Turbo',3.3,6,20,None)
        , (20,'Hyundai','G80 AWD','Turbo',3.3,6,20,None)
        , (21,'BMW','X5 M','Turbo',4.4,8,18,121231)
        , (22,'GE','K1500 SUBURBAN 4WD','Aspirated',5.3,8,18,None)
    ], ['Id','Manufacturer','Model','EngineType','Displacement',
        'Cylinders','FuelEconomy','MSRP'])

# Handling duplicates

### Exact duplicates

In [3]:
# do we have any rows that are duplicated?
dirty_data.count(), dirty_data.distinct().count()

(22, 21)

In [6]:
# what row is duplicated?
(
    dirty_data
    .groupby(dirty_data.columns)
    .count()
    .filter('count > 1')
    .show()
)

+---+------------+---------------+----------+------------+---------+-----------+----+-----+
| Id|Manufacturer|          Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|count|
+---+------------+---------------+----------+------------+---------+-----------+----+-----+
| 16|      Toyota|CAMRY HYBRID LE| Aspirated|         2.5|        4|         46|null|    2|
+---+------------+---------------+----------+------------+---------+-----------+----+-----+



In [7]:
# remove the duplicated rows
full_removed = dirty_data.dropDuplicates()
full_removed.count()

21

### Only ID differs

In [8]:
# count of rows
no_ids = (
    full_removed
    .select([col for col in full_removed.columns if col != 'Id'])
)

no_ids.count(), no_ids.distinct().count()

(21, 19)

In [9]:
# what row is duplicated?
(
    full_removed
    .groupby([col for col in full_removed.columns if col != 'Id'])
    .count()
    .filter('count > 1')
    .show()
)

+------------+----------+----------+------------+---------+-----------+----+-----+
|Manufacturer|     Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|count|
+------------+----------+----------+------------+---------+-----------+----+-----+
|         BMW|440i Coupe|     Turbo|         3.0|        6|         23|null|    2|
|     Hyundai|   G80 AWD|     Turbo|         3.3|        6|         20|null|    2|
+------------+----------+----------+------------+---------+-----------+----+-----+



In [10]:
# remove the duplicated record
id_removed = full_removed.dropDuplicates(
    subset = [col for col in full_removed.columns if col != 'Id']
)

In [11]:
# count
id_removed.count()

19

### Duplicated IDs

In [12]:
# are there any duplicated IDs?
import pyspark.sql.functions as fn

id_removed.agg(
      fn.count('Id').alias('CountOfIDs')
    , fn.countDistinct('Id').alias('CountOfDistinctIDs')
).show()

+----------+------------------+
|CountOfIDs|CountOfDistinctIDs|
+----------+------------------+
|        19|                18|
+----------+------------------+



In [13]:
# what's duplicated?
(
    id_removed
    .groupby('Id')
    .count()
    .filter('count > 1')
    .show()
)

+---+-----+
| Id|count|
+---+-----+
|  3|    2|
+---+-----+



In [14]:
(
    id_removed
    .filter('Id = 3')
    .show()
)

+---+--------------+--------------------+----------+------------+---------+-----------+----+
| Id|  Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+--------------+--------------------+----------+------------+---------+-----------+----+
|  3|General Motors|         SPARK ACTIV| Aspirated|         1.4|     null|         32|null|
|  3|       Porsche|911 Carrera 4S Ca...|     Turbo|         3.0|        6|         24|null|
+---+--------------+--------------------+----------+------------+---------+-----------+----+



In [15]:
new_id = (
    id_removed
    .select(
        [fn.monotonically_increasing_id().alias('Id')] + 
        [col for col in id_removed.columns if col != 'Id'])
)

new_id.show()

+---+------------------+--------------------+----------+------------+---------+-----------+------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|  MSRP|
+---+------------------+--------------------+----------+------------+---------+-----------+------+
|  0|    General Motors|         SPARK ACTIV| Aspirated|         1.4|     null|         32|  null|
|  1|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|         21| 79231|
|  2|     Mercedes-Benz|                null|      null|        null|     null|         27|  null|
|  3|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|         26|  null|
|  4|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|         16|  null|
|  5|        Volkswagen|                 GTI|     Turbo|         2.0|        4|       null|  null|
|  6|               BMW|                330i|     Turbo|         2.0|     null|         27|  null|
|  7|     

# Handling missing observations

### Missing observations per row

In [16]:
(
    spark.createDataFrame(
        new_id.rdd.map(
           lambda row: (
                 row['Id']
               , sum([c == None for c in row])
           )
        )
        .filter(lambda el: el[1] > 1)
        .collect()
        ,['Id', 'CountMissing']
    )
    .orderBy('CountMissing', ascending=False)
    .show()
)

+---+------------+
| Id|CountMissing|
+---+------------+
|  2|           5|
|  0|           2|
|  6|           2|
|  5|           2|
+---+------------+



In [17]:
(
    new_id
    .where('Id == 197568495616')
    .show()
)

+---+------------+-----+----------+------------+---------+-----------+----+
| Id|Manufacturer|Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+------------+-----+----------+------------+---------+-----------+----+
+---+------------+-----+----------+------------+---------+-----------+----+



In [18]:
merc_out = new_id.dropna(thresh=4)
new_id.count(), merc_out.count()

(19, 18)

In [19]:
(
    merc_out
    .where('Id == 197568495616')
    .show()
)

+---+------------+-----+----------+------------+---------+-----------+----+
| Id|Manufacturer|Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+------------+-----+----------+------------+---------+-----------+----+
+---+------------+-----+----------+------------+---------+-----------+----+



### Missing observations per column

In [20]:
for k, v in sorted(
    merc_out.agg(*[
               (1 - (fn.count(c) / fn.count('*')))
                    .alias(c + '_miss')
               for c in merc_out.columns
           ])
        .collect()[0]
        .asDict()
        .items()
    , key=lambda el: el[1]
    , reverse=True
):
    print(k, v)

MSRP_miss 0.8888888888888888
Cylinders_miss 0.11111111111111116
FuelEconomy_miss 0.05555555555555558
Id_miss 0.0
Manufacturer_miss 0.0
Model_miss 0.0
EngineType_miss 0.0
Displacement_miss 0.0


In [21]:
no_MSRP = merc_out.select([col for col in new_id.columns if col != 'MSRP'])
no_MSRP.show()

+---+------------------+--------------------+----------+------------+---------+-----------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|
+---+------------------+--------------------+----------+------------+---------+-----------+
|  0|    General Motors|         SPARK ACTIV| Aspirated|         1.4|     null|         32|
|  1|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|         21|
|  3|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|         26|
|  4|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|         16|
|  5|        Volkswagen|                 GTI|     Turbo|         2.0|        4|       null|
|  6|               BMW|                330i|     Turbo|         2.0|     null|         27|
|  7|           Porsche|           Boxster S|     Turbo|         2.5|        4|         22|
|  8|               BMW|          440i Coupe|     Turbo|         3.0|        6| 

### Sparse missing observations

In [22]:
multipliers = (
    no_MSRP
    .agg(
          fn.mean(
              fn.col('FuelEconomy') / 
              (
                  fn.col('Displacement') * fn.col('Cylinders')
              )
          ).alias('FuelEconomy')
        , fn.mean(
            fn.col('Cylinders') / 
            fn.col('Displacement')
        ).alias('Cylinders')
    )
).toPandas().to_dict('records')[0]

multipliers

{'FuelEconomy': 1.4957485048359977, 'Cylinders': 1.8353365984789105}

In [23]:
imputed = (
    no_MSRP
    .withColumn('FuelEconomy', fn.col('FuelEconomy')   / fn.col('Displacement') / fn.col('Cylinders'))
    .withColumn('Cylinders',   fn.col('Cylinders')   / fn.col('Displacement'))
    .fillna(multipliers)
    .withColumn('Cylinders',   (fn.col('Cylinders')   * fn.col('Displacement')).cast('integer'))
    .withColumn('FuelEconomy', fn.col('FuelEconomy') * fn.col('Displacement') * fn.col('Cylinders'))
)

imputed.show()

+---+------------------+--------------------+----------+------------+---------+------------------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|       FuelEconomy|
+---+------------------+--------------------+----------+------------+---------+------------------+
|  0|    General Motors|         SPARK ACTIV| Aspirated|         1.4|        2|4.1880958135407935|
|  1|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|              21.0|
|  3|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|              26.0|
|  4|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|              16.0|
|  5|        Volkswagen|                 GTI|     Turbo|         2.0|        4|11.965988038687982|
|  6|               BMW|                330i|     Turbo|         2.0|        3| 8.974491029015986|
|  7|           Porsche|           Boxster S|     Turbo|         2.5|        4|              22.0|
|  8|     

# Handling outliers

In [24]:
features = ['Displacement', 'Cylinders', 'FuelEconomy']
quantiles = [0.25, 0.75]

cut_off_points = []

for feature in features:
    quants = imputed.approxQuantile(feature, quantiles, 0.05)
    
    IQR = quants[1] - quants[0]
    cut_off_points.append((feature, [
        quants[0] - 1.5 * IQR,
        quants[1] + 1.5 * IQR,
    ]))
    
cut_off_points = dict(cut_off_points)

outliers = imputed.select(*['id'] + [
       (
           (imputed[f] < cut_off_points[f][0]) |
           (imputed[f] > cut_off_points[f][1])
       ).alias(f + '_o') for f in features
  ])
outliers.show()

+---+--------------+-----------+-------------+
| id|Displacement_o|Cylinders_o|FuelEconomy_o|
+---+--------------+-----------+-------------+
|  0|         false|      false|         true|
|  1|         false|      false|        false|
|  3|         false|      false|        false|
|  4|         false|       true|        false|
|  5|         false|      false|        false|
|  6|         false|      false|        false|
|  7|         false|      false|        false|
|  8|         false|      false|        false|
|  9|         false|      false|        false|
| 10|         false|      false|        false|
| 11|         false|      false|        false|
| 12|         false|      false|        false|
| 13|         false|      false|        false|
| 14|         false|      false|        false|
| 15|         false|      false|        false|
| 16|         false|      false|        false|
| 17|         false|      false|         true|
| 18|         false|      false|        false|
+---+--------

In [25]:
with_outliers_flag = imputed.join(outliers, on='Id')

(
    with_outliers_flag
    .filter('FuelEconomy_o')
    .select('Id', 'Manufacturer', 'Model', 'FuelEconomy')
    .show()
)

+---+--------------+---------------+------------------+
| Id|  Manufacturer|          Model|       FuelEconomy|
+---+--------------+---------------+------------------+
|  0|General Motors|    SPARK ACTIV|4.1880958135407935|
| 17|        Toyota|CAMRY HYBRID LE|              46.0|
+---+--------------+---------------+------------------+



In [26]:
no_outliers = (
    with_outliers_flag
    .filter('!FuelEconomy_o')
    .select(imputed.columns)
)

# Exploring descriptive statistics

In [27]:
descriptive_stats = no_outliers.describe(features)
descriptive_stats.show()

+-------+------------------+-----------------+------------------+
|summary|      Displacement|        Cylinders|       FuelEconomy|
+-------+------------------+-----------------+------------------+
|  count|                16|               16|                16|
|   mean|3.4437499999999996|            6.125|19.600446608398162|
| stddev|1.3549753995306826|2.276693508870558|4.6666477673737505|
|    min|               2.0|                3| 8.974491029015986|
|    max|               6.0|               12|              26.0|
+-------+------------------+-----------------+------------------+



In [28]:
descriptive_stats_all = no_outliers.describe()
descriptive_stats_all.show()

+-------+-----------------+------------+-----+----------+------------------+-----------------+------------------+
|summary|               Id|Manufacturer|Model|EngineType|      Displacement|        Cylinders|       FuelEconomy|
+-------+-----------------+------------+-----+----------+------------------+-----------------+------------------+
|  count|               16|          16|   16|        16|                16|               16|                16|
|   mean|              9.5|        null|300.0|      null|3.4437499999999996|            6.125|19.600446608398162|
| stddev|4.979959839195493|        null| null|      null|1.3549753995306826|2.276693508870558|4.6666477673737505|
|    min|                1|Aston Martin|  300| Aspirated|               2.0|                3| 8.974491029015986|
|    max|               18|  Volkswagen| X5 M|     Turbo|               6.0|               12|              26.0|
+-------+-----------------+------------+-----+----------+------------------+------------

In [29]:
(
    no_outliers
    .select(features)
    .groupBy('Cylinders')
    .agg(*[
          fn.count('*').alias('Count')
        , fn.mean('FuelEconomy').alias('MPG_avg')
        , fn.mean('Displacement').alias('Disp_avg')
        , fn.stddev('FuelEconomy').alias('MPG_stdev')
        , fn.stddev('Displacement').alias('Disp_stdev')
    ])
    .orderBy('Cylinders')
).show()

+---------+-----+------------------+------------------+------------------+-------------------+
|Cylinders|Count|           MPG_avg|          Disp_avg|         MPG_stdev|         Disp_stdev|
+---------+-----+------------------+------------------+------------------+-------------------+
|        3|    1| 8.974491029015986|               2.0|              null|               null|
|        4|    4|21.241497009671995|             2.125| 6.413009924998987|0.24999999999999997|
|        5|    1|16.666666666666668|               2.7|              null|               null|
|        6|    5|              22.4|3.1799999999999997|1.5165750888103096| 0.2683281572999748|
|        8|    4|             18.75|               5.0|               1.5| 0.5477225575051655|
|       12|    1|              16.0|               6.0|              null|               null|
+---------+-----+------------------+------------------+------------------+-------------------+



# Computing correlations

In [30]:
(
    no_outliers
    .corr('Cylinders', 'Displacement')
)

0.9381829964408112

In [31]:
n_features = len(features)

corr = []

for i in range(0, n_features):
    temp = [None] * i

    for j in range(i, n_features):
        temp.append(no_outliers.corr(features[i], features[j]))
    corr.append([features[i]] + temp)

correlations = spark.createDataFrame(corr, ['Column'] + features)

correlations.show()

+------------+------------+------------------+--------------------+
|      Column|Displacement|         Cylinders|         FuelEconomy|
+------------+------------+------------------+--------------------+
|Displacement|         1.0|0.9381829964408112|-0.10757908872387673|
|   Cylinders|        null|               1.0| -0.0421854654503534|
| FuelEconomy|        null|              null|                 1.0|
+------------+------------+------------------+--------------------+

