## Extracción y transformación de datos con Spark 2.4
### Origen de datos
Los datos representan los accidentes de tráfico por día y distrito
que se han producido en la ciudad de Madrid entre 2010 y 2018

El fichero origen es un fichero csv por año tiene los datos detallados
por cada uno de los intervinientes en el accidente: Conductor, pasajero,
testito, peatón, ...

La primera parte del fichero está referida al accidente y se repite
por cada uno de los intervinientes.

### La parte del registro referida al accidente tiene los campos:
 - 'FECHA'
 - 'RANGO HORARIO'
 - 'DIA SEMANA'
 - 'DISTRITO'
 - 'LUGAR ACCIDENTE'
 - 'Nº: string'
 - 'Nº PARTE'     **Id.accidente, es el mismo para varios regs.**
 - 'CPFA Granizo'
 - 'CPFA Hielo'
 - 'CPFA Lluvia'
 - 'CPFA Niebla'
 - 'CPFA Seco'
 - 'CPFA Nieve'
 - 'CPSV Mojada'
 - 'CPSV Aceite'
 - 'CPSV Barro'
 - 'CPSV Grava Suelta'
 - 'CPSV Hielo'
 - 'CPSV Seca Y Limpia'
 - '* Nº VICTIMAS' **Total de víctimas por Nº de parte**
 - 'TIPO ACCIDENTE'
 - 'Tipo Vehiculo'
 
### La parte del registro correspondiente a cada persona:
 - 'TIPO PERSONA'
 - 'SEXO'
 - 'LESIVIDAD'
 - 'Tramo Edad'


In [40]:
import pyspark
from pyspark.sql import SparkSession
app_name = "bubbly"
master = "local[*]"
spark = (SparkSession.builder
    .master(master)
    .config("spark.driver.cores", 1)
    .appName(app_name)
    .getOrCreate() )
sc = spark.sparkContext
print ('SparkContext created')

SparkContext created


In [41]:
from  pymongo import MongoClient

In [42]:
uri = 'mongodb+srv://sparkUser:sparkUser@cluster0-emtpq.mongodb.net/test?retryWrites=true'
client = MongoClient(uri)         
db=client.test        

In [43]:
print (client)

