# 2 Data Preparation

## 2.1 Imports

In [1]:
from pyspark.sql import SparkSession
from pyspark.mllib.stat import Statistics
from pyspark.sql.types import *
from pyspark.sql.functions import col, countDistinct, count, when, isnan, create_map, lit, udf, month, monotonically_increasing_id
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml import Pipeline
import numpy as np
import pandas as pd
import functools
pd.set_option('max_columns', None)
import xlrd
from itertools import chain

#temporary hide error reports
import warnings
warnings.filterwarnings('ignore')

In [2]:
spark = SparkSession.builder.appName("nehody").getOrCreate()

In [3]:
dataPath = "./data/2019.csv"

In [4]:
rawDataSpark = spark\
            .read\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .option("delimiter", ",")\
            .option("encoding", "cp1250")\
            .csv(dataPath)

In [5]:
#lets see some data
rawDataSpark.limit(5).toPandas()

Unnamed: 0,IDENTIFIKAČNÍ ČÍSLO,DRUH POZEMNÍ KOMUNIKACE,ČÍSLO POZEMNÍ KOMUNIKACE,"den, měsíc, rok",_c4,čas,DRUH NEHODY,DRUH SRÁŽKY JEDOUCÍCH VOZIDEL,DRUH PEVNÉ PŘEKÁŽKY,CHARAKTER NEHODY,ZAVINĚNÍ NEHODY,ALKOHOL U VINÍKA NEHODY PŘÍTOMEN,HLAVNÍ PŘÍČINY NEHODY,usmrceno osob,těžce zraněno osob,lehce zraněno osob,CELKOVÁ HMOTNÁ ŠKODA,DRUH POVRCHU VOZOVKY,STAV POVRCHU VOZOVKY V DOBĚ NEHODY,STAV KOMUNIKACE,POVĚTRNOSTNÍ PODMÍNKY V DOBĚ NEHODY,VIDITELNOST,ROZHLEDOVÉ POMĚRY,DĚLENÍ KOMUNIKACE,SITUOVÁNÍ NEHODY NA KOMUNIKACI,ŘÍZENÍ PROVOZU V DOBĚ NEHODY,MÍSTNÍ ÚPRAVA PŘEDNOSTI V JÍZDĚ,SPECIFICKÁ MÍSTA A OBJEKTY V MÍSTĚ NEHODY,SMĚROVÉ POMĚRY,POČET ZÚČASTNĚNÝCH VOZIDEL,MÍSTO DOPRAVNÍ NEHODY,DRUH KŘIŽUJÍCÍ KOMUNIKACE,DRUH VOZIDLA,VÝROBNÍ ZNAČKA MOTOROVÉHO VOZIDLA,ROK VÝROBY VOZIDLA,CHARAKTERISTIKA VOZIDLA,SMYK,VOZIDLO PO NEHODĚ,"ÚNIK PROVOZNÍCH, PŘEPRAVOVANÝCH HMOT",ZPŮSOB VYPROŠTĚNÍ OSOB Z VOZIDLA,SMĚR JÍZDY NEBO POSTAVENÍ VOZIDLA,ŠKODA NA VOZIDLE,KATEGORIE ŘIDIČE,STAV ŘIDIČE,VNĚJŠÍ OVLIVNĚNÍ ŘIDIČE,nan.1,nan.2,nan.3,nan.4,nan.5,nan.6,nan.7,nan.8,nan.9,nan.10,nan.11,nan.12,nan.13,nan.14,nan.15,nan.16,nan.17,nan.18,LOKALITA NEHODY
0,2100180006,5,,2018-12-31,1,2560,2,0,0,2,1,0,508,0,0,0,500,2,1,1,1,1,1,1,0,0,0,10,1,3,0,,17,,XX,,,,,,,0,,,,,,-740232779,-1045024154,-740232815,-1045021257,PRAHA10Vršovice,RUSKÁ,,sledovanákomunikace,,,,,,,,,1
1,2100190001,4,,2019-01-01,2,35,1,4,0,2,1,2,503,0,0,0,250,1,3,1,1,4,1,0,1,2,0,0,4,2,19,,3,39.0,15,12.0,0.0,1.0,0.0,1.0,75.0,150,3.0,1.0,1.0,,,-739292161,-1044517377,-739292180,-1044517804,PRAHA3Vinohrady,VINOHRADSKÁ,,uzel,,,,,,,,,1
2,2100190002,5,,2019-01-01,2,230,4,0,0,1,3,0,100,0,1,0,200,2,1,1,1,4,1,5,7,2,0,0,3,1,0,,10,0.0,,5.0,0.0,1.0,0.0,1.0,1.0,200,8.0,1.0,1.0,,,-738457590,-1045400585,-738459763,-1045389710,PRAHA10,VOLŠINÁCH,,sledovanákomunikace,,,,,,,,,1
3,2100190003,4,,2019-01-01,2,240,3,0,6,2,1,9,511,0,0,0,810,2,1,1,1,4,1,0,1,2,0,0,4,1,15,,3,47.0,15,3.0,0.0,1.0,0.0,1.0,25.0,800,2.0,5.0,0.0,,,-741273247,-1049730456,-74127254,-1049730314,PRAHA4-KRČ,Vídeňská,,místníkomunikace,,1685507.0,,Souhlasnýsesměremúseku,Pomalý,554782.0,487856.0,GN_V0.1UIR-ADR_410,1
4,2100190004,6,,2019-01-01,2,260,3,0,9,2,1,0,516,0,0,0,100,2,3,1,1,4,1,0,1,3,3,0,5,1,10,,3,2.0,02,1.0,0.0,4.0,0.0,1.0,5.0,100,9.0,,,,,-751013997,-1046439576,-751013989,-1046439857,PRAHA5,Klukovická,,místníkomunikace,,713690.0,,Souhlasnýsesměremúseku,Pomalý,554782.0,501417.0,GN_V0.1UIR-ADR_410,1


## 2.2 Cleaning process

