In [None]:
from abc import ABC, abstractmethod


class AbstractMeta(type):
    def __init__(cls, name, bases, dct):
        super().__init__(name, bases, dct)
        print("Extended by:", cls.__name__)


class AbstractClass(ABC, metaclass=AbstractMeta):
    _subclasses = []

    @classmethod
    def _register_subclass(cls, subclass):
        cls._subclasses.append(subclass)

    @classmethod
    def get_subclasses(cls):
        return cls._subclasses


class ConcreteClass1(AbstractClass):
    def __init__(self):
        super().__init__()


class ConcreteClass2(AbstractClass):
    def __init__(self):
        super().__init__()


# Usage example
print(AbstractClass.get_subclasses())

In [None]:
from abc import ABC, ABCMeta, abstractmethod

class AbstractClass(ABC):
    def __init__(self, cls) -> None:
        super().__init__()
        print(cls.name)
    
class 




print(AbstractClass.get_subclasses())  # Output: ['ConcreteClass']

# Innovative Pipeline Design

In [34]:
from typing import *
from abc import ABC, ABCMeta, abstractmethod

Accumulator = Union[Generator, Iterator, Collection]


class SourceNode:
    def __input__(self):
        raise AttributeError("SourceNode does not take input")

    def __input__(self):
        raise NotImplementedError("Output was not linked!")

    # NOTE: gotta implement __output__ here
    @property
    def output(self) -> Generator[Any, None, None]:
        collection = self.__output__()
        yield from collection

    @output.setter
    def output(self, value):
        def create_output_function():
            yield from value()

        self.__output__ = create_output_function


class InterimNode:

    # EXECUTOR

    def __executor__(self, value):
        # no change
        return value

    @property
    def executor(self):
        # TODO: Ensure self
        return self.__executor__

    @executor.setter
    def executor(self, value):
        self.__executor__ = value

    # OUTPUT

    def __output__(self):
        for record in self.__input__():
            # TODO: Surround with try catch,
            # and maybe recieve callback as parameter to deal with bad record
            yield self.executor(record)

    @property
    def output(self) -> Generator[Any, None, None]:
        return self.__output__()

    # INPUT
    def __input__(self):
        raise NotImplementedError("Input was not linked!")

    @property
    def input(self) -> Generator[Any, None, None]:
        return self.__input__()

    @input.setter
    def input(self, value):
        if not isinstance(value, Accumulator):
            raise ValueError(
                f"Input has to be either of type Generator, Iterator or Collection. Recieved: {type(value)}"
            )

        def create_input_function():
            yield from value

        self.__input__ = create_input_function


class SinkNode:

    def __output__(self):
        raise AttributeError("SinkNode does not produce output")

    def __input__(self):
        raise NotImplementedError("Input was not linked!")

    @property
    def input(self):
        raise AttributeError("SinkNode does not produce output")

    @property.setter
    def input(self, value: Accumulator):

        ...

# Create Transformer

In [44]:
from random import randint

tr = InterimNode()


def source():
    yield from [1, 2, 3]


def plus10(value):
    return value + randint(10, 40)


tr.executor = plus10
tr.input = source()

for i in tr.output:
    print(i)

30
40
42


In [48]:
from typing import Any


class NodeFactory:
    def build_source_node(udf_extractor) -> SourceNode:
        node = SourceNode()
        node.output = udf_extractor
        return node

    def build_interim_node(udf_executor, input=None) -> InterimNode:
        node = InterimNode()
        node.executor = udf_executor
        if input:
            node.input = input
        return node

    def build_sink_node(udf_loader) -> SinkNode: ...

In [51]:
def extractor():
    yield from [1, 2, 3, 4, 5, 6, 7]


def transformer(value):
    return value + 9


def loader(values):
    for value in values:
        print(value)


source = NodeFactory.build_source_node(extractor)
transf = NodeFactory.build_interim_node(transformer)
transf.input = source.output

In [52]:
for i in transf.output:
    print(i)

10
11
12
13
14
15
16


In [146]:
def adder(num):
    num["New Field"] = "Fuck Yeah"
    return num


transformer = NodeFactory.build_interim_node(adder, input=[{"Hey There": "Hello"}])

for a in transformer.output:
    print(a)

{'Hey There': 'Hello', 'New Field': 'Fuck Yeah'}


In [125]:
a = NodeFactory.build_interim_node(adder, input=[1, 2, 3, 4, 5]).output

Settter is called!


In [126]:
next(a)

NotImplementedError: Input was not linked!

In [None]:
SourceNode().join(MidNode().join(MidNode().join(SinkNode())))

In [35]:
class Test(ABC):
    @abstractmethod
    def test(self):
        print("Hey")


class Real(Test):
    def __init__(self, func) -> None:
        self.implementation = func
        super().__init__()

    def test(self):
        self.implementation()


def fuck():
    print("fuck")


real = Real(fuck)

real.test()

fuck


In [136]:
class Test:
    def __hey__(self):
        raise NotImplemented("BAD")

    @property
    def hey(self):
        return self.__hey__()

    @hey.setter
    def hey(self, value):
        def new_hey():
            print(value)

        self.__hey__ = new_hey

In [138]:
test = Test()
test.hey = "GOOD"
test.hey

