In [None]:
import inspect
from typing import Union, Optional, Dict, Any, Callable
from functools import wraps, update_wrapper, partial
from core.config import settings

'''
put @Publish(subscriber=subscriber) above a class would make it a publisher class 
which enables it to monitor what it sent to the msg-center class and send messages to subscribers

put @Publish.register(event = event) above a method inside the class would sign the function up 
with the event
'''


class Publisher:
    def __init__(self, subscriber = None, events = settings.EVENTS):
        # if cls and  not inspect.isclass(cls):
        #     raise TypeError('Publisher needs to be a class')
        self.subscriber = subscriber
        self.events = events

    def __call__(self, cls=None):
        self.cls = cls
        self.channel = {event: dict() for event in self.events}
        for event in self.channel:
            self.channel[event]['subscriber'] = self.subscriber

        update_wrapper(self, cls)
        def wrapper(*args, **kwargs):
            value = cls(*args, **kwargs)
            return value
        return wrapper

    @classmethod
    def register(self, event=None, subscriber=None):
        self.channel[event]['subscriber'] = subscriber or self.subscriber
        
    # class register(Publisher):
    #     def __init__(self, func=None,*,event:str=None):
    #         print(super().get_channel())
    #         self.func = func
    #         update_wrapper(self, func)

    #     def __call__(self, *args, **kwargs):
    #         return self.func(*args,**kwargs)

    #     def __get__(self, instance, owner):
    #         return partial(self.__call__, instance)


@Publisher(subscriber=manager)
class Sales:
    def __init__(self, events=None, **kwargs: Dict[str, Any]):
        self.sto_data = pd.DataFrame()

    def sales_sql(self, country: str):
        _sales_sql = f'''country:{country}
                      '''
        return _sales_sql

sales = Sales()

In [58]:
class Decorator(object):
    def __init__(self, arg):
        self.arg = arg
    def __call__(self, cls):
        class Wrapped(cls):
            classattr = self.arg

            def new_method(self, value):
                return value * 2
        return Wrapped

@Decorator("decorated class")
class TestClass(object):
    name:str = 'race'
    def __init__(self, at):
        self.at = at
    def new_method(self, value):
        return value * 3

In [69]:
from functools import wraps
def test(func):
    call_num = 0 
    @wraps(func)
    def decorator(*args,**kwargs):
        nonlocal call_num 
        call_num += 1
        func.__setattr__('call_num', call_num)
        return func(*args,**kwargs)
    return decorator

In [70]:
@test
def tested(a:int=3):
    print(a)

In [14]:
from kafka import TopicPartition, KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost')
for _ in range(100):
    producer.send('my_favorite_topic', b'123')

In [207]:
import inspect

class Decorator(object):
    def __init__(self, *args, **kwargs):

        self.arg = args[0]
        print('args:', args)
        print('function: ', inspect.isfunction(self.arg))
        print('class: ', inspect.isclass(self.arg))
        
    def __call__(self, cls):
        if inspect.isclass(self.arg):
            def new_method(value):
                return 'value_get'
            setattr(self, 'new_method', new_method)
            return self

        class Wrapped(cls):
            classattr = self.arg
            def new_method(self, value):
                return value * 2
        return Wrapped

In [208]:
@Decorator
class Test:
    def __init__(self, name):
        self.name = name
test = Test('race')

args: (<class '__main__.Test'>,)
function:  False
class:  True


In [13]:
import pika
import logging
from core.config import settings



class RabbitMqConnection:
    def __init__(self, con):

    def __get__(self, owner_instance, owner_class=None):
        if not owner_instance._con:
            connection = pika.BlockingConnection(
                settings.RABBITMQ_CONNECTION_PARAMETER
            )
        else:
            connection = owner_instance._con

        logging.info(f'Connection bult')

        return connection

    def __set__(self, owner_instance, connection):
        logging.info(f'Updating Mq connetion :{connection} to {settings.RABBITMQ_CONNECTION_PARAMETER.host}')
        owner_instance._con = connection


class Broker:

    con = RabbitMqConnection()
    
    def __init__(self, con = None):
        self.con = con

broker = Broker()
broker.con

In [4]:
import time

class LazyProperty:
    def __init__(self, function):
        self.function = function
        self.name = function.__name__

    def __get__(self, obj, type=None) -> object:
        obj.__dict__[self.name] = self.function(obj)
        return obj.__dict__[self.name]

class DeepThought:
    @LazyProperty
    def meaning_of_life(self):
        time.sleep(3)
        return 42

my_deep_thought_instance = DeepThought()
my_deep_thought_instance.__dict__

