In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=914f8d9c520be53a766ab425928cc28e22e05aed2aa01812b6b9aa6ba63d0830
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


## Introducción

El proceso de ETL del conjunto de datasets de Taxis se realiza en 3 Etapas: <br>
    1. ETL sobre un archivo <br>
    2. ETL sobre el resto de archivos, mediante una función que replica los procedimientos de la Etapa 1, posteriomente los resultados se unen en un dataset final <br>
    3. ETL sobre el dataset final.

## Librerias

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, date_format, lit, substring, count, when, rand
from pyspark.sql.types import IntegerType
import requests
import os
import pandas as pd

## Etapa 1

In [3]:
# Crea una instancia de SparkSession
spark = SparkSession.builder.appName("Taxis").getOrCreate()

### 1. Descarga de datos

In [4]:
# Taxis amarillos 2020-03
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-03.parquet"
response = requests.get(url)
with open("yellow_2020-03.parquet", "wb") as file:
    file.write(response.content)


In [5]:
df = spark.read.parquet("yellow_2020-03.parquet")

### 2. Selección de columnas

In [6]:
# Seleccionar las columnas deseadas
selec_columns = df.select(
    "tpep_pickup_datetime",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "total_amount"
)


# Mostrar información sobre el DataFrame
selec_columns.printSchema()

root
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- total_amount: double (nullable = true)



In [7]:
selec_columns.show(5)

+--------------------+------------+------------+------------+------------+
|tpep_pickup_datetime|PULocationID|DOLocationID|payment_type|total_amount|
+--------------------+------------+------------+------------+------------+
| 2020-03-01 00:31:13|          88|         255|           1|        27.8|
| 2020-03-01 00:08:22|         193|         193|           2|         3.8|
| 2020-03-01 00:52:18|         246|          90|           1|       11.75|
| 2020-03-01 00:47:53|         151|         238|           1|       10.56|
| 2020-03-01 00:43:19|          79|         261|           1|       24.35|
+--------------------+------------+------------+------------+------------+
only showing top 5 rows



### 3. Columna "tpep_pickup_datetime". Creación de las Columnas "pickup_date" y "pickup_time"

In [8]:
# Separar las fechas y los horarios
selec_columns = selec_columns.withColumn("pickup_date", to_date(col("tpep_pickup_datetime")))
selec_columns = selec_columns.withColumn("pickup_time", date_format(col("tpep_pickup_datetime"), "HH:mm:ss"))

# Eliminar la columna 'tpep_pickup_datetime'
selec_columns = selec_columns.drop("tpep_pickup_datetime")

In [9]:
# Mostrar el DataFrame resultante con las nuevas columnas
selec_columns.show(5)

+------------+------------+------------+------------+-----------+-----------+
|PULocationID|DOLocationID|payment_type|total_amount|pickup_date|pickup_time|
+------------+------------+------------+------------+-----------+-----------+
|          88|         255|           1|        27.8| 2020-03-01|   00:31:13|
|         193|         193|           2|         3.8| 2020-03-01|   00:08:22|
|         246|          90|           1|       11.75| 2020-03-01|   00:52:18|
|         151|         238|           1|       10.56| 2020-03-01|   00:47:53|
|          79|         261|           1|       24.35| 2020-03-01|   00:43:19|
+------------+------------+------------+------------+-----------+-----------+
only showing top 5 rows



In [10]:
selec_columns.printSchema()

root
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- pickup_time: string (nullable = true)



In [11]:
# Seleccionar una semana (del 03-04 al 03-10)

# Convierte la cadena '2020-03-04' a tipo de dato Date para comparación
start_date = to_date(lit('2020-03-04'))

# Convierte la cadena '2020-03-10' a tipo de dato Date para comparación
end_date = to_date(lit('2020-03-10'))

# Filtra los registros entre 'start_date' y 'end_date'
filtered_df = selec_columns.filter((col("pickup_date") >= start_date) & (col("pickup_date") <= end_date))

# Muestra el DataFrame resultante con los registros deseados
filtered_df.head(5)

