In [1]:
import io
import os
import numpy as np
import lz4.frame
import json
import shutil


from time import perf_counter


from utils import get_defect_list
#import massPy as mp
import massPy_dev.massPy as mp

In [2]:


class CompressArchive:
    """Convert an archive of ``frame<N>.json`` files into ``frame<N>.npz`` files.

    The archive is opened with ``mp.archive.loadarchive`` to obtain its lattice size
    (``LX``, ``LY``) and frame schedule. Every array stored under
    ``json_dict['data'][key]['value']`` is written to the npz file, optionally together
    with the density and velocity fields derived from ``ff``.

    Parameters
    ----------
    archive_dir : str
        Folder containing ``parameters.json`` and the ``frame<N>.json`` files.
    output_dir : str, optional
        Destination folder. Defaults to ``archive_dir`` with ``'_compressed'`` appended.
    dtype_out : str or numpy dtype
        dtype of the arrays written to the npz files.
    delete_archive_if_successful : bool
        If True, ``archive_dir`` is removed after a conversion run in which at least one
        frame was processed and none failed.
    """

    def __init__(self, archive_dir, output_dir = None, dtype_out = 'float64', delete_archive_if_successful = False):

        self.archive_dir = archive_dir
        # normpath drops a trailing separator, so 'run/' maps to 'run_compressed'
        # instead of 'run/_compressed' (which would sit inside the archive itself).
        self.output_dir = os.path.normpath(archive_dir) + '_compressed' if output_dir is None else output_dir

        self.archive = mp.archive.loadarchive(archive_dir)
        self.frame_list = self.__find_missing_frames()
        self.failed_conversion_list = []

        self.num_frames = len(self.frame_list)
        self.LX = self.archive.LX
        self.LY = self.archive.LY

        self.dtype_out = dtype_out
        self.delete_archive_if_successful = delete_archive_if_successful

        # Diagnostics, filled in lazily; NaN means "not measured yet".
        self.time_to_open_json = np.nan
        self.time_to_open_npz = np.nan
        self.compression_ratio = np.nan
        self.json_frame_size = np.nan
        self.npz_frame_size = np.nan

    def __calc_density(self, ff):
        """Return the sum of ``ff`` over axis 1 (one value per row of ``ff``)."""
        return np.sum(ff, axis=1)

    def __calc_velocity(self, ff, density = None):
        """Return an array of shape ``(2, LX, LY)`` holding the two velocity components.

        Each component is a signed combination of columns 1-8 of ``ff`` divided by the
        density. NOTE(review): the index pattern looks like a D2Q9 lattice-Boltzmann
        momentum sum - confirm against the simulation code.
        """
        d = self.__calc_density(ff) if density is None else density
        f = ff.T
        return np.asarray([(f[1] - f[2] + f[5] - f[6] - f[7] + f[8]) / d,
                           (f[3] - f[4] + f[5] - f[6] + f[7] - f[8]) / d,
                           ]).reshape(2, self.LX, self.LY)

    def __first_frame(self):
        """Return the first frame number, or None when the frame list is empty."""
        return self.frame_list[0] if len(self.frame_list) > 0 else None

    def __output_inside_archive(self):
        """Return True if ``output_dir`` is ``archive_dir`` itself or lies inside it."""
        src = os.path.normcase(os.path.abspath(self.archive_dir))
        dst = os.path.normcase(os.path.abspath(self.output_dir))
        try:
            return os.path.commonpath([src, dst]) == src
        except ValueError:
            # Raised e.g. for paths on different drives, which cannot be nested.
            return False

    def __estimate_size_reduction(self,):
        """Set the json/npz frame sizes and the compression ratio from the first frame.

        Leaves the attributes untouched (NaN on a fresh instance) when there are no
        frames or when either file of the first frame is missing.
        """
        first = self.__first_frame()
        if first is None:
            print('No frames found. Cannot estimate size reduction')
            return

        input_file = os.path.join(self.archive_dir, f'frame{first}.json')
        output_file = os.path.join(self.output_dir, f'frame{first}.npz')

        for path in (output_file, input_file):
            if not os.path.exists(path):
                print(f'File {path} does not exist')
                return

        # Get the sizes in bytes of the input and output frame files
        self.json_frame_size = os.path.getsize(input_file)
        self.npz_frame_size = os.path.getsize(output_file)

        # Calculate the compression ratio (NaN for an empty output file)
        self.compression_ratio = self.json_frame_size / self.npz_frame_size if self.npz_frame_size > 0 else np.nan
        return

    def __get_time_to_open_npz(self, frame_number = None):
        """Measure once how long ``np.load`` takes on one npz frame.

        Only the ``np.load`` call itself is timed; no array is read from the file.
        Does nothing if a measurement already exists.
        """
        if not np.isnan(self.time_to_open_npz):
            return

        frame_number = self.__first_frame() if frame_number is None else frame_number
        if frame_number is None:
            print('No frames found. Cannot time opening of npz file')
            return

        npz_file_path = os.path.join(self.output_dir, f'frame{frame_number}.npz')
        if not os.path.exists(npz_file_path):
            print(f'File {npz_file_path} does not exist')
            return
        try:
            t_start = perf_counter()
            # The context manager closes the underlying zip file handle again.
            with np.load(npz_file_path, allow_pickle=True):
                self.time_to_open_npz = perf_counter() - t_start
        except Exception as err:
            print(f'Error opening file {npz_file_path}: {err!r}')
        return

    def __find_missing_frames(self):
        """Return the frame numbers to convert.

        Only files named ``frame<digits>.json`` are counted, because those are the
        only files the converter reads. If their count equals ``archive.num_frames``
        the regular schedule ``arange(nstart, nsteps + 1, ninfo)`` is returned;
        otherwise the sorted list of frame numbers actually present.
        """
        prefix = 'frame'
        frame_list = []

        for item in os.listdir(self.archive_dir):
            stem, ext = os.path.splitext(item)
            number = stem[len(prefix):]
            if ext == '.json' and stem.startswith(prefix) and number.isdecimal():
                frame_list.append(int(number))

        if len(frame_list) == self.archive.num_frames:
            return np.arange(self.archive.nstart, self.archive.nsteps + 1, self.archive.ninfo)

        frame_list.sort()
        return frame_list

    def __unpack_json_dict(self, json_dict, exclude_keys=(), calc_velocities = False):
        """Turn ``json_dict['data']`` into a dict of arrays of dtype ``dtype_out``.

        Keys listed in ``exclude_keys`` are skipped. With ``calc_velocities`` the
        entries ``density``, ``vx`` and ``vy`` are added; they are derived from ``ff``,
        which is read from ``json_dict`` even if ``'ff'`` is excluded from the output.
        """
        data = json_dict['data']
        arr_dict = {key: np.array(data[key]['value'], dtype=self.dtype_out)
                    for key in data if key not in exclude_keys}

        if calc_velocities:
            ff = np.array(data['ff']['value'], dtype='float64')
            # Divide by the float64 density and cast afterwards, so the velocities are
            # not computed from a density already rounded to dtype_out.
            density = self.__calc_density(ff)
            v = self.__calc_velocity(ff, density)
            arr_dict['density'] = density.astype(self.dtype_out)
            arr_dict['vx'] = v[0].flatten().astype(self.dtype_out)
            arr_dict['vy'] = v[1].flatten().astype(self.dtype_out)
        return arr_dict

    def __convert_json_to_npz(self, frame_number, overwrite_existing_npz_files = False, compress = True, exclude_keys=(), calc_velocities = False):
        """Convert one ``frame<N>.json`` into ``frame<N>.npz`` in the output folder.

        An existing npz file is kept unless ``overwrite_existing_npz_files`` is True.
        The duration of the first json load is stored in ``time_to_open_json``.
        """
        frame_input_path = os.path.join(self.archive_dir, f'frame{frame_number}.json')
        frame_output_path = os.path.join(self.output_dir, f'frame{frame_number}.npz')

        if os.path.exists(frame_output_path) and not overwrite_existing_npz_files:
            print(f'File {frame_output_path} already exists. Skipping...')
            return

        t_start = perf_counter()
        with open(frame_input_path, 'r') as f:
            data = json.load(f)
        if np.isnan(self.time_to_open_json):
            self.time_to_open_json = perf_counter() - t_start

        arr_dict = self.__unpack_json_dict(data, exclude_keys=exclude_keys, calc_velocities = calc_velocities)
        save = np.savez_compressed if compress else np.savez
        save(frame_output_path, **arr_dict)
        return

    def convert_archive_to_npz(self, compress = True, exclude_keys=(), calc_velocities = False, overwrite_existing_npz_files = False, verbose = 1,):
        """Convert every frame in ``frame_list`` and copy ``parameters.json`` alongside.

        verbose = 0: no summary output
        verbose = 1: print total time and conversion info
        verbose = 2: additionally print the time spent on each frame

        Frames whose conversion raises are recorded in ``failed_conversion_list``
        (reset at the start of every call) and the loop continues. The source archive
        is deleted only if deletion was requested, at least one frame was processed,
        no frame failed, and the output folder does not lie inside the archive.
        """
        # Create the output folder if it does not exist
        os.makedirs(self.output_dir, exist_ok=True)

        # copy parameters.json to output folder
        parameters_path = os.path.join(self.archive_dir, 'parameters.json')
        shutil.copy(parameters_path, self.output_dir)

        # Report only the failures of this run.
        self.failed_conversion_list = []
        start = perf_counter()

        for frame in self.frame_list:
            start_frame = perf_counter()
            try:
                self.__convert_json_to_npz(frame, overwrite_existing_npz_files = overwrite_existing_npz_files,
                                           compress = compress, exclude_keys = exclude_keys, calc_velocities = calc_velocities)
            except Exception as err:
                # Best effort: report the cause, remember the frame and carry on.
                # KeyboardInterrupt/SystemExit are deliberately not caught.
                print(f'Error processing frame {frame}: {err!r}. Skipping...')
                self.failed_conversion_list.append(frame)
                continue
            if verbose == 2:
                print(f'Frame {frame} processed in {perf_counter() - start_frame:.2f} seconds')

        if verbose > 0:
            print(f'Archive processed in {perf_counter() - start:.2f} seconds with {len(self.failed_conversion_list)} failed conversions')
            if len(self.failed_conversion_list) > 0:
                print(f'Frames for which conversion to npz failed: {self.failed_conversion_list}')
            self.print_conversion_info()

        nothing_failed = len(self.failed_conversion_list) == 0 and len(self.frame_list) > 0
        if self.delete_archive_if_successful and nothing_failed:
            if self.__output_inside_archive():
                # Removing the archive would remove the freshly written output too.
                print(f'Output folder {self.output_dir} is inside archive {self.archive_dir}. Archive not deleted')
            else:
                shutil.rmtree(self.archive_dir)
                print(f'Archive {self.archive_dir} deleted')
        return

    def print_conversion_info(self):
        """Print size and load-time estimates extrapolated from the first frame."""
        self.__estimate_size_reduction()
        self.__get_time_to_open_npz()

        print(f'\nEstimated size reduction for entire archive ({self.num_frames} frames):')
        print(f'Uncompressed archive size: {self.json_frame_size / 1024 ** 2 * self.num_frames:.2f} MB')
        print(f'Compressed archive size: {self.npz_frame_size / 1024 ** 2 * self.num_frames:.2f} MB')
        print(f'Compression ratio: {self.compression_ratio:.2f}x\n')

        # NaN when the npz timing is missing or zero, instead of dividing by zero.
        speedup = self.time_to_open_json / self.time_to_open_npz if self.time_to_open_npz > 0 else np.nan

        print(f'Time to open json frame: {self.time_to_open_json:.2f} seconds')
        print(f'Time to open npz frame: {self.time_to_open_npz:.2f} seconds')
        print(f'Speedup: {speedup:.2f}x')
        return

    def get_conversion_info(self):
        """Return the conversion diagnostics as a dict (unmeasured values are NaN)."""
        self.__estimate_size_reduction()
        self.__get_time_to_open_npz()

        return {'archive_dir': self.archive_dir,
                'output_dir': self.output_dir,
                'num_frames': self.num_frames,
                'frame_list': self.frame_list,
                'json_frame_size': self.json_frame_size,
                'npz_frame_size': self.npz_frame_size,
                'failed_conversion_list': self.failed_conversion_list,
                'time_to_open_json_frame': self.time_to_open_json,
                'time_to_open_npz_frame': self.time_to_open_npz,
                'compression_ratio': self.compression_ratio,
        }