42
42
42


In [25]:
class _Depends:
    def __init__(
        self, dependency: Optional[Callable[..., Any]] = None, *, use_cache: bool = True
    ):
        self.dependency = dependency
        self.use_cache = use_cache

    def __repr__(self) -> str:
        attr = getattr(self.dependency, "__name__", type(self.dependency).__name__)
        print('attr: ', attr)
        cache = "" if self.use_cache else ", use_cache=False"
        print('cache: ', 'cache')
        return f"{self.__class__.__name__}({attr}{cache})"

In [26]:
from typing import Optional, Callable,Any
def Depends(  # noqa: N802
    dependency: Optional[Callable[..., Any]] = None, *, use_cache: bool = True
) -> Any:
    return _Depends(dependency=dependency, use_cache=use_cache)
    
from kafka import TopicPartition, KafkaProducer
producer = KafkaProducer(bootstrap_servers='18.167.33.29')

In [49]:
class A:
    def __init__(self, name:str, age:int):
        self.name = name
        self.age = age
        self.queue = []
        print(f'name: {name}, age: {age}')
    

    def build_obj(self, name:str, age:int):
        obj = A(name, age)
        self.queue.append(obj)
    
    @classmethod
    def bind(cls, app=None):
        print(cls.__bound__)

In [62]:
class TaskRegistry(dict):
    def register(self, task):
        if task.name is None:
            raise InvalidTaskError(
                'Task class {!r} must specify .name attribute'.format(
                    type(task).__name__))
        task = inspect.isclass(task) and task() or task
        self[task.name] = task

In [68]:
class Celery:
    def __init__(self, main=None,**kwargs):
        self._tasks = None
        self.registry_cls = TaskRegistry
        if not isinstance(self._tasks, TaskRegistry):
            self._tasks = self.registry_cls(self._tasks or {}) # _tasks is a dict of class TaskRegistry
                                                               # that instantiates all classes     
    def task(self, *args, **opts):
        """     @app.task
                def fun():  """
        def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
            def _create_task_cls(fun):
                ret = self._task_from_fun(fun, **opts)
                return ret
            return _create_task_cls

        if len(args) == 1 and callable(args[0]): 
            return inner_create_task_cls(**opts)(*args)
        else:
            raise Exception('argument 1 to @task() must be a callable')
        return inner_create_task_cls(**opts)

    def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
        name = name or self.gen_task_name(fun.__name__, fun.__module__) 
                    '''def gen_task_name(app, name, module_name):
                            """Generate task name from name/module pair."""
                            module_name = module_name or '__main__' # get module absolute path
                            module = sys.modules[module_name]

                            if module:
                                module_name = module.__name__
                                # - If the task module is used as the __main__ script
                                # - we need to rewrite the module part of the task name
                                # - to match App.main.
                                if MP_MAIN_FILE and module.__file__ == MP_MAIN_FILE:
                                    # - see comment about :envvar:`MP_MAIN_FILE` above.
                                    module_name = '__main__'
                            if module_name == '__main__' and app.main:
                                return '.'.join([app.main, name])
                            return '.'.join(p for p in (module_name, name) if p)
                    '''


        base = base or self.Task # function

        run = fun # if bind else staticmethod(fun)
        task = type(fun.__name__, (base,), dict({
            'app': self,
            'name': name,
            'run': run,
            '_decorated': True,
            '__doc__': fun.__doc__,
            '__module__': fun.__module__,
            '__annotations__': fun.__annotations__,
            '__header__': staticmethod(head_from_fun(fun, bound=bind)),
            '__wrapped__': run}, **options))()
        task.__qualname__ = fun.__qualname__

        self._tasks[task.name] = task
        task.bind(self)  # connects task to this app

        '''    @classmethod
                def bind(cls, app):
                    was_bound, cls.__bound__ = cls.__bound__, True
                    cls._app = app
                    conf = app.conf
                    cls._exec_options = None  # clear option cache

                    if cls.typing is None:
                        cls.typing = app.strict_typing

                    for attr_name, config_name in cls.from_config:
                        if getattr(cls, attr_name, None) is None:
                            setattr(cls, attr_name, conf[config_name])

                    # decorate with annotations from config.
                    if not was_bound:
                        cls.annotate()

                        from celery.utils.threads import LocalStack
                        cls.request_stack = LocalStack()

                    cls.on_bound(app)      # PeriodicTask uses this to add itself 
                    return app             # to the PeriodicTask schedule
        '''

        return task
celery = Celery()
celery._tasks

