Skip to content

Commit

Permalink
feat(eda): add progress bar for dask local scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
dovahcrow committed Sep 4, 2020
1 parent 3575aac commit e13257c
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 42 deletions.
19 changes: 9 additions & 10 deletions dataprep/eda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,24 @@
dataprep.eda
============
"""
import tempfile
from bokeh.io import output_notebook

from bokeh.io import output_file, output_notebook
from .distribution import compute, plot, render
from .correlation import compute_correlation, plot_correlation, render_correlation
from .missing import compute_missing, plot_missing, render_missing
from .create_report import create_report
from .utils import is_notebook
from .distribution import compute, plot, render
from .dtypes import (
DType,
Categorical,
Nominal,
Ordinal,
Numerical,
Continuous,
Discrete,
DateTime,
Discrete,
DType,
Nominal,
Numerical,
Ordinal,
Text,
)
from .missing import compute_missing, plot_missing, render_missing
from .utils import is_notebook

__all__ = [
"plot_correlation",
Expand Down
10 changes: 5 additions & 5 deletions dataprep/eda/correlation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
This module implements the plot_correlation(df) function.
"""

from typing import Any, List, Optional, Tuple, Union
from typing import Optional, Tuple, Union

import dask.dataframe as dd
import pandas as pd
from bokeh.io import show

from ..progress_bar import ProgressBar
from ..report import Report
from .compute import compute_correlation
from .render import render_correlation
from ..report import Report

__all__ = ["render_correlation", "compute_correlation", "plot_correlation"]

