In [None]:
from string import Template
import cv2
from imagecodecs import imread
from numcodecs import blosc, Blosc, Zstd # type: ignore
import concurrent.futures
import zarr
from time import perf_counter
import glymur
import matplotlib.pyplot as plt
from string import Template
import os
from imagecodecs import imread
from numcodecs import blosc, Blosc, Zstd, Delta # type: ignore
import numpy as np
import webp
from distributed import Client, LocalCluster
#from joblib import Parallel, delayed
from tqdm import tqdm, trange
from PIL import Image
import shutil
from dask import delayed
import dask.array as da
import zipfile
from dask import delayed, compute


# --- Configuration -----------------------------------------------------------
# Edge length (pixels) of each square region copied from the JP2 into zarr.
patch_size = 2 * 16384

# Codec / decoder threading options.
# NOTE(review): these calls run in the notebook process; whether the dask
# worker processes started below inherit them is not shown here — confirm.
blosc.set_nthreads(8)
blosc.use_threads = True
glymur.set_option('lib.num_threads', 2)

# Local dask cluster of 16 worker processes, dashboard on port 8877.
# NOTE(review): memory_limit is applied per worker (16 x 64GB) — confirm the
# host actually has that much RAM, otherwise spilling/pausing will misbehave.
cluster = LocalCluster(
    dashboard_address=':8877',
    n_workers=16,
    processes=True,
    memory_limit='64GB',
)
client = Client(cluster)
client

In [3]:
#Getting image info
import glymur

# Source section image (JPEG 2000) that will be copied into zarr and tiled.
img_path = (
    '/lustre/data/store10PB/repos1/iitlab/humanbrain/analytics/159/NISL/'
    'B_159_HB1RL[M]-SL_332-ST_NISL-SE_994_compressed.jp2'
)
img = glymur.Jp2k(img_path)
img.shape

In [None]:
#Initialising zarr array
arr_path = '/mnt/local/nvme/zarr_tiles'
store = zarr.DirectoryStore(arr_path)
zarr_tile_chunks = (2048, 2048, 3)

# Create an empty uint8 array shaped like the JP2 image, replacing whatever
# was previously stored at arr_path.
img_zarr = zarr.zeros(
    shape=img.shape,
    dtype='uint8',
    chunks=zarr_tile_chunks,
    store=store,
    overwrite=True,
)

# Re-open the same store with a ProcessSynchronizer, presumably so writes
# from several dask worker processes are coordinated.
synchronizer = zarr.ProcessSynchronizer('/mnt/local/nvme/zarr_tile_store.sync')
img_zarr = zarr.open_array(
    store,
    mode='a',
    synchronizer=synchronizer,
    chunks=zarr_tile_chunks,
)

In [None]:
# ceil(log2(shorter image side / patch_size)).
# NOTE(review): not referenced by the zarr-copy cells below — confirm it is needed.
max_level = np.ceil(np.log2(min(img_zarr.shape[:2]) / patch_size))

# Tile output directory: wiped and recreated on every run.
# NOTE(review): save_tile_zarr writes only into the zarr store, not into SAVE_DIR.
SAVE_DIR = 'tiling_rraja/159_101_tiles'
if os.path.exists(SAVE_DIR):
    print('Removing existing directory of tiles\n')
    shutil.rmtree(SAVE_DIR)
    creation_msg = "Creating directory of tiles\n"
else:
    creation_msg = "Creating directory of tiles"
print(creation_msg)
os.makedirs(SAVE_DIR, exist_ok=True)
    
#Defining save tarr
@delayed
def save_tile_zarr(i, j, stride=patch_size, store=img, arr_path=arr_path, synchronizer=synchronizer):
    """Copy one ``stride`` x ``stride`` region of the source image into the zarr array.

    Decorated with ``dask.delayed``: calling it only builds a task; the copy
    happens when the task is computed.

    Parameters
    ----------
    i, j : int
        Top-left row / column of the region.
    stride : int
        Edge length of the region; clipped at the source image border.
    store : array-like
        Source to read from; must support ``.shape`` and ``[rows, cols, :]``
        slicing (defaults to the glymur ``Jp2k`` object ``img``).
    arr_path : str
        Location of the zarr array to write into.
    synchronizer : zarr synchronizer
        Passed to ``zarr.open_array`` to coordinate concurrent writers.

    Returns
    -------
    bool
        True on success; False if reading or writing failed (the error is
        printed together with the tile coordinates).
    """
    try:
        # Clip against the source actually being read. The previous version
        # used the global `img` here, which is wrong whenever another `store`
        # is passed in.
        di = min(store.shape[0], i + stride)
        dj = min(store.shape[1], j + stride)
        tile = store[i:di, j:dj, :]
        img_zarr = zarr.open_array(
            store=arr_path, mode='a', synchronizer=synchronizer, chunks=(2048, 2048, 3)
        )
        img_zarr[i:di, j:dj, :] = tile
        return True
    except Exception as e:
        # Best-effort by design: report which tile failed and let the caller
        # count the False results instead of aborting the whole batch.
        print(f"save_tile_zarr failed for tile ({i}, {j}): {e!r}")
        return False

