In [51]:
%%script false --no-raise-error
%%sh
export ZARR_V3_EXPERIMENTAL_API=1

In [52]:
%%script false --no-raise-error
import dev_logging as d_log
import importlib
importlib.reload(d_log)
import multiprocessing
import time

proc = multiprocessing.Process(target=d_log.run_logging, args=('text.txt',))

proc.start()
print(f"pid: {proc.pid}")

for i in range(10):
    #print(i)
    time.sleep(1)
    print(proc.is_alive())

proc.kill()
proc.join()
proc.close()


In [53]:
%%script false --no-raise-error
import dev_logging as d_log
import importlib
importlib.reload(d_log)

d_log.run_logging("test.txt")


In [54]:
%%script false --no-raise-error
%%time
import ctypes
import time

test_plot = []

lib = ctypes.CDLL("/home/test/dkrz_dev/testlib1.so")


for i in range (10000):
    
    start_time = time.time()
    lib.test_open()
    test_plot.append(time.time() - start_time)

In [55]:
%%script false --no-raise-error
%%time
import ctypes
import time

test_plot_2 = []

lib = ctypes.CDLL("/home/test/dkrz_dev/testlib2.so")


for i in range (10000):
    
    start_time = time.time()
    lib.test_open_netcdf()
    test_plot_2.append(time.time() - start_time)

In [None]:
import mpi4py as MPI
import netCDF4
import numpy as np

var_levels = np.random.rand(1_000_000)
var_rooms = np.random.rand(1_000_000)

rootgrp = netCDF4.Dataset("data/test_dataset.nc", mode="w", format="NETCDF4", parallel=True)
xgrp = rootgrp.createGroup("x")
level = rootgrp.createDimension("level", None)
room = rootgrp.createDimension("room", None)

levels = rootgrp.createVariable("levels", "float64")
rooms = rootgrp.createVariable("rooms", "float64")

levels = var_levels
rooms = var_rooms

rootgrp.close()

In [3]:
%%time
%%writefile func/dataset_creation.py
# Create Testing Dataset, Change Parameters to vary size 

def create_dataset():
    import mpi4py as MPI
    import netCDF4
    import numpy
    
    comm = MPI.COMM_WORLD
    print(f"Hello World from rank {comm.Get_rank()}. total ranks={comm.Get_size()}")

    var_levels = np.random.rand(1_000_000)

    rootgrp = netCDF4.Dataset("data/test_dataset.nc", mode="w", format="NETCDF4", comm=MPI_COMM_WORLD, parallel=True)
    xgrp = rootgrp.createGroup("x")
    level = rootgrp.createDimension("level", None)
    room = rootgrp.createDimension("room", None)

    levels = rootgrp.createVariable("levels")
    rooms = rootgrp.createVariable("rooms")

    levels = var_levels
    rooms = var_rooms

    rootgrp.close()
    
    return 0

import ipyparallel as ipp

with ipp.Cluster(n=1) as rc:
    view = rc.load_balanced_view()
    asyncresults = view.map_async(create_dataset)
    asyncresults.wait_interactive()
    results = asyncresults.get()


Overwriting func/dataset_creation.py
CPU times: user 1.18 ms, sys: 306 μs, total: 1.49 ms
Wall time: 1.14 ms


In [57]:
#dataset creation for plotting

import zarr
import netCDF4
import h5py

In [58]:
%%time
#%%writefile func/open_zarr_benchmark.py
def open_zarr(iterations):
    #Open benchmark zarr
    import zarr
    import time
    import multiprocessing as mp
    
    hold = []

    for i in range(iterations):
        #print(f"Thread with id: {mp.current_process().name} in iteration: {i}")
        
        start_time = time.monotonic()
        ds_zarr = zarr.open(store="data/test_dataset.zarr",mode="r+" ,zarr_version=2)
        hold.append(time.monotonic() - start_time)
        
    return hold
        

import multiprocessing as mp
import os

zarr_op_time = []
tmp = []
iterations_per_process = 10_000
avail_cpu = 4#mp.cpu_count() - 2

for i in range(avail_cpu):
    tmp.append(iterations_per_process)

pool = mp.Pool(avail_cpu)

zarr_op_time.append(pool.map_async(open_zarr, tmp).get())

pool.close()
pool.join()

CPU times: user 0 ns, sys: 89.5 ms, total: 89.5 ms
Wall time: 396 ms


In [59]:
%%time
#%%writefile func/read_zarr_benchmark.py

#benchmark zarr parallel with sequential reads

def read_zarr(args):
    import time
    import numpy as np
    import zarr
    
    iterations = args[0]
    region = args[1]
    hold = []

    ds_zarr = zarr.open(store="data/test_dataset.zarr",mode="r+" ,zarr_version=2)

    for i in range(iterations):
        #print(f"Currently in i: {i}")
        tmp=0
        
        start_time = time.monotonic()
        tmp = ds_zarr["x"][region + i]
        hold.append(time.monotonic() - start_time) 
        
        #print(f"Current val read at {val_picked}: {tmp}")
        
    return hold
    
import multiprocessing as mp
import numpy as np
import os

zarr_read_time = []
tmp = []
iterations_per_process = 1_000
avail_cpu = 4#mp.cpu_count() - 2

for i in range(avail_cpu):
    tmp.append([iterations_per_process, iterations_per_process * i])

pool = mp.Pool(avail_cpu)

