diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml deleted file mode 100644 index f637acc..0000000 --- a/.github/FUNDING.yml +++ /dev/null @@ -1,2 +0,0 @@ -liberapay: sabilab -custom: ["https://www.paypal.me/jeanollion", www.sabilab.fr] diff --git a/.github/workflows/publish-to-pypi.yml b/.github/workflows/publish-to-pypi.yml new file mode 100644 index 0000000..763f5cf --- /dev/null +++ b/.github/workflows/publish-to-pypi.yml @@ -0,0 +1,116 @@ +name: Publish Python distribution 📦 to TestPyPI + +on: push + +jobs: + build: + name: Build distribution 📦 + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.x" + - name: Install pypa/build + run: >- + python3 -m + pip install + build + --user + - name: Build a binary wheel and a source tarball + run: python3 -m build + - name: Store the distribution packages + uses: actions/upload-artifact@v3 + with: + name: python-package-distributions + path: dist/ + + publish-to-pypi: + name: >- + Publish Python 🐍 distribution 📦 to PyPI + if: startsWith(github.ref, 'refs/tags/') # only publish to PyPI on tag pushes + needs: + - build + runs-on: ubuntu-latest + environment: + name: pypi + url: https://pypi.org/p/dataset-iterator + permissions: + id-token: write # IMPORTANT: mandatory for trusted publishing + steps: + - name: Download all the dists + uses: actions/download-artifact@v3 + with: + name: python-package-distributions + path: dist/ + - name: Publish distribution 📦 to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + + github-release: + name: >- + Sign the Python distribution 📦 with Sigstore + and upload them to GitHub Release + needs: + - publish-to-pypi + runs-on: ubuntu-latest + + permissions: + contents: write # IMPORTANT: mandatory for making GitHub Releases + id-token: write # IMPORTANT: mandatory for sigstore + + steps: + - name: Download all the dists + uses: actions/download-artifact@v3 + with: + name: python-package-distributions + path: dist/ + - name: Sign the dists with Sigstore + uses: sigstore/gh-action-sigstore-python@v1.2.3 + with: + inputs: >- + ./dist/*.tar.gz + ./dist/*.whl + - name: Create GitHub Release + env: + GITHUB_TOKEN: ${{ github.token }} + run: >- + gh release create + '${{ github.ref_name }}' + --repo '${{ github.repository }}' + --notes "" + - name: Upload artifact signatures to GitHub Release + env: + GITHUB_TOKEN: ${{ github.token }} + # Upload to GitHub Release using the `gh` CLI. + # `dist/` contains the built packages, and the + # sigstore-produced signatures and certificates. 
+ run: >- + gh release upload + '${{ github.ref_name }}' dist/** + --repo '${{ github.repository }}' + + publish-to-testpypi: + name: Publish Python distribution 📦 to TestPyPI + needs: + - build + runs-on: ubuntu-latest + + environment: + name: testpypi + url: https://test.pypi.org/p/dataset-iterator + + permissions: + id-token: write # IMPORTANT: mandatory for trusted publishing + + steps: + - name: Download all the dists + uses: actions/download-artifact@v3 + with: + name: python-package-distributions + path: dist/ + - name: Publish distribution 📦 to TestPyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + repository-url: https://test.pypi.org/legacy/ \ No newline at end of file diff --git a/dataset_iterator/__init__.py b/dataset_iterator/__init__.py index 3fc3ff2..8544aa9 100644 --- a/dataset_iterator/__init__.py +++ b/dataset_iterator/__init__.py @@ -7,5 +7,5 @@ from .image_data_generator import get_image_data_generator from .datasetIO import DatasetIO, H5pyIO, MultipleFileIO, MultipleDatasetIO, ConcatenateDatasetIO, MemoryIO -from .active_learning import get_index_probability, compute_loss +from .hard_sample_mining import HardSampleMiningCallback from .concat_iterator import ConcatIterator diff --git a/dataset_iterator/active_learning.py b/dataset_iterator/active_learning.py deleted file mode 100644 index 6832db8..0000000 --- a/dataset_iterator/active_learning.py +++ /dev/null @@ -1,176 +0,0 @@ -import math -from math import ceil, sqrt, log -import os -import numpy as np -import tensorflow as tf -from .index_array_iterator import IndexArrayIterator, INCOMPLETE_LAST_BATCH_MODE -from .pre_processing import adjust_histogram_range - -class SampleProbabilityCallback(tf.keras.callbacks.Callback): - def __init__(self, iterator, predict_fun, loss_fun, period:int=10, start_epoch:int=0, skip_first:bool=False, enrich_factor:float=10., quantile_max:float=0.99, quantile_min:float=None, disable_channel_postprocessing:bool=False, iterator_modifier:callable=None, workers=None, verbose:bool=True): - super().__init__() - self.period = period - self.start_epoch = start_epoch - self.skip_first=skip_first - self.iterator = iterator - self.predict_fun = predict_fun - self.loss_fun = loss_fun - self.enrich_factor = enrich_factor - self.quantile_max = quantile_max - self.quantile_min = quantile_min - self.disable_channel_postprocessing = disable_channel_postprocessing - self.iterator_modifier = iterator_modifier - self.workers = workers - self.verbose = verbose - self.loss_idx = -1 - self.proba_per_loss = None - self.n_losses = 0 - def on_epoch_begin(self, epoch, logs=None): - if (epoch + self.start_epoch) % self.period == 0: - if epoch == 0 and self.skip_first: - return - if self.iterator_modifier is not None: - self.iterator_modifier(self.iterator, True) - loss = compute_loss(self.iterator, self.predict_fun, self.loss_fun, disable_augmentation=True, disable_channel_postprocessing=self.disable_channel_postprocessing, workers=self.workers, verbose=self.verbose) - if self.iterator_modifier is not None: - self.iterator_modifier(self.iterator, False) - self.proba_per_loss = get_index_probability(loss, enrich_factor=self.enrich_factor, quantile_max=self.quantile_max, quantile_min=self.quantile_min, verbose=self.verbose) - self.n_losses = self.proba_per_loss.shape[0] if len(self.proba_per_loss.shape) == 2 else 1 - if self.proba_per_loss is not None: - if len(self.proba_per_loss.shape) == 2: - self.loss_idx = (self.loss_idx + 1) % self.n_losses - proba = self.proba_per_loss[self.loss_idx] - else: - proba 
= self.proba_per_loss - self.iterator.index_probability = proba # in case of multiprocessing iwth OrderedEnqueeur this will be taken into account only a next epoch has iterator has already been sent to processes at this stage - - -def get_index_probability_1d(loss, enrich_factor:float=10., quantile_max:float=0.99, quantile_min:float=None, max_power:int=10, verbose:int=1): - assert 1 >= quantile_max > 0.5, "invalid max quantile" - if quantile_min is None: - quantile_min = 1 - quantile_max - assert 0.5 > quantile_min >= 0, f"invalid min quantile: {quantile_min}" - if 1. / enrich_factor < (1 - quantile_max): # incompatible enrich factor and quantile - quantile_max = 1. - 1 / enrich_factor - #print(f"modified quantile_max to : {quantile_max}") - loss_quantiles = np.quantile(loss, [quantile_min, quantile_max]) - - # TODO compute drop factor and enrich factor to get a constant probability factor in the end. - - Nh = loss[loss>=loss_quantiles[1]].shape[0] - Ne = loss[loss <= loss_quantiles[0]].shape[0] - loss_sub = loss[(lossloss_quantiles[0])] - Nm = loss_sub.shape[0] - S = np.sum( ((loss_sub - loss_quantiles[0]) / (loss_quantiles[1] - loss_quantiles[0])) ) - p_max = enrich_factor / loss.shape[0] - p_min = (1 - p_max * (Nh + S)) / (Nm + Ne - S) if (Nm + Ne - S)!=0 else -1 - if p_min<0: - p_min = 0. - target = 1./p_max - Nh - if target <= 0: # cannot reach enrich factor: too many hard examples - power = max_power - else: - fun = lambda power_: np.sum(((loss_sub - loss_quantiles[0]) / (loss_quantiles[1] - loss_quantiles[0])) ** power_) - power = 1 - power_inc = 0.25 - Sn = S - while power < max_power and Sn > target: - power += power_inc - Sn = fun(power) - if power > 1 and Sn < target: - power -= power_inc - else: - power = 1 - #print(f"p_min {p_min} ({(1 - p_max * (Nh + S)) / (Nm + Ne - S)}) Nh: {Nh} nE: {Ne} Nm: {Nm} S: {S} pmax: {p_max} power: {power}") - # drop factor at min quantile, enrich factor at max quantile, interpolation in between - def get_proba(value): - if value <= loss_quantiles[0]: - return p_min - elif value >= loss_quantiles[1]: - return p_max - else: - return p_min + (p_max - p_min) * ((value - loss_quantiles[0]) / (loss_quantiles[1] - loss_quantiles[0]))**power - - vget_proba = np.vectorize(get_proba) - proba = vget_proba(loss) - p_sum = float(np.sum(proba)) - proba /= p_sum - if verbose > 1: - print(f"loss proba: [{np.min(proba) * loss.shape[0]}, {np.max(proba) * loss.shape[0]}] pmin: {p_min} pmax: {p_max} power: {power} sum: {p_sum} quantile_max: {quantile_max}", flush=True) - return proba - -def get_index_probability(loss, enrich_factor:float=10., quantile_max:float=0.99, quantile_min:float=None, verbose:int=1): - if len(loss.shape) == 1: - return get_index_probability_1d(loss, enrich_factor=enrich_factor, quantile_max=quantile_max, quantile_min=quantile_min, verbose=verbose) - probas_per_loss = [get_index_probability_1d(loss[:, i], enrich_factor=enrich_factor, quantile_max=quantile_max, quantile_min=quantile_min, verbose=verbose) for i in range(loss.shape[1])] - probas_per_loss = np.stack(probas_per_loss, axis=0) - #proba = np.max(probas_per_loss, axis=0) - #proba /= np.sum(proba) - return probas_per_loss - -def compute_loss(iterator, predict_function, loss_function, disable_augmentation:bool=True, disable_channel_postprocessing:bool=False, workers:int=None, verbose:int=1): - data_aug_param = iterator.disable_random_transforms(disable_augmentation, disable_channel_postprocessing) - - simple_iterator = SimpleIterator(iterator) - batch_size = simple_iterator.batch_size - 
n_batches = len(simple_iterator) - @tf.function - def compute_loss(x, y_true): - y_pred = predict_function(x) - n_samples = tf.shape(x)[0] - def loss_fun(j): # compute loss per batch item in case loss is reduced along batch size - y_t = [y_true[k][j:j+1] for k in range(len(y_true))] - y_p = [y_pred[k][j:j+1] for k in range(len(y_pred))] - return tf.stack(loss_function(y_t, y_p), 0) - return tf.map_fn(loss_fun, elems=tf.range(n_samples), fn_output_signature=tf.float32, parallel_iterations=batch_size) - - losses = [] - if verbose>=1: - print(f"Active learning: computing loss...") - progbar = tf.keras.utils.Progbar(n_batches) - if workers is None: - workers = os.cpu_count() - enq = tf.keras.utils.OrderedEnqueuer(simple_iterator, use_multiprocessing=True, shuffle=False) - enq.start(workers=workers, max_queue_size=max(3, min(n_batches, int(workers * 1.5)))) - gen = enq.get() - n_tiles = None - for i in range(n_batches): - x, y_true = next(gen) - batch_loss = compute_loss(x, y_true) - if x.shape[0] > batch_size or n_tiles is not None: # tiling: keep hardest tile per sample - if n_tiles is None: # record n_tile which is constant but last batch may have fewer elements - n_tiles = x.shape[0] // batch_size - batch_loss = tf.reshape(batch_loss, shape=(n_tiles, -1, batch_loss.shape[1])) - batch_loss = tf.reduce_max(batch_loss, axis=0, keepdims=False) # record hardest tile per sample - losses.append(batch_loss) - if verbose >= 1: - progbar.update(i+1) - enq.stop() - if data_aug_param is not None: - iterator.enable_random_transforms(data_aug_param) - return tf.concat(losses, axis=0).numpy() - -class SimpleIterator(IndexArrayIterator): - def __init__(self, iterator, input_scaling_function=None): - index_array = iterator._get_index_array(choice=False) - super().__init__(len(index_array), iterator.batch_size, False, 0, incomplete_last_batch_mode=INCOMPLETE_LAST_BATCH_MODE[0], step_number = 0) - self.set_allowed_indexes(index_array) - self.iterator = iterator - self.input_scaling_function = batchwise_inplace(input_scaling_function) if input_scaling_function is not None else None - - def _get_batches_of_transformed_samples(self, index_array): - batch = self.iterator._get_batches_of_transformed_samples(index_array) - if self.input_scaling_function is not None: - if isinstance(batch, (list, tuple)): - x, y = batch - x = self.input_scaling_function(x) - batch = x, y - else: - batch = self.input_scaling_function(batch) - return batch -def batchwise_inplace(function): - def fun(batch): - for i in range(batch.shape[0]): - batch[i] = function(batch[i]) - return batch - return fun diff --git a/dataset_iterator/concat_iterator.py b/dataset_iterator/concat_iterator.py index ae89ecb..648065d 100644 --- a/dataset_iterator/concat_iterator.py +++ b/dataset_iterator/concat_iterator.py @@ -1,3 +1,4 @@ +from math import isclose import numpy as np from .index_array_iterator import IndexArrayIterator, INCOMPLETE_LAST_BATCH_MODE from .utils import ensure_multiplicity, ensure_size @@ -12,7 +13,23 @@ def __init__(self, incomplete_last_batch_mode:str=INCOMPLETE_LAST_BATCH_MODE[1], step_number:int=0): assert isinstance(iterators, (list, tuple)), "iterators must be either list or tuple" - self.iterators = iterators + self.iterators = [] + def append_it(iterator): # unroll concat iterators + if isinstance(iterator, (list, tuple)): + for subit in iterator: + append_it(subit) + elif isinstance(iterator, ConcatIterator): + for subit in iterator.iterators: + append_it(subit) + else: + self.iterators.append(iterator) + + 
append_it(iterators) + bs = [it.get_batch_size() for it in self.iterators] + assert np.all(np.array(bs) == bs[0] ), "all sub iterator batch_size must be equal" + for it in self.iterators: + it.incomplete_last_batch_mode = incomplete_last_batch_mode + self.sub_iterator_batch_size = bs[0] if proportion is None: proportion = [1.] self.proportion = ensure_multiplicity(len(iterators), proportion) @@ -29,6 +46,12 @@ def _get_index_array(self, choice:bool = True): # return concatenated indices fo array = np.random.choice(array, size=array.shape[0], replace=True, p=self.index_probability) return array + def get_sample_number(self): + return self.it_cumlen[-1] + + def get_batch_size(self): + return self.batch_size * self.sub_iterator_batch_size + def _set_index_array(self): indices_per_iterator = [] for i, it in enumerate(self.iterators): @@ -56,8 +79,8 @@ def __len__(self): def _get_batches_of_transformed_samples(self, index_array): index_array = np.copy(index_array) # so that main index array is not modified - index_it = self._get_it_idx(index_array) # modifies index_array - + index_it = self._get_it_idx(index_array) # modifies index_array so that indices are relative to each iterator + #batches = [self.iterators[it_idx]._get_batches_of_transformed_samples(index_array[index_it==it_idx]) for it_idx in np.unique(index_it)] batches = [self.iterators[it][i] for i, it in zip(index_array, index_it)] for i in range(1, len(batches)): assert len(batches[i])==len(batches[0]), f"Iterators have different outputs: batch from iterator {index_it[0]} has length {len(batches[0])} whereas batch from iterator {index_it[i]} has length {batches[i]}" @@ -65,7 +88,7 @@ def _get_batches_of_transformed_samples(self, index_array): if len(batches[0]) == 2: inputs = [b[0] for b in batches] outputs = [b[1] for b in batches] - return (concat_numpy_arrays(inputs), concat_numpy_arrays(outputs)) + return concat_numpy_arrays(inputs), concat_numpy_arrays(outputs) else: return concat_numpy_arrays(batches) @@ -94,7 +117,16 @@ def disable_random_transforms(self, data_augmentation:bool=True, channels_postpr def enable_random_transforms(self, parameters): for it, params in zip(self.iterators, parameters): it.enable_random_transforms(params) - + + def set_index_probability(self, value): # set to sub_iterators/ expects a concatenated vector in the order of sub iterators + cur_idx = 0 + for it in self.iterators: + size = it.get_sample_number() + proba = value[cur_idx:cur_idx+size] + it.index_probability = proba / np.sum(proba) + cur_idx+=size + assert cur_idx == value.shape[0], f"invalid index_probability length expected: {cur_idx} actual {value.shape[0]}" + def concat_numpy_arrays(arrays): if isinstance(arrays[0], (list, tuple)): n = len(arrays[0]) diff --git a/dataset_iterator/hard_sample_mining.py b/dataset_iterator/hard_sample_mining.py new file mode 100644 index 0000000..b8901a1 --- /dev/null +++ b/dataset_iterator/hard_sample_mining.py @@ -0,0 +1,220 @@ +import warnings +import os +import numpy as np +import tensorflow as tf +from .index_array_iterator import IndexArrayIterator, INCOMPLETE_LAST_BATCH_MODE + +class HardSampleMiningCallback(tf.keras.callbacks.Callback): + def __init__(self, iterator, target_iterator, predict_fun, metrics_fun, period:int=10, start_epoch:int=0, skip_first:bool=False, enrich_factor:float=10., quantile_max:float=0.99, quantile_min:float=None, disable_channel_postprocessing:bool=False, workers=None, verbose:int=1): + super().__init__() + self.period = period + self.start_epoch = start_epoch + 
self.skip_first = skip_first
+        self.iterator = iterator
+        self.target_iterator = target_iterator
+        self.predict_fun = predict_fun
+        self.metrics_fun = metrics_fun
+        self.enrich_factor = enrich_factor
+        self.quantile_max = quantile_max
+        self.quantile_min = quantile_min
+        self.disable_channel_postprocessing = disable_channel_postprocessing
+        self.workers = workers
+        self.verbose = verbose
+        self.metric_idx = -1
+        self.proba_per_metric = None
+        self.n_metrics = 0
+        self.data_aug_param = self.iterator.disable_random_transforms(True, self.disable_channel_postprocessing)
+        simple_iterator = SimpleIterator(self.iterator)
+        self.batch_size = self.iterator.get_batch_size()
+        self.n_batches = len(simple_iterator)
+        self.enq = tf.keras.utils.OrderedEnqueuer(simple_iterator, use_multiprocessing=True, shuffle=False)
+
+    def close(self):
+        self.enq.stop()
+        if self.data_aug_param is not None:
+            self.iterator.enable_random_transforms(self.data_aug_param)
+        self.iterator.close()
+
+    def on_epoch_begin(self, epoch, logs=None):
+        if self.period==1 or (epoch + self.start_epoch) % self.period == 0:
+            if epoch == 0 and self.skip_first:
+                return
+            metrics = self.compute_metrics()
+            first = self.proba_per_metric is None
+            self.proba_per_metric = get_index_probability(metrics, enrich_factor=self.enrich_factor, quantile_max=self.quantile_max, quantile_min=self.quantile_min, verbose=self.verbose)
+            self.n_metrics = self.proba_per_metric.shape[0] if len(self.proba_per_metric.shape) == 2 else 1
+            if first and self.n_metrics > self.period:
+                warnings.warn(f"Hard sample mining period = {self.period} should be greater than metric number = {self.n_metrics}")
+            if self.proba_per_metric is not None:
+                if len(self.proba_per_metric.shape) == 2:
+                    self.metric_idx = (self.metric_idx + 1) % self.n_metrics
+                    proba = self.proba_per_metric[self.metric_idx]
+                else:
+                    proba = self.proba_per_metric
+                # set probability on the iterator. With multiprocessing and OrderedEnqueuer this is only taken into account at the next epoch, as the iterator has already been sent to the worker processes at this stage
+                self.target_iterator.set_index_probability(proba)
+
+    def on_train_end(self, logs=None):
+        self.close()
+
+    def compute_metrics(self):
+        workers = os.cpu_count() if self.workers is None else self.workers
+        self.enq.start(workers=workers, max_queue_size=max(3, min(self.n_batches, workers)))
+        gen = self.enq.get()
+        compute_metrics_fun = get_compute_metrics_fun(self.predict_fun, self.metrics_fun, self.batch_size)
+        metrics = compute_metrics_loop(compute_metrics_fun, gen, self.batch_size, self.n_batches, self.verbose)
+        self.enq.stop()
+        return metrics
+
+
+def get_index_probability_1d(metric, enrich_factor:float=10., quantile_min:float=0.01, quantile_max:float=None, max_power:int=10, power_accuracy:float=0.1, verbose:int=1):
+    if quantile_min is None:  # callers such as get_index_probability / HardSampleMiningCallback may pass quantile_min=None
+        quantile_min = 1 - quantile_max if quantile_max is not None else 0.01
+    assert 0.5 > quantile_min >= 0, f"invalid min quantile: {quantile_min}"
+    if quantile_max is None:
+        quantile_max = 1 - quantile_min
+    assert 1 >= quantile_max > 0.5, f"invalid max quantile: {quantile_max}"
+    if 1. / enrich_factor < (1 - quantile_max): # incompatible enrich factor and quantile
+        quantile_max = 1. - 1 / enrich_factor
+    #print(f"modified quantile_max to : {quantile_max}")
+    metric_quantiles = np.quantile(metric, [quantile_min, quantile_max])
+
+    Nh = metric[metric <= metric_quantiles[0]].shape[0] # hard examples (low metric)
+    Ne = metric[metric >= metric_quantiles[1]].shape[0] # easy examples (high metric)
+    metric_sub = metric[(metric < metric_quantiles[1]) & (metric > metric_quantiles[0])]
+    Nm = metric_sub.shape[0]
+    S = np.sum( ((metric_sub - metric_quantiles[1]) / (metric_quantiles[0] - metric_quantiles[1])) )
+    p_max = enrich_factor / metric.shape[0]
+    p_min = (1 - p_max * (Nh + S)) / (Nm + Ne - S) if (Nm + Ne - S)!=0 else -1
+    if p_min<0:
+        p_min = 0.
+        target = 1./p_max - Nh
+        if target <= 0: # cannot reach enrich factor: too many hard examples
+            power = max_power
+        else:
+            fun = lambda power_: np.sum(((metric_sub - metric_quantiles[1]) / (metric_quantiles[0] - metric_quantiles[1])) ** power_)
+            power = 1
+            Sn = S
+            while power < max_power and Sn > target:
+                power += power_accuracy
+                Sn = fun(power)
+            if power > 1 and Sn < target:
+                power -= power_accuracy
+    else:
+        power = 1
+    #print(f"p_min {p_min} ({(1 - p_max * (Nh + S)) / (Nm + Ne - S)}) Nh: {Nh} nE: {Ne} Nm: {Nm} S: {S} pmax: {p_max} power: {power}")
+    # enrich factor at the min quantile (hard samples), drop factor at the max quantile (easy samples), interpolation in between
+    def get_proba(value):
+        if value <= metric_quantiles[0]:
+            return p_max
+        elif value >= metric_quantiles[1]:
+            return p_min
+        else:
+            return p_min + (p_max - p_min) * ((value - metric_quantiles[1]) / (metric_quantiles[0] - metric_quantiles[1]))**power
+
+    vget_proba = np.vectorize(get_proba)
+    proba = vget_proba(metric)
+    p_sum = float(np.sum(proba))
+    proba /= p_sum
+    if verbose > 1:
+        print(f"metric proba range: [{np.min(proba) * metric.shape[0]}, {np.max(proba) * metric.shape[0]}] (target range: [{p_min}; {p_max}]) power: {power} sum: {p_sum} quantiles: [{quantile_min}; {quantile_max}]", flush=True)
+    return proba
+
+def get_index_probability(metrics, enrich_factor:float=10., quantile_max:float=0.99, quantile_min:float=None, verbose:int=1):
+    if len(metrics.shape) == 1:
+        return get_index_probability_1d(metrics, enrich_factor=enrich_factor, quantile_max=quantile_max, quantile_min=quantile_min, verbose=verbose)
+    probas_per_metric = [get_index_probability_1d(metrics[:, i], enrich_factor=enrich_factor, quantile_max=quantile_max, quantile_min=quantile_min, verbose=verbose) for i in range(metrics.shape[1])]
+    probas_per_metric = np.stack(probas_per_metric, axis=0)
+    #proba = np.max(probas_per_metric, axis=0)
+    #proba /= np.sum(proba)
+    return probas_per_metric
+
+def compute_metrics(iterator, predict_function, metrics_function, disable_augmentation:bool=True, disable_channel_postprocessing:bool=False, workers:int=None, verbose:int=1):
+    data_aug_param = iterator.disable_random_transforms(disable_augmentation, disable_channel_postprocessing)
+    simple_iterator = SimpleIterator(iterator)
+    batch_size = iterator.get_batch_size()
+    n_batches = len(simple_iterator)
+
+    compute_metrics_fun = get_compute_metrics_fun(predict_function, metrics_function, batch_size)
+
+    if workers is None:
+        workers = os.cpu_count()
+    enq = tf.keras.utils.OrderedEnqueuer(simple_iterator, use_multiprocessing=True, shuffle=False)
+    enq.start(workers=workers, max_queue_size=max(3, min(n_batches, workers)))
+    gen = enq.get()
+    metrics = compute_metrics_loop(compute_metrics_fun, gen, batch_size, n_batches, verbose)
+    enq.stop()
+    if data_aug_param is not None:
+        iterator.enable_random_transforms(data_aug_param)
+    iterator.close()
+    return metrics
+
+def compute_metrics_loop(compute_metrics_fun, gen, batch_size, n_batches, verbose):
+    metrics = []
+    indices = []
+    if verbose>=1:
+        print(f"Hard Sample Mining: computing metrics...")
+        progbar = tf.keras.utils.Progbar(n_batches)
+    n_tiles = None
+    for i in range(n_batches):
+        current_indices, x, y_true = next(gen)
+        batch_metrics = compute_metrics_fun(x, y_true)
+        if x.shape[0] > batch_size or n_tiles is not None: # tiling: keep hardest tile per sample
+            if n_tiles is None: # record n_tile which is constant but last batch may have fewer elements
+                n_tiles = x.shape[0] // batch_size
+            batch_metrics = tf.reshape(batch_metrics, shape=(n_tiles, -1, batch_metrics.shape[1]))
+            batch_metrics = tf.reduce_max(batch_metrics, axis=0, keepdims=False) # record hardest tile per sample
+        metrics.append(batch_metrics)
+        indices.append(current_indices)
+        if verbose >= 1:
+            progbar.update(i + 1)
+    if verbose >= 1:
+        print("metrics computed", flush=True)
+    indices = tf.concat(indices, axis=0).numpy()
+    metrics = tf.concat(metrics, axis=0).numpy()
+    if len(metrics.shape) == 1:
+        metrics = metrics[:, np.newaxis]  # reshape single-metric vector to a column so that sorting by indices below works
+        # metrics = tf.concat([indices[:, np.newaxis].astype(metrics.dtype), metrics], axis=1)
+    if not np.all(indices[1:] - indices[:-1] == 1):
+        indices = np.argsort(indices)
+        metrics = metrics[indices, :]
+        if verbose >= 1:
+            print("some indices were not in order!!", flush=True)
+    return metrics
+def get_compute_metrics_fun(predict_function, metrics_function, batch_size):
+    @tf.function
+    def compute_metrics(x, y_true):
+        y_pred = predict_function(x)
+        n_samples = tf.shape(x)[0]
+        def metrics_fun(j): # compute metric per batch item in case metric is reduced along batch size
+            y_t = [y_true[k][j:j+1] for k in range(len(y_true))]
+            y_p = [y_pred[k][j:j+1] for k in range(len(y_pred))]
+            metrics = metrics_function(y_t, y_p)
+            return tf.stack(metrics, 0)
+        return tf.map_fn(metrics_fun, elems=tf.range(n_samples), fn_output_signature=tf.float32, parallel_iterations=batch_size)
+
+    return compute_metrics
+
+class SimpleIterator(IndexArrayIterator):
+    def __init__(self, iterator, input_scaling_function=None):
+        index_array = iterator._get_index_array(choice=False)
+        super().__init__(len(index_array), iterator.batch_size, False, 0, incomplete_last_batch_mode=INCOMPLETE_LAST_BATCH_MODE[0], step_number = 0)
+        self.set_allowed_indexes(index_array)
+        self.iterator = iterator
+        self.input_scaling_function = batchwise_inplace(input_scaling_function) if input_scaling_function is not None else None
+
+    def _get_batches_of_transformed_samples(self, index_array):
+        batch = self.iterator._get_batches_of_transformed_samples(index_array)
+        if isinstance(batch, (list, tuple)):
+            x, y = batch
+            batch = index_array, x, y
+        else:
+            x = self.input_scaling_function(batch) if self.input_scaling_function is not None else batch
+            batch = index_array, x
+        return batch
+
+def batchwise_inplace(function):
+    def fun(batch):
+        for i in range(batch.shape[0]):
+            batch[i] = function(batch[i])
+        return batch
+    return fun
diff --git a/dataset_iterator/helpers.py b/dataset_iterator/helpers.py
index 21a692f..979e5cc 100644
--- a/dataset_iterator/helpers.py
+++ b/dataset_iterator/helpers.py
@@ -102,13 +102,18 @@ def get_histogram(dataset, channel_keyword:str, bins, bin_size=None, sum_to_one:
     if bins is None:
         assert bin_size is not None
         vmin, vmax = get_min_and_max(dataset, channel_keyword, batch_size=batch_size)
-        n_bins = round( (vmax - vmin) / bin_size )
-        bin_size = (vmax - vmin) / n_bins
-        bins = np.linspace(vmin,
vmax, num=n_bins+1) - if isinstance(bins, int): + n_bins = int( 1 + (vmax - vmin ) / bin_size ) + bin_size = (vmax - vmin ) / (n_bins - 1) + bins = np.linspace(vmin, vmax + bin_size, num=n_bins+1) + #print(f"range: [{vmin}; {vmax}] nbins: {n_bins} binsize: {bin_size} bins: {bins}") + elif isinstance(bins, int): + assert bins>1, "at least 2 bins" vmin, vmax = get_min_and_max(dataset, channel_keyword, batch_size=batch_size) - bin_size = (vmax - vmin)/bins - bins = np.linspace(vmin, vmax, num=bins+1) + bin_size = (vmax - vmin)/(bins-1) + bins = np.linspace(vmin, vmax + bin_size, num=bins+1) + else: + assert isinstance(bins, (list, tuple)) + vmin = bins[0] histogram = None for i in range(len(iterator)): batch = iterator[i] diff --git a/dataset_iterator/index_array_iterator.py b/dataset_iterator/index_array_iterator.py index 22ea34c..a1ccf81 100644 --- a/dataset_iterator/index_array_iterator.py +++ b/dataset_iterator/index_array_iterator.py @@ -36,6 +36,15 @@ def _get_index_array(self, choice:bool = True): array = np.copy(array) return array + def get_sample_number(self): + return len(self.allowed_indexes) + + def get_batch_size(self): + return self.batch_size + + def set_index_probability(self, value): + self.index_probability = value + def __len__(self): if self.step_number > 0: return self.step_number @@ -123,6 +132,7 @@ def index_probability(self): @index_probability.setter def index_probability(self, value): if value is not None: - assert len(self.allowed_indexes) == len(value), f"invalid index_probability length: expected: {len(self.allowed_indexes)} actual {len(value)}" + size = self.get_sample_number() + assert size == len(value), f"invalid index_probability length: expected: {size} actual {len(value)}" assert isclose(np.sum(value), 1.), "probabilities do not sum to 1" self._index_probability = value diff --git a/setup.py b/setup.py index 86ec69d..b6a3742 100644 --- a/setup.py +++ b/setup.py @@ -8,12 +8,12 @@ version="0.3.8", author="Jean Ollion", author_email="jean.ollion@polytechnique.org", - description="data iterator for images contained in dataset files such as hdf5 or PIL readable files. Images can be contained in several files. Based on tensorflow.keras.preprocessing.image.Iterator", + description="Keras-style data iterator for images contained in dataset files such as hdf5 or PIL readable files. Images can be contained in several files.", long_description=long_description, long_description_content_type="text/markdown", url="https://github.com/jeanollion/dataset_iterator.git", - download_url = 'https://files.pythonhosted.org/packages/b3/1c/6383e70b8d6e409fe1e3a774d659ff0fc7fa7933a88dd199a6e48319df8b/dataset_iterator-0.3.4.tar.gz', - keywords = ['Iterator', 'Dataset', 'Image', 'Numpy'], + download_url='https://files.pythonhosted.org/packages/b3/1c/6383e70b8d6e409fe1e3a774d659ff0fc7fa7933a88dd199a6e48319df8b/dataset_iterator-0.3.9.tar.gz', + keywords=['Iterator', 'Dataset', 'Image', 'Numpy'], packages=setuptools.find_packages(), classifiers=[ #https://pypi.org/classifiers/ 'Development Status :: 4 - Beta',
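For reference, a minimal, self-contained sketch of the sampling behaviour implemented by `get_index_probability_1d` in the new `hard_sample_mining` module. The synthetic uniform metric, the seed and the numbers in the comments are illustrative assumptions, not part of the diff; per the code above, low metric values mark hard samples, which receive roughly `enrich_factor` times the uniform sampling probability, while the easiest samples can end up with probability close to zero once `p_min` is clipped.

```python
import numpy as np
from dataset_iterator.hard_sample_mining import get_index_probability_1d

rng = np.random.default_rng(0)
n = 10_000
metric = rng.uniform(size=n)  # per-sample metric: low value = hard sample

proba = get_index_probability_1d(metric, enrich_factor=5., quantile_min=0.01)

print(np.isclose(proba.sum(), 1.))         # True: valid sampling distribution
hardest = np.argsort(metric)[:n // 100]    # hardest 1% of samples
print(proba[hardest].mean() * n)           # ~5, i.e. roughly enrich_factor times uniform
easiest = np.argsort(metric)[-n // 100:]   # easiest 1% of samples
print(proba[easiest].mean() * n)           # ~0: easiest samples are nearly dropped
```

In training, these probabilities are not set by hand: `HardSampleMiningCallback` computes a per-sample metric through the enqueued `SimpleIterator` and pushes the result to the training iterator via `set_index_probability` at the start of each period.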