[Row(PULocationID=144, DOLocationID=186, payment_type=2, total_amount=13.8, pickup_date=datetime.date(2020, 3, 4), pickup_time='00:01:14'),
 Row(PULocationID=230, DOLocationID=223, payment_type=1, total_amount=27.88, pickup_date=datetime.date(2020, 3, 4), pickup_time='00:00:02'),
 Row(PULocationID=148, DOLocationID=223, payment_type=1, total_amount=34.56, pickup_date=datetime.date(2020, 3, 4), pickup_time='00:00:12'),
 Row(PULocationID=265, DOLocationID=265, payment_type=1, total_amount=162.88, pickup_date=datetime.date(2020, 3, 4), pickup_time='00:01:31'),
 Row(PULocationID=162, DOLocationID=239, payment_type=2, total_amount=13.8, pickup_date=datetime.date(2020, 3, 4), pickup_time='00:00:51')]

### 4. Columna "pickup_time". Creación de la Columna "pickup_hour"


In [12]:
# Crea una columna con las horas (hh) "pickup_hour"
filtered_df = filtered_df.withColumn("pickup_hour", substring(col("pickup_time"), 1, 2))

# Elimina la columna 'pickup_time'
filtered_df = filtered_df.drop("pickup_time")

In [13]:
# Muestra el DataFrame resultante con los registros deseados
filtered_df.printSchema()

root
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- pickup_hour: string (nullable = true)



In [14]:
# Selección de la franja horaria desde las 6 hasta las 20 inclusive. 6:00:00 hasta las 20:59:59

start_time = '06'
end_time = '20'

# Filtra los registros entre 'start_time' y 'end_time'
filtered_df = filtered_df.filter((col("pickup_hour") >= start_time) & (col("pickup_hour") <= end_time))

In [15]:
# Muestra el DataFrame resultante con los registros deseados
filtered_df.tail(5)

[Row(PULocationID=211, DOLocationID=26, payment_type=0, total_amount=35.66, pickup_date=datetime.date(2020, 3, 10), pickup_hour='20'),
 Row(PULocationID=265, DOLocationID=35, payment_type=0, total_amount=36.36, pickup_date=datetime.date(2020, 3, 10), pickup_hour='20'),
 Row(PULocationID=42, DOLocationID=228, payment_type=0, total_amount=53.71, pickup_date=datetime.date(2020, 3, 10), pickup_hour='20'),
 Row(PULocationID=61, DOLocationID=77, payment_type=0, total_amount=17.96, pickup_date=datetime.date(2020, 3, 10), pickup_hour='20'),
 Row(PULocationID=236, DOLocationID=112, payment_type=0, total_amount=33.71, pickup_date=datetime.date(2020, 3, 10), pickup_hour='20')]

In [16]:
# Convierte la columna a tipo entero
filtered_df = filtered_df.withColumn("pickup_hour", filtered_df["pickup_hour"].cast(IntegerType()))

### 5. Columnas  "PULocationID" , "DOLocationID" y "payment_type"

In [17]:
# Convierte las columnas a tipo entero
filtered_df = filtered_df.withColumn("PULocationID", filtered_df["PULocationID"].cast(IntegerType()))
filtered_df = filtered_df.withColumn("DOLocationID", filtered_df["DOLocationID"].cast(IntegerType()))
filtered_df = filtered_df.withColumn("payment_type", filtered_df["payment_type"].cast(IntegerType()))

### 6. Tratamiento de faltantes

In [18]:
# Calcula la cantidad de valores faltantes por columna
missing_values = filtered_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in filtered_df.columns]
)

# Muestra la cantidad de valores faltantes
missing_values.show()

+------------+------------+------------+------------+-----------+-----------+
|PULocationID|DOLocationID|payment_type|total_amount|pickup_date|pickup_hour|
+------------+------------+------------+------------+-----------+-----------+
|           0|           0|           0|           0|          0|          0|
+------------+------------+------------+------------+-----------+-----------+



No se presentan datos faltantes

### 7. Selección de muestra aleatoria de 100 filas

In [19]:
# Cuenta la cantidad de filas en el DataFrame filtered_df
cantidad_de_filas = filtered_df.count()

# Muestra la cantidad de filas
print(f"El DataFrame contiene {cantidad_de_filas} filas.")

El DataFrame contiene 1126614 filas.


In [20]:
#Seleccion de muestra aleatoria

