In [3]:
# !pip install polars geopy
import polars as pl
from geopy.distance import distance as geodesic_distance
from IPython.display import display, Markdown

def md(text):
    display(Markdown(text))

In [48]:
# CONST
MIN_STOPS = 3

itineraries_cols = [
    "legId",
    "startingAirport",
    "destinationAirport",
    "travelDuration",
    "totalFare",
    "totalTravelDistance",
    "segmentsArrivalAirportCode",
]
sample = pl.read_csv("itineraries_100k.csv", separator=",").select(
    itineraries_cols
)


airports = pl.read_csv("airports-codepublic.csv", separator=";").select(
    ["Airport Code", "Latitude", "Longitude"]
)


def run_query(query: callable, *args, batch_size=None):
    """
    Run a query on itineraries with args and
    batch_size mb of memory
    """
    if batch_size is not None:
        row_size = sample.estimated_size('mb') / sample.height
        batch_size = int(batch_size / row_size)
        print(f"Rows per batch: {batch_size}")
        !head -n {batch_size} itineraries.csv > itineraries_tmp.csv
        file_name = "itineraries_tmp.csv"
    else:
        file_name = "itineraries.csv"
    res: pl.LazyFrame = query(
        pl.scan_csv(file_name, separator=",").select(
            itineraries_cols
        ),
        *args,
    )
    return res

def dump_query(query: callable, *args, batch_size=None):
    run_query(query, *args, batch_size=batch_size).sink_csv(query.__name__ + ".csv")

def collect_query(query: callable, *args, batch_size=None):
    return run_query(query, *args, batch_size=batch_size).collect()

def count_stops(segments: str):
    return len(segments.split("||")) - 1

def filter_by_stops(itineraries: pl.DataFrame, min_stops: int):
    """
    Filter itineraries by minimum number of stops
    """
    return itineraries.filter(
        pl.col("segmentsArrivalAirportCode").map_elements(count_stops, return_dtype=int) >= min_stops
    )

def remove_last_stop(segments: str):
    return ",".join(segments.split("||")[:-1]) or ""
def add_stops_codes(itineraries: pl.DataFrame):
    """
    Add a column with the list of stops codes
    """
    return itineraries.with_columns(
        pl.col("segmentsArrivalAirportCode")
        .map_elements(remove_last_stop, return_dtype=str)
        .alias("stopsAirportCode")
    )

In [49]:
md(f"""
- **Query 1**

Obtener el ID, trayecto, precio y escalas de vuelos con {MIN_STOPS} o más escalas.
""")


- **Query 1**

Obtener el ID, trayecto, precio y escalas de vuelos con 3 o más escalas.


In [50]:
def query1(itineraries: pl.DataFrame, min_stops: int = MIN_STOPS):
    print(f"Query 1: {min_stops} stops")
    return (
        itineraries.pipe(filter_by_stops, min_stops)
        .pipe(add_stops_codes)
        .select(
            [
                "legId",
                "startingAirport",
                "destinationAirport",
                "totalFare",
                "stopsAirportCode",
            ]
        )
    )

In [55]:
dump_query(query1)

Query 1: 3 stops


In [38]:
md(
    f"""
- **Query 2**

Obtener el ID y trayecto de vuelos cuya deistancia total
sea mayor al doble de la distancia directa entre puntos
origen-destino.
"""
)


- **Query 2**

Obtener el ID y trayecto de vuelos cuya deistancia total
sea mayor al doble de la distancia directa entre puntos
origen-destino.


In [53]:
def query2(itineraries: pl.DataFrame):
    def calculate_distance(row: pl.Struct):
        lat_start = row["Latitude_start"]
        lon_start = row["Longitude_start"]
        lat_dest = row["Latitude_dest"]
        lon_dest = row["Longitude_dest"]
        return geodesic_distance(
            (lat_start, lon_start), (lat_dest, lon_dest)
        ).km

    airports_tmp = pl.scan_csv("airports-codepublic.csv", separator=";").select(
            ["Airport Code", "Latitude", "Longitude"]
        )
    return itineraries.join(
        airports_tmp,
        left_on="startingAirport",
        right_on="Airport Code",
        how="inner",
        ).with_columns(
            pl.col("Latitude").alias("Latitude_start"),
            pl.col("Longitude").alias("Longitude_start"),
        ).join(
        airports_tmp,
        left_on="destinationAirport",
        right_on="Airport Code",
        how="inner",
    ).with_columns(
        pl.col("Latitude").alias("Latitude_dest"),
        pl.col("Longitude").alias("Longitude_dest"),
    ).with_columns(
        pl.struct([
            "Latitude_start",
            "Longitude_start",
            "Latitude_dest",
            "Longitude_dest",
        ]).map_elements(calculate_distance, return_dtype=float).alias("distance")
    ).filter(pl.col("distance") * 2 < pl.col("totalTravelDistance")).select(
        ["legId", "startingAirport", "destinationAirport"]
    )

In [54]:
dump_query(query2, batch_size=10)

Rows per batch: 96986


In [23]:
md(f"""- Query 3

Obtener el ID, trayecto, escalas y duración para los 2 vuelos
de cada trayecto con menor duración entre todos los vuelos de
{MIN_STOPS} escalas o más.
""")

- Query 3

Obtener el ID, trayecto, escalas y duración para los 2 vuelos
de cada trayecto con menor duración entre todos los vuelos de
3 escalas o más.


In [51]:
def query3(itineraries: pl.DataFrame, min_stops: int = MIN_STOPS):
    return (
        itineraries.pipe(filter_by_stops, min_stops)
        .pipe(add_stops_codes)
        .select(  # try commenting this line
            [
                "legId",
                "startingAirport",
                "destinationAirport",
                "travelDuration",
                "stopsAirportCode",
            ]
        )
        .group_by(["startingAirport", "destinationAirport"])
        .agg([pl.min("travelDuration")])
    )

In [53]:
res = collect_query(query3)
res.head(4)

startingAirport,destinationAirport,travelDuration
str,str,str


In [None]:
md(f"""- Query 4

El precio promedio y máximo trayecto de los
vuelos con precio mayor a la media general de precios.
""")