zarr_read_time.append(pool.map_async(read_zarr, tmp).get())

pool.close()
pool.join()

CPU times: user 0 ns, sys: 24.8 ms, total: 24.8 ms
Wall time: 13.8 s


In [60]:
%%time
#%%writefile func/write_zarr_benchmark.py

#Write benchmark zarr   serial

def write_zarr(args):
    import time
    import numpy as np
    import zarr
    
    iterations = args[0]
    region = args[1]
    hold = []

    ds_zarr = zarr.open(store="data/test_dataset.zarr",mode="r+" ,zarr_version=2)

    for i in range(iterations):
        #print(f"Currently in i: {i}")
        tmp=0
        prev = ds_zarr["x"][region + i]
        curr = ds_zarr["x"][region + i]
        fill = np.random.rand(10, 10, 1)

        #print(f"prev item at {val_picked} is {type(prev)} and curr item at {val_picked} is {type(curr)}, both are the same : {(prev == curr).all()}")
        
        start_time = time.monotonic()
        ds_zarr["x"][region + i] = fill
        hold.append(time.monotonic() - start_time) 
        
        curr = ds_zarr["x"][region + i]
        #print(f"Current val read at {val_picked}: {(prev == curr).all()}")

    return hold
        
import multiprocessing as mp
import numpy as np
import os

zarr_write_time = []
tmp = []
iterations_per_process = 1_000
avail_cpu = 4#mp.cpu_count() - 2

for i in range(avail_cpu):
    tmp.append([iterations_per_process, iterations_per_process * i])

pool = mp.Pool(avail_cpu)

zarr_write_time.append(pool.map_async(read_zarr, tmp).get())

pool.close()
pool.join()

CPU times: user 0 ns, sys: 24 ms, total: 24 ms
Wall time: 11.4 s


In [61]:
%%script false --no-raise-error
import concurrent

with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
    zarr_op_time = {executor.submit(open_zarr): i for i in range(0, 10)}
    
with concurrent.futures.ProcessPoolExecutor(max_workers=12) as executor:
    zarr_op_time = {executor.submit(open_zarr): i for i in range(0, 10)}

In [None]:
%%time
#%%writefile func/open_netcdf4_benchmark.py
#Open Benchmark netcdf4 

def open_netcdf4(iterations):
    import time
    import netCDF4

    for i in range(iterations):
        print(f"Thread with id: {mp.current_process().name} in iteration: {i}")
        
        start_time = time.monotonic()
        ds_netcdf4 = netCDF4.Dataset("data/test_dataset.nc", mode="r+", format="NETCDF4")
        netcdf4_op_time.append(time.monotonic() - start_time)

import multiprocessing as mp
import ipyparallel as ipp
import os



zarr_op_time = []
tmp = []
iterations_per_process = 10_000
avail_cpu = 4#mp.cpu_count() - 2

for i in range(avail_cpu):
    tmp.append(iterations_per_process)

pool = mp.Pool(avail_cpu)

zarr_op_time.append(pool.map_async(open_netcdf4, tmp).get())

pool.close()
pool.join()

In [63]:
import pandas as pd
import numpy as np

df_op = pd.DataFrame()
df_op.insert(0,"zarr_op_time", zarr_op_time[0][0])

df_read = pd.DataFrame()
df_read.insert(0,"zarr_read_time", zarr_read_time[0][0])

df_write = pd.DataFrame()
df_write.insert(0,"zarr_write_time", zarr_write_time[0][0])


In [64]:
#%%script false --no-raise-error
df_op.to_json("data/plotting/parallel/plotting_df_op_para_seq-r&w.json")
df_read.to_json("data/plotting/parallel/plotting_df_read_para_seq-r&w.json")
df_write.to_json("data/plotting/parallel/plotting_df_write_para_seq-r&w.json")

In [67]:
import pandas as pd
import numpy as np

df_op = pd.read_json("data/plotting/parallel/plotting_df_op_para_seq-r&w.json")
df_read = pd.read_json("data/plotting/parallel/plotting_df_read_para_seq-r&w.json")
df_write = pd.read_json("data/plotting/parallel/plotting_df_write_para_seq-r&w.json")

def filter_outliers(df) -> pd.DataFrame:
    cols = df.select_dtypes('number').columns
    df_sub = df.loc[:, cols]
    
    iqr = df_sub.quantile(0.75, numeric_only=False) - df_sub.quantile(0.25, numeric_only=False)
    lim = np.abs((df_sub - df_sub.median()) / iqr) < 2.22
    df.loc[:, cols] = df_sub.where(lim, np.nan)
    df.dropna(inplace=True)
    
    return df
    

if True:
    
    df_op = filter_outliers(df_op)
    df_read = filter_outliers(df_read)
    df_write = filter_outliers(df_write)
    #df_lol = filter_outliers(df_lol)
    
    


In [68]:
import plotly.express as px

fig = px.box(data_frame=df_op, log_y=True)
fig.show()

fig = px.histogram(data_frame=df_op, log_y=True, marginal="violin", barmode="group")
fig.show()

fig = px.box(data_frame=df_read, log_y=True)
fig.show()

fig = px.histogram(data_frame=df_read, log_y=True, marginal="violin", barmode="group")
fig.show()

fig = px.box(data_frame=df_write, log_y=True)
fig.show()

fig = px.histogram(data_frame=df_write, log_y=True, marginal="violin", barmode="group")
fig.show()