# Utiliza la función sample para seleccionar 100 filas al azar
yellow_2020_03 = filtered_df.sample(False, 100 / filtered_df.count(), seed=42)

### 8. Dataframe Parcial

In [21]:
# Cuenta la cantidad de filas en el DataFrame selec_columns
cantidad_de_filas = yellow_2020_03.count()

# Muestra la cantidad de filas
print(f"El DataFrame contiene {cantidad_de_filas} filas.")

El DataFrame contiene 108 filas.


In [22]:
# Muestra la estructura actualizada del DataFrame
yellow_2020_03.printSchema()

root
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- pickup_hour: integer (nullable = true)



In [23]:
# Mostrar las primeras 5 filas del DataFrame en forma de tabla
yellow_2020_03.show(5)

+------------+------------+------------+------------+-----------+-----------+
|PULocationID|DOLocationID|payment_type|total_amount|pickup_date|pickup_hour|
+------------+------------+------------+------------+-----------+-----------+
|         230|          12|           2|        23.8| 2020-03-04|         10|
|         143|         236|           1|        16.0| 2020-03-04|         11|
|         263|         141|           2|         8.8| 2020-03-04|         10|
|          74|          41|           1|        17.3| 2020-03-04|         11|
|         234|         170|           1|       11.15| 2020-03-04|         12|
+------------+------------+------------+------------+-----------+-----------+
only showing top 5 rows



## Etapa 2, ETL del resto de los archivos

In [24]:
# Función ETL para el resto de archivos
def ETL_taxis(url, name, bdate, edate):
  # Extracción de datos
  url = url
  response = requests.get(url)
  with open(name, "wb") as file:
      file.write(response.content)

  df = spark.read.parquet(name)


  # Seleccionar las columnas deseadas
  pickup_datetime_column = "tpep_pickup_datetime" if "tpep_pickup_datetime" in df.columns else "lpep_pickup_datetime"

  selec_columns = df.select(
      pickup_datetime_column,
      "PULocationID",
      "DOLocationID",
      "payment_type",
      "total_amount"
  )


  # Separar las fechas y los horarios
  selec_columns = selec_columns.withColumn("pickup_date", to_date(col(pickup_datetime_column)))
  selec_columns = selec_columns.withColumn("pickup_time", date_format(col(pickup_datetime_column), "HH:mm:ss"))
  # Eliminar la columna 'tpep_pickup_datetime'
  selec_columns = selec_columns.drop(pickup_datetime_column)


  # Seleccionar una semana (del 08-04 al 08-10)
  # Convierte la cadena '2020-08-04' a tipo de dato Date para comparación
  start_date = to_date(lit(bdate))
  # Convierte la cadena '2020-08-10' a tipo de dato Date para comparación
  end_date = to_date(lit(edate))
  # Filtra los registros entre 'start_date' y 'end_date'
  filtered_df = selec_columns.filter((col("pickup_date") >= start_date) & (col("pickup_date") <= end_date))


  # Crea una columna con las horas (hh) "pickup_hour"
  filtered_df = filtered_df.withColumn("pickup_hour", substring(col("pickup_time"), 1, 2))
  # Elimina la columna 'pickup_time'
  filtered_df = filtered_df.drop("pickup_time")


  # Selección de la franja horaria desde las 6 hasta las 20 inclusive. 6:00:00 hasta las 20:59:59
  start_time = '06'
  end_time = '20'
  # Filtra los registros entre 'start_time' y 'end_time'
  filtered_df = filtered_df.filter((col("pickup_hour") >= start_time) & (col("pickup_hour") <= end_time))
  # Convierte la columna a tipo entero
  filtered_df = filtered_df.withColumn("pickup_hour", filtered_df["pickup_hour"].cast(IntegerType()))


  # Convierte las columnas a tipo entero
  filtered_df = filtered_df.withColumn("PULocationID", filtered_df["PULocationID"].cast(IntegerType()))
  filtered_df = filtered_df.withColumn("DOLocationID", filtered_df["DOLocationID"].cast(IntegerType()))
  filtered_df = filtered_df.withColumn("payment_type", filtered_df["payment_type"].cast(IntegerType()))


  #Seleccion de muestra aleatoria
  # Utiliza la función sample para seleccionar 100 filas al azar
  df_final = filtered_df.sample(False, 100 / filtered_df.count(), seed=42)
  return df_final

