In [466]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, LongType, MapType, Row
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.rdd import RDD
import numpy as np
import pandas as pd
import sys

spark = SparkSession.builder.appName("Auto Report–Spark").getOrCreate()



# 1. Load Data

In [288]:
schema = StructType([ \
    StructField('incident_id', IntegerType(), True), 
    StructField('incident_type', StringType(), True),
    StructField('vin_num', StringType(), True),
    StructField('make', StringType(), True),
    StructField('model', StringType(), True),
    StructField('year', StringType(), True),
    StructField('incident_date', DateType(), True),
    StructField('desc', StringType(), True)
])


In [289]:
df = spark.read.format("csv") \
    .option("header", False) \
    .schema(schema) \
    .load("/Users/mallory/Desktop/DataEngineering/Springboard/DistComp/hadoop_auto/data.csv")
df.show()
#df.cache()

+-----------+-------------+-----------------+--------+------+----+-------------+--------------------+
|incident_id|incident_type|          vin_num|    make| model|year|incident_date|                desc|
+-----------+-------------+-----------------+--------+------+----+-------------+--------------------+
|          1|            I|VXIO456XLBB630221|  Nissan|Altima|2003|   2002-05-08|Initial sales fro...|
|          2|            I|INU45KIOOPA343980|Mercedes|  C300|2015|   2014-01-01|Sold from EuroMotors|
|          3|            A|VXIO456XLBB630221|    null|  null|null|   2014-07-02|   Head on collision|
|          4|            R|VXIO456XLBB630221|    null|  null|null|   2014-08-05| Repair transmission|
|          5|            I|VOME254OOXW344325|Mercedes|  E350|2015|   2014-02-01|    Sold from Carmax|
|          6|            R|VOME254OOXW344325|    null|  null|null|   2015-02-06|Wheel allignment ...|
|          7|            R|VXIO456XLBB630221|    null|  null|null|   2015-01-01|Re

## 1.1: Perform Map Operation 
Implement `extract_key_vin_value()` method

In [507]:
def extract_key_vin_value(x):
    """:param x: data source loaded into SparkSession,
        :output: dictionary tuple with mapping values to be transformed into MapType"""

    vin_number = x.vin_num
    make = x.make
    year = x.year
    model = x.model
    incident_id = x.incident_id
    incident_type = x.incident_type
    incident_date = x.incident_date
    desc = x.desc
  
    return (vin_number, {"make": make, "year": year, "model": model, "incident_id":incident_id, "incident_type":incident_type, "incident_date":incident_date, "desc":desc})


In [508]:
vin_kv = df.rdd.map(lambda x: extract_key_vin_value(x))
# vin_kv.cache()
type(vin_kv)

pyspark.rdd.PipelinedRDD

In [293]:
# QA - was checking how to access each member 😅
# for key, value in vin_kv.collect():
#     # print(key, value)
#     print(key, value["make"], value["year"], value["model"])

## 1.2 Perform Group Aggregation to Populate Make & Year to All Records
Implement `populate_make()` method

In [638]:
# vin_kv.collectAsMap()
sc = spark.sparkContext
def populate_make(data_rdd):
    # data = data_rdd.collect()
    assert (isinstance(data_rdd, RDD), 'data_rdd is not an RDD')

    output = []

    for member in data_rdd:

        if member["incident_id"] != None:
                incident_id = member["incident_id"] # TODO TypeError: string indices must be integers
        if member["incident_type"] != None:
                incident_type = member["incident_type"]
        if member["incident_date"] != None:
                incident_date = member["incident_date"]
        desc = member["desc"]
        if member["make"] != None:
                make = member["make"]
        if member["year"] != None:
                year = member["year"]
        if member["model"] != None:
                model = member["model"]
        output.append({key: {"make": make, "year": year, "model": model, "incident_id":incident_id, "incident_type":incident_type, "incident_date":incident_date, "desc": desc}})
    return output


  assert (isinstance(data_rdd, RDD), 'data_rdd is not an RDD')


In [481]:
# # vin_kv.collectAsMap()
# sc = spark.sparkContext


