Fast forward to gpuopenanalytics/pygdf master (#4)
* adding eq datetime ops for pygdf

* flake8 fixes

* Drop Python 2.7, Add Python 3.7

* removing int coercion for datetime

* Remove Python 3.7 build

* bumping numba

* forgot to commit meta.yaml changes

* flake8

* commutative addition

* commutative subtraction and multiplication

* reflected floordiv and truediv

* cleanup

* stray comment

* change rsub method

* further testing rsub

* rsub docstring

* revert back

* type coercion

* revert to pseudo-commutative implementation

* commutative ops tests

* test comment cleanup

* Feature/reflected ops noncommutative testing (#1)

* np array solution

* cleanup

* np solution for division

* full reflected ops tests

* cleanup

* switching lambda scalar to 2

* Update README.md

The conda installation instruction in the README needed changes for the new pygdf version.

* Feature/reflected ops update (#2)

* test binary_operator

* test one line

* essentially use _binaryop with a line flipped

* expand to all non commutative reflected ops

* revert rmul

* Feature/reflected ops update (#3)

* test binary_operator

* test one line

* essentially use _binaryop with a line flipped

* expand to all non commutative reflected ops

* revert rmul

* rbinaryop function for clarity

* add scalar to array generation to avoid division by zero behavior

* remove integer division test due to libgdf bug

* Fix timezone issue when converting from datetime object into datetime64

* Remove unused import to fix flake8

* Initial modifications for new join API
beckernick committed Aug 29, 2018
1 parent 875c289 commit 02c2029
Showing 11 changed files with 194 additions and 37 deletions.
1 change: 0 additions & 1 deletion .travis.yml
@@ -16,7 +16,6 @@ matrix:
include:
- env: PYTHON=3.6
- env: PYTHON=3.5
-- env: PYTHON=2.7

before_install:
# install miniconda
2 changes: 1 addition & 1 deletion README.md
@@ -15,7 +15,7 @@ You can get a minimal conda installation with [Miniconda](https://conda.io/minic
You can install and update PyGDF using the conda command:

```bash
-conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
+conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a3
```

You can create and activate a development environment using the conda command:
4 changes: 2 additions & 2 deletions conda-recipes/pygdf/meta.yaml
@@ -16,14 +16,14 @@ requirements:
build:
- python
- setuptools
-- numba >0.35
+- numba>=0.40.0dev
run:
- python
- setuptools
- libgdf 0.1.0a3.*
- libgdf_cffi 0.1.0a3.*
- pandas 0.20.*
-- numba >0.35
+- numba>=0.40.0dev

test:
requires:
2 changes: 1 addition & 1 deletion conda_environments/builddocs_py35.yml
@@ -9,7 +9,7 @@ dependencies:
- pytest
- libgdf=0.1.0a3.*
- libgdf_cffi=0.1.0a3.*
-- numba>=0.35.0
+- numba>=0.40.0dev
- pandas=0.20.*
- pip:
- numpydoc
2 changes: 1 addition & 1 deletion conda_environments/testing_py35.yml
@@ -9,6 +9,6 @@ dependencies:
- pytest
- libgdf=0.1.0a3.*
- libgdf_cffi=0.1.0a3.*
-- numba>=0.35.0
+- numba>=0.40.0dev
- pandas=0.20.3
- notebook>=0.5.0
33 changes: 22 additions & 11 deletions pygdf/_gdf.py
@@ -167,7 +167,6 @@ def apply_join(col_lhs, col_rhs, how, method='hash'):
raise ValueError(msg)

joiner = _join_how_api[how]
-join_result_ptr = ffi.new("gdf_join_result_type**", None)
method_api = _join_method_api[method]
gdf_context = ffi.new('gdf_context*')

@@ -179,6 +178,9 @@ def apply_join(col_lhs, col_rhs, how, method='hash'):
msg = "method not supported"
raise ValueError(msg)

+col_result_l = columnview(0, None, dtype=np.int32)
+col_result_r = columnview(0, None, dtype=np.int32)

if(how in ['left', 'inner']):
list_lhs = []
list_rhs = []
@@ -188,19 +190,28 @@ def apply_join(col_lhs, col_rhs, how, method='hash'):

# Call libgdf

-joiner(len(col_lhs), list_lhs, list_rhs, join_result_ptr, gdf_context)
+joiner(len(col_lhs), list_lhs, list_rhs, col_result_l,
+col_result_r, gdf_context)
else:
-joiner(col_lhs[0].cffi_view, col_rhs[0].cffi_view, join_result_ptr)
+joiner(col_lhs[0].cffi_view, col_rhs[0].cffi_view, col_result_l,
+col_result_r)

# Extract result
-join_result = join_result_ptr[0]
-dataptr = libgdf.gdf_join_result_data(join_result)
-datasize = libgdf.gdf_join_result_size(join_result)
-ary = _as_numba_devarray(intaddr=int(ffi.cast("uintptr_t", dataptr)),
-nelem=datasize, dtype=np.int32)
-ary = ary.reshape(2, datasize // 2)
-yield ((ary[0], ary[1]) if datasize > 0 else (ary, ary))
-libgdf.gdf_join_result_free(join_result)

+# yield ((ary[0], ary[1]) if datasize > 0 else (ary, ary))

+left = _as_numba_devarray(intaddr=int(ffi.cast("uintptr_t",
+col_result_l.data)),
+nelem=col_result_l.size, dtype=np.int32)

+right = _as_numba_devarray(intaddr=int(ffi.cast("uintptr_t",
+col_result_r.data)),
+nelem=col_result_r.size, dtype=np.int32)

+yield(left, right)

+libgdf.gdf_column_free(col_result_l)
+libgdf.gdf_column_free(col_result_r)


def apply_prefixsum(col_inp, col_out, inclusive):
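The reworked join path allocates two `gdf_column` results up front and reads the joined row indices back from them, replacing the old `gdf_join_result_type` pointer API. A rough usage sketch of the new generator behavior (the `Series._column` access and the exact call shape are assumptions based on this diff, not a documented API):

```python
import numpy as np

from pygdf import _gdf
from pygdf.dataframe import Series

left = Series(np.array([0, 1, 2, 3], dtype=np.int32))
right = Series(np.array([1, 3, 5], dtype=np.int32))

# apply_join is a generator: it yields one pair of int32 numba device
# arrays, then frees the backing libgdf columns when the loop resumes,
# so results should be copied off the GPU inside the loop.
for left_idx, right_idx in _gdf.apply_join([left._column],
                                           [right._column], how='inner'):
    print(left_idx.copy_to_host(), right_idx.copy_to_host())
```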
80 changes: 64 additions & 16 deletions pygdf/datetime.py
@@ -1,11 +1,24 @@
import datetime as dt

import numpy as np
-from . import columnops, _gdf
+import pandas as pd

+from . import columnops, _gdf, utils
from .buffer import Buffer
+from .cudautils import compact_mask_bytes
from libgdf_cffi import libgdf


+_unordered_impl = {
+'eq': libgdf.gdf_eq_generic,
+'ne': libgdf.gdf_ne_generic,
+}


class DatetimeColumn(columnops.TypedColumnBase):
# TODO - we only support milliseconds (date64)
+# we should support date32 and timestamp, but perhaps
+# only after we move to arrow
+# we also need to support other formats besides Date64
funcs = {
'year': libgdf.gdf_extract_datetime_year,
'month': libgdf.gdf_extract_datetime_month,
@@ -14,29 +27,23 @@ class DatetimeColumn(columnops.TypedColumnBase):
'minute': libgdf.gdf_extract_datetime_minute,
'second': libgdf.gdf_extract_datetime_second,
}
+_npdatetime64_dtype = np.dtype('datetime64[ms]')

def __init__(self, data, mask=None, null_count=None, dtype=None):
+# currently libgdf datetime kernels fail if mask is null
+if mask is None:
+mask = np.ones(data.mem.size, dtype=np.bool)
+mask = compact_mask_bytes(mask)
+mask = Buffer(mask)
super(DatetimeColumn, self).__init__(data=data,
mask=mask,
null_count=null_count,
dtype=dtype
)
+# the column constructor removes mask if it's all true
+self._mask = mask
+self._precision = 1e-3
+self._inverse_precision = 1e3
+self._pandas_conversion_factor = 1e9 * self._precision

@classmethod
def from_numpy(cls, array):
-# hack, coerce to int, then set the dtype
-array = array.astype('datetime64[ms]')
-dtype = np.int64
+array = array.astype(cls._npdatetime64_dtype)
assert array.dtype.itemsize == 8
-buf = Buffer(array.astype(dtype, copy=False))
-buf.dtype = array.dtype
+buf = Buffer(array)
return cls(data=buf, dtype=buf.dtype)

@property
@@ -68,9 +75,50 @@ def get_dt_field(self, field):
self,
dtype=np.int16
)
+# force mask again
+out._mask = self.mask
_gdf.apply_unaryop(self.funcs[field],
self,
out)
return out

+def normalize_binop_value(self, other):
+if isinstance(other, dt.datetime):
+other = np.datetime64(other)

+if isinstance(other, pd.Timestamp):
+ary = utils.scalar_broadcast_to(
+other.value * self._pandas_conversion_factor,
+shape=len(self),
+dtype=self._npdatetime64_dtype
+)
+elif isinstance(other, np.datetime64):
+other = other.astype(self._npdatetime64_dtype)
+ary = utils.scalar_broadcast_to(
+other,
+shape=len(self),
+dtype=self._npdatetime64_dtype
+)
+else:
+raise TypeError('cannot broadcast {}'.format(type(other)))

+buf = Buffer(ary)
+result = self.replace(data=buf, dtype=self.dtype)
+return result

+def unordered_compare(self, cmpop, rhs):
+lhs, rhs = self, rhs
+return binop(
+lhs, rhs,
+op=_unordered_impl[cmpop],
+out_dtype=np.bool
+)

+def to_pandas(self, index):
+return pd.Series(self.to_array().astype(self.dtype), index=index)


+def binop(lhs, rhs, op, out_dtype):
+masked = lhs.has_null_mask or rhs.has_null_mask
+out = columnops.column_empty_like(lhs, dtype=out_dtype, masked=masked)
+null_count = _gdf.apply_binaryop(op, lhs, rhs, out)
+out = out.replace(null_count=null_count)
+return out
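Taken together, `normalize_binop_value` and `unordered_compare` let a datetime64[ms] column be compared for equality against Python, pandas, and numpy datetime scalars. A minimal sketch of the resulting behavior, assuming a working pygdf/libgdf build (these are the same cases the new tests below exercise):

```python
import datetime as dt

import numpy as np
import pandas as pd

from pygdf.dataframe import Series

sr = Series(pd.date_range('20010101', '20010105').values)

# Each scalar is normalized to datetime64[ms], broadcast to the column
# length, then compared element-wise with the libgdf 'eq' kernel.
m1 = sr == dt.datetime(2001, 1, 3)            # datetime.datetime
m2 = sr == pd.Timestamp('2001-01-03')         # pandas Timestamp
m3 = sr == np.datetime64('2001-01-03', 'ns')  # numpy datetime64
```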
5 changes: 4 additions & 1 deletion pygdf/queryutils.py
@@ -1,7 +1,8 @@

# Copyright (c) 2018, NVIDIA CORPORATION.

import ast

+import datetime as dt
import six
import numpy as np

@@ -202,6 +203,8 @@ def query_execute(df, expr, callenv):
name = name[len(ENVREF_PREFIX):]
try:
val = envdict[name]
+if isinstance(val, dt.datetime):
+val = np.datetime64(val)
except KeyError:
msg = '{!r} not defined in the calling environment'
raise NameError(msg.format(name))
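With this coercion, an `@`-referenced `datetime.datetime` from the calling environment can be compared against a datetime64 column inside `DataFrame.query`. A short illustrative sketch (mirroring `test_issue_165` added below):

```python
import datetime as dt

import pandas as pd

from pygdf.dataframe import DataFrame

df = DataFrame()
df['dates'] = pd.date_range('2000-10-21', periods=6).values

cutoff = dt.datetime(2000, 10, 23)
# '@cutoff' is looked up in the calling environment and coerced from
# datetime.datetime to np.datetime64 before the comparison runs.
subset = df.query('dates == @cutoff')
```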
25 changes: 25 additions & 0 deletions pygdf/series.py
@@ -269,6 +269,16 @@ def _binaryop(self, other, fn):
outcol = self._column.binary_operator(fn, other._column)
return self._copy_construct(data=outcol)

+def _rbinaryop(self, other, fn):
+"""
+Internal util to call a binary operator *fn* on operands *self*
+and *other* for reflected operations. Return the output Series.
+The output dtype is determined by the input operands.
+"""
+other = self._normalize_binop_value(other)
+outcol = other._column.binary_operator(fn, self._column)
+return self._copy_construct(data=outcol)

def _unaryop(self, fn):
"""
Internal util to call a unary operator *fn* on operands *self*.
@@ -281,18 +291,33 @@ def _unaryop(self, fn):
def __add__(self, other):
return self._binaryop(other, 'add')

+def __radd__(self, other):
+return self._rbinaryop(other, 'add')

def __sub__(self, other):
return self._binaryop(other, 'sub')

+def __rsub__(self, other):
+return self._rbinaryop(other, 'sub')

def __mul__(self, other):
return self._binaryop(other, 'mul')

+def __rmul__(self, other):
+return self._rbinaryop(other, 'mul')

def __floordiv__(self, other):
return self._binaryop(other, 'floordiv')

+def __rfloordiv__(self, other):
+return self._rbinaryop(other, 'floordiv')

def __truediv__(self, other):
return self._binaryop(other, 'truediv')

+def __rtruediv__(self, other):
+return self._rbinaryop(other, 'truediv')

__div__ = __truediv__

def _normalize_binop_value(self, other):
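The reflected methods route a scalar left operand through the same column machinery with the operand order flipped, so expressions like `1 + s` now work instead of raising `TypeError`. A brief sketch, assuming a working pygdf build:

```python
import numpy as np

from pygdf.dataframe import Series

gs = Series(np.array([1.0, 2.0, 4.0]))

# When int.__add__ returns NotImplemented, Python falls back to the
# reflected method on the Series operand.
r_add = 1 + gs   # gs.__radd__(1)  -> _rbinaryop(1, 'add')
r_sub = 2 - gs   # gs.__rsub__(2): scalar broadcast, operands flipped
r_div = 2 / gs   # gs.__rtruediv__(2)
```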
27 changes: 27 additions & 0 deletions pygdf/tests/test_binops.py
@@ -138,3 +138,30 @@ def test_series_cmpop_mixed_dtype(cmpop, lhs_dtype, rhs_dtype):

np.testing.assert_array_equal(cmpop(Series(lhs), Series(rhs)).to_array(),
cmpop(lhs, rhs))


_reflected_ops = [
lambda x: 1 + x,
lambda x: 2 * x,
lambda x: 2 - x,
lambda x: 2 // x,
]


@pytest.mark.parametrize('func, dtype', list(product(_reflected_ops, _dtypes)))
def test_reflected_ops_scalar(func, dtype):
import pandas as pd

# create random series
np.random.seed(12)
random_series = pd.Series(np.random.sample(100) + 10, dtype=dtype)

# gpu series
gs = Series(random_series)
gs_result = func(gs)

# pandas
ps_result = func(random_series)

# verify
np.testing.assert_allclose(ps_result, gs_result)
50 changes: 47 additions & 3 deletions pygdf/tests/test_datetime.py
@@ -1,7 +1,12 @@
import pytest

+import datetime as dt
import numpy as np
import pandas as pd
-from pandas.util.testing import assert_index_equal, assert_series_equal
+from pandas.util.testing import (
+assert_index_equal, assert_series_equal,
+assert_frame_equal
+)
from pygdf.dataframe import Series, DataFrame
from pygdf.index import DatetimeIndex

@@ -19,8 +24,6 @@ def data2():
fields = ['year', 'month', 'day',
'hour', 'minute', 'second']

-# fields = ['year']


@pytest.mark.parametrize('data', [data1(), data2()])
def test_series(data):
@@ -56,3 +59,44 @@ def test_setitem_datetime():
a = DataFrame()
a['a'] = pd.date_range('20010101', '20010105').values
# TODO check some stuff


def test_issue_165():
df_pandas = pd.DataFrame()
start_date = dt.datetime.strptime("2000-10-21", '%Y-%m-%d')
data = [(start_date + dt.timedelta(days=x)) for x in range(6)]
df_pandas["dates"] = data
df_pandas["num"] = [1, 2, 3, 4, 5, 6]
df_pygdf = DataFrame.from_pandas(df_pandas)

base = df_pandas.query("dates==@start_date")
test = df_pygdf.query("dates==@start_date")
assert_frame_equal(base, test.to_pandas())
assert len(test) > 0

mask = df_pygdf.dates == start_date
base_mask = df_pandas.dates == start_date
assert_series_equal(mask.to_pandas(), base_mask, check_names=False)
assert mask.to_pandas().sum() > 0

start_date_ts = pd.Timestamp(start_date)
test = df_pygdf.query("dates==@start_date_ts")
base = df_pandas.query("dates==@start_date_ts")
assert_frame_equal(base, test.to_pandas())
assert len(test) > 0

mask = df_pygdf.dates == start_date_ts
base_mask = df_pandas.dates == start_date_ts
assert_series_equal(mask.to_pandas(), base_mask, check_names=False)
assert mask.to_pandas().sum() > 0

start_date_np = np.datetime64(start_date_ts, 'ns')
test = df_pygdf.query("dates==@start_date_np")
base = df_pandas.query("dates==@start_date_np")
assert_frame_equal(base, test.to_pandas())
assert len(test) > 0

mask = df_pygdf.dates == start_date_np
base_mask = df_pandas.dates == start_date_np
assert_series_equal(mask.to_pandas(), base_mask, check_names=False)
assert mask.to_pandas().sum() > 0
