In [0]:
# Path of the input incident CSV file.
file_location = "/FileStore/tables/data.csv"

# Read data.csv as an RDD of text lines (one string element per line).
# NOTE(review): `spark` is presumably the SparkSession provided by the notebook
# environment; it is not defined in this cell.
raw_rdd = spark.sparkContext.textFile(file_location)

# Map step: the output key is vin_number; the value is (make, year, incident_type).
def extract_vin_key_value(line):
    """Turn one CSV line into a ``(vin_number, (make, year, incident_type))`` pair.

    The line is split on plain commas (no CSV quoting rules). Fields are
    picked by position: 1 = incident type, 2 = VIN, 3 = make, 5 = year.
    A line with fewer than six comma-separated fields raises IndexError.
    """
    fields = line.split(",")
    vin_number = fields[2]
    make, year, incident_type = fields[3], fields[5], fields[1]
    return vin_number, (make, year, incident_type)

# Key every raw line by its VIN; the value is (make, year, incident_type).
# The function is passed directly; a `lambda x: f(x)` wrapper adds nothing.
vin_kv = raw_rdd.map(extract_vin_key_value)


# Perform a group aggregation to propagate make and year to the accident records.
# Like the reducer in the MapReduce framework, Spark's "groupByKey" shuffles the data
# so that all records sharing the same key end up in the same group (unlike MapReduce,
# the groups are not sorted). Within a group of vin_number, we iterate through all the
# records, find the one that has the make and year available, and capture it as
# group-level master info. As we then filter and output the accident records, each one
# is augmented with the master info captured in that first pass.

def populate_make(line):
    """Attach a VIN's make and year to each of its accident records.

    ``line`` is the iterable of ``(make, year, incident_type)`` tuples that
    ``groupByKey`` produced for a single vin_number. The first record whose
    make and year are both non-empty supplies the group's master info. Every
    record with incident type ``'A'`` is then emitted with that make and year.

    Returns a list of ``(make, year, 'A')`` tuples, one per accident record.
    Returns an empty list when no record in the group carries make and year.
    """
    # Materialize once: the group is scanned twice (master info, then accidents).
    records = list(line)

    # Pass 1: capture the group-level master info (make, year).
    master = next(
        ((make, year) for make, year, _ in records if make and year),
        None,
    )
    if master is None:
        # Nothing to attribute the accidents to, so emit no records.
        return []
    make, year = master

    # Pass 2: emit EVERY accident record, enriched with the master info.
    # The earlier zip(initial_sales, accidents) pairing stopped at the shorter
    # list and silently dropped a VIN's second and later accidents.
    return [(make, year, kind) for _, _, kind in records if kind == 'A']
    
# Gather every record of a VIN into one group, then expand each group into its
# enriched accident records; the VIN key itself is not carried forward.
enhance_make = (
    vin_kv
    .groupByKey()
    .flatMap(lambda vin_group: populate_make(vin_group[1]))
)


# Emit ((make, year), 1) for every enriched accident record.
make_kv = enhance_make.map(lambda rec: ((rec[0], rec[1]), 1))


# ReduceByKey: add up the 1s to get the number of accidents per (make, year).
accident_rec_per_year = make_kv.reduceByKey(lambda left, right: left + right)

# Render each result as a "make-year,count" text line.
accident_rec_per_year_format = accident_rec_per_year.map(
    lambda pair: "-".join(pair[0]) + "," + str(pair[1])
)

# Save as text file under the relative path "car_accidents_count".
# NOTE(review): Spark will not overwrite an existing output directory, so
# re-running this cell presumably requires removing that directory first.
accident_rec_per_year_format.saveAsTextFile("car_accidents_count")

# Preview up to 20 formatted lines (displayed as the cell's output).
accident_rec_per_year_format.take(20)

Out[10]: ['Mercedes-2015,2', 'Mercedes-2016,1', 'Nissan-2003,1']