### 1. Taxis amarillos

### 1. 1 Aplicar la función ETL_taxis

In [25]:
# Taxis amarillos 2019-03
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-03.parquet"
name = "yellow_2019-03.parquet"
bdate = '2019-03-04'
edate = '2019-03-10'

# Llama a tu función y nombra al dataframe
yellow_2019_03 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------

# Taxis amarillos 2019-08
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-08.parquet"
name = "yellow_2019-08.parquet"
bdate = '2019-08-04'
edate = '2019-08-10'

# Llama a tu función y nombra al dataframe
yellow_2019_08 = ETL_taxis(url, name, bdate, edate)


#------------------------------------------------------------------------------------------------

# Taxis amarillos 2019-12
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-12.parquet"
name = "yellow_2019-12.parquet"
bdate = '2019-12-19'
edate = '2019-12-26'

# Llama a tu función y nombra al dataframe
yellow_2019_12 = ETL_taxis(url, name, bdate, edate)

In [26]:
# Taxis amarillos 2020-08
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-08.parquet"
name = "yellow_2020-08.parquet"
bdate = '2020-08-04'
edate = '2020-08-10'

# Llama a tu función y nombra al dataframe
yellow_2020_08 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------

# Taxis amarillos 2020-12
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-12.parquet"
name = "yellow_2020-12.parquet"
bdate = '2020-12-19'
edate = '2020-12-26'

# Llama a tu función y nombra al dataframe
yellow_2020_12 = ETL_taxis(url, name, bdate, edate)

In [27]:
# Taxis amarillos 2021-03
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-03.parquet"
name = "yellow_2021-03.parquet"
bdate = '2021-03-04'
edate = '2021-03-10'

# Llama a tu función y nombra al dataframe
yellow_2021_03 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------

# Taxis amarillos 2021-08
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-08.parquet"
name = "yellow_2021-08.parquet"
bdate = '2021-08-04'
edate = '2021-08-10'

# Llama a tu función y nombra al dataframe
yellow_2021_08 = ETL_taxis(url, name, bdate, edate)


#------------------------------------------------------------------------------------------------

# Taxis amarillos 2021-12
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-12.parquet"
name = "yellow_2021-12.parquet"
bdate = '2021-12-19'
edate = '2021-12-26'

# Llama a tu función y nombra al dataframe
yellow_2021_12 = ETL_taxis(url, name, bdate, edate)

In [28]:
# Taxis amarillos 2022-03
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-03.parquet"
name = "yellow_2022-03.parquet"
bdate = '2022-03-04'
edate = '2022-03-10'

# Llama a tu función y nombra al dataframe
yellow_2022_03 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------

# Taxis amarillos 2022-08
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-08.parquet"
name = "yellow_2022-08.parquet"
bdate = '2022-08-04'
edate = '2022-08-10'

# Llama a tu función y nombra al dataframe
yellow_2022_08 = ETL_taxis(url, name, bdate, edate)


#------------------------------------------------------------------------------------------------

# Taxis amarillos 2022-12
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-12.parquet"
name = "yellow_2022-12.parquet"
bdate = '2022-12-19'
edate = '2022-12-26'

# Llama a tu función y nombra al dataframe
yellow_2022_12 = ETL_taxis(url, name, bdate, edate)

### 1. 2 Unión y creación de columna "yellow"

In [29]:
# DataFrames con los nombres
dataframes = [yellow_2019_03, yellow_2019_08, yellow_2019_12,
              yellow_2020_03, yellow_2020_08, yellow_2020_12,
              yellow_2021_03, yellow_2021_08, yellow_2021_12,
              yellow_2022_03, yellow_2022_08, yellow_2022_12]

# Inicializa un DataFrame vacío con la misma estructura que los DataFrames originales
yellow_df = dataframes[0].filter("1=0")  # Utiliza una condición que siempre sea falsa

# Itera sobre los DataFrames y únelos uno debajo del otro
for df in dataframes:
    yellow_df = yellow_df.union(df)

In [30]:
# Puedes mostrar una vista previa de los datos
yellow_df.show(5)

