Add spark support #145

Merged · 4 commits · Oct 18, 2021
6 changes: 5 additions & 1 deletion CHANGELOG.rst
@@ -28,13 +28,17 @@ and this project adheres to `Semantic Versioning <https://semver.org/spec/v2.0.0
Unreleased
------------------------------------------------------------------------------

* |Feature| Add training metadata reporting and querying `issue #142 <https://github.com/schuderer/mllaunchpad/issues/142>`_,
* |Feature| Add training metadata reporting and querying,
`issue #142 <https://github.com/schuderer/mllaunchpad/issues/142>`_,
by `Andreas Schuderer <https://github.com/schuderer>`_.
* |Feature| Add support for typed CSVs (option ``dtypes_path`` of
:class:`FileDataSource <mllaunchpad.datasources.FileDataSource>` and
:class:`FileDataSink <mllaunchpad.datasources.FileDataSink>`),
`issue #127 <https://github.com/schuderer/mllaunchpad/issues/127>`_,
by `Elisa Partodikromo <https://github.com/planeetjupyter>`_.
* |Feature| Add Spark support (experimental), see ``examples/spark_datasource.py``
and `issue #145 <https://github.com/schuderer/mllaunchpad/issues/145>`_,
by `Andreas Schuderer <https://github.com/schuderer>`_.
* |Fixed| Keep generated RAML files free of command line messages,
`issue #126 <https://github.com/schuderer/mllaunchpad/issues/126>`_,
by `Andreas Schuderer <https://github.com/schuderer>`_.
Binary file modified docs/_static/examples.zip
4 changes: 2 additions & 2 deletions docs/about.rst
@@ -47,8 +47,8 @@ databases and CSV files are already included.

The model "payload" itself is the only time where code needs
to be produced by the model developer. Here, the developer is
basically filling in three functions in an R or Python template
(Spark support is planned):
basically filling in three functions in an :ref:`R <other_technologies>` or Python template
(:ref:`Spark <other_technologies>` is supported, too):

* ``train`` - uses data provided by Data Sources and returns
a trained model object
6 changes: 3 additions & 3 deletions docs/datasources.rst
@@ -32,8 +32,8 @@ lists of built-in and external :class:`DataSources <mllaunchpad.resource.DataSou
and :class:`DataSinks <mllaunchpad.resource.DataSink>`.

Each subclass of :class:`DataSources <mllaunchpad.resource.DataSource>` (e.g. :class:`~mllaunchpad.datasources.FileDataSource`,
:class:`~mllaunchpad.datasources.OracleDataSource`)
serves one or several ``types`` (e.g. ``csv``, ``euro_csv``, ``dbms.oracle``).
:class:`~mllaunchpad.datasources.SqlDataSource`)
serves one or several ``types`` (e.g. ``csv``, ``euro_csv``, ``dbms.sql``).
You specify the ``type`` in your DataSource's :doc:`configuration <config>`.
The same ``type`` can even be served by several different
:class:`~mllaunchpad.resource.DataSource` subclasses, in which case the
@@ -93,7 +93,7 @@ of the DataSources that correspond with those tables, but configure a
connection only once. For this, you specify a separate ``dbms:`` section in your
configuration where you give each connection a name (e.g. ``my_connection``) which
you can refer to in your ``datasource`` config by a type like e.g. ``dbms.my_connection``.
See :class:`~mllaunchpad.datasources.OracleDataSource` below for an example.
See :class:`~mllaunchpad.datasources.SqlDataSource` below for an example.

Built-in DataSources and DataSinks
------------------------------------------------------------------------------
79 changes: 79 additions & 0 deletions docs/usage.rst
@@ -41,6 +41,85 @@ Configuration
See separate page :doc:`config`.


.. _other_technologies:

What about support for R, Spark, <other technology>?
------------------------------------------------------------------------------

ML Launchpad is designed to be as technology-agnostic and flexible as possible.
For machine learning technologies, this means that it does not care whether you use it
with SciKit-Learn, PyTorch, spaCy, etc. Just import the Python packages you need and enjoy.
See the tutorial in the next section for an example using SciKit-Learn.

For interfacing with the outside world (getting data, etc.), ML Launchpad provides
extensible interfaces; the most common kinds are already supported out of
the box. For getting and persisting data, look into inheriting from
:doc:`DataSources and DataSinks <datasources>`. For providing your model results in
other ways than the provided WSGI API (events, Azure functions, etc.),
look into the :doc:`mllaunchpad API <mllaunchpad>` (particularly ``get_validated_config()``
and ``predict()``).
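
A minimal sketch of the latter route (the config file name and the argument keys are
placeholders for whatever your own model expects):

.. code-block:: python

    import mllaunchpad

    # Load and validate the configuration (path is hypothetical).
    conf = mllaunchpad.get_validated_config("my_config.yml")

    # Ask the (already trained) model for a prediction; keys depend on your model.
    result = mllaunchpad.predict(conf, arg_dict={"some_feature": 42})
    print(result)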

That said, we have already accumulated some partial or complete solutions, and the one you need
might already be there:

- **Oracle, Impala, Hive, etc. support** is covered by :class:`~mllaunchpad.datasources.SqlDataSource`.
It uses `SQLAlchemy <https://docs.sqlalchemy.org/>`_, which adds a lot
of flexibility to the datasource configuration. Please see the :class:`SqlDataSource docs <mllaunchpad.datasources.SqlDataSource>`
for more information. There are also some special classes like OracleDataSource and, in the examples,
ImpalaDataSource, but those predate SqlDataSource, so we suggest trying SqlDataSource first.
- **R support** works by using and adapting the ``r_example*`` files in the examples directory (experimental).
You leave ``r_model.py`` as is and configure it as the ``model:module:``; there you also configure ``model:r_file`` and
``model:r_dependencies`` with your R script and its package requirements (see the config sketch after this list).
You need to have R installed, as well as the Python package ``rpy2[pandas]``.
- **Spark support** is available through the ``spark_datasource.py`` module in the examples (experimental).
Copy it into your project and include it in your config using the ``plugins:`` directive. Its detailed use
is documented in the module itself.
- **Containerization** is straightforward: build an image that exposes the
ML Launchpad REST API::

# Example Dockerfile
ARG PYTHON=3.7
FROM python:${PYTHON}-slim-buster as mllp
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
vim \
unixodbc-dev \
unixodbc \
libpq-dev \
&& apt-get clean \
&& apt-get autoremove -y \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /var/www/mllp/app
# In your project, be selective in what you put into the image.
COPY . .
RUN pip install -r requirements.txt
RUN pip install gunicorn
RUN python -m mllaunchpad -c my_config.yml train # If not pre-trained earlier.
EXPOSE 5000
CMD gunicorn --workers 4 --bind 0.0.0.0:5000 mllaunchpad.wsgi
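# Build and run (sketch; the image name "my-model" is a placeholder):
#   docker build -t my-model .
#   docker run --rm -p 5000:5000 my-model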


- **Azure/Firebase/AWS lambda functions** for prediction can be easily created using the
:doc:`mllaunchpad API <mllaunchpad>`:

.. code-block:: python

import json
import azure.functions as func
import mllaunchpad # see https://mllaunchpad.readthedocs.io/en/stable/mllaunchpad.html

conf = mllaunchpad.get_validated_config("my_cfg_file_or_stream_or_url.yml") # None=use LAUNCHPAD_CFG env var

def main(req: func.HttpRequest) -> func.HttpResponse:
# (you need to validate params yourself here, skipped in this example)
result = mllaunchpad.predict(conf, arg_dict=req.params)
return func.HttpResponse(json.dumps(result), mimetype="application/json")

- For any other technology, there's a good chance that you can tackle it with one of these
mechanisms (extending DataSources/DataSinks or through the API). If you are unsure,
`please create an issue <https://github.com/schuderer/mllaunchpad/issues>`_.
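
As mentioned in the R bullet above, a minimal, hypothetical config excerpt for an R-based
model might look as follows (file names and values are placeholders, not taken from the
examples verbatim):

.. code-block:: yaml

    model:
      name: my_r_model
      version: 0.0.1
      module: r_model                      # the unchanged r_model.py adapter
      r_file: my_model.R                   # your own R script
      r_dependencies: r_requirements.txt   # your R package requirements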



.. _tutorial:

Tutorial
176 changes: 176 additions & 0 deletions examples/spark_datasource.py
@@ -0,0 +1,176 @@
import logging
# import os
from typing import Dict, Optional, Union, Generator # , Iterable, Optional, Union, cast

from mllaunchpad.resource import DataSource, Raw
from mllaunchpad.datasources import fill_nas, get_connection_args
# import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
# from pyspark.sql.types import *
from sqlalchemy.dialects import mysql
from sqlalchemy import text

logger = logging.getLogger(__name__)


def bind_sql_params(query, params):
"""Bind SQL params in a way as to avoid SQL injection.
This is needed because SparkSession.sql does not allow parameters.
"""
bound_query = str(
text(query)
.bindparams(**params)
.compile(dialect=mysql.dialect(), compile_kwargs={"literal_binds": True}))
return bound_query
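
# Illustrative usage of bind_sql_params (hypothetical query and value):
#   bind_sql_params("SELECT * FROM t WHERE id = :id", {"id": 387})
#   -> "SELECT * FROM t WHERE id = 387"
# The named parameters are rendered as safely escaped SQL literals via SQLAlchemy.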


class SparkDataSource(DataSource):
"""DataSource to retrieve Spark data as Spark DataFrames or Pandas DataFrames.

Configuration example::

plugins:
- examples.spark_datasource # or wherever you put this module

dbms:
# ... (other connections)
# Example:
my_connection: # NOTE: You can use the same connection for several datasources and datasinks
type: spark
master: local[*]
options:
# dict, passed to `config()` when creating the spark session
spark.hadoop.yarn.resourcemanager.principal_var: HADOOP_USER_NAME

# ...
datasources:
# ... (other datasources)
my_datasource:
type: dbms.my_connection
query: SELECT * FROM somewhere.my_table WHERE id = :id # fill `:params` by calling `get_dataframe` with a `dict`
expires: 0 # generic parameter, see documentation on DataSources
tags: [train] # generic parameter, see documentation on DataSources and DataSinks
"""

serves = ["dbms.spark"]

def __init__(
self, identifier: str, datasource_config: Dict, dbms_config: Dict
):
super().__init__(identifier, datasource_config)

self.dbms_config = dbms_config

logger.info("Creating spark session for datasource {}...".format(self.id))
spark_prep = (SparkSession
.builder
.appName("SparkDataSource_" + self.id)
.master(self.dbms_config["master"])
)

spark_cfg = get_connection_args(self.dbms_config)
for key, val in spark_cfg.items():
spark_prep = spark_prep.config(key, val)
self.spark = spark_prep.getOrCreate()

def get_dataframe(
self, params: Dict = None, chunksize: Optional[int] = None
) -> Union[pd.DataFrame, Generator]:
"""Get data as pandas dataframe.

Null values are replaced by ``numpy.nan``.

Example::

my_df = data_sources["my_datasource"].get_dataframe({"id": 387})

:param params: Query parameters to fill in query (e.g. replace query's `:id` parameter with value `387`)
:type params: optional dict
:param chunksize: Not supported by SparkDataSource; passing a value raises a ValueError.
:type chunksize: not supported

:return: DataFrame object, possibly cached according to config value of `expires:`
"""
df = self.get_spark_dataframe(params, chunksize).toPandas()
return fill_nas(df, as_generator=chunksize is not None)

def get_spark_dataframe(
self, params: Dict = None, chunksize: Optional[int] = None
) -> Union[pd.DataFrame, Generator]:
"""Get data as Spark dataframe.

Example::

spark_df = data_sources["my_datasource"].get_spark_dataframe({"id": 387})

:param params: Query parameters to fill in query (e.g. replace query's `:id` parameter with value `387`)
:type params: optional dict
:param chunksize: Not supported by SparkDataSource; passing a value raises a ValueError.
:type chunksize: not supported

:return: Spark DataFrame object, possibly cached according to config value of `expires:`
"""
if chunksize:
raise ValueError("Parameter `chunksize` not supported for SparkDataSource.")

raw_query = self.config["query"]
params = params or {}
query = bind_sql_params(raw_query, params)
kw_options = self.options

logger.debug(
"Fetching query {} with params {}, chunksize {}, and options {}...".format(
query, params, chunksize, kw_options
)
)
return self.spark.sql(query)

def get_raw(
self, params: Dict = None, chunksize: Optional[int] = None
) -> Raw:
"""Not implemented.

:raises NotImplementedError: Raw/blob format currently not supported.
"""
raise NotImplementedError(
"SparkDataSource currently does not not support raw format/blobs. "
'Use method "get_dataframe" for dataframes'
)


def test():
from mllaunchpad import get_validated_config_str
test_cfg = get_validated_config_str("""
dbms:
# ... (other connections)
# Example:
my_connection: # NOTE: You can use the same connection for several datasources and datasinks
type: spark
master: local[*]
options:
# dict, passed to `config()` when creating the spark session
spark.hadoop.yarn.resourcemanager.principal_var: HADOOP_USER_NAME
# ...
datasources:
# ... (other datasources)
my_datasource:
type: dbms.my_connection
query: SELECT * FROM cml_poc.iris
#WHERE seniorcitizen = :senior # fill `:params` by calling `get_dataframe` with a `dict`
expires: 0 # generic parameter, see documentation on DataSources
tags: [train] # generic parameter, see documentation on DataSources and DataSinks
model:
name: bla
version: 1.0.0
module: bla
model_store:
location: bla
""")
ds = SparkDataSource("my_datasource", test_cfg["datasources"]["my_datasource"], test_cfg["dbms"]["my_connection"])
try:
df = ds.get_dataframe()
# df = ds.get_spark_dataframe().toPandas()
finally:
ds.spark.stop()
return df
2 changes: 1 addition & 1 deletion setup.cfg
@@ -22,7 +22,7 @@ import_heading_stdlib=Stdlib imports
import_heading_thirdparty=Third-party imports
import_heading_firstparty=Project imports
known_first_party=mllaunchpad
known_third_party=click,dill,flask,flask_restful,nox,numpy,pandas,pkg_resources,pytest,ramlfications,rpy2,setuptools,sklearn,werkzeug,yaml
known_third_party=click,dill,flask,flask_restful,nox,numpy,pandas,pkg_resources,pyspark,pytest,ramlfications,rpy2,setuptools,sklearn,sqlalchemy,werkzeug,yaml
# known_third_party=pandas,numpy,flask,flask-restful,ramlfications,werkzeug,sklearn,dill,pyyaml,pytest
# known_third_party=pytest # rest is installed so isort can detect it
