In [4]:
import os,sys

from PIL import Image
from urllib.request import urlopen
import matplotlib.pyplot as plt
import shutil
import numpy as np

# //////////////////////////////////////////////////////
def ShowImage(data):
	data=np.flip(data,axis=0)
	print("shape",data.shape,"dtype",str(data.dtype))
	fig = plt.figure()
	ax = fig.add_subplot(1,1,1)
	ax.imshow(data, origin='lower')
	plt.show()

sys.path.append("C:/projects/OpenVisus/build/RelWithDebInfo")
import OpenVisus as ov
print("OpenVisus imported")

# Load original data from *.nc

In [46]:
%%time
import xarray as xr

# Variable Name -> List of elevations
variables = {
  'precipitation': [0],  
  'pressure': [0, 100, 200, 500],
  'temperature': [2, 30, 40, 60, 80, 100, 200, 300, 500, 1000],
  'virtual_potential_temperature': [2, 30, 40, 60, 80, 100, 200, 300, 500, 1000],
  'winddirection': [10, 30, 32, 40, 44, 47, 54, 57, 60, 80, 100, 120, 140, 160, 180, 200, 250, 300, 500, 1000],
  'windspeed': [10, 30, 32, 40, 44, 47, 54, 57, 60, 80, 100, 120, 140, 160, 180, 200, 250, 300, 500, 1000]
}

# Create fields for each variable and elevation
fields = [
  ov.Field('latitude',  'float32'), 
  ov.Field('longitude', 'float32'),
]

# Go through each variable and elevation combination
for variable in variables:
  for elevation in variables[variable]:
    fields.append(ov.Field(f'{variable}_{elevation}m', "int16"))
    
nc_file = xr.open_dataset(
  './cstm_d01_2016-05-01_00_00_00.nc', 

  # important because the original data is int16 (otherwise xarray cast )
  # see https://docs.xarray.dev/en/stable/generated/xarray.open_dataset.html
  mask_and_scale=False
  )
data={}
shape=None
for field in fields:
  # print(field.name, data[field.name].shape,data[field.name].dtype)
  # use isel(time=0) since each field is 3d but only has a depth of 1
  data[field.name] = nc_file[field.name].isel(time=0).values
  assert(shape is None or shape==data[field.name].shape)
  shape=data[field.name].shape

CPU times: total: 203 ms
Wall time: 262 ms


In [None]:
# check if compressing using pure python gives back the same compression ratio
np.savez_compressed("check-compression-size.npz",data,compression="gzip",compression_opts=9)

# Create IDX

In [47]:
%%time

import struct,io,time
from io import BytesIO

# since I am writing/reading in multiple threads, I will not have collisions and I will not need file locks
os.environ["VISUS_DISABLE_WRITE_LOCK"]="1"

# /////////////////////////////////////////////////////////////
def CreateRamAccess(db):
    ret=db.createAccess(ov.StringTree.fromString("<access type='RamAccess' chmod='rw' available='0' compression='raw'/>"))
    ret.disableWriteLocks()
    assert(ret.bDisableWriteLocks==True)
    assert(ret.compression=="")
    assert(str(ret.getAccessTypeName())=="class Visus::RamAccess")
    return ret

dims=list(reversed(shape))

# using a temporary filename , so I am sure I can remove the directory
idx_filename='visus-remove-me/visus.idx'
assert('visus-remove-me' in idx_filename)
shutil.rmtree(os.path.dirname(idx_filename), ignore_errors=True)

db=ov.CreateIdx(
    url=idx_filename, 
    dims=dims, 
    fields=fields,
    # writing at the beginning should be uncompressed 
    compression="raw"
    )

# make sure the file has been created
assert(os.path.isfile(idx_filename))

blocksperfile=db.idxfile.blocksperfile
total_blocks=db.getTotalNumberOfBlocks()

nfields=len(fields)
pdim=db.getPointDim()
num_files=total_blocks//blocksperfile
file_header_size=10*4
block_header_size=10*4
header0=np.zeros(shape=[file_header_size+nfields*blocksperfile*block_header_size,],dtype=np.uint8)
assert(num_files*blocksperfile==total_blocks)

print(f"header0       ={header0.nbytes}")
print(f"total_blocks  ={total_blocks}")
print(f"pdim          ={pdim}")
print(f"num_files     ={num_files}")
print(f"dims          ={dims}")
print(f"nfields       ={nfields}")
print(f"blocksperfile ={blocksperfile}")
print(f"header0       ={header0.nbytes}")
print("")

print(db.getDatasetBody().toString())

# db.write(data,field=db.getField("temperature"))
# db.write(data,field=db.getField("pressure"))

