In [1]:
import pyspark
from pyspark.sql import SparkSession
from itertools import islice
from pyspark.sql import Row
import pyspark.sql.functions as f

In [2]:
spark = SparkSession.builder.appName('pyspark-training').master('local[2]').getOrCreate()

23/03/20 07:10:42 WARN Utils: Your hostname, Prasanns-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.23 instead (on interface en0)
23/03/20 07:10:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/20 07:10:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Reading Data from CSV

In [3]:
df_cars = spark.sparkContext.textFile('USA_cars_datasets.csv')

In [4]:
df_cars.take(5)

                                                                                

['row_id,price,brand,model,year,title_status,mileage,color,vin,lot,state,country,condition',
 '0,6300,toyota,cruiser,2008,clean vehicle,274117.0,black,  jtezu11f88k007763,159348797,new jersey, usa,10 days left',
 '1,2899,ford,se,2011,clean vehicle,190552.0,silver,  2fmdk3gc4bbb02217,166951262,tennessee, usa,6 days left',
 '2,5350,dodge,mpv,2018,clean vehicle,39590.0,silver,  3c4pdcgg5jt346413,167655728,georgia, usa,2 days left',
 '3,25000,ford,door,2014,clean vehicle,64146.0,blue,  1ftfw1et4efc23745,167753855,virginia, usa,22 hours left']

In [5]:
header = df_cars.first()
header

'row_id,price,brand,model,year,title_status,mileage,color,vin,lot,state,country,condition'

In [6]:
df_cars = df_cars.filter(lambda x: x!= header)
df_cars.take(5)

['0,6300,toyota,cruiser,2008,clean vehicle,274117.0,black,  jtezu11f88k007763,159348797,new jersey, usa,10 days left',
 '1,2899,ford,se,2011,clean vehicle,190552.0,silver,  2fmdk3gc4bbb02217,166951262,tennessee, usa,6 days left',
 '2,5350,dodge,mpv,2018,clean vehicle,39590.0,silver,  3c4pdcgg5jt346413,167655728,georgia, usa,2 days left',
 '3,25000,ford,door,2014,clean vehicle,64146.0,blue,  1ftfw1et4efc23745,167753855,virginia, usa,22 hours left',
 '4,27700,chevrolet,1500,2018,clean vehicle,6654.0,red,  3gcpcrec2jg473991,167763266,florida, usa,22 hours left']

In [7]:
df_cars_split = df_cars.map(lambda x: x.split(','))
df_cars_split.take(5)

[['0',
  '6300',
  'toyota',
  'cruiser',
  '2008',
  'clean vehicle',
  '274117.0',
  'black',
  '  jtezu11f88k007763',
  '159348797',
  'new jersey',
  ' usa',
  '10 days left'],
 ['1',
  '2899',
  'ford',
  'se',
  '2011',
  'clean vehicle',
  '190552.0',
  'silver',
  '  2fmdk3gc4bbb02217',
  '166951262',
  'tennessee',
  ' usa',
  '6 days left'],
 ['2',
  '5350',
  'dodge',
  'mpv',
  '2018',
  'clean vehicle',
  '39590.0',
  'silver',
  '  3c4pdcgg5jt346413',
  '167655728',
  'georgia',
  ' usa',
  '2 days left'],
 ['3',
  '25000',
  'ford',
  'door',
  '2014',
  'clean vehicle',
  '64146.0',
  'blue',
  '  1ftfw1et4efc23745',
  '167753855',
  'virginia',
  ' usa',
  '22 hours left'],
 ['4',
  '27700',
  'chevrolet',
  '1500',
  '2018',
  'clean vehicle',
  '6654.0',
  'red',
  '  3gcpcrec2jg473991',
  '167763266',
  'florida',
  ' usa',
  '22 hours left']]

In [8]:
cols = header.split(',')

In [9]:
cols

