## Overview

This notebook contains a Spark pipeline that processes JSON-formatted data and exports a CSV file after each step of its adaptation, so that the result matches the target data.

In [2]:
# Base locations: inputs are read from `input_dir`, CSV exports are written below `outdir`.
input_dir = '/FileStore/tables/'
outdir = '/dbfs/FileStore/'

# Load the supplier JSON file into a Spark DataFrame.
file_location = f"{input_dir}supplier_car.json"
df = spark.read.json(file_location)

In [3]:
import pandas as pd
import os
# Step 1 (pre-processing): every distinct 'Attribute Names' value becomes its own
# column, filled with the matching 'Attribute Values' entry, keyed by 'ID'.
dfPandas = df.toPandas()
pivotedDF = dfPandas.pivot(index='ID', columns='Attribute Names', values='Attribute Values')
# The long-format attribute columns are superseded by the pivoted ones.
dfPandas = dfPandas.drop(columns=['Attribute Names', 'Attribute Values'])
dfPandas = dfPandas.join(pivotedDF, on='ID').rename(columns={'ID': '_id'})

# Export the result of step 1.
outname = 'pre-processed.csv'
dfPandas.to_csv(f"{outdir}{outname}", index=False, encoding="utf-8")

#Step 2 / Normalization
#Normalizer function
def normalize(df: pd.DataFrame, col: str, allowedValues: list, replaceMap: dict):
    """Translate the values of ``df[col]`` in place and bucket everything else as 'Other'.

    Args:
        df: Frame to modify in place.
        col: Name of the column to normalize.
        allowedValues: Values that may remain in the column after translation.
        replaceMap: Mapping from source value to normalized value.

    Every value that is not in ``allowedValues`` after the translation
    (including missing values) is overwritten with the string 'Other'.
    """
    # Assign the translated column back explicitly: `df[col].replace(..., inplace=True)`
    # mutates a temporary Series and is not guaranteed to propagate to `df`
    # (it never does with pandas Copy-on-Write enabled).
    df[col] = df[col].replace(replaceMap)
    df.loc[~df[col].isin(allowedValues), col] = 'Other'

#colorsMap = {"anthrazit": "Anthracite", "anthrazit mét.": "Metallic Anthracite", "blau": "Blue", "blau mét.": "Metallic Blue", "silber": "Silver", "silber mét.": "Metallic Silver", "violett": "Purple", "violett mét.": "Metallic Purple", "weiss": "White", "weiss mét.": "Metallic White", "bordeaux": "Maroon", "bordeaux mét.": "Metallic Maroon", "braun": "Brown", "braun mét.": "Metallic Brown", "gelb": "Yellow", "gelb mét.": "Metallic Yellow", "gold": "Gold", "gold mét.": "Metallic Gold", "grau": "Grey", "grau mét.": "Metallic Grey", "grün": "Green", "grün mét.": "Metallic Green", "beige": "Beige", "beige mét.": "Metallic Beige", "orange": "Orange", "orange mét.": "Metallic Orange", "rot": "Red", "red mét.": "Metallic Red", "schwarz": "Black", "schwarz mét.": "Metallic Black"}

#Bordeaux taken out as it is not in the target list, metallic colors taken out as not present in target data
# German base colour -> target colour name.
_baseColorsMap = {
    "anthrazit": "Anthracite",
    "blau": "Blue",
    "silber": "Silver",
    "violett": "Purple",
    "weiss": "White",
    "braun": "Brown",
    "gelb": "Yellow",
    "gold": "Gold",
    "grau": "Grey",
    "grün": "Green",
    "beige": "Beige",
    "orange": "Orange",
    "rot": "Red",
    "schwarz": "Black",
}
# Metallic variants ('<colour> mét.') collapse onto the plain colour. Deriving them
# from the base table keeps both spellings in sync; the hand-written table had
# 'red mét.' where 'rot mét.' was meant, so metallic red ended up as 'Other'.
colorsMap = {
    **_baseColorsMap,
    **{f"{german} mét.": english for german, english in _baseColorsMap.items()},
}
# Previous (misspelled) key kept so inputs that relied on it still map to 'Red'.
colorsMap["red mét."] = "Red"

