In [73]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext, SparkConf

from collections import defaultdict
from collections import Counter
import re

import importlib
from scripts import cleaners, genhelpers
importlib.reload(cleaners)
importlib.reload(genhelpers)

<module 'scripts.genhelpers' from '/c/Users/Ruibin/Documents/repos/vrn-analysis/scripts/genhelpers.py'>

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName('vrn_analysis') \
    .master('local[*]') \
    .getOrCreate()

sc = spark.sparkContext

# conf = SparkConf().setAppName('vrn_analysis') \
#                   .setMaster('local')
# sc = SparkContext(conf=conf)

In [3]:
data = sc.textFile('vrn.csv')

## Data Preparation

### Car identification

Steps:
1. **Clean** input data
    - Take special care to clean entries containing commas in the model name, i.e. remove those commas
    - Correct other erroneous typos
1. Obtain **list of unique car make/model combinations**
1. **De-duplicate** any similar-but-different make/model combinations
    - e.g. "C180 AVG" and "C180 AVANTGARDE"

In [5]:
car_types = data.map(lambda x: genhelpers.clean_raw_data(x)) \
                .flatMap(lambda x: x.split(',')) \
                .filter(lambda x: genhelpers.is_car_detail(x)) \
                .map(lambda x: x.strip('"'))

At this point, `car_types` is a list of car details with the structure `<make> / <model>`. <br>
e.g. "*MERCEDES BENZ / S320L (R19 LED)*"

Build a `cars` dict:

- `{ <car_make>: [<list_of_car_models>] }`

In [6]:
# line 1: clean model name
# line 2: split into (make, model) tuple
# line 3: for each make, combine all of its models into a comma-separated string
# line 4: convert comma-separated string to a list
# line 5: sort list of models in alphabetical order
# line 6: sort overall dict by make in alphabetical order
car_dict = car_types.map(lambda x: cleaners.clean_model_name(x)) \
                    .map(lambda x: x.split(' / ')) \
                    .reduceByKey(lambda x, y: x + ',' + y) \
                    .map(lambda x: (x[0], x[1].split(','))) \
                    .map(lambda x: (x[0], sorted(set(x[1])))) \
                    .sortByKey(lambda x: x[0]) \
                    .collectAsMap()

car_dict

