# Reading Pegasus++ Files

PegasusTools provides a set of classes that you can use to load Pegasus++ files.

## Reading NBF Files

Passing the path of an NBF file to `pt.PegasusNBFData` reads that file from disk and returns a `PegasusNBFData` object. The object stores all the header data in member variables and the field data from the simulation in a dictionary keyed by the names of the fields in the NBF file.

### Example

Before we can load an NBF file, we need one to load. Normally you would use the files that Pegasus++ outputs, but to facilitate this example I will create one here. The file creation tools used here are intended for use in examples and testing only.

```python
import sys
from pathlib import Path

import pegasustools as pt

# ===== SETUP CODE. DISREGARD =====
sys.path.append(str(Path.cwd().parent.resolve() / "tests"))

import test_loading_nbf

root_path = Path.cwd() / "example_data"
root_path.mkdir(exist_ok=True)
nbf_file_path = root_path / "example.nbf"

_ = test_loading_nbf.generate_random_nbf_file(nbf_file_path, seed=42, dims=3)
# ===== END OF SETUP CODE =====
```

Now that we have an NBF file to play with, we can load it. Loading automatically logs some metadata about the NBF file via the PegasusTools logger, `pt.logger`.

```python
nbf_data = pt.PegasusNBFData(nbf_file_path)
```

The file is now fully loaded and all the data is in numpy arrays. You can access the metadata programmatically via the following read-only member variables:

```python
print(f"{nbf_data.time              = }")
print(f"{nbf_data.big_endian        = }")
print(f"{nbf_data.num_meshblocks    = }")
print(f"{nbf_data.list_of_variables = }")
print(f"{nbf_data.mesh_params       = }")
print(f"{nbf_data.meshblock_params  = }")
```

Now let's take a look at the data that was loaded. Note that the keys of `nbf_data.data` are the same as the entries in `nbf_data.list_of_variables`.

```python
for key in nbf_data.data:
    print(
        f"key: '{key}', "
        f"type: {type(nbf_data.data[key])}, "
        f"shape: {nbf_data.data[key].shape}"
    )
```

At this point you have a `PegasusNBFData` object that contains all the information in the original NBF file, but now in numpy arrays that are easy to manipulate and post-process.
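Because `nbf_data.data` is a plain dictionary of numpy arrays, any numpy post-processing applies directly. As an illustration, the hypothetical helper `summarize_fields` below computes the minimum, maximum, and mean of every field. It is not part of PegasusTools, and it assumes only that each value is a numeric numpy array.

```python
import numpy as np


def summarize_fields(data: dict[str, np.ndarray]) -> dict[str, dict[str, float]]:
    """Return min, max, and mean for every field in an NBF-style data dictionary.

    Hypothetical helper: it only assumes `data` maps field names to numeric
    numpy arrays, as `PegasusNBFData.data` does.
    """
    summary = {}
    for name, values in data.items():
        summary[name] = {
            "min": float(np.min(values)),
            "max": float(np.max(values)),
            "mean": float(np.mean(values)),
        }
    return summary


# Usage with the object loaded above:
# for name, stats in summarize_fields(nbf_data.data).items():
#     print(name, stats)
```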

## Reading Spectra Files

Passing the path of a spectra file to `pt.PegasusSpectralData` reads that file from disk and returns a `PegasusSpectralData` object. The object stores all the metadata in member variables and the spectra in the `data` member variable. `PegasusSpectralData` also provides several methods for working with spectral data.

### Example

Before we can load a spectra file, we need one to load. Normally you would use the files that Pegasus++ outputs, but to facilitate this example I will create one here. The file creation tools used here are intended for use in examples and testing only.

```python
import sys
from pathlib import Path

import pegasustools as pt

# ===== SETUP CODE. DISREGARD =====
sys.path.append(str(Path.cwd().parent.resolve() / "tests"))

import test_loading_spectra

root_path = Path.cwd() / "example_data"
root_path.mkdir(exist_ok=True)
spectra_file_path = root_path / "example.spec"

_ = test_loading_spectra.generate_random_spec_file(
    spectra_file_path, num_meshblocks=10, seed=42
)
# ===== END OF SETUP CODE. =====
```

Now that we have a spectra file to play with, we can load it. There are two versions of the spectra file format. The newer version includes `n_prp`, `n_prl`, `max_w_prp`, and `max_w_prl`, and the older version does not. When reading the newer version, these four values are read from the file. When reading the older version, you can optionally pass the values of `n_prp`, `n_prl`, `max_w_prp`, and `max_w_prl` that were set in the peginput file. If you don't, they are set to the default values of 200, 400, 4.0, and 4.0, respectively.
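The precedence rules above can be summarized in a short sketch. The helper `resolve_spectra_params` and the constant `DEFAULT_SPECTRA_PARAMS` are hypothetical illustrations of those rules, not the PegasusTools implementation. The sketch also assumes that any subset of the four values may be supplied for an older file.

```python
from typing import Optional

# Defaults quoted above for older spectra files that lack these header entries
DEFAULT_SPECTRA_PARAMS = {"n_prp": 200, "n_prl": 400, "max_w_prp": 4.0, "max_w_prl": 4.0}


def resolve_spectra_params(
    from_file: Optional[dict] = None, user_supplied: Optional[dict] = None
) -> dict:
    """Illustrate the precedence for spectra-file parameters (hypothetical helper).

    1. Values stored in a newer-format file are used as-is.
    2. Otherwise, values the user passes (e.g. copied from the peginput file).
    3. Otherwise, the documented defaults.
    """
    if from_file is not None:
        return dict(from_file)
    resolved = dict(DEFAULT_SPECTRA_PARAMS)
    if user_supplied:
        # Assumption: a partial set of values overrides only those defaults
        resolved.update(user_supplied)
    return resolved
```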

```python
spectra_data = pt.PegasusSpectralData(spectra_file_path)
```

The file is now fully loaded and all the data is in numpy arrays. Let's take a look at the header information:

```python
print(f"{spectra_data.time      = }")
print(f"{spectra_data.n_prp     = }")
print(f"{spectra_data.n_prl     = }")
print(f"{spectra_data.max_w_prp = }")
print(f"{spectra_data.max_w_prl = }")
```

Now let's take a look at the data that was loaded.

```python
print(f"type: {type(spectra_data.data)}\nshape: {spectra_data.data.shape}")
```

The `x1min`, `x1max`, `x2min`, `x2max`, `x3min`, and `x3max` meshblock location data is also loaded and stored in the `PegasusSpectralData.meshblock_locations` member variable, which is a Polars DataFrame.

```python
print(spectra_data.meshblock_locations)
```

At this point you have a `PegasusSpectralData` object that contains all the information in the original spectra file, but now in a numpy array that is easy to manipulate and post-process.

## Reading Track Files (`.track_mpiio_optimized` and `.track.dat` files)

Reading the two track file formats is more complex than reading the spectra or NBF files, since we want to significantly change the format to facilitate efficient searching through the track data. The goal of the file reading tools here is to transform the binary or ASCII files into Parquet files, along with a few data transformations. Once this transformation is complete, the resulting directory of Parquet files can be quickly searched and manipulated using any of the standard dataframe tools such as Pandas, Polars, PyArrow, PySpark, or Dask. Polars is recommended: it's what PegasusTools uses internally and what I found to offer the best combination of stability and performance. I refer you to the [Polars documentation](https://docs.pola.rs) for details on how to work with it, and some additional notes on Polars are in the [Using Polars](#Using-Polars) section.

### Transformations

PegasusTools performs three transformations on raw track data in addition to converting the data to Parquet format:

1. Computes new particle IDs. The new IDs are globally unique, not just unique within a meshblock. They are computed using a simple hash function that combines the old particle ID, meshblock ID, and species (if applicable) to uniquely identify a single particle. Particle IDs are not guaranteed to be sequential, though they often are.
2. Computes the magnetic moment $\mu$, stored in the column named `mu`.
3. Computes the absolute change in $\mu$ from one time step to the next for a given particle. This column is named `delta_mu_abs` and is defined as $\Delta\mu_{\mathrm{abs}} = \lvert \mu_{t} - \mu_{t-1} \rvert$. The difference is undefined for each particle's first time step, so that value is set to [`null`](https://docs.pola.rs/user-guide/expressions/missing-data/).
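To make the definition of `delta_mu_abs` concrete, here is a minimal pure-Python reference sketch. It is not the code PegasusTools runs. It applies the definition to rows sorted by particle ID and then by time, which can be handy for spot-checking a few rows of converted data.

```python
from typing import Optional


def delta_mu_abs(particle_ids: list[int], mu: list[float]) -> list[Optional[float]]:
    """Reference version of the `delta_mu_abs` definition (illustrative only).

    Assumes rows are sorted by particle ID and then by time, so consecutive rows
    with the same ID are consecutive time steps of one particle. Each particle's
    first time step has no predecessor and gets None (Polars `null`).
    """
    result: list[Optional[float]] = []
    for i, (pid, value) in enumerate(zip(particle_ids, mu)):
        if i > 0 and particle_ids[i - 1] == pid:
            result.append(abs(value - mu[i - 1]))
        else:
            result.append(None)
    return result
```

For example, `delta_mu_abs([7, 7, 9], [1.0, 1.5, 2.0])` returns `[None, 0.5, None]`.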

### Suggested Workflow

#### Data Prep

1. Run the simulation
2. Get the path to the directory containing the `.track.dat` or `.track_mpiio_optimized` files. This directory should be on your cluster's scratch file system.
3. Once the simulation is complete, use one of the slurm templates below to submit a job that converts the track files to Parquet files. Details for converting the different file types are provided in their relevant sections. The end result is a directory full of Parquet files, each of which holds a range of particles.

#### Analyzing the Data

1. Load the Parquet files using your preferred dataframe API, using the streaming/lazy evaluation engine if the dataset is larger than memory.
2. Perform the queries/transformations that you need.
3. Write out the (hopefully) smaller dataset to a new Parquet file for future plotting or additional post-processing, so that you don't have to reprocess the entire dataset every time.

### Processing `.track.dat` Files

This slurm submission script can be used to convert the ASCII `.track.dat` files to Parquet files. Everything inside angled brackets (`<like this>`) should be replaced with appropriate values for your usage.

Converting `.track.dat` files is fairly quick: observed times for a 290GB dataset are about 150s on Stellar and about 20 minutes on Della. You could reasonably run this on a visualization node if you want, but not on a login (head) node. If you choose to run this on a vis node, make sure to set the environment variable `POLARS_MAX_THREADS` to 1. Otherwise each of the `num_processes` worker processes will spawn one Polars thread per CPU core, making the node very slow for all users.

The final parquet files contain many particles and have file names of the format `<input file prefix>_particles_<first particle id>_<last particle id>.parquet`. This should substantially reduce the number of files that the file system has to manage since each parquet file contains thousands of particles. The target/maximum size of the parquet files can be tuned with the `max_parquet_size` argument; it defaults to ~2GB. Reducing the target file size below about 500MB significantly reduces performance.
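Because the particle ID range is encoded in each file name, you can locate the file that could hold a given particle without opening any of them. The hypothetical helper below sketches this using only the naming convention described above. It assumes the ranges in different files do not overlap and that both bounds are inclusive. Since particle IDs are not guaranteed to be sequential, a matching range only tells you where the particle would be, not that it exists.

```python
import re
from pathlib import Path
from typing import Optional

# Matches "<prefix>_particles_<first id>_<last id>.parquet". The prefix may itself
# contain underscores, so the pattern anchors on the trailing "_particles_" part.
_PARTICLE_FILE_RE = re.compile(
    r"^(?P<prefix>.+)_particles_(?P<first>\d+)_(?P<last>\d+)\.parquet$"
)


def file_for_particle(directory: Path, particle_id: int) -> Optional[Path]:
    """Return the Parquet file whose particle ID range contains `particle_id`.

    Hypothetical helper based only on the file-name convention.
    """
    for path in sorted(directory.glob("*.parquet")):
        match = _PARTICLE_FILE_RE.match(path.name)
        if match is None:
            continue  # skip files that don't follow the naming convention
        if int(match["first"]) <= particle_id <= int(match["last"]):
            return path
    return None
```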


#### `.track.dat` Files with Species

If the track ASCII files have species information, then the `species` column in the Parquet files contains the species ID. `0` corresponds to protons (i.e. `p` files), and each non-proton species index is increased by one to make room for the protons. For example, `m00` becomes `1`, `m01` becomes `2`, etc.

If the track ASCII files *do not* have species information then every element in the `species` column is set to 0 since the particles must be protons.
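The mapping from species tags to `species` values can be written as a small function. This is a hypothetical sketch of the convention described above. It takes the tag (`p`, `m00`, `m01`, ...) as already extracted from the file name; how PegasusTools parses the file names is not shown here.

```python
from typing import Optional


def species_id(tag: Optional[str]) -> int:
    """Map a track file species tag to the `species` column value (illustrative).

    "p" (protons) -> 0, "m00" -> 1, "m01" -> 2, ...
    A missing tag (file without species information) -> 0, since those are protons.
    """
    if tag is None or tag == "p":
        return 0
    if tag.startswith("m") and tag[1:].isdigit():
        # Non-proton species index shifted up by one to make room for protons
        return int(tag[1:]) + 1
    raise ValueError(f"Unrecognized species tag: {tag!r}")
```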

***
```python
#!/usr/bin/env python3

#SBATCH --job-name=<job name>
#SBATCH --nodes=1                      # Only works with a single node
#SBATCH --ntasks=1                     # Only works with a single task
#SBATCH --cpus-per-task=32             # Number of CPUs assigned to each task
#SBATCH --time=0-00:30:00              # Walltime in dd-hh:mm:ss format
#SBATCH --mem-per-cpu=4G               # CPU memory per cpu-core
#SBATCH --mail-user=<your email here>  # Send notification email to this address
#SBATCH --mail-type=ALL

# First we need to make sure Polars only uses a single thread since PegasusTools is
# handling the parallelization
import os
os.environ["POLARS_MAX_THREADS"] = "1"

import pathlib

import pegasustools as pt


# Python's Multiprocessing sometimes requires that any code that calls it directly or
# indirectly is in a function.
def main():
    # Get the number of processes to use
    num_processes = int(os.environ["SLURM_CPUS_PER_TASK"])

    # The path to the .track.dat files. They should be in system scratch
    source_directory = pathlib.Path("</path/to/directory/with/.track.dat/files>")

    # The path to the directory to write the parquet files to. This should also be in system
    # scratch. It will be created if it doesn't exist.
    destination_directory = pathlib.Path("</path/to/directory/to/write/parquet/files>")

    # Perform the transformation
    pt.collate_tracks_from_ascii(
        num_processes,
        source_directory,
        destination_directory,
    )


if __name__ == "__main__":
    main()
```
***

### Processing `.track_mpiio_optimized` Files

This slurm submission script can be used to convert the binary `.track_mpiio_optimized` files to Parquet files. Everything inside angled brackets (`<like this>`) should be replaced with appropriate values for your usage.

On Stellar this scales well up to about 16 cores and takes about 2 hours to run on a 2TB dataset. It consumes quite a bit of memory and saturates the I/O bandwidth and so should only be run on a compute node.

On Della this conversion exhibits some strange behaviour. For a 2TB dataset it takes about 3 hours, needs ~90GB of memory per core, and only scales up to about 10 cores. The actual memory usage is only about 15GB per core, but Linux caches a lot of the source files, and the job crashes if you request less than ~90GB of memory per core. This is all true on the x86 nodes on Della that I've tested on. On ARM systems (namely the Grace CPU) it runs fine with only about 11GB of memory per CPU, scales well up to 50 cores, and runs in about 20 minutes. I'm not sure why this discrepancy occurs, but keep it in mind when running on other systems or when hardware gets changed out. The code performs some basic correctness checks at the end and will notify you if they fail. If the checks fail, you can try rerunning the final step with more memory by setting the `restart_collect` argument to `True`.

The final Parquet files contain many particles and have file names of the format `<input file prefix>_particles_<first particle id>_<last particle id>.parquet`. During the run, and if a restart is required, you may see some Parquet files with names that end in `_temp`. These are temporary files that store intermediate steps, and PegasusTools removes them at the end of a successful run.

***
```python
#!/usr/bin/env python3

#SBATCH --job-name=<job name>
#SBATCH --nodes=1                      # Only works with a single node
#SBATCH --ntasks=1                     # Only works with a single task
#SBATCH --cpus-per-task=16             # Number of CPUs assigned to each task
#SBATCH --time=0-04:00:00              # Walltime in dd-hh:mm:ss format
#SBATCH --mem-per-cpu=16G              # CPU memory per cpu-core
#SBATCH --mail-user=<your email here>  # Send notification email to this address
#SBATCH --mail-type=ALL

# First we need to limit the number of threads Polars uses since PegasusTools is
# handling the parallelization
import os
os.environ["POLARS_MAX_THREADS"] = "2"

import pathlib

import pegasustools as pt


# Python's Multiprocessing sometimes requires that any code that calls it directly or
# indirectly is in a function.
def main():
    # Get the number of processes to use
    num_processes = int(os.environ["SLURM_CPUS_PER_TASK"])

    # The path to the .track_mpiio_optimized files. They should be in system scratch
    source_directory = pathlib.Path("</path/to/directory/with/.track_mpiio_optimized/files>")

    # The path to the directory to write the parquet files to. This should also be in
    # system scratch. It will be created if it doesn't exist.
    destination_directory = pathlib.Path("</path/to/directory/to/write/parquet/files>")

    # Perform the transformation. If you need to restart the final step of the job,
    # pass the additional argument `restart_collect=True`.
    pt.collate_tracks_from_binary(
        num_processes,
        source_directory,
        destination_directory,
    )


if __name__ == "__main__":
    main()
```
***

### Using Polars

When using Polars there are a few things to keep in mind:

- Be sure to install the `polars-u64-idx` package, not the regular `polars` package. The regular `polars` package uses 32-bit indices internally, which limits it to ~4.3 billion rows, too few for Pegasus track data. The `polars-u64-idx` package uses 64-bit indices and as such can support ~18 quintillion rows, more than enough for this application.
- As much as possible you should use the [lazy API](https://docs.pola.rs/user-guide/lazy/) (see [here](https://medium.com/background-thread/what-is-lazy-evaluation-programming-word-of-the-day-8a6f4410053f) or [here](https://en.wikipedia.org/wiki/Lazy_evaluation) for a general discussion of what lazy evaluation is). This enables you to work on datasets that are larger than the memory of the system you're using, and lazily evaluated operations are potentially faster and more efficient than eagerly evaluated ones. The caveat is that Polars uses a streaming engine under the hood to perform lazy operations, and at the time of writing (June 2025) they are rewriting their streaming engine. As such, not all operations are currently supported in streaming mode. I believe that most operations needed for track data are already supported, and you can check which operations are currently supported on their [Tracking issue for the new streaming engine](https://github.com/pola-rs/polars/issues/20947). If you need an operation that is not yet supported, I recommend doing as much filtering/selecting/etc. as you can lazily to reduce the size of the dataset until it fits in memory, then performing the unsupported operations with the in-memory engine.
- There are two functions in Polars for reading parquet files: `pl.read_parquet` and `pl.scan_parquet`. The `read` variant reads the parquet file(s) directly into memory as a Polars DataFrame. The `scan` variant reads the Parquet file(s) as a Polars LazyFrame that you can perform lazily evaluated or streaming operations on. There are also `pl.scan_*` and `pl.read_*` functions for other file formats.
- Similarly to `pl.scan_parquet` and `pl.read_parquet` there are two different functions for writing Parquet files: `pl.LazyFrame.sink_parquet` and `pl.DataFrame.write_parquet`. These operate on LazyFrames and DataFrames respectively. `sink_parquet` also acts as an implicit `collect` operation to execute the lazily evaluated tasks.
- `pl.scan_parquet`, `pl.read_parquet`, and the other `scan`/`read` functions in Polars accept a single file as an argument, an iterable of files, or a directory (in which case they will read all the files in that directory with the proper file type). When reading multiple files this way Polars treats them as a single large DataFrame/LazyFrame.

I refer you to the [Polars User Guide](https://docs.pola.rs) and [Polars Python API Reference](https://docs.pola.rs/api/python/stable/reference/index.html) for details on general usage of Polars and to learn the basics. Below there are a couple of examples of potentially common operations you may need to perform on Pegasus++ Track data.

#### Examples

##### Compute $\Delta\mu_{abs}$

This is already done when converting from the Pegasus++ output files to Parquet files, so you likely don't need to do this exact operation, but it is instructive.

***
```python
import pathlib

import polars as pl

# Load the dataset lazily
source_directory = pathlib.Path("</path/to/directory/with/parquet/files>")
dataset = pl.scan_parquet(source_directory)

# Note that this assumes that the data is already sorted by particle ID and then time
# Compute delta mu
dataset = dataset.with_columns(  # with_columns creates a new column with the name "delta_mu_abs"
    # pl.when(condition) computes the value in `then()` when condition is True and the
    # value in `otherwise()` when condition is False.
    delta_mu_abs=pl.when(pl.col("particle_id") == pl.col("particle_id").shift())
    # `.shift()` shifts the column down by one row, so a.shift()[i] = a[i-1]
    .then((pl.col("mu") - pl.col("mu").shift()).abs())
    .otherwise(None)
)

# Write the dataframe with the new column to a new file. Note that since evaluation is
# lazy, no computation is done until we call `sink_parquet`
destination_path = pathlib.Path("</path/to/output/parquet/file>")
dataset.sink_parquet(destination_path)
```
***

##### Get All Particles with $\Delta\mu_{abs}$ Above Threshold 

***
```python
import pathlib

import polars as pl

# Load the dataset lazily
source_directory = pathlib.Path("</path/to/directory/with/parquet/files>")
dataset = pl.scan_parquet(source_directory)

# First we need to figure out which particles have delta mu values above the threshold
threshold = 0.02
selected_particle_ids = (
    # Filter selects the rows that match the condition
    dataset.filter(pl.col("delta_mu_abs") > threshold)
    # Select selects the column(s) that match arguments.
    .select(pl.col("particle_id"))
    # Unique gets the unique elements in the dataframe
    .unique()
).collect(engine="streaming")  # Perform the computation using the streaming engine

# `collect` returns a DataFrame but `is_in` expects an imploded Series (a single
# list value), so let's do that conversion now
selected_particle_ids = selected_particle_ids.to_series().implode()

# Now we know which particles have delta mu values above the threshold we need to get
# the full time series data for all those particles
selected_particles = dataset.filter(pl.col("particle_id").is_in(selected_particle_ids))

# Write the dataframe with the selected particles
destination_path = pathlib.Path("</path/to/output/parquet/file>")
selected_particles.sink_parquet(destination_path)
```
***

## Reading Trace Files (`.trace_mpiio_optimized` files) 

Reading trace files is done identically to reading binary track files (i.e. `.track_mpiio_optimized` files) with three exceptions:

1. PegasusTools only supports trace files with the newer headers that include column information. If you have an older dataset that does not include those headers, you should add the headers manually or write a script to do it.
2. Unlike the track data, no transformations are done. The `block_id` is already a global ID since there's only one trace per meshblock, and points in space don't have a $\mu$ value to compute.
3. ASCII trace files are not supported.

To convert the binary trace files to Parquet files, use the same slurm script provided above for binary track files, replacing the call to `pt.collate_tracks_from_binary` with `pt.collate_traces`. The rest of the slurm script, including the arguments to the collating function, remains unchanged.

## Reading `.hst` Files

`.hst` files can be read using the `pt.load_hst_file` function, which returns a Polars DataFrame containing the data from the `.hst` file. This function automatically corrects for any overlap due to restarts by keeping only the newest data.
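PegasusTools' exact logic for this is not described here. One common way to resolve restart overlap is sketched below as an assumption. Walk the rows from last to first and keep a row only if its time is strictly earlier than every time already kept, so that rows rewritten after a restart replace the older copies. The helper `drop_restart_overlap` is hypothetical and works on a plain list of times.

```python
def drop_restart_overlap(times: list[float]) -> list[int]:
    """Return indices of the rows to keep, preferring the most recently written data.

    Assumes rows are in file order and that a restart shows up as the time column
    jumping backwards. Walking from the end, a row is kept only if its time is
    strictly earlier than every time kept so far, so older duplicates are dropped.
    """
    keep: list[int] = []
    earliest_kept = float("inf")
    for index in range(len(times) - 1, -1, -1):
        if times[index] < earliest_kept:
            keep.append(index)
            earliest_kept = times[index]
    keep.reverse()  # restore file order
    return keep
```

Take a run that reached t = 3 and was restarted from t = 1.5. For it, `drop_restart_overlap([0.0, 1.0, 2.0, 3.0, 1.5, 2.5])` returns `[0, 1, 4, 5]`.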

### Example

```python
# The path to the .hst file
hst_path = Path.cwd() / "example_data" / "example.hst"

# Load the data, `hst_data` is a Polars dataframe
hst_data = pt.load_hst_file(hst_path)

# Check the number of rows and column names
print(f"Number of Rows: {len(hst_data)}")
print(f"Column Names: {hst_data.columns}")
```