#Normalization of body colors
# Allowed target colours, de-duplicated in first-seen order.
listOfAllowedColors = list(dict.fromkeys(colorsMap.values()))
normalize(dfPandas, 'BodyColorText', listOfAllowedColors, colorsMap)

#============================TESTING===============================
# Probe on a private copy: `testDF = dfPandas` would only alias the real frame,
# so the injected probe value would permanently overwrite row 2 of the data.
testDF = dfPandas.copy()
# Single .loc write instead of chained indexing, which may not reach the frame.
testDF.loc[2, 'BodyColorText'] = 'NoColor'
normalize(testDF, 'BodyColorText', listOfAllowedColors, colorsMap)
#Positive assertion
assert testDF.loc[2, 'BodyColorText'] == 'Other', 'It didn\'t work!'
#Negative assertion (`in` on a Series checks the index, so test the values instead)
assert 'NoColor' not in testDF['BodyColorText'].tolist(), 'It did not work!'
#=================================================================

# Interior colours are normalized with the same translation table and allowed list
# as the body colours.
normalize(dfPandas, 'InteriorColorText', listOfAllowedColors, colorsMap)

# Title-case the make names.
dfPandas['MakeText'] = dfPandas.MakeText.str.title()

#Normalization of ModelText
#This is already like the model attribute in target data

#Normalization of TypeName and ModelText into target data's model_variant: model_variant = TypeName - ModelText ==================> this will be called only during the integration phase as it will erase the TypeName attribute
def normalizeTypeName(df: pd.DataFrame):
    """Derive ``model_variant`` from ``TypeName`` and drop ``TypeName`` (in place).

    The variant is ``TypeName`` with the model name removed, e.g.
    'ML 350 Inspiration' with model 'ML 350' -> 'Inspiration' and
    'Phaeton 6.0 W12 4Motion' with model 'PHAETON' -> '6.0 W12 4Motion'.
    Both the raw and the title-cased spelling of ``ModelText`` are removed and
    surrounding whitespace is trimmed.

    ``TypeName`` is carried over unchanged when it is missing, when
    ``ModelText`` is missing, when both are equal, or when removing the model
    would leave an empty string.

    Args:
        df: Frame with 'TypeName' and 'ModelText' columns; it gains
            'model_variant' and loses 'TypeName'.
    """
    def _variant(type_name, model):
        # Missing values cannot be string-processed; pass them through untouched.
        if not isinstance(type_name, str) or pd.isna(model):
            return type_name
        model = str(model)
        if type_name == model:
            return type_name
        remainder = type_name.replace(model.title(), '').replace(model, '').strip()
        # Never produce an empty variant; fall back to the full type name.
        return remainder or type_name

    # Build the whole column at once: element-wise writes through
    # df['model_variant'][index] are chained assignments and are not guaranteed
    # to reach `df`.
    df['model_variant'] = [
        _variant(type_name, model)
        for type_name, model in zip(df['TypeName'], df['ModelText'])
    ]
    del df['TypeName']

#Normalization of ModelTypeText - no need, is already like target data's model

#Normalization BodyTypeText (target data's carType)
#['Cabriolet' 'SUV / Geländewagen' 'Kombi' 'Limousine' 'Coupé', 'Kompaktvan / Minivan' 'Pick-up' 'Kleinwagen' 'Sattelschlepper', 'Wohnkabine' nan]
#Many values are not present in the target data, but I felt right to add them (as opposite to missing colors, which I considered less important for users)
#print(dfPandas.BodyTypeText.unique())
# German body type -> target carType.
carTypesMap = {
    "Cabriolet": "Convertible / Roadster",
    "SUV / Geländewagen": "SUV",
    "Kombi": "Station Wagon",
    "Limousine": "Limousine",
    # 'Coupé' is among the observed source values but had no entry, so every
    # coupé was bucketed as 'Other'.
    # NOTE(review): confirm 'Coupé' is the spelling used by the target data.
    "Coupé": "Coupé",
    "Kompaktvan / Minivan": "Compact Van",
    "Pick-up": "Pick-up",
    "Kleinwagen": "Subcompact",
    "Sattelschlepper": "Trailer Truck",
    "Wohnkabine": "Camper shell / Truck camper",
}

listOfAllowedCarTypes = list(carTypesMap.values())

#for all carTypes/rows in the BodyTypeText column, if it is not in the list of allowed car types, BodyTypeText=Other
normalize(dfPandas, 'BodyTypeText', listOfAllowedCarTypes, carTypesMap)