In [3]:
def compress_file(input_file, output_file):
    """Write an lz4-frame compressed copy of ``input_file`` to ``output_file``.

    The whole input is read into memory, compressed with ``lz4.frame.compress``
    and written out in one go.
    """
    # Read the raw bytes of the input file
    with open(input_file, 'rb') as source:
        raw_bytes = source.read()

    # Compress the bytes and write them to the output file
    packed_bytes = lz4.frame.compress(raw_bytes)
    with open(output_file, 'wb') as target:
        target.write(packed_bytes)
    return

def print_size(input_file, output_file):
    """Print the sizes of two files in MB and the ratio input size / output size."""
    bytes_per_mb = 1024 ** 2
    size_in = os.path.getsize(input_file)
    size_out = os.path.getsize(output_file)

    # Sizes first, then the compression ratio
    print(f'Input file size: {size_in / bytes_per_mb:.2f} MB')
    print(f'Output file size: {size_out / bytes_per_mb:.2f} MB')
    print(f'Compression ratio: {size_in / size_out:.2f}x')
    return

def estimate_size_reduction(input_file, output_file, Nframes = 1, verbose=True):
    """Extrapolate archive sizes from one input/output file pair.

    Returns ``(input_bytes * Nframes, output_bytes * Nframes, ratio)`` where
    ``ratio`` is input size divided by output size. With ``verbose`` the
    extrapolated sizes (in MB) and the ratio are printed as well.
    """
    bytes_in, bytes_out = (os.path.getsize(path) for path in (input_file, output_file))
    ratio = bytes_in / bytes_out

    if verbose:
        bytes_per_mb = 1024 ** 2
        print(f'Uncompressed archive size: {bytes_in / bytes_per_mb * Nframes:.2f} MB')
        print(f'Compressed archive size: {bytes_out / bytes_per_mb * Nframes:.2f} MB')
        print(f'Compression ratio: {ratio:.2f}x\n')

    return bytes_in * Nframes, bytes_out * Nframes, ratio

