Enhance spark API. (#114)
* Update spark apps to use spark-operator.

* Refactor spark apps directory.

* Allow spark.forecast to predict on train data.

* Allow custom aggregations for hierarchical data.

* Update version to 1.2.4.

* Always use strings for index columns.

This avoids problems with aggregation when attempting to use NA values
for integer columns (a short pandas sketch of the issue follows this list of changes).

* Ensure consistent schema for stderr.

* Fix some bugs with tree models.

Tree models can now accept max_forecast_steps=None and return_prev=True

* Fix bug in time series reconciliaton code.

* Added dataset for testing hierarchical time series

Derived from https://www.kaggle.com/datasets/manjeetsingh/retaildataset
which is released under a CC0 license.

* Add test coverage for pyspark code.
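
As a quick illustration of the "strings for index columns" change above (a minimal pandas sketch under assumed column names, not code from this repo): once hierarchical aggregation adds rows whose finer-grained index columns must be NA, an integer-typed index column silently loses its dtype, whereas string columns stay stable.

    # Hypothetical illustration: integer vs. string index columns when aggregation introduces NA levels.
    import pandas as pd

    sales = pd.DataFrame({"store": [1, 1, 2], "item": [10, 11, 10], "sales": [5.0, 3.0, 7.0]})

    # Store-level totals: the "item" index column has no meaningful value for these rows.
    store_totals = sales.groupby("store", as_index=False)["sales"].sum()
    store_totals["item"] = None
    combined = pd.concat([sales, store_totals], ignore_index=True)
    print(combined.dtypes)  # "item" is no longer a clean int64 column once NA rows are mixed in

    # Casting index columns to strings up front keeps one consistent dtype for every level.
    sales = sales.astype({"store": str, "item": str})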
aadyotb committed Jul 15, 2022
1 parent e22b6d6 commit 638529e
Showing 16 changed files with 8,821 additions and 164 deletions.
9 changes: 5 additions & 4 deletions Dockerfile
@@ -1,16 +1,17 @@
ARG spark_uid=185
-FROM apache/spark-py:v3.2.1
+FROM gcr.io/spark-operator/spark-py:v3.1.1

# Change to root user for installation steps
USER 0

# Install pyarrow (for spark-sql) and Merlion; get pyspark & py4j from the PYTHONPATH
-ENV PYTHONPATH="${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:${PYTHONPATH}"
+ENV PYTHONPATH="${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-0.10.9-src.zip:${PYTHONPATH}"
COPY *.md ./
COPY setup.py ./
COPY merlion merlion
-RUN pip install pyarrow "./" && pip uninstall -y py4j
+RUN pip install pyarrow "./"

# Copy Merlion pyspark apps
-COPY spark /opt/spark/apps
+COPY apps /opt/spark/apps
RUN chmod g+w /opt/spark/apps
USER ${spark_uid}
3 changes: 1 addition & 2 deletions README.md
@@ -78,8 +78,7 @@ You can install `merlion` from PyPI by calling ``pip install salesforce-merlion``,
cloning this repo and calling ``pip install Merlion/``, or ``pip install -e Merlion/`` to install in editable mode.
You may install additional dependencies via ``pip install salesforce-merlion[all]``, or by calling
``pip install "Merlion/[all]"`` if installing from source. Individually, the optional dependencies include ``plot``
-for interactive plots, ``prophet`` for the popular [Prophet](https://github.com/facebook/prophet) model,
-and ``deep-learning`` for all deep learning models.
+for interactive plots and ``deep-learning`` for all deep learning models.

To install the data loading package `ts_datasets`, clone this repo and call ``pip install -e Merlion/ts_datasets/``.
This package must be installed in editable mode (i.e. with the ``-e`` flag) if you don't want to manually specify the
124 changes: 124 additions & 0 deletions apps/anomaly.py
@@ -0,0 +1,124 @@
#
# Copyright (c) 2022 salesforce.com, inc.
# All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
# For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
#
import argparse
import json
import re

from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, FloatType, StructField, StructType
from merlion.spark.dataset import read_dataset, write_dataset, TSID_COL_NAME
from merlion.spark.pandas_udf import anomaly


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", required=True, help="Path at which the dataset is stored.")
    parser.add_argument("--output_path", required=True, help="Path at which to save output anomaly scores.")
    parser.add_argument(
        "--train_test_split", required=True, help="First timestamp in the dataset which should be used for testing."
    )
    parser.add_argument("--file_format", default="csv", help="File format of the input data & output file.")
    parser.add_argument(
        "--model",
        default=json.dumps({"name": "DefaultDetector"}),
        help="JSON dict specifying the model we wish to use for anomaly detection.",
    )
    parser.add_argument(
        "--index_cols",
        default="[]",
        help="JSON list of columns used to demarcate different time series. For example, if the dataset contains sales "
        'for multiple items at different stores, this could be \'["store", "item"]\'. '
        "If not given, we assume the dataset contains only 1 time series.",
    )
    parser.add_argument(
        "--time_col",
        default=None,
        help="Name of the column containing timestamps. If not given, use the first non-index column.",
    )
    parser.add_argument(
        "--data_cols",
        default="[]",
        help="JSON list of columns to use when modeling the data. If not given, use all non-index, non-time columns.",
    )
    args = parser.parse_args()

    # Parse index_cols JSON string
    try:
        index_cols = json.loads(re.sub("'", '"', args.index_cols))
        assert isinstance(index_cols, list)
    except (json.decoder.JSONDecodeError, AssertionError) as e:
        parser.error(
            f"Expected --index_cols to be a JSON list. Got {args.index_cols}.\n" f"Caught {type(e).__name__}({e})"
        )
    else:
        args.index_cols = index_cols

    # Parse data_cols JSON string
    try:
        data_cols = json.loads(re.sub("'", '"', args.data_cols))
        assert isinstance(data_cols, list)
    except (json.decoder.JSONDecodeError, AssertionError) as e:
        parser.error(
            f"Expected --data_cols to be a JSON list if given. Got {args.data_cols}.\n"
            f"Caught {type(e).__name__}({e})"
        )
    else:
        args.data_cols = data_cols

    # Parse JSON string for the model
    try:
        model = json.loads(re.sub("'", '"', args.model))
        assert isinstance(model, dict)
    except (json.decoder.JSONDecodeError, AssertionError) as e:
        parser.error(
            f"Expected --model to be a JSON dict specifying a Merlion model. Got {args.model}.\n"
            f"Caught {type(e).__name__}({e})"
        )
    else:
        args.model = model

    return args


def main():
    args = parse_args()

    # Read the dataset as a Spark DataFrame, and process it.
    # This will add a TSID_COL_NAME column to identify each time series with a single integer.
    spark = SparkSession.builder.appName("anomaly").getOrCreate()
    df = read_dataset(
        spark=spark,
        file_format=args.file_format,
        path=args.data,
        time_col=args.time_col,
        index_cols=args.index_cols,
        data_cols=args.data_cols,
    )
    if args.time_col is None:
        args.time_col = df.schema.fieldNames()[0]
    args.index_cols = args.index_cols + [TSID_COL_NAME]

    # Use spark to predict anomaly scores for each time series in parallel
    index_fields = [df.schema[c] for c in args.index_cols]
    pred_fields = [StructField(args.time_col, DateType()), StructField("anom_score", FloatType())]
    output_schema = StructType(index_fields + pred_fields)
    anomaly_df = df.groupBy(args.index_cols).applyInPandas(
        lambda pdf: anomaly(
            pdf,
            index_cols=args.index_cols,
            time_col=args.time_col,
            train_test_split=args.train_test_split,
            model=args.model,
        ),
        schema=output_schema,
    )

    write_dataset(df=anomaly_df, time_col=args.time_col, path=args.output_path, file_format=args.file_format)


if __name__ == "__main__":
    main()
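
For readers unfamiliar with grouped pandas UDFs: the core pattern in this app is groupBy(index_cols).applyInPandas(fn, schema), where each distinct time series is handed to fn as a plain pandas DataFrame and the declared schema tells Spark how to reassemble the results. Below is a toy, self-contained sketch of that pattern; the scoring function is a hypothetical stand-in, not merlion.spark.pandas_udf.anomaly.

    # Toy grouped-map example: one pandas DataFrame per "store", scored independently.
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.types import FloatType, StringType, StructField, StructType, TimestampType

    spark = SparkSession.builder.appName("toy-anomaly").getOrCreate()
    df = spark.createDataFrame(
        pd.DataFrame(
            {
                "ts": pd.to_datetime(["2022-01-01", "2022-01-02"] * 2),
                "store": ["A", "A", "B", "B"],
                "sales": [1.0, 9.0, 2.0, 2.1],
            }
        )
    )

    def score_group(pdf: pd.DataFrame) -> pd.DataFrame:
        # Each call sees the full history of exactly one time series (one value of "store").
        score = (pdf["sales"] - pdf["sales"].mean()).abs().astype("float32")
        return pd.DataFrame({"store": pdf["store"], "ts": pdf["ts"], "anom_score": score})

    schema = StructType(
        [StructField("store", StringType()), StructField("ts", TimestampType()), StructField("anom_score", FloatType())]
    )
    df.groupBy("store").applyInPandas(score_group, schema=schema).show()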
206 changes: 206 additions & 0 deletions apps/forecast.py
@@ -0,0 +1,206 @@
#
# Copyright (c) 2022 salesforce.com, inc.
# All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
# For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
#
import argparse
import json
import re
from warnings import warn

from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, FloatType, StructField, StructType
from merlion.spark.dataset import create_hier_dataset, read_dataset, write_dataset, TSID_COL_NAME
from merlion.spark.pandas_udf import forecast, reconciliation


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_data", required=True, help="Path at which the train data is stored.")
    parser.add_argument("--output_path", required=True, help="Path at which to save output forecasts.")
    parser.add_argument(
        "--time_stamps",
        required=True,
        help='JSON list of times we want to forecast, e.g. \'["2022-01-01 00:00:00", "2022-01-01 00:01:00"]\'.',
    )
    parser.add_argument("--target_col", required=True, help="Name of the column whose value we want to forecast.")
    parser.add_argument(
        "--predict_on_train", action="store_true", help="Whether to return the model's prediction on the training data."
    )
    parser.add_argument("--file_format", default="csv", help="File format of train data & output file.")
    parser.add_argument(
        "--model",
        default=json.dumps({"name": "DefaultForecaster"}),
        help="JSON dict specifying the model we wish to use for forecasting.",
    )
    parser.add_argument(
        "--index_cols",
        default="[]",
        help="JSON list of columns used to demarcate different time series. For example, if the dataset contains sales "
        'for multiple items at different stores, this could be \'["store", "item"]\'. '
        "If not given, we assume the dataset contains only 1 time series.",
    )
    parser.add_argument(
        "--hierarchical",
        action="store_true",
        default=False,
        help="Whether the time series have a hierarchical structure. If true, we aggregate the time series in the "
        "dataset (by summation), in the order specified by index_cols. For example, if index_cols is "
        '\'["store", "item"]\', we first sum the sales of all items within each store, and then sum the global '
        "sales of all stores and all items.",
    )
    parser.add_argument(
        "--agg_dict",
        default="{}",
        help="JSON dict indicating how different data columns should be aggregated if working with hierarchical time "
        "series. Keys are column names, values are names of standard aggregations (e.g. sum, mean, max, etc.). "
        "If a column is not specified, it is not aggregated. Note that we always sum the target column, regardless of "
        "whether it is specified. This ensures that hierarchical time series reconciliation works correctly.",
    )
    parser.add_argument(
        "--time_col",
        default=None,
        help="Name of the column containing timestamps. We use the first non-index column if one is not given.",
    )
    parser.add_argument(
        "--data_cols",
        default=None,
        help="JSON list of columns to use when modeling the data. "
        "If not given, we do univariate forecasting using only target_col.",
    )
    args = parser.parse_args()

    # Parse time_stamps JSON string
    try:
        time_stamps = json.loads(re.sub("'", '"', args.time_stamps))
        assert isinstance(time_stamps, list) and len(time_stamps) > 0
    except (json.decoder.JSONDecodeError, AssertionError) as e:
        parser.error(
            f"Expected --time_stamps to be a non-empty JSON list. Got {args.time_stamps}.\n"
            f"Caught {type(e).__name__}({e})"
        )
    else:
        args.time_stamps = time_stamps

    # Parse index_cols JSON string
    try:
        index_cols = json.loads(re.sub("'", '"', args.index_cols)) or []
        assert isinstance(index_cols, list)
    except (json.decoder.JSONDecodeError, AssertionError) as e:
        parser.error(
            f"Expected --index_cols to be a JSON list. Got {args.index_cols}.\n Caught {type(e).__name__}({e})"
        )
    else:
        args.index_cols = index_cols

    # Parse agg_dict JSON string
    try:
        agg_dict = json.loads(re.sub("'", '"', args.agg_dict)) or {}
        assert isinstance(agg_dict, dict)
    except (json.decoder.JSONDecodeError, AssertionError) as e:
        parser.error(f"Expected --agg_dict to be a JSON dict. Got {args.agg_dict}.\n Caught {type(e).__name__}({e})")
    else:
        if args.target_col not in agg_dict:
            agg_dict[args.target_col] = "sum"
        elif agg_dict[args.target_col] != "sum":
            warn(
                f'Expected the agg_dict to specify "sum" for target_col {args.target_col}, '
                f'but got {agg_dict[args.target_col]}. Manually changing to "sum".'
            )
            agg_dict[args.target_col] = "sum"
        args.agg_dict = agg_dict

    # Set default data_cols if needed & make sure target_col is in data_cols
    if args.data_cols is None:
        args.data_cols = [args.target_col]
    else:
        try:
            data_cols = json.loads(re.sub("'", '"', args.data_cols))
            assert isinstance(data_cols, list)
        except (json.decoder.JSONDecodeError, AssertionError) as e:
            parser.error(
                f"Expected --data_cols to be a JSON list if given. Got {args.data_cols}.\n"
                f"Caught {type(e).__name__}({e})"
            )
        else:
            args.data_cols = data_cols
    if args.target_col not in args.data_cols:
        parser.error(f"Expected --data_cols {args.data_cols} to contain --target_col {args.target_col}.")

    # Parse JSON string for the model and set the model's target_seq_index
    try:
        model = json.loads(re.sub("'", '"', args.model))
        assert isinstance(model, dict)
    except (json.decoder.JSONDecodeError, AssertionError) as e:
        parser.error(
            f"Expected --model to be a JSON dict specifying a Merlion model. Got {args.model}.\n"
            f"Caught {type(e).__name__}({e})"
        )
    else:
        target_seq_index = {v: i for i, v in enumerate(args.data_cols)}[args.target_col]
        model["target_seq_index"] = target_seq_index
        args.model = model

    # Only do hierarchical forecasting if there are index columns specifying a hierarchy
    args.hierarchical = args.hierarchical and len(args.index_cols) > 0

    return args


def main():
    args = parse_args()

    # Read the dataset as a Spark DataFrame, and process it.
    # This will add a TSID_COL_NAME column to identify each time series with a single integer.
    spark = SparkSession.builder.appName("forecast").getOrCreate()
    df = read_dataset(
        spark=spark,
        file_format=args.file_format,
        path=args.train_data,
        time_col=args.time_col,
        index_cols=args.index_cols,
        data_cols=args.data_cols,
    )
    if args.time_col is None:
        args.time_col = df.schema.fieldNames()[0]
    args.index_cols = args.index_cols + [TSID_COL_NAME]

    # Convert to a hierarchical dataset if desired
    if args.hierarchical:
        df, hier_matrix = create_hier_dataset(
            spark=spark, df=df, time_col=args.time_col, index_cols=args.index_cols, agg_dict=args.agg_dict
        )

    # Use spark to generate forecasts for each time series in parallel
    index_fields = [df.schema[c] for c in args.index_cols]
    pred_fields = [
        StructField(args.time_col, DateType()),
        StructField(args.target_col, FloatType()),
        StructField(f"{args.target_col}_err", FloatType()),
    ]
    output_schema = StructType(index_fields + pred_fields)
    forecast_df = df.groupBy(args.index_cols).applyInPandas(
        lambda pdf: forecast(
            pdf,
            index_cols=args.index_cols,
            time_col=args.time_col,
            target_col=args.target_col,
            time_stamps=args.time_stamps,
            model=args.model,
            predict_on_train=args.predict_on_train,
            agg_dict=args.agg_dict,
        ),
        schema=output_schema,
    )

    # Apply hierarchical time series reconciliation if desired
    if args.hierarchical:
        forecast_df = forecast_df.groupBy(args.time_col).applyInPandas(
            lambda pdf: reconciliation(pdf, hier_matrix=hier_matrix, target_col=args.target_col), schema=output_schema
        )

    write_dataset(df=forecast_df, time_col=args.time_col, path=args.output_path, file_format=args.file_format)


if __name__ == "__main__":
    main()
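
The hierarchical branch above relies on create_hier_dataset returning a hierarchy matrix alongside the augmented DataFrame, which reconciliation then uses once per timestamp. If that matrix is the usual summation matrix from hierarchical forecasting (an assumption here; the names below are illustrative, not Merlion's internals), the relationship it encodes is easy to see in a few lines:

    # Sketch of the summation-matrix view of a 2-store hierarchy (illustrative only).
    import numpy as np

    # Rows = every series (store_A, store_B, total); columns = base (leaf) series.
    hier_matrix = np.array(
        [
            [1, 0],  # store_A
            [0, 1],  # store_B
            [1, 1],  # total = store_A + store_B
        ]
    )

    base_forecasts = np.array([10.0, 4.0])   # independent forecasts for the two base series
    coherent = hier_matrix @ base_forecasts  # bottom-up aggregation -> [10., 4., 14.]
    print(coherent)

Reconciliation goes the other way: given independently produced (and generally inconsistent) forecasts for all three series, it finds adjusted forecasts that satisfy this summation constraint, typically via a least-squares projection; the reconciliation UDF applied per timestamp above plays that role here.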
