# ETL

Het extract - transform - load concept is een veel voorkomend begrip in (big) data toepassingen en geeft het stappenplan weer van de levenscyclus van de data binnen je toepassing.
Het concept bestaat uit drie stappen:
* extract: zoeken van data, inlezen en validatie
* transform: verwerken van data, data cleaning, aggregatie, groupering, filtering, ...
* load: opslaan van de getransformeerde data in een file, database, datawarehouse, datalake, ...

In de rest van deze notebook gaan we bestuderen hoe deze stappen uit te voeren met Spark.
Hiervoor gaan we een csv gebruiken als bronbestand.

## Extract

In deze directory staat een zip file waarin deze csv is opgeslaan. 
Unzip deze file eerst en upload het naar het hdfs

In [1]:
import zipfile

with zipfile.ZipFile("cars.zip", 'r') as zip_ref:
    zip_ref.extractall()
    
import pydoop.hdfs as hdfs

localFS = hdfs.hdfs(host='')
client = hdfs.hdfs(host='localhost', port=9000)

if not client.exists('/user/bigdata/08_ETL'):
    client.create_directory('/user/bigdata/08_ETL')

# do some cleaning in case anything else than input is present on HDFS
for f in client.list_directory("."):
    client.delete(f["name"], True)
        
# upload input.txt
hdfs_filename = "08_ETL/cars.csv"
localFS.copy("cars.csv", client, hdfs_filename)

2022-03-17 10:48:19,335 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


0

Maak nu een locale sparkcontext aan en lees dit bestand in

In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [15]:
df = spark.read.option("delimiter", ",").option("header", True).csv("/user/bigdata/" + hdfs_filename)
# df = spark.read.csv(hdfs_filename, header=True, sep=",")


De datastructuur van het csv is als volgt:

In [17]:
df.printSchema()