['row_id',
 'price',
 'brand',
 'model',
 'year',
 'title_status',
 'mileage',
 'color',
 'vin',
 'lot',
 'state',
 'country',
 'condition']

In [10]:
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType

In [11]:
schema = StructType([StructField(col, StringType(), True) for col in cols])

In [12]:
schema

StructType(List(StructField(row_id,StringType,true),StructField(price,StringType,true),StructField(brand,StringType,true),StructField(model,StringType,true),StructField(year,StringType,true),StructField(title_status,StringType,true),StructField(mileage,StringType,true),StructField(color,StringType,true),StructField(vin,StringType,true),StructField(lot,StringType,true),StructField(state,StringType,true),StructField(country,StringType,true),StructField(condition,StringType,true)))

In [13]:
df_cars = spark.createDataFrame(df_cars_split, schema)

In [14]:
df_cars.show()

+------+-----+---------+-------+----+-------------+--------+------+-------------------+---------+--------------+-------+-------------+
|row_id|price|    brand|  model|year| title_status| mileage| color|                vin|      lot|         state|country|    condition|
+------+-----+---------+-------+----+-------------+--------+------+-------------------+---------+--------------+-------+-------------+
|     0| 6300|   toyota|cruiser|2008|clean vehicle|274117.0| black|  jtezu11f88k007763|159348797|    new jersey|    usa| 10 days left|
|     1| 2899|     ford|     se|2011|clean vehicle|190552.0|silver|  2fmdk3gc4bbb02217|166951262|     tennessee|    usa|  6 days left|
|     2| 5350|    dodge|    mpv|2018|clean vehicle| 39590.0|silver|  3c4pdcgg5jt346413|167655728|       georgia|    usa|  2 days left|
|     3|25000|     ford|   door|2014|clean vehicle| 64146.0|  blue|  1ftfw1et4efc23745|167753855|      virginia|    usa|22 hours left|
|     4|27700|chevrolet|   1500|2018|clean vehicle|  66

In [15]:
df_cars.printSchema()

root
 |-- row_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- color: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- lot: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- condition: string (nullable = true)



In [16]:
price_col =  f.col('price').cast('int')

In [17]:
type(price_col)

pyspark.sql.column.Column

In [18]:
df_cars =df_cars.withColumn('price', price_col)

In [19]:
df_cars.printSchema()

root
 |-- row_id: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- color: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- lot: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- condition: string (nullable = true)



### Getting the Dataframe Schema

In [20]:
schema1 = df_cars.schema

In [21]:
schema1

StructType(List(StructField(row_id,StringType,true),StructField(price,IntegerType,true),StructField(brand,StringType,true),StructField(model,StringType,true),StructField(year,StringType,true),StructField(title_status,StringType,true),StructField(mileage,StringType,true),StructField(color,StringType,true),StructField(vin,StringType,true),StructField(lot,StringType,true),StructField(state,StringType,true),StructField(country,StringType,true),StructField(condition,StringType,true)))

### Getting Dataframe Columns

In [22]:
cars_cols = df_cars.columns

In [23]:
cars_cols

['row_id',
 'price',
 'brand',
 'model',
 'year',
 'title_status',
 'mileage',
 'color',
 'vin',
 'lot',
 'state',
 'country',
 'condition']

In [24]:
df_cars.first()

Row(row_id='0', price=6300, brand='toyota', model='cruiser', year='2008', title_status='clean vehicle', mileage='274117.0', color='black', vin='  jtezu11f88k007763', lot='159348797', state='new jersey', country=' usa', condition='10 days left')

## `Mark rows of cars where the car prices are over 10000 USD`

In [25]:
df_cars.select('price').show(5)

+-----+
|price|
+-----+
| 6300|
| 2899|
| 5350|
|25000|
|27700|
+-----+
only showing top 5 rows



In [26]:
df_marked = df_cars.withColumn('Over10000', f.when(df_cars.price > 10000, "Yes").otherwise('No'))

