In [42]:
import logging
import shutil
from operator import add

from pyspark import SparkContext

logging.basicConfig(level=logging.INFO)

DATA_PATH = "data.csv"

# Only one SparkContext may be active at a time, so stop any context left over
# from a previous run of this cell (or one pre-created by the environment).
# On a fresh kernel `sc` is undefined, hence the guarded lookup instead of a
# bare `sc.stop()`, which would raise NameError.
previous_sc = globals().get("sc")
if previous_sc is not None:
    previous_sc.stop()

sc = SparkContext("local", "My Application")
raw_rdd = sc.textFile(DATA_PATH)

In [76]:
def extract_vin_key_value(line: str):
    """
    Parse one raw CSV line into a ``[vin, (make, year, incident_type)]`` pair.

    Input:
        line: one raw text line from the input file (type: str). It must have
            at least 6 comma-separated fields.
    Output:
        A 2-element list (type: list) ``[key, value]`` where
            key   = field 2 (VIN number)
            value = (field 3, field 5, field 1), i.e. (make, year, incident type)
        Every field is stripped of surrounding whitespace. Fields may be empty
        strings (e.g. rows that carry no make/year).
    Raises:
        IndexError: if the line has fewer than 6 fields.

    NOTE(review): fields are split on every ',' with no CSV quoting support.
    Confirm data.csv never contains quoted commas.
    """
    fields = line.strip().split(',')
    vin = fields[2].strip()
    make, year, incident_type = fields[3].strip(), fields[5].strip(), fields[1].strip()
    return [vin, (make, year, incident_type)]

def populate_make(record: list):
    """
    Attach a VIN's make and year to each of its accident ('A') incidents.

    Input:
        record: iterable of ``(make, year, incident_type)`` tuples grouped
            under one VIN. Only some entries carry a non-empty make (and its
            year); the others have them blank.
    Output:
        List of ``(make, year, 'A')`` tuples, one per accident incident, in
        the order they appear in ``record``.

    The make/year is found in a first pass, so the result does not depend on
    the order in which ``groupByKey`` yields the values (Spark does not
    guarantee that order). If no entry carries a make, there is nothing to
    attribute the accidents to and an empty list is returned.
    """
    # Materialize once: the grouped values are iterated twice below.
    items = list(record)

    make_source = next((item for item in items if item[0] != ''), None)
    if make_source is None:
        # NOTE(review): accidents for a VIN with no make row are dropped here.
        # Confirm that is acceptable for the report.
        return []
    make, year = make_source[0], make_source[1]

    return [(make, year, item[2]) for item in items if item[2] == 'A']

def extract_make_key_value(record: list):
    """
    Turn one accident record into a counting pair keyed by make and year.

    Input: a sequence whose first two items are make and year,
        e.g. ``('Mercedes', '2015', 'A')``
    Output: ``[make + year, 1]`` with make and year whitespace-stripped and
        joined with no separator, e.g. ``['Mercedes2015', 1]``
    """
    make = record[0].strip()
    year = record[1].strip()
    return [str(make + year), 1]

def create_accident_report(file):
    """
    Count accident incidents per make-year from raw CSV lines.

    Input:
        file: an RDD of raw text lines (as produced by ``sc.textFile``).
    Output:
        An RDD of ``(make + year, count)`` pairs, e.g. ``('Mercedes2015', 2)``.

    Pipeline:
        1. parse each line into ``[vin, (make, year, incident_type)]``;
        2. group by VIN so accident rows can take the make/year recorded on
           another row for the same VIN, keeping only accidents;
        3. map each accident to ``[make + year, 1]`` and sum per key.
    """
    vin_kv = file.map(extract_vin_key_value)

    enhance_make = vin_kv.groupByKey().flatMap(lambda kv: populate_make(kv[1]))

    make_kv = enhance_make.map(extract_make_key_value)

    count_kv = make_kv.reduceByKey(add)

    return count_kv

REPORT_DIR = "accident_report"

report = create_accident_report(raw_rdd)

# Spark's saveAsTextFile fails if the output directory already exists, so
# remove the previous run's output to keep this cell re-runnable.
# NOTE(review): shutil only touches the local filesystem. This assumes the
# default FS is local, as with the "local" master used in the setup cell.
shutil.rmtree(REPORT_DIR, ignore_errors=True)
report.saveAsTextFile(REPORT_DIR)

In [54]:
from pyspark.sql import SQLContext

# NOTE: SQLContext is deprecated in Spark 3.x in favor of SparkSession. It is
# kept because later cells use `sqlContext`.
sqlContext = SQLContext(sc)

# `vin_kv` only exists inside create_accident_report, so rebuild the parsed
# [vin, (make, year, incident_type)] pairs here instead of relying on a
# variable leaked from an earlier interactive session.
vin_kv = raw_rdd.map(extract_vin_key_value)
vin_df = sqlContext.createDataFrame(vin_kv).toDF("vin", "value")
vin_df.sort(vin_df.vin.asc()).show()

+-----------------+-------------------+
|              vin|              value|
+-----------------+-------------------+
|EXOA00341AB123456|{Mercedes, 2016, I}|
|EXOA00341AB123456|            {, , A}|
|EXOA00341AB123456|            {, , R}|
|INU45KIOOPA343980|            {, , A}|
|INU45KIOOPA343980|{Mercedes, 2015, I}|
|UXIA769ABCC447906|  {Toyota, 2017, I}|
|UXIA769ABCC447906|            {, , R}|
|VOME254OOXW344325|            {, , R}|
|VOME254OOXW344325|            {, , A}|
|VOME254OOXW344325|            {, , R}|
|VOME254OOXW344325|            {, , R}|
|VOME254OOXW344325|{Mercedes, 2015, I}|
|VXIO456XLBB630221|  {Nissan, 2003, I}|
|VXIO456XLBB630221|            {, , R}|
|VXIO456XLBB630221|            {, , A}|
|VXIO456XLBB630221|            {, , R}|
+-----------------+-------------------+



In [63]:
# Rebuild the accident records with make/year attached; `enhance_make` is
# local to create_accident_report and not defined at notebook level.
enhance_make = (
    raw_rdd.map(extract_vin_key_value)
    .groupByKey()
    .flatMap(lambda kv: populate_make(kv[1]))
)
enhance_make.collect()

[('Nissan', '2003', 'A'), ('Mercedes', '2015', 'A'), ('Mercedes', '2015', 'A'), ('Mercedes', '2016', 'A')]


In [66]:
# Rebuild the [make + year, 1] pairs; `make_kv` is local to
# create_accident_report and not defined at notebook level.
make_kv = (
    raw_rdd.map(extract_vin_key_value)
    .groupByKey()
    .flatMap(lambda kv: populate_make(kv[1]))
    .map(extract_make_key_value)
)
make_df = sqlContext.createDataFrame(make_kv).toDF("key", "value")
make_df.show()

+------------+-----+
|         key|value|
+------------+-----+
|  Nissan2003|    1|
|Mercedes2015|    1|
|Mercedes2015|    1|
|Mercedes2016|    1|
+------------+-----+



In [68]:
# `count_kv` is local to create_accident_report. `report`, built by the
# report cell earlier, is the same per-make-year count RDD.
count_df = sqlContext.createDataFrame(report).toDF("key", "count")
count_df.show()

+------------+-----+
|         key|count|
+------------+-----+
|  Nissan2003|    1|
|Mercedes2015|    2|
|Mercedes2016|    1|
+------------+-----+

