## Create a PySpark session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

## Download sample data from [Kaggle](https://www.kaggle.com/austinreese/craigslist-carstrucks-data).

Unpack the archive and rename the data file as ***vehicles.csv*** (if needed). It contains more than 400.000 used car offers from the Craigslist.org. There are 26 columns of with informations about each car, although not all were filled in by poster.

## Define the schema for sample data

Define the data type for each consecutive column. Information about supported data types can be found [here](https://spark.apache.org/docs/latest/sql-ref-datatypes.html).

In [2]:
from pyspark.sql.types import StructType, DoubleType, StringType, IntegerType, DateType
schema = StructType() \
      .add("id",DoubleType(),True) \
      .add("url",StringType(),True) \
      .add("region",StringType(),True) \
      .add("region_url",StringType(),True) \
      .add("price",IntegerType(),True) \
      .add("year",IntegerType(),True) \
      .add("manufacturer",StringType(),True) \
      .add("model",StringType(),True) \
      .add("condition",StringType(),True) \
      .add("cylinders",StringType(),True) \
      .add("fuel",StringType(),True) \
      .add("odometer",DoubleType(),True) \
      .add("title_status",StringType(),True) \
      .add("transmission",StringType(),True) \
      .add("VIN",StringType(),True) \
      .add("drive",StringType(),True) \
      .add("size",StringType(),True) \
      .add("type",StringType(),True) \
      .add("paint_color",StringType(),True) \
      .add("image_url",StringType(),True) \
      .add("description",StringType(),True) \
      .add("county",StringType(),True) \
      .add("state",StringType(),True) \
      .add("lat",DoubleType(),True) \
      .add("long",DoubleType(),True) \
      .add("posting_date",DateType(),True)

## Read data from the file and create the DataFrame

We need to read the data from the .csv file and create the DataFrame. As the file contains the header information for columns we add the appropriate option.

In [3]:
raw_df = spark.read.option("header", True) \
     .schema(schema) \
     .csv("vehicles.csv")

As it was already mentioned this DataFrame is quite big. Let's display the first 10 records:

In [4]:
raw_df.show(10)

+-------------+--------------------+--------------------+--------------------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+----+-----+----+----+-----------+---------+-----------+------+-----+----+----+------------+
|           id|                 url|              region|          region_url|price|year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission| VIN|drive|size|type|paint_color|image_url|description|county|state| lat|long|posting_date|
+-------------+--------------------+--------------------+--------------------+-----+----+------------+-----+---------+---------+----+--------+------------+------------+----+-----+----+----+-----------+---------+-----------+------+-----+----+----+------------+
|7.222695916E9|https://prescott....|            prescott|https://prescott....| 6000|null|        null| null|     null|     null|null|    null|        null|        null|null| null|null|null|       null|     null|       nu

Number of rows can be obtained using [count](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.count.html) method:

In [5]:
raw_df.count()

441802

List the column names:

In [6]:
raw_df.columns

['id',
 'url',
 'region',
 'region_url',
 'price',
 'year',
 'manufacturer',
 'model',
 'condition',
 'cylinders',
 'fuel',
 'odometer',
 'title_status',
 'transmission',
 'VIN',
 'drive',
 'size',
 'type',
 'paint_color',
 'image_url',
 'description',
 'county',
 'state',
 'lat',
 'long',
 'posting_date']

As there is so many columns it's hard to read the data. Let's display the first record in vertical mode:

In [7]:
raw_df.show(1, vertical = True)

-RECORD 0----------------------------
 id           | 7.222695916E9        
 url          | https://prescott.... 
 region       | prescott             
 region_url   | https://prescott.... 
 price        | 6000                 
 year         | null                 
 manufacturer | null                 
 model        | null                 
 condition    | null                 
 cylinders    | null                 
 fuel         | null                 
 odometer     | null                 
 title_status | null                 
 transmission | null                 
 VIN          | null                 
 drive        | null                 
 size         | null                 
 type         | null                 
 paint_color  | null                 
 image_url    | null                 
 description  | null                 
 county       | null                 
 state        | az                   
 lat          | null                 
 long         | null                 
 posting_dat

We can check the data types for consecutive columns:

In [8]:
raw_df.printSchema()

root
 |-- id: double (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: double (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- posting_date: date (nu

## Select rows where manufacturer, model, and price are provided

As it could be seen above some offers do not provide even basic info about the car. Let's create new DataFrame were only records with provided manufacturer, model and price are stored.

In [9]:
from pyspark.sql.functions import col
df = raw_df.where(col("manufacturer").isNotNull() & col("model").isNotNull() & col("price").isNotNull())
df.show(10)

+-------------+--------------------+------+--------------------+-----+----+------------+--------------------+---------+-----------+----+--------+------------+------------+-----------------+-----+---------+------+-----------+--------------------+--------------------+------+-----+-------+----------+------------+
|           id|                 url|region|          region_url|price|year|manufacturer|               model|condition|  cylinders|fuel|odometer|title_status|transmission|              VIN|drive|     size|  type|paint_color|           image_url|         description|county|state|    lat|      long|posting_date|
+-------------+--------------------+------+--------------------+-----+----+------------+--------------------+---------+-----------+----+--------+------------+------------+-----------------+-----+---------+------+-----------+--------------------+--------------------+------+-----+-------+----------+------------+
|7.316814884E9|https://auburn.cr...|auburn|https://auburn.cr...|

Number of rows in this new DataFrame:

In [10]:
df.count()

404022

Let's see the first record in vertical mode:

In [11]:
df.show(1, vertical = True)

-RECORD 0----------------------------
 id           | 7.316814884E9        
 url          | https://auburn.cr... 
 region       | auburn               
 region_url   | https://auburn.cr... 
 price        | 33590                
 year         | 2014                 
 manufacturer | gmc                  
 model        | sierra 1500 crew ... 
 condition    | good                 
 cylinders    | 8 cylinders          
 fuel         | gas                  
 odometer     | 57923.0              
 title_status | clean                
 transmission | other                
 VIN          | 3GTP1VEC4EG551563    
 drive        | null                 
 size         | null                 
 type         | pickup               
 paint_color  | white                
 image_url    | https://images.cr... 
 description  | Carvana is the sa... 
 county       | null                 
 state        | al                   
 lat          | 32.59                
 long         | -85.48               
 posting_dat

Let's also sample a random record. We will use the following methods: [sample](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.sample.html), [limit](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.limit.html).

In [12]:
df.sample(0.001).limit(1).show(vertical = True)

-RECORD 0----------------------------
 id           | 7.313451714E9        
 url          | https://dothan.cr... 
 region       | dothan               
 region_url   | https://dothan.cr... 
 price        | 27990                
 year         | 2012                 
 manufacturer | toyota               
 model        | tacoma double cab... 
 condition    | good                 
 cylinders    | 6 cylinders          
 fuel         | gas                  
 odometer     | 67245.0              
 title_status | clean                
 transmission | other                
 VIN          | 5TFLU4EN4CX030112    
 drive        | 4wd                  
 size         | null                 
 type         | pickup               
 paint_color  | silver               
 image_url    | https://images.cr... 
 description  | Carvana is the sa... 
 county       | null                 
 state        | al                   
 lat          | 31.23                
 long         | -85.4                
 posting_dat

## Show distinct manufacturers

First we are going to create a [temporary view](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.createOrReplaceTempView.html).

In [27]:
df.createOrReplaceTempView("table_df")

In [50]:
spark.sql("SELECT manufacturer, count(*) AS occurances, \
    AVG(price) AS avg_price, \
    MAX(price) AS max_price, \
    MIN(price) AS min_price \
    FROM table_df \
    GROUP BY manufacturer \
    ORDER BY occurances DESC").show(100)

+---------------+----------+------------------+----------+---------+
|   manufacturer|occurances|         avg_price| max_price|min_price|
+---------------+----------+------------------+----------+---------+
|           ford|     70269| 36616.24484481066|1111111111|        0|
|      chevrolet|     54151|23654.187291093425| 123456789|        0|
|         toyota|     33942|15839.777267102705|    131500|        0|
|          honda|     21155|10786.607846844718|    123456|        0|
|         nissan|     18948| 18913.03562381254| 135008900|        0|
|           jeep|     18873|  92880.9424044932|1410065407|        0|
|            ram|     17367|28358.096389704613|  17000000|        0|
|            gmc|     16576|30614.966155888033| 123456789|        0|
|            bmw|     14584| 19036.12260010971|    138000|        0|
|          dodge|     13438|14765.301830629558|   6995495|        0|
|  mercedes-benz|     11694| 19519.11894988883|    990000|        0|
|        hyundai|     10278|10752.

In [51]:
avg_df = spark.sql("SELECT manufacturer, AVG(price), STDDEV(price) \
    FROM table_df \
    GROUP BY manufacturer").show(100)

+---------------+------------------+-----------------------------+
|   manufacturer|        avg(price)|stddev(CAST(price AS DOUBLE))|
+---------------+------------------+-----------------------------+
|         jaguar|26761.590838105152|           15486.088436083777|
|          buick| 36993.38186157518|           1672617.0299815973|
|     land rover|7911.0952380952385|            15025.62842580889|
|     mitsubishi|13766.353066829417|             9285.72397067099|
|        pontiac| 8138.923487544484|            9421.648531995548|
|          lexus|19259.542699386504|           12619.053652702445|
|         toyota|15839.777267102705|           13089.547326712098|
|       chrysler|10483.984617956863|           14699.215883897876|
|          tesla| 37967.50352112676|           15207.115049401118|
|        lincoln|19933.444896981313|             23634.3541355372|
|           audi|23405.218268720128|           15466.816180943159|
|         datsun|15393.725806451614|            9549.644412175