+------------+------------+------------+------------+-----------+-----------+
|PULocationID|DOLocationID|payment_type|total_amount|pickup_date|pickup_hour|
+------------+------------+------------+------------+-----------+-----------+
|         161|         231|           1|       32.76| 2019-03-04|         11|
|         239|         142|           2|         7.8| 2019-03-04|         12|
|          43|          43|           1|       12.35| 2019-03-04|         13|
|         239|         237|           2|        10.8| 2019-03-04|         13|
|         162|         186|           1|       17.16| 2019-03-04|          8|
+------------+------------+------------+------------+-----------+-----------+
only showing top 5 rows



In [31]:
# Cuenta la cantidad de filas en el DataFrame selec_columns
cantidad_de_filas = yellow_df.count()

# Muestra la cantidad de filas
print(f"El DataFrame contiene {cantidad_de_filas} filas.")

El DataFrame contiene 1262 filas.


In [32]:
# Agrega una nueva columna llamada "yellow" con todos los valores establecidos en 1
yellow_df = yellow_df.withColumn("yellow", lit(1))

# Muestra una vista previa del DataFrame actualizado
yellow_df.show(5)

+------------+------------+------------+------------+-----------+-----------+------+
|PULocationID|DOLocationID|payment_type|total_amount|pickup_date|pickup_hour|yellow|
+------------+------------+------------+------------+-----------+-----------+------+
|         161|         231|           1|       32.76| 2019-03-04|         11|     1|
|         239|         142|           2|         7.8| 2019-03-04|         12|     1|
|          43|          43|           1|       12.35| 2019-03-04|         13|     1|
|         239|         237|           2|        10.8| 2019-03-04|         13|     1|
|         162|         186|           1|       17.16| 2019-03-04|          8|     1|
+------------+------------+------------+------------+-----------+-----------+------+
only showing top 5 rows



### 2. Taxis verdes

### 2. 1 Aplicar la función ETL_taxis

In [33]:
# Taxis verdes 2019-03
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-03.parquet"
name = "green_2019-03.parquet"
bdate = '2019-03-04'
edate = '2019-03-10'

# Llama a tu función y nombra al dataframe
green_2019_03 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------

# Taxis verdes 2019-08
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-08.parquet"
name = "green_2019-08.parquet"
bdate = '2019-08-04'
edate = '2019-08-10'

# Llama a tu función y nombra al dataframe
green_2019_08 = ETL_taxis(url, name, bdate, edate)


#------------------------------------------------------------------------------------------------

# Taxis verdes 2019-12
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2019-12.parquet"
name = "green_2019-12.parquet"
bdate = '2019-12-19'
edate = '2019-12-26'

# Llama a tu función y nombra al dataframe
green_2019_12 = ETL_taxis(url, name, bdate, edate)

In [34]:
# Taxis verdes 2020-03
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2020-03.parquet"
name = "green_2020-03.parquet"
bdate = '2020-03-04'
edate = '2020-03-10'

# Llama a tu función y nombra al dataframe
green_2020_03 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------
# Taxis verdes 2020-08
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2020-08.parquet"
name = "green_2020-08.parquet"
bdate = '2020-08-04'
edate = '2020-08-10'

# Llama a tu función y nombra al dataframe
green_2020_08 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------

# Taxis verdes 2020-12
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2020-12.parquet"
name = "green_2020-12.parquet"
bdate = '2020-12-19'
edate = '2020-12-26'

# Llama a tu función y nombra al dataframe
green_2020_12 = ETL_taxis(url, name, bdate, edate)

In [35]:
# Taxis verdes 2021-03
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-03.parquet"
name = "green_2021-03.parquet"
bdate = '2021-03-04'
edate = '2021-03-10'

# Llama a tu función y nombra al dataframe
green_2021_03 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------
# Taxis verdes 2021-08
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-08.parquet"
name = "green_2021-08.parquet"
bdate = '2021-08-04'
edate = '2021-08-10'

# Llama a tu función y nombra al dataframe
green_2021_08 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------

# Taxis verdes 2021-12
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-12.parquet"
name = "green_2021-12.parquet"
bdate = '2021-12-19'
edate = '2021-12-26'

# Llama a tu función y nombra al dataframe
green_2021_12 = ETL_taxis(url, name, bdate, edate)

