### Importaciones e inicio del cluster

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, hour, minute
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
import pandas as pd

In [2]:
# Reuse the active Spark session if there is one; otherwise create it.
spark = (
    SparkSession.builder
    .appName("Accidentes de Trafico")
    .getOrCreate()
)

### Análisis y selección de los datos

In [3]:
# Path to the local CSV file.
ruta = '../../data/US_Accidents_March23.csv'

# The first row is read as the header and column types are inferred from the data.
accidentes = spark.read.options(header=True, inferSchema=True).csv(ruta)

# Preview the first rows, then print the resulting schema.
accidentes.show(5)
accidentes.printSchema()

+---+-------+--------+-------------------+-------------------+-----------------+------------------+-------+-------+------------+--------------------+--------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID| Source|Severity|         Start_Time|           End_Time|        Start_Lat|         Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|              Street|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Ameni

In [4]:
# Columns kept for the rest of the analysis.
columnas = [
    'Severity', 'Start_Time', 'Start_Lng', 'Start_Lat', 'City',
    'State', 'Temperature(F)', 'Humidity(%)', 'Pressure(in)',
    'Visibility(mi)', 'Wind_Speed(mph)', 'Precipitation(in)',
    'Weather_Condition',
]

# col() builds a Column reference from each name. After the projection,
# every row that still holds a null/NaN value is discarded.
accidentes = accidentes.select(*map(col, columnas)).dropna()

# Start_Time has to be broken into numeric parts so that the ML model can
# accept it; Spark ships a dedicated extraction function for each part.
accidentes = accidentes.select(
    "*",
    year("Start_Time").alias("Year"),
    month("Start_Time").alias("Month"),
    dayofmonth("Start_Time").alias("Day"),
    hour("Start_Time").alias("Hour"),
    minute("Start_Time").alias("Minute"),
)
accidentes.show(5)

+--------+-------------------+------------------+------------------+------------+-----+--------------+-----------+------------+--------------+---------------+-----------------+-----------------+----+-----+---+----+------+
|Severity|         Start_Time|         Start_Lng|         Start_Lat|        City|State|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Year|Month|Day|Hour|Minute|
+--------+-------------------+------------------+------------------+------------+-----+--------------+-----------+------------+--------------+---------------+-----------------+-----------------+----+-----+---+----+------+
|       3|2016-02-08 07:44:26|-82.92519399999998|40.100590000000004| Westerville|   OH|          37.9|       97.0|       29.63|           7.0|            3.5|             0.03|       Light Rain|2016|    2|  8|   7|    44|
|       3|2016-02-08 08:10:04|-82.92519399999998|40.100590000000004| Westerville|   OH|          37.4|      100.

In [5]:
# Every categorical column goes through two stages: StringIndexer assigns a
# numeric index to each category, and OneHotEncoder then turns that index
# into a binary vector.

# City
city_indexer = StringIndexer(inputCol="City", outputCol="City_Indexed")
city_encoder = OneHotEncoder(inputCol="City_Indexed", outputCol="City_Encoded")

# State
state_indexer = StringIndexer(inputCol="State", outputCol="State_Indexed")
state_encoder = OneHotEncoder(inputCol="State_Indexed", outputCol="State_Encoded")

# Weather_Condition
weather_indexer = StringIndexer(inputCol="Weather_Condition", outputCol="Weather_Condition_Indexed")
weather_encoder = OneHotEncoder(inputCol="Weather_Condition_Indexed", outputCol="Weather_Condition_Encoded")

# VectorAssembler merges the listed columns, in this order, into a single
# vector column named "features".
feature_columns = [
    'Year', 'Month', 'Day', 'Hour', 'Minute',
    'Start_Lng', 'Start_Lat',
    'City_Encoded', 'State_Encoded',
    'Temperature(F)', 'Humidity(%)', 'Pressure(in)',
    'Visibility(mi)', 'Wind_Speed(mph)', 'Precipitation(in)',
    'Weather_Condition_Encoded',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# The indexers, encoders and assembler above are only configured stage
# objects; nothing has run yet. The Pipeline applies them in the listed order.
pipeline_stages = [
    city_indexer,
    city_encoder,
    state_indexer,
    state_encoder,
    weather_indexer,
    weather_encoder,
    assembler,
]
pipeline = Pipeline(stages=pipeline_stages)

# Fit the pipeline on the accident data, then transform that same data.
model = pipeline.fit(accidentes)
transformed_data = model.transform(accidentes)

# Show the transformed data.
transformed_data.select("features", "Severity").show(5)

+--------------------+--------+
|            features|Severity|
+--------------------+--------+
|(12485,[0,1,2,3,4...|       3|
|(12485,[0,1,2,3,4...|       3|
|(12485,[0,1,2,3,4...|       3|
|(12485,[0,1,2,3,4...|       2|
|(12485,[0,1,2,3,4...|       2|
+--------------------+--------+
only showing top 5 rows