#============================TESTING===============================
# Probe on a fresh private copy so the injected value never touches the real data.
testDF = dfPandas.copy()
# Single .loc write instead of chained indexing, which may not reach the frame.
testDF.loc[2, 'BodyTypeText'] = 'Mofa'
normalize(testDF, 'BodyTypeText', listOfAllowedCarTypes, carTypesMap)
#Positive assertion
assert testDF.loc[2, 'BodyTypeText'] == 'Other', 'It didn\'t work!'
#Negative assertion (`in` on a Series checks the index, so test the values instead)
assert 'Mofa' not in testDF['BodyTypeText'].tolist(), 'It did not work!'
#=================================================================

#Normalization DriveTypeText
#print(dfPandas.DriveTypeText.unique()) => ['Hinterradantrieb' (Rear-wheel drive), 'Allrad' (All-wheel drive), 'Vorderradantrieb' (Front-wheel drive), 'null']
# German drive type -> English drive type.
driveTypesMap = {
    "Hinterradantrieb": "Rear-wheel drive",
    "Allrad": "All-wheel drive",
    "Vorderradantrieb": "Front-wheel drive",
}
listOfAllowedDriveTypes = [*driveTypesMap.values()]
normalize(dfPandas, 'DriveTypeText', listOfAllowedDriveTypes, driveTypesMap)

#============================TESTING===============================
# Probe on a fresh private copy so the injected value never touches the real data.
testDF = dfPandas.copy()
# Single .loc write instead of chained indexing, which may not reach the frame.
testDF.loc[2, 'DriveTypeText'] = '4x4'
normalize(testDF, 'DriveTypeText', listOfAllowedDriveTypes, driveTypesMap)
#Positive assertion
assert testDF.loc[2, 'DriveTypeText'] == 'Other', 'It didn\'t work!'
#Negative assertion: check the probe that was actually injected ('4x4'), against the
#column values (`in` on a Series checks the index)
assert '4x4' not in testDF['DriveTypeText'].tolist(), 'It did not work!'
del testDF
#=================================================================
  
#Normalization Properties
#up to 4 out of possibleProps, combined by a comma
# German property label -> English label.
propertiesDict = {
    'Ab MFK': "freshly serviced",
    'Direkt-/Parallelimport': "direct-/parallel import",
    'Tuning': "tuned",
    'Rennwagen': "racing car",
}
# The recognised labels are the keys of the translation table, in the same order.
possibleProps = list(propertiesDict)
# Drop the double quotes, trim the outer spaces and split on commas. Inner tokens can
# still carry padding at this point (e.g. ' Tuning').
normalizedProperties = (
    dfPandas['Properties']
    .str.replace('"', '')
    .str.strip(' ')
    .str.split(',')
)

def normalizeProperties(props, translations: dict = None) -> list:
    """Translate a list of raw property labels, dropping unknown ones.

    Args:
        props: List (or tuple) of raw labels; each is stripped of surrounding
            spaces before lookup. Anything else (e.g. a missing value) is
            treated as "no properties".
        translations: Optional mapping from recognised label to translated
            label. Defaults to the module-level ``propertiesDict`` restricted
            to the labels in ``possibleProps``.

    Returns:
        The translated labels in input order, or ``['Other']`` when no label
        was recognised.
    """
    if translations is None:
        translations = {label: propertiesDict[label] for label in possibleProps}
    # Missing values arrive as NaN/None rather than a list and cannot be iterated.
    if not isinstance(props, (list, tuple)):
        return ['Other']
    stripped = (prop.strip(' ') for prop in props)
    translated = [translations[prop] for prop in stripped if prop in translations]
    return translated or ['Other']

# Translate every row's token list and store it as one comma-separated string.
# Mapping the whole Series avoids writing element by element through `.iloc[label]`,
# which only addressed the right row while the index was the default 0..n-1 range.
normalizedProperties = normalizedProperties.map(
    lambda props: ', '.join(map(str, normalizeProperties(props)))
)
dfPandas['Properties'] = normalizedProperties