In [867]:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
engine = create_engine('sqlite:///db.db')
metadata = MetaData()
user_table = Table(
    'user_table',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('user_name',String(50), nullable=False),
    Column('full_name',String(225)),
)
metadata.create_all()

In [1465]:
class Decorator:
    def __init__(self, func=None, *deco_args, name='race', **deco_kwargs):
        self._locals = locals()
        self._without_params = True
        if func and inspect.isfunction(func):
            self.func = func
            wraps(self.func)(self)
        else:
            self.func = None
            self.deco_args = (func,) + deco_args
            self.deco_kwargs = deco_kwargs
            self.deco_others = {k: v for k, v in self._locals.items(
            ) if k not in ('self', 'deco_args', 'deco_kwargs')}

    def __call__(self, func_or_cls=None, *func_args, **func_kwargs):
        if self.func is None:
            self.func = func_or_cls
            self._without_params = False

        self.func_args = (func_or_cls,) + func_args
        self.func_kwargs = func_kwargs

        @wraps(self.func)
        def wrapper(*wrap_func_args, **wrap_func_kwargs):
            self.func_args = wrap_func_args or self.func_args
            self.func_kwargs = wrap_func_kwargs or self.func_kwargs
            func_result = self.func(*self.func_args, **self.func_kwargs) 
            return func_result

        if self._without_params: # func = Decorator(func); func() = Decorator.__call__ = wrapper
            return wrapper()     # func()() = wrapper() = result
        else:                    # func = Decorator()(func) = Decorator.__call__(func) = wrapper
            return wrapper       # func() = wrapper() = result

    def __get__(self, obj, cls):
        if obj is None:          # instance method does not take call from class
            return self
        else:
            return types.MethodType(self, obj)

In [7]:
import typing as t
from inspect import isclass, isfunction, ismethod
from logging import getLogger
from functools import partial, wraps
from types import FunctionType, MethodType
from typing import Union

class Decorated:
    args: tuple
    kwargs: dict
    wrapped: t.Callable
    result: t.Optional[t.Any]

    def __init__(self, wrapped, args, kwargs, result=None):
        sup = super(Decorated, self)
        sup.__setattr__("args", get_fn_args(wrapped, args))
        sup.__setattr__("kwargs", kwargs)
        sup.__setattr__("wrapped", self._sets_results(wrapped))
        sup.__setattr__("result", result)

    def __str__(self):
        if hasattr(self.wrapped, "__name__"):
            name = self.wrapped.__name__
        else:
            name = str(self.wrapped)
        return "<Decorated {}({}, {})>".format(name, self.args, self.kwargs)

    def __call__(self, *args, **kwargs):    
        return self.wrapped(*args, **kwargs)

    def __setattr__(self, key, value):
        raise AttributeError(
            'Cannot set "{}" because {} is immutable'.format(key, self))

    def _sets_results(self, wrapped):
        @wraps(wrapped)
        def wrapped_wrapped(*args, **kwargs):
            res = wrapped(*args, **kwargs)
            super(Decorated, self).__setattr__("result", res)
            return res
        return wrapped_wrapped


def before(func, implicit_method_decoration=True, instance_methods_only=False, **extras):
    def decorator(decorated):
        def wrapper(*args, **kwargs):
            fn = func
            decor = Decorated(decorated, args, kwargs)
            fret = fn(decor, **extras)

            if fret is not None:
                args, kwargs = fret

            ret = decor(*args, **kwargs)
            return ret

        if implicit_method_decoration and isclass(decorated):
            return ClassWrapper.wrap(decorated, decorator,
                instance_methods_only=instance_methods_only,)
        return wraps(decorated)(wrapper)
    return decorator

def after(func, implicit_method_decoration=True, instance_methods_only=False, **extras):
    def decorator(decorated):
        def wrapper(*args, **kwargs):
            decor = Decorated(decorated, args, kwargs)
            orig_ret = decor(*args, **kwargs)
            fret = func(decor, **extras)

            if fret is not None:
                return fret
            return orig_ret

        if implicit_method_decoration and isclass(decorated):
            return ClassWrapper.wrap(decorated, decorator,
                instance_methods_only=instance_methods_only,)
        return wraps(decorated)(wrapper)
    return decorator

def instead(
    func, implicit_method_decoration=True, instance_methods_only=False, **extras):
    def decorator(decorated):
        def wrapper(*args, **kwargs):
            decor = Decorated(decorated, args, kwargs)
            return func(decor, **extras)

        if implicit_method_decoration and isclass(decorated):
            return ClassWrapper.wrap(decorated, decorator,
                instance_methods_only=instance_methods_only,)
        return wraps(decorated)(wrapper)
    return decorator