In [6]:
#drop useless columns
droppedData = rawDataSpark.drop('IDENTIFIKAČNÍ ČÍSLO','DRUH_KŘIŽUJÍCÍ_KOMUNIKACE','ČÍSLO POZEMNÍ KOMUNIKACE',
                                'SMĚR_JÍZDY_NEBO_POSTAVENÍ_VOZIDLA', 'VÝROBNÍ_ZNAČKA_MOTOROVÉHO_VOZIDLA', 
                                'ZPŮSOB_VYPROŠTĚNÍ_OSOB_Z_VOZIDLA', 'ÚNIK_PROVOZNÍCH_PŘEPRAVOVANÝCH_HMOT',
                                'VOZIDLO_PO_NEHODĚ',
                                'nan.18','nan.17','nan.16','nan.15','nan.14','nan.13','nan.12','nan.11',
                                'nan.10','nan.9','nan.8','nan.7','nan.6','nan.5','nan.4','nan.3','nan.2','nan.1')

In [7]:
#replace spaces in column names
replacedData = droppedData.toDF(*(c.replace(' ', '_').replace(',','') for c in droppedData.columns))

In [8]:
#rename column names to make it more legible
renamedData = replacedData.withColumnRenamed("_c4", "DEN_V_TYDNU").withColumnRenamed("čas_", "ČAS")\
                          .withColumnRenamed("den_měsíc_rok", "DATUM")\
                          .withColumnRenamed("usmrceno_osob", "USMRCENO_OSOB")\
                          .withColumnRenamed("těžce_zraněno_osob", "TĚŽCE_ZRANĚNO_OSOB")\
                          .withColumnRenamed("lehce_zraněno_osob", "LEHCE_ZRANĚNO_OSOB")

In [9]:
#change XX characters to null
replacedNansData = renamedData.replace('XX', None)

In [10]:
# #print out distinct values count in each column
# for i in replacedNansData.columns:
#     if (replacedNansData.select(i).distinct().count() < 2) | (replacedNansData.select(i).distinct().count() > 9):
#         print("Distinct values count in column "+i+": "+str(replacedNansData.select(i).distinct().count()))

In [11]:
#define all columns to change datatype to double (for mathematical operations)
cols = ['DRUH_POZEMNÍ_KOMUNIKACE', 'DEN_V_TYDNU', 'ČAS', 'CHARAKTER_NEHODY', 'USMRCENO_OSOB', 'TĚŽCE_ZRANĚNO_OSOB', 'LEHCE_ZRANĚNO_OSOB', 
        'CELKOVÁ_HMOTNÁ_ŠKODA', 'STAV_POVRCHU_VOZOVKY_V_DOBĚ_NEHODY', 'STAV_KOMUNIKACE', 'POVĚTRNOSTNÍ_PODMÍNKY_V_DOBĚ_NEHODY', 
        'VIDITELNOST', 'ROZHLEDOVÉ_POMĚRY', 'DĚLENÍ_KOMUNIKACE', 'SITUOVÁNÍ_NEHODY_NA_KOMUNIKACI', 'ŘÍZENÍ_PROVOZU_V_DOBĚ_NEHODY', 
        'MÍSTNÍ_ÚPRAVA_PŘEDNOSTI_V_JÍZDĚ', 'SPECIFICKÁ_MÍSTA_A_OBJEKTY_V_MÍSTĚ_NEHODY', 'SMĚROVÉ_POMĚRY', 'POČET_ZÚČASTNĚNÝCH_VOZIDEL', 
        'MÍSTO_DOPRAVNÍ_NEHODY', 'DRUH_VOZIDLA', 'ŠKODA_NA_VOZIDLE', 'DRUH_NEHODY', 'DRUH_SRÁŽKY_JEDOUCÍCH_VOZIDEL', 'DRUH_PEVNÉ_PŘEKÁŽKY', 
        'ZAVINĚNÍ_NEHODY', 'ALKOHOL_U_VINÍKA_NEHODY_PŘÍTOMEN', 'HLAVNÍ_PŘÍČINY_NEHODY', 'DRUH_POVRCHU_VOZOVKY', 'ROK_VÝROBY_VOZIDLA', 
        'LOKALITA_NEHODY', 'CHARAKTERISTIKA_VOZIDLA_', 'SMYK',  
        'KATEGORIE_ŘIDIČE', 'STAV_ŘIDIČE', 'VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE']
datum = ['DATUM']

replacedNansData = replacedNansData.select(
    *[replacedNansData[col_name].cast('double') for col_name in cols] + datum
)

### 2.2.1 Values mapping

In [12]:
#create values array for every categorical feature from Excel table of keys
lokalita_nehody = {}
druh_nehody = {}
druh_srazky_jedoucich_vozidel = {}
druh_pevne_srazky = {}
charakter_nehody = {}
zavineni_nehody = {}
alkohol_u_vinika = {}
nasledky_nehody = {}
druh_povrchu_vozovky = {}
stav_povrchu_vozovky = {}
stav_komunikace = {}
pocasi = {}
viditelnost = {}
rozhled = {}
deleni_komunikace = {}
situovani_nehody = {}
rizeni_provozu = {}
mistni_uprava = {}
specificka_mista = {}
smerove_pomery = {}
kategorie_chodce = {}
stav_chodce = {}
chovani_chodce = {}
situace = {}
misto_nehody = {}
druh_komunikace = {}
vnejsi_ovlivneni = {}

wb = xlrd.open_workbook('./data/Ciselnik.xlsx')
sh = wb.sheet_by_index(1)   
for i in range(7,9):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    lokalita_nehody[cell_value_class] = cell_value_id

for i in range(10,20):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    druh_nehody[cell_value_class] = cell_value_id

for i in range(21,26):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    druh_srazky_jedoucich_vozidel[cell_value_class] = cell_value_id
    
for i in range(27,37):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    druh_pevne_srazky[cell_value_class] = cell_value_id

for i in range(38,40):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    charakter_nehody[cell_value_class] = cell_value_id
    
for i in range(41,49):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    zavineni_nehody[cell_value_class] = cell_value_id

for i in range(50,60):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    alkohol_u_vinika[cell_value_class] = cell_value_id

for i in range(138,141):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    nasledky_nehody[cell_value_class] = cell_value_id

for i in range(143,149):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    druh_povrchu_vozovky[cell_value_class] = cell_value_id
    
for i in range(150,160):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    stav_povrchu_vozovky[cell_value_class] = cell_value_id

for i in range(161,173):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    stav_komunikace[cell_value_class] = cell_value_id
    