#Creating dask queue
# One delayed copy task per patch_size x patch_size region, in row-major order.
print(patch_size)
task_index = [
    save_tile_zarr(row, col, stride=patch_size)
    for row in trange(0, img.shape[0], patch_size)
    for col in range(0, img.shape[1], patch_size)
]

print("Total Tasks ", len(task_index))

In [None]:
#Running dask queue
# task_index holds dask.delayed objects, so compute them directly;
# client.gather only resolves Futures and returned these unchanged.
start = perf_counter()
results = client.compute(task_index, sync=True)
end = perf_counter()

print(f"Time taken to save {len(task_index)} tiles is {end-start} seconds")

# save_tile_zarr returns False (after printing the error) instead of raising,
# so surface failures here rather than letting them pass silently.
n_failed = sum(1 for ok in results if not ok)
if n_failed:
    print(f"WARNING: {n_failed} of {len(task_index)} tiles failed to copy")

In [None]:
# Lazy dask array over the zarr store; the pyramid tiling below slices tiles
# from this view instead of the original JP2.
img_dask = da.from_zarr(url=store)

img_dask

In [None]:

# Pyramid depth: ceil(log2(shorter image side / 2048)). It is captured as the
# default max_level of the tiling helpers defined below.
max_level = np.ceil(np.log2(min(img_zarr.shape[:2]) / 2048))

# WebP pyramid output directory: wiped and recreated on every run.
SAVE_DIR = '159_9942'
if os.path.exists(SAVE_DIR):
    print('Removing existing directory of tiles\n')
    shutil.rmtree(SAVE_DIR)
    creation_msg = "Creating directory of tiles\n"
else:
    creation_msg = "Creating directory of tiles"
print(creation_msg)
os.makedirs(SAVE_DIR, exist_ok=True)




def load_tile(i, j, stride=2048, store=img_dask, max_level=max_level):
    """Slice one pyramid tile out of ``store``.

    The tile covers rows ``i:i+stride`` and columns ``j:j+stride``, clipped to
    the array bounds. When ``stride`` is larger than the 2048-px base tile, the
    region is subsampled by keeping every ``2**z``-th row and column, where
    ``z = ceil(log2(stride / 2048))``.

    Returns
    -------
    tuple
        ``(tile, level, x, y)``, where ``level = max_level - z + 1`` and
        ``x, y`` are the tile's row / column index at this stride.
    """
    zoom = int(np.ceil(np.log2(stride / 2048)))
    level = max_level - zoom + 1
    row_end = min(store.shape[0], i + stride)
    col_end = min(store.shape[1], j + stride)

    region = store[i:row_end, j:col_end, :]
    if zoom > 0:
        step = 2 ** zoom
        region = region[::step, ::step]
    return region, level, i // stride, j // stride

@delayed(name='save_tile')
def save_tile(tile, level, x, y, save_dir=SAVE_DIR, max_level=max_level):
    """Encode one tile as WebP at ``{save_dir}/{int(level)}-{x}-{y}.webp``.

    Decorated with ``dask.delayed``: calling it only builds a task.

    The WebP quality falls linearly with the level number:
    ``30 + (1 - level / max_level) * 69``, which gives 99 at level 0 and 30 at
    ``level == max_level``. The result is clamped to WebP's valid 0-100 range.

    Returns
    -------
    bool
        Always True; failures raise.

    Raises
    ------
    ValueError
        If ``max_level`` is 0, since the quality formula would divide by zero.
    """
    if max_level == 0:
        raise ValueError("Max level is 0. Tiling not possible")
    curr_quality = 30 + ((1 - (level / max_level)) * 69)
    # load_tile returns level = max_level - z + 1, so full-resolution tiles sit
    # at max_level + 1, and very coarse strides can reach level <= 0. Outside
    # [0, max_level] the formula leaves 0-100 (e.g. -39 when max_level == 1),
    # and libwebp then rejects the configuration. Clamp it.
    curr_quality = min(100.0, max(0.0, float(curr_quality)))
    img = Image.fromarray(tile)
    img.save(f'{save_dir}/{int(level)}-{x}-{y}.webp', 'WEBP', quality=curr_quality)
    return True