def decompress_and_convert(input_file, out_format = 'json'):
    """Read an lz4-frame compressed file and return its decoded content.

    Parameters
    ----------
    input_file : str
        Path to the lz4-compressed file.
    out_format : {'json', 'npz'}
        'json': the payload is decoded as UTF-8 and parsed with ``json.loads``.
        'npz' : the payload is handed to ``np.load`` through an in-memory buffer.

    Returns
    -------
    The parsed json object, the object returned by ``np.load``, or None (after
    printing a message) when ``out_format`` is not supported.
    """
    # Validate the format first so an invalid request does not read and decompress the file.
    if out_format not in ('json', 'npz'):
        print('Invalid output format. Please use "json" or "npz"')
        return None

    # Read the compressed bytes, then decompress them using lz4
    with open(input_file, 'rb') as f_in:
        compressed_data = f_in.read()
    decompressed_data = lz4.frame.decompress(compressed_data)

    if out_format == 'json':
        # Decode the bytes to string and parse JSON
        return json.loads(decompressed_data.decode('utf-8'))

    # NOTE(security): allow_pickle=True lets np.load unpickle object arrays, which can
    # execute arbitrary code. Only use this on files you created yourself.
    return np.load(io.BytesIO(decompressed_data), allow_pickle=True)
        
