forked from rapidsai/cudf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
null_mask.pyx
130 lines (104 loc) · 3.98 KB
/
null_mask.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
from enum import Enum
from rmm._lib.device_buffer cimport DeviceBuffer, device_buffer
from cudf.core.buffer import acquire_spill_lock, as_buffer
from libcpp.memory cimport unique_ptr
from libcpp.pair cimport pair
from libcpp.utility cimport move
from cudf._lib.column cimport Column
from cudf._lib.cpp.column.column_view cimport column_view
from cudf._lib.cpp.libcpp.memory cimport make_unique
from cudf._lib.cpp.null_mask cimport (
bitmask_allocation_size_bytes as cpp_bitmask_allocation_size_bytes,
bitmask_and as cpp_bitmask_and,
bitmask_or as cpp_bitmask_or,
copy_bitmask as cpp_copy_bitmask,
create_null_mask as cpp_create_null_mask,
underlying_type_t_mask_state,
)
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport mask_state, size_type
from cudf._lib.utils cimport table_view_from_columns
class MaskState(Enum):
    """
    Enum for null mask creation state

    Python-level mirror of the C++ ``mask_state`` enumeration. Each member
    stores the same-named ``mask_state`` enumerator cast to
    ``underlying_type_t_mask_state``; ``create_null_mask`` in this module
    casts ``.value`` back to ``mask_state`` before calling into C++.
    """
    # The cast to the underlying type lets the C++ enumerator be stored as a
    # Python object (presumably a plain integer -- confirm against the
    # ``underlying_type_t_mask_state`` declaration).
    UNALLOCATED = <underlying_type_t_mask_state> mask_state.UNALLOCATED
    UNINITIALIZED = <underlying_type_t_mask_state> mask_state.UNINITIALIZED
    ALL_VALID = <underlying_type_t_mask_state> mask_state.ALL_VALID
    ALL_NULL = <underlying_type_t_mask_state> mask_state.ALL_NULL
@acquire_spill_lock()
def copy_bitmask(Column col):
    """
    Return a new buffer holding a copy of ``col``'s validity mask.

    The copy is produced by the C++ ``copy_bitmask`` routine applied to the
    column's view; the mask is shifted by the column offset when that offset
    is nonzero. A column whose ``base_mask`` is ``None`` has nothing to copy,
    and ``None`` is returned for it.
    """
    if col.base_mask is None:
        # No mask on this column, so there is nothing to duplicate.
        return None

    cdef column_view source_view = col.view()
    cdef device_buffer copied_mask
    cdef unique_ptr[device_buffer] owned_mask

    # The C++ calls do not touch Python objects, so run them without the GIL.
    with nogil:
        copied_mask = move(cpp_copy_bitmask(source_view))
        # Re-home the result in a unique_ptr, which is the form
        # DeviceBuffer.c_from_unique_ptr accepts.
        owned_mask = move(make_unique[device_buffer](move(copied_mask)))

    return as_buffer(DeviceBuffer.c_from_unique_ptr(move(owned_mask)))
def bitmask_allocation_size_bytes(size_type num_bits):
    """
    Compute the number of bytes that should be allocated for a column
    validity mask covering ``num_bits`` bits.

    Parameters
    ----------
    num_bits : int
        Number of bits the mask needs to hold

    Returns
    -------
    int
        Allocation size in bytes, as reported by the C++
        ``bitmask_allocation_size_bytes`` routine
    """
    cdef size_t num_bytes

    # Pure C++ computation; the GIL is not needed while it runs.
    with nogil:
        num_bytes = cpp_bitmask_allocation_size_bytes(num_bits)

    return num_bytes
def create_null_mask(size_type size, state=MaskState.UNINITIALIZED):
    """
    Allocate a null mask able to represent ``size`` elements, created in the
    requested mask state.

    Parameters
    ----------
    size : int
        Number of elements the mask needs to be able to represent
    state : ``MaskState``, default ``MaskState.UNINITIALIZED``
        State the null mask should be created in

    Returns
    -------
    The buffer object produced by ``as_buffer`` around the newly created
    mask.

    Raises
    ------
    TypeError
        If ``state`` is not a ``MaskState`` member
    """
    if not isinstance(state, MaskState):
        offending_type_name = type(state).__name__
        raise TypeError(
            "`state` is required to be of type `MaskState`, got "
            + offending_type_name
        )

    cdef device_buffer new_mask
    cdef unique_ptr[device_buffer] owned_mask
    # Undo the cast performed in MaskState: Python value -> underlying
    # type -> C++ mask_state enumerator.
    cdef mask_state requested_state = <mask_state>(
        <underlying_type_t_mask_state>(state.value)
    )

    with nogil:
        new_mask = move(cpp_create_null_mask(size, requested_state))
        # Wrap in a unique_ptr, the form DeviceBuffer.c_from_unique_ptr
        # accepts.
        owned_mask = move(make_unique[device_buffer](move(new_mask)))

    return as_buffer(DeviceBuffer.c_from_unique_ptr(move(owned_mask)))
@acquire_spill_lock()
def bitmask_and(columns: list):
    """
    Run the C++ ``bitmask_and`` routine over a table view built from
    ``columns``.

    Parameters
    ----------
    columns : list
        Columns passed to ``table_view_from_columns`` to form the input

    Returns
    -------
    tuple
        ``(buffer, count)``: the resulting mask wrapped by ``as_buffer``, and
        the integer the C++ call returns alongside it (presumably the number
        of null/unset bits in the result -- confirm against the libcudf
        documentation).
    """
    cdef table_view input_view = table_view_from_columns(columns)
    cdef pair[device_buffer, size_type] and_result
    cdef unique_ptr[device_buffer] owned_mask

    with nogil:
        and_result = move(cpp_bitmask_and(input_view))
        # Only the mask half of the pair is moved out; ``second`` is still
        # read from the pair afterwards.
        owned_mask = move(
            make_unique[device_buffer](move(and_result.first))
        )

    mask_buffer = as_buffer(DeviceBuffer.c_from_unique_ptr(move(owned_mask)))
    return mask_buffer, and_result.second
@acquire_spill_lock()
def bitmask_or(columns: list):
    """
    Run the C++ ``bitmask_or`` routine over a table view built from
    ``columns``.

    Parameters
    ----------
    columns : list
        Columns passed to ``table_view_from_columns`` to form the input

    Returns
    -------
    tuple
        ``(buffer, count)``: the resulting mask wrapped by ``as_buffer``, and
        the integer the C++ call returns alongside it (presumably the number
        of null/unset bits in the result -- confirm against the libcudf
        documentation).
    """
    cdef table_view input_view = table_view_from_columns(columns)
    cdef pair[device_buffer, size_type] or_result
    cdef unique_ptr[device_buffer] owned_mask

    with nogil:
        or_result = move(cpp_bitmask_or(input_view))
        # Only the mask half of the pair is moved out; ``second`` is still
        # read from the pair afterwards.
        owned_mask = move(
            make_unique[device_buffer](move(or_result.first))
        )

    mask_buffer = as_buffer(DeviceBuffer.c_from_unique_ptr(move(owned_mask)))
    return mask_buffer, or_result.second