# MPI Communication with Kosh Data Management

This example shows several ways to read and operate on datasets that are too large to fit in memory by giving each MPI process only a slice of the data. Here we use three processes, but the code works with any number of processes no greater than the number of rows in the dataset.

## Create a large dataset

In [None]:
import numpy as np
from mpi4py import MPI

# Initialize MPI for communication between processes
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()

# All ranks need this information
h5_file = "my_data.h5"
dataset_name = "normal"
# Placeholder on the non-root ranks: no rows, but the same 5 columns.
data = np.empty((0, 5))

# Create a large dataset on rank 0
if rank == 0:
    n_rows, n_cols = 53786, 5
    # Draw an (n_rows, n_cols) array of normal samples. ``loc`` broadcasts
    # over the columns, so column i is centered on i: column 0 has a mean
    # close to 0, column 1 a mean close to 1, etc.
    data = np.random.normal(loc=np.arange(n_cols), scale=1,
                            size=(n_rows, n_cols))
    print(data.shape)

(53786, 5)

In [None]:
import h5py

# Only rank 0 holds the full array, so only rank 0 writes the HDF5 file.
if rank == 0:
    with h5py.File(h5_file, "w") as h5:
        h5.create_dataset(dataset_name, data=data)

# Synchronize: when running as a script the other ranks must not touch the
# file before rank 0 has finished writing it.
comm.Barrier()

We can store and organize all our datasets in a Kosh store and associate the HDF5 file with a dataset.

In [None]:
import kosh

store_path = "data_slicing.sql"
# Only rank 0 creates the store and the dataset; delete_all_contents=True
# removes anything left over from a previous run.
if rank == 0:
    store = kosh.connect(store_path, delete_all_contents=True)
    dset = store.create("example_1")
    dset.associate(h5_file, 'hdf5')
    print(f"Rank {rank} dset {dset}")

# The other ranks need to wait for rank 0
comm.Barrier()

Rank 0 dset KOSH DATASET <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   id: 9c4f3ef134314957af96eb3e428bcd57 <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   name: example_1 <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   creator: Me <br>
        
--- Attributes --- <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   creator: Me <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   name: example_1 <br>
--- Associated Data (1)--- <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;    mime_type: hdf5 <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; /my/directory/my_data.h5 ( 9b31268705bb43b392a8fe641b5dc8fa ) <br>
--- Ensembles (0)--- <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;  [ ] <br>
--- Ensemble Attributes --- <br>
--- Alias Feature Dictionary --- <br>

In [None]:
# Once rank 0 has populated the store, every other rank opens it read-only
# and looks the dataset up by its name.
if rank != 0:
    store = kosh.connect(store_path, read_only=True)
    dset = next(store.find(name="example_1"))

# Demonstrate that a non-root rank sees the very same dataset.
if rank == 1:
    print(f"Rank {rank} dset {dset}")

Rank 1 dset KOSH DATASET <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   id: 9c4f3ef134314957af96eb3e428bcd57 <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   name: example_1 <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   creator: Me <br>
        
--- Attributes --- <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   creator: Me <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   name: example_1 <br>
--- Associated Data (1)--- <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;    mime_type: hdf5 <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; /my/directory/my_data.h5 ( 9b31268705bb43b392a8fe641b5dc8fa ) <br>
--- Ensembles (0)--- <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;  [ ] <br>
--- Ensemble Attributes --- <br>
--- Alias Feature Dictionary --- <br>

We can see that the dataset created by rank 0 is the same one we have access to on rank 1. They have the same dataset ID, and associated data file ID.

HDF5 already allows us to load slices of the data without reading in the entire dataset. 
Kosh's Default HDF5 Loader allows us to do the same thing with Kosh datasets pointing to HDF5 files.

## Three ways to use MPI communication with Kosh tools
### 1. MPI Function with the default Kosh HDF5 loader
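We need to distribute the data across ranks (processes). Every process runs this function, and because the result depends on the rank number, each one reads a different contiguous chunk of rows. When the rows do not divide evenly, the first `total_size % nprocs` ranks each get one extra row.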

In [None]:
def get_slice(rank, nprocs, total_size):
    """Return the ``[start, end)`` row bounds of the chunk owned by ``rank``.

    ``total_size`` rows are split into ``nprocs`` contiguous chunks whose
    lengths differ by at most one; the leftover rows go, one each, to the
    lowest-numbered ranks.
    """
    base, extra = divmod(total_size, nprocs)
    # Every earlier rank that received an extra row shifts our start by one.
    begin = rank * base + min(rank, extra)
    length = base + 1 if rank < extra else base
    return [begin, begin + length]

Next we want to use MPI communication to calculate statistics of each column of the dataset. We will calculate the minimum, maximum, and mean. 

In [None]:
def get_global_stats(local_data, total_size, comm):
    """Compute per-column global min, max and mean across all MPI ranks.

    This is a collective call: every rank in ``comm`` must call it.  Each
    rank reduces its own chunk, then ``Allreduce`` combines the partial
    results so that every rank (not only rank 0) receives the global values.

    Args:
        local_data: this rank's rows as a 2-D array (rows, columns).  It may
            have zero rows, but the column count must match on all ranks.
        total_size: total number of rows across all ranks, used for the mean.
        comm: the MPI communicator shared by all participating ranks.

    Returns:
        ``[global_min, global_max, global_mean]``, each a float64 array with
        one entry per column.
    """
    # The buffer-based Allreduce calls describe the buffers as MPI.DOUBLE,
    # so make sure they really hold float64 values.
    local_data = np.asarray(local_data, dtype=np.float64)

    # ``initial`` supplies each reduction's identity value, so a rank with an
    # empty chunk contributes +inf/-inf (which cannot affect the result)
    # instead of raising on a zero-size array.
    local_min = np.min(local_data, axis=0, initial=np.inf)
    local_max = np.max(local_data, axis=0, initial=-np.inf)
    local_sum = np.sum(local_data, axis=0)

    global_min = np.empty_like(local_min)
    comm.Allreduce([local_min, MPI.DOUBLE],
                   [global_min, MPI.DOUBLE],
                   op=MPI.MIN)

    global_max = np.empty_like(local_max)
    comm.Allreduce([local_max, MPI.DOUBLE],
                   [global_max, MPI.DOUBLE],
                   op=MPI.MAX)

    # Same buffer-based form as above (no pickling of the partial sums).
    global_sum = np.empty_like(local_sum)
    comm.Allreduce([local_sum, MPI.DOUBLE],
                   [global_sum, MPI.DOUBLE],
                   op=MPI.SUM)
    global_mean = global_sum / total_size

    return [global_min, global_max, global_mean]

Let's use these functions with our Kosh store and dataset.

In [None]:
# Only rank 0 inspects the Kosh dataset to learn how many rows it holds; the
# count is then broadcast so that every rank agrees on the global size.
total_size = 0
if rank == 0:
    entry_info = next(dset[dataset_name].describe_entries())
    total_size = entry_info["size"][0]
total_size = comm.bcast(total_size, root=0)
print(f"Rank {rank} has total data size: {total_size}")

# Each rank loads just its own contiguous block of rows.
start_index, end_index = get_slice(rank, nprocs, total_size)
local_data = dset[dataset_name][start_index:end_index]
print(f"Rank {rank} read in data size: {local_data.shape}")

# Combine the per-rank partial statistics into global ones.
global_min, global_max, global_mean = get_global_stats(
    local_data, total_size, comm)

Rank 0 has total data size: 53786 <br>
Rank 0 read in data size: (17929, 5) <br>
Rank 1 has total data size: 53786 <br>
Rank 1 read in data size: (17929, 5) <br>
Rank 2 has total data size: 53786 <br>
Rank 2 read in data size: (17928, 5)

Each process computed statistics on its local chunk, and `Allreduce` combined them so that every process holds the global statistics for each column. We print them from rank 0 only.

In [None]:
 if rank == 0:
    print("Data stats: min, max, mean\n");
for i in range(len(global_min)):
    if rank == 0:
        print(f"Column {i}: {global_min[i]}, {global_max[i]}, {global_mean[i]}");

Data stats: min, max, mean

Column 0: -3.8022259459206973, 4.299911205123149, -0.0058177973219273324 <br>
Column 1: -3.3927571082494996, 5.634167696696126, 0.9956082957507477 <br>
Column 2: -2.2467429817991267, 6.252497478206577, 1.9941243339832948 <br>
Column 3: -1.1730922554311096, 7.1547535770778525, 2.9961969445688132 <br>
Column 4: 0.018511455979809632, 8.403950647261377, 4.005149971171246

### 2. MPI Function with a custom Kosh loader

Kosh offers loaders for these types of data files.

    HDF5
    
    json
    
    numpy
    
    pandas
    
    pgm
    
    pil
    
    sidre

Of these, only the default HDF5 and numpy text loaders support distributed (sliced) data loading.

However, users may create custom loaders that implement \_\_getitem\_\_, which lets a large dataset be sliced so that each processor loads a different chunk. The numpy text loader below is a good example of how to implement \_\_getitem\_\_ in a custom loader.

In [None]:
class NumpyTxtLoader(kosh.KoshLoader):
    """Kosh loader for plain-text numeric files read with ``np.loadtxt``.

    ``__getitem__`` accepts a row selector (int or slice), optionally paired
    with a column selector, and parses only the requested range of rows.
    That is what makes distributed, per-rank loading possible.

    Optional metadata read from the associated Kosh object:
        skiprows           -- leading lines to ignore (default 0)
        features_line      -- 0-based line holding feature names (default None)
        features_separator -- separator between feature names (default None,
                              i.e. any whitespace)
        columns_width      -- fixed width of each feature-name field
    """
    types = {"numpy/txt": ["numpy", ]}

    def _setup_via_metadata(self):
        # use metadata to identify
        self.skiprows = getattr(self.obj, "skiprows", 0)
        self.features_at_line = getattr(self.obj, "features_line", None)
        self.features_separator = getattr(self.obj, "features_separator", None)
        self.columns_width = getattr(self.obj, "columns_width", None)

    @staticmethod
    def _number_of_lines(path):
        """Return the number of lines in the text file at *path*."""
        with open(path) as f:
            return sum(1 for _ in f)

    def list_features(self, *args, **kargs):
        """Return the feature (column) names of the file.

        Without ``features_line`` metadata, the whole array is exposed as a
        single feature named ``"features"``.  Otherwise the names are read
        from that line: either cut into fixed-width fields (``columns_width``)
        or stripped of leading ``#`` characters and split on
        ``features_separator``.
        """
        self._setup_via_metadata()
        if self.features_at_line is None:
            return ["features", ]
        with open(self.obj.uri) as f:
            st = ""
            # Read up to and including line number ``features_at_line``.
            for _ in range(self.features_at_line + 1):
                st = f.readline()
        if self.columns_width is not None:
            return [st[i:i + self.columns_width].strip()
                    for i in range(0, len(st), self.columns_width)]
        # Drop a leading comment marker (e.g. "# a b c"); unlike indexing
        # st[0], lstrip also copes with an empty or all-"#" line.
        return st.lstrip("#").strip().split(self.features_separator)

    def __getitem__(self, key):
        """Return the rows (and optionally columns) selected by *key*.

        *key* is an int, a slice, or a tuple whose first two items are the
        row selector (int or slice) and a column selector.  Only the rows
        covered by the row selector are read from disk.

        Raises:
            KeyError: if the row selector is neither an int nor a slice.
        """
        self._setup_via_metadata()
        original_key = key
        if isinstance(key, tuple):
            # double subset
            key, key2 = key[:2]  # ignore if more is sent
        else:
            key2 = None
        if isinstance(key, int):
            # ``key + 1 or None``: for key == -1 the stop must be open-ended,
            # since slice(-1, 0) would select nothing.
            key = slice(key, key + 1 or None)
        if not isinstance(key, slice):
            raise KeyError("Invalid key value: {}".format(original_key))

        start, stop = key.start, key.stop
        step = 1 if key.step is None else key.step
        if (start is not None and start < 0) or \
                (stop is not None and stop < 0):
            # Negative indices count from the end of the *data*, so the
            # header lines skipped via ``skiprows`` are excluded.
            nlines = self._number_of_lines(self.obj.uri) - self.skiprows
            if start is not None and start < 0:
                start = nlines + start
            if stop is not None and stop < 0:
                stop = nlines + stop
        # np.loadtxt reads a forward run of rows, so for a negative step we
        # read the covered range forward and reverse it afterwards.  The
        # bounds flip: the original inclusive start becomes the exclusive
        # end (+1), and the original exclusive stop becomes the first row
        # after it (+1).
        if step < 0:
            start, stop = stop, start
            if start is not None:
                start += 1
            if stop is not None:
                stop += 1
        if start is None:
            start = self.skiprows
        else:
            start += self.skiprows
        if stop is not None:
            max_rows = stop - start + self.skiprows
        else:
            max_rows = None

        if max_rows is None or max_rows > 0:
            data = np.loadtxt(self.obj.uri, skiprows=start, max_rows=max_rows)
        else:
            # Empty selection: read two rows only to get an array of the
            # right dtype/rank, then take an empty slice of it.
            data = np.loadtxt(self.obj.uri, skiprows=start, max_rows=2)[0:2:-1]

        if data.ndim > 1:
            if key2 is not None:
                data = data[::step, key2]
            elif step != 1:  # useless if step is 1
                data = data[::step]
        elif step != 1:
            data = data[::step]
        elif key2 is not None:
            data = data[key2]

        if self.features_at_line is None:
            return data
        feature_index = self.list_features().index(self.feature)
        return data[:, feature_index]

Let's try using this loader with the getitem functionality.

In [None]:
# Create txt file using the same dataset as in the previous example
txt_name = 'array.out'
if rank == 0:
    # np.savetxt writes whitespace-separated rows (it returns None).
    np.savetxt(txt_name, data)

# We will use the same store as before, and add another Kosh dataset.
# Only rank 0 holds the full array, so it broadcasts the shape; every rank
# then knows the total number of rows for the slicing below.
data_shape = comm.bcast(data.shape, root=0)
total_size = data_shape[0]

# Only rank 0 will create the dataset
if rank == 0:
    metadata = {'size': data_shape[0]}
    dset2 = store.create("array", metadata=metadata, features_separator=',')
    # Associate the file to the Kosh dataset
    dset2.associate(txt_name, mime_type='numpy/txt')

    # The size stored as metadata is readable as a dataset attribute.
    print(f"Total data size: {dset2.size}")

# Other ranks will wait for 0 to finish work
comm.Barrier()

Total data size: 53786

In [None]:
# The other ranks will search for the dataset based on the name
if rank != 0:
    generator_obj = store.find(name="array")
    dset2 = next(generator_obj)
    # Show rank 2 has dset2
    if rank == 2:
        print(f"Rank {rank} dset {dset}")

Rank 2 dset KOSH DATASET <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   id: 34da205abf4f4439b62e60528e384efc <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   name: array <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   creator: Me <br>
        
--- Attributes --- <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   creator: Me <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   name: array <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;   size: 53786 <br>
--- Associated Data (1)--- <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;    mime_type: numpy/txt <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; /my/directory/array.out ( 34f9c5fe8bc74ac5b06370cb1fe1c515 ) <br>
--- Ensembles (0)--- <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;  [ ] <br>
--- Ensemble Attributes --- <br>
--- Alias Feature Dictionary --- <br>

In [None]:
start_index, end_index = get_slice(rank, nprocs, total_size)

# Read the local portion of the dataset
local_data = dset2["features"][slice(start_index, end_index)]

# Each process now has its own portion of the dataset
print(f"Rank {rank} has data size: {local_data.shape}")

global_min, global_max, global_mean = get_global_stats(local_data,
                                                       total_size,
                                                       comm)

# Each process computed local statistics and Allreduce combined them, so
# every rank now holds the global statistics; print them from rank 0 only.
if rank == 0:
    print("Data stats: min, max, mean\n")
    for i, (col_min, col_max, col_mean) in enumerate(
            zip(global_min, global_max, global_mean)):
        print(f"Column {i}: {col_min}, {col_max}, {col_mean}")

Rank 0 has data size: (17929, 5) <br>
Rank 1 has data size: (17929, 5) <br>
Rank 2 has data size: (17928, 5) <br>
Data stats: min, max, mean

Column 0: -3.8022259459206973, 4.299911205123149, -0.0058177973219273324 <br>
Column 1: -3.3927571082494996, 5.634167696696126, 0.9956082957507477 <br>
Column 2: -2.2467429817991267, 6.252497478206577, 1.9941243339832948 <br>
Column 3: -1.1730922554311096, 7.1547535770778525, 2.9961969445688132 <br>
Column 4: 0.018511455979809632, 8.403950647261377, 4.005149971171246

We are able to distribute the data array across the processes with the getitem function in the numpy txt loader, and come to the same conclusion as in the hdf5 case.

### 3. Using a Kosh operator for MPI functions with a parallel enabled loader
Let's continue our example with HDF5 data files, but this time the MPI communication takes place inside a Kosh operator, which uses the global min and max to return a normalized chunk of the data to each process.

We will create three arrays with different numbers of rows (60, 120, and 180), all with 5 columns. Only rank 0 creates the data.

In [None]:
# Rank 0 writes three HDF5 files (my_data0.h5, my_data1.h5, my_data2.h5)
# with 60, 120 and 180 rows.  New names are used so the earlier ``data``
# array and ``h5_file`` path are left untouched.
if rank == 0:
    for n in range(3):
        n_rows = (n + 1) * 60
        # Draw an (n_rows, 5) array of normal samples; ``loc`` broadcasts
        # over the columns, so column i has a mean close to i.
        part = np.random.normal(loc=np.arange(5), scale=1, size=(n_rows, 5))

        # Save under the same dataset name ("normal") used earlier
        with h5py.File(f"my_data{n}.h5", "w") as f:
            f.create_dataset(dataset_name, data=part)

We will use the same store as before and add three more Kosh datasets, one per file.

In [None]:
# Rank 0 will create the datasets and associate the files
if rank == 0:
    dset3  = store.create()
    dset4 = store.create()
    dset5 = store.create()

    # Associate the files to the Kosh dataset
    dset3.associate("my_data0.h5", 'hdf5')
    dset4.associate("my_data1.h5", 'hdf5')
    dset5.associate("my_data2.h5", 'hdf5')
    
# The other ranks need to wait for rank 0
comm.Barrier()

In [None]:
# The other ranks need to find the datasets
if rank != 0:
    generator_obj1 = store.find(name="data0")
    dset3 = next(generator_obj1)

    generator_obj2 = store.find(name="data1")
    dset4 = next(generator_obj2)

    generator_obj3 = store.find(name="data2")
    dset5 = next(generator_obj3)

We need a function to assign data to each processor from multiple files.

In [None]:
def distribute_data(sizes, nprocs, rank):
    """Map this rank's share of rows onto a sequence of datasets.

    The datasets are treated as if concatenated end to end.  The combined
    rows are split into ``nprocs`` contiguous ranges of nearly equal length,
    and the range belonging to ``rank`` is translated back into per-dataset
    row ranges.

    Args:
        sizes: number of rows in each dataset, in concatenation order.
        nprocs: total number of processes.
        rank: index of the calling process (0 <= rank < nprocs).

    Returns:
        A list of ``(dataset_index, start_row, end_row)`` tuples (``end_row``
        exclusive) that together cover exactly this rank's rows, in order.
    """
    # Global [start_idx, end_idx) row range owned by this rank
    total_size = sum(sizes)
    start_idx = rank * total_size // nprocs
    end_idx = (rank + 1) * total_size // nprocs

    local_data_info = []
    offset = 0  # global index of the current dataset's first row
    for i, size in enumerate(sizes):
        if offset >= end_idx:
            break  # this and all later datasets lie past our range
        if offset + size > start_idx:
            # Clip our global range to this dataset's local row numbers
            start_row = max(start_idx - offset, 0)
            end_row = min(end_idx - offset, size)
            local_data_info.append((i, start_row, end_row))
        offset += size

    return local_data_info

Now we can create a custom Kosh operator that can distribute data evenly across the processors, calculate the global min and max, and then return normalized data chunks.

In [None]:
class MPINormalize(kosh.KoshOperator):
    """Kosh operator that min-max normalizes data spread over MPI ranks.

    The rows of all inputs are treated as one concatenated array and split
    evenly across ranks with ``distribute_data``.  Each rank reads only its
    share, the per-column global min and max are found with
    ``get_global_stats``, and each rank returns its own rows scaled to
    ``[0, 1]``.  All ranks must call it, because it performs collective MPI
    operations.
    """

    types = {"numpy": ["numpy", ]}

    def __init__(self, *args, **options):
        super(MPINormalize, self).__init__(*args, **options)
        self.options = options

        # Initialize MPI; the methods below use these attributes rather than
        # notebook-level globals, so the operator is self-contained.
        self.comm = MPI.COMM_WORLD
        self.rank = self.comm.Get_rank()
        self.nprocs = self.comm.Get_size()

    def operate(self, *inputs, **kargs):
        """Return this rank's rows of the concatenated inputs, min-max scaled."""
        comm = self.comm

        # Rank 0 looks up the row count of each input and shares the list.
        input_sizes = []
        if self.rank == 0:
            desc = list(self.describe_entries())
            input_sizes = [desc[i]["size"][0] for i in range(len(inputs))]
        input_sizes = comm.bcast(input_sizes, root=0)
        total_size = sum(input_sizes)

        # Each process reads only its assigned rows (possibly from several
        # inputs) and joins them once, instead of re-concatenating per piece.
        local_data_info = distribute_data(input_sizes, self.nprocs, self.rank)
        chunks = [inputs[index][slice(start, stop)]
                  for index, start, stop in local_data_info]
        if chunks:
            # float64 copy: the MPI reductions expect doubles, and the
            # arrays returned by the loaders are never modified.
            local_data = np.concatenate(chunks, axis=0).astype(np.float64)
        else:
            # No rows assigned to this rank; the column count defaults to 5
            # as in this example and can be overridden with ``ncols=``.
            local_data = np.empty((0, self.options.get("ncols", 5)),
                                  dtype=np.float64)

        # With MPI we calculate statistics for each column in the dataset
        global_min, global_max, _ = get_global_stats(local_data,
                                                     total_size,
                                                     comm)

        # Min-max scale every column at once.  A constant column (max == min)
        # would divide by zero; a span of 1 maps it to 0 instead of NaN.
        span = global_max - global_min
        span = np.where(span == 0, 1.0, span)
        return (local_data - global_min) / span

In [None]:
scaled_local = MPINormalize(dset3['normal'], dset4['normal'], dset5['normal'])[:]

# Summarize this rank's chunk: shape and per-column min/max/mean after
# scaling.  Adjacent f-strings are joined without stray indentation, unlike a
# backslash continuation inside the literal.
print(f"Rank {rank} local data: \n shape {scaled_local.shape} \n "
      f"min {scaled_local.min(axis=0)} \n "
      f"max {scaled_local.max(axis=0)} \n "
      f"mean {scaled_local.mean(axis=0)}")
  

Rank 0 local data: <br> 
shape (120, 5) <br> 
min  [0.        , 0.21420619, 0.15787334, 0.        , 0.        ] <br> 
max  [1.        , 0.87725257, 0.93489143, 0.99439837, 0.80387367] <br> 
mean [0.49013495, 0.58344045, 0.47229402, 0.58820366, 0.42868854] <br> 

Rank 1 local data: <br>
shape (120, 5) <br>
min  [0.08429409 0.18198548 0.         0.25029003 0.05370255] <br> 
max  [0.88083102 0.94954022 1.         1.         0.85265535] <br> 
mean [0.49276804 0.57804458 0.52172094 0.60238864 0.43822516] <br> 

Rank 2 local data: <br>
shape (120, 5) <br>
min  [0.03761877 0.         0.07890669 0.17243195 0.11313334] <br> 
max  [0.97382317 1.         0.99053639 0.99903149 1.        ] <br> 
mean [0.50205695 0.58495661 0.51220394 0.56918653 0.45209219] <br> 


