From 35fad20f4e1bd7ad2c353dff2c813bdcb919ce38 Mon Sep 17 00:00:00 2001
From: Siu Kwan Lam
Date: Fri, 12 Jan 2018 15:44:23 -0600
Subject: [PATCH] Implement .apply_rows() to do row-by-row transformation

---
NOTE(review): the line structure of this patch was rebuilt from a copy in
which newlines and runs of spaces had been collapsed.  Indentation, blank
context lines and continuation alignment are reconstructed from Python syntax
and the hunk line counts; if a hunk does not match the target tree, retry with
`git apply --ignore-whitespace`.  The From: header has no e-mail address in
the available copy, so `git am` may reject it; `git apply` is unaffected.

 pygdf/applyutils.py            | 95 ++++++++++++++++++++++++++++++++++
 pygdf/dataframe.py             | 57 +++++++++++++++++++-
 pygdf/tests/test_cuda_apply.py | 38 ++++++++++++++
 pygdf/tests/test_dataframe.py  |  2 +-
 4 files changed, 190 insertions(+), 2 deletions(-)
 create mode 100644 pygdf/applyutils.py
 create mode 100644 pygdf/tests/test_cuda_apply.py

diff --git a/pygdf/applyutils.py b/pygdf/applyutils.py
new file mode 100644
index 00000000000..4668cc923d4
--- /dev/null
+++ b/pygdf/applyutils.py
@@ -0,0 +1,95 @@
+from weakref import WeakKeyDictionary
+import functools
+
+from numba.utils import pysignature, exec_
+from numba import cuda
+
+
+def apply_rows(df, func, incols, outcols, kwargs):
+    # Get input columns
+    inputs = {k: df[k].to_gpu_array() for k in incols}
+    # Allocate output columns
+    outputs = {}
+    for k, dt in outcols.items():
+        outputs[k] = cuda.device_array(len(df), dtype=dt)
+    # Get signature of user function
+    sig = pysignature(func)
+    # Compile kernel
+    kernel = _load_cache_or_make_row_wise_kernel(func, sig.parameters.keys(),
+                                                 kwargs.keys())
+    # Bind argument
+    args = {}
+    for dct in [inputs, outputs, kwargs]:
+        args.update(dct)
+    bound = sig.bind(**args)
+    # Launch kernel
+    blksz = 64
+    blkct = min(16, max(1, len(df) // blksz))
+    kernel[blkct, blksz](*bound.args)
+    # Prepare output frame
+    outdf = df.copy()
+    for k in sorted(outcols):
+        outdf[k] = outputs[k]
+    return outdf
+
+
+def _make_row_wise_kernel(func, argnames, extras):
+    """
+    Make a kernel that does a stride loop over the input columns.
+    """
+    # Build kernel source
+    argnames = list(map(_mangle_user, argnames))
+    extras = list(map(_mangle_user, extras))
+    source = """
+def elemwise({args}):
+{body}
+"""
+
+    args = ', '.join(argnames)
+    body = []
+
+    body.append('tid = cuda.grid(1)')
+    body.append('ntid = cuda.gridsize(1)')
+
+    for a in argnames:
+        if a not in extras:
+            start = 'tid'
+            stop = ''
+            stride = 'ntid'
+            srcidx = '{a} = {a}[{start}:{stop}:{stride}]'
+            body.append(srcidx.format(a=a, start=start, stop=stop,
+                                      stride=stride))
+
+    body.append("inner({})".format(args))
+
+    indented = ['{}{}'.format(' ' * 4, ln) for ln in body]
+    # Finalize source
+    concrete = source.format(args=args, body='\n'.join(indented))
+    # Get bytecode
+    glbs = {'inner': cuda.jit(device=True)(func),
+            'cuda': cuda}
+    exec_(concrete, glbs)
+    # Compile as CUDA kernel
+    kernel = cuda.jit(glbs['elemwise'])
+    return kernel
+
+
+_cache = WeakKeyDictionary()
+
+
+@functools.wraps(_make_row_wise_kernel)
+def _load_cache_or_make_row_wise_kernel(func, *args, **kwargs):
+    """Caching version of ``_make_row_wise_kernel``.
+    """
+    try:
+        return _cache[func]
+    except KeyError:
+        kernel = _make_row_wise_kernel(func, *args, **kwargs)
+        _cache[func] = kernel
+        return kernel
+
+
+def _mangle_user(name):
+    """Mangle user variable name
+    """
+    return "__user_{}".format(name)
diff --git a/pygdf/dataframe.py b/pygdf/dataframe.py
index a1099220b24..5243bb2a281 100644
--- a/pygdf/dataframe.py
+++ b/pygdf/dataframe.py
@@ -8,7 +8,7 @@
 
 from numba import cuda
 
-from . import cudautils, formatting, queryutils, _gdf
+from . import cudautils, formatting, queryutils, _gdf, applyutils
 from .index import GenericIndex, EmptyIndex, Index, RangeIndex
 from .series import Series
 from .buffer import Buffer
@@ -685,6 +685,61 @@ def query(self, expr):
             newdf[col] = newseries
         return newdf
 
+    def apply_rows(self, func, incols, outcols, kwargs):
+        """Apply a user function as a CUDA kernel over each row.
+
+        Parameters
+        ----------
+        func : function
+        incols: list
+            A list of names of input columns
+        outcols: dict
+            A dictionary of output column names and their dtype.
+        kwargs: dict
+            name-value of extra arguments. These values are passed
+            directly into the function.
+
+        Examples
+        --------
+
+        With a ``DataFrame`` like so:
+
+        >>> df = DataFrame()
+        >>> df['in1'] = in1 = np.arange(nelem)
+        >>> df['in2'] = in2 = np.arange(nelem)
+        >>> df['in3'] = in3 = np.arange(nelem)
+
+        Define the user function for ``.apply_rows``:
+
+        >>> def kernel(in1, in2, in3, out1, out2, extra1, extra2):
+        ...     for i, (x, y, z) in enumerate(zip(in1, in2, in3)):
+        ...         out1[i] = extra2 * x - extra1 * y
+        ...         out2[i] = y - extra1 * z
+
+        The user function should loop over the columns and set the output for
+        each row. Each iteration of the loop **MUST** be independent of each
+        other. The order of the loop execution can be arbitrary.
+
+        Call ``.apply_rows`` with the name of the input columns, the name and
+        dtype of the output columns, and, optionally, a dict of extra
+        arguments.
+
+        >>> outdf = df.apply_rows(kernel,
+        ...                       incols=['in1', 'in2', 'in3'],
+        ...                       outcols=dict(out1=np.float64,
+        ...                                    out2=np.float64),
+        ...                       kwargs=dict(extra1=2.3, extra2=3.4))
+
+        **Notes**
+
+        The arguments corresponding to input/output columns are cuda
+        device arrays from numba. These arrays are strided in a way to
+        improve parallelism when the code is executed on the GPU.
+        The loop in the user function may look like serial code but it will be
+        executed by multiple threads.
+        """
+        return applyutils.apply_rows(self, func, incols, outcols, kwargs)
+
     def to_pandas(self):
         """Convert to a Pandas DataFrame.
         """
diff --git a/pygdf/tests/test_cuda_apply.py b/pygdf/tests/test_cuda_apply.py
new file mode 100644
index 00000000000..af29a9caa2e
--- /dev/null
+++ b/pygdf/tests/test_cuda_apply.py
@@ -0,0 +1,38 @@
+"""
+Test method that apply GPU kernel to a frame.
+"""
+
+import pytest
+import numpy as np
+
+from pygdf import DataFrame
+
+
+@pytest.mark.parametrize('nelem', [1, 2, 64, 128, 1000, 5000])
+def test_df_apply_rows(nelem):
+    def kernel(in1, in2, in3, out1, out2, extra1, extra2):
+        for i, (x, y, z) in enumerate(zip(in1, in2, in3)):
+            out1[i] = extra2 * x - extra1 * y
+            out2[i] = y - extra1 * z
+
+    df = DataFrame()
+    df['in1'] = in1 = np.arange(nelem)
+    df['in2'] = in2 = np.arange(nelem)
+    df['in3'] = in3 = np.arange(nelem)
+
+    extra1 = 2.3
+    extra2 = 3.4
+
+    expect_out1 = extra2 * in1 - extra1 * in2
+    expect_out2 = in2 - extra1 * in3
+
+    outdf = df.apply_rows(kernel,
+                          incols=['in1', 'in2', 'in3'],
+                          outcols=dict(out1=np.float64, out2=np.float64),
+                          kwargs=dict(extra1=extra1, extra2=extra2))
+
+    got_out1 = outdf['out1'].to_array()
+    got_out2 = outdf['out2'].to_array()
+
+    np.testing.assert_array_almost_equal(got_out1, expect_out1)
+    np.testing.assert_array_almost_equal(got_out2, expect_out2)
diff --git a/pygdf/tests/test_dataframe.py b/pygdf/tests/test_dataframe.py
index fcb8fd2e574..37d9bb4253e 100644
--- a/pygdf/tests/test_dataframe.py
+++ b/pygdf/tests/test_dataframe.py
@@ -420,6 +420,6 @@ def check(**kwargs):
         else:
             np.testing.assert_array_equal(out.index, take_indices)
 
-
     check()
     check(ignore_index=True)
+