In [2]:
import numpy as np
import pyarrow as pa
from build_measurand.component import make_component
from tests.conftest import SAMPLE_NDARRAY

In [3]:
def numpy_2d_array_to_arrow_table(array: np.ndarray) -> pa.Table:
    """Convert a 2-D NumPy array into an Arrow table, one table column per array column.

    Args:
        array: Array whose transpose is iterated; each row of ``array.T``
            (i.e. each column of ``array``) becomes one Arrow array.

    Returns:
        A ``pa.Table`` whose columns are named by their positional index
        rendered as a string ("0", "1", ...).
    """
    columns = []
    names = []
    for index, column in enumerate(array.T):
        columns.append(pa.array(column))
        names.append(str(index))
    return pa.Table.from_arrays(columns, names=names)

In [4]:
# Arrow-table counterparts of the SAMPLE_NDARRAY fixtures, under the same keys (8, 10, 12).
SAMPLE_PYARRAY = {
    bits: numpy_2d_array_to_arrow_table(SAMPLE_NDARRAY[bits])
    for bits in [8, 10, 12]
}

In [30]:
# Scratch check: display the type object pyarrow returns for pa.binary(12).
pa.binary(12)

FixedSizeBinaryType(fixed_size_binary[12])

In [44]:
# Experiment: peel bits off an Arrow column one at a time with pyarrow.compute
# and accumulate them into a uint8 result.
import pyarrow.compute as pac
# Column at index 14 of the table stored under key 8.
# NOTE(review): "15-1" presumably turns a one-based word number into a zero-based index - confirm.
data = SAMPLE_PYARRAY[8][15-1]
# Zero-filled uint8 accumulator with the same chunk layout as `data`.
result = pa.chunked_array([
    np.zeros(len(chunk), dtype=np.uint8)
    for chunk in data.chunks
])
# Earlier single-array variant of the accumulator, left disabled:
# result = pa.chunked_array(np.zeros(len(data), dtype=np.uint8))
for i in range(8):
    # Lowest remaining bit of every element.
    digit = pac.bit_wise_and(data, np.uint8(1))
    # Put that bit back at position i.
    # NOTE(review): this rebuilds the value in its original bit order; if a bit
    # reversal was intended the shift would need to be (7 - i) - confirm intent.
    result = pac.add(result, pac.shift_left(digit, np.uint8(i)))
    # Drop the bit that was just consumed.
    data = pac.shift_right(data, np.uint8(1))
    # Trace for the first element: remaining data, extracted bit, accumulated result.
    print(data[0], digit[0], result[0])
print(result.type)

7 1 1
3 1 3
1 1 7
0 1 15
0 0 15
0 0 15
0 0 15
0 0 15
uint8


<pyarrow.lib.ChunkedArray object at 0x000002684A667560>
[
  [
    2,
    2,
    2,
    2,
    2,
    ...
    2,
    2,
    2,
    2,
    2
  ]
]

In [16]:
from functools import cached_property
from typing import Optional
from pydantic import BaseModel, Field
from build_measurand.utils import _size_to_uint, _reverse_bits
import pyarrow.compute as pac

class Component(BaseModel):
    """Describes how to extract a value from one word (column) of the data.

    Extraction selects the word, optionally ANDs it with ``mask``, right-shifts
    it by ``shift`` and, on the NumPy path, optionally reverses its bits.
    All fields are frozen.
    """

    # Index of the word/column to read (``data[:, word]`` / ``data[word]``).
    word: int = Field(frozen=True)
    # Bit mask AND-ed with the word; a falsy mask (None or 0) means "no masking".
    mask: Optional[int] = Field(default=None, frozen=True)
    # Number of bits to shift right after masking; 0 means "no shift".
    shift: int = Field(default=0, frozen=True)
    # When True, build_ndarray passes the result through _reverse_bits.
    reverse: bool = Field(default=False, frozen=True)
    # Not read by any method in this class and not part of __eq__.
    one_based: bool = Field(default=True, frozen=True)
    # Word width; selects the unsigned dtype via _size_to_uint and is the
    # fallback for ``size`` when there is no mask.
    word_size: int = Field(default=8, frozen=True)

    @classmethod
    def from_spec(
        cls, spec: str, word_size: int = 8, one_based: bool = True
    ) -> "Component":
        """Build a Component from a spec string by delegating to ``make_component``."""
        return make_component(spec=spec, word_size=word_size, one_based=one_based)

    @cached_property
    def size(self) -> int:
        """Number of set bits in ``mask``, or ``word_size`` when the mask is falsy."""
        if self.mask:
            return f"{self.mask:b}".count("1")
        return self.word_size

    def __eq__(self, other: object) -> bool:
        """Compare word, mask, shift, reverse and word_size (``one_based`` is not compared).

        Returns ``NotImplemented`` for operands that are not Components so that
        Python can fall back to the other operand's comparison instead of
        raising AttributeError.
        """
        if not isinstance(other, Component):
            return NotImplemented
        return all(
            [
                self.word == other.word,
                self.mask == other.mask,
                self.shift == other.shift,
                self.reverse == other.reverse,
                self.word_size == other.word_size,
            ]
        )

    def build_ndarray(self, data: np.ndarray) -> np.ndarray:
        """Extract this component from a 2-D NumPy array.

        Args:
            data: Array indexed as ``data[:, word]``.

        Returns:
            The selected column after masking, right-shifting and (if
            ``reverse`` is set) ``_reverse_bits(..., self.size)``.
        """
        uint_dtype = _size_to_uint(self.word_size)
        tmp = data[:, self.word]

        if self.mask:
            mask = np.array([self.mask], dtype=uint_dtype)
            tmp = np.bitwise_and(tmp, mask)

        # One-element array: truthy exactly when the shift is non-zero.
        rshift = np.array([self.shift], dtype=uint_dtype)
        if rshift:
            tmp = np.right_shift(tmp, np.uint8(rshift))

        if self.reverse:
            tmp = _reverse_bits(tmp, self.size)

        return tmp

    def build_pytable(self, data: pa.Table) -> pa.ChunkedArray:
        """Extract this component from an Arrow table.

        Mirrors the mask and shift steps of ``build_ndarray`` using
        ``pyarrow.compute``.

        Args:
            data: Table whose column at index ``word`` is read.

        Returns:
            The selected column after masking and right-shifting.

        TODO: ``reverse`` is not applied on this path yet, so a Component with
        ``reverse=True`` yields un-reversed bits here.
        """
        # _size_to_uint is used as a NumPy dtype in build_ndarray; convert it to
        # the matching Arrow type so the operands below are typed consistently.
        arrow_type = pa.from_numpy_dtype(np.dtype(_size_to_uint(self.word_size)))

        tmp = data[self.word]

        if self.mask:
            # Arrow kernels take a scalar or an equal-length array; a
            # length-1 array is not broadcast, so pass a scalar.
            tmp = pac.bit_wise_and(tmp, pa.scalar(self.mask, type=arrow_type))

        if self.shift:
            tmp = pac.shift_right(tmp, pa.scalar(self.shift, type=arrow_type))

        return tmp

: 