In [36]:
# Taxis verdes 2022-03
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-03.parquet"
name = "green_2022-03.parquet"
bdate = '2022-03-04'
edate = '2022-03-10'

# Llama a tu función y nombra al dataframe
green_2022_03 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------
# Taxis verdes 2022-08
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-08.parquet"
name = "green_2022-08.parquet"
bdate = '2022-08-04'
edate = '2022-08-10'

# Llama a tu función y nombra al dataframe
green_2022_08 = ETL_taxis(url, name, bdate, edate)

#------------------------------------------------------------------------------------------------

# Taxis verdes 2022-12
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-12.parquet"
name = "green_2022-12.parquet"
bdate = '2022-12-19'
edate = '2022-12-26'

# Llama a tu función y nombra al dataframe
green_2022_12 = ETL_taxis(url, name, bdate, edate)

### 2. 2 Unión y creación de columna "yellow"

In [37]:
# DataFrames con los nombres
dataframes = [green_2019_03, green_2019_08, green_2019_12,
              green_2020_03, green_2020_08, green_2020_12,
              green_2021_03, green_2021_08, green_2021_12,
              green_2022_03, green_2022_08, green_2022_12]

# Inicializa un DataFrame vacío con la misma estructura que los DataFrames originales
green_df = dataframes[0].filter("1=0")  # Utiliza una condición que siempre sea falsa

# Itera sobre los DataFrames y únelos uno debajo del otro
for df in dataframes:
    green_df = green_df.union(df)

In [38]:
# Cuenta la cantidad de filas en el DataFrame selec_columns
cantidad_de_filas = green_df.count()

# Muestra la cantidad de filas
print(f"El DataFrame contiene {cantidad_de_filas} filas.")

El DataFrame contiene 1180 filas.


In [39]:
# Agrega una nueva columna llamada "yellow" con todos los valores establecidos en 0
green_df = green_df.withColumn("yellow", lit(0))

# Muestra una vista previa del DataFrame actualizado
green_df.show(5)

+------------+------------+------------+------------+-----------+-----------+------+
|PULocationID|DOLocationID|payment_type|total_amount|pickup_date|pickup_hour|yellow|
+------------+------------+------------+------------+-----------+-----------+------+
|          67|          11|           1|         9.3| 2019-03-04|         15|     0|
|          82|         138|           2|        33.8| 2019-03-04|         15|     0|
|          65|         150|           1|        57.3| 2019-03-04|         16|     0|
|         188|          56|           1|       38.65| 2019-03-04|         16|     0|
|           7|         223|           2|        16.8| 2019-03-04|         17|     0|
+------------+------------+------------+------------+-----------+-----------+------+
only showing top 5 rows



## Dataset final "Taxis_df"

In [40]:
# Combina (une) los DataFrames uno encima del otro
Taxis_df = green_df.union(yellow_df)

In [41]:
# Cuenta la cantidad de filas en el DataFrame selec_columns
cantidad_de_filas = Taxis_df.count()

# Muestra la cantidad de filas
print(f"El DataFrame contiene {cantidad_de_filas} filas.")

El DataFrame contiene 2442 filas.


## Guardar el dataset como "**Taxis_NY.csv**"

In [42]:
# Convierte el DataFrame de PySpark en un Pandas DataFrame
Taxis_df = Taxis_df.toPandas()

In [43]:
# Guarda el Pandas DataFrame en un solo archivo CSV
Taxis_df.to_csv('Datasets/Outputs/Taxis_NY.csv', index=False)

## Detener la sesión de pyspark

In [44]:
spark.stop()


## Etapa 3, ETL de "Taxis_NY.csv"

In [45]:
Taxis_df = pd.read_csv('Datasets/Outputs/Taxis_NY.csv')

In [46]:
Taxis_df.describe()

Unnamed: 0,PULocationID,DOLocationID,payment_type,total_amount,pickup_hour,yellow
count,2442.0,2442.0,2188.0,2442.0,2442.0,2442.0
mean,134.551188,148.078215,1.325411,19.893612,13.810811,0.51679
std,73.285114,73.898133,0.538549,15.065554,3.9186,0.49982
min,3.0,1.0,0.0,-54.05,6.0,0.0
25%,74.0,78.0,1.0,11.0,10.0,0.0
50%,134.0,145.0,1.0,15.36,14.0,1.0
75%,191.0,229.0,2.0,22.8375,17.0,1.0
max,265.0,265.0,4.0,150.1,20.0,1.0


