In [10]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *

In [13]:
sc = SparkContext()

In [14]:
# construct spark session instance
spark = SparkSession(sc).builder \
	.master("local") \
	.appName("Process Listings") \
	.config("spark.debug.maxToStringFields", "100") \
	.getOrCreate()

In [15]:
# file path
#file = '/data/p_dsi/capstone_projects/shea/mc_listings_extract.csv.gz'
file = 'hdfs:///user/conawws1/mc_listings_extract.csv.gz'

In [16]:
# define schema
schema = StructType([
	StructField('id',StringType(),True)
	,StructField('vin',StringType(),True)
	,StructField('heading',StringType(),True)
	,StructField('more_info',StringType(),True)
	,StructField('price',IntegerType(),True)
	,StructField('msrp',IntegerType(),True)
	,StructField('miles',IntegerType(),True)
	,StructField('stock_no',StringType(),True)
	,StructField('year',ShortType(),True)
	,StructField('make',StringType(),True)
	,StructField('model',StringType(),True)
	,StructField('trim',StringType(),True)
	,StructField('vehicle_type',StringType(),True)
	,StructField('body_type',StringType(),True)
	,StructField('body_subtype',StringType(),True)
	,StructField('drivetrain',StringType(),True)
	,StructField('fuel_type',StringType(),True)
	,StructField('engine',StringType(),True)
	,StructField('engine_block',StringType(),True)
	,StructField('engine_size',StringType(),True)
	,StructField('engine_measure',StringType(),True)
	,StructField('engine_aspiration',StringType(),True)
	,StructField('transmission',StringType(),True)
	,StructField('speeds',ByteType(),True)
	,StructField('doors',ByteType(),True)
	,StructField('cylinders',ByteType(),True)
	,StructField('city_mpg',ByteType(),True)
	,StructField('highway_mpg',ByteType(),True)
	,StructField('interior_color',StringType(),True)
	,StructField('exterior_color',StringType(),True)
	,StructField('base_exterior_color',StringType(),True)
	,StructField('base_interior_color',StringType(),True)
	,StructField('is_certified',ByteType(),True) # try BooleanType
	,StructField('is_transfer',ByteType(),True) # try BooleanType
	,StructField('taxonomy_vin',StringType(),True)
	,StructField('model_code',StringType(),True)
	,StructField('scraped_at',StringType(),True)
	,StructField('status_date',StringType(),True)
	,StructField('first_scraped_at',StringType(),True)
	,StructField('dealer_id',StringType(),True)
	,StructField('source',StringType(),True)
	,StructField('seller_name',StringType(),True)
	,StructField('street',StringType(),True)
	,StructField('city',StringType(),True)
	,StructField('state',StringType(),True)
	,StructField('zip',StringType(),True)
	,StructField('latitude',FloatType(),True)
	,StructField('longitude',FloatType(),True)
	,StructField('country',StringType(),True)
	,StructField('seller_phone',StringType(),True)
	,StructField('seller_email',StringType(),True)
	,StructField('seller_type',StringType(),True)
	,StructField('listing_type',StringType(),True)
	,StructField('inventory_type',StringType(),True)
	,StructField('dealer_type',StringType(),True)
	,StructField('car_seller_name',StringType(),True)
	,StructField('car_address',StringType(),True)
	,StructField('car_street',StringType(),True)
	,StructField('car_city',StringType(),True)
	,StructField('car_state',StringType(),True)
	,StructField('car_zip',StringType(),True)
	,StructField('car_latitude',FloatType(),True)
	,StructField('car_longitude',FloatType(),True)
	,StructField('seller_comments',StringType(),True)
	,StructField('options',StringType(),True)
	,StructField('features',StringType(),True)
	,StructField('photo_links',StringType(),True)
	,StructField('photo_url',StringType(),True)
	,StructField('dom',ShortType(),True)
	,StructField('dom_180',ShortType(),True)
	,StructField('dom_active',ShortType(),True)
	,StructField('currency_indicator',StringType(),True)
	,StructField('miles_indicator',StringType(),True)
	,StructField('carfax_1_owner',ByteType(),True) # try BooleanType
	,StructField('carfax_clean_title',ByteType(),True) # try BooleanType
	,StructField('loan_term',ShortType(),True)
	,StructField('loan_apr',FloatType(),True)
	,StructField('l_down_pay',FloatType(),True)
	,StructField('l_emi',FloatType(),True)
	,StructField('f_down_pay',FloatType(),True)
	,StructField('f_down_pay_per',FloatType(),True)
	,StructField('f_emi',FloatType(),True)
	,StructField('lease_term',ShortType(),True)
	,StructField('in_transit',ByteType(),True) # try BooleanType
	,StructField('in_transit_at',StringType(),True)
	,StructField('in_transit_days',IntegerType(),True)
	,StructField('high_value_features',StringType(),True)
	])

