Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions laygo/transformers/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ def createTransformer[T](_type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SI
return Transformer[T, T](chunk_size=chunk_size) # type: ignore


def build_chunk_generator[T](chunk_size: int) -> Callable[[Iterable[T]], Iterator[list[T]]]:
"""
Returns a function that breaks an iterable into chunks of a specified size.
This is useful for creating transformers that process data in manageable chunks.
"""

def chunk_generator(data: Iterable[T]) -> Iterator[list[T]]:
data_iter = iter(data)
while chunk := list(itertools.islice(data_iter, chunk_size)):
yield chunk

return chunk_generator


class Transformer[In, Out]:
"""
Defines and composes data transformations by passing context explicitly.
Expand All @@ -45,6 +59,7 @@ def __init__(
# The default transformer now accepts and ignores a context argument.
self.transformer: InternalTransformer[In, Out] = transformer or (lambda chunk, ctx: chunk) # type: ignore
self.error_handler = ErrorHandler()
self._chunk_generator = build_chunk_generator(chunk_size) if chunk_size else lambda x: iter([list(x)])

@classmethod
def from_transformer[T, U](
Expand All @@ -58,6 +73,11 @@ def from_transformer[T, U](
transformer=copy.deepcopy(transformer.transformer), # type: ignore
)

def set_chunker(self, chunker: Callable[[Iterable[In]], Iterator[list[In]]]) -> "Transformer[In, Out]":
    """
    Installs a custom chunking function on this transformer.

    Args:
      chunker: Callable that takes an iterable of input items and returns an
        iterator of lists of those items.

    Returns:
      This same transformer instance, so calls can be chained.
    """
    # Stored on the instance under the name the transformer uses for its chunker.
    self._chunk_generator = chunker
    return self

def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "Transformer[In, Out]":
"""Registers an error handler for the transformer."""
# This method is a placeholder for future error handling logic.
Expand All @@ -69,12 +89,6 @@ def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "Trans
self.error_handler.on_error(handler) # type: ignore
return self

def _chunk_generator(self, data: Iterable[In]) -> Iterator[list[In]]:
"""Breaks an iterable into chunks of a specified size."""
data_iter = iter(data)
while chunk := list(itertools.islice(data_iter, self.chunk_size)):
yield chunk

def _pipe[U](self, operation: Callable[[list[Out], PipelineContext], list[U]]) -> "Transformer[In, U]":
"""Composes the current transformer with a new context-aware operation."""
prev_transformer = self.transformer
Expand Down
Loading