for i in range(174,182):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    pocasi[cell_value_class] = cell_value_id
    
for i in range(183,190):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    viditelnost[cell_value_class] = cell_value_id

for i in range(191,198):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    rozhled[cell_value_class] = cell_value_id

for i in range(199,206):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    deleni_komunikace[cell_value_class] = cell_value_id
    
for i in range(207,217):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    situovani_nehody[cell_value_class] = cell_value_id
    
for i in range(218,222):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    rizeni_provozu[cell_value_class] = cell_value_id
    
for i in range(223,229):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    mistni_uprava[cell_value_class] = cell_value_id
    
for i in range(230,241):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    specificka_mista[cell_value_class] = cell_value_id
    
for i in range(242,249):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    smerove_pomery[cell_value_class] = cell_value_id

for i in range(250,255):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    kategorie_chodce[cell_value_class] = cell_value_id
    
for i in range(256,266):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    stav_chodce[cell_value_class] = cell_value_id
    
for i in range(267,275):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    chovani_chodce[cell_value_class] = cell_value_id
    
for i in range(276,287):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    situace[cell_value_class] = cell_value_id
    
for i in range(309,315):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    misto_nehody[cell_value_class] = cell_value_id
    
for i in range(316,325):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value + ' ' + sh.cell(i,3).value
    druh_komunikace[cell_value_class] = cell_value_id
    
for i in range(550,556):
    cell_value_class = sh.cell(i,1).value
    cell_value_id = sh.cell(i,2).value
    vnejsi_ovlivneni[cell_value_class] = cell_value_id

In [40]:
# actually map data to filled arrays
mapping_expr = create_map([lit(x) for x in chain(*lokalita_nehody.items())])
mappedData = replacedNansData.withColumn("LOKALITA_NEHODY", mapping_expr.getItem(col("LOKALITA_NEHODY")))

mapping_expr = create_map([lit(x) for x in chain(*druh_nehody.items())])
mappedData = mappedData.withColumn("DRUH_NEHODY", mapping_expr.getItem(col("DRUH_NEHODY")))

mapping_expr = create_map([lit(x) for x in chain(*druh_srazky_jedoucich_vozidel.items())])
mappedData = mappedData.withColumn("DRUH_SRÁŽKY_JEDOUCÍCH_VOZIDEL", mapping_expr.getItem(col("DRUH_SRÁŽKY_JEDOUCÍCH_VOZIDEL")))

mappedData = mappedData.withColumn("DRUH_PEVNÉ_PŘEKÁŽKY", when((col("DRUH_PEVNÉ_PŘEKÁŽKY") == 0), "zadna").otherwise("pevna prekazka"))

mapping_expr = create_map([lit(x) for x in chain(*charakter_nehody.items())])
mappedData = mappedData.withColumn("CHARAKTER_NEHODY", mapping_expr.getItem(col("CHARAKTER_NEHODY")))

mapping_expr = create_map([lit(x) for x in chain(*zavineni_nehody.items())])
mappedData = mappedData.withColumn("ZAVINĚNÍ_NEHODY", mapping_expr.getItem(col("ZAVINĚNÍ_NEHODY")))

mapping_expr = create_map([lit(x) for x in chain(*alkohol_u_vinika.items())])
mappedData = mappedData.withColumn("ALKOHOL_U_VINÍKA_NEHODY_PŘÍTOMEN", mapping_expr.getItem(col("ALKOHOL_U_VINÍKA_NEHODY_PŘÍTOMEN")))

mappedData = mappedData.withColumn("HLAVNÍ_PŘÍČINY_NEHODY", when((col("HLAVNÍ_PŘÍČINY_NEHODY") == 100), "nezaviněná řidičem")
      .when((col("HLAVNÍ_PŘÍČINY_NEHODY") >= 200) & (col("HLAVNÍ_PŘÍČINY_NEHODY") < 300), "nepřiměřená rychlost jízdy")
      .when((col("HLAVNÍ_PŘÍČINY_NEHODY") >= 300) & (col("HLAVNÍ_PŘÍČINY_NEHODY") < 400), "nesprávné předjíždění")
      .when((col("HLAVNÍ_PŘÍČINY_NEHODY") >= 400) & (col("HLAVNÍ_PŘÍČINY_NEHODY") < 500), "nedání přednosti v jízdě")
      .when((col("HLAVNÍ_PŘÍČINY_NEHODY") >= 500) & (col("HLAVNÍ_PŘÍČINY_NEHODY") < 600), "nesprávný způsob jízdy")
      .when((col("HLAVNÍ_PŘÍČINY_NEHODY") >= 600) & (col("HLAVNÍ_PŘÍČINY_NEHODY") < 700), "technická závada vozidla")
      .otherwise(None))

mapping_expr = create_map([lit(x) for x in chain(*druh_povrchu_vozovky.items())])
mappedData = mappedData.withColumn("DRUH_POVRCHU_VOZOVKY", mapping_expr.getItem(col("DRUH_POVRCHU_VOZOVKY")))

mapping_expr = create_map([lit(x) for x in chain(*stav_povrchu_vozovky.items())])
mappedData = mappedData.withColumn("STAV_POVRCHU_VOZOVKY_V_DOBĚ_NEHODY", mapping_expr.getItem(col("STAV_POVRCHU_VOZOVKY_V_DOBĚ_NEHODY")))

mapping_expr = create_map([lit(x) for x in chain(*stav_komunikace.items())])
mappedData = mappedData.withColumn("STAV_KOMUNIKACE", mapping_expr.getItem(col("STAV_KOMUNIKACE")))

mapping_expr = create_map([lit(x) for x in chain(*pocasi.items())])
mappedData = mappedData.withColumn("POVĚTRNOSTNÍ_PODMÍNKY_V_DOBĚ_NEHODY", mapping_expr.getItem(col("POVĚTRNOSTNÍ_PODMÍNKY_V_DOBĚ_NEHODY")))

mapping_expr = create_map([lit(x) for x in chain(*viditelnost.items())])
mappedData = mappedData.withColumn("VIDITELNOST", mapping_expr.getItem(col("VIDITELNOST")))

mapping_expr = create_map([lit(x) for x in chain(*rozhled.items())])
mappedData = mappedData.withColumn("ROZHLEDOVÉ_POMĚRY", mapping_expr.getItem(col("ROZHLEDOVÉ_POMĚRY")))

