In [1]:
# medir tiempos
%load_ext autotime

time: 623 µs (started: 2021-06-12 15:58:34 -05:00)


# Apache Spark [Apache spark](https://spark.apache.org/)
Apache Spark™ es un motor de análisis unificado para el procesamiento de datos a gran escala.
### pyspark
PySpark es una interfaz para Apache Spark en Python. No solo le permite escribir aplicaciones Spark utilizando las API de Python, sino que también proporciona el shell de PySpark para analizar interactivamente sus datos en un entorno distribuido. PySpark es compatible con la mayoría de las funciones de Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) y Spark Core.

##### Modulo para encontrar pyspark en el sistema

In [2]:
import findspark
findspark.init("/usr/local/spark/spark-3.1.1-bin-hadoop2.7")    #para linux
#findspark.init()                                                 #para windows

time: 5.12 ms (started: 2021-06-12 15:58:36 -05:00)


##### Importando pyspark 

In [3]:
from pyspark import SparkConf, SparkContext
# Variable de configuración
conf = SparkConf().setMaster("local[*]").setAppName("ModeloML").set("spark.driver.maxResultSize","0")
# iniciamos un contexto spark (solo se ejecuta uno. Para ejecutar otra vez , reiniciar el kernel)
sc = SparkContext(conf = conf)
sc

time: 4.82 s (started: 2021-06-12 15:58:36 -05:00)


##### Importando el conjunto de datos

In [4]:
from pyspark.sql.types import StringType
from pyspark import SQLContext
# le pasamos el contexto anterior
sqlContext = SQLContext(sc)
dfspark = sqlContext.read.format('csv').option("header","true").option("inferSchema","true").load('train.csv')

time: 1min 21s (started: 2021-06-12 15:58:41 -05:00)


##### Columnas y tipo de datos

In [5]:
dfspark.printSchema()

root
 |-- key: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)

time: 24.2 ms (started: 2021-06-12 16:00:03 -05:00)


##### Total de filas

In [6]:
cant_total=dfspark.count()
print("total de filas del conjunto de datos",cant_total)

total de filas del conjunto de datos 55423856
time: 14.7 s (started: 2021-06-12 16:00:03 -05:00)


##### Tomando una muestra del 4% del conjunto de datos

In [7]:
dfspark_sample = dfspark.sample(fraction = 0.04, withReplacement = False, seed=0)

time: 6.34 ms (started: 2021-06-12 16:00:18 -05:00)


##### Total de filas de la muestra

In [8]:
cant_muestra=dfspark_sample.count()
print("total de filas de la muestra del conjunto de datos",cant_muestra)

total de filas de la muestra del conjunto de datos 2219408
time: 16.4 s (started: 2021-06-12 16:00:18 -05:00)


### Limpieza de la muestra

Se procederá a eliminar la columna con la característica "key", debido a que contiene datos innecesarios para lograr el objetivo.

In [9]:
dfspark_sample = dfspark_sample.drop('key')

time: 18.9 ms (started: 2021-06-12 16:00:34 -05:00)


###### Eliminando Valores Nulos de la tabla

In [10]:
# fare_amount (costo de viaje) no nulos
dfspark_sample = dfspark_sample.filter("fare_amount is not NULL")
# passenger (número de pasajeros) no nulos
dfspark_sample = dfspark_sample.filter("passenger_count is not NULL")
# pickup_datetime (fecha y hora de incio de viaje) no nulos
dfspark_sample = dfspark_sample.filter("pickup_datetime is not NULL")
# pickup (longitud y latitud de inicio de viaje) no nulos
dfspark_sample = dfspark_sample.filter("pickup_longitude is not NULL")
dfspark_sample = dfspark_sample.filter("pickup_latitude is not NULL")
# dropoff (longitud y laitud de fin de viaje) no nulos
dfspark_sample = dfspark_sample.filter("dropoff_longitude is not NULL")
dfspark_sample = dfspark_sample.filter("dropoff_latitude is not NULL")

time: 446 ms (started: 2021-06-12 16:00:34 -05:00)


###### Eliminado valores nan y duplicados

In [11]:
# tabla sin valores nan, sin duplicados
dfspark_sample=dfspark_sample.na.drop().dropDuplicates()
# cantidad de  data sin nulos ni nan
cantnn_muestra=dfspark_sample.count()
print("Cantidad de filas eliminadas de la muestra: ",cant_muestra-cantnn_muestra)

Cantidad de filas eliminadas de la muestra:  18
time: 1min 6s (started: 2021-06-12 16:00:35 -05:00)