def unpack_arrays(json_dict, dtype_out = 'float64', exclude_keys=[]):
    """Return ``{key: array}`` for every entry of ``json_dict['data']``.

    Each array is built from ``json_dict['data'][key]['value']`` with dtype
    ``dtype_out``; keys listed in ``exclude_keys`` are left out.
    """
    fields = json_dict['data']
    unpacked = {}
    for name in list(fields.keys()):
        if name in exclude_keys:
            continue
        unpacked[name] = np.array(fields[name]['value'], dtype=dtype_out)
    return unpacked

def unpack_nematic_json_dict(json_dict, dtype_out = 'float64', exclude_keys=[], calc_velocities = False):
    """Return ``{key: array}`` for ``json_dict['data']``, optionally with velocities.

    Arrays are built from ``json_dict['data'][key]['value']`` with dtype ``dtype_out``;
    keys in ``exclude_keys`` are skipped. With ``calc_velocities`` the two values
    returned by ``mp.base_modules.flow.velocity(ff)`` are stored as 'vx' and 'vy'.
    """
    fields = json_dict['data']
    unpacked = {}
    for name in list(fields.keys()):
        if name not in exclude_keys:
            unpacked[name] = np.array(fields[name]['value'], dtype=dtype_out)

    if calc_velocities:
        # 'ff' is read from the json dict directly, so it may be excluded from the output
        distribution = np.array(fields['ff']['value'], dtype=dtype_out)
        vel_x, vel_y = mp.base_modules.flow.velocity(distribution,)
        unpacked['vx'] = vel_x
        unpacked['vy'] = vel_y
    return unpacked