Expand Down Expand Up @@ -61,8 +61,8 @@ def plot_correlation(
This function only supports numerical or categorical data,
and it is better to drop None, NaN and Null values before using it
"""

intermediate = compute_correlation(df, x=x, y=y, value_range=value_range, k=k)
with ProgressBar(minimum=1):
intermediate = compute_correlation(df, x=x, y=y, value_range=value_range, k=k)
figure = render_correlation(intermediate)

return Report(figure)
48 changes: 25 additions & 23 deletions dataprep/eda/distribution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
This module implements the plot(df) function.
"""

from typing import Optional, Tuple, Union, Dict
from typing import Optional, Tuple, Union

import dask.dataframe as dd
import pandas as pd

from ..container import Container
from ..dtypes import DTypeDef
from ..progress_bar import ProgressBar
from ..report import Report
from .compute import compute
from .render import render
from ..report import Report
from ..dtypes import DTypeDef
from ..container import Container

__all__ = ["plot", "compute", "render"]

Expand Down Expand Up @@ -143,25 +144,26 @@ def plot(
"""
# pylint: disable=too-many-locals,line-too-long

intermediate = compute(
df,
x=x,
y=y,
z=z,
bins=bins,
ngroups=ngroups,
largest=largest,
nsubgroups=nsubgroups,
timeunit=timeunit.lower(),
agg=agg,
sample_size=sample_size,
top_words=top_words,
stopword=stopword,
lemmatize=lemmatize,
stem=stem,
value_range=value_range,
dtype=dtype,
)
with ProgressBar(minimum=1):
intermediate = compute(
df,
x=x,
y=y,
z=z,
bins=bins,
ngroups=ngroups,
largest=largest,
nsubgroups=nsubgroups,
timeunit=timeunit.lower(),
agg=agg,
sample_size=sample_size,
top_words=top_words,
stopword=stopword,
lemmatize=lemmatize,
stem=stem,
value_range=value_range,
dtype=dtype,
)
figure = render(intermediate, yscale=yscale, tile_size=tile_size)
if intermediate.visual_type == "distribution_grid":
return Container(figure)
Expand Down
12 changes: 8 additions & 4 deletions dataprep/eda/missing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

import dask.dataframe as dd
import pandas as pd
from bokeh.io import show

from ..dtypes import DTypeDef
from ..progress_bar import ProgressBar
from ..report import Report
from .compute import compute_missing
from .render import render_missing
from ..report import Report
from ..dtypes import DTypeDef

__all__ = ["render_missing", "compute_missing", "plot_missing"]

Expand Down Expand Up @@ -56,6 +56,10 @@ def plot_missing(
>>> plot_missing(df, "HDI_for_year")
>>> plot_missing(df, "HDI_for_year", "population")
"""
itmdt = compute_missing(df, x, y, dtype=dtype, bins=bins, ndist_sample=ndist_sample)

with ProgressBar(minimum=1):
itmdt = compute_missing(
df, x, y, dtype=dtype, bins=bins, ndist_sample=ndist_sample
)
fig = render_missing(itmdt)
return Report(fig)
159 changes: 159 additions & 0 deletions dataprep/eda/progress_bar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""ProgressBar shows how many dask tasks have finished/remain, using tqdm."""

from typing import Any, Optional, Dict, Tuple, Union
from time import time

from dask.callbacks import Callback

from .utils import is_notebook

if is_notebook():
from tqdm.notebook import tqdm
else:
from tqdm import tqdm

# pylint: disable=method-hidden,too-many-instance-attributes
class ProgressBar(Callback):  # type: ignore
    """A tqdm-based progress bar for DataPrep.EDA dask computations.

    The bar is created eagerly when the task graph has more than
    ``min_tasks`` tasks; otherwise it is created lazily once the computation
    has been running for longer than ``minimum`` seconds.

    Parameters
    ----------
    minimum : float, optional
        Minimum time threshold in seconds before displaying a progress bar.
        Default is 0 (always display).
    min_tasks : int, optional
        Graphs with more than this many tasks get a progress bar immediately,
        default is 5.
    width : int, optional
        Width of the bar. None means auto width.
    interval : float, optional
        Update resolution in seconds, default is 0.1 seconds.
    """

    _minimum: float = 0
    _min_tasks: int = 5
    _width: Optional[int] = None
    _interval: float = 0.1
    _last_duration: float = 0
    _pbar: Optional[tqdm] = None
    _state: Optional[Dict[str, Any]] = None
    _started: Optional[float] = None
    _last_task: Optional[str] = None  # in case we initialize the pbar in _finish

    def __init__(
        self,
        minimum: float = 0,
        min_tasks: int = 5,
        width: Optional[int] = None,
        interval: float = 0.1,
    ) -> None:
        super().__init__()
        self._minimum = minimum
        self._min_tasks = min_tasks
        self._width = width
        self._interval = interval

    def _start(self, _dsk: Any) -> None:
        """A hook to start this callback."""

    def _start_state(self, _dsk: Any, state: Dict[str, Any]) -> None:
        """A hook called once the scheduler state is built, before any task runs."""
        self._started = time()
        self._state = state
        _, ntasks = self._count_tasks()

        # Large graphs get a bar right away; small ones wait for `minimum`.
        if ntasks > self._min_tasks:
            self._init_bar()

    def _pretask(
        self, key: Union[str, Tuple[str, ...]], _dsk: Any, _state: Dict[str, Any]
    ) -> None:
        """A hook called before one task gets executed."""
        if self._started is None:
            raise ValueError("ProgressBar not started properly")

        if self._pbar is None and time() - self._started > self._minimum:
            self._init_bar()

        # Tuple keys are reduced to their first element for display.
        if isinstance(key, tuple):
            key = key[0]

        if self._pbar is not None:
            self._pbar.set_description(f"Computing {key}")
        else:
            # Remember the task so a bar created later can show it.
            self._last_task = key

    def _posttask(
        self,
        _key: str,
        _result: Any,
        _dsk: Any,
        _state: Dict[str, Any],
        _worker_id: Any,
    ) -> None:
        """A hook called after one task gets executed."""
        if self._pbar is not None:
            self._update_bar()

    def _finish(self, _dsk: Any, _state: Dict[str, Any], _errored: bool) -> None:
        """A hook called after all tasks get executed.

        Always resets the per-run state, so the same instance can be reused
        for another computation.
        """
        try:
            if self._started is None:
                # The scheduler may call this hook from its cleanup path even
                # when `_start_state` never ran (graph setup failed). Raising
                # here would hide the original error, so just clean up.
                return

            if self._pbar is None and time() - self._started > self._minimum:
                self._init_bar()

            self._update_bar()  # no-op when no bar was created
        finally:
            # Reset first so a failure while closing cannot leave stale state.
            pbar, self._pbar = self._pbar, None
            self._state = None
            self._started = None
            self._last_task = None
            if pbar is not None:
                pbar.close()

    def _update_bar(self) -> None:
        """Advance the bar to the number of finished tasks; never move back."""
        if self._pbar is None:
            return
        ndone, _ = self._count_tasks()

        self._pbar.update(max(0, ndone - self._pbar.n))

    def _init_bar(self) -> None:
        """Create the tqdm bar from the current scheduler state.

        Raises
        ------
        ValueError
            If a bar already exists, or the scheduler state is not set.
        """
        if self._pbar is not None:
            raise ValueError("ProgressBar already initialized.")
        ndone, ntasks = self._count_tasks()

        if self._last_task is not None:
            desc = f"Computing {self._last_task}"
        else:
            desc = ""

        bar_kwargs: Dict[str, Any] = {
            "total": ntasks,
            "mininterval": self._interval,
            "initial": ndone,
            "desc": desc,
        }
        # Fixed width when requested, otherwise let tqdm track the terminal.
        if self._width is None:
            bar_kwargs["dynamic_ncols"] = True
        else:
            bar_kwargs["ncols"] = self._width

        self._pbar = tqdm(**bar_kwargs)

        # Use the computation's start time as the bar's start time, since the
        # bar may be created well after the computation began.
        self._pbar.start_t = self._started
        self._pbar.refresh()

    def _count_tasks(self) -> Tuple[int, int]:
        """Return ``(finished, total)`` task counts from the scheduler state.

        Raises
        ------
        ValueError
            If the scheduler state has not been set by `_start_state`.
        """
        if self._state is None:
            raise ValueError("ProgressBar not started properly")

        state = self._state
        ndone = len(state["finished"])
        ntasks = sum(len(state[k]) for k in ["ready", "waiting", "running"]) + ndone

        return ndone, ntasks

0 comments on commit e13257c

Please sign in to comment.