In [8]:
import pandas as pd
import geopandas as gpd
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import os

# Constants
SCRIPT_DIR = os.path.dirname(os.path.abspath(""))
CHECKPOINT_FILE = os.path.join(SCRIPT_DIR, 'checkpoint.json')

def save_checkpoint(i):
    checkpoint_data = {'i': i}
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump(checkpoint_data, f)

def load_checkpoint():
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r') as f:
            checkpoint_data = json.load(f)
        return checkpoint_data['i']
    else:
        return 0

def process_batch(batch, processing_function, **kwargs):
    processed_batch = batch.apply(lambda row: processing_function(row, **kwargs), axis=1)
    return processed_batch

def process_large_dataframe(df, processing_function, batch_size=1000, num_workers=4, **kwargs):
    """
    Generalized function to process large DataFrames/GeoDataFrames in batches using thread pooling.

    :param df: The DataFrame or GeoDataFrame to be processed.
    :param processing_function: The function to apply to each row in the DataFrame.
    :param batch_size: Number of rows to process in each batch.
    :param num_workers: Number of worker threads to use.
    :param kwargs: Additional keyword arguments to pass to the processing_function.
    :return: Processed DataFrame or GeoDataFrame.
    """
    start_index = load_checkpoint()

    total_batches = len(df) // batch_size + (1 if len(df) % batch_size else 0)
    print(f"Total Batches: {total_batches}")
    
    processed_dfs = []

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = []
        
        for i in range(start_index, total_batches):
            start = i * batch_size
            end = min((i + 1) * batch_size, len(df))
            batch = df.iloc[start:end]
            
            futures.append(executor.submit(process_batch, batch, processing_function, **kwargs))
            
            save_checkpoint(i)
        
        for future in as_completed(futures):
            try:
                processed_batch = future.result()
                processed_dfs.append(processed_batch)
                print(f"Processed batch {i + 1} / {total_batches}")
            except Exception as e:
                print(f"Error during batch processing: {e}")
                save_checkpoint(i)
                break

    save_checkpoint(total_batches)

    # Concatenate the processed batches into a single DataFrame
    result_df = pd.concat(processed_dfs, ignore_index=True)

    # If the input was a GeoDataFrame, ensure the output is also a GeoDataFrame
    if isinstance(df, gpd.GeoDataFrame):
        result_df = gpd.GeoDataFrame(result_df, geometry='geometry', crs=df.crs)

    return result_df

# Example usage:
# Define a sample processing function
def sample_processing_function(row, additional_param):
    # Replace this with your actual processing logic
    row['processed_value'] = row['value'] * additional_param
    return row

# Sample DataFrame
data = {'value': list(range(10000))}  # Replace with your actual data
df = pd.DataFrame(data)

# Process the DataFrame in batches with thread pooling
processed_df = process_large_dataframe(df, sample_processing_function, batch_size=1000, num_workers=4, additional_param=2)

# processed_df now contains the results of applying sample_processing_function to each row in the DataFrame


Total Batches: 10


ValueError: No objects to concatenate

In [6]:
processed_df

Unnamed: 0,value,processed_value
0,1000,additional_paramadditional_paramadditional_par...
1,1001,additional_paramadditional_paramadditional_par...
2,1002,additional_paramadditional_paramadditional_par...
3,1003,additional_paramadditional_paramadditional_par...
4,1004,additional_paramadditional_paramadditional_par...
...,...,...
9995,9995,additional_paramadditional_paramadditional_par...
9996,9996,additional_paramadditional_paramadditional_par...
9997,9997,additional_paramadditional_paramadditional_par...
9998,9998,additional_paramadditional_paramadditional_par...