def find_missing_frames(archive_path):
    """Return the frame numbers available in the archive at ``archive_path``.

    Only files named ``frame<digits>.json`` are counted. If their count equals the
    archive's ``num_frames`` the regular schedule ``arange(nstart, nsteps + 1, ninfo)``
    is returned; otherwise the sorted list of frame numbers actually present.
    """
    # Load the archive that was asked for (not whatever the name `dir` happens to be bound to).
    ar = mp.archive.loadarchive(archive_path)

    prefix = 'frame'
    frame_list = []

    for item in os.listdir(archive_path):
        stem, ext = os.path.splitext(item)
        number = stem[len(prefix):]
        # Skip side files such as 'frame10.json.lz4', which would otherwise be
        # counted as a second 'frame10'.
        if ext == '.json' and stem.startswith(prefix) and number.isdecimal():
            frame_list.append(int(number))

    if len(frame_list) == ar.num_frames:
        return np.arange(ar.nstart, ar.nsteps + 1, ar.ninfo)

    frame_list.sort()
    return frame_list
    
def convert_json_to_npz(json_path, out_path, compress = True, dtype_out = 'float64', exclude_keys=[]):
    """Load one json frame and save its arrays as an npz file at ``out_path``.

    The arrays come from ``unpack_arrays``; ``compress`` selects
    ``np.savez_compressed`` over ``np.savez``.
    """
    with open(json_path, 'r') as handle:
        frame_json = json.load(handle)

    arrays = unpack_arrays(frame_json,  dtype_out = dtype_out, exclude_keys=exclude_keys)

    # Pick the writer once, then store every array under its key
    writer = np.savez_compressed if compress else np.savez
    writer(out_path, **arrays)
    return

def create_npz_folder(archive_path, output_folder = None, check_for_missing_frames = False, compress = True, \
                      dtype_out= 'float32', exclude_keys=(), verbose = 1):
    """
    Convert every json frame of an archive into an npz file in ``output_folder``.

    archive_path: folder with parameters.json and the frame<N>.json files
    output_folder: destination folder; defaults to archive_path + '_npz'
    check_for_missing_frames: if True, take the frame list from find_missing_frames
        instead of the regular schedule arange(nstart, nsteps + 1, ninfo)
    compress, dtype_out, exclude_keys: forwarded to convert_json_to_npz

    verbose = 0: no output
    verbose = 1: print time to process entire archive
    verbose = 2: print time to process each frame

    Frames whose conversion raises are reported and skipped; nothing is returned.
    """
    output_folder = archive_path + '_npz' if output_folder is None else output_folder

    # Create the output folder if it does not exist
    os.makedirs(output_folder, exist_ok=True)

    # copy parameters.json to output folder
    parameters_path = os.path.join(archive_path, 'parameters.json')
    shutil.copy(parameters_path, output_folder)

    # Load the archive and get the list of frames
    ar = mp.archive.loadarchive(archive_path)
    frame_list = find_missing_frames(archive_path) if check_for_missing_frames else np.arange(ar.nstart, ar.nsteps + 1, ar.ninfo)

    # initialize failed conversions list
    failed_conversions = []

    start = perf_counter()

    for frame in frame_list:
        frame_input_path = os.path.join(archive_path, f'frame{frame}.json')
        frame_output_path = os.path.join(output_folder, f'frame{frame}.npz')
        start_frame = perf_counter()
        try:
            convert_json_to_npz(frame_input_path, frame_output_path, compress = compress, dtype_out = dtype_out, exclude_keys = exclude_keys)
        except Exception as err:
            # Best effort: report the cause and continue with the next frame.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            print(f'Error processing frame {frame}: {err!r}. Skipping...')
            failed_conversions.append(frame)
            continue
        if verbose == 2:
            print(f'Frame {frame} processed in {perf_counter() - start_frame:.2f} seconds')

    if verbose > 0:
        print(f'Archive processed in {perf_counter() - start:.2f} seconds with {len(failed_conversions)} failed conversions')
        if len(failed_conversions) > 0:
            print(f'Frames for which conversion to npz failed: {failed_conversions}')

        if len(frame_list) == 0:
            print('\nNo frames found. Cannot estimate size reduction')
            return

        frame_input_path = os.path.join(archive_path, f'frame{frame_list[0]}.json')
        frame_output_path = os.path.join(output_folder, f'frame{frame_list[0]}.npz')
        # The estimate needs both files of the first frame; its conversion may have failed.
        if os.path.exists(frame_input_path) and os.path.exists(frame_output_path):
            print('\nEstimated (from first frame) size reduction of archive: ')
            estimate_size_reduction(frame_input_path, frame_output_path, Nframes = len(frame_list))
        else:
            print(f'\nFrame {frame_list[0]} is not available in both folders. Cannot estimate size reduction')
    return

