In ERT we run the forward model once per realization. The forward model will output some data. The format in which we store this data greatly affects the performance of ERT overall.

So what do we need to do with the data?

1. Combine it across N realization folders into a single dataset (Not strictly required, but likely faster than using N separate datasets to support queries below)
2. Do a simple relation table join between the table of observations and the responses across all realizations. Observations table has these columns: [obs_key, response_key, observed_value, observation_error], Responses
3. Find out whether a realization has all of its responses/parameters or not (used internally in ERT and in the GUI)
4. Get data for a single response key across all realizations (for the plotter)
5. Get all observations for a single response key
6. Get observations for single observation key

To benchmark this we first mock up some datasets in realization folders

In [1]:
import datetime
import os

print(os.getcwd())
os.makedirs("benchmark_data_processing_out", exist_ok=True)

num_reals = 200


def create_realization_dirs():
    for i in range(num_reals):
        os.makedirs(f"benchmark_data_processing_out/realization-{i}", exist_ok=True)


create_realization_dirs()

/Users/yngves.kristiansen/Documents/Repositories/oppdrag/equinor/ert/notebooks


Now we create some summary and gen data datasets

In [4]:
import pandas as pd
import polars as pl
import pyarrow as pa
import xarray as xr
import time
from functools import wraps
from memory_profiler import memory_usage
from dateutil.relativedelta import relativedelta


def time_function(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        mem_usage_before = memory_usage(-1, interval=0.1, timeout=1)
        result = func(*args, **kwargs)
        mem_usage_after = memory_usage(-1, interval=0.1, timeout=1)
        end_time = time.time()
        elapsed_time = end_time - start_time
        mem_usage = max(mem_usage_after) - min(mem_usage_before)
        return (elapsed_time, mem_usage, result)

    return wrapper


@time_function
def df_to_xarray_file(name: str, df: pd.DataFrame):
    xarray_dataset = xr.Dataset.from_dataframe(df)
    xarray_dataset.to_netcdf(name)
    return xarray_dataset


@time_function
def df_to_arrow_file(name: str, df: pd.DataFrame):
    # Convert pandas DataFrame to Arrow Table
    arrow_table = pa.Table.from_pandas(df)
    with pa.OSFile(name, "wb") as f:
        writer = pa.RecordBatchFileWriter(f, arrow_table.schema)
        writer.write_table(arrow_table)
        writer.close()

    return arrow_table


@time_function
def df_to_polars_file(name: str, df: pd.DataFrame):
    # Convert pandas DataFrame to Polars DataFrame
    polars_df = pl.from_pandas(df)

    with open(name, "w+") as f:
        polars_df.write_ipc(file=f, compression="uncompressed")

    return polars_df


data_formats = [("xarray", df_to_xarray_file), ("arrow", df_to_arrow_file), ("polars", df_to_polars_file)]

num_summary_keys = 100
num_summary_timesteps = 300

num_gen_data_keys = 100
gen_data_index = [1000, 1200, 1400, 1600, 1800, 2000]
gen_data_report_steps = list(range(200))

for i in range(num_reals):
    realdir = f"benchmark_data_processing_out/realization-{i}"

    gen_data_save_timings = {}
    gen_data_keys = [f"gd_{gdi}" for gdi in range(num_gen_data_keys)]
    for gen_data_key in gen_data_keys:
        df = pd.DataFrame(data={
            "response_key":[gen_data_key] * len(gen_data_index) * len(gen_data_report_steps),
            "index":gen_data_index * len(gen_data_report_steps),
            "report_step":gen_data_report_steps * len(gen_data_index),
            "value":[i for i in range(len(gen_data_report_steps) * len(gen_data_index))]
        })

        for name, to_file_fn in data_formats:
            time_elapsed, memory, _ = to_file_fn(f"initial_{name}", df)

            if name not in gen_data_save_timings:
                gen_data_save_timings[name] = []

            gen_data_save_timings[name].append((time_elapsed, memory))

    smry_df = pd.DataFrame(data={
        "response_key":[f"smry_{i % num_summary_keys}" for i in range(num_summary_keys * num_summary_timesteps)],
        "time":[f"{(datetime.datetime(2000, 1, 1) + relativedelta(months=1)), i % num_summary_timesteps}"
                for i
                in range(num_summary_keys * num_summary_timesteps)],
        "value":[i for i in range(num_summary_keys * num_summary_timesteps)]
    })

    smry_timings = {}
    for name, to_file_fn in data_formats:
        time_elapsed, memory, _ = to_file_fn("initial_smry", smry_df)

        if name not in smry_timings:
            smry_timings[name] = []

        smry_timings[name].append((time_elapsed, memory))

    print("hello")


OSError: write() argument must be str, not bytes