# Add 'i95_distance', 'station_distance' and 'station_id" to Ebird datasets
- To used with datasets pre-cleaned using AWS SageMaker Canvas

In [None]:
# mount google drive
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
# # required installs
# !pip install geopandas shapely
# !pip install --upgrade pandas numpy

Collecting pandas
  Downloading pandas-2.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
Collecting numpy
  Downloading numpy-2.3.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (62 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.1/62.1 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
Downloading pandas-2.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m104.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading numpy-2.3.0-cp311-cp311-manylinux_2_28_x86_64.whl (16.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.9/16.9 MB[0m [31m88.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: numpy, pandas
  Attempting uninstall: numpy
    Found existing installation: numpy 2.0.2
    

In [None]:
# required imports
import pandas as pd
import numpy as np
import geopandas as gpd
import zipfile
import os
import logging
import time
import gc
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
from datetime import datetime
from shapely.geometry import Point, LineString
from shapely.ops import nearest_points
from typing import Optional
from geopy.distance import geodesic
from typing import Optional, List, Dict, Any

import warnings
warnings.filterwarnings('ignore')

scipy library requires an older numpy version. Unfortunately, using the old numpy inhibits the use of pandas.  Run the following only if calculating

In [None]:
# ## Run this first and restart the notebook - resolves scipy-numpy discrepency ###
# !pip uninstall -y numpy scipy
# !pip install numpy==1.24.4 scipy==1.10.1

# # Running this cell will require you to restart the session.

In [None]:
# # imports for
# from scipy.spatial.distance import cdist

### Dataset check

In [None]:
# data read
vs = pd.read_parquet("/content/VA_test.parquet")

In [None]:
vs.shape

(23153, 32)

In [None]:
vs.columns

Index(['state_code', 'station_id', 'year_record', 'month_record', 'day_record',
       'day_of_week', 'latitude_0', 'longitude_0', 'station_location', 'state',
       'daily_avg_noise', 'peak_hour_noise', 'overnight_noise',
       'rush_hour_noise', 'total_daily_volume', 'OBSERVATION COUNT',
       'STATE CODE', 'LOCALITY TYPE', 'LATITUDE_1', 'LONGITUDE_1',
       'OBSERVATION DATE', 'OBSERVATION DATE_year', 'OBSERVATION DATE_month',
       'OBSERVATION DATE_day', 'TimeObservationStarted_hour',
       'TimeObservationStarted_minute', 'distance_to_station_160005',
       'distance_to_station_140004', 'distance_to_station_160308',
       'distance_to_station_060170', 'distance_to_station_792625',
       'assigned_station'],
      dtype='object')

In [None]:
print(vs.head())

   state_code  station_id  year_record  month_record  day_record  day_of_week  \
0          51      160005         2023             6          30            6   
1          51      160005         2023             6          29            5   
2          51      160005         2023             6          28            4   
3          51      160005         2023             6          27            3   
4          51      160005         2023             6          26            2   

   latitude_0  longitude_0                                   station_location  \
0    38.13873    -77.50837  5.85 S RAMP FR RT 1                           ...   
1    38.13873    -77.50837  5.85 S RAMP FR RT 1                           ...   
2    38.13873    -77.50837  5.85 S RAMP FR RT 1                           ...   
3    38.13873    -77.50837  5.85 S RAMP FR RT 1                           ...   
4    38.13873    -77.50837  5.85 S RAMP FR RT 1                           ...   

  state  ...  OBSERVATION 

In [None]:
vs.LATITUDE_1.info()

<class 'pandas.core.series.Series'>
RangeIndex: 23153 entries, 0 to 23152
Series name: LATITUDE_1
Non-Null Count  Dtype  
--------------  -----  
23153 non-null  float64
dtypes: float64(1)
memory usage: 181.0 KB


### Calculates distance between bird observation point to I95 - nearest point on a line between the two nearest I95 geo coordinates.
Adds 'i95_distance' to Ebirds dataset.

In [None]:
# # Read in Ebird data if not too large or loa
# birds = pd.read_csv('DATASET', nrows=1000)
# print(birds.shape)
# print(birds.columns)
# print(birds.head())

In [None]:
i95_coordinates = pd.read_csv('./drive/MyDrive/Capstone/i95_modified.csv')
print(i95_coordinates.shape)
print(i95_coordinates.columns)
print(i95_coordinates.head())

(55654, 10)
Index(['Way_ID', 'Segment_Number', 'Point_Order', 'Longitude', 'Latitude',
       'Highway_Type', 'Route_Ref', 'Max_Speed', 'Combined_Path',
       'Overall_Sequence'],
      dtype='object')
     Way_ID  Segment_Number  Point_Order  Longitude   Latitude Highway_Type  \
0  way/2059               0            0 -77.451046  37.680565     motorway   
1  way/2059               0            1 -77.451240  37.681075     motorway   
2  way/2059               0            2 -77.451470  37.681770     motorway   
3  way/2059               0            3 -77.451679  37.682531     motorway   
4  way/2059               0            4 -77.451867  37.683562     motorway   

  Route_Ref Max_Speed Combined_Path  Overall_Sequence  
0      I 95    65 mph           0_0                 0  
1      I 95    65 mph           0_1                 1  
2      I 95    65 mph           0_2                 2  
3      I 95    65 mph           0_3                 3  
4      I 95    65 mph           0_4       

In [None]:
i95_sorted = i95_coordinates.sort_values(['Overall_Sequence'])
i95_coords = list(zip(i95_sorted['Latitude'], i95_sorted['Longitude']))

In [26]:

# import logging
# import pandas as pd
# import geopandas as gpd
# import numpy as np
# from shapely.geometry import Point, LineString
# import pyarrow as pa
# import pyarrow.parquet as pq
# from pathlib import Path
# import gc
# from typing import Optional, List, Dict, Any

### Modify file names before running ###
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class OptimizedParquetBatchProcessor:
    """
    Heavily optimized processor for large parquet files using GeoPandas, spatial indexing,
    and vectorized operations with memory-efficient parquet streaming.
    Expected 10-100x performance improvement with better memory management.
    """

    def __init__(self,
                 input_file: str = "cleaned_ebird_file.pq",
                 output_file: str = "new_file.pq",
                 batch_size: int = 50000,
                 distance_threshold: Optional[float] = None,
                 i95_coords: Optional[List] = None,
                #  columns_to_keep: Optional[List[str]] = None,
                 use_compression: str = 'snappy'):

        self.input_file = Path(input_file)
        self.output_file = Path(output_file)
        self.batch_size = batch_size
        self.distance_threshold = distance_threshold
        # self.columns_to_keep = columns_to_keep
        self.use_compression = use_compression

        # Validate input file
        if not self.input_file.exists():
            raise FileNotFoundError(f"Input file not found: {input_file}")

        # Setup parquet file metadata
        self._setup_parquet_metadata()

        # Pre-process I-95 coordinates into optimized spatial structures
        self._setup_highway_geometry(i95_coords)

        # Statistics tracking
        self.total_rows_processed = 0
        self.total_rows_saved = 0
        self.batch_count = 0
        self.total_file_size = 0

    def _setup_parquet_metadata(self):
        """Get parquet file metadata for optimization"""
        try:
            # Read parquet metadata
            parquet_file = pq.ParquetFile(self.input_file)
            self.parquet_metadata = parquet_file.metadata
            self.parquet_schema = parquet_file.schema
            self.total_rows = self.parquet_metadata.num_rows

            logger.info(f"Parquet file info:")
            logger.info(f"  Total rows: {self.total_rows:,}")
            logger.info(f"  Number of row groups: {self.parquet_metadata.num_row_groups}")
            logger.info(f"  Columns: {len(self.parquet_schema)}")

            # Get file size
            self.total_file_size = self.input_file.stat().st_size / (1024**3)  # GB
            logger.info(f"  File size: {self.total_file_size:.2f} GB")

        except Exception as e:
            logger.error(f"Error reading parquet metadata: {e}")
            raise

    def _setup_highway_geometry(self, i95_coords):
        """Convert I-95 coordinates to optimized spatial structures"""
        if not i95_coords:
            raise ValueError("I-95 coordinates must be provided")

        logger.info("Setting up highway geometry with spatial indexing...")

        # Create LineString geometry from coordinates
        # i95_coords are (lat,lon) but LineString expects (lon,lat), so swap them
        self.highway_line = LineString([(lon, lat) for lat, lon in i95_coords])

        # Create GeoDataFrame for the highway with spatial index
        highway_gdf = gpd.GeoDataFrame([1], geometry=[self.highway_line], crs='EPSG:4326')

        # Convert to projected CRS for accurate distance calculations (UTM Zone 18N)
        self.highway_gdf_projected = highway_gdf.to_crs('EPSG:32618')
        self.highway_line_projected = self.highway_gdf_projected.geometry.iloc[0]

        logger.info("Highway geometry setup complete")

    def calculate_distances_vectorized(self, obs_gdf: gpd.GeoDataFrame) -> np.ndarray:
        """
        Vectorized distance calculation using GeoPandas
        This is the key optimization - processes all points at once
        """
        # Project observations to same CRS as highway (UTM Zone 18N)
        obs_projected = obs_gdf.to_crs('EPSG:32618')

        # Vectorized distance calculation to highway line
        distances_meters = obs_projected.geometry.distance(self.highway_line_projected)

        # Convert meters to miles
        distances_miles = distances_meters * 0.000621371

        return distances_miles.values

    def process_batch_optimized(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        """Optimized batch processing using vectorized operations"""

        # Early return if empty batch
        if batch_df.empty:
            return pd.DataFrame()

        # Filter out rows with invalid coordinates early
        valid_coords = batch_df.dropna(subset=['LATITUDE_1', 'LONGITUDE_1'])

        if len(valid_coords) == 0:
            logger.warning("No valid coordinates in batch")
            return pd.DataFrame()

        # Convert to numeric and filter realistic coordinate ranges
        valid_coords = valid_coords.copy()

        # Use more efficient numeric conversion
        coord_cols = ['LATITUDE_1', 'LONGITUDE_1']
        for col in coord_cols:
            if valid_coords[col].dtype == 'object':
                valid_coords[col] = pd.to_numeric(valid_coords[col], errors='coerce')

        # Filter to reasonable coordinate bounds (roughly continental US)
        coord_filter = (
            (valid_coords['LATITUDE_1'].between(24, 50)) &
            (valid_coords['LONGITUDE_1'].between(-130, -65))
        )
        valid_coords = valid_coords[coord_filter]

        if len(valid_coords) == 0:
            logger.warning("No valid coordinates after filtering")
            return pd.DataFrame()

        # Create GeoDataFrame from observations
        geometry = gpd.points_from_xy(valid_coords['LONGITUDE_1'], valid_coords['LATITUDE_1'])
        obs_gdf = gpd.GeoDataFrame(valid_coords, geometry=geometry, crs='EPSG:4326')

        # Calculate distances using vectorized operation
        distances = self.calculate_distances_vectorized(obs_gdf)

        # Add distances to dataframe
        result_df = valid_coords.copy()
        result_df['i95_distance'] = distances

        # Apply distance filter if specified
        if self.distance_threshold is not None:
            result_df = result_df[result_df['i95_distance'] <= self.distance_threshold]

        # # Select only specified columns if provided
        # if self.columns_to_keep:
        #     available_cols = [col for col in self.columns_to_keep + ['i95_distance']
        #                     if col in result_df.columns]
        #     result_df = result_df[available_cols]

        return result_df

    def _process_one_batch(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        """Process a single batch with logging and memory management"""
        self.batch_count += 1
        self.total_rows_processed += len(batch_df)

        if self.batch_count % 10 == 0:  # Log every 10th batch to reduce noise
            progress = (self.total_rows_processed / self.total_rows) * 100
            logger.info(f"Processing batch {self.batch_count} ({progress:.1f}% complete) "
                       f"with {len(batch_df)} rows")

        processed_batch = self.process_batch_optimized(batch_df)
        self.total_rows_saved += len(processed_batch)

        # Force garbage collection periodically
        if self.batch_count % 50 == 0:
            gc.collect()

        return processed_batch

    def _read_parquet_in_batches(self):
        """Generator to read parquet file in batches efficiently"""
        try:
            # Use pyarrow for more efficient reading
            parquet_file = pq.ParquetFile(self.input_file)

            # Read in batches using row groups when possible
            for batch in parquet_file.iter_batches(batch_size=self.batch_size):
                # Convert to pandas DataFrame
                df = batch.to_pandas()
                yield df

        except Exception as e:
            logger.error(f"Error reading parquet in batches: {e}")
            # Fallback to pandas chunking
            logger.info("Falling back to pandas chunking...")
            try:
                # Read the entire file and chunk it manually since pandas doesn't support chunksize for parquet
                df = pd.read_parquet(self.input_file)
                for i in range(0, len(df), self.batch_size):
                    yield df.iloc[i:i+self.batch_size]
                del df  # Free memory
            except Exception as e2:
                logger.error(f"Fallback also failed: {e2}")
                raise

    def _write_parquet_batch(self, df: pd.DataFrame, is_first_batch: bool = False):
        """Write batch to parquet file efficiently"""
        if df.empty:
            return

        # Convert to Arrow Table for efficient writing
        table = pa.Table.from_pandas(df)

        if is_first_batch:
            # Create new file with compatible parameters
            writer = pq.ParquetWriter(
                self.output_file,
                table.schema,
                compression=self.use_compression,
                use_dictionary=True  # Enable dictionary encoding
            )
            self._parquet_writer = writer

        # Write the batch
        self._parquet_writer.write_table(table)

    def run_pipeline(self) -> Dict[str, Any]:
        """Run the complete optimized pipeline for parquet files"""
        logger.info(f"Starting optimized parquet pipeline: {self.input_file}")
        logger.info(f"Batch size: {self.batch_size:,}")
        logger.info(f"Distance threshold: {self.distance_threshold}")
        logger.info(f"Expected batches: {(self.total_rows // self.batch_size) + 1}")

        start_time = pd.Timestamp.now()
        first_batch = True

        try:
            # Process file in batches
            for batch_df in self._read_parquet_in_batches():
                processed_batch = self._process_one_batch(batch_df)

                if not processed_batch.empty:
                    self._write_parquet_batch(processed_batch, is_first_batch=first_batch)
                    first_batch = False

            # Close the parquet writer
            if hasattr(self, '_parquet_writer'):
                self._parquet_writer.close()
                logger.info(f"Saved {self.total_rows_saved:,} rows to {self.output_file}")
            else:
                logger.warning("No data to save")

        except Exception as e:
            logger.error(f"Pipeline error: {str(e)}")
            # Clean up partial file
            if self.output_file.exists():
                self.output_file.unlink()
            raise

        finally:
            # Cleanup
            if hasattr(self, '_parquet_writer'):
                try:
                    self._parquet_writer.close()
                except:
                    pass

        end_time = pd.Timestamp.now()
        processing_time = (end_time - start_time).total_seconds()

        # Calculate statistics
        stats = {
            'total_rows_processed': self.total_rows_processed,
            'total_rows_saved': self.total_rows_saved,
            'total_batches': self.batch_count,
            'input_file': str(self.input_file),
            'output_file': str(self.output_file),
            'input_file_size_gb': self.total_file_size,
            'output_file_size_gb': self.output_file.stat().st_size / (1024**3) if self.output_file.exists() else 0,
            'processing_time_seconds': processing_time,
            'processing_time_formatted': str(pd.Timedelta(seconds=processing_time)),
            'rows_per_second': self.total_rows_processed / processing_time if processing_time > 0 else 0,
            'filter_efficiency_percent': (self.total_rows_saved / self.total_rows_processed * 100)
                                        if self.total_rows_processed > 0 else 0,
            'compression_ratio': (self.total_file_size / (self.output_file.stat().st_size / (1024**3)))
                               if self.output_file.exists() and self.output_file.stat().st_size > 0 else 0
        }

        logger.info("Optimized parquet pipeline completed!")
        logger.info(f"Processing time: {stats['processing_time_formatted']}")
        logger.info(f"Rows per second: {stats['rows_per_second']:,.0f}")
        logger.info(f"Filter efficiency: {stats['filter_efficiency_percent']:.2f}%")
        logger.info(f"Compression ratio: {stats['compression_ratio']:.2f}x")

        return stats

# # Example usage
# if __name__ == "__main__":

#     # Initialize processor
#     processor = OptimizedParquetBatchProcessor(
#         input_file="cleaned_ebird_file.pq",
#         output_file="filtered_ebird_i95.pq",
#         batch_size=50000,
#         distance_threshold=10.0,  # 10 miles
#         i95_coords=i95_coords,
#         use_compression='snappy'
#     )

#     # Run the pipeline
#     results = processor.run_pipeline()
#     print(f"Processing completed: {results}")

In [27]:
### Modify parameters ###
# Example usage
if __name__ == "__main__":

    # Initialize processor
    processor = OptimizedParquetBatchProcessor(
        input_file="VA_test.parquet",
        output_file="state_ebird_i95.pq",
        batch_size=50000,
        distance_threshold=10.0,  # in miles
        i95_coords=i95_coords,
        use_compression='snappy'
    )

    # Run the pipeline
    results = processor.run_pipeline()
    print(f"Processing completed: {results}")

Processing completed: {'total_rows_processed': 23153, 'total_rows_saved': 23153, 'total_batches': 1, 'input_file': 'VA_test.parquet', 'output_file': 'state_ebird_i95.pq', 'input_file_size_gb': 0, 'output_file_size_gb': 0.00023735687136650085, 'processing_time_seconds': 15.117894, 'processing_time_formatted': '0 days 00:00:15.117894', 'rows_per_second': 1531.496384350889, 'filter_efficiency_percent': 100.0, 'compression_ratio': 0.0}


In [28]:
vs1 = pd.read_parquet("/content/state_ebird_i95.pq")
print(vs1.shape)
print(vs1.columns)

(23153, 33)
Index(['state_code', 'station_id', 'year_record', 'month_record', 'day_record',
       'day_of_week', 'latitude_0', 'longitude_0', 'station_location', 'state',
       'daily_avg_noise', 'peak_hour_noise', 'overnight_noise',
       'rush_hour_noise', 'total_daily_volume', 'OBSERVATION COUNT',
       'STATE CODE', 'LOCALITY TYPE', 'LATITUDE_1', 'LONGITUDE_1',
       'OBSERVATION DATE', 'OBSERVATION DATE_year', 'OBSERVATION DATE_month',
       'OBSERVATION DATE_day', 'TimeObservationStarted_hour',
       'TimeObservationStarted_minute', 'distance_to_station_160005',
       'distance_to_station_140004', 'distance_to_station_160308',
       'distance_to_station_060170', 'distance_to_station_792625',
       'assigned_station', 'i95_distance'],
      dtype='object')


In [29]:
vs1.i95_distance.describe()

Unnamed: 0,i95_distance
count,23153.0
mean,0.141768
std,0.177003
min,0.002472
25%,0.035271
50%,0.066202
75%,0.189376
max,0.853026


### Calculate distance between bird observation point to the nearest noise measure station
- coded to process in batches to accomodate the large Ebird data set.
- use the corrected stations data with geo coordinates: 'I95_stations_corrected.csv':
 - 'latitude' & 'longitude' columns were renamed to 's_lat' & 's_lon'
 - 'longitude' values corrected to U.S. coordinates<br>

Adds 'station_distance' & 'station_id' to Ebirds dataset.

In [None]:
# # Read in Ebird data if not too large or loa
# birds = pd.read_csv('DATASET', nrows=1000)
# print(birds.shape)
# print(birds.columns)
# print(birds.head())

In [30]:
# Read in stations data
stations = pd.read_csv('/content/drive/MyDrive/Capstone/I95_stations_corrected.csv')
print(stations.shape)
print(stations.columns)
print(stations.head())

(343, 40)
Index(['record_type', 'state_code', 'station_id', 'year_record', 'f_system',
       'num_lanes', 'sample_type_volume', 'num_lanes_volume', 'method_volume',
       'sample_type_class', 'num_lanes_class', 'method_class',
       'algorithm_volume', 'num_classes', 'sample_type_truck',
       'num_lanes_truck', 'method_truck', 'calibration', 'data_retrieval',
       'type_sensor_1', 'type_sensor_2', 'primary_purpose', 'lrs_id',
       'lrs_point', 's_lat', 's_lon', 'shrp_id', 'prev_station_id',
       'year_established', 'year_discontinued', 'county_code', 'is_sample',
       'sample_id', 'nhs', 'posted_route_signing', 'posted_signed_route',
       'con_route_signing', 'con_signed_route', 'station_location', 'state'],
      dtype='object')
  record_type  state_code station_id  year_record f_system  num_lanes  \
0           S          11     001295           23       1U          3   
1           S          11     001295           23       1U          3   
2           S          11 

In [93]:
def haversine_vectorized(lat1, lon1, lat2, lon2, unit='miles'):
    """
    Vectorized haversine distance calculation (faster than geopy).
    """
    lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2)**2
    c = 2 * np.arcsin(np.sqrt(a))
    r = 3959 if unit == 'miles' else 6371
    return c * r

def find_nearest_stations_batch(bird_obs, stations, batch_size=5000, debug=False):
    """
    Find nearest station for each bird observation using vectorized Haversine distance.
    Handles large datasets efficiently with batching.
    """
    n_obs = len(bird_obs)
    nearest_distances = np.full(n_obs, np.inf)
    nearest_station_ids = np.full(n_obs, '', dtype=object)

    station_coords = stations[['s_lat', 's_lon']].values
    station_ids = stations['station_id'].astype(str).values

    n_batches = (n_obs + batch_size - 1) // batch_size

    if debug:
        print(f"Processing {n_obs} bird observations in {n_batches} batches...")

    for batch_idx in range(n_batches):
        start_idx = batch_idx * batch_size
        end_idx = min((batch_idx + 1) * batch_size, n_obs)
        batch_coords = bird_obs.iloc[start_idx:end_idx][['LATITUDE_1', 'LONGITUDE_1']].values

        batch_distances = np.array([
            haversine_vectorized(b_lat, b_lon, station_coords[:, 0], station_coords[:, 1])
            for b_lat, b_lon in batch_coords
        ])

        nearest_indices = np.argmin(batch_distances, axis=1)
        nearest_distances[start_idx:end_idx] = batch_distances[np.arange(len(batch_coords)), nearest_indices]
        nearest_station_ids[start_idx:end_idx] = station_ids[nearest_indices]

        if debug and batch_idx == 0:
            print("\n[DEBUG] Sample nearest station matches (first batch):")
            for i in range(min(3, len(batch_coords))):
                print(f"  Bird {i}:")
                print(f"    → Nearest station index: {nearest_indices[i]}")
                print(f"    → Station ID: {station_ids[nearest_indices[i]]}")
                print(f"    → Distance: {nearest_distances[start_idx + i]:.2f} miles")

    return nearest_distances, nearest_station_ids

def add_station_info_to_birds(birds_df, stations_df, bird_lat_col='LATITUDE_1',
                              bird_lon_col='LONGITUDE_1', station_lat_col='s_lat',
                              station_lon_col='s_lon', batch_size=5000, debug=False):
    """
    Add nearest station information to bird observations using batched spatial distance search.
    """
    start_time = time.time()

    # Create a copy to avoid modifying the original
    result = birds_df.copy()

    if 'station_id' not in stations_df.columns:
        stations_df = stations_df.reset_index(drop=True)
        stations_df['station_id'] = stations_df.index.astype(str)
        if debug:
            print(f"Created station_id column with {len(stations_df)} stations")

    if debug:
        print(f"Stations DataFrame columns: {list(stations_df.columns)}")
        print(f"Sample station_ids: {stations_df['station_id'].head().tolist()}")

    distances, station_ids = find_nearest_stations_batch(
        result, stations_df, batch_size=batch_size, debug=debug
    )

    if debug:
        print(f"Returned distances shape: {distances.shape}")
        print(f"Returned station_ids shape: {station_ids.shape}")
        print(f"Sample station_ids: {station_ids[:5]}")
        print(f"Station_ids type: {type(station_ids[0])}")

    result['station_distance'] = distances
    result['nearest_station_id'] = station_ids

    if debug:
        print(f"Result columns after assignment: {list(result.columns)}")
        print(f"Sample result nearest_station_id: {result['nearest_station_id'].head().tolist()}")

    print(f"Processed {len(result)} bird observations in {time.time() - start_time:.2f}s")
    print(f"Avg distance: {result['station_distance'].mean():.2f} miles | "
          f"Median: {result['station_distance'].median():.2f} | "
          f"Missing: {result['station_distance'].isna().sum()}")

    return result

To test: run below cell with 'debug = True'
To run on entire dataset: run below cel with 'debug = False'

In [58]:
birds = pd.read_parquet('/content/state_ebird_i95.pq')

In [94]:
### for small dataset Run this cell to add station_distance and station_id to birds dataframe
result_df = add_station_info_to_birds(
    birds_df = birds,
    stations_df=stations,
    batch_size=10000,
    debug=True
)

Stations DataFrame columns: ['record_type', 'state_code', 'station_id', 'year_record', 'f_system', 'num_lanes', 'sample_type_volume', 'num_lanes_volume', 'method_volume', 'sample_type_class', 'num_lanes_class', 'method_class', 'algorithm_volume', 'num_classes', 'sample_type_truck', 'num_lanes_truck', 'method_truck', 'calibration', 'data_retrieval', 'type_sensor_1', 'type_sensor_2', 'primary_purpose', 'lrs_id', 'lrs_point', 's_lat', 's_lon', 'shrp_id', 'prev_station_id', 'year_established', 'year_discontinued', 'county_code', 'is_sample', 'sample_id', 'nhs', 'posted_route_signing', 'posted_signed_route', 'con_route_signing', 'con_signed_route', 'station_location', 'state']
Sample station_ids: ['001295', '001295', '002295', 'S10004', 'S10005']
Processing 23153 bird observations in 3 batches...

[DEBUG] Sample nearest station matches (first batch):
  Bird 0:
    → Nearest station index: 102
    → Station ID: 060164
    → Distance: 3.17 miles
  Bird 1:
    → Nearest station index: 102
    

Checking the result_df.  When using without the wrapper, new file does not get created.  Do not forget to **save the result_df** in local drive.

In [95]:
result_df.shape

(23153, 35)

In [98]:
result_df.columns

Index(['state_code', 'station_id', 'year_record', 'month_record', 'day_record',
       'day_of_week', 'latitude_0', 'longitude_0', 'station_location', 'state',
       'daily_avg_noise', 'peak_hour_noise', 'overnight_noise',
       'rush_hour_noise', 'total_daily_volume', 'OBSERVATION COUNT',
       'STATE CODE', 'LOCALITY TYPE', 'LATITUDE_1', 'LONGITUDE_1',
       'OBSERVATION DATE', 'OBSERVATION DATE_year', 'OBSERVATION DATE_month',
       'OBSERVATION DATE_day', 'TimeObservationStarted_hour',
       'TimeObservationStarted_minute', 'distance_to_station_160005',
       'distance_to_station_140004', 'distance_to_station_160308',
       'distance_to_station_060170', 'distance_to_station_792625',
       'assigned_station', 'i95_distance', 'station_distance',
       'nearest_station_id'],
      dtype='object')

A wrapper function for use with the large dataset.   

In [99]:
# Wrapper function for larger datasets
def process_large_dataset_with_stations(
    birds_file_path,
    stations_df,
    output_file_path,
    chunk_size=100000,
    batch_size=5000,
    bird_lat_col='LATITUDE_1',
    bird_lon_col='LONGITUDE_1',
    station_lat_col='s_lat',
    station_lon_col='s_lon',
    file_format='parquet',
    debug=False,
    save_every_n_chunks=10
):
    """
    Process a large bird observations dataset (35M+ rows) with station matching.

    Parameters:
    -----------
    birds_file_path : str
        Path to the large bird observations file (CSV, Parquet, etc.)
    stations_df : pd.DataFrame
        DataFrame containing weather station data
    output_file_path : str
        Path where the processed results will be saved
    chunk_size : int, default=100000
        Number of rows to process in each chunk (adjust based on available RAM)
    batch_size : int, default=5000
        Batch size for the station matching algorithm
    bird_lat_col, bird_lon_col : str, optional
        Column names for bird coordinates (auto-detected if None)
    station_lat_col, station_lon_col : str
        Column names for station coordinates
    file_format : str, default='parquet'
        Output format ('parquet', 'csv', 'feather')
    debug : bool, default=False
        Enable debug output
    save_every_n_chunks : int, default=10
        Save intermediate results every N chunks (for crash recovery)

    Returns:
    --------
    dict : Processing statistics and metadata
    """

    start_time = time.time()

    # Validate file format
    valid_formats = ['parquet', 'csv', 'feather']
    if file_format not in valid_formats:
        raise ValueError(f"file_format must be one of {valid_formats}")

    # Determine file reader based on input file extension
    file_path = Path(birds_file_path)
    if file_path.suffix.lower() == '.csv':
        reader_func = pd.read_csv
        reader_kwargs = {'chunksize': chunk_size}
    elif file_path.suffix.lower() in ['.parquet', '.pq']:
        # For parquet, we'll use a different approach since it doesn't have native chunking
        reader_func = None
        reader_kwargs = {}
    else:
        # Default to CSV
        reader_func = pd.read_csv
        reader_kwargs = {'chunksize': chunk_size}

    print(f"Starting processing of large dataset: {birds_file_path}")
    print(f"Chunk size: {chunk_size:,} | Batch size: {batch_size:,}")
    print(f"Output format: {file_format} | Save every: {save_every_n_chunks} chunks")

    # Initialize tracking variables
    total_rows_processed = 0
    chunk_count = 0
    processed_chunks = []
    temp_files = []

    # Create temporary directory for intermediate files
    temp_dir = Path(output_file_path).parent / f"temp_{int(time.time())}"
    temp_dir.mkdir(exist_ok=True)

    try:
        # Handle parquet files differently (read in chunks manually)
        if file_path.suffix.lower() in ['.parquet', '.pq']:
            # Read parquet file info first
            parquet_file = pd.read_parquet(birds_file_path, engine='pyarrow')
            total_rows = len(parquet_file)

            # Process parquet in manual chunks
            for start_row in range(0, total_rows, chunk_size):
                end_row = min(start_row + chunk_size, total_rows)
                chunk_df = parquet_file.iloc[start_row:end_row].copy()

                chunk_count += 1
                chunk_start_time = time.time()

                if debug:
                    print(f"\nProcessing chunk {chunk_count} (rows {start_row:,} to {end_row:,})")

                # Process chunk with your existing function
                processed_chunk = add_station_info_to_birds(
                    chunk_df,
                    stations_df,
                    bird_lat_col=bird_lat_col,
                    bird_lon_col=bird_lon_col,
                    station_lat_col=station_lat_col,
                    station_lon_col=station_lon_col,
                    batch_size=batch_size,
                    debug=debug
                )

                # Save chunk temporarily
                temp_file = temp_dir / f"chunk_{chunk_count:04d}.parquet"
                processed_chunk.to_parquet(temp_file, index=False)
                temp_files.append(temp_file)

                total_rows_processed += len(processed_chunk)
                chunk_time = time.time() - chunk_start_time

                print(f"Chunk {chunk_count} completed in {chunk_time:.2f}s "
                      f"({len(processed_chunk):,} rows) | "
                      f"Total: {total_rows_processed:,}/{total_rows:,} "
                      f"({total_rows_processed/total_rows*100:.1f}%)")

                # Memory management
                del chunk_df, processed_chunk
                gc.collect()

                # Save intermediate results periodically
                if chunk_count % save_every_n_chunks == 0:
                    print(f"Saving intermediate results after {chunk_count} chunks...")
                    _save_intermediate_results(temp_files, output_file_path, file_format)

            # Clean up the full parquet file from memory
            del parquet_file
            gc.collect()

        else:
            # Handle CSV and other formats with chunking
            chunk_iterator = reader_func(birds_file_path, **reader_kwargs)

            for chunk_df in chunk_iterator:
                chunk_count += 1
                chunk_start_time = time.time()

                if debug:
                    print(f"\nProcessing chunk {chunk_count} ({len(chunk_df):,} rows)")

                # Process chunk with your existing function
                processed_chunk = add_station_info_to_birds(
                    chunk_df,
                    stations_df,
                    bird_lat_col=bird_lat_col,
                    bird_lon_col=bird_lon_col,
                    station_lat_col=station_lat_col,
                    station_lon_col=station_lon_col,
                    batch_size=batch_size,
                    debug=debug
                )

                # Save chunk temporarily
                temp_file = temp_dir / f"chunk_{chunk_count:04d}.parquet"
                processed_chunk.to_parquet(temp_file, index=False)
                temp_files.append(temp_file)

                total_rows_processed += len(processed_chunk)
                chunk_time = time.time() - chunk_start_time

                print(f"Chunk {chunk_count} completed in {chunk_time:.2f}s "
                      f"({len(processed_chunk):,} rows) | Total: {total_rows_processed:,}")

                # Memory management
                del chunk_df, processed_chunk
                gc.collect()

                # Save intermediate results periodically
                if chunk_count % save_every_n_chunks == 0:
                    print(f" Saving intermediate results after {chunk_count} chunks...")
                    _save_intermediate_results(temp_files, output_file_path, file_format)

        # Final consolidation of all chunks
        print(f"\nConsolidating {len(temp_files)} chunks into final output...")
        final_df = _consolidate_chunks(temp_files)

        # Save final result
        _save_final_result(final_df, output_file_path, file_format)

        # Calculate statistics
        total_time = time.time() - start_time
        avg_distance = final_df['station_distance'].mean()
        median_distance = final_df['station_distance'].median()
        missing_count = final_df['station_distance'].isna().sum()

        # Cleanup temporary files
        _cleanup_temp_files(temp_dir, temp_files)

        stats = {
            'total_rows_processed': total_rows_processed,
            'total_chunks': chunk_count,
            'processing_time_seconds': total_time,
            'processing_time_formatted': f"{total_time/60:.1f} minutes",
            'rows_per_second': total_rows_processed / total_time,
            'avg_distance_miles': avg_distance,
            'median_distance_miles': median_distance,
            'missing_stations': missing_count,
            'output_file': output_file_path,
            'output_format': file_format
        }

        print(f"\nProcessing completed successfully!")
        print(f"Total rows: {total_rows_processed:,}")
        print(f"Total time: {stats['processing_time_formatted']}")
        print(f"Speed: {stats['rows_per_second']:,.0f} rows/second")
        print(f"Avg distance: {avg_distance:.2f} miles | Median: {median_distance:.2f} miles")
        print(f"Missing stations: {missing_count:,} ({missing_count/total_rows_processed*100:.1f}%)")
        print(f"Output saved to: {output_file_path}")

        return stats

    except Exception as e:
        print(f"Error during processing: {str(e)}")
        print(f"Temporary files saved in: {temp_dir}")
        print(f"You can recover partial results from these files if needed.")
        raise


def _save_intermediate_results(temp_files, output_path, file_format):
    """Save intermediate consolidated results."""
    if not temp_files:
        return

    intermediate_df = _consolidate_chunks(temp_files)
    intermediate_path = Path(output_path).with_suffix(f'.intermediate.{file_format}')
    _save_final_result(intermediate_df, str(intermediate_path), file_format)
    print(f"Intermediate results saved to: {intermediate_path}")


def _consolidate_chunks(temp_files):
    """Consolidate all temporary chunk files into a single DataFrame."""
    if not temp_files:
        raise ValueError("No temporary files to consolidate")

    print(f"Reading {len(temp_files)} chunk files...")
    chunks = []
    for temp_file in temp_files:
        chunk = pd.read_parquet(temp_file)
        chunks.append(chunk)

    print(f"Concatenating chunks...")
    final_df = pd.concat(chunks, ignore_index=True)

    # Clean up memory
    del chunks
    gc.collect()

    return final_df


def _save_final_result(df, output_path, file_format):
    """Save the final result in the specified format."""
    if file_format == 'parquet':
        df.to_parquet(output_path, index=False, engine='pyarrow')
    elif file_format == 'csv':
        df.to_csv(output_path, index=False)
    elif file_format == 'feather':
        df.to_feather(output_path)
    else:
        raise ValueError(f"Unsupported file format: {file_format}")


def _cleanup_temp_files(temp_dir, temp_files):
    """Clean up temporary files and directory."""
    try:
        for temp_file in temp_files:
            if temp_file.exists():
                temp_file.unlink()

        if temp_dir.exists():
            temp_dir.rmdir()

        print(f"Cleaned up temporary files")
    except Exception as e:
        print(f"Warning: Could not clean up all temporary files: {e}")



In [100]:
### Run below cell to add station data ###
new_birds_file = process_large_dataset_with_stations(
    birds_file_path='/content/state_ebird_i95.pq',
    stations_df=stations,
    output_file_path='ebirds_i95_stations.parquet',
    chunk_size=50000,  # Adjust based on your RAM (smaller if you have less RAM)
    batch_size=5000,   # Your existing batch size
    file_format='parquet',  # Parquet is more efficient for large datasets
    debug=False,  # Set to True for detailed output
    save_every_n_chunks=20  # Save backup every 20 chunks
)

Starting processing of large dataset: /content/state_ebird_i95.pq
Chunk size: 50,000 | Batch size: 5,000
Output format: parquet | Save every: 20 chunks
Processed 23153 bird observations in 0.99s
Avg distance: 3.24 miles | Median: 3.17 | Missing: 0
Chunk 1 completed in 1.03s (23,153 rows) | Total: 23,153/23,153 (100.0%)

Consolidating 1 chunks into final output...
Reading 1 chunk files...
Concatenating chunks...
Cleaned up temporary files

Processing completed successfully!
Total rows: 23,153
Total time: 0.0 minutes
Speed: 14,526 rows/second
Avg distance: 3.24 miles | Median: 3.17 miles
Missing stations: 0 (0.0%)
Output saved to: ebirds_i95_stations.parquet


checking the batch result

In [101]:
vs2=pd.read_parquet("/content/ebirds_i95_stations.parquet")

In [102]:
print(vs2.shape)
print(vs2.columns)

(23153, 35)
Index(['state_code', 'station_id', 'year_record', 'month_record', 'day_record',
       'day_of_week', 'latitude_0', 'longitude_0', 'station_location', 'state',
       'daily_avg_noise', 'peak_hour_noise', 'overnight_noise',
       'rush_hour_noise', 'total_daily_volume', 'OBSERVATION COUNT',
       'STATE CODE', 'LOCALITY TYPE', 'LATITUDE_1', 'LONGITUDE_1',
       'OBSERVATION DATE', 'OBSERVATION DATE_year', 'OBSERVATION DATE_month',
       'OBSERVATION DATE_day', 'TimeObservationStarted_hour',
       'TimeObservationStarted_minute', 'distance_to_station_160005',
       'distance_to_station_140004', 'distance_to_station_160308',
       'distance_to_station_060170', 'distance_to_station_792625',
       'assigned_station', 'i95_distance', 'station_distance',
       'nearest_station_id'],
      dtype='object')