In [27]:
df_marked.select('price','Over10000').show(5)

+-----+---------+
|price|Over10000|
+-----+---------+
| 6300|       No|
| 2899|       No|
| 5350|       No|
|25000|      Yes|
|27700|      Yes|
+-----+---------+
only showing top 5 rows



In [28]:
df_marked = df_cars.withColumn('Over10000', f.when(f.col('price') > 10000, "Yes").otherwise('No'))

In [29]:
df_marked.select('Over10000').show(5)

+---------+
|Over10000|
+---------+
|       No|
|       No|
|       No|
|      Yes|
|      Yes|
+---------+
only showing top 5 rows



## `Add new column to the dataset where the car prices are converted to INR. Use the factor USD 1 = Rs. 75`

In [30]:
df_inr = df_marked.withColumn("PriceINR", f.col('price')*75)

In [31]:
df_inr.select('price','PriceINR').show(5)

+-----+--------+
|price|PriceINR|
+-----+--------+
| 6300|  472500|
| 2899|  217425|
| 5350|  401250|
|25000| 1875000|
|27700| 2077500|
+-----+--------+
only showing top 5 rows



## `Find ctop 5 cars with the highest mileage.`

In [32]:
df_cars.select('mileage').show(10)

+--------+
| mileage|
+--------+
|274117.0|
|190552.0|
| 39590.0|
| 64146.0|
|  6654.0|
| 45561.0|
|149050.0|
| 23525.0|
|  9371.0|
| 63418.0|
+--------+
only showing top 10 rows



In [33]:
df_cars.printSchema()

root
 |-- row_id: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- color: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- lot: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- condition: string (nullable = true)



In [34]:
mileage_col =  f.col('mileage').cast('int')

In [35]:
df_cars =df_cars.withColumn('mileage', mileage_col)

In [36]:
df_cars.printSchema()

root
 |-- row_id: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- color: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- lot: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- condition: string (nullable = true)



In [37]:
df_cars.select('brand','model','year','title_status','state','country','mileage')\
.orderBy(f.col('mileage').desc()).show(5)

+---------+------+----+-----------------+--------------+-------+-------+
|    brand| model|year|     title_status|         state|country|mileage|
+---------+------+----+-----------------+--------------+-------+-------+
|peterbilt| truck|2010|salvage insurance|       georgia|    usa|1017936|
|     ford|  door|2013|    clean vehicle|south carolina|    usa| 999999|
|peterbilt| truck|2009|salvage insurance|       florida|    usa| 982486|
|peterbilt| truck|2012|salvage insurance|       florida|    usa| 902041|
|chevrolet|pickup|2003|    clean vehicle|     wisconsin|    usa| 507985|
+---------+------+----+-----------------+--------------+-------+-------+
only showing top 5 rows



## `Find  top 2 average mileage per car maker`

In [38]:
df_cars.groupby('brand').agg(f.avg('mileage').alias('avg_mileage'))\
        .orderBy(f.col('avg_mileage').desc()).show()

+-------------+------------------+
|        brand|       avg_mileage|
+-------------+------------------+
|    peterbilt|         725615.75|
|       toyota|          274117.0|
|        acura|120379.66666666667|
|         audi|          118091.0|
|        honda|           91599.0|
|        mazda|           82647.0|
|     chrysler|           73004.0|
|    chevrolet| 65124.46127946128|
|          gmc| 58548.73809523809|
|      hyundai| 56683.86666666667|
|          kia| 56609.46153846154|
|mercedes-benz|           54597.0|
|      lincoln|           52123.0|
|         ford|  52084.3044534413|
|          bmw| 47846.41176470588|
|        dodge| 44184.86342592593|
|       nissan|42426.230769230766|
|     cadillac|           40195.9|
|         jeep| 38723.26666666667|
|        buick|37926.846153846156|
+-------------+------------------+
only showing top 20 rows