In [4]:
# Root folder holding the simulation archives (machine-specific absolute path).
main_path = 'C:\\Users\\Simon Andersen\\Dokumenter\\Uni\\Speciale\\Hyperuniformity\\nematic_data'
# Output folder for the single-file benchmarks below. It is created inside main_path,
# so it is also part of the os.listdir(main_path) listing that follows.
out_dir = os.path.join(main_path, 'compressed_data')
if not os.path.exists(out_dir):
    os.makedirs(out_dir)

# Full paths of all entries in main_path
data_dirs = os.listdir(main_path)
data_dirs = [os.path.join(main_path, d) for d in data_dirs]
#ar = mp.archive.loadarchive(data_dirs[0])
# Pick one archive by position in the listing.
# NOTE(review): os.listdir order is not guaranteed, so N may select a different folder elsewhere.
N = 7
# NOTE: `dir` shadows the builtin dir(); later cells read this name.
dir = data_dirs[N]

# Collect every entry of the archive folder whose name does not end in '.json' ...
outfiles_paths = []
for file in os.listdir(dir):
    if not file.endswith('.json'):
        outfiles_paths.append(os.path.join(dir, file))

# ... and delete it. WARNING: this permanently removes files from the archive folder.
for file in outfiles_paths:
    os.remove(file)

# Second entry of the folder listing; used as the sample file for the benchmarks below.
file_path = os.path.join(dir, os.listdir(dir)[1])
# Display the selected archive folder and sample file
dir, file_path

('C:\\Users\\Simon Andersen\\Dokumenter\\Uni\\Speciale\\Hyperuniformity\\nematic_data\\ns512_205',
 'C:\\Users\\Simon Andersen\\Dokumenter\\Uni\\Speciale\\Hyperuniformity\\nematic_data\\ns512_205\\frame2100000.json')

In [11]:
# Convert the selected archive to float32 npz frames; the source archive is kept.
ca = CompressArchive(dir, dtype_out = 'float32', delete_archive_if_successful = False)
# Drop the raw 'ff' array from the output but store the density/velocity fields derived from it;
# existing npz files are overwritten.
conversion_kwargs = {'compress': True, 'exclude_keys': ['ff'], 'calc_velocities': True, 'overwrite_existing_npz_files': True, 'verbose': 1}
ca.convert_archive_to_npz(**conversion_kwargs)

Archive processed in 16.53 seconds with 0 failed conversions

Estimated size reduction for entire archive (8 frames):
Uncompressed archive size: 313.27 MB
Compressed archive size: 64.49 MB
Compression ratio: 4.86x

Time to open json frame: 1.15 seconds
Time to open npz frame: 0.01 seconds
Speedup: 135.00x


In [9]:
# Display the archive folder currently bound to `dir`
dir

'C:\\Users\\Simon Andersen\\Dokumenter\\Uni\\Speciale\\Hyperuniformity\\nematic_data\\ns512_205'

In [12]:
# Open the converted (npz) output folder as an archive
ar = mp.archive.loadarchive(ca.output_dir)

In [14]:
# Number of frames the converter found in the source archive
ca.num_frames

8

In [17]:
# Reference: defect list computed from the original json archive
ar2 = mp.archive.loadarchive(ca.archive_dir)
defect_list2 = get_defect_list(ar2, ar2.LX, ar2.LY, Nframes=ca.num_frames, archive_path=ca.archive_dir)