root
 |-- manufacturer_name: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: string (nullable = true)
 |-- year_produced: string (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: string (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: string (nullable = true)
 |-- is_exchangeable: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: string (nullable = true)
 |-- up_counter: string (nullable = true)
 |-- feature_0: string (nullable = true)
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = true)


## Transform

De transform stap is de meest complexe stap van de drie en kan uit een grote verscheidenheid van bewerkingen bestaan, zoals:
* Dataformaten aanpassen
* Vertalingen van tekst
* Geencodeerde waarden aanpassen: 0/1 vs true/false of m/f vs male/female
* Allerhande data-cleaning stappen
* Encoderen (Ordinal of One-hot) van categorieke kolommen
* Groeperen van data
* Uitvoeren van berekeningen 
* ...

Schrijf hieronder eerst zelf de code om de volgende stappen uit te voeren:
* Omzetten naar integer van de kolommen: odometer_value, year_produced, engine_capacity, price_usd, number_of_photos, up_counter, duration_listed
* Omzetten naar boolean van de kolommen: engine_has_gas, has_warranty, is_exchangeable, feature_0 tot en met 9
* Bereken het aantal null en nan waarden per kolom

In [65]:
# omzetten naar integer van bepaalde kolommen
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,avg,sum,count,when

spark = SparkSession.builder.config("spark.driver.cores", 2).appName("Spark_les").getOrCreate()
df = spark.read.option("delimiter", ",").option("header", True).csv("/user/bigdata/" + hdfs_filename)


#df.show(5, vertical=True)

#df = df.select([col(c).cast("int") for c in int_columns])
#df.printSchema()
int_columns = ["odometer_value", "year_produced", "engine_capacity", "price_usd", "number_of_photos", "up_counter", "duration_listed"]
for c in int_columns:
    df = df.withColumn(c, col(c).cast("int"))
#df = df.withColumn([col(c).cast("int") for c in int_columns])
df.printSchema()

root
 |-- manufacturer_name: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: integer (nullable = true)
 |-- year_produced: integer (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: integer (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: string (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: integer (nullable = true)
 |-- is_exchangeable: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: integer (nullable = true)
 |-- up_counter: integer (nullable = true)
 |-- feature_0: string (nullable = true)
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = 

In [None]:
# omzetten naar boolean van bepaalde kolommen
bool_columns = ["engine_has_gas", "has_warranty", "is_exchangeable"]
for n in range(10):
    bool_columns.append("feature_" + str(n))
bool_columns
for c in bool_columns:
    df = df.withColumn(c, col(c).cast("boolean"))
df.printSchema()

In [99]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(col(c).isNull(), 1)) for c in df.columns]).show()

+-------------------------------------------------------+------------------------------------------------+--------------------------------------------------+-------------------------------------------+----------------------------------------------------+---------------------------------------------------+-------------------------------------------------+----------------------------------------------------+-------------------------------------------------+-----------------------------------------------------+-----------------------------------------------+--------------------------------------------------+-------------------------------------------+------------------------------------------------+-----------------------------------------------+-----------------------------------------------------+-----------------------------------------------------+------------------------------------------------------+------------------------------------------------+-----------------------------------

In bovenstaande code kan je zien dat er slechts een aantal null-waarden in de dataset aanwezig zijn.
Deze kunnen ingevuld worden door middel van een [imputer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html).
Hier laten we deze rijen echter gewoon vallen voor de eenvoud:

In [102]:
df = df.na.drop()
df.count()


38521

De oefening om de waarden in te vullen met een imputer (bvb door het gemiddelde) kan je hieronder doen.

In [None]:
# gemiddelde berekenen en nadien die waarde invullen met .fillna()

df.fillna({gemiddle hier}, subset=["engine_capacity"])

Bereken nu de volgende waarden van de beschikbare data:
* Aantal autos per merk
* Welke verschillende types van transmissie zijn er?
* Marktaandeel (percentage) van de verschillende types motor?
* Maximum prijs van elk merk
* Wat zijn de vijf goedkoopste voertuigen met een automatische transmissie?

In [70]:
# autos per merk
df.groupby("manufacturer_name").count().show()

+-----------------+-----+
|manufacturer_name|count|
+-----------------+-----+
|       Volkswagen| 4243|
|            Lexus|  213|
|           Jaguar|   53|
|            Rover|  235|
|           Lancia|   92|
|             Jeep|  107|
|       Mitsubishi|  887|
|              Kia|  912|
|             Mini|   68|
|            Lifan|   47|
|             LADA|  146|
|        SsangYong|   79|
|             Audi| 2468|
|             Seat|  303|
|         Cadillac|   43|
|          Москвич|   55|
|       Alfa Romeo|  207|
|            Geely|   71|
|          Renault| 2493|
|           Daewoo|  221|
+-----------------+-----+
only showing top 20 rows



In [71]:
# types transmissie
df.groupby("transmission").count().show()
# df.select("transmission").distinct().show()

+------------+-----+
|transmission|count|
+------------+-----+
|   automatic|12898|
|  mechanical|25633|
+------------+-----+



In [103]:
# marktaandeel
totaal = df.count()
df_aandeel = df.groupby('engine_type').count()
df_aandeel = df_aandeel.withColumn("marktAandeel", col("count") / totaal).show()

+-----------+-----+-------------------+
|engine_type|count|       marktAandeel|
+-----------+-----+-------------------+
|   gasoline|25647| 0.6657926845097479|
|     diesel|12874|0.33420731549025207|
+-----------+-----+-------------------+



In [92]:
# maximum prijs per merk
df.groupby("manufacturer_name").max("price_usd").show()

+-----------------+--------------+
|manufacturer_name|max(price_usd)|
+-----------------+--------------+
|       Volkswagen|         43999|
|            Lexus|         48610|
|           Jaguar|         50000|
|            Rover|          9900|
|           Lancia|          9500|
|             Jeep|         43000|
|       Mitsubishi|         31400|
|              Kia|         44700|
|             Mini|         39456|
|            Lifan|         15750|
|             LADA|         13800|
|        SsangYong|         15900|
|             Audi|         46750|
|             Seat|         18350|
|         Cadillac|         25750|
|          Москвич|         10000|
|       Alfa Romeo|         22000|
|            Geely|         22479|
|          Renault|         30304|
|           Daewoo|          6700|
+-----------------+--------------+
only showing top 20 rows



In [115]:
# goedkoopste voertuigen met automatische transmissie
#df.filter(df.transmission == "automatic").groupby("manufacturer_name").min("price_usd").show(5)
df_result = df.filter(df.transmission == "automatic").sort(col("price_usd")).asc("price_usd").limit(5)
df_result.show()

AttributeError: 'DataFrame' object has no attribute 'asc'

## Load

In deze stap veronderstellen we dat we enkel de 5 goedkoopste auto's willen bewaren.
Schrijf hieronder de benodigde code om de informatie van deze autos op te slaan in een json.

In [None]:
df_result.write.format("json").save("08_ETL/result.json")

Dit is een voorbeeld waarbij de resultaten worden opgeslaan in een bestand.
Andere mogelijkheden zijn om het op te slaan in een SQL-database.
Demo-code om dit te bereiken kan je [hier](https://kontext.tech/column/spark/395/save-dataframe-to-sql-databases-via-jdbc-in-pyspark) bekijken.
Later in dit vak zullen we ook NoSQL-databases bekijken.
Op dat moment zullen we zien hoe we de resultaten kunnen bewaren in dit type database beheersystemen (DBMS).