def decorate(
    before=None,
    after=None,
    instead=None,
    before_kwargs=None,
    after_kwargs=None,
    instead_kwargs=None,
    implicit_method_decoration=True,
    instance_methods_only=False,
    **extras
):
    if all(arg is None for arg in (before, after, instead)):
        raise ValueError('At least one of "before," "after," or "instead" must be provided')

    my_before = before
    my_after = after
    my_instead = instead

    before_kwargs = before_kwargs or {}
    after_kwargs = after_kwargs or {}
    instead_kwargs = instead_kwargs or {}

    for opts in (before_kwargs, after_kwargs, instead_kwargs):
        opts["implicit_method_decoration"] = implicit_method_decoration
        opts["instance_methods_only"] = instance_methods_only

    def decorator(decorated):
        wrapped = decorated
        if my_instead is not None:
            global instead
            wrapped = instead(my_instead, **{**instead_kwargs, **extras})(wrapped)

        if my_before is not None:
            global before
            wrapped = before(my_before, **{**before_kwargs, **extras})(wrapped)

        if my_after is not None:
            global after
            wrapped = after(my_after, **{**after_kwargs, **extras})(wrapped)

        def wrapper(*args, **kwargs):
            return wrapped(*args, **kwargs)

        if implicit_method_decoration and isclass(wrapped):
            return ClassWrapper.wrap(decorated,decorator,
                instance_methods_only=instance_methods_only,)
        return wraps(decorated)(wrapper)
    return decorator

def construct_decorator(
    before=None,
    after=None,
    instead=None,
    before_kwargs=None,
    after_kwargs=None,
    instead_kwargs=None,
    implicit_method_decoration=True,
    instance_methods_only=False,
    **extras):
    return partial(
        decorate,
        before=before,
        after=after,
        instead=instead,
        before_kwargs=before_kwargs,
        after_kwargs=after_kwargs,
        instead_kwargs=instead_kwargs,
        implicit_method_decoration=implicit_method_decoration,
        instance_methods_only=instance_methods_only,
        **extras)

In [17]:
def dec_for_cls(obj):
    ori_getattr = cls.__getattribute__
    def new_getattr(self, attr_name):
        print('getting', attr_name)
        return ori_getattr(self, attr_name)
    cls.__getattribute__ = new_getattr
    return cl

In [21]:
test(a=2,t=3)

TypeError: test() missing 1 required positional argument: 'self'

In [22]:
class Vehicle():
    can_fly = False
    number_of_weels = 0

class Car(Vehicle):
    number_of_weels = 4

    def __init__(self, color):
        self.color = color

my_car = Car("red")
print(my_car.__dict__)
print(type(my_car).__dict__)

{'color': 'red'}
{'__module__': '__main__', 'number_of_weels': 4, '__init__': <function Car.__init__ at 0x000001E71B7AEF70>, '__doc__': None}


In [30]:
my_car = Car("red")

print(my_car.__dict__['color'])
print(type(my_car).__dict__['number_of_weels'])
print(type(my_car).__base__.__dict__['can_fly'])

red
4
False


In [1]:
from core.config import settings
from conn.conn import Connection
from pika import BlockingConnection

In [11]:
from core.config import settings
from conn.conn import Connection
from pika import BlockingConnection

class RabbitMQConnection(Connection):
    __pool = dict()

    def __init__(self, fget=None, fset=None, fdel=None, con_param=None, max_con=3):
        self.con_param = con_param
        self.con = None
        self.max_con = max_con
        self.co

    def __set_name__(self, cls, name)->None:
        self.name = name

    def __get__(self, obj, cls)->BlockingConnection:
        con = self.__pool.get(f'con_{self.con_num}')
        if con and con.is_open:
            return con
        con = self.build_connection(self.con_param)
        self.__pool.update({f'con_{self.con_num}':con})
        return self.__pool[f'con_{self.con_num}']

    def __set__(self, obj, value)-> None:
        raise Exception('Immutable class')

    def __delete__(self, obj):
        obj.__dict__[self.name].close()

    def channel(self):
        try:
            if self.__channel.is_open:
                return self.__channel
            else:
                raise AttributeError

        except (KeyError, AttributeError):
            self.__channel = self.connection.channel()
            return self.__channel

    def build_connection(self, con_param):
        connection = BlockingConnection(con_param)
        return connection

In [12]:
class A:
    connection = RabbitMQConnection(con_param = settings.RMQ_CON_PARAM)
a = A()

In [13]:
a.connection

AttributeError: 'RabbitMQConnection' object has no attribute 'con_num'