mapping_expr = create_map([lit(x) for x in chain(*deleni_komunikace.items())])
mappedData = mappedData.withColumn("DĚLENÍ_KOMUNIKACE", mapping_expr.getItem(col("DĚLENÍ_KOMUNIKACE")))

mapping_expr = create_map([lit(x) for x in chain(*situace.items())])
mappedData = mappedData.withColumn("SITUOVÁNÍ_NEHODY_NA_KOMUNIKACI", mapping_expr.getItem(col("SITUOVÁNÍ_NEHODY_NA_KOMUNIKACI")))

mapping_expr = create_map([lit(x) for x in chain(*rizeni_provozu.items())])
mappedData = mappedData.withColumn("ŘÍZENÍ_PROVOZU_V_DOBĚ_NEHODY", mapping_expr.getItem(col("ŘÍZENÍ_PROVOZU_V_DOBĚ_NEHODY")))

mapping_expr = create_map([lit(x) for x in chain(*mistni_uprava.items())])
mappedData = mappedData.withColumn("MÍSTNÍ_ÚPRAVA_PŘEDNOSTI_V_JÍZDĚ", mapping_expr.getItem(col("MÍSTNÍ_ÚPRAVA_PŘEDNOSTI_V_JÍZDĚ")))

mapping_expr = create_map([lit(x) for x in chain(*specificka_mista.items())])
mappedData = mappedData.withColumn("SPECIFICKÁ_MÍSTA_A_OBJEKTY_V_MÍSTĚ_NEHODY", mapping_expr.getItem(col("SPECIFICKÁ_MÍSTA_A_OBJEKTY_V_MÍSTĚ_NEHODY")))

mapping_expr = create_map([lit(x) for x in chain(*smerove_pomery.items())])
mappedData = mappedData.withColumn("SMĚROVÉ_POMĚRY", mapping_expr.getItem(col("SMĚROVÉ_POMĚRY")))

mapping_expr = create_map([lit(x) for x in chain(*misto_nehody.items())])
mappedData = mappedData.withColumn("MÍSTO_DOPRAVNÍ_NEHODY", mapping_expr.getItem(col("MÍSTO_DOPRAVNÍ_NEHODY")))

mapping_expr = create_map([lit(x) for x in chain(*druh_komunikace.items())])
mappedData = mappedData.withColumn("DRUH_POZEMNÍ_KOMUNIKACE", mapping_expr.getItem(col("DRUH_POZEMNÍ_KOMUNIKACE")))

mappedData = mappedData.withColumn("VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE", when((col("VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE").isNull()), "nevime")
      .when((col("VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE") == 1), "nebyl ovlivnen")
      .when((col("VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE") == 2), "oslnen sluncem")
      .when((col("VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE") == 3), "oslnen svetlomety")
      .when((col("VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE") == 4), "ovlinen jednanim ostatniho ucastnika")
      .when((col("VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE") == 5), "vyvhybal se zveri"))


mappedData = mappedData.withColumn("DEN_V_TYDNU", when((col("DEN_V_TYDNU") == 0) | (col("DEN_V_TYDNU") == 6), "vikend")
      .when((col("DEN_V_TYDNU") == 1) | (col("DEN_V_TYDNU") == 5), "predPoVikend")
      .when((col("DEN_V_TYDNU") >= 2) & (col("DEN_V_TYDNU") <= 4), "tyden"))

mappedData = mappedData.withColumn("DRUH_VOZIDLA", when((col("DRUH_VOZIDLA") < 3), "motorka")
      .when((col("DRUH_VOZIDLA") >= 3) & (col("DRUH_VOZIDLA") <= 4), "osobni")
      .when((col("DRUH_VOZIDLA") >= 5) & (col("DRUH_VOZIDLA") <= 7), "nakladni")
      .when((col("DRUH_VOZIDLA") == 8 ) | (col("DRUH_VOZIDLA") == 10) | (col("DRUH_VOZIDLA") == 11) | (col("DRUH_VOZIDLA") == 16), "dopravni")
      .otherwise("ostatni"))

mappedData = mappedData.withColumn("CHARAKTERISTIKA_VOZIDLA_", when((col("CHARAKTERISTIKA_VOZIDLA_") == 1), "soukrome")
      .when((col("CHARAKTERISTIKA_VOZIDLA_") >= 2) & (col("CHARAKTERISTIKA_VOZIDLA_") <= 3), "firemni")
      .when((col("CHARAKTERISTIKA_VOZIDLA_") >= 4) & (col("CHARAKTERISTIKA_VOZIDLA_") <= 5) | (col("CHARAKTERISTIKA_VOZIDLA_") == 7), "verejna doprava")
      .when((col("CHARAKTERISTIKA_VOZIDLA_") == 6), "nakladni doprava")
      .when((col("CHARAKTERISTIKA_VOZIDLA_") == 9), "mimo CR")
      .when((col("CHARAKTERISTIKA_VOZIDLA_") >= 10) & (col("CHARAKTERISTIKA_VOZIDLA_") <= 15) & (col("CHARAKTERISTIKA_VOZIDLA_") != 14), "statni")
      .when((col("CHARAKTERISTIKA_VOZIDLA_") == 17), "odcizene")
      .when((col("CHARAKTERISTIKA_VOZIDLA_") == 18), "autoskola")
      .otherwise("jine"))

mappedData = mappedData.withColumn("SMYK", when((col("SMYK") == 1), "ano")
      .otherwise("ne"))

mappedData = mappedData.withColumn("KATEGORIE_ŘIDIČE", when((col("KATEGORIE_ŘIDIČE") >= 1) & (col("KATEGORIE_ŘIDIČE") <= 7), "s ridicskym opravnenim")
      .when((col("KATEGORIE_ŘIDIČE") == 7), "bez opravneni")
      .otherwise("ostatni"))

mappedData = mappedData.withColumn("STAV_ŘIDIČE", when((col("STAV_ŘIDIČE") == 1), "dobry")
      .when((col("STAV_ŘIDIČE") == 2), "unaven, usnul")
      .when((col("STAV_ŘIDIČE") >= 3) & (col("STAV_ŘIDIČE") <= 5), "pod vlivem navykovych latek")
      .otherwise("jiny"))

