fix(eda.distribution): delay scipy computations
dovahcrow committed Sep 4, 2020
1 parent 4bba52e commit 89fafae
Showing 3 changed files with 38 additions and 24 deletions.
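The pattern this commit applies: a scipy function called eagerly while the dask graph is being built forces its inputs to materialize immediately, whereas a dask.delayed wrapper turns the call into a task that runs only at compute time. A minimal sketch of the idea, with an illustrative wrapper name not taken from the diff:

import dask
import numpy as np
from scipy.stats import skew as skew_  # eager scipy function

@dask.delayed(pure=True)  # pylint: disable=no-value-for-parameter
def skew(arr: np.ndarray) -> float:
    """Delayed wrapper: the scipy call runs only at compute time."""
    return float(skew_(arr))

lazy = skew(np.random.rand(1000))  # builds a task, computes nothing
result = lazy.compute()            # scipy executes here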
dataprep/eda/correlation/compute/common.py (4 changes: 3 additions & 1 deletion)
@@ -43,7 +43,9 @@ def kendalltau(  # pylint: disable=invalid-name
     return np.float64(corr)  # Sometimes corr is a float, causes dask error


-@dask.delayed
+@dask.delayed(  # pylint: disable=no-value-for-parameter
+    name="kendalltau-scipy", pure=True
+)
 def corrcoef(arr: np.ndarray) -> np.ndarray:
     """delayed version of np.corrcoef."""
     _, (corr, _) = np.corrcoef(arr, rowvar=False)
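A note on the decorator form above: dask.delayed is curried, so it accepts keyword arguments alone and returns a decorator, which pylint cannot see (hence the no-value-for-parameter disable). name labels the task in the dask graph, and pure=True derives the task key from a hash of the inputs, so identical calls collapse into a single task. A hedged sketch with a toy function, not from the commit:

import dask
import numpy as np

@dask.delayed(  # pylint: disable=no-value-for-parameter
    name="demo-sum", pure=True
)
def demo_sum(arr: np.ndarray) -> np.float64:
    """Toy delayed function, only here to show task-key behaviour."""
    return arr.sum()

a = np.arange(3)
x, y = demo_sum(a), demo_sum(a)
assert x.key == y.key  # pure=True: identical inputs share one task key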
dataprep/eda/distribution/compute/common.py (25 changes: 21 additions & 4 deletions)
@@ -1,11 +1,12 @@
 """Common types and functionalities for compute(...)."""

-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union, cast

 import dask
 import numpy as np
 import pandas as pd
 import dask.dataframe as dd
+from scipy.stats import normaltest as normaltest_, ks_2samp as ks_2samp_

 from ...dtypes import drop_null, is_dtype, detect_dtype, Continuous, DTypeDef

@@ -271,9 +272,8 @@ def _calc_line_dt(
 def _calc_groups(
     df: dd.DataFrame, x: str, ngroups: int, largest: bool = True
 ) -> Tuple[dd.DataFrame, Dict[str, int], List[str]]:
-    """
-    Auxillary function to parse the dataframe to consist of only the
-    groups with the largest counts
+    """Auxiliary function to parse the dataframe to consist of only the
+    groups with the largest counts.
     """

     # group count statistics to inform the user of the sampled output
@@ -292,3 +292,20 @@
     grp_cnt_stats[f"{x}_shw"] = len(largest_grps)

     return df, grp_cnt_stats, largest_grps
+
+
+@dask.delayed(  # pylint: disable=no-value-for-parameter
+    name="scipy-normaltest", pure=True, nout=2
+)
+def normaltest(arr: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
+    """Delayed version of scipy normaltest; the dask version would
+    trigger a compute."""
+    return cast(Tuple[np.ndarray, np.ndarray], normaltest_(arr))
+
+
+@dask.delayed(  # pylint: disable=no-value-for-parameter
+    name="scipy-ks_2samp", pure=True, nout=2
+)
+def ks_2samp(data1: np.ndarray, data2: np.ndarray) -> Tuple[float, float]:
+    """Delayed version of scipy ks_2samp."""
+    return cast(Tuple[float, float], ks_2samp_(data1, data2))
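Because both wrappers are declared with nout=2, a call can be unpacked into two Delayed objects, one per scipy return value, before anything executes. A sketch of how they might be consumed, assuming the module path above and an illustrative input array:

import numpy as np
from dataprep.eda.distribution.compute.common import normaltest

arr = np.random.randn(1000)   # illustrative sample
stat, pval = normaltest(arr)  # nout=2: lazy unpacking, no computation yet
print(pval.compute())         # the scipy test runs only here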
dataprep/eda/distribution/compute/overview.py (33 changes: 14 additions & 19 deletions)
@@ -1,15 +1,14 @@
 """Computations for plot(df) function."""

-from typing import Any, Dict, List, Optional, Tuple
 from itertools import combinations
+from typing import Any, Dict, List, Optional, Tuple

 import dask
 import dask.array as da
 import dask.dataframe as dd
 import numpy as np
 import pandas as pd
-from dask.array.stats import chisquare, normaltest, skew
-from scipy.stats import ks_2samp
+from dask.array.stats import chisquare, skew

 from ....errors import UnreachableError
 from ...dtypes import (
@@ -24,7 +23,7 @@
     is_dtype,
 )
 from ...intermediate import Intermediate
-from .common import _calc_line_dt
+from .common import _calc_line_dt, ks_2samp, normaltest


 def compute_overview(
@@ -81,9 +80,7 @@ def compute_overview(
                 first_rows[col].apply(hash)
             except TypeError:
                 srs = df[col] = srs.astype(str)
-            datas.append(
-                calc_nom_col(drop_null(srs), first_rows[col], ngroups, largest)
-            )
+            datas.append(calc_nom_col(drop_null(srs), ngroups, largest))
             col_names_dtypes.append((col, Nominal()))
         elif is_dtype(col_dtype, Continuous()):
             ## if cfg.hist_enable or cfg.any_insights("hist"):
@@ -179,9 +176,7 @@ def calc_cont_col(srs: dd.Series, bins: int) -> Dict[str, Any]:


 ## def calc_nom_col(srs: dd.Series, first_rows: pd.Series, cfg: Config)
-def calc_nom_col(
-    srs: dd.Series, first_rows: pd.Series, ngroups: int, largest: bool
-) -> Dict[str, Any]:
+def calc_nom_col(srs: dd.Series, ngroups: int, largest: bool) -> Dict[str, Any]:
     """
     Computations for a categorical column in plot(df)
@@ -227,9 +222,7 @@ def calc_nom_col(
     ## data["npresent"] = srs.shape[0]

     ## if cfg.insight.constant_length_enable:
-    if not first_rows.apply(lambda x: isinstance(x, str)).all():
-        srs = srs.astype(str)  # srs must be a string to compute the value lengths
-    length = srs.str.len()
+    length = srs.apply(lambda v: len(str(v)), meta=(srs.name, np.int64))
     data["min_len"], data["max_len"] = length.min(), length.max()

     return data
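The dropped branch needed first_rows to decide whether the whole series had to be cast to str before .str.len(); the replacement applies len(str(v)) per value, and the meta=(name, dtype) hint declares the output schema so dask need not infer it by running the lambda on sample data. The same pattern on a toy series, with illustrative names:

import dask.dataframe as dd
import numpy as np
import pandas as pd

srs = dd.from_pandas(pd.Series(["a", 10, None], name="col"), npartitions=1)
length = srs.apply(lambda v: len(str(v)), meta=("col", np.int64))
print(length.min().compute(), length.max().compute())  # prints: 1 4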
@@ -269,12 +262,13 @@ def calc_stats(
     # compute distribution similarity on a data sample
     # TODO .map_partitions() fails for create_report since it calls calc_stats() with a pd dataframe
     # df_smp = df.map_partitions(lambda x: x.sample(min(1000, x.shape[0])), meta=df)
     # NOTE ks_2samp triggers a .compute(), could use .delayed()

     if num_cols:  # remove this if statement when create_report is refactored
         stats["ks_tests"] = []
         for col1, col2 in list(combinations(num_cols, 2)):
-            if ks_2samp(df[col1], df[col2])[1] > 0.05:
-                stats["ks_tests"].append((col1, col2))
+            stats["ks_tests"].append(
+                (col1, col2, ks_2samp(df[col1], df[col2])[1] > 0.05)
+            )

     return stats
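With the delayed ks_2samp, indexing the call with [1] yields a lazy p-value, and comparing it to 0.05 yields yet another Delayed (operators on Delayed objects build further tasks), so ks_tests now collects (col1, col2, lazy bool) triples without one blocking scipy call per pair. A hedged sketch of resolving them in a single pass, assuming the wrapper's module path and illustrative data:

import dask
import numpy as np
from dataprep.eda.distribution.compute.common import ks_2samp

rng = np.random.default_rng(0)
a, b = rng.normal(size=500), rng.normal(size=500)  # illustrative columns
pair = ("a", "b", ks_2samp(a, b)[1] > 0.05)        # lazy triple, no compute
(ks_tests,) = dask.compute([pair])                 # all KS tests run here
for col1, col2, similar in ks_tests:
    if similar:
        print(f"{col1} and {col2} have similar distributions")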

@@ -299,9 +293,10 @@ def format_overview(data: Dict[str, Any]) -> List[Dict[str, str]]:
         ins.append({"Duplicates": f"Dataset has {ndup} ({pdup}%) duplicate rows"})

     ## if cfg.insight.similar_distribution_enable
-    for cols in data.get("ks_tests", []):
-        msg = f"{cols[0]} and {cols[1]} have similar distributions"
-        ins.append({"Similar Distribution": msg})
+    for (*cols, test_result) in data.get("ks_tests", []):
+        if test_result:
+            msg = f"{cols[0]} and {cols[1]} have similar distributions"
+            ins.append({"Similar Distribution": msg})

     data.pop("ks_tests", None)
