Skip to content

Commit

Permalink
Merge pull request IntelPython#12 from IntelPython/tc/feature/streaming
Browse files Browse the repository at this point in the history
feature/streaming
  • Loading branch information
fschlimb committed Nov 16, 2018
2 parents 49022e7 + 3ab0ba3 commit ab38201
Show file tree
Hide file tree
Showing 21 changed files with 702 additions and 51 deletions.
3 changes: 1 addition & 2 deletions conda-recipe-nodist/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ test:
imports:
- daal4py
source_files:
- examples/*batch.py
- examples/data/batch
- examples
- tests

about:
Expand Down
17 changes: 16 additions & 1 deletion conda-recipe-nodist/run_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
import os
import sys
import unittest

if sys.platform in ['win32', 'cygwin']:
os.environ['PATH'] = ';'.join([os.environ['PATH'], os.path.join(os.environ['CONDA_PREFIX'], 'Library', 'bin', 'libfabric')])

here = os.path.abspath(os.path.dirname(__file__))
ex_dir = os.path.join(here, "examples")

from examples.run_examples import run_all
from tests.test_examples import Test

s = unittest.defaultTestLoader.discover('tests')
r = unittest.TextTestRunner()
r.run(s)
sys.exit(0 if r._makeResult().wasSuccessful() else 1)
ret1 = 0 if r._makeResult().wasSuccessful() else 1

os.chdir(ex_dir)
ret2 = run_all(True)

sys.exit(ret1 + ret2)
6 changes: 1 addition & 5 deletions conda-recipe/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ test:
imports:
- daal4py
source_files:
- examples/__init__.py
- examples/run_examples.py
- examples/*batch.py
- examples/*spmd.py
- examples/data
- examples
- tests

about:
Expand Down
5 changes: 5 additions & 0 deletions doc/algorithms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Detailed description of parameters and semantics are described in
Examples:

- `Single-Process SVD <https://github.com/IntelPython/daal4py/blob/master/examples/svd_batch.py>`_
- `Streaming SVD <https://github.com/IntelPython/daal4py/blob/master/examples/svd_streaming.py>`_
- `Multi-Process SVD <https://github.com/IntelPython/daal4py/blob/master/examples/svd_spmd.py>`_

.. autoclass:: daal4py.svd
Expand All @@ -51,6 +52,7 @@ Detailed description of parameters and semantics are described in
Examples:

- `Single-Process Low Order Moments <https://github.com/IntelPython/daal4py/blob/master/examples/low_order_moms_dense_batch.py>`_
- `Streaming Low Order Moments <https://github.com/IntelPython/daal4py/blob/master/examples/low_order_moms_dense_streaming.py>`_

.. autoclass:: daal4py.low_order_moments
.. autoclass:: daal4py.low_order_moments_result
Expand Down Expand Up @@ -147,6 +149,7 @@ Detailed description of parameters and semantics are described in
Examples:

- `Single-Process Naive Bayes <https://github.com/IntelPython/daal4py/blob/master/examples/naive_bayes_batch.py>`_
- `Streaming Naive Bayes <https://github.com/IntelPython/daal4py/blob/master/examples/naive_bayes_streaming.py>`_
- `Multi-Process Naive Bayes <https://github.com/IntelPython/daal4py/blob/master/examples/naive_bayes_spmd.py>`_

.. autoclass:: daal4py.multinomial_naive_bayes_training
Expand Down Expand Up @@ -268,6 +271,7 @@ Detailed description of parameters and semantics are described in
Examples:

- `Single-Process Linear Regression <https://github.com/IntelPython/daal4py/blob/master/examples/linear_regression_batch.py>`_
- `Streaming Linear Regression <https://github.com/IntelPython/daal4py/blob/master/examples/linear_regression_streaming.py>`_
- `Multi-Process Linear Regression <https://github.com/IntelPython/daal4py/blob/master/examples/linear_regression_spmd.py>`_

.. autoclass:: daal4py.linear_regression_training
Expand All @@ -287,6 +291,7 @@ Detailed description of parameters and semantics are described in
Examples:

- `Single-Process Ridge Regression <https://github.com/IntelPython/daal4py/blob/master/examples/ridge_regression_batch.py>`_
- `Streaming Ridge Regression <https://github.com/IntelPython/daal4py/blob/master/examples/ridge_regression_streaming.py>`_
- `Multi-Process Ridge Regression <https://github.com/IntelPython/daal4py/blob/master/examples/ridge_regression_spmd.py>`_

.. autoclass:: daal4py.ridge_regression_training
Expand Down
5 changes: 5 additions & 0 deletions doc/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ Singular Value Decomposition (SVD)
- `Single-Process PCA Transform <https://github.com/IntelPython/daal4py/blob/master/examples/pca_transform_batch.py>`_

- `Single-Process SVD <https://github.com/IntelPython/daal4py/blob/master/examples/svd_batch.py>`_
- `Streaming SVD <https://github.com/IntelPython/daal4py/blob/master/examples/svd_streaming.py>`_
- `Multi-Process SVD <https://github.com/IntelPython/daal4py/blob/master/examples/svd_spmd.py>`_

Moments of Low Order

- `Single-Process Low Order Moments <https://github.com/IntelPython/daal4py/blob/master/examples/low_order_moms_dense_batch.py>`_
- `Streaming Low Order Moments <https://github.com/IntelPython/daal4py/blob/master/examples/low_order_moms_dense_streaming.py>`_

Decision Forest Classification

Expand All @@ -53,6 +55,7 @@ k-Nearest Neighbors (kNN)
Multinomial Naive Bayes

- `Single-Process Naive Bayes <https://github.com/IntelPython/daal4py/blob/master/examples/naive_bayes_batch.py>`_
- `Streaming Naive Bayes <https://github.com/IntelPython/daal4py/blob/master/examples/naive_bayes_streaming.py>`_
- `Multi-Process Naive Bayes <https://github.com/IntelPython/daal4py/blob/master/examples/naive_bayes_spmd.py>`_

Support Vector Machine (SVM)
Expand Down Expand Up @@ -83,11 +86,13 @@ Gradient Boosted Regression
Linear Regression

- `Single-Process Linear Regression <https://github.com/IntelPython/daal4py/blob/master/examples/linear_regression_batch.py>`_
- `Streaming Linear Regression <https://github.com/IntelPython/daal4py/blob/master/examples/linear_regression_streaming.py>`_
- `Multi-Process Linear Regression <https://github.com/IntelPython/daal4py/blob/master/examples/linear_regression_spmd.py>`_

Ridge Regression

- `Single-Process Ridge Regression <https://github.com/IntelPython/daal4py/blob/master/examples/ridge_regression_batch.py>`_
- `Streaming Ridge Regression <https://github.com/IntelPython/daal4py/blob/master/examples/ridge_regression_streaming.py>`_
- `Multi-Process Ridge Regression <https://github.com/IntelPython/daal4py/blob/master/examples/ridge_regression_spmd.py>`_

K-Means Clustering
Expand Down
29 changes: 20 additions & 9 deletions doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ Fast, Scalable and Easy Machine Learning With DAAL4PY

Daal4py makes your Machine Learning algorithms in Python lightning fast and easy
to use. It provides highly configurable Machine Learning kernels, some of which
can be easily and efficiently scaled out to clusters of workstations.
Internally it uses Intel® DAAL (Intel® Data Analytics Acceleration
Library) to deliver the best performance.
support streaming input data and/or can be easily and efficiently scaled out to
clusters of workstations. Internally it uses Intel® DAAL (Intel® Data Analytics
Acceleration Library) to deliver the best performance.

Designed for Data Scientists and Framework Designers
----------------------------------------------------
Expand All @@ -17,7 +17,9 @@ powerful machine learning building blocks directly in a high-productivity manner
A simplified API gives high-level abstractions to the user with minimal boilerplate,
allowing for quick to write and easy to maintain code when utilizing Jupyter Notebooks.
For scaling capabilities, daal4py also provides the ability to do distributed machine
learning, giving a quick way to scale out.
learning, giving a quick way to scale out. Its streaming mode provides a
felxible mechanism for processing large amounts of data and/or non-contiguous
input data.

For framework designers, daal4py's has been fashioned to be built under other
frameworks from both an API and feature perspective. The machine learning models split
Expand Down Expand Up @@ -68,14 +70,22 @@ The full example could look like this::
result = kmeans_init(10, method="plusPlusDense").compute('data.csv')
print(result.centroids)

One can even run this on a cluster by simple adding initializing/finalizing the
network and adding a keyword-parameter::
One can even :ref:`run this on a cluster <distributed>` by simply
adding initializing/finalizing the network and adding a keyword-parameter::

from daal4py import daalinit, daalfini, kmeans_init
daalinit()
kmeans_init(10, method="plusPlusDense", distributed=True).compute(my_file)
result = kmeans_init(10, method="plusPlusDense", distributed=True).compute(my_file)
daalfini()

Last but not least, daal4py allows :ref:`getting input data from streams <streaming>`::

from daal4py import svd
algo = svd(streaming=True)
for input in stream_or_filelist:
algo.compute(input)
result = algo.finalize()

Daal4py's Design
----------------
The design of daal4py utilizes several different technologies to deliver Intel®
Expand Down Expand Up @@ -141,6 +151,7 @@ Content
:caption: Contents:

About daal4py <index>
Distribution <scaling>
Streaming <streaming>
Supported Algorithms <algorithms>
examples
scaling
Examples <examples>
2 changes: 2 additions & 0 deletions doc/scaling.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.. _distributed:

###############################################
Scaling on Distributed Memory (Multiprocessing)
###############################################
Expand Down
56 changes: 56 additions & 0 deletions doc/streaming.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
.. _streaming:

##############
Streaming Data
##############
For large quantities of data it might be impossible to provide all input data at
once. This might be because the data resides in multiple files and merging it is
to costly (or not feasible in other ways). In other cases the data is simply too
large to be loaded completely into memory. Or, the data might come in as an
actual stream. daal4py's streaming mode allows you to process such data.

Besides supporting certain use cases, streaming also allows interleaving I/O
operations with computation.

daal4py's streaming mode is as easy as follows:

1. When constructing the algorithm configure it with ``streaming=True``::

algo = daal4py.svd(streaming=True)
2. Repeat calling ``compute(input-data)`` with chunks of your input (arrays or
files)::

for f in input_files:
algo.compute(f)
3. When done with inputting, call ``finalize()`` to obtain the result::

result = algo.finalize()

The streaming algorithms also accept arrays as input, e.g. the data can come
from a stream rather than from multiple files. Here is an example which
simulates a data stream using a generator which reads a file in chunks:
`SVD reading stream of data <https://github.com/IntelPython/daal4py/blob/master/examples/stream.py>`_

Supported Algorithms and Examples
---------------------------------
The following algorithms support streaming:

- SVD (svd)

- `SVD <https://github.com/IntelPython/daal4py/blob/master/examples/svd_streaming.py>`_

- Linear Regression Training (linear_regression_training)

- `Linear Regression <https://github.com/IntelPython/daal4py/blob/master/examples/linear_regression_stream.py>`_

- Ridge Regression Training (ridge_regression_training)

- `Ridge Regression <https://github.com/IntelPython/daal4py/blob/master/examples/ridge_regression_stream.py>`_

- Multinomial Naive Bayes Training (multinomial_naive_bayes_training)

- `Naive Bayes <https://github.com/IntelPython/daal4py/blob/master/examples/naive_bayes_stream.py>`_

- Moments of Low Order

- `Low Order Moments <https://github.com/IntelPython/daal4py/blob/master/examples/low_order_moms_dense_streaming.py>`_
82 changes: 82 additions & 0 deletions examples/linear_regression_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#*******************************************************************************
# Copyright 2014-2018 Intel Corporation
# All Rights Reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License"), the following terms apply:
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
#*******************************************************************************

# daal4py Linear Regression example for streaming on shared memory systems

import daal4py as d4p
import numpy as np

# let's try to use pandas' fast csv reader
try:
import pandas
read_csv = lambda f, c, s=0, n=None: pandas.read_csv(f, usecols=c, delimiter=',', header=None, skiprows=s, nrows=n, dtype=np.float64).values
except:
# fall back to numpy genfromtxt
def read_csv(f, c, s=0, n=np.iinfo(np.int64).max):
a = np.genfromtxt(f, usecols=c, delimiter=',', skip_header=s, max_rows=n)
if a.shape[0] == 0:
raise Exception("done")
if a.ndim == 1:
return a[:, np.newaxis]
return a

def main():
infile = "./data/batch/linear_regression_train.csv"
testfile = "./data/batch/linear_regression_test.csv"

# Configure a Linear regression training object for streaming
train_algo = d4p.linear_regression_training(interceptFlag=True, streaming=True)

chunk_size = 250
lines_read = 0
# read and feed chunk by chunk
while True:
# Read data in chunks
# Let's have 10 independent, and 2 dependent variables (for each observation)
try:
indep_data = read_csv(infile, range(10), lines_read, chunk_size)
dep_data = read_csv(infile, range(10,12), lines_read, chunk_size)
except:
break
# Now feed chunk
train_algo.compute(indep_data, dep_data)
lines_read += indep_data.shape[0]

# All chunks are done, now finalize the computation
train_result = train_algo.finalize()

# Now let's do some prediction
predict_algo = d4p.linear_regression_prediction()
# read test data (with same #features)
pdata = read_csv(testfile, range(10))
ptdata = read_csv(testfile, range(10,12))
# now predict using the model from the training above
predict_result = predict_algo.compute(pdata, train_result.model)

# The prediction result provides prediction
assert predict_result.prediction.shape == (pdata.shape[0], dep_data.shape[1])

return (train_result, predict_result, ptdata)


if __name__ == "__main__":
(train_result, predict_result, ptdata) = main()
print("\nLinear Regression coefficients:\n", train_result.model.Beta)
print("\nLinear Regression prediction results: (first 10 rows):\n", predict_result.prediction[0:10])
print("\nGround truth (first 10 rows):\n", ptdata[0:10])
print('All looks good!')
Loading

0 comments on commit ab38201

Please sign in to comment.