#Normalization of ConditionTypeText
#Vorführmodell: Demo car, Occasion: Used, Oldtimer: Antique car / Restored, Neu: New
# German condition -> English condition.
conditionTypesMap = {
    "Vorführmodell": "Demo car",
    "Occasion": "Used",
    "Oldtimer": "Antique Car / Restored",
    "Neu": "New",
}
listOfAllowedConditionTypes = [*conditionTypesMap.values()]
normalize(dfPandas, 'ConditionTypeText', listOfAllowedConditionTypes, conditionTypesMap)

# Normalization of TransmissionTypeText: several German spellings share one target value.
transmissionTypesMap = {
    "Automatisiertes Schaltgetriebe": "Automated manual transmission",
    "Automatik-Getriebe": "Automatic",
    "Automat": "Automatic",
    "Schaltgetriebe manuell": "Manual",
    "Schaltgetriebe": "Manual",
    "Automat sequentiell": "CVT",
    "Automat stufenlos, sequentiell": "CVT",
    "Automat stufenlos": "CVT",
}
listOfAllowedTransmissionTypes = [*transmissionTypesMap.values()]
normalize(dfPandas, 'TransmissionTypeText', listOfAllowedTransmissionTypes, transmissionTypesMap)

# Normalization of FuelTypeText.
# Values seen in the source: 'Benzin', 'null', 'Diesel', 'Bioethanol', 'Benzin/Elektro'.
fuelTypesMap = {
    "Benzin": "Petrol",
    "Diesel": "Diesel",
    "Bioethanol": "Bioethanol",
    "Benzin/Elektro": "Petrol/Electric (Hybrid)",
    "Diesel/Elektro": "Diesel/Electric (Hybrid)",
}
listOfAllowedFuelTypes = [*fuelTypesMap.values()]
normalize(dfPandas, 'FuelTypeText', listOfAllowedFuelTypes, fuelTypesMap)

#Normalization of Ccm and City into city
#Aggregating Ccm and City into one field and deleting them, called in step 4
def normalizeCity(df: pd.DataFrame):
    """Merge 'Ccm' and 'City' into 'city', copy 'Ccm' to 'zip', drop both (in place).

    'city' becomes '<Ccm> <City>' joined by a single space. Missing parts are
    left out instead of raising, and non-string parts are converted with
    ``str``.

    NOTE(review): 'Ccm' is used as a postal code here; confirm that this is
    what the supplier column actually holds.

    Args:
        df: Frame with 'Ccm' and 'City' columns; it gains 'city' and 'zip'
            and loses 'Ccm' and 'City'.
    """
    # A bare ' '.join raises TypeError as soon as one part is NaN/None or not a string.
    df['city'] = [
        ' '.join(str(part) for part in row if not pd.isna(part))
        for row in df[['Ccm', 'City']].itertuples(index=False, name=None)
    ]
    df['zip'] = df['Ccm']
    del df['Ccm'], df['City']

#output normalized.csv
# Export the result of step 2.
outname = 'normalized.csv'
dfPandas.to_csv(f"{outdir}{outname}", index=False, encoding="utf-8")

In [4]:
#Step3, Extraction
from geopy.geocoders import Nominatim
#print(dfPandas['entity_id'].unique().size) 21906, this can be a better ID
# From here on the record identifier is taken from entity_id.
dfPandas['_id'] = dfPandas['entity_id']

#Mandatory ones
# Split the consumption text on spaces (14.9 l/100km => 14.9, l/100km). The source
# column then holds the token lists, and the tokens are spread over two new columns.
dfPandas['ConsumptionTotalText'] = dfPandas['ConsumptionTotalText'].str.split(' ')
consumptionParts = pd.DataFrame(dfPandas['ConsumptionTotalText'].tolist(), index=dfPandas.index)
dfPandas[['extracted-value-ConsumptionTotalText', 'extracted-unit-ConsumptionTotalText']] = consumptionParts

#Building country attribute with country's code 
geolocator = Nominatim(user_agent="extractCountry")
# City name -> upper-cased country code / full country name.
dictCountries = {}
dictCountriesFull = {}
# Geocode every distinct city exactly once; rows with a missing city are skipped.
for city in dfPandas['City'].dropna().unique():
    location = geolocator.geocode(city, addressdetails=True)
    # geocode() yields None when nothing matches; treat that as "country unknown"
    # instead of failing on `.raw`.
    address = location.raw.get('address', {}) if location is not None else {}
    if 'country_code' not in address:
        print(f"Could not resolve a country for city {city!r}")
    dictCountries[city] = address.get('country_code', '').upper()
    dictCountriesFull[city] = address.get('country', '')
