In [1]:
zip_num_vehicles_rdd = sc.textFile('zip_veh_count.csv')
zip_num_vehicles_rdd.collect()
header = zip_num_vehicles_rdd.first()

In [2]:
def map_multiple_zip(records):
    for record in records:
        if not record == header:
            fields = record.split(',')
            zip_code = fields[0]
            vehicle_count = int(fields[1])
            samples = int(fields[2])
            half_vehicle_count = vehicle_count / 2
            half_samples = samples / 2
            
            if "NY" in zip_code:
                # Duplicate record with incorrect format
                continue
            
            elif ';' in zip_code:
                # Of form 11220;11204,39294
                zip1 = zip_code.split(';')[0].strip()
                zip2 = zip_code.split(';')[1].strip()
                yield (zip1, (half_vehicle_count, half_samples, 1))
                yield (zip2, (half_vehicle_count, half_samples, 1))
            
            elif ':' in zip_code:
                # Of form 10309: 10312
                zip1 = zip_code.split(':')[0].strip()
                zip2 = zip_code.split(':')[1].strip()
                half_vehicle_count = vehicle_count / 2
                yield (zip1, (half_vehicle_count, half_samples, 1))
                yield(zip2, (half_vehicle_count, half_samples, 1))
            
            else:
                yield (zip_code, (vehicle_count, samples, 1))
    
zip_key_rdd = zip_num_vehicles_rdd.mapPartitions(map_multiple_zip)
zip_key_rdd.collect()

[(u'11231', (26598, 12, 1)),
 (u'11215', (13299, 6, 1)),
 (u'10038', (47638, 12, 1)),
 (u'10002', (23819, 6, 1)),
 (u'10010', (76732, 4, 1)),
 (u'10037', (38366, 2, 1)),
 (u'10023', (133731, 9, 1)),
 (u'10010', (463870, 4, 1)),
 (u'10035', (231935, 2, 1)),
 (u'11357', (16343, 5, 1)),
 (u'11435', (16343, 5, 1)),
 (u'11354', (16343, 5, 1)),
 (u'11215', (97406, 10, 1)),
 (u'10014', (131372, 10, 1)),
 (u'10027', (67208, 10, 1)),
 (u'10019', (33604, 5, 1)),
 (u'11218', (39294, 6, 1)),
 (u'11214', (39294, 6, 1)),
 (u'11220', (19647, 3, 1)),
 (u'11204', (19647, 3, 1)),
 (u'10451', (11030, 5, 1)),
 (u'10455', (11030, 5, 1)),
 (u'10459', (11030, 5, 1)),
 (u'11435', (331081, 9, 1)),
 (u'10463', (331081, 9, 1)),
 (u'11415', (331081, 9, 1)),
 (u'11423', (34724, 6, 1)),
 (u'11101', (34724, 6, 1)),
 (u'11428', (34724, 6, 1)),
 (u'10025', (100472, 4, 1)),
 (u'10040', (50236, 2, 1)),
 (u'10033', (15337, 6, 1)),
 (u'10468', (15337, 6, 1)),
 (u'12603', (15337, 6, 1)),
 (u'10458', (24044, 6, 1)),
 (u'122

In [3]:
def dict_mapper(zip_tuples):
    for zip_tuple in zip_tuples:
        zip_code = zip_tuple[0]
        zip_dict = zip_tuple[1]
        vehicle_count = zip_dict['vehicle_count']
        samples = zip_dict['samples']
        days_in_2012_2013 = 731 # 2012 was a leap year
        vehicle_count_2012_2013 = vehicle_count / samples * days_in_2012_2013
        yield (zip_code, vehicle_count_2012_2013)
    
def seqOp(dict1, tup):
    vehicle_count = tup[0]
    samples = tup[1]
    dict1['vehicle_count'] = dict1.get('vehicle_count', 0) + vehicle_count
    dict1['samples'] = dict1.get('samples', 0) + samples
    return dict1

def combOp(dict1, dict2):
    for key, value in dict1.items():
        dict2[key] = dict2.get(key, 0) + value
        return dict2
    
result_rdd = zip_key_rdd.aggregateByKey({}, seqOp, combOp).mapPartitions(dict_mapper)
result_rdd.collect()

[(u'14220', 15893402),
 (u'10004', 10424060),
 (u'11205', 4951063),
 (u'10026', 8477407),
 (u'10037', 30136206),
 (u'10455', 3567280),
 (u'10309', 5149164),
 (u'10040', 15850273),
 (u'11106', 3660848),
 (u'10305', 53658324),
 (u'10303', 5001502),
 (u'11430', 3289500),
 (u'10301', 22011141),
 (u'10475', 7046109),
 (u'10035', 45352702),
 (u'11203', 10926988),
 (u'10457', 28818213),
 (u'11418', 19396354),
 (u'11207', 15605388),
 (u'11209', 13940170),
 (u'10459', 4338485),
 (u'11223', 6294641),
 (u'11232', 6344349),
 (u'11221', 12392643),
 (u'13207', 3717135),
 (u'11355', 3738334),
 (u'11229', 23680745),
 (u'10002', 6896985),
 (u'10453', 7209122),
 (u'11366', 7528569),
 (u'10031', 16283025),
 (u'11364', 10214994),
 (u'11362', 4448866),
 (u'11726', 7161607),
 (u'10462', 9360455),
 (u'10473', 6462771),
 (u'11236', 6820961),
 (u'10468', 1868436),
 (u'11937', 13270574),
 (u'11423', 13030806),
 (u'11414', 30082112),
 (u'10017', 44501818),
 (u'11427', 5646244),
 (u'12110', 5102380),
 (u'10460', 

In [10]:
# result_rdd.coalesce(1).saveAsTextFile('./Raw Result/traffic_volume')