{'ALPINA': ['B3 BITURBO'],
 'ASTON MARTIN': ['DB11 V8', 'DB9', 'RAPIDE 6.0', 'V8 VANTAGE S/SHIFT 4.7'],
 'AUDI': ['A3 SEDAN 1.0',
  'A3 SPORTBACK 1.0',
  'A5 SPORTBACK 2.0',
  'A8L 4.2',
  'Q2 1.4',
  'Q5 2.0',
  'Q5 SPORT 2.0',
  'Q7 2.0',
  'Q7 3.0',
  'R8 5.2',
  'S4 3.0'],
 'B.M.W.': ['216D ACTIVE TOURER',
  '216I GRAN TOURER',
  '216I GRAN TOURER M-SPORT',
  '218I CONVERTIBLE',
  '325I COUPE',
  '335I COUPE',
  '420I CONVERTIBLE',
  '420I COUPE M-SPORT',
  '430I COUPE M-SPORT',
  '520D M-SPORT',
  '520I',
  '523I',
  '528I',
  '530I',
  '530I M-SPORT',
  '535I',
  '540I M-SPORT',
  '630I',
  '640I',
  '640I GRAN COUPE',
  '640I GRAN COUPE M-SPORT',
  '730LI',
  '740LI',
  'I8',
  'M3 COUPE',
  'M3 SEDAN',
  'M4 COUPE',
  'M5',
  'M5 30 JAHRE EDITION',
  'M6 COUPE',
  'M6 GRAN COUPE',
  'X2 SDRIVE18I M-SPORT',
  'X3 SDRIVE20I',
  'X3 XDRIVE20I',
  'X3 XDRIVE30I M-SPORT',
  'X5 M-SPORT',
  'X5 XLINE',
  'X6 XDRIVE35I',
  'X7 XDRIVE40I'],
 'BENTLEY': ['BENTAYGA V8',
  'CONTINENTAL GT

`car_dict` lists all of the unique car make/model combinations.

Find the corresponding price for all of the combinations.

### Car pricing

Load the CSV of car prices. This CSV contains 3 columns - "Make", "Model", "Price".

Car pricelists:
https://www.sgcarmart.com/new_cars/newcars_pricelist_listing.php

In [9]:
prices_csv = sc.textFile('data/prices.csv')

In [10]:
def split_into_3tuple(line):
    details, price = line.strip('"').split('"')
    make, model = details.strip(',').split(',')
    return (make, model, price)

prices = prices_csv.filter(lambda x: x != 'Make,Model,Price') \
                   .map(lambda x: split_into_3tuple(x)) \
                   .map(lambda x: (f'{x[0]} / {x[1]}', x[2])) \
                   .collectAsMap()

`prices` is a mapping dict of `{ <make / model>: <price> }`,<br>
e.g. "*B.M.W. / M5 30 JAHRE EDITION: 588,800*"

### Mapping cars to their corresponding price

In [67]:
def car_details_to_price(details_row):
    """Replaces the car details in a row of data with their corresponding price.
    
    Args:
        details_row (str): A row in the raw data
    """
    
    def _remove_first_col(row: str) -> list:
        row_split = row.split(',')
        return [c for i, c in enumerate(row_split) if i != 0]
    
    details_row = _remove_first_col(details_row)
    output = [genhelpers.get_price(details) for details in details_row]
    return output

In [68]:
data_with_prices = data.map(lambda x: genhelpers.clean_raw_data(x)) \
                       .filter(lambda x: remove_first_col(x)) \
                       .map(lambda x: car_details_to_price(x))

data_with_prices.collect()

[['1',
  '2',
  '3',
  '4',
  '5',
  '6',
  '7',
  '8',
  '9',
  '10',
  '18',
  '28',
  '88',
  '99',
  '100',
  '888',
  '999',
  '8888'],
 [399888.0,
  250088.0,
  428800.0,
  504888.0,
  252888.0,
  102000.0,
  520000.0,
  102000.0,
  125000.0,
  233800.0,
  273988.0,
  1160000.0,
  1216000.0,
  343488.0,
  868000.0,
  433988.0,
  912000.0,
  0],
 [1190000.0,
  115900.0,
  912000.0,
  157000.0,
  370388.0,
  224888.0,
  82999.0,
  360000.0,
  635888.0,
  0,
  215000.0,
  912000.0,
  272888.0,
  461888.0,
  202888.0,
  246888.0,
  1468888.0,
  399888.0],
 [1297000.0,
  414888.0,
  1105000.0,
  298888.0,
  249888.0,
  798000.0,
  182888.0,
  399888.0,
  1494000.0,
  210888.0,
  572599.0,
  461888.0,
  423888.0,
  399888.0,
  242888.0,
  1468888.0,
  285888.0,
  0],
 [0,
  1160000.0,
  278788.0,
  233800.0,
  200888.0,
  912000.0,
  0,
  274888.0,
  437888.0,
  251888.0,
  769788.0,
  140776.0,
  256888.0,
  590000.0,
  0,
  136999.0,
  297800.0,
  205800.0],
 [558188.0,
  440888.0,
 

`data_with_prices` here is a `PipelinedRDD`.

## Data Exploration

All of the cars and their corresponding prices have been found.

Now, it is possible to perform data exploration.

In [74]:
from itertools import islice
from pyspark.sql.types import FloatType, StructField, StructType

In [75]:
cols = data_with_prices.take(1)[0]
schema = StructType([StructField(c, FloatType()) for c in cols])

# remove first row as it is the header row
rdd = data_with_prices.mapPartitionsWithIndex(
        lambda idx, it: islice(it, 1, None) if idx == 0 else it)

In [76]:
df = spark.createDataFrame(rdd, schema)

In [90]:
df.filter(df['1'] != 0) \
  .describe(['1', '2', '3', '8', '88', '888', '8888']).show(truncate=10)

+-------+----------+----------+----------+----------+----------+----------+----------+
|summary|         1|         2|         3|         8|        88|       888|      8888|
+-------+----------+----------+----------+----------+----------+----------+----------+
|  count|        23|        23|        23|        23|        23|        23|        23|
|   mean|823694....|437115....|612252....|477627....|422721....|279958....|563107....|
| stddev|470683....|426685....|558894....|373606....|496717....|300123....|633686....|
|    min|  200000.0|       0.0|       0.0|       0.0|       0.0|       0.0|       0.0|
|    max| 1600000.0| 1588000.0| 2194000.0| 1498000.0| 2194000.0| 1468888.0| 2194000.0|
+-------+----------+----------+----------+----------+----------+----------+----------+

