Support spark date type (#82)
Support for the Spark date type (treated as a timestamp).

Extend the unit tests for this type
Install Spark on GitHub Actions to be able to include Spark tests in our CI/CD pipeline
Upgrade linting to use pre-commit (including pyupgrade for Python 3.6 syntax upgrades)
Document how to run popmon using Spark on Google Colab (minimal example from scratch)
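For illustration, a minimal sketch of what the date-type support enables (the column names and data are assumptions for this example; the report call follows the existing Spark usage documented in configuration.rst):

import popmon  # noqa: F401  (registers pm_stability_report on Spark DataFrames)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# a DateType column; previously only timestamp columns were mapped to np.datetime64
sdf = spark.createDataFrame(
    [("2021-01-01", 1.0), ("2021-01-02", 2.0)], ["day", "value"]
).withColumn("day", F.to_date("day"))

# the date column is now treated as a timestamp and can serve as the time axis
report = sdf.pm_stability_report(time_axis="day")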
sbrugman committed Jan 20, 2021
1 parent 31f3dfb commit 841bdbd
Showing 25 changed files with 168 additions and 80 deletions.
25 changes: 18 additions & 7 deletions .github/workflows/build.yml
@@ -24,16 +24,27 @@ jobs:
python -m pip install --upgrade pip
pip install -e .
pip install -r requirements-test.txt
- name: Lint with flake8 and black
- name: Lint with pre-commit
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# check using isort and black
make lint check=1
make lint
- name: Test with pytest
run: |
pip install pytest
pytest
pytest -m "not spark"
- name: Install Spark
env:
BUILD_DIR: "/home/runner/work/" #${{ github.workspace }}
JAVA_HOME: "/usr/lib/jvm/java-8-openjdk-amd64"
SPARK_VERSION: "2.4.7"
HADOOP_VERSION: "2.7"
SPARK_HOME: "/home/runner/work/spark/" #${{ github.workspace }}/spark/
run: |
sudo apt-get -y install openjdk-8-jdk
curl https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz --output ${BUILD_DIR}/spark.tgz
tar -xvzf ${BUILD_DIR}/spark.tgz && mv spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} ${SPARK_HOME}
pip install pytest-spark>=0.6.0 pyarrow>=0.8.0 pyspark==2.4.7
- name: Test with pytest (spark-specific)
run: |
pytest -m spark
examples:
runs-on: ubuntu-latest
21 changes: 21 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,21 @@
repos:
- repo: https://github.com/psf/black
rev: 20.8b1
hooks:
- id: black
- repo: https://github.com/pycqa/isort
rev: 5.7.0
hooks:
- id: isort
files: '.*'
args: [ --profile=black, --project=popmon, --thirdparty histogrammar, --thirdparty pybase64 ]
- repo: https://gitlab.com/pycqa/flake8
rev: "3.8.4"
hooks:
- id: flake8
args: [ "--select=E9,F63,F7,F82"]
- repo: https://github.com/asottile/pyupgrade
rev: v2.7.4
hooks:
- id: pyupgrade
args: ['--py36-plus','--exit-zero-even-if-changed']
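With this configuration in place, the hooks can be run locally the same way the CI job and `make lint` invoke them; a minimal sketch using standard pre-commit commands:

pip install pre-commit
pre-commit install            # register the git hook (optional, runs on every commit)
pre-commit run --all-files    # run all hooks against the full repository, as `make lint` does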
9 changes: 1 addition & 8 deletions Makefile
@@ -1,12 +1,5 @@
ifeq ($(check),1)
CHECK_ARG= --check
else
CHECK_ARG=
endif

lint:
isort $(CHECK_ARG) --profile black --project popmon --thirdparty histogrammar --thirdparty pybase64 .
black $(CHECK_ARG) .
pre-commit run --all-files

install:
pip install -e .
1 change: 0 additions & 1 deletion docs/source/conf.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# POPMON documentation build configuration file for sphinx.
#
36 changes: 34 additions & 2 deletions docs/source/configuration.rst
@@ -83,7 +83,6 @@ The notation 'num', 'low', 'high' gives a fixed range histogram from 'low' to 'h
number of bins.



Monitoring rules
----------------

@@ -195,7 +194,7 @@ Spark usage

.. code-block:: python
import popmon
import popmon
from pyspark.sql import SparkSession
# downloads histogrammar jar files if not already installed, used for histogramming of spark dataframe
@@ -206,3 +205,36 @@ Spark usage
# generate the report
report = spark_df.pm_stability_report(time_axis='timestamp')
Spark example on Google Colab
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This snippet contains the instructions for setting up a minimal environment for popmon with Spark on Google Colab, as a reference.

.. code-block:: console
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!wget -P /content/spark-2.4.7-bin-hadoop2.7/jars/ -q https://repo1.maven.org/maven2/org/diana-hep/histogrammar-sparksql_2.11/1.0.4/histogrammar-sparksql_2.11-1.0.4.jar
!wget -P /content/spark-2.4.7-bin-hadoop2.7/jars/ -q https://repo1.maven.org/maven2/org/diana-hep/histogrammar_2.11/1.0.4/histogrammar_2.11-1.0.4.jar
!pip install -q findspark popmon
Now that Spark is installed, restart the runtime.

.. code-block:: python
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]") \
.config("spark.jars", "/content/jars/histogrammar_2.11-1.0.4.jar,/content/jars/histogrammar-sparksql_2.11-1.0.4.jar") \
.config("spark.sql.execution.arrow.enabled", "false") \
.config("spark.sql.session.timeZone", "GMT") \
.getOrCreate()
8 changes: 1 addition & 7 deletions make.bat
@@ -3,13 +3,7 @@
setlocal enabledelayedexpansion

IF "%1%" == "lint" (
IF "%2%" == "check" (
SET CHECK_ARG= --check
) ELSE (
set CHECK_ARG=
)
isort !CHECK_ARG! --profile black --project popmon --thirdparty histogrammar --thirdparty pybase64 .
black !CHECK_ARG! .
pre-commit run --all-files
GOTO end
)

4 changes: 1 addition & 3 deletions popmon/analysis/hist_numpy.py
@@ -337,9 +337,7 @@ def check_similar_hists(hc_list, check_type=True, assert_type=used_hist_types):
return False
dts = [hist.datatype for hist in hist_list]
if not dts.count(dts[0]) == len(dts):
warnings.warn(
"Input histograms have inconsistent datatypes: {dts}".format(dts=dts)
)
warnings.warn(f"Input histograms have inconsistent datatypes: {dts}")
return False
# Check generic attributes
if check_type:
13 changes: 7 additions & 6 deletions popmon/hist/filling/histogram_filler_base.py
@@ -224,8 +224,9 @@ def assign_and_check_features(self, df, cols_by_type):
"No obvious time-axes found to choose from. So not used."
)
else:
w = f'Found {num} time-axes: {cols_by_type["dt"]}. Set *one* time_axis manually! Now NOT used.'
self.logger.warning(w)
self.logger.warning(
f'Found {num} time-axes: {cols_by_type["dt"]}. Set *one* time_axis manually! Now NOT used.'
)
else:
# c) no time axis
self.time_axis = ""
@@ -275,7 +276,7 @@ def assign_and_check_features(self, df, cols_by_type):
def auto_complete_bin_specs(self, df, cols_by_type):
"""auto complete the bin-specs that have not been provided
:param df: input dateframe
:param df: input dataframe
:param cols_by_type: dict of columns classified by type
"""
# auto-determine binning of numerical and time features for which no bin_specs exist
@@ -311,7 +312,7 @@ def auto_complete_bin_specs(self, df, cols_by_type):
if c in float_cols:
q = quantiles_f[c]
# by default, n_bins covers range 5-95% quantiles + we add 10%
# basicly this gives a nice plot when plotted
# basically this gives a nice plot when plotted
# specs for Bin and Sparselybin histograms
if q[1] == q[0]:
# in case of highly imbalanced data it can happen that q05=q95. If so use min and max instead.
@@ -354,7 +355,7 @@ def get_data_type(self, df, col):
:param str col: column
"""
if col not in self.get_features(df):
raise KeyError('column "{0:s}" not in input dataframe'.format(col))
raise KeyError(f'column "{col:s}" not in input dataframe')
return df[col].dtype

def categorize_features(self, df):
@@ -488,7 +489,7 @@ def get_hist_bin(self, hist, features, quant, col, dt):
else:
raise RuntimeError("Do not know how to interpret bin specifications.")
else:
# string and boolians are treated as categories
# string and booleans are treated as categories
hist = hg.Categorize(quantity=quant, value=hist)

return hist
2 changes: 1 addition & 1 deletion popmon/hist/filling/make_histograms.py
@@ -171,7 +171,7 @@ def get_data_type(df, col):
:param str col: column
"""
if col not in df.columns:
raise KeyError('Column "{0:s}" not in input dataframe.'.format(col))
raise KeyError(f'Column "{col:s}" not in input dataframe.')
dt = dict(df.dtypes)[col]

if hasattr(dt, "type"):
2 changes: 1 addition & 1 deletion popmon/hist/filling/pandas_histogrammar.py
@@ -101,7 +101,7 @@ def assert_dataframe(self, df):
:param df: input (pandas) data frame
"""
if not isinstance(df, pd.DataFrame):
raise TypeError("retrieved object not of type {}".format(pd.DataFrame))
raise TypeError(f"retrieved object not of type {pd.DataFrame}")
if df.shape[0] == 0:
raise RuntimeError("data is empty")
return df
10 changes: 6 additions & 4 deletions popmon/hist/filling/spark_histogrammar.py
@@ -149,12 +149,12 @@ def get_data_type(self, df, col):
:param str col: column
"""
if col not in df.columns:
raise KeyError('Column "{0:s}" not in input dataframe.'.format(col))
raise KeyError(f'Column "{col:s}" not in input dataframe.')
dt = dict(df.dtypes)[col]
# spark conversions to numpy or python equivalent
if dt == "string":
dt = "str"
elif dt == "timestamp":
elif dt in ["timestamp", "date"]:
dt = np.datetime64
elif dt == "boolean":
dt = bool
@@ -184,7 +184,9 @@ def process_features(self, df, cols_by_type):
col=col, type=self.var_dtype[col]
)
)
to_ns = sparkcol(col).cast("float") * 1e9

# first cast to timestamp (in case column is stored as date)
to_ns = sparkcol(col).cast("timestamp").cast("float") * 1e9
idf = idf.withColumn(col, to_ns)

hg.sparksql.addMethods(idf)
@@ -222,7 +224,7 @@ def construct_empty_hist(self, df, features):
return hist

def fill_histograms(self, idf):
"""Fill the histogramss
"""Fill the histograms
:param idf: input data frame used for filling histogram
"""
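The `.cast("timestamp")` added in process_features above is the core of the date handling: a DateType column is first cast to a timestamp and then to (float) seconds since the epoch before being scaled to nanoseconds. A standalone sketch of the same conversion, with an illustrative column name, assuming a local Spark session:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("2021-01-20",)], ["dt"]).withColumn("dt", F.to_date("dt"))

# date -> timestamp -> seconds since epoch (float) -> nanoseconds,
# mirroring the cast applied in process_features
sdf = sdf.withColumn("dt", F.col("dt").cast("timestamp").cast("float") * 1e9)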
6 changes: 2 additions & 4 deletions popmon/hist/filling/utils.py
@@ -34,9 +34,7 @@ def check_column(col, sep=":"):
if isinstance(col, str):
col = col.split(sep)
elif not isinstance(col, list):
raise TypeError(
'Columns "{}" needs to be a string or list of strings'.format(col)
)
raise TypeError(f'Columns "{col}" needs to be a string or list of strings')
return col


@@ -54,7 +52,7 @@ def check_dtype(dtype):
if dtype in {np.str_, np.string_, np.object_}:
dtype = np.dtype(str).type
except BaseException:
raise RuntimeError('unknown assigned datatype "{}"'.format(dtype))
raise RuntimeError(f'unknown assigned datatype "{dtype}"')
return dtype


6 changes: 3 additions & 3 deletions popmon/hist/histogram.py
@@ -163,7 +163,7 @@ def project_split2dhist_on_axis(splitdict, axis="x"):
"splitdict: {wt}, type should be a dictionary.".format(wt=type(splitdict))
)
if axis not in ["x", "y"]:
raise ValueError("axis: {axis}, can only be x or y.".format(axis=axis))
raise ValueError(f"axis: {axis}, can only be x or y.")

hdict = dict()

@@ -217,9 +217,9 @@ def _edit_name(self, axis_name, xname, yname, convert_time_index, short_keys):
if convert_time_index and self.is_ts:
axis_name = pd.Timestamp(axis_name)
if not short_keys:
axis_name = "{name}={binlabel}".format(name=xname, binlabel=axis_name)
axis_name = f"{xname}={axis_name}"
if self.n_dim >= 2:
axis_name = "{name}[{slice}]".format(name=yname, slice=axis_name)
axis_name = f"{yname}[{axis_name}]"
return axis_name

def sparse_bin_centers_x(self):
8 changes: 4 additions & 4 deletions popmon/pipeline/metrics_pipelines.py
@@ -106,7 +106,7 @@ def metrics_self_reference(
apply_funcs=[
dict(
func=rolling_lr_zscore,
suffix="_trend{w}_zscore".format(w=window),
suffix=f"_trend{window}_zscore",
entire=True,
window=window,
metrics=["mean", "phik", "fraction_true"],
@@ -227,7 +227,7 @@ def metrics_external_reference(
apply_funcs=[
dict(
func=rolling_lr_zscore,
suffix="_trend{w}_zscore".format(w=window),
suffix=f"_trend{window}_zscore",
entire=True,
window=window,
metrics=["mean", "phik", "fraction_true"],
@@ -342,7 +342,7 @@ def metrics_rolling_reference(
apply_funcs=[
dict(
func=rolling_lr_zscore,
suffix="_trend{w}_zscore".format(w=window),
suffix=f"_trend{window}_zscore",
entire=True,
window=window,
metrics=["mean", "phik", "fraction_true"],
@@ -456,7 +456,7 @@ def metrics_expanding_reference(
apply_funcs=[
dict(
func=rolling_lr_zscore,
suffix="_trend{w}_zscore".format(w=window),
suffix=f"_trend{window}_zscore",
entire=True,
window=window,
metrics=["mean", "phik", "fraction_true"],
6 changes: 3 additions & 3 deletions popmon/resources.py
@@ -28,10 +28,10 @@
import popmon

# data files that are shipped with popmon.
_DATA = dict(
(_.name, _)
_DATA = {
_.name: _
for _ in pathlib.Path(resource_filename(popmon.__name__, "test_data")).glob("*")
)
}

# Tutorial notebooks
_NOTEBOOK = {
8 changes: 2 additions & 6 deletions popmon/stitching/hist_stitcher.py
@@ -124,9 +124,7 @@ def stitch_histograms(
)
dts = [type(tv) for tv in time_bin_idx]
if not dts.count(dts[0]) == len(dts):
raise TypeError(
"time_bin_idxs have inconsistent datatypes: {dts}".format(dts=dts)
)
raise TypeError(f"time_bin_idxs have inconsistent datatypes: {dts}")

# basic checks and conversions
if isinstance(hists_basis, dict) and len(hists_basis) > 0:
@@ -338,9 +336,7 @@ def _insert_hists(self, hbasis, hdelta_list, time_bin_idx, mode):
)
dts = [type(tv) for tv in time_bin_idx]
if not dts.count(dts[0]) == len(dts):
raise TypeError(
"time_bin_idxs have inconsistent datatypes: {dts}".format(dts=dts)
)
raise TypeError(f"time_bin_idxs have inconsistent datatypes: {dts}")
if not isinstance(time_bin_idx[0], (str, int, np.integer)):
raise TypeError("time_bin_idxs should be an (ordered) string or integer.")

2 changes: 1 addition & 1 deletion popmon/visualization/backend.py
@@ -91,7 +91,7 @@ def set_matplotlib_backend(backend=None, batch=None, silent=True):
)
)
logger.warning(
'Set Matplotlib backend to "{0:s}"; non-interactive backend required, but "{1:s}" requested.'.format(
'Set Matplotlib backend to "{:s}"; non-interactive backend required, but "{:s}" requested.'.format(
ni_backends[0], backend
)
)
10 changes: 10 additions & 0 deletions pyproject.toml
@@ -0,0 +1,10 @@
[tool.pytest.ini_options]
markers = ["spark"]

[tool.pytest.ini_options.spark_options]
"spark.executor.id" = "driver"
"spark.app.name" = "PySparkShell"
"spark.executor.instances" = 1
"master" = "local[*]"
"spark.driver.host" = "192.168.1.78"
"spark.sql.catalogImplementation" = "in-memory"
