In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, DecimalType, StringType, DateType, TimestampType
from decimal import Decimal
from datetime import datetime
# Session used by every later cell; getOrCreate returns an already-active session if there is one.
spark = (
    SparkSession.builder
    .appName('PostSalesReport')
    .getOrCreate()
)

In [0]:
# Mount the Azure Blob container so later cells can read/write it under /mnt/mini-proj.
# dbutils.fs.mount fails if the mount point is already mounted (mounts outlive the
# notebook session), so only mount when it is missing; this keeps Run All re-runnable.
MOUNT_POINT = "/mnt/mini-proj"
if not any(m.mountPoint == MOUNT_POINT for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
      source = "wasbs://mini-proj@sbguidedcapstorage.blob.core.windows.net/",
      mount_point = MOUNT_POINT,
      # Storage account key comes from the secret scope, never hardcoded.
      extra_configs  = {"fs.azure.account.key.sbguidedcapstorage.blob.core.windows.net" : dbutils.secrets.get('key-vault-scope', 'sbguidedcapstorage')}
    )

True

In [0]:
# Explicit schema for data.csv. The CSV reader applies it positionally, so the
# order below must match the file's column order. Every column is nullable.
# `year` stays a string because later cells concatenate it into a 'make-year' key.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('incident_id', IntegerType()),
        ('incident_type', StringType()),
        ('vin_number', StringType()),
        ('make', StringType()),
        ('model', StringType()),
        ('year', StringType()),
        ('incident_date', DateType()),
        ('description', StringType()),
    ]
])

In [0]:
# Load the raw incident records from the mounted container using the explicit schema.
raw_frame = spark.read.csv('/mnt/mini-proj/data.csv', schema=schema)

In [0]:
# Sanity-check the parsed columns on a couple of rows.
raw_frame.show(n=2)

+-----------+-------------+-----------------+--------+------+----+-------------+--------------------+
|incident_id|incident_type|       vin_number|    make| model|year|incident_date|         description|
+-----------+-------------+-----------------+--------+------+----+-------------+--------------------+
|          1|            I|VXIO456XLBB630221|  Nissan|Altima|2003|   2002-05-08|Initial sales fro...|
|          2|            I|INU45KIOOPA343980|Mercedes|  C300|2015|   2014-01-01|Sold from EuroMotors|
+-----------+-------------+-----------------+--------+------+----+-------------+--------------------+
only showing top 2 rows



In [0]:
def extract_vin_key_value(x):
    """Map one incident row to a ``(vin_number, [make, year, incident_type])`` pair.

    Only rows whose ``incident_type`` is ``'I'`` keep their make and year;
    for every other row those two slots are ``None`` and only the incident
    type is carried along.

    Args:
        x: a row supporting item access by column name
           (``vin_number``, ``make``, ``year``, ``incident_type``).

    Returns:
        A 2-tuple keyed by VIN, suitable for ``groupByKey``.
    """
    vin = x['vin_number']
    kind = x['incident_type']
    if kind != 'I':
        return (vin, [None, None, kind])
    return (vin, [x['make'], x['year'], kind])

In [0]:
# Key every row by VIN (value layout documented on extract_vin_key_value).
vin_kv = raw_frame.rdd.map(extract_vin_key_value)

In [0]:
def populate_make(iter):
    """Expand one VIN's grouped records into one ``[make, year]`` pair per record.

    ``iter`` holds the values grouped under a single VIN, each shaped
    ``[make, year, incident_type]`` as produced by ``extract_vin_key_value``.
    The first record whose make is not None supplies the make/year. That pair
    is then emitted once for every record in the group, so each record of the
    vehicle is attributed to its make and year.

    Args:
        iter: any iterable of ``[make, year, incident_type]`` records (the
            name is kept for compatibility even though it shadows the builtin).

    Returns:
        A list with one ``[make, year]`` entry per input record, or an empty
        list when no record in the group carries a make.

    NOTE(review): records of every incident_type are emitted, including the
    make-bearing record itself. If only some incident types should be counted,
    filter here; confirm the reporting requirement.
    """
    # Materialise once: we need both a scan and a length, and a plain
    # iterator supports neither len() nor a second pass.
    records = list(iter)
    make_year = next(
        (record[0:2] for record in records if record[0] is not None),
        None,
    )
    if make_year is None:
        return []
    # Use a single make-bearing record. Multiplying *all* such records by the
    # group size would overcount VINs that have more than one of them.
    return [make_year] * len(records)

In [0]:
# Gather all records of each VIN, drop the VIN key, and expand each group into [make, year] pairs.
enhance_make = vin_kv.groupByKey().values().flatMap(populate_make)

In [0]:
def extract_make_key_value(x):
    """Turn a ``[make, year]`` pair into a ``('make-year', 1)`` counting tuple.

    Args:
        x: a sequence whose first two items are the make and year strings.

    Returns:
        ``(make + '-' + year, 1)``, ready for ``reduceByKey``.
    """
    make_year_key = '-'.join((x[0], x[1]))
    return make_year_key, 1

In [0]:
# One ('make-year', 1) tuple per record, to be summed per key.
make_kv = enhance_make.map(extract_make_key_value)

In [0]:
# Preview a bounded sample; collect() would pull the entire RDD into driver memory.
make_kv.take(20)

[('Nissan-2003', 1),
 ('Nissan-2003', 1),
 ('Nissan-2003', 1),
 ('Nissan-2003', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2015', 1),
 ('Mercedes-2016', 1),
 ('Mercedes-2016', 1),
 ('Mercedes-2016', 1),
 ('Toyota-2017', 1),
 ('Toyota-2017', 1)]

In [0]:
# Sum the 1s per 'make-year' key, giving a record count for each make/year.
final = make_kv.reduceByKey(lambda running_total, increment: running_total + increment)

In [0]:
# Persist the per-make/year counts. Spark writes a directory of part files at this path.
# mode('overwrite') keeps the cell re-runnable: the default save mode fails if the path exists.
final.toDF().write.mode('overwrite').csv('/mnt/mini-proj/output.csv')