# def populate_make(data_rdd):
#     # data = data_rdd.collect()
#     assert (isinstance(data_rdd, RDD), 'data_rdd is not an RDD')

#     output = []

#     for member in data_rdd:
#         value = member[1]
#         key = member[0]
        
#         print(member[1]) # QA
#         print(value) #QA

#         incident_id = value[3] # TODO string index is out of range?
#         incident_type = value[4]
#         incident_date = value[5]
#         desc = value[6]
#         if value[0] != None:
#             make = value[0]
#         if value[1] != None:
#             year = value[1]
#         if value[2] != None:
#             model = value[2]
#         output.append({key: {"make": make, "year": year, "model": model, "incident_id": incident_id,
#                       "incident_type": incident_type, "incident_date": incident_date, "desc": desc}})
#     #return sc.parallelize(output)
#     return output


  assert (isinstance(data_rdd, RDD), 'data_rdd is not an RDD')


In [585]:
# QA
#print(populate_make(vin_kv))

# x = populate_make(vin_kv.collect())
# for i in x.items():
#      print(i)

# type(x)
#type(populate_make(vin_kv.collect()))

In [296]:
# # creating a bridge table to collect master information for each make and model
# bridge_schema = StructType([
#     StructField("vin_key", StringType(), True),
#     # use MapType to make use of key-value pairs returned by function
#     StructField("properties", MapType(StringType(), StringType(), True))
# ])

# bridge_df = spark.createDataFrame(data=vin_kv, schema=bridge_schema)
# bridge_df.show(truncate=False)
# # check schema
# bridge_df.printSchema()

# # options here were map_concat(), coalesce(), and explode()
# # explode map column to create a new row for each element in the given map column
# bridge_df = bridge_df.select("vin_key", explode("properties"))

# bridge_df.printSchema()  # schema will validate the explosion
# bridge_df.show()

# # get distinct values & drop null to avoid duplication
# df_distinct = bridge_df.select(
#     "vin_key", "key", "value").distinct().na.drop().sort("key")
# df_distinct.show(truncate=False)

# # get rid of null values to get year make and model as an array of maps
# map_df = df_distinct.select("vin_key", create_map("key", "value").alias("map")) \
#     .groupBy("vin_key") \
#     .agg(collect_list("map").alias("make_model_year")) \
#     # .cache()

# # make = map_df.select("vin_key", map_df.make_model_year[0].alias("make")).show()

# # model = map_df.select("vin_key", map_df.make_model_year[1].alias("model")).show()

# # year = map_df.select("vin_key", map_df.make_model_year[2].alias("year")).show()

# # map_df.show(truncate=False)
# # map_df.printSchema()

# map_df = map_df.select("vin_key", map_df.make_model_year[0].alias("make_map"), map_df.make_model_year[1].alias(
#     "model_map"), map_df.make_model_year[2].alias("year_map"))  # .cache()

# map_df.show()

# map_df = map_df.select("vin_key", map_df.make_map.getItem("make").alias("make"),
#                        map_df.model_map.getItem("model").alias("model"),
#                        map_df.year_map.getItem("year").alias("year")) \
#     .show()


In [639]:

enhance_make = vin_kv.groupByKey().flatMap(lambda kv: populate_make(kv[1]))
print(type(enhance_make)) # RDD
print(enhance_make.collect())  # list of dictionaries


