# Data Loading, Filtering, and Intermediate File Creation
This notebook demonstrates how to load and filter partitioned Parquet files using Polars with lazy evaluation, and save the filtered data to an intermediate file.


## Step 1: Install Required Packages
Ensure you have the required packages installed.

In [1]:
!pip install geopandas polars



## Step 2: Import Libraries

In [1]:
import geopandas as gpd
import polars as pl
from IPython.display import display, Markdown

## Step 3: Load and Process Shapefile
Load the shapefile containing the districts information and calculate the centroids.

In [4]:
def load_and_process_shapefile(shapefile_path, output_path):
    try:
        gdf = gpd.read_file(shapefile_path)
        gdf = gdf.to_crs(epsg=4326)  # Convert to WGS84
        # gdf['centroid'] = gdf.geometry.centroid
        gdf['lat'] = gdf.centroid.y
        gdf['lng'] = gdf.centroid.x
        # gdf['geometry_wkt'] = gdf.geometry.apply(lambda x: x.wkt)
        gdf = gdf.drop(columns=['geometry', 'centroid'])
        gdf.to_parquet(output_path)
        display(Markdown("### Shapefile loaded, centroids calculated, and saved successfully as Parquet."))
        return output_path
    except Exception as e:
        display(Markdown(f"### Error loading and processing shapefile: {e}"))
        return None

shapefile_path = '/home/nls/data/mitma/geo/zonificacion-distritos/distritos_mitma.shp'
output_path = '/home/nls/data/mitma/geo/processed_distritos.parquet'
processed_shapefile_path = load_and_process_shapefile(shapefile_path, output_path)


  gdf['lat'] = gdf.centroid.y

  gdf['lng'] = gdf.centroid.x


### Error loading and processing shapefile: "['centroid'] not found in axis"

## Step 4: Load and Filter Data
Using Polars lazy evaluation, perform joins with the centroids and filter rows with valid coordinates.


In [5]:
def load_data(parquet_path, shapefile_path):
    try:
        # Load the shapefile as a Polars DataFrame
        centroides_df = pl.read_parquet(shapefile_path)
        centroides_df = centroides_df.with_columns([
            pl.col("ID").alias("origen"),
            pl.col("ID").alias("destino")
        ])
        centroides_lazy = centroides_df.lazy()

        # Lazy load the Parquet files
        viajes_lazy = pl.scan_parquet(parquet_path + '/*/*.parquet')

        display(Markdown("### Data loaded successfully."))
        return viajes_lazy, centroides_lazy
    except Exception as e:
        display(Markdown(f"### Error loading data: {e}"))
        return None, None

def filter_valid_rows(viajes_lazy, centroides_lazy, max_rows=100000):
    try:
        # Join origin centroids
        origin_centroids = centroides_lazy#.rename({'ID': 'origin'})
        viajes_lazy = viajes_lazy.join(origin_centroids, on='origen', how='left')
        viajes_lazy = viajes_lazy.rename({'lat': 'origen_lat', 'lng': 'origen_lng'})

        # Join destination centroids
        destination_centroids = centroides_lazy#.rename({'ID': 'destination'})
        viajes_lazy = viajes_lazy.join(destination_centroids, on='destino', how='left')
        viajes_lazy = viajes_lazy.rename({'lat': 'destino_lat', 'lng': 'destino_lng'})

        # Drop extra geometries columns
        viajes_lazy = viajes_lazy.drop(columns=['geometry_wkt', 'geometry_wkt_right'])

        # Filter rows with valid coordinates and limit to max_rows
        valid_rows_lazy = viajes_lazy.filter(
            (pl.col('origen_lat').is_not_null()) & 
            (pl.col('origen_lng').is_not_null()) & 
            (pl.col('destino_lat').is_not_null()) & 
            (pl.col('destino_lng').is_not_null())
        ).filter(pl.col('origen') != pl.col('destino')).limit(max_rows)

        display(Markdown(f"### Filtered and limited to {max_rows} valid rows."))
        return valid_rows_lazy
    except Exception as e:
        display(Markdown(f"### Error filtering valid rows: {e}"))
        return None

parquet_path = '/home/nls/data/mitma/parquet/viajes_distritos'
shapefile_path = '/home/nls/data/mitma/geo/processed_distritos.parquet'
viajes_lazy, centroides_lazy = load_data(parquet_path, shapefile_path)

if viajes_lazy is not None and centroides_lazy is not None:
    valid_rows_lazy = filter_valid_rows(viajes_lazy, centroides_lazy, 500000)

    if valid_rows_lazy is not None:
        try:
            # Collect the filtered and limited result
            valid_rows_df = valid_rows_lazy.collect(streaming=True)
            print("Filtered and limited dataset collected successfully:")
            # print(valid_rows_df)

            valid_rows_df.write_parquet('/home/nls/data/mitma/geo/viajes_distritos500k_od.parquet')
            display(Markdown("### Filtered and limited dataset collected and saved successfully."))

        except Exception as e:
            print(f"An error occurred during data collection: {e}")
    else:
        display(Markdown("### No valid rows to process."))
else:
    display(Markdown("### No data to process."))

### Data loaded successfully.

  viajes_lazy = viajes_lazy.join(origin_centroids, on='origen', how='left')
  viajes_lazy = viajes_lazy.join(destination_centroids, on='destino', how='left')
  viajes_lazy = viajes_lazy.drop(columns=['geometry_wkt', 'geometry_wkt_right'])


### Filtered and limited to 500000 valid rows.

Filtered and limited dataset collected successfully:


### Filtered and limited dataset collected and saved successfully.