# Contar cúantos trayectos se han realizado en bus a cada uno de los destinos. Spark

En este ejemplo, no se va a tener en cuenta los campos vacios, lo cual se debería de tratar pero para simplificar vamos simplemente a saber la cantidad de trayectos a dicho destino "sin importar su origen".

In [1]:
# Importamos SparkSession, el punto de entrada clave para funcionalidades de Spark.
# A diferencia del enfoque con RDD, aquí no requerimos el SparkContext explícitamente.
# obtener el SparkContext:
from pyspark.sql import SparkSession

In [2]:
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("ejemplo_DF")\
 .getOrCreate() # Si ya existe una SparkSession, getOrCreate() la recupera, en lugar de crear una nueva.

# Aquí procedemos a la carga de los datos desde un archivo en un DataFrame.
# Esto se realiza utilizando las capacionalidades de lectura de Spark.
# Utilizamos la opción 'header' para evitar cargar la primera fila como datos, una ventaja sobre el método de manejo de RDD.
bus_trips_lines = spark.read\
 .option("header", "true")\
 .csv("/Enric/bus_trips.csv")

# Para visualizar los datos en DataFrames, disponemos de dos métodos:
# El método 'take' heredado de RDD, así como el método 'show' propio de DataFrames.
# Ambos son útiles para inspeccionar rápidamente los primeros registros.
print(bus_trips_lines.take(2)) # Uso de 'take' para obtener los primeros 2 registros.
bus_trips_lines.show(2) # Uso de 'show' para visualizar los primeros 2 registros.

[Row(trip_code='554e2d76', company='7980c0ae', line_number='83ed87ea', bus='3b3307d2', trip_type='regular', origin='TRES RIOS', origin_state='RJ', destination='JUIZ DE FORA', destination_state='MG', year='2019', month='1', day='1', trip_start_hour_and_minute='528', hour='5', minute='28', trip_end_hour_and_minute='652', trip_duration_hours='1.39', travelled_distance_km='67.97099999999999', delay_start_minutes=None, delay_end_minutes='-47.55'), Row(trip_code='c5f130e7', company='7980c0ae', line_number='83ed87ea', bus='651283f4', trip_type='regular', origin='TRES RIOS', origin_state='RJ', destination='JUIZ DE FORA', destination_state='MG', year='2019', month='1', day='1', trip_start_hour_and_minute='530', hour='5', minute='30', trip_end_hour_and_minute='653', trip_duration_hours='1.39', travelled_distance_km=None, delay_start_minutes='0.0166666666666666', delay_end_minutes='-45.983333333')]
+---------+--------+-----------+--------+---------+---------+------------+------------+------------

In [3]:
# Un recurso valioso de la API estructurada de Spark es 'printSchema'.
# Este método nos permite examinar rápidamente los tipos de datos de las columnas,
# facilitando la comprensión de la estructura del DataFrame.
bus_trips_lines.printSchema()

root
 |-- trip_code: string (nullable = true)
 |-- company: string (nullable = true)
 |-- line_number: string (nullable = true)
 |-- bus: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- origin_state: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- destination_state: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- trip_start_hour_and_minute: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- trip_end_hour_and_minute: string (nullable = true)
 |-- trip_duration_hours: string (nullable = true)
 |-- travelled_distance_km: string (nullable = true)
 |-- delay_start_minutes: string (nullable = true)
 |-- delay_end_minutes: string (nullable = true)



In [4]:
# Estas imports serán utilizados para crear una función personalizada que se integren con el DataFrame.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Definimos una función para convertir de manera segura cadenas a enteros.
# En caso de error en la conversión, retornará None en vez de lanzar una excepción.
def safe_int_conversion(s):
    try:
        return int(s)
    except ValueError:
        return None

# Registramos la función definida como una UDF en Spark.
# Esto nos permite aplicarla en una columna de DataFrame.
safe_int_conversion_udf = udf(safe_int_conversion, IntegerType())

In [5]:
# Transformamos el DataFrame aplicando la UDF a las columnas 'year', 'month' y 'day'.
# Esto asegura que los datos en estas columnas sean enteros, manejando adecuadamente los valores no numéricos.
# Además, seleccionamos las columnas relevantes para nuestro análisis.
bus_trips_lines = bus_trips_lines.select(
    safe_int_conversion_udf("year").alias("year"),
    safe_int_conversion_udf("month").alias("month"),
    safe_int_conversion_udf("day").alias("day"),
    "origin",
    "destination"
)

In [6]:
# Agrupamos los datos por 'destination' y contamos el número de trayectos a cada destino.
# Esto resulta en un nuevo DataFrame con la cuenta de trayectos por destino.
bus_trips_destinationDF = bus_trips_lines.groupBy("destination").count()

# Renombramos la columna 'count' a 'dest_count' para claridad.
# Esto refleja que el número representa la cuenta de trayectos a cada destino.
bus_trips_destinationDF = bus_trips_destinationDF.withColumnRenamed("count", "dest_count")

# Finalmente, ordenamos y mostramos los 5 destinos más populares basados en el número de viajes.
# Utilizamos la función desc de pyspark.sql.functions para ordenar de manera descendente.
bus_trips_destinationDF.orderBy(F.desc("dest_count")).show(5)

+--------------+----------+
|   destination|dest_count|
+--------------+----------+
|     SAO PAULO|     17102|
|RIO DE JANEIRO|     14299|
|BELO HORIZONTE|      4626|
|        RECIFE|      2874|
| FLORIANOPOLIS|      2857|
+--------------+----------+
only showing top 5 rows