<class 'pyspark.rdd.PipelinedRDD'>
[{'INU45KIOOPA343980': {'make': 'Nissan', 'year': '2003', 'model': 'Altima', 'incident_id': 1, 'incident_type': 'I', 'incident_date': datetime.date(2002, 5, 8), 'desc': 'Initial sales from TechMotors'}}, {'INU45KIOOPA343980': {'make': 'Nissan', 'year': '2003', 'model': 'Altima', 'incident_id': 3, 'incident_type': 'A', 'incident_date': datetime.date(2014, 7, 2), 'desc': 'Head on collision'}}, {'INU45KIOOPA343980': {'make': 'Nissan', 'year': '2003', 'model': 'Altima', 'incident_id': 4, 'incident_type': 'R', 'incident_date': datetime.date(2014, 8, 5), 'desc': 'Repair transmission'}}, {'INU45KIOOPA343980': {'make': 'Nissan', 'year': '2003', 'model': 'Altima', 'incident_id': 7, 'incident_type': 'R', 'incident_date': datetime.date(2015, 1, 1), 'desc': 'Replace right head light'}}, {'INU45KIOOPA343980': {'make': 'Mercedes', 'year': '2015', 'model': 'C300', 'incident_id': 2, 'incident_type': 'I', 'incident_date': datetime.date(2014, 1, 1), 'desc': 'Sold from 

# 2. Count the # of Accident Occurrences for Each Vehicle Make & Year
## 2.1 Perform Map Operation
Implement `extract_key_make_value()` method

In [702]:
# def extract_key_make_value(data):
#     # QA
#     # assert (isinstance(data, dict), "'data' from extract_key_make_value() must be a dictionary.")
#     # print(data)
#     #print(type(data))
    
#     #data = data.collect() # returns list
#     # iterate over data list
#     for dict_item in data:
#         # item should be a dictionary 
#         for value in dict_item.values():
#             # print(values)
#             if value["incident_type"] == 'A':
#                 count = ((value["make"]+value["year"]), 1)
#                 print(type(count))
#             else:
#                 count = ((value["make"]+value["year"]), 0)
#         # returns rdd
#             return count 


# # QA
# # extract_key_make_value(enhance_make)
# enhance_make.map(lambda x: extract_key_make_value(x)).collect()


In [715]:
def extract_key_make_value(data):
    # QA
    assert (isinstance(data, dict), "'data' from extract_key_make_value() must be a dictionary.")

    # print(data)
    for key, value in data.items():
        # print(key, value)
        if value["incident_type"] == 'A':
            count = ((value["make"]+value["year"]), 1)
            # print(count)
        else:
            count = ((value["make"]+value["year"]), 0)
        # returns rdd
        return count


# QA
# extract_key_make_value(enhance_make)
# enhance_make.map(lambda x: extract_key_make_value(x)).collect()


[('Nissan2003', 0),
 ('Nissan2003', 1),
 ('Nissan2003', 0),
 ('Nissan2003', 0),
 ('Mercedes2015', 0),
 ('Mercedes2015', 1),
 ('Mercedes2015', 0),
 ('Mercedes2015', 0),
 ('Mercedes2015', 1),
 ('Mercedes2015', 0),
 ('Mercedes2015', 0),
 ('Mercedes2016', 0),
 ('Mercedes2016', 0),
 ('Mercedes2016', 1),
 ('Toyota2017', 0),
 ('Toyota2017', 0)]

In [719]:
make_kv = enhance_make.map(lambda x: extract_key_make_value(x))

# QA
type(make_kv)
print(make_kv.collect())


[('Nissan2003', 0), ('Nissan2003', 1), ('Nissan2003', 0), ('Nissan2003', 0), ('Mercedes2015', 0), ('Mercedes2015', 1), ('Mercedes2015', 0), ('Mercedes2015', 0), ('Mercedes2015', 1), ('Mercedes2015', 0), ('Mercedes2015', 0), ('Mercedes2016', 0), ('Mercedes2016', 0), ('Mercedes2016', 1), ('Toyota2017', 0), ('Toyota2017', 0)]


## 2.2 Aggregate Tuple Keys to Count Total Number of Records per Key Composite
Use `reduceByKey()` to sum all values from each record.

In [721]:
# using make_kv
make_year_rdd = make_kv.reduceByKey(lambda key,count:key+count)

# QA
print(type(make_year_rdd))
print(make_year_rdd.collect())

<class 'pyspark.rdd.PipelinedRDD'>
[('Nissan2003', 1), ('Mercedes2015', 2), ('Mercedes2016', 1), ('Toyota2017', 0)]


In [725]:
final_df = make_year_rdd.toDF(schema=["make_year", "accident_count"])
final_df.show()

+------------+--------------+
|   make_year|accident_count|
+------------+--------------+
|  Nissan2003|             1|
|Mercedes2015|             2|
|Mercedes2016|             1|
|  Toyota2017|             0|
+------------+--------------+



In [728]:
final_df.write.mode("append").format("csv").save("./report_csv")