- Official docs: https://pandas.pydata.org/docs/development/extending.html
- StackOverflow: https://stackoverflow.com/a/68972163/247482
- Arrow integration: https://arrow.apache.org/docs/python/extending_types.html#defining-extension-types-user-defined-types


In [1]:
from __future__ import annotations

import builtins
from uuid import UUID
from typing import TYPE_CHECKING, ClassVar, Self, Never, get_args
from collections.abc import Sequence, Iterable

import numpy as np
import pandas as pd
from numpy.typing import NDArray
from pandas.api.indexers import check_array_indexer
from pandas.api.extensions import ExtensionDtype, ExtensionArray
from pandas.core.ops.common import unpack_zerodim_and_defer
from pandas.core.algorithms import take

if TYPE_CHECKING:
    import pyarrow
    from pandas.core.arrays import BooleanArray

In [2]:
UuidLike = UUID | bytes | int | str

# 16 void bytes: 128 bit, every pattern valid, no funky behavior like 0 stripping.
_UuidNumpyDtype = np.dtype("V16")
_UuidScalar = _UuidNumpyDtype.type


def _to_uuid(v: UuidLike) -> UUID:
    """Coerce a UUID-like value into a :class:`uuid.UUID`.

    ``UUID`` instances are returned unchanged. ``bytes`` and ``int`` values
    are handed to the ``UUID`` constructor as ``bytes=`` / ``int=``, and
    ``str`` values are parsed by ``UUID(...)`` directly.

    Raises
    ------
    TypeError
        If ``v`` is none of the supported types.
    """
    if isinstance(v, UUID):
        return v
    if isinstance(v, bytes):
        return UUID(bytes=v)
    if isinstance(v, int):
        return UUID(int=v)
    if isinstance(v, str):
        return UUID(v)

    msg = f"Unknown type for Uuid: {type(v)} is not {get_args(UuidLike)}"
    raise TypeError(msg)


class UuidDtype(ExtensionDtype):
    """pandas extension dtype whose scalar type is :class:`uuid.UUID`."""

    # ExtensionDtype essential API (3 class attrs and methods)

    name: ClassVar[str] = "uuid"
    type: ClassVar[builtins.type[UUID]] = UUID

    @classmethod
    def construct_array_type(cls) -> type[UuidExtensionArray]:
        """Return the array class that stores values of this dtype."""
        return UuidExtensionArray

    # ExtensionDtype overrides

    # Reuse the kind character of the NumPy storage dtype.
    kind: ClassVar[str] = _UuidNumpyDtype.kind
    # index_class: ClassVar[type[pd.Index]] = pd.Index

    @property
    def na_value(self) -> Never:
        """Missing-value marker; undecided so far, so every access raises."""
        # TODO: figure this out
        raise NotImplementedError

    # IO

    def __from_arrow__(self, array: pyarrow.Array) -> ExtensionArray:
        """Placeholder for the Arrow -> pandas conversion.

        TODO: not implemented yet; the method has no body and returns ``None``.
        """


class UuidExtensionArray(ExtensionArray):
    """One-dimensional pandas extension array holding UUIDs.

    Values are kept as raw 16-byte items in a NumPy array of dtype
    ``_UuidNumpyDtype`` and are converted to :class:`uuid.UUID` objects on
    scalar access.
    """

    # Implementation details and convenience

    # Backing storage: one 16-byte void item per element.
    _data: NDArray[_UuidScalar]

    def __init__(self, values: Iterable[UuidLike], *, copy: bool = False) -> None:
        """Build the array from an ndarray or an iterable of UUID-like values.

        Parameters
        ----------
        values
            A NumPy array, which is cast to the storage dtype with
            ``astype``, or any other iterable, whose items are converted one
            by one with ``_to_uuid``.
        copy
            Forwarded to ``astype`` for ndarray input. Other input always
            ends up in a freshly created array.

        Raises
        ------
        ValueError
            If the resulting storage is not 1-dimensional.
        """
        if isinstance(values, np.ndarray):
            self._data = values.astype(_UuidNumpyDtype, copy=copy)
        else:
            # TODO: more efficient
            self._data = np.array(
                [_to_uuid(x).bytes for x in values], dtype=_UuidNumpyDtype
            )

        if self._data.ndim != 1:
            raise ValueError("Array only supports 1-d arrays")

    # ExtensionArray essential API (11 class attrs and methods)

    dtype: ClassVar[UuidDtype] = UuidDtype()

    @classmethod
    def _from_sequence(
        cls,
        data: Iterable[UuidLike],
        dtype: UuidDtype | None = None,
        copy: bool = False,
    ) -> Self:
        """Construct an array from ``data``.

        ``dtype`` defaults to ``UuidDtype()``; a ``TypeError`` is raised when
        it is anything other than a ``UuidDtype`` instance.
        """
        if dtype is None:
            dtype = UuidDtype()

        if not isinstance(dtype, UuidDtype):
            msg = f"'{cls.__name__}' only supports 'UuidDtype' dtype"
            raise TypeError(msg)
        return cls(data, copy=copy)

    def __getitem__(self, index) -> Self | UUID:
        """Return a ``UUID`` for an integer index, otherwise a new array."""
        # NumPy integer scalars (e.g. ``np.int64``) are not ``int`` instances
        # but must select a single element too; otherwise a bare ``np.void``
        # scalar would be wrapped as if it were an array.
        if isinstance(index, (int, np.integer)):
            return UUID(bytes=self._data[index].tobytes())
        index = check_array_indexer(self, index)
        return self._simple_new(self._data[index])

    # TODO: def __setitem__(self, index, value):

    def __len__(self) -> int:
        """Return the number of elements."""
        return len(self._data)

    @unpack_zerodim_and_defer("__eq__")
    def __eq__(self, other):
        """Element-wise equality; see ``_cmp`` for the accepted operands."""
        return self._cmp("eq", other)

    @property
    def nbytes(self) -> int:
        """Number of bytes used by the backing storage.

        Exposed as a property, as the ``ExtensionArray`` interface expects.
        """
        return self._data.nbytes

    def isna(self) -> NDArray[np.bool_]:
        """Return an all-``False`` mask, one entry per element.

        Every 16-byte pattern is a valid UUID and the dtype does not define
        an NA value yet, so no element can be missing. ``pd.isna`` is not
        usable on the void-typed storage: it ends up calling ``np.isnan``,
        which does not support void dtypes.
        """
        return np.zeros(len(self), dtype=np.bool_)

    def take(
        self, indexer, *, allow_fill: bool = False, fill_value: UUID | None = None
    ) -> Self:
        """Return a new array with the elements selected by ``indexer``.

        When ``allow_fill`` is true and no ``fill_value`` is given, the
        dtype's ``na_value`` is looked up and used as the fill value.
        """
        if allow_fill and fill_value is None:
            fill_value = self.dtype.na_value

        result = take(self._data, indexer, allow_fill=allow_fill, fill_value=fill_value)
        return self._simple_new(result)

    def copy(self) -> Self:
        """Return a new array backed by a copy of the storage."""
        return self._simple_new(self._data.copy())

    @classmethod
    def _concat_same_type(cls, to_concat: Sequence[Self]) -> Self:
        """Concatenate the storage of several arrays into one new array."""
        return cls._simple_new(np.concatenate([x._data for x in to_concat]))

    # Helpers

    @classmethod
    def _simple_new(cls, values: NDArray[_UuidScalar]) -> Self:
        """Wrap ``values`` without conversion or validation (skips ``__init__``)."""
        result = UuidExtensionArray.__new__(cls)
        result._data = values
        return result

    def _cmp(self, op: str, other) -> BooleanArray:
        """Apply the comparison ``__{op}__`` of the storage array to ``other``.

        ``other`` may be another ``UuidExtensionArray``, a scalar ``UUID``
        (compared against every element) or a sequence of the same length.

        Raises
        ------
        NotImplementedError
            If a sequence operand has more than one dimension.
        ValueError
            If a sequence operand has a different length than ``self``.
        """
        if isinstance(other, UuidExtensionArray):
            other = other._data
        elif isinstance(other, UUID):
            # The void storage cannot be compared with a UUID object
            # directly, so convert the scalar to the storage representation.
            # Indexing the 1-element array yields a scalar that broadcasts.
            other = np.array([other.bytes], dtype=_UuidNumpyDtype)[0]
        elif isinstance(other, Sequence):
            other = np.asarray(other)
            if other.ndim > 1:
                raise NotImplementedError("can only perform ops with 1-d structures")
            if len(self) != len(other):
                raise ValueError("Lengths must match to compare")

        method = getattr(self._data, f"__{op}__")
        result = method(other)

        # TODO: handle ``result is NotImplemented`` (invalid comparison).

        rv: BooleanArray = pd.array(result, dtype="boolean")  # type: ignore
        return rv

    # IO

    def __arrow_array__(self, type=None):
        """convert the underlying array values to a pyarrow Array"""
        import pyarrow

        # NOTE(review): the Ellipsis is passed literally; it looks like a
        # placeholder for the real values, so this is not functional yet.
        return pyarrow.array(..., type=type)

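pandas has a bug around void dtypes (https://github.com/pandas-dev/pandas/issues/54810). The context manager in the next cell works around it by temporarily patching `NDFrame._validate_dtype`.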


In [3]:
from contextlib import contextmanager


@contextmanager
def patch_pandas_constructors():
    """Temporarily replace ``NDFrame._validate_dtype`` with a lenient version.

    While the context is active, dtype validation resolves the dtype with
    ``pandas_dtype`` and rejects it only when it has ``fields`` (a compound
    dtype). The original method is put back when the context exits.
    """
    from unittest.mock import patch

    from pandas.core.dtypes.common import pandas_dtype
    from pandas.core.generic import NDFrame

    def _lenient_validate_dtype(
        cls, dtype: np.dtype | ExtensionDtype
    ) -> np.dtype | ExtensionDtype | None:
        # ``None`` means "no dtype requested" and is passed through as-is.
        if dtype is None:
            return None

        resolved = pandas_dtype(dtype)
        # a compound dtype
        is_compound = getattr(resolved, "fields", None) is not None
        if is_compound:
            raise NotImplementedError(
                "compound dtypes are not implemented "
                f"in the {cls.__name__} constructor"
            )
        return resolved

    replacement = classmethod(_lenient_validate_dtype)
    with patch.object(NDFrame, "_validate_dtype", replacement):
        yield

In [8]:
from uuid import uuid4

# Build an array from mixed inputs: the int 0 and a random UUID object.
UuidExtensionArray([0, uuid4()])

<UuidExtensionArray>
[UUID('00000000-0000-0000-0000-000000000000'), UUID('5748c75a-4418-4bc2-81b8-59703a6ba0cd')]
Length: 2, dtype: uuid

In [10]:
# Create a Series with the custom dtype while the `_validate_dtype` patch is
# active, then display it.
with patch_pandas_constructors():
    s = pd.Series([uuid4()], dtype=UuidDtype(), name="s")
s

0    8be4f607-afaa-4f34-868d-727bc9088999
Name: s, dtype: uuid

In [12]:
# Wrap the Series in a DataFrame (done outside the patched context).
pd.DataFrame(s)

Unnamed: 0,s
0,8be4f607-afaa-4f34-868d-727bc9088999