#date to season
mappedData = mappedData.withColumn("OBDOBI", when((month("DATUM")==12) | (month("DATUM")<=2), "zima")\
                                .otherwise(when((month("DATUM")>=3) & (month("DATUM")<=5), "jaro")\
                                          .otherwise(when((month("DATUM")>=6) & (month("DATUM")<=8), "leto")\
                                                    .otherwise(when((month("DATUM")>=9) & (month("DATUM")<=11), "podzim")))))\

#time to time of the day
mappedData = mappedData.withColumn("ČAS", when(col("ČAS") <= 800, "rano")\
                                .otherwise(when((col("ČAS") > 800) & (col("ČAS") <= 1600), "den")\
                                          .otherwise(when(col("ČAS") > 1600, "vecer"))))

# Public holidays
dataPath_svatky = "./data/statni_svatky.csv"
df_svatky = spark\
            .read\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .option("delimiter", ";")\
            .option("encoding", "cp1250")\
            .csv(dataPath_svatky)
mappedData = mappedData.join(df_svatky, mappedData.DATUM == df_svatky.DATUM1,how='left')
mappedData = mappedData.drop('DATUM1').drop('DATUM')
mappedData = mappedData.fillna({'SVATEK':'0'})
mappedData = mappedData.withColumn("SVATEK", when((col("SVATEK") == '0'), 'Ne').otherwise('Ano'))

#label
mappedData = mappedData.withColumn("CHARAKTER_NEHODY", when(col("CHARAKTER_NEHODY")=="nehoda s následky na životě", 1).otherwise(0)).withColumnRenamed("CHARAKTER_NEHODY", "LABEL")

mappedData.limit(5).toPandas()

Unnamed: 0,DRUH_POZEMNÍ_KOMUNIKACE,DEN_V_TYDNU,ČAS,LABEL,USMRCENO_OSOB,TĚŽCE_ZRANĚNO_OSOB,LEHCE_ZRANĚNO_OSOB,CELKOVÁ_HMOTNÁ_ŠKODA,STAV_POVRCHU_VOZOVKY_V_DOBĚ_NEHODY,STAV_KOMUNIKACE,POVĚTRNOSTNÍ_PODMÍNKY_V_DOBĚ_NEHODY,VIDITELNOST,ROZHLEDOVÉ_POMĚRY,DĚLENÍ_KOMUNIKACE,SITUOVÁNÍ_NEHODY_NA_KOMUNIKACI,ŘÍZENÍ_PROVOZU_V_DOBĚ_NEHODY,MÍSTNÍ_ÚPRAVA_PŘEDNOSTI_V_JÍZDĚ,SPECIFICKÁ_MÍSTA_A_OBJEKTY_V_MÍSTĚ_NEHODY,SMĚROVÉ_POMĚRY,POČET_ZÚČASTNĚNÝCH_VOZIDEL,MÍSTO_DOPRAVNÍ_NEHODY,DRUH_VOZIDLA,ŠKODA_NA_VOZIDLE,DRUH_NEHODY,DRUH_SRÁŽKY_JEDOUCÍCH_VOZIDEL,DRUH_PEVNÉ_PŘEKÁŽKY,ZAVINĚNÍ_NEHODY,ALKOHOL_U_VINÍKA_NEHODY_PŘÍTOMEN,HLAVNÍ_PŘÍČINY_NEHODY,DRUH_POVRCHU_VOZOVKY,ROK_VÝROBY_VOZIDLA,LOKALITA_NEHODY,CHARAKTERISTIKA_VOZIDLA_,SMYK,KATEGORIE_ŘIDIČE,STAV_ŘIDIČE,VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE,OBDOBI,SVATEK
0,komunikace sledovaná (ve vybraných městech),predPoVikend,vecer,0,0.0,0.0,0.0,500.0,povrch suchý neznečištěný,"dobrý, bez závad",neztížené,ve dne viditelnost nezhoršená vlivem povětrnos...,dobré,dvoupruhová,jiná situace,žádný způsob řízení provozu,žádná místní úprava,parkoviště přiléhající ke komunikaci,přímý úsek,3.0,mimo křižovatku,ostatni,0.0,"srážka s vozidlem zaparkovaným, odstaveným",nepřichází v úvahu,ne,řidičem motorového vozidla,nezjišťováno,nesprávný způsob jízdy,živice,,v obci,jine,ne,ostatni,jiny,nevime,zima,Ne
1,uzel tj. křižovatka sledovaná ve vybraných mě...,tyden,rano,0,0.0,0.0,0.0,250.0,povrch mokrý,"dobrý, bez závad",neztížené,"v noci s veřejným osvětlením, viditelnost nezh...",dobré,žádná z uvedených,vstup chodce na signál VOLNO,světelným signalizačním zařízením,žádná místní úprava,žádné nebo žádné z uvedených,křižovatka průsečná - čtyřramenná,2.0,na křižovatce uvnitř hranic křižovatky definov...,osobni,150.0,srážka s jedoucím nekolejovým vozidlem,zezadu,ne,řidičem motorového vozidla,ne,nesprávný způsob jízdy,dlažba,15.0,v obci,statni,ne,s ridicskym opravnenim,dobry,nebyl ovlivnen,zima,Ano
2,komunikace sledovaná (ve vybraných městech),tyden,rano,1,0.0,1.0,0.0,200.0,povrch suchý neznečištěný,"dobrý, bez závad",neztížené,"v noci s veřejným osvětlením, viditelnost nezh...",dobré,vícepruhová,"chůze, stání na chodníku",světelným signalizačním zařízením,žádná místní úprava,žádné nebo žádné z uvedených,zatáčka,1.0,mimo křižovatku,dopravni,200.0,srážka s chodcem,nepřichází v úvahu,ne,chodcem,nezjišťováno,nezaviněná řidičem,živice,,v obci,verejna doprava,ne,ostatni,dobry,nebyl ovlivnen,zima,Ano
3,uzel tj. křižovatka sledovaná ve vybraných mě...,tyden,rano,0,0.0,0.0,0.0,810.0,povrch suchý neznečištěný,"dobrý, bez závad",neztížené,"v noci s veřejným osvětlením, viditelnost nezh...",dobré,žádná z uvedených,vstup chodce na signál VOLNO,světelným signalizačním zařízením,žádná místní úprava,žádné nebo žádné z uvedených,křižovatka průsečná - čtyřramenná,1.0,,osobni,800.0,srážka s pevnou překážkou,nepřichází v úvahu,pevna prekazka,řidičem motorového vozidla,"ano obsah alkoholu v krvi 1,5 ‰ a více",nesprávný způsob jízdy,živice,15.0,v obci,firemni,ne,s ridicskym opravnenim,pod vlivem navykovych latek,,zima,Ano
4,komunikace místní,tyden,rano,0,0.0,0.0,0.0,100.0,povrch mokrý,"dobrý, bez závad",neztížené,"v noci s veřejným osvětlením, viditelnost nezh...",dobré,žádná z uvedených,vstup chodce na signál VOLNO,místní úprava,přednost vyznačena dopravními značkami,žádné nebo žádné z uvedených,křižovatka styková - tříramenná,1.0,na kžižovatce jedná-li se o křížení místních k...,osobni,100.0,srážka s pevnou překážkou,nepřichází v úvahu,pevna prekazka,řidičem motorového vozidla,nezjišťováno,nesprávný způsob jízdy,živice,2.0,v obci,soukrome,ne,ostatni,jiny,nevime,zima,Ano