In [15]:
# Defect list computed from the converted output folder (uses `ar` as bound in an earlier cell)
defect_list = get_defect_list(ar, ar.LX, ar.LY, Nframes=ca.num_frames, archive_path=ca.output_dir)

In [22]:
# Last entry of the defect list from the json archive
defect_list2[-1]

[{'charge': 0.5, 'pos': [31.5, 188.5]},
 {'charge': 0.5, 'pos': [31.5, 308.5]},
 {'charge': 0.5, 'pos': [43.5, 265.5]},
 {'charge': -0.5, 'pos': [79.5, 435.5]},
 {'charge': -0.5, 'pos': [181.5, 231.5]},
 {'charge': 0.5, 'pos': [187.5, 402.5]},
 {'charge': 0.5, 'pos': [201.5, 75.5]},
 {'charge': 0.5, 'pos': [299.5, 404.5]},
 {'charge': -0.5, 'pos': [337.5, 410.5]},
 {'charge': -0.5, 'pos': [408.5, 16.5]},
 {'charge': -0.5, 'pos': [420.5, 235.5]},
 {'charge': -0.5, 'pos': [464.5, 396.5]}]

In [21]:
# Last entry of the defect list from the converted output folder, for comparison with defect_list2[-1]
defect_list[-1]

[{'charge': 0.5, 'pos': [31.5, 188.5]},
 {'charge': 0.5, 'pos': [31.5, 308.5]},
 {'charge': 0.5, 'pos': [43.5, 265.5]},
 {'charge': -0.5, 'pos': [79.5, 435.5]},
 {'charge': -0.5, 'pos': [181.5, 231.5]},
 {'charge': 0.5, 'pos': [187.5, 402.5]},
 {'charge': 0.5, 'pos': [201.5, 75.5]},
 {'charge': 0.5, 'pos': [299.5, 404.5]},
 {'charge': -0.5, 'pos': [337.5, 410.5]},
 {'charge': -0.5, 'pos': [408.5, 16.5]},
 {'charge': -0.5, 'pos': [420.5, 235.5]},
 {'charge': -0.5, 'pos': [464.5, 396.5]}]

In [166]:
# Read every array of the npz frame into a plain dict.
# NOTE(review): `npz_frame` is only defined in a cell further down, so this fails on Restart & Run All.
arr_dict = {key: npz_frame[key] for key in npz_frame.files}

In [169]:
# Attach the arrays to the frame object as attributes by writing into its __dict__.
# NOTE(review): `frame` is only assigned in a cell further down.
frame.__dict__.update(arr_dict)

In [165]:
# Open the first converted frame and list the stored arrays with their shapes
frame_list = ca.frame_list
# NOTE(review): the file handle returned by np.load is never closed here.
npz_frame = np.load(os.path.join(ca.output_dir, f'frame{frame_list[0]}.npz'), allow_pickle=True)
# Bare expression in the middle of a cell: it is evaluated but not displayed.
npz_frame.files

for key in npz_frame.files:
    print(key, npz_frame[key].shape)

QQxx (1048576,)
QQyx (1048576,)
FFx (1048576,)
FFy (1048576,)
dE_kin (1048576,)
dPE_LC (1048576,)
density (1048576,)
vx (1048576,)
vy (1048576,)


In [7]:
# Time to open array and get defects using current method
t1 = perf_counter()
ar = mp.archive.loadarchive(dir)
frame = ar._read_frame(0)

Qxx_dat = frame.QQxx.reshape(ar.LX, ar.LY)
Qyx_dat = frame.QQyx.reshape(ar.LX, ar.LY)
#defect_list = get_defect_list(ar, ar.LX, ar.LY, Nframes=2, archive_path=dir)
t2 = perf_counter()
print('Time to open array and get defects using current method: ', (t2-t1)/(len(os.listdir(dir)) -1))

Time to open array and get defects using current method:  6.95262305


In [17]:
# Time to open array using load json
t1 = perf_counter()
with open(file_path, 'r') as f:
    data = json.load(f)
arr_dict = unpack_arrays(data)
arr_dict32 = unpack_arrays(data, dtype_out='float32')
t2 = perf_counter()
print('Time to open array using load json: ', t2-t1)

Time to open array using load json:  6.14445180000007


In [20]:
# Time to convert and save to numpy array
t1 = perf_counter()
npz_file_res = os.path.join(dir, os.path.join(out_dir,'data.npz'))
with open(file_path, 'r') as f:
    data = json.load(f)
