In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType
from pyspark.sql import functions as F
from pyspark.sql.functions import *

# Build the notebook's Spark session (or reuse one that already exists),
# using the "local[1]" master and the application name "Onedot.com".
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("Onedot.com")
    .getOrCreate()
)

In [None]:
# Load the supplier feed from JSON.
# Downstream cells read the supplier data under the name `input_data`; this
# cell previously bound the frame only to `df`, so a fresh
# "Restart Kernel -> Run All" stopped with a NameError on `input_data`.
input_data = spark.read.json("/data/supplier_car.json")

# Keep the former name as an alias so anything still referring to `df` works.
df = input_data

In [None]:
# Turn the long "Attribute Names" / "Attribute Values" rows into one column per
# attribute name, keeping the first value seen for each (ID, attribute) pair.
pivoted_df = (
    input_data
    .groupBy("ID")
    .pivot("Attribute Names")
    .agg(F.first("Attribute Values"))
)

# Join the pivoted attributes back onto the source rows by ID, drop the ID
# column that came from the pivoted side, and keep a single row per ID.
preprocessed_df = (
    pivoted_df
    .join(input_data, on=(pivoted_df.ID == input_data.ID), how="inner")
    .drop(pivoted_df.ID)
    .dropDuplicates(["ID"])
)

Saving preprocessed data in a csv file

In [None]:
# Collect the pre-processed frame into pandas and write it out as a CSV file.
preprocessed_df.toPandas().to_csv(
    "pre-processing.csv",
    header=True,
    encoding="utf-8",
)

Normalisation

In [None]:
# Normalise the colour and make columns.
#
# NOTE(review): this cell used to read from `input_data`, which no cell in the
# notebook defines. It now reads `preprocessed_df`, the output of the
# pre-processing stage, so the lineage is explicit -- confirm this is the
# intended input.

# German colour name -> English colour name. All targets are capitalised
# ("schwarz" previously mapped to lower-case "black", unlike every other entry).
colors_german_english = {
    "orange": "Orange",
    "grün": "Green",
    "schwarz": "Black",
    "grau": "Gray",
    "gelb": "Yellow",
    "braun": "Brown",
    "weiss": "White",
    "blau": "Blue",
    "gold": "Gold",
    "beige": "Beige",
    "violett": "Violet",
    "silber": "Silver",
    "anthrazit": "Anthracite",
    "rot": "Red",
    "bordeaux": "Bordeaux",
}

# Remove the literal " mét." marker from the colour text. The dot is escaped so
# that it only matches a real "." instead of any character.
normalised_supplier_data = preprocessed_df.withColumn(
    "BodyColorText",
    F.regexp_replace("BodyColorText", r" mét\.", ""),
)

# When `to_replace` is a dict the replacements come from the mapping itself and
# a separate `value` argument is ignored, so none is passed.
normalised_supplier_data = normalised_supplier_data.na.replace(
    colors_german_english,
    subset="BodyColorText",
)

# Re-case the make: put a '#' marker between a delimiter (whitespace, "(", "-"
# or "/") and the character that follows it, split on the marker, apply
# initcap() to every piece and glue the pieces back together.
normalised_supplier_data = normalised_supplier_data.withColumn(
    "MakeText",
    F.expr(r"""array_join(transform(split(regexp_replace(MakeText, '(\\s|\\(|-|\\/)(.)', '$1#$2'), '#'), x -> initcap(x)),"")"""),
)

# Hand-maintained corrections applied after the generic re-casing above
# (mostly makes whose names are acronyms).
normalize_Make = {
    "Pgo": "PGO",
    "Austin-healey": "Austin-Healey",
    "Nsu": "NSU",
    "Bmw": "BMW",
    "Agm": "AGM",
    "Vw": "VW",
    "Mg": "MG",
    "Bmw-Alpina": "Alpina",
    "Ford (usa)": "Ford",
}
normalised_supplier_data = normalised_supplier_data.na.replace(
    normalize_Make,
    subset="MakeText",
)

Saving normalised data in a csv file

In [None]:
# Collect the normalised frame into pandas and write it out as a CSV file.
normalised_supplier_data.toPandas().to_csv(
    "normalisation.csv",
    header=True,
    encoding="utf-8",
)

Extraction

In [None]:
# Split the consumption text on single spaces: the first token is stored as the
# extracted value and the second token as the extracted unit.
# (Presumably the text looks like "<value> <unit>" -- verify against the data.)
#
# NOTE(review): this cell used to read from `input_data`, which no cell in the
# notebook defines, and so also dropped the normalisation results. It now
# continues from `normalised_supplier_data` -- confirm this is the intended
# input.
#
# The column is referenced by name rather than through a specific frame, so
# the expression resolves against whichever frame it is applied to.
split_col = F.split(F.col("ConsumptionTotalText"), " ")

extracted_supplier_data = (
    normalised_supplier_data
    .withColumn("extracted-value-ConsumptionTotalText", split_col.getItem(0))
    .withColumn("extracted-unit-ConsumptionTotalText", split_col.getItem(1))
)

Saving extracted data in a csv file

In [None]:
# Collect the extraction result into pandas and write it out as a CSV file.
extracted_supplier_data.toPandas().to_csv(
    "extraction.csv",
    header=True,
    encoding="utf-8",
)

Integration

In [None]:
# Project the supplier columns onto the target column names.
#
# NOTE(review): this cell used to select from `input_data`, which no cell in
# the notebook defines; selecting from there would also bypass the normalised
# make and colour values. It now selects from `extracted_supplier_data`, the
# output of the preceding stage -- confirm this is the intended input.
integrated_supplier_data = extracted_supplier_data.select(
    F.col("MakeText").alias("make"),
    F.col("ModelText").alias("model"),
    F.col("BodyColorText").alias("color"),
    F.col("TypeName").alias("model_variant"),
    F.col("City").alias("city"),
)


Saving integrated data in a csv file

In [None]:
# Collect the integrated frame into pandas and write it out as a CSV file.
integrated_supplier_data.toPandas().to_csv(
    "integration.csv",
    header=True,
    encoding="utf-8",
)