In [27]:
#define categorical and numerical features and label
categoricalFeatures = ['DRUH_POZEMNÍ_KOMUNIKACE','DEN_V_TYDNU','ČAS','DRUH_NEHODY','DRUH_SRÁŽKY_JEDOUCÍCH_VOZIDEL',
                 'DRUH_PEVNÉ_PŘEKÁŽKY','ZAVINĚNÍ_NEHODY','ALKOHOL_U_VINÍKA_NEHODY_PŘÍTOMEN',
                 'HLAVNÍ_PŘÍČINY_NEHODY','DRUH_POVRCHU_VOZOVKY','STAV_POVRCHU_VOZOVKY_V_DOBĚ_NEHODY','STAV_KOMUNIKACE',
                 'POVĚTRNOSTNÍ_PODMÍNKY_V_DOBĚ_NEHODY','VIDITELNOST','ROZHLEDOVÉ_POMĚRY','DĚLENÍ_KOMUNIKACE',
                 'SITUOVÁNÍ_NEHODY_NA_KOMUNIKACI','ŘÍZENÍ_PROVOZU_V_DOBĚ_NEHODY','MÍSTNÍ_ÚPRAVA_PŘEDNOSTI_V_JÍZDĚ',
                 'SPECIFICKÁ_MÍSTA_A_OBJEKTY_V_MÍSTĚ_NEHODY','SMĚROVÉ_POMĚRY','MÍSTO_DOPRAVNÍ_NEHODY','DRUH_VOZIDLA',
                 'CHARAKTERISTIKA_VOZIDLA_','SMYK',
                'KATEGORIE_ŘIDIČE','STAV_ŘIDIČE','VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE','LOKALITA_NEHODY', 'OBDOBI', 'SVATEK']
numericalFeatures = ['CELKOVÁ_HMOTNÁ_ŠKODA', 'POČET_ZÚČASTNĚNÝCH_VOZIDEL','ROK_VÝROBY_VOZIDLA','ŠKODA_NA_VOZIDLE']
label = ['LABEL']

In [17]:
# show distinct count for every categorical feature
featureDistinctCounts = mappedData.agg(*(countDistinct(col(c)).alias(c) for c in categoricalFeatures)).toPandas().transpose()
featureDistinctCounts.columns=['distinctCount']
print("Distinct values counts for categorical features")
featureDistinctCounts

Distinct values counts for categorical features


Unnamed: 0,distinctCount
DRUH_POZEMNÍ_KOMUNIKACE,9
DEN_V_TYDNU,3
ČAS,3
DRUH_NEHODY,10
DRUH_SRÁŽKY_JEDOUCÍCH_VOZIDEL,5
DRUH_PEVNÉ_PŘEKÁŽKY,10
ZAVINĚNÍ_NEHODY,8
ALKOHOL_U_VINÍKA_NEHODY_PŘÍTOMEN,10
HLAVNÍ_PŘÍČINY_NEHODY,6
DRUH_POVRCHU_VOZOVKY,6


In [18]:
# show distinct count for every numerical feature
featureDistinctCounts = mappedData.agg(*(countDistinct(col(c)).alias(c) for c in numericalFeatures)).toPandas().transpose()
featureDistinctCounts.columns=['distinctCount']
print("Distinct values counts for numerical features")
featureDistinctCounts

Distinct values counts for numerical features


Unnamed: 0,distinctCount
USMRCENO_OSOB,5
TĚŽCE_ZRANĚNO_OSOB,8
LEHCE_ZRANĚNO_OSOB,16
CELKOVÁ_HMOTNÁ_ŠKODA,1111
POČET_ZÚČASTNĚNÝCH_VOZIDEL,12
ROK_VÝROBY_VOZIDLA,61
ŠKODA_NA_VOZIDLE,165


In [41]:
#lets see some statistics
mappedData.describe().toPandas()