In [17]:
# read csv
df = spark.read.csv(file, schema = schema, sep = ',', quote = '"')
## PARSING PROBLEM, need to use quotations as field delimiter
	# example vins (1C6RR6YT5JS119710, 4T1B11HK2KU777711)

In [18]:
# size
print((df.count(), len(df.columns)))

(2989, 87)


In [19]:
# preview
df.take(5)

[Row(id=None, vin=None, heading=None, more_info=None, price=None, msrp=None, miles=None, stock_no=None, year=None, make=None, model=None, trim=None, vehicle_type=None, body_type=None, body_subtype=None, drivetrain=None, fuel_type=None, engine=None, engine_block=None, engine_size=None, engine_measure=None, engine_aspiration=None, transmission=None, speeds=None, doors=None, cylinders=None, city_mpg=None, highway_mpg=None, interior_color=None, exterior_color=None, base_exterior_color=None, base_interior_color=None, is_certified=None, is_transfer=None, taxonomy_vin=None, model_code=None, scraped_at=None, status_date=None, first_scraped_at=None, dealer_id=None, source=None, seller_name=None, street=None, city=None, state=None, zip=None, latitude=None, longitude=None, country=None, seller_phone=None, seller_email=None, seller_type=None, listing_type=None, inventory_type=None, dealer_type=None, car_seller_name=None, car_address=None, car_street=None, car_city=None, car_state=None, car_zip=Non

In [20]:
# confirm 
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- heading: string (nullable = true)
 |-- more_info: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- msrp: integer (nullable = true)
 |-- miles: integer (nullable = true)
 |-- stock_no: string (nullable = true)
 |-- year: short (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- trim: string (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- body_subtype: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- engine_block: string (nullable = true)
 |-- engine_size: string (nullable = true)
 |-- engine_measure: string (nullable = true)
 |-- engine_aspiration: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- speeds: byte (nullable = true)
 |-- doors: byte (nullable = true)
 |

In [21]:
from pyspark.sql.functions import col, size, split, isnull

In [22]:
df2 = (df
		.drop('more_info') # drop listing url
		.withColumn('photo_links_count', size(split(col('photo_links'), r'\|'))) # count photo links
		.drop('photo_links') # drop photo links
		.replace(-1,0,'photo_links_count') # fix count for NAs
		.withColumn('photo_main', ~isnull(col('photo_url')))
		)
#		.sample(fraction = 0.01, withReplacement = False)

In [23]:
df2.select('vin','photo_url','photo_main').take(10)

[Row(vin='vin', photo_url='photo_url', photo_main=True),
 Row(vin='3N1CN7AP3GL814923', photo_url='http://images.dealersync.com/cloud/userdocumentprod/2659/Photos/580611/wm_cf5d55b8525e443a8ac92b4331255817_580611.jpg?_=b77b544b6d916283e23f148920282ea88cdbc335', photo_main=True),
 Row(vin='5N1AT2MT3GC842962', photo_url=' reliability', photo_main=True),
 Row(vin='1C3CCCAB3GN161112', photo_url=None, photo_main=False),
 Row(vin='4T1B11HK2KU777711', photo_url=' AM/FM', photo_main=True),
 Row(vin='5NPE24AF1HH479439', photo_url='https://media-cdn-a5-jazel-tango.jazel-qa.com/media/69490215?', photo_main=True),
 Row(vin='1C4RJFBG0FC653856', photo_url=' Fore/Aft Movement', photo_main=True),
 Row(vin='5NMZUDLB1HH047582', photo_url='https://vini.gm.com/realimages/5NMZUDLB1HH047582/375324a.jpg', photo_main=True),
 Row(vin='5UXWZ7C31H0V94154', photo_url='450RPM|Transmission: 8 speed automatic|Variable intake manifold|Variable valve control|Approach angle: 26 deg|Departure angle: 23 deg|Ground clearan

In [24]:
df2.where("vin == '1C6RR6YT5JS119710'").select(['vin','dom']).show()

+---+---+
|vin|dom|
+---+---+
+---+---+



In [12]:
sc.stop()