In [47]:
# Valores nulos
null_counts = Taxis_df.isnull().sum()
null_counts

PULocationID      0
DOLocationID      0
payment_type    254
total_amount      0
pickup_date       0
pickup_hour       0
yellow            0
dtype: int64

### Columnas "PULocationID"	"DOLocationID"

### 1. Datos atípicos. Verificación de valores entre 1 y 263

In [48]:
# Cuenta la cantidad de registros fuera de las zonas de New York (1-263)
def outliers_1(n , N):
  zone = range(n, N+1)
  registros_fuera_de_rango = 0

  for index, row in Taxis_df.iterrows():
      if row['PULocationID'] not in zone or row['DOLocationID'] not in zone:
          registros_fuera_de_rango += 1

  print(f"Total de registros fuera del rango ({n}, {N}): {registros_fuera_de_rango}")

In [49]:
outliers_1(1, 263)

Total de registros fuera del rango (1, 263): 25


no se eliminarán estos registros ya que contiene información relevante en el resto de las columnas

### Columna "payment_type"

### 1. Datos atípicos. verificación de valores entre 1 y 6

In [50]:
# Valores únicos
Taxis_df['payment_type'].unique()

array([ 1.,  2., nan,  3.,  4.,  0.])

In [51]:
# Reemplaza los valores NaN en la columna 'payment_type' por 5 (valor desconocido)
Taxis_df['payment_type'].fillna(5, inplace=True)

### 2. Cambiar a str

In [52]:
# De float a str
Taxis_df['payment_type'] = Taxis_df['payment_type'].astype(str)

### Columna "total_amount"

### 1. Datos atípicos. Corrección de valores negativos

In [53]:
Taxis_df[Taxis_df['total_amount'] < 1]

Unnamed: 0,PULocationID,DOLocationID,payment_type,total_amount,pickup_date,pickup_hour,yellow
429,193,7,1.0,0.0,2020-08-08,13,0
707,255,255,3.0,-5.8,2021-08-06,18,0
1101,193,193,1.0,0.0,2022-12-20,14,0
1162,41,41,3.0,0.0,2022-12-25,19,0
1562,141,236,4.0,-10.8,2020-03-07,18,1
1790,43,236,3.0,-7.8,2020-12-24,15,1
1816,193,193,3.0,-3.3,2021-03-04,11,1
2073,132,132,4.0,-54.05,2021-12-22,7,1


In [54]:
Taxis_df['total_amount'] = Taxis_df['total_amount'].abs()

### Columna "pickup_date"

### 1. Creación de Columnas "pickup_year" , "pickup_month" , "pickup_day"

In [55]:
# Divide la columna 'pickup_date' en tres partes: año, mes y día
Taxis_df[['pickup_year', 'pickup_month', 'pickup_day']] = Taxis_df['pickup_date'].str.split('-', expand=True)

# Convierte las nuevas columnas en enteros
Taxis_df['pickup_year'] = Taxis_df['pickup_year'].astype(int)
Taxis_df['pickup_month'] = Taxis_df['pickup_month'].astype(int)
Taxis_df['pickup_day'] = Taxis_df['pickup_day'].astype(int)

In [56]:
# Elimina la columna "pickup_date"
Taxis_df.drop(labels = "pickup_date" , axis = 1 , inplace = True)

In [57]:
Taxis_df.head(3)

Unnamed: 0,PULocationID,DOLocationID,payment_type,total_amount,pickup_hour,yellow,pickup_year,pickup_month,pickup_day
0,67,11,1.0,9.3,15,0,2019,3,4
1,82,138,2.0,33.8,15,0,2019,3,4
2,65,150,1.0,57.3,16,0,2019,3,4


### 2. Detección datos atípicos de "pickup_year" , "pickup_month" , "pickup_day"