header0       =343080
total_blocks  =128
pdim          =2
num_files     =1
dims          =[2049, 1749]
nfields       =67
blocksperfile =128
header0       =343080

<dataset url="visus-remove-me/visus.idx" cache_dir="" typename="IdxDataset">
	<idxfile>
		<version value="6" />
		<bitmask value="V01010101010101010101010" />
		<box value="0 2049 0 1749" />
		<bitsperblock value="16" />
		<blocksperfile value="128" />
		<block_interleaving value="0" />
		<filename_template value="./visus/%04x.bin" />
		<missing_blocks value="False" />
		<arco value="0" />
		<time_template value="" />
		<axis value="" />
		<field name="latitude" description="" index="" default_compression="" default_layout="" default_value="0" filter="" dtype="float32" />
		<field name="longitude" description="" index="" default_compression="" default_layout="" default_value="0" filter="" dtype="float32" />
		<field name="precipitation_0m" description="" index="" default_compression="" default_layout="" default_value="0" filt

# Write data in memory
 - uncompressed
 - not using any disk IO

In [48]:
%%time

# by specifying an `access` I tell OpenVisus to write in memory and not on disk
ram_access=CreateRamAccess(db)

# for each timestep
for timestep in db.getTimesteps():
  
  # for each field
  for F,field in enumerate(db.getFields()):

    # here I am writing the HDF5 (or numpy data, or whatever)
    # this is called BoxQuery (equivalent to writing a Region of Interest)
    print(f"Writing timestep={timestep} field={field} F={F}...",end="")
    logic_box=db.getLogicBox()
    query = db.createBoxQuery(ov.BoxNi(ov.PointNi(logic_box[0]),ov.PointNi(logic_box[1])), db.getField(field), timestep, ord('w'))
    db.beginBoxQuery(query)
    assert(query.isRunning())
    query.buffer = ov.Array.fromNumPy(data[field],TargetDim=pdim, bShareMem=True)
    assert(db.executeBoxQuery(ram_access, query))
    print("done")

# Here I am reding blocks (all in memory) and converting them into numpy arrays
# using a dict to make block looking simplier
blocks={}
for timestep in db.getTimesteps():
  for field in db.getFields():
    for blockid in range(total_blocks):
      read_block = db.createBlockQuery(blockid, db.getField(field), timestep, ord('r'))
      if db.executeBlockQueryAndWait(ram_access, read_block): 
        # scrgiorgio: if I share the memory here I have a problem... why????
        blocks[(timestep, field, blockid)]=ov.Array.toNumPy(read_block.buffer, bShareMem=False) 

Writing timestep=0 field=latitude F=0...done
Writing timestep=0 field=longitude F=1...done
Writing timestep=0 field=precipitation_0m F=2...done
Writing timestep=0 field=pressure_0m F=3...done
Writing timestep=0 field=pressure_100m F=4...done
Writing timestep=0 field=pressure_200m F=5...done
Writing timestep=0 field=pressure_500m F=6...done
Writing timestep=0 field=temperature_2m F=7...done
Writing timestep=0 field=temperature_30m F=8...done
Writing timestep=0 field=temperature_40m F=9...done
Writing timestep=0 field=temperature_60m F=10...done
Writing timestep=0 field=temperature_80m F=11...done
Writing timestep=0 field=temperature_100m F=12...done
Writing timestep=0 field=temperature_200m F=13...done
Writing timestep=0 field=temperature_300m F=14...done
Writing timestep=0 field=temperature_500m F=15...done
Writing timestep=0 field=temperature_1000m F=16...done
Writing timestep=0 field=virtual_potential_temperature_2m F=17...done
Writing timestep=0 field=virtual_potential_temperature_3

In [49]:
%%time

# verify
if True:
  for field in db.getFields():
    check=db.read(time=timestep,field=db.getField(field), access=ram_access)
    assert(np.array_equal(data[field], check))
    # ShowImage(check)
  print("Verification ok")

Verification ok
CPU times: total: 641 ms
Wall time: 941 ms


# Write blocks to disk using Python

In [51]:
%%time

import zlib

NoCompression   = 0x00
ZipMask         = 0x03
ZfpCompression  = 0x08
CompressionMask = 0x0f

RowMajor        = 0x10

# use empty string or `zip` here
compression="zip"

# TODO (!)
assert(compression!="zfp") 

# at the beginning I am using a disk access to generate filename, later on to verify the data
disk_access=db.createAccess()

#using bytesIO does not help at all (scrgiorgio: at least from today tests 20240527)
use_bytes_io=False

# block statistics
statistics={}