##### La muestra se guarda en el HDFS 
Este metodo acelera algunos procesos.

In [12]:
dfspark_sample.persist()

DataFrame[fare_amount: double, pickup_datetime: string, pickup_longitude: double, pickup_latitude: double, dropoff_longitude: double, dropoff_latitude: double, passenger_count: int]

time: 135 ms (started: 2021-06-12 16:01:41 -05:00)


### Estadisticas de la muestra

In [13]:
import numpy as np
import pandas as pd

time: 220 ms (started: 2021-06-12 16:01:41 -05:00)


In [14]:
# casteamos a pandas
estadisticas=dfspark_sample.describe(["pickup_longitude",
                                 "pickup_latitude",
                                 "dropoff_longitude",
                                 "dropoff_latitude",
                                 "passenger_count",
                                 "fare_amount"]).toPandas()

time: 1min 7s (started: 2021-06-12 16:01:41 -05:00)


In [15]:
estadisticas

Unnamed: 0,summary,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,fare_amount
0,count,2219390.0,2219390.0,2219390.0,2219390.0,2219390.0,2219390.0
1,mean,-72.50355231460159,39.91779369363991,-72.50697030839864,39.91850726486102,1.6851959322156087,11.352503967306385
2,stddev,13.098889190132796,9.821243657749829,13.244549098861285,10.317917931047642,1.3377753769552685,42.44902507908096
3,min,-3366.527908,-3488.079513,-3366.527908,-3488.079513,0.0,-52.0
4,max,2497.117435,2964.163855,3211.57975,3333.304575,208.0,61550.86


time: 19.4 ms (started: 2021-06-12 16:02:48 -05:00)


##### cantidad viajes por número de pasajeros.

In [16]:
dfspark_sample.groupBy("passenger_count").count().show()

+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|              1|1535882|
|              6|  47132|
|              3|  97242|
|              5| 157254|
|              4|  47158|
|              7|      1|
|              2| 326983|
|              0|   7734|
|            208|      4|
+---------------+-------+

time: 3.49 s (started: 2021-06-12 16:02:48 -05:00)


##### Observaciones :
1. Valores de passenger_count imposibles , como 34,49,51,129,208.
2. Precios demasiados elevados debido a la cantidad de pasajeros y negativo(imposible).
3. Cantidad de datos en la que el precio es menor a o igual a 0

##### minimos y máximos de latitude y longitude

In [17]:
# mínimo y máximo para longitud del subconjunto de datos
long_min_i=dfspark_sample.agg({'pickup_longitude': 'min'}).show()
long_max_i=dfspark_sample.agg({'pickup_longitude': 'max'}).show()
long_min_f=dfspark_sample.agg({'dropoff_longitude': 'min'}).show()
long_max_f=dfspark_sample.agg({'dropoff_longitude': 'max'}).show()

# mínimo y máximo para para latitud del subconjuntos de datos
lat_min_i=dfspark_sample.agg({'pickup_latitude': 'min'}).show()
lat_max_i=dfspark_sample.agg({'pickup_latitude': 'max'}).show()
lat_min_f=dfspark_sample.agg({'dropoff_latitude': 'min'}).show()
lat_max_f=dfspark_sample.agg({'dropoff_latitude': 'max'}).show()

+---------------------+
|min(pickup_longitude)|
+---------------------+
|         -3366.527908|
+---------------------+

+---------------------+
|max(pickup_longitude)|
+---------------------+
|          2497.117435|
+---------------------+

+----------------------+
|min(dropoff_longitude)|
+----------------------+
|          -3366.527908|
+----------------------+

+----------------------+
|max(dropoff_longitude)|
+----------------------+
|            3211.57975|
+----------------------+

+--------------------+
|min(pickup_latitude)|
+--------------------+
|        -3488.079513|
+--------------------+

+--------------------+
|max(pickup_latitude)|
+--------------------+
|         2964.163855|
+--------------------+

+---------------------+
|min(dropoff_latitude)|
+---------------------+
|         -3488.079513|
+---------------------+

+---------------------+
|max(dropoff_latitude)|
+---------------------+
|          3333.304575|
+---------------------+

