In [11]:
%pip install requests
%pip install pandas 
%pip install bs4
%pip install pyarrow
%pip install fastparquet

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


Descargamos los archivos .parquet necesarios y los combinamos en un único archivo .csv.

In [10]:
import requests
import pandas as pd
import os
from bs4 import BeautifulSoup
from urllib.parse import urljoin

# Page that is scraped for links to the .parquet trip-record files
base_url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

# Years to fetch: 2020 through 2024 (the range end is exclusive)
years = range(2020, 2025)

# Output locations: the combined CSV and the folder holding the raw downloads
output_csv = "combined_data.csv"
parquet_dir = "parquet_files"
os.makedirs(parquet_dir, exist_ok=True)

def download_parquet_files(year, url, taxi_type="yellow", timeout=30):
    """Download every .parquet file linked from ``url`` for one year and taxi type.

    The page at ``url`` is fetched and scanned for ``<a>`` tags whose ``href``
    ends in ``.parquet``. A link is selected when its href contains both
    ``taxi_type`` and ``str(year)`` as substrings. Selected files are saved
    into the module-level ``parquet_dir`` under their original base name;
    files that already exist there are skipped.

    Args:
        year: Year to match against the link href (compared as ``str(year)``).
        url: Address of the index page; also the base for relative hrefs.
        taxi_type: Substring that must appear in the href (e.g. ``"yellow"``).
        timeout: Seconds passed to ``requests.get`` for the index page and for
            each file download, so a stalled connection cannot hang the cell.

    Returns:
        None. Errors are reported with ``print`` and never raised: a failure
        to load the index page ends the call, a failure on one file only
        skips that file.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")
        # strip(): tolerate stray whitespace around the attribute value.
        links = soup.find_all(
            "a", href=lambda href: href and href.strip().endswith(".parquet")
        )
    except requests.exceptions.RequestException as e:
        print(f"Error al acceder a {url}: {e}")
        return
    except Exception as e:
        print(f"Error inesperado: {e}")
        return

    for link in links:
        href = link["href"].strip()
        if taxi_type not in href or str(year) not in href:
            continue

        file_url = urljoin(url, href)
        filename = os.path.join(parquet_dir, os.path.basename(file_url))

        if os.path.exists(filename):
            print(f"[SKIP] Ya existe: {filename}")
            continue

        print(f"[DESCARGANDO] {file_url}")
        # Download to a temporary name and rename only on success; otherwise an
        # interrupted transfer would leave a truncated file that the existence
        # check above skips on the next run.
        partial = filename + ".part"
        try:
            with requests.get(file_url, stream=True, timeout=timeout) as r:
                # Without this an HTTP error page would be saved as a .parquet file.
                r.raise_for_status()
                with open(partial, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(partial, filename)
        except requests.exceptions.RequestException as e:
            print(f"Error al acceder a {file_url}: {e}")
        except Exception as e:
            print(f"Error inesperado: {e}")
        finally:
            # Only present when the download did not complete.
            if os.path.exists(partial):
                os.remove(partial)

# Taxi type to download. NOTE(review): input() blocks "Restart & Run All";
# consider a constant in the configuration section instead.
taxi_type = input("¿Qué tipo de taxi quieres descargar? (yellow/green): ").strip().lower()

# Links are selected with a substring test, so an empty answer would match every
# file on the page and a typo would silently match none. Fail early instead.
if taxi_type not in ("yellow", "green"):
    raise ValueError(f"Tipo de taxi no válido: {taxi_type!r}. Usa 'yellow' o 'green'.")

# Download the files, one call per year
for year in years:
    download_parquet_files(year, base_url, taxi_type)

# Keep only the files whose column names (compared in lowercase) match the
# reference set taken from the first readable file.
columns_set = None
valid_files = []

# parquet_dir can still hold downloads of another taxi type from an earlier run;
# restrict to the requested type so they neither get combined nor become the
# reference. An empty taxi_type keeps the previous "accept everything" behaviour.
file_prefix = f"{taxi_type}_" if taxi_type else ""

# sorted(): os.listdir returns entries in arbitrary order, which made the
# reference column set depend on whichever file happened to come first.
for filename in sorted(os.listdir(parquet_dir)):
    if not (filename.endswith(".parquet") and filename.startswith(file_prefix)):
        continue

    filepath = os.path.join(parquet_dir, filename)
    try:
        df = pd.read_parquet(filepath, engine="pyarrow")
        file_columns = {col.lower() for col in df.columns}  # case-insensitive comparison
        del df  # release the frame before reading the next file
    except Exception as e:
        print(f"❌ Error al leer {filename}: {e}")
        continue

    if columns_set is None:
        columns_set = file_columns
    elif file_columns != columns_set:
        print(f"⚠️ Columnas diferentes en: {filename}. Saltando este archivo.")
        continue

    valid_files.append(filepath)

# Write the combined CSV one parquet file at a time, appending to a single handle.
if valid_files:
    failed_files = []
    with open(output_csv, 'w', newline='', encoding='utf-8') as f_out:
        # Lowercased column names in the order of the header row; stays None
        # until a file has been written successfully (that file emits the header).
        header_order = None
        for filepath in sorted(valid_files):
            print(f"Procesando {filepath}")
            try:
                df = pd.read_parquet(filepath, engine="pyarrow")
                if header_order is not None:
                    # Validation only guarantees the same lowercased column *set*.
                    # Rows are appended without a header, so reorder the columns
                    # to match the header row or values land under the wrong name.
                    by_lower = {col.lower(): col for col in df.columns}
                    df = df[[by_lower[name] for name in header_order]]
                df.to_csv(f_out, index=False, header=header_order is None)
                if header_order is None:
                    header_order = [col.lower() for col in df.columns]
            except Exception as e:
                print(f"❌ Error al procesar {filepath}: {e}")
                failed_files.append(filepath)

    if failed_files:
        # Make it explicit that the output is incomplete.
        print(f"⚠️ {len(failed_files)} archivo(s) no se pudieron procesar y faltan en {output_csv}")
    print(f"✅ Datos combinados guardados en: {output_csv}")
else:
    print("⚠️ No se encontraron archivos .parquet válidos para procesar.")



[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-01.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-02.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-03.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-04.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-05.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-06.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-07.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-08.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-09.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-10.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-11.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2020-12.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2021-01.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2021-02.parquet
[SKIP] Ya existe: parquet_files/yellow_tripdata_2021-03.parquet
[SKIP] Ya existe: parquet_files/yellow_t

Dividimos los datos en varios archivos CSV, ya que Databricks no permite subir archivos de más de 2 GB.

In [1]:
import os

input_csv = "combined_data.csv"
output_dir = "csv_split"
os.makedirs(output_dir, exist_ok=True)

max_chunks = 10
# Approximate size in bytes of each output file
target_chunk_size = os.path.getsize(input_csv) // max_chunks

# Split line by line, repeating the header row at the top of every chunk.
# Each line is written straight to the current chunk file instead of being
# collected in a list first, so memory use no longer grows with chunk size.
# NOTE(review): splitting on physical lines assumes no quoted field contains a
# line break; confirm for this dataset.
with open(input_csv, 'r', encoding='utf-8') as f_in:
    header = f_in.readline()
    chunk_idx = 1
    current_chunk_size = 0
    chunk_path = None
    f_out = None  # handle of the chunk being written; None between chunks

    try:
        for line in f_in:
            if f_out is None:
                # Open the next chunk lazily so no header-only file is created.
                chunk_path = os.path.join(output_dir, f"chunk_{chunk_idx:02}.csv")
                f_out = open(chunk_path, 'w', encoding='utf-8')
                f_out.write(header)

            f_out.write(line)
            current_chunk_size += len(line.encode('utf-8'))

            if current_chunk_size >= target_chunk_size:
                f_out.close()
                f_out = None
                print(f"✅ Guardado: {chunk_path}")

                current_chunk_size = 0
                chunk_idx += 1

        # Last chunk, if any lines are still pending
        if f_out is not None:
            f_out.close()
            f_out = None
            print(f"✅ Guardado final: {chunk_path}")
    finally:
        # Reached with an open handle only when an error interrupted the loop.
        if f_out is not None:
            f_out.close()


✅ Guardado: csv_split/chunk_01.csv
✅ Guardado: csv_split/chunk_02.csv
✅ Guardado: csv_split/chunk_03.csv
✅ Guardado: csv_split/chunk_04.csv
✅ Guardado: csv_split/chunk_05.csv
✅ Guardado: csv_split/chunk_06.csv
✅ Guardado: csv_split/chunk_07.csv
✅ Guardado: csv_split/chunk_08.csv
✅ Guardado: csv_split/chunk_09.csv
✅ Guardado final: csv_split/chunk_10.csv
