In [1]:
from functools import wraps
from pathlib import Path
from typing import Union, List, Dict
import os
import json
from multiprocessing import Pool
from pprint import pprint

import numpy as np
import openfoamparser_mai as Ofpp
import pyvista

In [2]:
LOW_DIM_PATH = 'low_dim'
HIGH_DIM_PATH = 'high_dim'
CONSTANT_PATH = 'constant'
POLYMESH_PATH = 'polyMesh'
SYSTEM_PATH = 'system'

In [3]:
def get_vel_data(
    vel: str,
    base_path: str = 'data',
    model: str = 'data_wage',
    dimpath: str = 'low_dim',
) -> Dict[str, List[np.ndarray]]:
    parsepath = Path(
        '/'.join([
            base_path,
            model,
            dimpath,
            vel,
        ])
    )
    time_paths = sorted(list(parsepath.iterdir()))[1:-2]
    veldata = {}
    for time_path in time_paths:
        time_path = Path(time_path)
        p = Ofpp.parse_internal_field(time_path / Path('p'))
        t = Ofpp.parse_internal_field(time_path / Path('T'))
        u = Ofpp.parse_internal_field(time_path / Path('U'))
        rho = Ofpp.parse_internal_field(time_path / Path('rho'))

        key = str(time_path).split('/')[-1]
        veldata[key] = [p, t, u, rho]
    return veldata

In [4]:
def get_all_dim_data(
    base_path: str = 'data',
    model: str = 'data_wage',
    dimpath: str = 'low_dim',
):
    parsepath = Path(
        '/'.join([
            base_path,
            model,
            dimpath,
        ])
    )
    dimdata = {}
    # return sorted(list(parsepath.iterdir()))[1:]
    for velpath in sorted(list(parsepath.iterdir()))[1:]:
        velpath = Path(velpath)
        key = str(velpath).split('/')[-1]
        dimdata[key] = get_vel_data(
            vel=key,
            base_path=base_path,
            model=model,
            dimpath=dimpath,
        )
    mesh: Ofpp.FoamMesh = Ofpp.FoamMesh(parsepath / Path('vel3.0'))

    dimdata['mesh'] = {
        'faces': mesh.faces,
        'boundary': mesh.boundary,
        'neighbour': mesh.neighbour,
        'owner': mesh.owner,
        'points': mesh.points,
    }
    return dimdata

In [5]:
dimdata = get_all_dim_data()

In [8]:
mesh: Ofpp.FoamMesh = dimdata['mesh']

faces = mesh['faces']
np.min(faces)

0

In [9]:
np.max(faces)

191

In [10]:
!pip list

Package            Version
------------------ ----------
appnope            0.1.3
asttokens          2.4.1
certifi            2023.11.17
charset-normalizer 3.3.2
comm               0.2.0
constriction       0.3.2
contourpy          1.2.0
cycler             0.12.1
debugpy            1.8.0
decorator          5.1.1
executing          2.0.1
fonttools          4.46.0
idna               3.6
ipykernel          6.27.1
ipython            8.18.1
jedi               0.19.1
jupyter_client     8.6.0
jupyter_core       5.5.0
kiwisolver         1.4.5
matplotlib         3.8.2
matplotlib-inline  0.1.6
nest-asyncio       1.5.8
numpy              1.25.2
openfoamparser-mai 0.14
packaging          23.2
parso              0.8.3
pexpect            4.9.0
Pillow             10.1.0
pip                23.1.2
platformdirs       4.1.0
pooch              1.8.0
prompt-toolkit     3.0.41
psutil             5.9.6
ptyprocess         0.7.0
pure-eval          0.2.2
Pygments           2.17.2
pyparsing          3.1.1
python-

In [13]:
from functools import wraps
from pathlib import Path
from typing import Union, List, Dict
from multiprocessing import Pool
from pprint import pprint
import sys

import numpy as np
import pywt

from math import pi
e = 2.718281828459045


class WaveletCompressor:
    def __init__(self, threshold: float = 0.1):
        self.threshold = threshold

    def get_size(self, data: np.ndarray):
        return sys.getsizeof(data)

    def wavelet_transform(self, data, level=2):
        """ Perform the nD CDF 9/7 wavelet transform """
        coeffs = pywt.wavedecn(data, 'bior2.2', level=level)
        return coeffs

    def inverse_wavelet_transform(self, coeffs):
        """ Perform the inverse nD CDF 9/7 wavelet transform """
        reconstructed_data = pywt.waverecn(coeffs, 'bior2.2')

        return reconstructed_data

    def mean_squared_error(self, image1, image2):
        return np.mean((image1 - image2) ** 2)


image_data = np.array([e, pi, e, pi, e, pi, e, pi], dtype=np.float64)
# image_data = np.random.rand(5, 5, 5) * 1000000
# print(np.max(image_data))
# print(np.min(image_data))
compressor = WaveletCompressor()

wavelet_coeffs = compressor.wavelet_transform(image_data, level=2)

reconstructed_image = compressor.inverse_wavelet_transform(wavelet_coeffs)

print(compressor.mean_squared_error(image_data, reconstructed_image))
print(compressor.get_size(image_data))
print(compressor.get_size(wavelet_coeffs))


2.465190328815662e-31
176
88