Unnamed: 0,summary,DRUH_POZEMNÍ_KOMUNIKACE,DEN_V_TYDNU,ČAS,LABEL,USMRCENO_OSOB,TĚŽCE_ZRANĚNO_OSOB,LEHCE_ZRANĚNO_OSOB,CELKOVÁ_HMOTNÁ_ŠKODA,STAV_POVRCHU_VOZOVKY_V_DOBĚ_NEHODY,STAV_KOMUNIKACE,POVĚTRNOSTNÍ_PODMÍNKY_V_DOBĚ_NEHODY,VIDITELNOST,ROZHLEDOVÉ_POMĚRY,DĚLENÍ_KOMUNIKACE,SITUOVÁNÍ_NEHODY_NA_KOMUNIKACI,ŘÍZENÍ_PROVOZU_V_DOBĚ_NEHODY,MÍSTNÍ_ÚPRAVA_PŘEDNOSTI_V_JÍZDĚ,SPECIFICKÁ_MÍSTA_A_OBJEKTY_V_MÍSTĚ_NEHODY,SMĚROVÉ_POMĚRY,POČET_ZÚČASTNĚNÝCH_VOZIDEL,MÍSTO_DOPRAVNÍ_NEHODY,DRUH_VOZIDLA,ŠKODA_NA_VOZIDLE,DRUH_NEHODY,DRUH_SRÁŽKY_JEDOUCÍCH_VOZIDEL,DRUH_PEVNÉ_PŘEKÁŽKY,ZAVINĚNÍ_NEHODY,ALKOHOL_U_VINÍKA_NEHODY_PŘÍTOMEN,HLAVNÍ_PŘÍČINY_NEHODY,DRUH_POVRCHU_VOZOVKY,ROK_VÝROBY_VOZIDLA,LOKALITA_NEHODY,CHARAKTERISTIKA_VOZIDLA_,SMYK,KATEGORIE_ŘIDIČE,STAV_ŘIDIČE,VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE,OBDOBI,SVATEK
0,count,475519,475519,475519,475519.0,475519.0,475519.0,475519.0,475519.0,475519,475519,475519,475519,475519,475519,475519,475519,475519,475519,475519,475519.0,472457,475519,475519.0,475519,475519,475519,475519,475519,475519,474234,372368.0,475519,475519,475519,475519,475519,472603,475519,475519
1,mean,,,,0.1865414420874875,0.0047001276499992,0.018754245361384,0.2147653406067896,635.8717948178727,,,,,,,,,,,,1.626450257508112,,,317.0299062708325,,,,,,,,17.27886929059425,,,,,,,,
2,stddev,,,,0.3895433885768008,0.0753037053539656,0.1610447149580078,0.5652062693450339,2202.603930955789,,,,,,,,,,,,0.6164917502068491,,,813.7606946480637,,,,,,,,23.539565119171023,,,,,,,,
3,min,dálnice,predPoVikend,den,0.0,0.0,0.0,0.0,0.0,jiný stav povrchu vozovky v době nehody,"dobrý, bez závad",déšť,"v noci bez veřejného osvětlení, viditelnost ne...",dobré,dvoupruhová,chůze po nesprávné straně,místní úprava,přednost nevyznačena - vyplývá z pravidel siln...,"most, nadjezd, podjezd, tunel",kruhový objezd,1.0,mimo křižovatku,dopravni,0.0,havárie,boční,ne,chodcem,"ano obsah alkoholu v krvi 1,5 ‰ a více",nedání přednosti v jízdě,beton,0.0,mimo obec,autoskola,ano,ostatni,dobry,nebyl ovlivnen,jaro,Ano
4,max,uzel tj. křižovatka sledovaná ve vybraných mě...,vikend,vecer,1.0,4.0,14.0,30.0,261100.0,"souvislá sněhová vrstva, rozbředlý sníh",zvlněný povrch v podélném směru,"tvoří se námraza, náledí",ve dne zhoršená viditelnost vlivem povětrnostn...,"špatné vlivem vegetace - trvale (stromy, keře ...",žádná z uvedených,vstup chodce na signál VOLNO,žádný způsob řízení provozu,žádná místní úprava,žádné nebo žádné z uvedených,zatáčka,15.0,na kžižovatce jedná-li se o křížení místních k...,ostatni,150000.0,"srážka s vozidlem zaparkovaným, odstaveným",čelní,pevna prekazka,řidičem nemotorového vozidla,pod vlivem drog,technická závada vozidla,živice,99.0,v obci,verejna doprava,ne,s ridicskym opravnenim,"unaven, usnul",vyvhybal se zveri,zima,Ne


In [20]:
#get column types - has every column the right data type?
types = [f.dataType for f in mappedData.schema.fields]
set(types)

integerFeatures = []
stringFeatures  = []
doubleFeatures  = []
otherTypeFeatures = []

for c in mappedData.columns:
  if isinstance(mappedData.schema[c].dataType, IntegerType):
    integerFeatures.append(c)
  elif isinstance(mappedData.schema[c].dataType, StringType):
    stringFeatures.append(c) 
  elif isinstance(mappedData.schema[c].dataType, DoubleType):
    doubleFeatures.append(c)
  else:
    otherTypeFeatures.append(c)
print ("Integer: ")
print(integerFeatures)
print ("\nString: ")
print(stringFeatures)
print ("\nDouble: ")
print(doubleFeatures)
print ("\nOther: ")
print(otherTypeFeatures)

Integer: 
['LABEL']

String: 
['DRUH_POZEMNÍ_KOMUNIKACE', 'DEN_V_TYDNU', 'ČAS', 'STAV_POVRCHU_VOZOVKY_V_DOBĚ_NEHODY', 'STAV_KOMUNIKACE', 'POVĚTRNOSTNÍ_PODMÍNKY_V_DOBĚ_NEHODY', 'VIDITELNOST', 'ROZHLEDOVÉ_POMĚRY', 'DĚLENÍ_KOMUNIKACE', 'SITUOVÁNÍ_NEHODY_NA_KOMUNIKACI', 'ŘÍZENÍ_PROVOZU_V_DOBĚ_NEHODY', 'MÍSTNÍ_ÚPRAVA_PŘEDNOSTI_V_JÍZDĚ', 'SPECIFICKÁ_MÍSTA_A_OBJEKTY_V_MÍSTĚ_NEHODY', 'SMĚROVÉ_POMĚRY', 'MÍSTO_DOPRAVNÍ_NEHODY', 'DRUH_VOZIDLA', 'DRUH_NEHODY', 'DRUH_SRÁŽKY_JEDOUCÍCH_VOZIDEL', 'DRUH_PEVNÉ_PŘEKÁŽKY', 'ZAVINĚNÍ_NEHODY', 'ALKOHOL_U_VINÍKA_NEHODY_PŘÍTOMEN', 'HLAVNÍ_PŘÍČINY_NEHODY', 'DRUH_POVRCHU_VOZOVKY', 'LOKALITA_NEHODY', 'CHARAKTERISTIKA_VOZIDLA_', 'SMYK', 'KATEGORIE_ŘIDIČE', 'STAV_ŘIDIČE', 'VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE', 'OBDOBI', 'SVATEK']

Double: 
['USMRCENO_OSOB', 'TĚŽCE_ZRANĚNO_OSOB', 'LEHCE_ZRANĚNO_OSOB', 'CELKOVÁ_HMOTNÁ_ŠKODA', 'POČET_ZÚČASTNĚNÝCH_VOZIDEL', 'ŠKODA_NA_VOZIDLE', 'ROK_VÝROBY_VOZIDLA']