MongoClient(host=['cluster0-shard-00-00-emtpq.mongodb.net:27017', 'cluster0-shard-00-01-emtpq.mongodb.net:27017', 'cluster0-shard-00-02-emtpq.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='Cluster0-shard-0', ssl=True, retrywrites=True)


In [44]:
from unidecode import unidecode


def normalize(cadena):
    return unidecode(cadena.lower())

string_acentos = 'Ës un cáfÉ'

print (normalize(string_acentos))

es un cafe


In [59]:
def getGeoFromCallejero (calle, numero):
    street = normalize (calle)
    record = db.callejero.find_one({'name':street})
    if record:
        for n in record['nums']:
            if n['num']==numero:
                return n['location']['coordinates']
    return None
print (getGeoFromCallejero ('Calle Mayor','1'))
print (getGeoFromCallejero ('djfñslkdjfñls','3'))


def getGeo (calle, numero):
    location = getGeoFromCallejero (calle, numero)
    if not location :
        #incluir mas fuentes para geolocalizar
        return None
    return location

g=getGeo ('Calle Mayor','1')
print (g)
print (getGeo ('djfñslkdjfñls','3'))

print ("{} --> {}".format(type(g),type(g[0])))
    

[40.4164694, -3.7164694]
None
[40.4164694, -3.7164694]
None
<class 'list'> --> <class 'float'>


In [60]:
from pyspark.sql.types import ArrayType, FloatType

getGeo_udf = func.udf(lambda d: getGeo (d[0], d[1]), ArrayType(FloatType()))

In [46]:
# Cargamos el fichero desde hdfs
#accidenteData = spark.read.csv ('hdfs://localhost:9000/user/ubuntu/accidentes/datos',header=True)
#accidenteData = spark.read.csv ('file:///home/ubuntu/Downloads/2018_Accidentalidad.csv',header=True)
accidenteData = spark.read.csv ('file:///home/jose/Descargas/2018.csv',header=True)

In [50]:
import pyspark.sql.functions as func

# Reducimos las columnas que tenemos que utilizar mediante la Select
# y aplicamos las funciones necesarias a los datos
accRed=accidenteData\
  .filter ((accidenteData['TIPO PERSONA'] == 'CONDUCTOR'))\
  .select(
        accidenteData["Nº PARTE"].alias('idAccidente'), \
        func.to_date(accidenteData.FECHA, 'dd/MM/yyyy').alias('fecha'), \
        accidenteData['RANGO HORARIO'].alias('rangoHora'), \
        accidenteData.DISTRITO.alias('distrito'), \
        func.when ((accidenteData['Nº']!='0'),\
                    func.regexp_replace (accidenteData['LUGAR ACCIDENTE'],' NUM\s*$',' '), \
             ).alias('calle') ,\
        accidenteData['Nº'].alias('numCalle'), \
        accidenteData["TIPO ACCIDENTE"].alias('tipoAccidente'))\
   .dropDuplicates()

In [51]:
accRed.limit(10).toPandas()

Unnamed: 0,idAccidente,fecha,rangoHora,distrito,calle,numCalle,tipoAccidente
0,2018/510,2018-01-09,DE 8:00 A 8:59,USERA,AVENIDA DE ANDALUCIA,19,CAÍDA MOTOCICLETA
1,2018/356,2018-01-09,DE 20:00 A 20:59,SALAMANCA,CALLE DE GOYA,91,CAÍDA MOTOCICLETA
2,2018/541,2018-01-12,DE 15:00 A 15:59,CIUDAD LINEAL,CALLE DE ARTURO SORIA,126,COLISIÓN DOBLE
3,2018/874,2018-01-22,DE 15:00 A 15:59,RETIRO,,0,CAÍDA MOTOCICLETA
4,2018/1048,2018-01-25,DE 19:00 A 19:59,VILLA DE VALLECAS,CARRETERA DEL VERTEDERO MUNICIPAL VALDEMINGOMEZ,40,CHOQUE CON OBJETO FIJO
5,2018/1100,2018-01-28,DE 12:00 A 12:59,SAN BLAS,CALLE DE SOFIA,7,CAÍDA BICICLETA
6,2018/1368,2018-02-01,DE 10:00 A 10:59,ARGANZUELA,CALLE 30 15RV KM. ...,20,CHOQUE CON OBJETO FIJO
7,2018/1275,2018-02-02,DE 16:00 A 16:59,BARAJAS,,0,COLISIÓN DOBLE
8,2018/1488,2018-02-05,DE 13:00 A 13:59,TETUAN,CALLE DE VEZA,12,COLISIÓN DOBLE
9,2018/1825,2018-02-15,DE 22:00 A 22:59,CHAMBERI,,0,CAÍDA MOTOCICLETA


In [63]:
accRedGeo=accRed.limit(10).withColumn ('geo', getGeo_udf (func.struct('calle', 'numCalle')))
accRedGeo.toPandas()

Traceback (most recent call last):
  File "/home/jose/spark/python/pyspark/serializers.py", line 587, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/home/jose/spark/python/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/home/jose/spark/python/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/home/jose/anaconda3/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/home/jose/anaconda3/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/jose/anaconda3/lib/python3.7/pickle.py", line 771, in save_tuple
    save(element)
  File "/home/jose/anaconda3/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/jose/spark/python/pyspark/cloudpickle.py", line 400, in save_function
    self.save_function_tuple(obj)
  File "/home/jose/spark/python/pyspark/cloudpickle.py", line 549, in save_funct

PicklingError: Could not serialize object: TypeError: can't pickle SSLContext objects

In [64]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct

def sum(x, y):
    return x + y

sum_cols = udf(sum, IntegerType())

a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols('A', 'B')).show()

+---+---+---+
| ID|  A|  B|
+---+---+---+
|101|  1| 16|
+---+---+---+

+---+---+---+------+
| ID|  A|  B|Result|
+---+---+---+------+
|101|  1| 16|    17|
+---+---+---+------+



In [15]:
# Agrupamos por fecha, distrito, accidente y generamos varias columnas contadoras 
accGrouped=accRed\
                .groupBy('fecha','distrito','idAccidente')\
                .agg(func.count(func.when(accRed.lesividad=='MT',1)).alias('muertos'),
                     func.count(func.when(accRed.lesividad=='HG',1)).alias('graves'),
                     func.count(func.when(accRed.lesividad=='HL',1)).alias('leves'),
                     func.count(func.when((accRed.sexo == 'MUJER') &
                                          (accRed.tipoPersona == 'CONDUCTOR'),1)).alias('condMujer'),
                     func.count(func.when((accRed.sexo == 'HOMBRE') &
                                          (accRed.tipoPersona == 'CONDUCTOR'),1)).alias('condHombre'),
                    )\
                .na.fill(0)\
                .sort ('fecha','distrito','idAccidente')

In [16]:
# Añadimos una columna con la indicación "Hombre" cuando los conductores son solo hombres o
# "Mujer" si son solo mujeres, en el resto de casos irá la etiqueta "Ambos"
accGrCondFila=accGrouped.select ("fecha", "distrito", "idAccidente","muertos","graves","leves", "condMujer", "condHombre")\
                .withColumn("conductorHM",
                            func.when (accGrouped.condMujer == 0, "Hombre")
                           .when (accGrouped.condHombre == 0, "Mujer")
                           .otherwise("Ambos")
                )
accGrCondFila.limit(15).toPandas()

Unnamed: 0,fecha,distrito,idAccidente,muertos,graves,leves,condMujer,condHombre,conductorHM
0,2018-01-01,ARGANZUELA,2018/25,0,0,1,0,2,Hombre
1,2018-01-01,CARABANCHEL,2018/23,0,1,0,0,2,Hombre
2,2018-01-01,CENTRO,2018/76,0,0,2,0,1,Hombre
3,2018-01-01,CHAMARTIN,2018/43,0,0,1,0,2,Hombre
4,2018-01-01,CIUDAD LINEAL,2018/12,0,1,2,0,2,Hombre
5,2018-01-01,FUENCARRAL-EL PARDO,2018/16,0,0,2,0,1,Hombre
6,2018-01-01,HORTALEZA,2018/3,0,1,0,0,1,Hombre
7,2018-01-01,MORATALAZ,2018/8,0,0,1,0,2,Hombre
8,2018-01-01,RETIRO,2018/34,0,0,2,0,2,Hombre
9,2018-01-01,SALAMANCA,2018/20,0,1,1,1,1,Ambos


In [17]:
accRes = accGrouped\
   .groupBy (accGrouped.fecha, accGrouped.distrito)\
   .agg (
            func.count(func.when(accGrouped.condMujer==0,1)).alias('condHombre'),
            func.count(func.when(accGrouped.condHombre==0,1)).alias('condMujer'),
            func.count(func.when((accGrouped.condMujer>0)&
                                 (accGrouped.condHombre>0),1)).alias('condMixto'),
            func.sum (func.when (accGrouped.condMujer==0,accGrouped.muertos)).alias('muertosCondHombre'),
            func.sum (func.when (accGrouped.condMujer==0,accGrouped.graves)).alias('gravesCondHombre'),
            func.sum (func.when (accGrouped.condMujer==0,accGrouped.leves)).alias('levesCondHombre'),
            func.sum (func.when (accGrouped.condHombre==0,accGrouped.muertos)).alias('muertosCondMujer'),
            func.sum (func.when (accGrouped.condHombre==0,accGrouped.graves)).alias('gravesCondMujer'),
            func.sum (func.when (accGrouped.condHombre==0,accGrouped.leves)).alias('levesCondMujer'),
            func.sum (accGrouped.muertos).alias('totalMuertos'),
            func.sum (accGrouped.graves).alias('totalGraves'),
            func.sum (accGrouped.leves).alias('totalLeves')
        )\
    .na.fill(0)\
    .orderBy (accGrouped.fecha, accGrouped.distrito)

#### Por comodidad para poder guardar todos los datos en un único fichero utilizamos la librería pandas.


In [18]:
df=accRes.toPandas()
df.to_csv('~/accidente_victimas_por_sexo_conductor.csv',sep=';')
df2=accGrCondFila.toPandas()
df2.to_csv('~/agrupadoPorAccidente_SexoConductor.csv',sep=';')