def save_tile_img(i, j, stride=2048, store=img_dask, max_level=max_level, save_dir=SAVE_DIR):
    """Build the delayed task that slices tile ``(i, j)`` at ``stride`` and saves it.

    Slicing happens eagerly via ``load_tile``, which returns a lazy slice of
    ``store``. Encoding and writing are deferred to the delayed ``save_tile``.

    Returns
    -------
    The delayed object returned by ``save_tile``.
    """
    tile, level, x, y = load_tile(i, j, stride=stride, store=store, max_level=max_level)
    # Forward save_dir / max_level explicitly. Previously save_tile fell back to
    # its own def-time defaults, silently ignoring the values passed here.
    return save_tile(tile, level, x, y, save_dir=save_dir, max_level=max_level)

def run():
    """Build delayed save tasks for every tile of every pyramid level.

    The stride starts at 2048 px and doubles each pass while it does not
    exceed the longer image side. Each pass walks the whole image in
    row-major order.

    Returns
    -------
    list
        Delayed objects from ``save_tile_img``, finest stride first.
    """
    n_rows, n_cols = img_zarr.shape[:2]
    longest_side = max(n_rows, n_cols)

    tasks = []
    stride = 2048
    while stride <= longest_side:
        tasks.extend(
            save_tile_img(row, col, stride=stride)
            for row in range(0, n_rows, stride)
            for col in range(0, n_cols, stride)
        )
        stride *= 2
    return tasks

# run() only builds delayed tasks; nothing is encoded until client.compute.
start = perf_counter()
task_index = run()
end = perf_counter()
print(f"Time taken to build {len(task_index)} tile tasks is {end-start} seconds")

# Delayed objects are computed directly (client.gather only resolves Futures).
start = perf_counter()
results = client.compute(task_index, sync=True)
end = perf_counter()

print(f"Time taken to save {len(task_index)} tiles is {end-start} seconds")

In [None]:
#Saving the zip file

def add_to_zip(file_path, zip_file_name):
    """Append ``file_path`` to the archive ``zip_file_name``, creating it if missing.

    The entry is stored under the file's base name with DEFLATE compression.
    NOTE: every call reopens the archive in append mode, so do not call this
    concurrently on the same archive from several processes.
    """
    entry_name = os.path.basename(file_path)
    with zipfile.ZipFile(zip_file_name, mode='a', compression=zipfile.ZIP_DEFLATED) as archive:
        archive.write(file_path, arcname=entry_name)

def _write_files_to_zip(file_paths, zip_file_name):
    """Append each path in ``file_paths`` to ``zip_file_name`` under its base name (DEFLATE)."""
    with zipfile.ZipFile(zip_file_name, 'a', zipfile.ZIP_DEFLATED) as zipf:
        for file_path in file_paths:
            zipf.write(file_path, os.path.basename(file_path))


def zip_zoomify_layers_dask(source_dir, output_zip):
    """Build dask task(s) that add every regular file in ``source_dir`` to ``output_zip``.

    Files are stored under their base names with DEFLATE compression and
    appended to ``output_zip`` if it already exists.

    A zip archive has a single central directory. One-task-per-file appends
    running concurrently on several worker processes race on it and can
    corrupt the archive or drop entries. All files are therefore written by a
    single delayed task holding one ``ZipFile`` handle. The return value is
    still a list of delayed objects, so ``client.compute(tasks)`` callers keep
    working.

    Returns
    -------
    list
        Delayed objects to compute; empty if ``source_dir`` contains no files.
    """
    candidate_paths = (os.path.join(source_dir, name) for name in os.listdir(source_dir))
    # Sorted for a deterministic entry order inside the archive.
    file_paths = sorted(path for path in candidate_paths if os.path.isfile(path))
    if not file_paths:
        return []
    return [delayed(_write_files_to_zip)(file_paths, output_zip)]

# Appending to an archive left over from a previous run would duplicate every
# entry, so start from a fresh zip that mirrors the regenerated tile directory.
zip_path = f'{SAVE_DIR}.zip'
if os.path.exists(zip_path):
    print(f'Removing existing archive {zip_path}')
    os.remove(zip_path)

tasks = zip_zoomify_layers_dask(SAVE_DIR, zip_path)
# Count the files being zipped. The old message reported len(task_index), the
# tile-task count from the previous cell, not the number of zipped files.
n_files = sum(os.path.isfile(os.path.join(SAVE_DIR, name)) for name in os.listdir(SAVE_DIR))

start = perf_counter()
results = client.compute(tasks, sync=True)
end = perf_counter()

print(f"Time taken to zip {n_files} tiles is {end-start} seconds")