# Fill both columns in one pass. (`DataFrame.assign` returns a new frame, so the
# former bare `dfPandas.assign(...)` calls had no effect.) Rows whose city is
# missing or unresolved get an empty string.
dfPandas['country'] = dfPandas['City'].map(dictCountries).fillna('')
dfPandas['countryFull'] = dfPandas['City'].map(dictCountriesFull).fillna('')


In [5]:
#KM => mileage, mileage_unit=km
dfPandas['mileage_unit'] = 'kilometer'
#Set up drive attribute by country code!
#city->country->RHD/LHD
file_location = input_dir + "countriesLHD.csv"
countriesDF = spark.read.format('csv').load(file_location)
countries = countriesDF.toPandas()
# Drop the row labelled 0 (presumably the header line, since the CSV is loaded
# without a header option -- TODO confirm).
countries = countries.drop([0])
# `country in countries.iloc[0]` tested the *index labels* of the first row (the column
# names), never the country names, so every row ended up as 'RHD'. Match against the
# cell values instead.
# NOTE(review): all cells are used because the layout of countriesLHD.csv is not
# visible here; narrow this to the country-name column once confirmed.
lhdCountries = set(countries.to_numpy().ravel())
#for each country, if it is in the list of LHD countries, drive=LHD, else drive=RHD
dfPandas['drive'] = dfPandas['countryFull'].isin(lhdCountries).map({True: 'LHD', False: 'RHD'})
#======================TESTING=============================
# Every row must carry exactly one of the two drive values.
rhd = (dfPandas['drive'] == 'RHD').sum()
lhd = (dfPandas['drive'] == 'LHD').sum()
assert rhd + lhd == dfPandas.shape[0], 'Something went wrong!'
#==========================================================
# Export the result of step 3.
outname = 'extracted.csv'
dfPandas.to_csv(f"{outdir}{outname}", index=False, encoding="utf-8")

In [6]:
newDF = dfPandas.copy()#checkpoint to not run all cells again
#Step 4, Integration
#this normalizes TypeName, substitutes it for model_variant and deletes TypeName
normalizeTypeName(newDF)

#This merges Ccm and City into city, there are already some entries in city attribute of target data
# with postal code, so I felt like joining postal code to City was the best thing to do
normalizeCity(newDF)

#Supplier attributes that are not carried over to the integrated output
newDF = newDF.drop(columns=[
    '_id', 'ModelTypeText', 'TypeNameFull', 'entity_id',
    'ConsumptionTotalText', 'countryFull',
    'extracted-value-ConsumptionTotalText', 'TransmissionTypeText',
    'Seats', 'Properties', 'InteriorColorText', 'Hp',
    'FuelTypeText', 'DriveTypeText', 'Doors',
    'ConsumptionRatingText', 'Co2EmissionText',
])
#Renaming columns. Only columns that still exist are listed: the former entries for
#FuelTypeText, InteriorColorText, Co2EmissionText and the misspelled
#'ConsumptionRatingTest' referred to columns dropped above and could never apply.
newDF = newDF.rename(columns={
    "BodyColorText": "color",
    "MakeText": "make",
    "BodyTypeText": "carType",
    "ConditionTypeText": "condition",
    "ModelText": "model",
    "extracted-unit-ConsumptionTotalText": "fuel_consumption_unit",
    "FirstRegMonth": "manufacture_month",
    "FirstRegYear": "manufacture_year",
    "Km": "mileage",
})
#Output attributes with no supplier source are filled with the literal string 'null'
newDF["currency"] = 'null'
newDF["type"] = 'null'
newDF["price_on_request"] = 'null'
#Rearranging columns
cols = ['carType', 'color', 'condition', 'currency', 'drive', 'city', 'country', 'make', 'manufacture_year', 'mileage', 'mileage_unit', 'model', 'model_variant', 'price_on_request', 'type', 'zip', 'manufacture_month', 'fuel_consumption_unit']
newDF = newDF[cols]
#print(newDF['country'].unique())
outname = 'integrated.csv'
newDF.to_csv(outdir+outname, index=False, encoding="utf-8")