Implement .apply_rows() to do row-by-row transformation
sklam committed Jan 12, 2018
1 parent e70deb2 commit 35fad20
Showing 4 changed files with 190 additions and 2 deletions.
95 changes: 95 additions & 0 deletions pygdf/applyutils.py
@@ -0,0 +1,95 @@
from weakref import WeakKeyDictionary
import functools

from numba.utils import pysignature, exec_
from numba import cuda


def apply_rows(df, func, incols, outcols, kwargs):
    # Get input columns
    inputs = {k: df[k].to_gpu_array() for k in incols}
    # Allocate output columns
    outputs = {}
    for k, dt in outcols.items():
        outputs[k] = cuda.device_array(len(df), dtype=dt)
    # Get signature of user function
    sig = pysignature(func)
    # Compile kernel
    kernel = _load_cache_or_make_row_wise_kernel(func, sig.parameters.keys(),
                                                 kwargs.keys())
    # Bind argument
    args = {}
    for dct in [inputs, outputs, kwargs]:
        args.update(dct)
    bound = sig.bind(**args)
    # Launch kernel
    blksz = 64
    blkct = min(16, max(1, len(df) // blksz))
    kernel[blkct, blksz](*bound.args)
    # Prepare output frame
    outdf = df.copy()
    for k in sorted(outcols):
        outdf[k] = outputs[k]
    return outdf


def _make_row_wise_kernel(func, argnames, extras):
"""
Make a kernel that does a stride loop over the input columns.
"""
# Build kernel source
argnames = list(map(_mangle_user, argnames))
extras = list(map(_mangle_user, extras))
source = """
def elemwise({args}):
{body}
"""

args = ', '.join(argnames)
body = []

body.append('tid = cuda.grid(1)')
body.append('ntid = cuda.gridsize(1)')

for a in argnames:
if a not in extras:
start = 'tid'
stop = ''
stride = 'ntid'
srcidx = '{a} = {a}[{start}:{stop}:{stride}]'
body.append(srcidx.format(a=a, start=start, stop=stop,
stride=stride))

body.append("inner({})".format(args))

indented = ['{}{}'.format(' ' * 4, ln) for ln in body]
# Finalize source
concrete = source.format(args=args, body='\n'.join(indented))
# Get bytecode
glbs = {'inner': cuda.jit(device=True)(func),
'cuda': cuda}
exec_(concrete, glbs)
# Compile as CUDA kernel
kernel = cuda.jit(glbs['elemwise'])
return kernel


_cache = WeakKeyDictionary()


@functools.wraps(_make_row_wise_kernel)
def _load_cache_or_make_row_wise_kernel(func, *args, **kwargs):
"""Caching version of ``_make_row_wise_kernel``.
"""
try:
return _cache[func]
except KeyError:
kernel = _make_row_wise_kernel(func, *args, **kwargs)
_cache[func] = kernel
return kernel


def _mangle_user(name):
"""Mangle user variable name
"""
return "__user_{}".format(name)
57 changes: 56 additions & 1 deletion pygdf/dataframe.py
@@ -8,7 +8,7 @@

from numba import cuda

from . import cudautils, formatting, queryutils, _gdf
from . import cudautils, formatting, queryutils, _gdf, applyutils
from .index import GenericIndex, EmptyIndex, Index, RangeIndex
from .series import Series
from .buffer import Buffer
@@ -685,6 +685,61 @@ def query(self, expr):
            newdf[col] = newseries
        return newdf

    def apply_rows(self, func, incols, outcols, kwargs):
        """Apply a user function as a CUDA kernel over each row.

        Parameters
        ----------
        func : function
        incols: list
            A list of names of input columns
        outcols: dict
            A dictionary of output column names and their dtype.
        kwargs: dict
            name-value of extra arguments. These values are passed
            directly into the function.

        Examples
        --------
        With a ``DataFrame`` like so:

        >>> df = DataFrame()
        >>> df['in1'] = in1 = np.arange(nelem)
        >>> df['in2'] = in2 = np.arange(nelem)
        >>> df['in3'] = in3 = np.arange(nelem)

        Define the user function for ``.apply_rows``:

        >>> def kernel(in1, in2, in3, out1, out2, extra1, extra2):
        ...     for i, (x, y, z) in enumerate(zip(in1, in2, in3)):
        ...         out1[i] = extra2 * x - extra1 * y
        ...         out2[i] = y - extra1 * z

        The user function should loop over the columns and set the output for
        each row. Each iteration of the loop **MUST** be independent of each
        other. The order of the loop execution can be arbitrary.

        Call ``.apply_rows`` with the name of the input columns, the name and
        dtype of the output columns, and, optionally, a dict of extra
        arguments.

        >>> outdf = df.apply_rows(kernel,
        ...                       incols=['in1', 'in2', 'in3'],
        ...                       outcols=dict(out1=np.float64,
        ...                                    out2=np.float64),
        ...                       kwargs=dict(extra1=2.3, extra2=3.4))

        **Notes**

        The arguments corresponding to input/output columns are cuda
        device arrays from numba. These arrays are strided in a way to
        improve parallelism when the code is executed on the GPU.
        The loop in the user function may look like serial code but it will be
        executed by multiple threads.
        """
        return applyutils.apply_rows(self, func, incols, outcols, kwargs)

    def to_pandas(self):
        """Convert to a Pandas DataFrame.
        """
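To make the striding described in the ``**Notes**`` of the docstring above concrete, here is a small host-side sketch (not part of the commit; the sizes are invented for the demonstration): a thread with index tid out of ntid total threads receives the slice tid::ntid of each column, so consecutive rows land on different threads.

# Sketch: how an 8-row column is partitioned among 4 threads under the
# tid::ntid striding used by apply_rows.
import numpy as np

col = np.arange(8)
ntid = 4
for tid in range(ntid):
    print(tid, col[tid::ntid])
# 0 [0 4]
# 1 [1 5]
# 2 [2 6]
# 3 [3 7]
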
38 changes: 38 additions & 0 deletions pygdf/tests/test_cuda_apply.py
@@ -0,0 +1,38 @@
"""
Test method that apply GPU kernel to a frame.
"""

import pytest
import numpy as np

from pygdf import DataFrame


@pytest.mark.parametrize('nelem', [1, 2, 64, 128, 1000, 5000])
def test_df_apply_rows(nelem):
    def kernel(in1, in2, in3, out1, out2, extra1, extra2):
        for i, (x, y, z) in enumerate(zip(in1, in2, in3)):
            out1[i] = extra2 * x - extra1 * y
            out2[i] = y - extra1 * z

    df = DataFrame()
    df['in1'] = in1 = np.arange(nelem)
    df['in2'] = in2 = np.arange(nelem)
    df['in3'] = in3 = np.arange(nelem)

    extra1 = 2.3
    extra2 = 3.4

    expect_out1 = extra2 * in1 - extra1 * in2
    expect_out2 = in2 - extra1 * in3

    outdf = df.apply_rows(kernel,
                          incols=['in1', 'in2', 'in3'],
                          outcols=dict(out1=np.float64, out2=np.float64),
                          kwargs=dict(extra1=extra1, extra2=extra2))

    got_out1 = outdf['out1'].to_array()
    got_out2 = outdf['out2'].to_array()

    np.testing.assert_array_almost_equal(got_out1, expect_out1)
    np.testing.assert_array_almost_equal(got_out2, expect_out2)
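
One observation worth spelling out (derived from applyutils.apply_rows above, not stated in the commit): the launch uses blocks of 64 threads and at most 16 blocks, so no more than 1024 threads are launched and the stride loop in the generated kernel covers the remaining rows for the larger nelem values in this test. A quick sketch of the resulting configuration:

# Sketch: block/thread counts apply_rows would pick for the nelem values
# parametrized in test_df_apply_rows.
blksz = 64
for nelem in (1, 2, 64, 128, 1000, 5000):
    blkct = min(16, max(1, nelem // blksz))
    print(nelem, blkct, blkct * blksz)  # rows, blocks, total threads
# 1 1 64
# 2 1 64
# 64 1 64
# 128 2 128
# 1000 15 960
# 5000 16 1024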
2 changes: 1 addition & 1 deletion pygdf/tests/test_dataframe.py
@@ -420,6 +420,6 @@ def check(**kwargs):
        else:
            np.testing.assert_array_equal(out.index, take_indices)


    check()
    check(ignore_index=True)