# for each timestep
for timestep in db.getTimesteps():

	# for each block inside file
	for fileid, BLOCKID in enumerate(range(0,total_blocks,blocksperfile)):

		# create the file
		filename=disk_access.getFilename(db.getField(), timestep, BLOCKID)
		
		os.makedirs(os.path.dirname(filename),exist_ok =True)
		if os.path.isfile(filename):
			os.remove(filename)
		
		# using BytesIO does not seem to help at all
		with (BytesIO() if use_bytes_io else open(filename,"wb")) as stream:

			print(f"Writing filename={filename}...")

			stream.write(header0.tobytes())
			for field_index,field in enumerate(db.getFields()):
				for B,blockid in enumerate(range(BLOCKID,BLOCKID+blocksperfile)):

					key=(timestep,field,blockid)
					block=blocks.get(key,None)
					if block is None:  continue

					# i do not want to write all zero blocks
					all_zeros = not np.any(block)
					if all_zeros:
						continue

					flags=RowMajor

					# no need to compress
					if compression in ["","raw"]:
						block=block.tobytes()
						flags|=0

					# need to compress in zip
					elif compression=="zip":
						block=zlib.compress(block,level=9)
						flags|=ZipMask

					# write the binary data
					stream.seek(0, io.SEEK_END)
					offset=stream.tell()
					size=len(block)
					stream.write(block)

					# write the file header
					stream.seek(file_header_size+block_header_size*(field_index*blocksperfile+B), io.SEEK_SET)
					stream.write(struct.pack('>IIIIIIIIII', 0,0, offset & 0xffffffff, offset>>32, size, flags,0,0,0,0))

					# print(f"Wrote block timestep={timestep} field={field} field_index={field_index} blockid={blockid} B={B} offset={offset} size={size} flags={flags}")
					statistics[key]=(offset,size)

			# need to do the real IO 
			if use_bytes_io:
				with open(filename, "wb") as f:
					f.write(stream.getbuffer())	

			print(f"Writing filename={filename} DONE")

Writing filename=c:/projects/openvisuspy/notebooks/visus-remove-me/visus/0000.bin...
Writing filename=c:/projects/openvisuspy/notebooks/visus-remove-me/visus/0000.bin DONE
CPU times: total: 18.8 s
Wall time: 31 s


In [52]:
if True:
	for field in db.getFields():
		check=db.read(field=field, access=disk_access)
		assert(np.array_equal(data[field], check))
		# ShowImage(check)
	print("Verification ok")

Verification ok


# Plot block statistics

In [55]:
import plotly.express as px
df = px.data.tips()

size_int16=2
uncompressed_single_timestep_size=np.product(shape)*size_int16*len(db.getFields())
uncompressed_block_size=(1<<db.getDefaultBitsPerBlock())*size_int16
print(f"uncompressed_single_timestep_size        ={uncompressed_single_timestep_size:,}")
print(f"uncompressed_block_size                  ={uncompressed_block_size:,}")

v=[]
tot=0
for key,(offset,size) in statistics.items():
  tot+=size
  v.append(100*size/uncompressed_block_size)

print(f"% ratio ={100*tot/uncompressed_single_timestep_size}")

fig = px.histogram(v)
fig.show()

uncompressed_single_timestep_size        =480,215,934
uncompressed_block_size                  =131,072
% ratio =88.53967723611603


In [57]:
print(f"{480215934*0.88:,}")

422,590,021.92


In [62]:
# install --quiet h5glance
#from h5glance import H5Glance
#H5Glance('./cstm_d01_2016-05-01_00_00_00.nc')

# ZFP

In [64]:
# if you have only one writer
os.environ["VISUS_DISABLE_WRITE_LOCK"]="1"

data=np.load("./recon_combined_1_fullres.npy")
depth,height,width=data.shape
print(f"np.load done dtype={data.dtype} shape={data.shape} c_size={width*height*depth*4:,}")

idx_filename="remove-me/zfp/visus.idx"
shutil.rmtree(os.path.dirname(idx_filename), ignore_errors=True)
fields=[ov.Field("data",str(data.dtype),"row_major")]
db=ov.CreateIdx(url=idx_filename,dims=[width,height,depth],fields=fields,compression="raw")

np.load done dtype=float32 shape=(330, 402, 1193) c_size=633,053,520


In [65]:
db.write(data)
print(f"db.write (uncompressed) done")

db.write (uncompressed) done


In [66]:
# encoding-number-of-bits and decoding-number-of-bits
#   it will be written in the IDX file and used as the field.default_compression
#   this is needed since the IDX block header does not support/store number-of-bitblanes
t1 = time.time()
db.compressDataset(["zfp-precision=8-precision=8"]) 
print(f"db.compressDataset done in {time.time()-t1} seconds")

db.compressDataset done in 4.908674001693726 seconds
