Generates a large dataset for experiments using `dask`

In [1]:
# %pip install h5py 'dask[complete]'

In [14]:
from dask.distributed import LocalCluster
cluster = LocalCluster(memory_limit='500MB')
client = cluster.get_client()


In [13]:
# generate dataset ~100GB
import h5py
import numpy as np
import dask.array as da
from tqdm import tqdm
import sparse
dim1 = int(1e3)
dim2 = int(1e2)
dim3 = int(1e2)
batch_size = 10000
# chunks = (10, 10, 10)
# rng = da.random.default_rng()

# x = rng.random((dim1,dim2,dim3), chunks=chunks)

# x[x<0.99] = 0
# x[x !=0] = 1
# s = x.map_blocks(sparse.COO, dtype=bool)


# s.to_hdf5("mytestfile.hdf5", '/mydataset')


with h5py.File("mytestfile.hdf5", "w") as f:
    dset = f.create_dataset("mydataset", (dim1, dim2, dim3), dtype=bool)

    # Outer loop with progress bar
    for i in tqdm(range(dim1), desc="Processing dim1", unit="slice"):
        # Generate all random indices for this batch
        rand_indices = np.random.randint(0, dim3, size=dim2)

        # Inner batch processing with progress bar
        for j_start in tqdm(range(0, dim2, batch_size),
                          desc=f"dim1={i}",
                          unit="batch",
                          leave=False):
            j_end = min(j_start + batch_size, dim2)
            batch_indices = rand_indices[j_start:j_end]

            # Create boolean array for this batch
            batch = np.zeros((j_end-j_start, dim3), dtype=bool)
            batch[np.arange(j_end-j_start), batch_indices] = True

            # Write the batch
            dset[i, j_start:j_end] = batch

OSError: Unable to synchronously create file (unable to truncate a file which is already open)

In [None]:
import warnings
from collections.abc import Generator
from os import PathLike
from sys import getsizeof
from typing import Annotated

import numpy as np
from annotated_types import Ge
from loguru import logger
from numpy.typing import NDArray
from pydantic import validate_call
from tqdm.auto import tqdm

from peerannot.models.aggregation.warnings import DidNotConverge
from peerannot.models.aggregation.dawid_skene import DawidSkene
from peerannot.models.template import AnswersDict, CrowdModel
import sparse as sp
FilePathInput = PathLike | str | list[str] | Generator[str, None, None] | None


class DaskDawidSkene(DawidSkene):
    """
    =============================
    Dawid and Skene model (1979)
    =============================

    Assumptions:
    - independent workers

    Using:
    - EM algorithm

    Estimating:
    - One confusion matrix for each workers
    """



    def _init_T(self) -> None:  # noqa: N802
        """NS initialization"""
        # T shape is n_task, n_classes
        T = self.crowd_matrix.sum(axis=1)  # noqa: N806

        tdim = T.sum(1, keepdims=True)
        self.T = da.where(tdim > 0, T / tdim, 0)


    def _m_step(
        self,
    ) -> None:
        """Maximizing log likelihood (see eq. 2.3 and 2.4 Dawid and Skene 1979)

        Returns:
            :math:`\\rho`: :math:`(\\rho_j)_j` probabilities that instance has
                true response j if drawn at random (class marginals)
            pi: number of times worker k records l when j is correct
        """
        
        self.rho = self.T.sum(0) / self.n_task
        pi = da.einsum('tq,twc->wqc', self.T, self.crowd_matrix)
        denom = pi.sum(axis=2, keepdims=True)
        self.pi = pi / da.where(denom <= 0, -1e9, denom)


    def _e_step(self) -> None:
        """Estimate indicator variables (see eq. 2.5 Dawid and Skene 1979)"""

        exp_pi = da.power(self.pi[da.newaxis, :, :, :], self.crowd_matrix[:, :, da.newaxis, :])

        # numerator by taking the product over the worker axis
        num = da.prod(exp_pi, axis=3).prod(axis=1) * self.rho[da.newaxis, :]
        self.denom_e_step = num.sum(axis=1, keepdims=True)
        self.T = da.where(self.denom_e_step > 0, num / self.denom_e_step, num)



    def _log_likelihood(self) -> float:
        """Compute log likelihood of the model"""
        return da.log(da.sum(self.denom_e_step))

    @validate_call
    def run(
        self,
        epsilon: Annotated[float, Ge(0)] = 1e-6,
        maxiter: Annotated[int, Ge(0)] = 50,
    ) -> tuple[list[float], int]:
        """Run the EM optimization

        :param epsilon: stopping criterion (:math:`\\ell_1` norm between two iterates of log likelihood), defaults to 1e-6
        :type epsilon: float, optional
        :param maxiter: Maximum number of steps, defaults to 50
        :type maxiter: int, optional
        :param verbose: Verbosity level, defaults to False
        :return: Log likelihood values and number of steps taken
        :rtype: (list,int)
        """

        i = 0
        eps = np.inf

        self._init_T()
        ll = []
        pbar = tqdm(total=maxiter, desc="Dawid and Skene")
        while i < maxiter and eps > epsilon:
            self._m_step()
            self._e_step()
            likeli = self._log_likelihood()
            ll.append(likeli)
            if i > 0:
                eps = da.abs(ll[-1] - ll[-2])
            i += 1
            pbar.update(1)

        pbar.set_description("Finished")
        pbar.close()
        self.c = i
        if eps > epsilon:
            warnings.warn(
                DidNotConverge(self.__class__.__name__, eps, epsilon),
                stacklevel=2,
            )

        return ll, i

    def get_answers(self) -> NDArray:
        """Get most probable labels"""

        return np.vectorize(self.inv_labels.get)(
            np.argmax(self.get_probas(), axis=1),
        )

    def get_probas(self) -> NDArray:
        """Get soft labels distribution for each task"""
        return self.T
    

from peerannot.models import DawidSkene
from types import MethodType
import dask.array as da
import h5py
import sparse
f = h5py.File("mytestfile.hdf5", "r")
dset = f["mydataset"]

dense_test_crowd_matrix = da.from_array(dset, chunks=(1000,1000,1000))
test_crowd_matrix = dense_test_crowd_matrix.map_blocks(sparse.COO)


dds = DaskDawidSkene.from_crowd_matrix(test_crowd_matrix)
dds.run(maxiter=50)




[32m2025-04-10 23:47:29.346[0m | [34m[1mDEBUG   [0m | [36mpeerannot.models.aggregation.DS[0m:[36m_init_crowd_matrix[0m:[36m107[0m - [34m[1mSize of dense crowd matrix: 10144[0m


Dawid and Skene:   0%|          | 0/50 [00:00<?, ?it/s]

([dask.array<log, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>,
  dask.array<log, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>,
  dask.array<log, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>,
  dask.array<log, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>,
  dask.array<log, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>],
 5)

In [12]:
dds.T[0]

Unnamed: 0,Array,Chunk
Shape,"(100,)","(100,)"
Dask graph,1 chunks in 132 graph layers,1 chunks in 132 graph layers
Data type,float64 sparse._coo.core.COO,float64 sparse._coo.core.COO
"Array Chunk Shape (100,) (100,) Dask graph 1 chunks in 132 graph layers Data type float64 sparse._coo.core.COO",100  1,

Unnamed: 0,Array,Chunk
Shape,"(100,)","(100,)"
Dask graph,1 chunks in 132 graph layers,1 chunks in 132 graph layers
Data type,float64 sparse._coo.core.COO,float64 sparse._coo.core.COO


In [3]:
test_crowd_matrix.shape

(100, 100, 100)