In [58]:
# Función para detectar años y dias fuera de rango
def outliers_2(col, n , N):
  zone = range(n, N+1)
  registros_fuera_de_rango = 0

  for index, row in Taxis_df.iterrows():
      if row[col] not in zone:
          registros_fuera_de_rango += 1

  print(f"Total de registros fuera del rango ({n}, {N}): {registros_fuera_de_rango}")

In [59]:
# Detección de años fuera de rango
outliers_2('pickup_year' , 2020, 2023)

Total de registros fuera del rango (2020, 2023): 604


In [60]:
# Detección de meses fuera de rango
def outliers_month(column_name, valid_months):
    records_outside_range = 0

    for index, row in Taxis_df.iterrows():
        if row[column_name] not in valid_months:
            records_outside_range += 1

    print(f"Total de registros fuera del rango: {records_outside_range}")

list_months = [3, 8, 12]
outliers_month('pickup_month', list_months)

Total de registros fuera del rango: 0


In [61]:
# Detección de dias fuera de rango
outliers_2('pickup_day', 1 , 31)

Total de registros fuera del rango (1, 31): 0


### Columna "pickup_hour"

### 1. Detección datos atípicos

In [62]:
# Detección de horas fuera de rango
def outliers_2(col, n , N):
  zone = range(n, N+1)
  registros_fuera_de_rango = 0

  for index, row in Taxis_df.iterrows():
      if row[col] not in zone:
          registros_fuera_de_rango += 1

  print(f"Total de registros fuera del rango ({n}, {N}): {registros_fuera_de_rango}")

outliers_2('pickup_hour', 6 , 20)

Total de registros fuera del rango (6, 20): 0


### Ordenar las columnas

In [63]:
# Obtener el nombre de las columnas
columns = Taxis_df.columns.tolist()

# Mover la columna "yellow" al principio
columns.insert(0, columns.pop(columns.index("yellow")))

# Mover la columna "pickup_hour" al final
columns.append(columns.pop(columns.index("pickup_hour")))

# Reorganizar las columnas del DataFrame
Taxis_df = Taxis_df[columns]

### Dataset Final "**Taxis_NY.csv**"

In [64]:
# Agregar un índice numérico predeterminado
Taxis_df.set_index(pd.RangeIndex(start=0, stop=len(Taxis_df)), inplace=True)

# Convertir el índice en una columna
Taxis_df.reset_index(inplace=True)

# Cambiar el nombre de la columna de índice a "ID"
Taxis_df.rename(columns={'index': 'ID_taxis'}, inplace=True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  Taxis_df.rename(columns={'index': 'ID_taxis'}, inplace=True)


In [65]:
Taxis_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2442 entries, 0 to 2441
Data columns (total 10 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ID_taxis      2442 non-null   int64  
 1   yellow        2442 non-null   int64  
 2   PULocationID  2442 non-null   int64  
 3   DOLocationID  2442 non-null   int64  
 4   payment_type  2442 non-null   object 
 5   total_amount  2442 non-null   float64
 6   pickup_year   2442 non-null   int64  
 7   pickup_month  2442 non-null   int64  
 8   pickup_day    2442 non-null   int64  
 9   pickup_hour   2442 non-null   int64  
dtypes: float64(1), int64(8), object(1)
memory usage: 190.9+ KB


In [66]:
Taxis_df.tail(5)

Unnamed: 0,ID_taxis,yellow,PULocationID,DOLocationID,payment_type,total_amount,pickup_year,pickup_month,pickup_day,pickup_hour
2437,2437,1,229,170,1.0,18.0,2022,12,26,19
2438,2438,1,161,230,1.0,28.55,2022,12,26,19
2439,2439,1,186,163,0.0,28.21,2022,12,20,13
2440,2440,1,232,91,0.0,88.67,2022,12,20,16
2441,2441,1,116,41,0.0,21.38,2022,12,23,20


In [67]:
# Sobreescribir el archivo Taxis_NY.csv
Taxis_df.to_csv('Datasets/Outputs/Taxis_NY.csv', index=False)

### Dataset Complemetario "**Year.csv**"

In [68]:
year_data = {'year': [2019, 2020, 2021, 2022]}
year_df = pd.DataFrame(year_data)

In [69]:
# Guarda el DataFrame en un archivo CSV llamado "year.csv"
year_df.to_csv('Datasets/Outputs/year.csv', index=False)