## `Find the number of crusiers class cars present in the dataset`

In [39]:
df_cars.select('brand').filter(f.col('model') == 'cruiser').show()

+------+
| brand|
+------+
|toyota|
+------+



In [40]:
df_cars.filter(df_cars.model == 'cruiser').count()

1

## `Find the number of vehicles that are clean and the number of vehicles that have been salvaged`

In [41]:
df_cars.select('title_status').distinct().show()

+-----------------+
|     title_status|
+-----------------+
|salvage insurance|
|    clean vehicle|
+-----------------+



In [42]:
df_cars.groupBy('title_status').count().show()

+-----------------+-----+
|     title_status|count|
+-----------------+-----+
|salvage insurance|  163|
|    clean vehicle| 2336|
+-----------------+-----+



## `Find the number of salvaged cars per state`

In [51]:
df_cars.filter(f.col('title_status') == 'salvage insurance').groupBy('state').count().show(25)

+--------------+-----+
|         state|count|
+--------------+-----+
|north carolina|    5|
|south carolina|    5|
|         texas|   17|
|      maryland|    4|
| massachusetts|    5|
|      oklahoma|   13|
|          utah|    3|
|   mississippi|    3|
|        nevada|    1|
|      arkansas|    9|
|     tennessee|    6|
|       arizona|    1|
|      illinois|    9|
|       florida|   11|
|      michigan|   11|
|      virginia|    7|
|       wyoming|    1|
|      kentucky|    1|
|     wisconsin|    1|
|    california|   12|
|     minnesota|    8|
|       georgia|    7|
|      colorado|    7|
|       indiana|    4|
|   connecticut|    2|
+--------------+-----+
only showing top 25 rows



In [55]:
df_cars.groupby('state').agg(f.count(df_cars.title_status == 'salvage insurance')).show()

+--------------+-----------------------------------------+
|         state|count((title_status = salvage insurance))|
+--------------+-----------------------------------------+
| west virginia|                                       21|
|       alabama|                                       17|
|      new york|                                       58|
|north carolina|                                      146|
|  pennsylvania|                                      299|
|       ontario|                                        7|
|south carolina|                                       64|
|    new jersey|                                       87|
|         texas|                                      214|
|      maryland|                                        4|
|    new mexico|                                        4|
| massachusetts|                                       27|
|  rhode island|                                        2|
|      oklahoma|                                       7

In [56]:
df_cars.filter(f.col("title_status") == 'salvage insurance').count()

163

In [57]:
df_cars.agg(f.count(df_cars.title_status == 'salvage insurance')).show()

+-----------------------------------------+
|count((title_status = salvage insurance))|
+-----------------------------------------+
|                                     2499|
+-----------------------------------------+



In [59]:
df_cars.agg(f.count(df_cars.title_status == 'salvage insurance')).select("title_status").show()

AnalysisException: cannot resolve 'title_status' given input columns: [count((title_status = salvage insurance))];
'Project ['title_status]
+- Aggregate [count((title_status#5 = salvage insurance)) AS count((title_status = salvage insurance))#631L]
   +- Project [row_id#0, price#79, brand#2, model#3, year#4, title_status#5, cast(mileage#6 as int) AS mileage#179, color#7, vin#8, lot#9, state#10, country#11, condition#12]
      +- Project [row_id#0, cast(price#1 as int) AS price#79, brand#2, model#3, year#4, title_status#5, mileage#6, color#7, vin#8, lot#9, state#10, country#11, condition#12]
         +- LogicalRDD [row_id#0, price#1, brand#2, model#3, year#4, title_status#5, mileage#6, color#7, vin#8, lot#9, state#10, country#11, condition#12], false


In [None]:
df_cars.groupby('brand').agg(f.avg('mileage').alias('avg_mileage'))\
        .orderBy(f.col('avg_mileage').desc()).show()