<a href="https://colab.research.google.com/github/naenumtou/dataScienceLab/blob/main/PySparkUsedcarData.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Set auto reload
%reload_ext autoreload
%autoreload 2

In [2]:
# Change working directory
%cd /content/drive/My Drive/Colab Notebooks/PySpark

/content/drive/My Drive/Colab Notebooks/PySpark


### Setting up PySpark in Colab (Old method)
PySpark can now be used in Google Colab without any setup by running `pip install pyspark` directly. The main disadvantage is that it has to be re-installed on every reconnection. Using Google Drive and setting up the Java Virtual Machine (JVM) manually overcomes this limitation.

This assumes that Google Drive is already mounted. The first task is to install Java.
```
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
```

Next, download Apache Spark 3.2.1 (the latest version at the time of writing) with Hadoop 3.2 from [here](https://spark.apache.org/downloads.html).
```
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
```
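
If the download link resolves to a mirror-selection page rather than the file itself, `wget` saves HTML under the archive's name and the later `tar` step fails. The following is a minimal sketch of a sanity check using only the standard library; the helper name `looks_like_archive` is hypothetical and not part of the original setup.

```python
import os
import tarfile

def looks_like_archive(path):
    """Return True only if `path` exists and is a tar archive that Python can read.

    Hypothetical helper for illustration; an HTML page saved with a .tgz
    name, or a missing file, yields False.
    """
    return os.path.isfile(path) and tarfile.is_tarfile(path)

# Possible usage before extracting:
# looks_like_archive('spark-3.2.1-bin-hadoop3.2.tgz')
```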

Now, extract the archive into the Google Drive folder.
```
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
```

The last task is to install the `findspark` library, which helps to locate Spark on the system and import it as a regular library.
```
!pip install findspark
```

Now it is time to set the environment paths. This enables PySpark to run in the Google Colab environment.
```python
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.2.1-bin-hadoop3.2'
```
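
Because these paths are typed by hand, a typo only shows up later as a less obvious error. As a hedged sketch, the check below lists which variables are unset or point to a directory that does not exist; `missing_paths` is a hypothetical helper, not something `findspark` provides.

```python
import os

def missing_paths(env_vars=('JAVA_HOME', 'SPARK_HOME')):
    """Return the names of variables that are unset or do not point to a directory."""
    return [
        name for name in env_vars
        if not os.path.isdir(os.environ.get(name, ''))
    ]

# An empty list suggests both locations exist on disk:
# missing_paths()
```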

After completing all the steps above, Spark can be located in Google Drive by running:
```python
import findspark
findspark.init()
findspark.find()
```

---
### Note
This Colab notebook **does not** set up the JVM manually; it uses `pip install pyspark` directly instead.

In [3]:
# Install libraries
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 34 kB/s
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 41.0 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=d741ab1f2dda20c076099f4d40efd79718d8f6af27c8df60d0ee6341d2581c70
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [4]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType, FloatType
from pyspark.sql.functions import trim, regexp_replace, when, count, col, regexp_extract, isnull

# Config
%config InlineBackend.figure_format = 'retina' #Retina display

In [5]:
# Start spark session
spark = SparkSession.builder\
        .master('local')\
        .appName('Colab')\
        .getOrCreate()

# Test spark
spark     

In [6]:
# Load dataset
df = spark.read.csv(
    'carData.csv',
    header = True,
    inferSchema = True
)

# Show table
df.show(5)

+----------+------------+--------+-----------+-----------+----------------+------+-------+---------+---------+---------+-----------+------------+----------+--------------------+---------+----+------+-------+
|     month|contractDate|usedType|downPercent|sellingDate|modelDescription| brand|  color|buildyear|condition|       km|newCarPrice|sellingPrice|modelGroup|               wheel|     gear|door|engine|   fuel|
+----------+------------+--------+-----------+-----------+----------------+------+-------+---------+---------+---------+-----------+------------+----------+--------------------+---------+----+------+-------+
|2017-09-01|  2013-03-04|     New|     10.00%| 2017-09-01|           Yaris|Toyota|Missing|     2013|    Fair | 101,814 |   734,000 |     361,660|     Yaris|Front Wheel Drive...|Automatic|   5|   1.5|Benzene|
|2017-09-01|  2014-07-21|    Used|      2.89%| 2017-09-01|Hilux Vigo 4x2 C|Toyota|Missing|     2010|    Fair | 260,327 |   669,000 |     251,450|     C-Cab|Rear Wheel D

In [7]:
# Show schema
df.printSchema()

root
 |-- month: string (nullable = true)
 |-- contractDate: string (nullable = true)
 |-- usedType: string (nullable = true)
 |-- downPercent: string (nullable = true)
 |-- sellingDate: string (nullable = true)
 |-- modelDescription: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- color: string (nullable = true)
 |-- buildyear: integer (nullable = true)
 |-- condition: string (nullable = true)
 |-- km: string (nullable = true)
 |-- newCarPrice: string (nullable = true)
 |-- sellingPrice: string (nullable = true)
 |-- modelGroup: string (nullable = true)
 |-- wheel: string (nullable = true)
 |-- gear: string (nullable = true)
 |-- door: integer (nullable = true)
 |-- engine: double (nullable = true)
 |-- fuel: string (nullable = true)



In [8]:
# Custom schema
schema = StructType(
    [
     StructField('month', StringType(), True), #Keep schema as string for now
     StructField('contractDate', StringType(), True), #Keep schema as string for now
     StructField('usedType', StringType(), True),
     StructField('downPercent', StringType(), True),
     StructField('sellingDate', StringType(), True), #Keep schema as string for now
     StructField('modelDescription', StringType(), True),
     StructField('brand', StringType(), True),
     StructField('color', StringType(), True),
     StructField('buildyear', IntegerType(), True),
     StructField('condition', StringType(), True),
     StructField('km', StringType(), True),
     StructField('newCarPrice', StringType(), True),
     StructField('sellingPrice', StringType(), True),
     StructField('modelGroup', StringType(), True),
     StructField('wheel', StringType(), True),
     StructField('gear', StringType(), True),
     StructField('door', StringType(), True),
     StructField('engine', StringType(), True),
     StructField('fuel', StringType(), True)
    ]
)

# Read with custom schema
df = spark.read.csv(
    'carData.csv',
    header = True,
    schema = schema
)

# Show schema
df.printSchema()

root
 |-- month: string (nullable = true)
 |-- contractDate: string (nullable = true)
 |-- usedType: string (nullable = true)
 |-- downPercent: string (nullable = true)
 |-- sellingDate: string (nullable = true)
 |-- modelDescription: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- color: string (nullable = true)
 |-- buildyear: integer (nullable = true)
 |-- condition: string (nullable = true)
 |-- km: string (nullable = true)
 |-- newCarPrice: string (nullable = true)
 |-- sellingPrice: string (nullable = true)
 |-- modelGroup: string (nullable = true)
 |-- wheel: string (nullable = true)
 |-- gear: string (nullable = true)
 |-- door: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- fuel: string (nullable = true)



In [9]:
# Show table
df.show(5)

+----------+------------+--------+-----------+-----------+----------------+------+-------+---------+---------+---------+-----------+------------+----------+--------------------+---------+----+------+-------+
|     month|contractDate|usedType|downPercent|sellingDate|modelDescription| brand|  color|buildyear|condition|       km|newCarPrice|sellingPrice|modelGroup|               wheel|     gear|door|engine|   fuel|
+----------+------------+--------+-----------+-----------+----------------+------+-------+---------+---------+---------+-----------+------------+----------+--------------------+---------+----+------+-------+
|2017-09-01|  2013-03-04|     New|     10.00%| 2017-09-01|           Yaris|Toyota|Missing|     2013|    Fair | 101,814 |   734,000 |     361,660|     Yaris|Front Wheel Drive...|Automatic|   5|   1.5|Benzene|
|2017-09-01|  2014-07-21|    Used|      2.89%| 2017-09-01|Hilux Vigo 4x2 C|Toyota|Missing|     2010|    Fair | 260,327 |   669,000 |     251,450|     C-Cab|Rear Wheel D

In [10]:
# Check value counts in 'usedType'
df.groupBy('usedType').count().orderBy('count').show()

+--------+-----+
|usedType|count|
+--------+-----+
|    Used| 2440|
|     New|23795|
+--------+-----+



In [11]:
# Remove '%' from 'downPercent' before converting the type
df = df.withColumn(
    'downPercent',
    regexp_replace('downPercent', '%', '')
)

# Convert to 'FloatType' and rescale from percent to a fraction
df = df.withColumn(
    'downPercent',
    df['downPercent'].cast(FloatType()) / 100 #Percent to fraction (e.g. 10.00 -> 0.1)
)

# Show table
df.show(5)

+----------+------------+--------+--------------------+-----------+----------------+------+-------+---------+---------+---------+-----------+------------+----------+--------------------+---------+----+------+-------+
|     month|contractDate|usedType|         downPercent|sellingDate|modelDescription| brand|  color|buildyear|condition|       km|newCarPrice|sellingPrice|modelGroup|               wheel|     gear|door|engine|   fuel|
+----------+------------+--------+--------------------+-----------+----------------+------+-------+---------+---------+---------+-----------+------------+----------+--------------------+---------+----+------+-------+
|2017-09-01|  2013-03-04|     New|                 0.1| 2017-09-01|           Yaris|Toyota|Missing|     2013|    Fair | 101,814 |   734,000 |     361,660|     Yaris|Front Wheel Drive...|Automatic|   5|   1.5|Benzene|
|2017-09-01|  2014-07-21|    Used|0.028900001049041748| 2017-09-01|Hilux Vigo 4x2 C|Toyota|Missing|     2010|    Fair | 260,327 |   
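
Values such as `0.028900001049041748` in the table are most likely a rounding artifact: `FloatType` is a 32-bit float, and the stored value is then divided at higher precision. The sketch below reproduces the effect in plain Python with `struct`, assuming the division is carried out in 64-bit arithmetic. It is an illustration of the mechanism, not a statement about Spark internals; casting to a wider type before dividing may avoid the artifact.

```python
import struct
from decimal import Decimal

def to_float32(x):
    """Round a 64-bit Python float to the nearest 32-bit float and back."""
    return struct.unpack('f', struct.pack('f', x))[0]

single = to_float32(2.89)        # what a 32-bit float stores for 2.89
as_fraction = single / 100       # division performed in 64-bit precision
exact = Decimal('2.89') / 100    # decimal arithmetic keeps the written digits

# `as_fraction` is close to, but not exactly, 0.0289;
# `exact` equals Decimal('0.0289').
```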

In [12]:
# Check value counts in 'brand'
df.groupBy('brand').count().orderBy('count').show()

+-----------+-----+
|      brand|count|
+-----------+-----+
|      Lexus|    5|
|Non-Toyota |   75|
|     Toyota|26155|
+-----------+-----+



In [13]:
# Check value counts in 'color'
df.groupBy('color').count().orderBy('count').show()

+-----------+-----+
|      color|count|
+-----------+-----+
|       Gold|    1|
| white-grey|    2|
|Green-black|    3|
| Grey-black|    3|
|     Silver|    4|
|  Red-black|    5|
| Blue-black|    6|
|     Yellow|   12|
|   Sky blue|   15|
|white-black|   37|
|       Blue|   63|
|      Green|   63|
|      Brown|   92|
|     Orange|  102|
|        Red|  330|
|      Black| 2343|
|       Grey| 4316|
|      white| 5910|
|    Missing|12928|
+-----------+-----+



In [14]:
# Replace 'Missing' with Null in 'color'
df = df.withColumn(
    'color',
    when(
        col('color') == 'Missing', None
    ).otherwise(col('color'))
)

# Show table
df.show(5)

+----------+------------+--------+--------------------+-----------+----------------+------+-----+---------+---------+---------+-----------+------------+----------+--------------------+---------+----+------+-------+
|     month|contractDate|usedType|         downPercent|sellingDate|modelDescription| brand|color|buildyear|condition|       km|newCarPrice|sellingPrice|modelGroup|               wheel|     gear|door|engine|   fuel|
+----------+------------+--------+--------------------+-----------+----------------+------+-----+---------+---------+---------+-----------+------------+----------+--------------------+---------+----+------+-------+
|2017-09-01|  2013-03-04|     New|                 0.1| 2017-09-01|           Yaris|Toyota| null|     2013|    Fair | 101,814 |   734,000 |     361,660|     Yaris|Front Wheel Drive...|Automatic|   5|   1.5|Benzene|
|2017-09-01|  2014-07-21|    Used|0.028900001049041748| 2017-09-01|Hilux Vigo 4x2 C|Toyota| null|     2010|    Fair | 260,327 |   669,000 | 

In [15]:
# Grouping 'color'
df = df.withColumn(
    'groupColor',
    when(
        col('color').isin(['white-grey', 'Silver']), 'Grey'
    )\
    .when(
        col('color').isin(['Green-black', 'Grey-black', 'Red-black', 'Blue-black']), 'Black'
    )\
    .when(
        col('color').isin(['Sky blue']), 'Blue'
    )\
    .when(
        col('color').isin(['white-black']), 'white'
    )\
    .when(
        col('color').isin(['Yellow', 'Gold']), 'Orange'
    ).otherwise(col('color'))
)

# Standardize the spelling 'Grey' to 'Gray'
df = df.withColumn(
    'groupColor',
    when(
        col('groupColor') == 'Grey', 'Gray'
    ).otherwise(col('groupColor'))
)

# Result
df.groupBy('groupColor').count().orderBy('count').show()

+----------+-----+
|groupColor|count|
+----------+-----+
|     Green|   63|
|      Blue|   78|
|     Brown|   92|
|    Orange|  115|
|       Red|  330|
|     Black| 2360|
|      Gray| 4322|
|     white| 5947|
|      null|12928|
+----------+-----+
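
As a cross-check, the grouped counts can be recomputed by hand from the earlier value counts of `color`. The sketch below applies the same grouping rules to those counts in plain Python; the dictionary names are illustrative only.

```python
from collections import Counter

# Value counts taken from the earlier 'color' table
color_counts = {
    'Gold': 1, 'white-grey': 2, 'Green-black': 3, 'Grey-black': 3,
    'Silver': 4, 'Red-black': 5, 'Blue-black': 6, 'Yellow': 12,
    'Sky blue': 15, 'white-black': 37, 'Blue': 63, 'Green': 63,
    'Brown': 92, 'Orange': 102, 'Red': 330, 'Black': 2343,
    'Grey': 4316, 'white': 5910, 'Missing': 12928,
}

# Same rules as the when() chain: rare shades are folded into a base color
group_of = {
    'white-grey': 'Grey', 'Silver': 'Grey',
    'Green-black': 'Black', 'Grey-black': 'Black',
    'Red-black': 'Black', 'Blue-black': 'Black',
    'Sky blue': 'Blue',
    'white-black': 'white',
    'Yellow': 'Orange', 'Gold': 'Orange',
}

grouped = Counter()
for color, n in color_counts.items():
    # 'Missing' was replaced with null before grouping
    key = None if color == 'Missing' else group_of.get(color, color)
    # Spelling is standardized afterwards
    key = 'Gray' if key == 'Grey' else key
    grouped[key] += n
```

The totals in `grouped` agree with the table above, for example 4316 + 2 + 4 = 4322 for `Gray`.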



In [16]:
# Check value counts in 'buildyear'
df.groupBy('buildyear').count().orderBy('count').show()

+---------+-----+
|buildyear|count|
+---------+-----+
|     2001|    1|
|     2002|    1|
|     2003|    2|
|     2020|    2|
|     2004|    4|
|     2005|   21|
|     2006|   45|
|     2007|   53|
|     2008|   96|
|     2009|  136|
|     2010|  266|
|     2011|  287|
|     2012|  882|
|     2019| 1647|
|     2015| 2473|
|     2014| 3358|
|     2016| 3648|
|     2013| 3838|
|     2017| 4135|
|     2018| 5340|
+---------+-----+



In [17]:
# Check value counts in 'condition'
df.groupBy('condition').count().orderBy('count').show()

+---------+-----+
|condition|count|
+---------+-----+
|    Good |   10|
| Salvage |  590|
|  Average| 3233|
|    Poor | 4297|
|    Fair |18105|
+---------+-----+



In [18]:
# Trim the text (remove surrounding spaces)
df = df.withColumn('condition', trim(col('condition')))

# Show table
df.show(5)

+----------+------------+--------+--------------------+-----------+----------------+------+-----+---------+---------+---------+-----------+------------+----------+--------------------+---------+----+------+-------+----------+
|     month|contractDate|usedType|         downPercent|sellingDate|modelDescription| brand|color|buildyear|condition|       km|newCarPrice|sellingPrice|modelGroup|               wheel|     gear|door|engine|   fuel|groupColor|
+----------+------------+--------+--------------------+-----------+----------------+------+-----+---------+---------+---------+-----------+------------+----------+--------------------+---------+----+------+-------+----------+
|2017-09-01|  2013-03-04|     New|                 0.1| 2017-09-01|           Yaris|Toyota| null|     2013|     Fair| 101,814 |   734,000 |     361,660|     Yaris|Front Wheel Drive...|Automatic|   5|   1.5|Benzene|      null|
|2017-09-01|  2014-07-21|    Used|0.028900001049041748| 2017-09-01|Hilux Vigo 4x2 C|Toyota| null

In [19]:
# Clean placeholder text in the 'km' column --> '-', 'ตรวจสอบไม่ได้' (cannot be verified), 'ตรวจสอบเลขไมล์ไม่ได้' (mileage cannot be verified)
df = df.withColumn('km', trim(col('km'))) #Trim the text (remove space)
df = df.withColumn(
    'km',
    when(
        col('km').isin(
            ['-', 'ตรวจสอบไม่ได้', 'ตรวจสอบเลขไมล์ไม่ได้']
        ), None
    ).otherwise(col('km'))
) #Replace with a missing value

# Replace ',' in numeric text
df = df.withColumn(
      'km',
      regexp_replace('km', ',', '')
)

# Convert schema to 'IntegerType'
df = df.withColumn(
    'km',
    df['km'].cast(IntegerType())
) 

# Replace 0 with a missing value
df = df.withColumn(
    'km',
    when(
        col('km') == 0, None
    ).otherwise(col('km'))
)

# Show table
df.show(5)

+----------+------------+--------+--------------------+-----------+----------------+------+-----+---------+---------+------+-----------+------------+----------+--------------------+---------+----+------+-------+----------+
|     month|contractDate|usedType|         downPercent|sellingDate|modelDescription| brand|color|buildyear|condition|    km|newCarPrice|sellingPrice|modelGroup|               wheel|     gear|door|engine|   fuel|groupColor|
+----------+------------+--------+--------------------+-----------+----------------+------+-----+---------+---------+------+-----------+------------+----------+--------------------+---------+----+------+-------+----------+
|2017-09-01|  2013-03-04|     New|                 0.1| 2017-09-01|           Yaris|Toyota| null|     2013|     Fair|101814|   734,000 |     361,660|     Yaris|Front Wheel Drive...|Automatic|   5|   1.5|Benzene|      null|
|2017-09-01|  2014-07-21|    Used|0.028900001049041748| 2017-09-01|Hilux Vigo 4x2 C|Toyota| null|     2010| 
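
The cleaning steps for `km` can be read as one rule applied to each value. The plain-Python sketch below approximates that rule for a single string; `clean_km` is a hypothetical helper, and it differs from Spark in small ways (for example, `str.strip` removes all whitespace, and `int()` rejects decimal text).

```python
# Placeholder strings that mean the mileage is unknown
UNVERIFIABLE = {'-', 'ตรวจสอบไม่ได้', 'ตรวจสอบเลขไมล์ไม่ได้'}

def clean_km(raw):
    """Approximate the per-value effect of the km cleaning steps."""
    if raw is None:
        return None
    text = raw.strip()                 # trim surrounding whitespace
    if text in UNVERIFIABLE:           # placeholder -> missing
        return None
    text = text.replace(',', '')       # drop thousands separators
    try:
        km = int(text)                 # convert to an integer
    except ValueError:
        return None                    # unparseable text is treated as missing
    return None if km == 0 else km     # zero mileage -> missing

# clean_km(' 101,814 ') returns 101814; clean_km('-') returns None
```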

In [20]:
# Check missing value in 'km'
print(f"Number of missing km: {df.filter(df['km'].isNull()).count()}")

Number of missing km: 733


In [21]:
# Replace ',' in 'newCarPrice' numeric text
df = df.withColumn(
      'newCarPrice',
      regexp_replace('newCarPrice', ',', '')
)

# Convert schema to 'IntegerType'
df = df.withColumn(
    'newCarPrice',
    df['newCarPrice'].cast(IntegerType())
)

# Show table
df.show(5)

+----------+------------+--------+--------------------+-----------+----------------+------+-----+---------+---------+------+-----------+------------+----------+--------------------+---------+----+------+-------+----------+
|     month|contractDate|usedType|         downPercent|sellingDate|modelDescription| brand|color|buildyear|condition|    km|newCarPrice|sellingPrice|modelGroup|               wheel|     gear|door|engine|   fuel|groupColor|
+----------+------------+--------+--------------------+-----------+----------------+------+-----+---------+---------+------+-----------+------------+----------+--------------------+---------+----+------+-------+----------+
|2017-09-01|  2013-03-04|     New|                 0.1| 2017-09-01|           Yaris|Toyota| null|     2013|     Fair|101814|     734000|     361,660|     Yaris|Front Wheel Drive...|Automatic|   5|   1.5|Benzene|      null|
|2017-09-01|  2014-07-21|    Used|0.028900001049041748| 2017-09-01|Hilux Vigo 4x2 C|Toyota| null|     2010| 

In [22]:
# Replace ',' in 'sellingPrice' numeric text
df = df.withColumn(
      'sellingPrice',
      regexp_replace('sellingPrice', ',', '')
)

# Convert schema to 'IntegerType'
df = df.withColumn(
    'sellingPrice',
    df['sellingPrice'].cast(IntegerType())
)

# Show table
df.show(5)

+----------+------------+--------+--------------------+-----------+----------------+------+-----+---------+---------+------+-----------+------------+----------+--------------------+---------+----+------+-------+----------+
|     month|contractDate|usedType|         downPercent|sellingDate|modelDescription| brand|color|buildyear|condition|    km|newCarPrice|sellingPrice|modelGroup|               wheel|     gear|door|engine|   fuel|groupColor|
+----------+------------+--------+--------------------+-----------+----------------+------+-----+---------+---------+------+-----------+------------+----------+--------------------+---------+----+------+-------+----------+
|2017-09-01|  2013-03-04|     New|                 0.1| 2017-09-01|           Yaris|Toyota| null|     2013|     Fair|101814|     734000|      361660|     Yaris|Front Wheel Drive...|Automatic|   5|   1.5|Benzene|      null|
|2017-09-01|  2014-07-21|    Used|0.028900001049041748| 2017-09-01|Hilux Vigo 4x2 C|Toyota| null|     2010| 

In [23]:
# Check value counts in 'modelGroup'
df.groupBy('modelGroup').count().orderBy('count').show()

+----------+-----+
|modelGroup|count|
+----------+-----+
|      Hino|    2|
|     Lexus|    5|
|      Wish|    9|
|    Innova|   58|
|     PRIUS|   59|
|     Other|   73|
|       Van|   99|
|      C-HR|  116|
|    Sienta|  130|
|    Avanza|  161|
|     Camry|  248|
|  Fortuner|  786|
|     Altis| 1026|
|     D-Cab| 2118|
|     B-Cab| 3934|
|      Vios| 4402|
|     Yaris| 4673|
|     C-Cab| 8336|
+----------+-----+



In [24]:
# Check value counts in 'wheel'
df.groupBy('wheel').count().orderBy('count').show()

+--------------------+-----+
|               wheel|count|
+--------------------+-----+
|4  Wheel Drive (4WD)| 1017|
|Front Wheel Drive...|10706|
|Rear Wheel Drive ...|14512|
+--------------------+-----+



In [25]:
# Extract the value inside the trailing parentheses in 'wheel'
df = df.withColumn(
    'drive',
    regexp_extract(col('wheel'), r"\(([^()]+)\)$", 1)
)

# Check value counts in 'drive'
df.groupBy('drive').count().orderBy('count').show()

+-----+-----+
|drive|count|
+-----+-----+
|  4WD| 1017|
|  FWD|10706|
|  RWD|14512|
+-----+-----+
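
The pattern `\(([^()]+)\)$` captures the text inside a pair of parentheses at the very end of the string. The sketch below tries the same pattern with Python's `re` module; `extract_drive` is a hypothetical helper that returns an empty string when nothing matches, and apart from the first label the test strings are made up for illustration.

```python
import re

# Same pattern as in the regexp_extract call
PATTERN = re.compile(r"\(([^()]+)\)$")

def extract_drive(label):
    """Return the text inside trailing parentheses, or '' if there is none."""
    match = PATTERN.search(label)
    return match.group(1) if match else ''

# extract_drive('4  Wheel Drive (4WD)') returns '4WD'
# extract_drive('No parentheses here') returns ''
```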



In [26]:
# Check value counts in 'gear'
df.groupBy('gear').count().orderBy('count').show()

+---------+-----+
|     gear|count|
+---------+-----+
|Automatic|12745|
|   Manual|13490|
+---------+-----+



In [27]:
# Check value counts in 'door'
df.groupBy('door').count().orderBy('count').show()

+----+-----+
|door|count|
+----+-----+
|   5| 4916|
|   2| 7739|
|   4|13580|
+----+-----+



In [28]:
# Check value counts in 'engine'
df.groupBy('engine').count().orderBy('count').show()

+------+-----+
|engine|count|
+------+-----+
|   2.2|    1|
|   3.5|    2|
|   4.0|    2|
|   1.9|    2|
|   2.0|  213|
|   3.0|  409|
|   2.7|  435|
|   1.6|  475|
|   1.8|  727|
|   2.8| 2037|
|   2.5| 3708|
|   1.2| 4209|
|   1.5| 5179|
|   2.4| 8836|
+------+-----+



In [29]:
# Check value counts in 'fuel'
df.groupBy('fuel').count().orderBy('count').show()

+-------+-----+
|   fuel|count|
+-------+-----+
|    CNG|  188|
|Benzene|11301|
| Diesel|14746|
+-------+-----+



In [30]:
# Drop unused columns
df = df.drop(
    *['month', 'modelDescription', 'color', 'wheel']
)

# Show table
df.show(5)

+------------+--------+--------------------+-----------+------+---------+---------+------+-----------+------------+----------+---------+----+------+-------+----------+-----+
|contractDate|usedType|         downPercent|sellingDate| brand|buildyear|condition|    km|newCarPrice|sellingPrice|modelGroup|     gear|door|engine|   fuel|groupColor|drive|
+------------+--------+--------------------+-----------+------+---------+---------+------+-----------+------------+----------+---------+----+------+-------+----------+-----+
|  2013-03-04|     New|                 0.1| 2017-09-01|Toyota|     2013|     Fair|101814|     734000|      361660|     Yaris|Automatic|   5|   1.5|Benzene|      null|  FWD|
|  2014-07-21|    Used|0.028900001049041748| 2017-09-01|Toyota|     2010|     Fair|260327|     669000|      251450|     C-Cab|   Manual|   2|   2.5| Diesel|      null|  RWD|
|  2016-04-29|     New|0.001400000005960...| 2017-09-01|Toyota|     2016|     Fair| 12312|     562000|      383060|     B-Cab|   M

In [31]:
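# Count null values per column
# Note: 'c' is the column name as a string, so when() returns it as a
# non-null literal for each null row, and count() tallies those rows
df.select(
    [count(when(isnull(c), c)).alias(c) for c in df.columns]
).show()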

+------------+--------+-----------+-----------+-----+---------+---------+---+-----------+------------+----------+----+----+------+----+----------+-----+
|contractDate|usedType|downPercent|sellingDate|brand|buildyear|condition| km|newCarPrice|sellingPrice|modelGroup|gear|door|engine|fuel|groupColor|drive|
+------------+--------+-----------+-----------+-----+---------+---------+---+-----------+------------+----------+----+----+------+----+----------+-----+
|           0|       0|          0|          0|    0|        0|        0|733|          0|           0|         0|   0|   0|     0|   0|     12928|    0|
+------------+--------+-----------+-----------+-----+---------+---------+---+-----------+------------+----------+----+----+------+----+----------+-----+



In [32]:
# Export
df.toPandas().to_csv(
    'cleanCarDate.csv',
    index = False
)

In [33]:
# Stop the session
spark.stop()