time: 6.92 s (started: 2021-06-1

##### Observaciones:
1. Valores para longitud imposibles, ya que longitud varía entre -90 y 90
2. Valores para latitud imposibles, ya que latitud varía entre -180 y 180

### Transformación de la data

##### Seleccionar passenger_count de 0-9
El número de pasajeros de cada viaje puede ser entre 0 a 9.

In [18]:
dfspark_sample = dfspark_sample.filter("passenger_count < 10")

time: 56.8 ms (started: 2021-06-12 16:02:59 -05:00)


In [19]:
dfspark_sample.groupBy("passenger_count").count().show()

+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|              1|1535882|
|              6|  47132|
|              3|  97242|
|              5| 157254|
|              4|  47158|
|              7|      1|
|              2| 326983|
|              0|   7734|
+---------------+-------+

time: 3.22 s (started: 2021-06-12 16:02:59 -05:00)


##### Seleccionar fare_amount mayores o iguales a cero
El precio del viaje puede no puede ser menor que cero.

In [20]:
# Selecionar fare_amount mayor a 0
dfspark_sample = dfspark_sample.filter("fare_amount >= 0")

time: 23.1 ms (started: 2021-06-12 16:03:02 -05:00)


In [21]:
dfspark_sample.describe(["fare_amount"]).show()

+-------+------------------+
|summary|       fare_amount|
+-------+------------------+
|  count|           2219298|
|   mean|11.353255646605419|
| stddev| 42.44968689216035|
|    min|               0.0|
|    max|          61550.86|
+-------+------------------+

time: 1.51 s (started: 2021-06-12 16:03:02 -05:00)


##### Latitud y Longitud validas.
La latitud válida varía entre -90 a 90.
La longitud válida varía entre -180 a 180.

In [22]:
# Filtrando valores grandes para longitud, de tal manera que solo se considerará valores correctos.
dfspark_sample = dfspark_sample.filter("pickup_longitude < 180 and pickup_longitude > -180" )
dfspark_sample = dfspark_sample.filter("dropoff_longitude < 180 and dropoff_longitude > -180")
dfspark_sample = dfspark_sample.filter("pickup_latitude < 90 and pickup_latitude > -90" )
dfspark_sample = dfspark_sample.filter("dropoff_latitude < 90 and dropoff_latitude > -90")

time: 65.9 ms (started: 2021-06-12 16:03:03 -05:00)


In [23]:
estadisticas=dfspark_sample.describe(["pickup_longitude",
                                 "pickup_latitude",
                                 "dropoff_longitude",
                                 "dropoff_latitude"]).toPandas()

time: 2.76 s (started: 2021-06-12 16:03:04 -05:00)


In [24]:
estadisticas

Unnamed: 0,summary,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,count,2219173.0,2219173.0,2219173.0,2219173.0
1,mean,-72.49600158477304,39.91679455251933,-72.50229931133802,39.91968272022599
2,stddev,10.463045755203852,6.112153680008393,10.439016533276371,6.104146217099324
3,min,-121.9149932861328,-74.017222,-121.9151840209961,-74.177303
4,max,40.840962,74.007413,73.93996,74.95


time: 22.2 ms (started: 2021-06-12 16:03:06 -05:00)


##### Diferencias de latitud y longitud

In [25]:
# Agregamos columnas de diferencias.
from pyspark.sql.functions import abs
dfspark_sample = dfspark_sample.withColumn("dif_latitude",
                                           abs(dfspark_sample['dropoff_latitude']-dfspark_sample['pickup_latitude']))
dfspark_sample = dfspark_sample.withColumn("dif_longitude",
                                           abs(dfspark_sample['dropoff_longitude']-dfspark_sample['pickup_longitude']))

time: 140 ms (started: 2021-06-12 16:03:06 -05:00)


##### Distancia Haversine
Es la distancia entre dos puntos geográficos

In [26]:
# Creamos la función para hallar la distancia entre dos puntos geográficos
import math
from pyspark.sql.functions import udf, array, col
from pyspark.sql.types import FloatType

def haversine(x):
    lat1=x[0]
    lon1=x[1]
    lat2=x[2]
    lon2=x[3]
    
    rad=math.pi/180
    dlat=lat2-lat1
    dlon=lon2-lon1
    R=6372.795477598
    a=(math.sin(rad*dlat/2))**2 + math.cos(rad*lat1)*math.cos(rad*lat2)*(math.sin(rad*dlon/2))**2
    distancia=2*R*math.asin(math.sqrt(a))
    return distancia

distancia_udf = udf(lambda z: haversine(z), FloatType())
#spark.udf.register('distancia_udf', distancia_udf)
dfspark_sample = dfspark_sample.withColumn('distancia', distancia_udf(array('pickup_latitude','pickup_longitude','dropoff_latitude','dropoff_longitude')))                             

time: 115 ms (started: 2021-06-12 16:03:07 -05:00)


In [27]:
dfspark_sample.select(col('distancia')).show()

+----------+
| distancia|
+----------+
| 1.8683056|
| 1.8191968|
|  1.145462|
| 1.0793539|
| 1.1367035|
|  2.424119|
|  3.829884|
| 1.0900514|
| 5.5276585|
|  3.122764|
| 2.3627946|
| 5.3667502|
| 1.3431213|
|  9.317738|
|  5.013023|
| 4.6022677|
| 1.1546545|
| 20.964258|
| 0.8852747|
|0.17227533|
+----------+
only showing top 20 rows

time: 1.11 s (started: 2021-06-12 16:03:07 -05:00)


#### Creando columnas día de la semana, mes, año y hora del viaje.

###### funciones para aplicar a la columna pickupdatetime y obtener las columnas deseadas

In [28]:
# funciones que me ayudarán en la transformación.
from datetime import datetime, date, time, timedelta
import calendar
def dia(dia):
    if dia == 1:
        return 'lunes'
    if dia == 2:
        return 'martes'
    if dia == 3:
        return 'miércoles'
    if dia == 4:
        return 'jueves'
    if dia == 5:
        return 'viernes'
    if dia == 6:
        return 'sábado'
    if dia == 7:
        return 'domingo'
    if dia < 1 or dia > 7:
        return 

from pyspark.sql import Row

def dia_semana(row):
    fecha , hora , utc = row.split(" ")
    formato_fecha = "%Y-%m-%d"
    dia_semana = datetime.isoweekday(datetime.strptime(fecha,formato_fecha))
    return dia_semana

def mes(row):
    fecha , hora , utc = row.split(" ")
    formato_fecha = "%Y-%m-%d"
    mes = datetime.strptime(fecha,formato_fecha).month
    return mes

def anio(row):
    fecha , hora , utc = row.split(" ")
    formato_fecha = "%Y-%m-%d"
    anio = datetime.strptime(fecha,formato_fecha).year
    return anio

def hora(row):
    fecha , hora , utc = row.split(" ")
    formato_hora = "%H:%M:%S"
    hora = datetime.strptime(hora,formato_hora).hour
    return hora



from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# convirtiendo las funciones en funciones UDF
udf_dia_semana= udf( lambda z : dia_semana(z), IntegerType())
udf_mes= udf( lambda z : mes(z), IntegerType())
udf_anio= udf( lambda z : anio(z), IntegerType())
udf_hora= udf( lambda z : hora(z), IntegerType())

time: 2.25 ms (started: 2021-06-12 16:03:08 -05:00)


##### Creando colunma dia de la semana , hora , mes y año

In [29]:
from pyspark.sql.functions import col
dfspark_sample = dfspark_sample.withColumn('dia_semana', 
                                           udf_dia_semana(dfspark_sample['pickup_datetime'] )  )
dfspark_sample = dfspark_sample.withColumn('hora', 
                                           udf_hora(dfspark_sample['pickup_datetime'] )  )

dfspark_sample = dfspark_sample.withColumn('mes', 
                                           udf_mes(dfspark_sample['pickup_datetime'] )  )

dfspark_sample = dfspark_sample.withColumn('anio', 
                                           udf_anio(dfspark_sample['pickup_datetime'] )  )


time: 293 ms (started: 2021-06-12 16:03:08 -05:00)


In [30]:
dfspark_sample.show()

+-----------+--------------------+-----------------+-----------------+------------------+------------------+---------------+--------------------+--------------------+----------+----------+----+---+----+
|fare_amount|     pickup_datetime| pickup_longitude|  pickup_latitude| dropoff_longitude|  dropoff_latitude|passenger_count|        dif_latitude|       dif_longitude| distancia|dia_semana|hora|mes|anio|
+-----------+--------------------+-----------------+-----------------+------------------+------------------+---------------+--------------------+--------------------+----------+----------+----+---+----+
|        8.0|2015-03-12 23:14:...|-73.9931411743164|40.72793960571289|-73.99661254882812|40.744529724121094|              2|0.016590118408203125| 0.00347137451171875| 1.8683056|         4|  23|  3|2015|
|       10.0|2013-08-21 08:38:...|       -73.964837|        40.769933|        -73.983462|         40.761655|              1|0.008278000000004226|0.018625000000000114| 1.8191968|         3|

##### Guardando el conjunto de datos  de muestra en un carpeta output 

In [31]:
import os
path="file:"+os.getcwd()+"/Output"
dfspark_sample.write.format("csv").option("header", "true").save(path)

time: 1min 12s (started: 2021-06-12 16:03:23 -05:00)


In [32]:
sc.stop()

time: 1.54 s (started: 2021-06-12 16:07:24 -05:00)