GOOD


In [None]:
from typing import Iterator, Generator, Dict, List
from queue import Queue


def Extractor():

    requestQueue = Queue()

    def _init_(self):
        # init APIs, client, connectors, whatever...
        #

        ...

    def extract(self, collection: Iterator) -> Generator[Dict, None, None]: ...


def Transformer():
    def _init_(self): ...

    def transform(self, record: Dict) -> Dict: ...


def Loader():
    def _init_(self): ...

    def load(self, record: Dict) -> bool: ...


def ETL():
    def _init_(
        self,
        extractors: List[Extractor],
        transformers: List[Transformer],
        loaders: List[Transformer],
    ):
        self.extractors = extractors
        self.transformers = transformers
        self.loaders = loaders

    def extract(self) -> Generator[Dict, None, None]:
        flag = True
        for extractor in self.extractors:
            while True:
                try:
                    yield next(extractor)
                except Exception as e:
                    # if the end of generator is reached (when StopIteration raised)
                    # then continue gracefully, otherwise flag it as bad
                    # TODO: extract more information about the state from here: such as which extractor failed and the error
                    flag &= isinstance(e, StopIteration)
                    break

    def run(self) -> bool: ...

In [6]:
def extract():
    print("Started.")
    flag = True
    for extractor in self.extractors:
        while True:
            try:
                yield next(extractor)
            except Exception as e:
                # if the end of generator is reached (when StopIteration raised)
                # then continue gracefully, otherwise flag it as bad
                # TODO: extract more information about the state from here: such as which extractor failed and the error
                flag &= isinstance(e, StopIteration)
                break

In [8]:
from functools import partial


class Loader:
    def __init__(self, *args, **kwargs) -> None:
        pass

    def __enter__(self):
        self.active = True
        print("Client Initialized!")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.active = False
        print("Client Released!")
        # Perform any cleanup or resource releasing here

    def load(self, record):
        if not self.active:
            raise ResourceWarning("Client has already been closed!")
        print(f"Record=[{record}] has been loaded.")


def template_load_one(client, record):
    client.load(record)


with Loader(blah=1) as loader:
    load_one = partial(template_load_one, loader)


def load_all_v1(records):
    for record in records:
        load_one(record)


def load_all_v2(records):
    with Loader(blah=1) as loader:
        for record in records:
            loader.load(record)

Client Initialized!
Client Released!


In [12]:
loader = Loader("")

with loader as c:
    ...

Client Initialized!
Client Released!


In [10]:
records = [1, 2, 3, 4, 5]

load_all_v2(records)

Client Initialized!
Record=[1] has been loaded.
Record=[2] has been loaded.
Record=[3] has been loaded.
Record=[4] has been loaded.
Record=[5] has been loaded.
Client Released!


TypeError: extract() missing 1 required positional argument: 'self'

In [16]:
from typing import *


def func(abc: int, bcd: str) -> List:
    return [abc, bcd]

In [17]:
new_type = Callable[[int, str], List]

In [20]:
isinstance(func, new_type)

TypeError: Subscripted generics cannot be used with class and instance checks

In [44]:
value = {"Message": "Hello World!"}

sentinel = StopIteration()


def load_one(value):
    print("Loaded To Kafka:", value)


def load_bulk():
    try:
        print("Started Loading...")
        while True:
            value = yield  # get value from as you go
            load_one(value)
    finally:
        # Any cleanup code can be placed here
        print("Finished Loading...")

In [46]:
# input: Generator

gen = load_bulk()

gen.send(None)

gen.send(123)
gen.send(234)
gen.send(345)
gen.send(456)
gen.send(567)
gen.send(StopIteration("Sentinel"))

gen.close()

Started Loading...
Loaded To Kafka: 123
Loaded To Kafka: 234
Loaded To Kafka: 345
Loaded To Kafka: 456
Loaded To Kafka: 567
Loaded To Kafka: Sentinel
Finished Loading...


In [None]:
def load():
    import time

    try:
        while True:
            value = yield

            counter += 1

            if counter % batch_size == 0:
                time.sleep(backoff)
    finally:
        # Stop Generator Gracefully. Maybe Release some resources
        print("Closing Generator...")
        return total_counter


# How to use
ETL = load()
next(ETL)

for i in range(20):  # Load 20 values for demonstration
    print("Loading value:", i)
    ETL.send(i)  # Sending the value to the generator
    response = next(ETL)  # Getting the response from the generator
    if response:
        print("Received signal to pause")
        # You can handle the pause signal here
    else:
        print("No pause signal")


GET = True

if GET:
    # Either get returned value:
    try:
        ETL.send(StopIteration())
    except StopIteration as retval:
        statistics = retval.value
    print("Returned Value:", statistics)
else:
    # Or ignore and exit
    ETL.close()

In [None]:
import itertools

generator = None


def peek():
    global generator
    """Peek at the next item in the generator without consuming it."""
    try:
        first_item = next(generator)

        def regenerate():
            yield first_item
            yield from generator

        generator = regenerate()
        return True
    except StopIteration:
        return False


# Example usage:


def my_generator():
    yield 1
    yield 2
    yield 3


generator = my_generator()

In [64]:
print(peek())

TypeError: peek() takes 0 positional arguments but 1 was given