Use multiprocessing pool instead of dask
This change is informed by several issues:

The previous version put a `delayed` call around a generator. Starting
with dask 2023.9.2 this led to the traceback attached at the end of
this message.

I opted to remove dask completely: even though I managed to get the
example working with the newer dask version, the task apparently doesn't
gain much from multi-threading. Even with only two workers, CPU cores
aren't at 100% on my machine. Using a multi-processing scheduler
resulted in errors ("worker unexpectedly finished" or something like
that). So instead I switched to Python's default multiprocessing pool,
which speeds up the example nicely (from 33s to 9s on my machine).
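
For illustration only (not part of the commit), here is a minimal sketch of the
pattern the change adopts: `functools.partial` binds the extra arguments and
`multiprocessing.pool.Pool.map` fans the per-image calls out over worker
processes. The `extract` function and the random images below are stand-ins,
not code from the example.

    from functools import partial
    from multiprocessing.pool import Pool

    import numpy as np

    def extract(img, feature_type):
        # Stand-in for the per-image feature extraction; `feature_type`
        # is only here to mirror how `partial` binds the extra argument.
        return np.array([img.sum(), img.mean()])

    if __name__ == "__main__":
        images = [np.random.rand(25, 25) for _ in range(200)]
        worker = partial(extract, feature_type=["type-2-x", "type-2-y"])
        with Pool() as pool:              # worker processes, not threads
            X = np.stack(pool.map(worker, images))
        print(X.shape)                    # (200, 2)

Because `Pool` uses separate processes rather than threads, the per-image work
is not serialized by the GIL, which is presumably why this can saturate the CPU
where the threaded dask scheduler did not.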

Traceback (most recent call last):
  File "/home/lg/Res/scikit-image/doc/examples/applications/plot_haar_extraction_selection_classification.py", line 81, in <module>
    X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=150,
  [... shortened ...]
  File "/home/lg/.local/lib/venv/skimagedev/lib/python3.11/site-packages/sklearn/utils/validation.py", line 347, in _num_samples
    raise TypeError(
TypeError: Singleton array array(<generator object <genexpr> at 0x7f5cee64d2a0>, dtype=object) cannot be considered a valid collection.
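
For context, a minimal sketch (not from the commit) of what scikit-learn is
objecting to: `np.array` does not expand a generator but stores it as a
0-dimensional object array, which appears to be what the delayed generator
collapsed to here, and `train_test_split` rejects that as a singleton.

    import numpy as np
    from sklearn.model_selection import train_test_split

    y = np.array([1] * 100 + [0] * 100)
    # A generator is not expanded by np.array(); it ends up as a
    # 0-dimensional object array holding the generator itself.
    X = np.array(x for x in range(200))
    print(X.shape, X.dtype)                  # (), object
    train_test_split(X, y, train_size=150)   # TypeError: Singleton array ...
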
lagru committed Sep 18, 2023
1 parent 80442f1 commit d774eeb
Showing 1 changed file with 14 additions and 13 deletions.
Changed file: doc/examples/applications/plot_haar_extraction_selection_classification.py
@@ -25,12 +25,12 @@
 """
 from time import time
+from functools import partial
+from multiprocessing.pool import Pool
 
 import numpy as np
 import matplotlib.pyplot as plt
 
-from dask import delayed
-
 from sklearn.ensemble import RandomForestClassifier
 from sklearn.model_selection import train_test_split
 from sklearn.metrics import roc_auc_score
@@ -48,7 +48,6 @@
 # integral image within this ROI is computed. Finally, the integral image is
 # used to extract the features.
 
-@delayed
 def extract_feature_image(img, feature_type, feature_coord=None):
     """Extract the haar feature for the current image"""
     ii = integral_image(img)
@@ -67,12 +66,11 @@ def extract_feature_image(img, feature_type, feature_coord=None):
 # To speed up the example, extract the two types of features only
 feature_types = ['type-2-x', 'type-2-y']
 
-# Build a computation graph using Dask. This allows the use of multiple
-# CPU cores later during the actual computation
-X = delayed(extract_feature_image(img, feature_types) for img in images)
-# Compute the result
+# Use multiprocessing pool to compute tasks in parallel
+worker = partial(extract_feature_image, feature_type=feature_types)
 t_start = time()
-X = np.array(X.compute(scheduler='single-threaded'))
+X = [x for x in Pool().map(worker, images)]
+X = np.stack(X)
 time_full_feature_comp = time() - t_start
 
 # Label images (100 faces and 100 non-faces)
@@ -139,12 +137,15 @@ def extract_feature_image(img, feature_type, feature_coord=None):
 # but we would like to emphasize the usage of `feature_coord` and `feature_type`
 # to recompute a subset of desired features.
 
-# Build the computational graph using Dask
-X = delayed(extract_feature_image(img, feature_type_sel, feature_coord_sel)
-            for img in images)
-# Compute the result
+# Use multiprocessing pool to compute tasks in parallel
+worker = partial(
+    extract_feature_image,
+    feature_type=feature_type_sel,
+    feature_coord=feature_coord_sel
+)
 t_start = time()
-X = np.array(X.compute(scheduler='single-threaded'))
+X = [x for x in Pool().map(worker, images)]
+X = np.stack(X)
 time_subs_feature_comp = time() - t_start
 
 y = np.array([1] * 100 + [0] * 100)
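
Assembled from the hunks above, a self-contained sketch of the updated
feature-extraction step might look as follows. The `lfw_subset` images and the
exact `haar_like_feature` call are taken from the surrounding scikit-image
example and are assumptions here, since this diff does not show them.

    from functools import partial
    from multiprocessing.pool import Pool
    from time import time

    import numpy as np
    from skimage.data import lfw_subset
    from skimage.feature import haar_like_feature
    from skimage.transform import integral_image

    def extract_feature_image(img, feature_type, feature_coord=None):
        """Extract the haar feature for the current image"""
        ii = integral_image(img)
        return haar_like_feature(ii, 0, 0, ii.shape[0], ii.shape[1],
                                 feature_type=feature_type,
                                 feature_coord=feature_coord)

    if __name__ == "__main__":
        images = lfw_subset()                  # 200 grayscale 25x25 images
        feature_types = ['type-2-x', 'type-2-y']
        worker = partial(extract_feature_image, feature_type=feature_types)
        t_start = time()
        X = np.stack(Pool().map(worker, images))
        print(X.shape, f"{time() - t_start:.1f} s")

In the full example this `X` is then passed to `train_test_split` and a random
forest, which is exactly where the old generator-valued `X` triggered the
traceback quoted above.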
