In [1]:
from operator import add

from pyspark import SparkConf, SparkContext
# Input file for the pipeline; change here rather than inside the cell body.
DATA_PATH = "data.csv"

# getOrCreate reuses an already-running SparkContext, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
conf = SparkConf().setMaster('local').setAppName('autops')
sc = SparkContext.getOrCreate(conf)

# One RDD element per line of the CSV; show two sample rows.
raw_rdd = sc.textFile(DATA_PATH)
raw_rdd.take(2)

['1,I,VXIO456XLBB630221,Nissan,Altima,2003,2002-05-08,Initial sales from TechMotors',
 '2,I,INU45KIOOPA343980,Mercedes,C300,2015,2014-01-01,Sold from EuroMotors']

In [2]:
def extract_vin_key_value(line):
    """Turn one raw CSV line into a ``(vin, (make, year))`` pair.

    Columns are addressed by position after splitting on commas:
    index 2 is the VIN, index 3 the make and index 5 the year (see the
    sample rows from ``raw_rdd.take(2)``). Rows that leave make/year blank
    yield empty strings for them.
    """
    fields = line.split(',')
    vin, make, year = fields[2], fields[3], fields[5]
    return vin, (make, year)
# Key every record by VIN. The function is passed to map directly; a
# wrapper lambda that only forwards its argument adds nothing.
vin_kv = raw_rdd.map(extract_vin_key_value)
vin_kv.collect()

[('VXIO456XLBB630221', ('Nissan', '2003')),
 ('INU45KIOOPA343980', ('Mercedes', '2015')),
 ('VXIO456XLBB630221', ('', '')),
 ('VXIO456XLBB630221', ('', '')),
 ('VOME254OOXW344325', ('Mercedes', '2015')),
 ('VOME254OOXW344325', ('', '')),
 ('VXIO456XLBB630221', ('', '')),
 ('EXOA00341AB123456', ('Mercedes', '2016')),
 ('VOME254OOXW344325', ('', '')),
 ('VOME254OOXW344325', ('', '')),
 ('EXOA00341AB123456', ('', '')),
 ('EXOA00341AB123456', ('', '')),
 ('VOME254OOXW344325', ('', '')),
 ('UXIA769ABCC447906', ('Toyota', '2017')),
 ('UXIA769ABCC447906', ('', '')),
 ('INU45KIOOPA343980', ('', ''))]

In [19]:
# Show each VIN's records side by side; mapValues converts only the grouped
# values to a list and leaves the key untouched.
vin_kv.groupByKey().mapValues(list).collect()


[('VXIO456XLBB630221', [('Nissan', '2003'), ('', ''), ('', ''), ('', '')]),
 ('INU45KIOOPA343980', [('Mercedes', '2015'), ('', '')]),
 ('VOME254OOXW344325',
  [('Mercedes', '2015'), ('', ''), ('', ''), ('', ''), ('', '')]),
 ('EXOA00341AB123456', [('Mercedes', '2016'), ('', ''), ('', '')]),
 ('UXIA769ABCC447906', [('Toyota', '2017'), ('', '')])]

In [34]:
def populate_make(lines):
    """Copy a vehicle's make and year onto every one of its records.

    ``lines`` holds the ``(make, year)`` pairs grouped under a single VIN.
    The first pair with a non-empty make supplies the values. That
    ``(make, year)`` is returned once per input record, so the output has
    the same length as the input.

    If no record in the group carries a make (including an empty group),
    the records are returned unchanged rather than failing.

    Parameters
    ----------
    lines : iterable of (str, str)
        ``(make, year)`` pairs for one VIN. Any iterable is accepted; it is
        materialized once so both passes see every record.

    Returns
    -------
    list of (str, str)
    """
    # Materialize so a one-shot iterator is not exhausted by the first pass.
    records = list(lines)

    # First pass: locate the record that carries the make and year.
    master = next((rec for rec in records if rec[0] != ''), None)
    if master is None:
        # Nothing to propagate for this VIN; keep its records as they are.
        return records

    make, year = master[0], master[1]
    # Second pass: one (make, year) per original record.
    return [(make, year)] * len(records)

# Group records by VIN, drop the VIN key, and back-fill make/year within each group.
enhance_make = (
    vin_kv
    .groupByKey()
    .values()
    .flatMap(lambda records: populate_make(list(records)))
)

In [35]:
# Display the back-filled (make, year) records; populate_make emits one per input record.
enhance_make.collect()

[('Nissan', '2003'),
 ('Nissan', '2003'),
 ('Nissan', '2003'),
 ('Nissan', '2003'),
 ('Mercedes', '2015'),
 ('Mercedes', '2015'),
 ('Mercedes', '2015'),
 ('Mercedes', '2015'),
 ('Mercedes', '2015'),
 ('Mercedes', '2015'),
 ('Mercedes', '2015'),
 ('Mercedes', '2016'),
 ('Mercedes', '2016'),
 ('Mercedes', '2016'),
 ('Toyota', '2017'),
 ('Toyota', '2017')]

In [36]:
def extract_make_key_value(line):
    """Map a ``(make, year)`` record to a ``('make-year', 1)`` counting pair.

    The composite string key lets ``reduceByKey`` tally records per
    make/year combination.
    """
    key = '-'.join((line[0], line[1]))
    return key, 1

# Turn each (make, year) record into a ('make-year', 1) pair; pass the function directly.
make_kv = enhance_make.map(extract_make_key_value)

In [37]:
# Inspect the ('make-year', 1) pairs before they are summed.
make_kv.collect()

[('Nissan-2003', 1),
 ('Nissan-2003', 1),
 ('Nissan-2003', 1),
 ('Nissan-2003', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2016', 1),
 ('Mercedes-2016', 1),
 ('Mercedes-2016', 1),
 ('Toyota-2017', 1),
 ('Toyota-2017', 1)]

In [38]:
# Sum the per-record 1s for each 'make-year' key (operator.add instead of a hand-written lambda).
make_year_count = make_kv.reduceByKey(add)

In [39]:
# Final tally: number of records for each make-year combination.
make_year_count.collect()

[('Nissan-2003', 4),
 ('Mercedes-2015', 7),
 ('Mercedes-2016', 3),
 ('Toyota-2017', 2)]