arr_dict = unpack_arrays(data, dtype_out = 'float32')
del arr_dict['ff']
np.savez_compressed(npz_file_res, **arr_dict)
t2 = perf_counter()

print('Time to convert and save to numpy array, excluding ff: ', t2-t1)
print_size(file_path, npz_file_res)

# Time to open npz file and unpack arrays
t1 = perf_counter()
npz = np.load(npz_file_res, allow_pickle=True)
LX = ar.LX

Qxx_dat = npz['QQxx'].reshape(LX, LX)
Qyx_dat = npz['QQyx'].reshape(LX, LX)
# Get defects
defects = mp.nematic.nematicPy.get_defects(Qxx_dat, Qyx_dat, LX, LX)


t2 = perf_counter()
print('Time to open npz file and get defects, excluding ff: ', t2-t1)

Time to convert and save to numpy array, excluding ff:  7.462377200000105
Input file size: 155.5849323272705 MB
Output file size: 22.298182487487793 MB
Compression ratio: 6.98x
Time to open npz file and get defects, excluding ff:  1.4298758999998427


In [19]:
# Time to convert and save to numpy array
t1 = perf_counter()
npz_file = os.path.join(dir, os.path.join(out_dir,'data.npz'))
with open(file_path, 'r') as f:
    data = json.load(f)
arr_dict = unpack_arrays(data, dtype_out = 'float32')
np.savez_compressed(npz_file, **arr_dict)
t2 = perf_counter()

print('Time to convert and save to numpy array: ', t2-t1)
print_size(file_path, npz_file)

# Time to open npz file and unpack arrays
t1 = perf_counter()
npz = np.load(npz_file, allow_pickle=True)
LX = ar.LX

Qxx_dat = npz['QQxx'].reshape(LX, LX)
Qyx_dat = npz['QQyx'].reshape(LX, LX)
# Get defects
defects = mp.nematic.nematicPy.get_defects(Qxx_dat, Qyx_dat, LX, LX)


t2 = perf_counter()
print('Time to open npz file and get defects: ', t2-t1)

Time to convert and save to numpy array:  8.52731899999992
Input file size: 155.5849323272705 MB
Output file size: 50.06804656982422 MB
Compression ratio: 3.11x
Time to open npz file and get defects:  1.2702217999999448


In [15]:
# Time to compress json file using lz4
# The compressed copy is written next to the source frame, i.e. into the folder that
# file_path points into.
t1 = perf_counter()
output_file = file_path + '.lz4'
compress_file(file_path, output_file)
t2 = perf_counter()
print('Time to compress json file using lz4: ', t2-t1)
print_size(file_path, output_file)

# Time to read lz4 file, decompress and load arrays
t1 = perf_counter()
json_dict = decompress_and_convert(output_file, out_format='json')
arr_dict = unpack_arrays(json_dict)
t2 = perf_counter()
# This span measures reading and decompressing, so it gets its own label rather than
# repeating the "compress" label used above.
print('Time to read lz4 file, decompress and load arrays: ', t2-t1)

Time to compress json file using lz4:  0.7508942000000047
Input file size: 155.5849323272705 MB
Output file size: 89.09958457946777 MB
Compression ratio: 1.75x
Time to compress json file using lz4:  4.974672699999985


In [16]:
# Time to compress npz file using lz4
t1 = perf_counter()
output_file = npz_file + '.lz4'
compress_file(npz_file, output_file)
t2 = perf_counter()
print('Time to compress npz file using lz4: ', t2-t1)
print_size(npz_file, output_file)

# Time to read lz4 file, decompress and load arrays
t1 = perf_counter()
arr_dict = decompress_and_convert(output_file, out_format = 'npz')
t2 = perf_counter()
print('Time to read npz file using lz4: ', t2-t1)

Time to compress npz file using lz4:  0.14694500000001653
Input file size: 50.06804656982422 MB
Output file size: 50.0709924697876 MB
Compression ratio: 1.00x
Time to read npz file using lz4:  0.05856810000000223


In [137]:
# Spot check: first value of the QQxx array in the most recently loaded arr_dict
Q = arr_dict['QQxx']
Q[0]

0.922946

In [None]:
## IDEAS
# save each array as binary with lz4 and read directly from binary file