Other: 
[]


In [42]:
#replace null in year with median
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['ROK_VÝROBY_VOZIDLA']
                  , outputCols=['ROK_VÝROBY_VOZIDLA']
                  , strategy ="median")

imputedData = (imputer.fit(mappedData).transform(mappedData))

In [43]:
#drop rows with null values
countTotal = imputedData.count()
countNaDropped = imputedData.na.drop().count()
display("Count total is: {}, count after na.drop() is: {}. Total of {} records dropped.".format(countTotal, countNaDropped, countTotal - countNaDropped))
droppedData = imputedData.na.drop()

'Count total is: 475519, count after na.drop() is: 468293. Total of 7226 records dropped.'

In [23]:
# export mapped/cleaned data --> go to 1_DataExploration
droppedData.write.mode('overwrite').parquet("cleanData")

In [45]:
#define all columns again (for iteration purposes)
cols = ['DRUH_POZEMNÍ_KOMUNIKACE', 'DEN_V_TYDNU', 'ČAS', 'CELKOVÁ_HMOTNÁ_ŠKODA', 'STAV_POVRCHU_VOZOVKY_V_DOBĚ_NEHODY', 
        'STAV_KOMUNIKACE', 'POVĚTRNOSTNÍ_PODMÍNKY_V_DOBĚ_NEHODY', 
        'VIDITELNOST', 'ROZHLEDOVÉ_POMĚRY', 'DĚLENÍ_KOMUNIKACE', 'SITUOVÁNÍ_NEHODY_NA_KOMUNIKACI', 'ŘÍZENÍ_PROVOZU_V_DOBĚ_NEHODY', 
        'MÍSTNÍ_ÚPRAVA_PŘEDNOSTI_V_JÍZDĚ', 'SPECIFICKÁ_MÍSTA_A_OBJEKTY_V_MÍSTĚ_NEHODY', 'SMĚROVÉ_POMĚRY', 'POČET_ZÚČASTNĚNÝCH_VOZIDEL', 
        'MÍSTO_DOPRAVNÍ_NEHODY', 'DRUH_VOZIDLA', 'ŠKODA_NA_VOZIDLE', 'DRUH_NEHODY', 'DRUH_SRÁŽKY_JEDOUCÍCH_VOZIDEL', 'DRUH_PEVNÉ_PŘEKÁŽKY', 
        'ZAVINĚNÍ_NEHODY', 'ALKOHOL_U_VINÍKA_NEHODY_PŘÍTOMEN', 'HLAVNÍ_PŘÍČINY_NEHODY', 'DRUH_POVRCHU_VOZOVKY', 'ROK_VÝROBY_VOZIDLA', 
        'LOKALITA_NEHODY', 'CHARAKTERISTIKA_VOZIDLA_', 'SMYK',  
        'KATEGORIE_ŘIDIČE', 'STAV_ŘIDIČE', 'VNĚJŠÍ_OVLIVNĚNÍ_ŘIDIČE', 'OBDOBI', 'SVATEK']

## 2.3 Stage and pipeline

In [None]:
stages = []

#bucketizer
from pyspark.ml.feature import Bucketizer

##CELKOVÁ_HMOTNÁ_ŠKODA
bucketBorders = [-float("inf"), 0, 500, float("inf")]
bucketer1 = (Bucketizer()
            .setSplits(bucketBorders)
            .setInputCol("CELKOVÁ_HMOTNÁ_ŠKODA")
            .setOutputCol("CELKOVÁ_HMOTNÁ_ŠKODA_bucket"))
stages.append(bucketer1)

##POČET_ZÚČASTNĚNÝCH_VOZIDEL
bucketBorders = [-float("inf"), 1, 2, float("inf")]
bucketer2 = (Bucketizer()
            .setSplits(bucketBorders)
            .setInputCol("POČET_ZÚČASTNĚNÝCH_VOZIDEL")
            .setOutputCol("POČET_ZÚČASTNĚNÝCH_VOZIDEL_bucket"))
stages.append(bucketer2)

##ROK_VÝROBY_VOZIDLA
bucketBorders = [-float("inf"), 0, 10, 20, float("inf")]
bucketer3 = (Bucketizer()
            .setSplits(bucketBorders)
            .setInputCol("ROK_VÝROBY_VOZIDLA")
            .setOutputCol("ROK_VÝROBY_VOZIDLA_bucket"))
stages.append(bucketer3)

##ŠKODA_NA_VOZIDLE
bucketBorders = [-float("inf"), 0, 300, float("inf")]
bucketer4 = (Bucketizer()
            .setSplits(bucketBorders)
            .setInputCol("ŠKODA_NA_VOZIDLE")
            .setOutputCol("ŠKODA_NA_VOZIDLE_bucket"))
stages.append(bucketer4)

#string indexer for bucketed features
from pyspark.ml.feature import StringIndexer
for c in numericalFeatures:
    indexer = StringIndexer(inputCol = "{}_bucket".format(c) ,outputCol='{}_index'.format(c) ,handleInvalid="skip")
    stages.append(indexer)

#string indexer for categorical features
from pyspark.ml.feature import StringIndexer
for c in categoricalFeatures:
    indexer2 = StringIndexer(inputCol = c ,outputCol='{}_index'.format(c) ,handleInvalid="skip")
    stages.append(indexer2)

#one hot encoder
from pyspark.ml.feature import OneHotEncoder
for c in cols:
    encoder = OneHotEncoder(inputCol = "{}_index".format(c), outputCol='{}_oh'.format(c))
    stages.append(encoder)

# assemble vector
fullVector = VectorAssembler(inputCols=["{}_oh".format(c) for c in cols], outputCol="features")
stages.append(fullVector)

#pipeline
print(stages)
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(droppedData)
modelData = pipelineModel.transform(droppedData)

In [47]:
modelData["LABEL","features"].limit(5).toPandas()

Unnamed: 0,LABEL,fullVector
0,0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ..."
1,0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, ..."
2,1,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, ..."
3,0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ..."
4,0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, ..."


## 2.4 Data Output

In [51]:
modelData.write.mode('overwrite').parquet("modelData")