# Improving django-celery tasks with metaprogramming

StuPIG, 2015-12-11

Michael Käufl

# TOC
- Quest & Conditions
- The Problem
- First Attempts
- The Method Caller
- Function/Method Attributes
- The Metaclass

## Quest & Conditions

### Quest

- Import data from Wikidata based on ID (e.g. Stuttgart: Q1022)
- Use Django and Celery tasks
- Run the import asynchronously, with one Celery task per Wikidata ID
- Python 3.4

### Conditions

- There is a registry with a mapping: `class name` -> `class`
- We have to use an importer class in `app/importer.py`
- Only basic Python data types are allowed as parameters when calling a task asynchronously (`str`, `int`, `list`, `set`, `dict`, …)
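
The registry condition is what makes name-based dispatch possible: a task receives a plain string and turns it back into an importer. A minimal sketch, assuming a hypothetical `Registry` with a `register` decorator (the talk only uses `get_importer_instance`; everything else here is made up for illustration):

```python
class Registry:
    """Hypothetical registry mapping class names to classes."""

    _classes = {}

    @classmethod
    def register(cls, importer_cls):
        # Assumed helper: store the class under its own name.
        cls._classes[importer_cls.__name__] = importer_cls
        return importer_cls

    @classmethod
    def get_importer_instance(cls, class_name):
        # Look the class up by name and instantiate it.
        return cls._classes[class_name]()


@Registry.register
class WikidataImporter:
    def import_single(self, wikidata_id):
        return 'imported {}'.format(wikidata_id)


# Only strings cross the task boundary; the worker rebuilds the object.
importer = Registry.get_importer_instance('WikidataImporter')
result = importer.import_single('Q1022')
```

Passing the class name instead of the instance keeps the task arguments within the basic data types listed above.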

## The Problem

## First Attempts

In [None]:
# importer.py
from wikidata.tasks import task_import_wikidata_single

class WikidataImporter:
    
    def import_bulk(self, wikidata_ids):
        for wikidata_id in wikidata_ids:
            task_import_wikidata_single.delay(wikidata_id)

In [None]:
# tasks.py
from celery import app
from somewhere import Registry

@app.task
def task_import_wikidata():  # Interface from outside
    wikidata_ids = [
        'Q1022', 'Q28865', 'Q1032372',
    ]
    importer = Registry.get_importer_instance('WikidataImporter')
    importer.import_bulk(wikidata_ids)

@app.task
def task_import_wikidata_single(wikidata_id):  # Used by importer
    # download and save
    pass

In [None]:
# importer.py
from wikidata.tasks import task_import_wikidata_single

class WikidataImporter:
    
    def import_bulk(self, wikidata_ids):
        for wikidata_id in wikidata_ids:
            task_import_wikidata_single.delay(wikidata_id)
    
    def import_single(self, wikidata_id):
        # download and save
        pass

In [None]:
# tasks.py
from celery import app
from somewhere import Registry

@app.task
def task_import_wikidata():  # Interface from outside
    wikidata_ids = [
        'Q1022', 'Q28865', 'Q1032372',
    ]
    importer = Registry.get_importer_instance('WikidataImporter')
    importer.import_bulk(wikidata_ids)

@app.task
def task_import_wikidata_single(wikidata_id):  # Used by importer
    importer = Registry.get_importer_instance('WikidataImporter')
    importer.import_single(wikidata_id)

## The Method Caller

### Idea

- Use a generalized task that calls a method of an importer.
- Signature of method caller:
  - name of importer class
  - name of method
  - \*args and \*\*kwargs

### Implementation

In [None]:
# general/tasks.py
import inspect

from celery import app
from somewhere import Registry

@app.task
def task_import_call_method(importer_name, method_name, *args, **kwargs):
    importer = Registry.get_importer_instance(class_name=importer_name)

    # Check preconditions, e.g.:
    if not hasattr(importer, method_name):
        raise ValueError('not an attribute')

    method = getattr(importer, method_name)
    
    if not inspect.ismethod(method) and not inspect.isfunction(method):
        raise ValueError('not a method/function')

    return method(*args, **kwargs)

### The Importer

In [None]:
# importer.py
from general.tasks import task_import_call_method

class WikidataImporter:
    
    def import_bulk(self, wikidata_ids):
        for wikidata_id in wikidata_ids:
            task_import_call_method.delay(
                importer_name=self.__class__.__name__,
                method_name='import_single',
                # Actual arguments:
                wikidata_id=wikidata_id,
            )
    
    def import_single(self, wikidata_id):
        # download and save
        pass


### Why not perfect?

1. Readability
  - better: no need to switch files
  - worse: every call needs extra lines (`importer_name`, `method_name`, …)
2. `method_name` is a plain string, so there is no chance to verify it prior to runtime
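
The second point can be seen without Celery. The sketch below assumes a stand-in `call_method` that does the same `getattr` lookup as `task_import_call_method`; the class body and the misspelled name are made up. The typo is only noticed when the call is executed, which for a real task would be on the worker:

```python
class WikidataImporter:
    def import_single(self, wikidata_id):
        return wikidata_id


def call_method(importer, method_name, *args, **kwargs):
    # Same lookup as in the method caller: resolved by name at call time.
    if not hasattr(importer, method_name):
        raise ValueError('not an attribute')
    return getattr(importer, method_name)(*args, **kwargs)


importer = WikidataImporter()
ok = call_method(importer, 'import_single', wikidata_id='Q1022')

try:
    # Misspelled method name: nothing complains until this line runs.
    call_method(importer, 'import_singel', wikidata_id='Q1022')
    error = None
except ValueError as exc:
    error = str(exc)
```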

## Function/Method Attributes

### Idea

- Functions/Methods are objects and can have attributes
  - Django admin: `.short_description = '…'`
  - Celery: `.delay()`
  
- Use a decorator to add an attribute `as_task()`

- Call decorated methods async in importer:


In [None]:
# importer.py
from somewhere import as_task

class WikidataImporter:
    
    def import_bulk(self, wikidata_ids):
        for wikidata_id in wikidata_ids:
            self.import_single.as_task(
                wikidata_id=wikidata_id,
            )
    
    @as_task
    def import_single(self, wikidata_id):
        # download and save
        pass

### No unbound Methods any more

In [3]:
class MyLittlePony:
    def ride(self):
        print('riding')

In [5]:
pony = MyLittlePony()
pony.ride()
pony.ride

riding


<bound method MyLittlePony.ride of <__main__.MyLittlePony object at 0x7f4ad489dcc0>>

In [6]:
MyLittlePony.ride

<function __main__.MyLittlePony.ride>
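
This matters for the decorator: a decorator applied inside a class body receives a plain function, and an attribute set on that function is also reachable through the bound method of an instance. A small sketch (the `label` attribute is made up for illustration):

```python
class MyLittlePony:
    def ride(self):
        return 'riding'

    # Inside the class body, `ride` is still a plain function.
    ride.label = 'go for a ride'


pony = MyLittlePony()

# Attribute access on a bound method falls through to the function.
via_class = MyLittlePony.ride.label
via_instance = pony.ride.label

# The bound method wraps the very same function object.
same_function = pony.ride.__func__ is MyLittlePony.ride
```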

### Decorator Implementation

In [None]:
from general.tasks import task_import_call_method
from functools import partial

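def as_task(method):
    # Functions nested in other functions cannot be found via the registry.
    if '<locals>' in method.__qualname__:
        raise ValueError('as_task does not support nested definitions')

    # Expect exactly 'ClassName.method_name'.
    qualname = method.__qualname__.split('.')
    if len(qualname) != 2:
        raise ValueError('as_task expects a method of a top-level class')

    class_name = qualname[0]

    # Pre-fill importer_name and method_name of the generic task.
    method.as_task = partial(
        task_import_call_method.delay,
        class_name,
        method.__name__,
    )

    return method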

### Why not perfect?

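The class name is taken from `__qualname__` when the method is defined (“implementation time”), not from the instance at run time.

=> Does not work with inheritance: a subclass that inherits the method still dispatches to the parent class.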
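
The effect can be reproduced with a reduced sketch of the decorator above. It assumes a stand-in `fake_delay` that records its arguments instead of calling `task_import_call_method.delay`; `Base` and `Child` are made-up classes:

```python
from functools import partial

calls = []


def fake_delay(class_name, method_name, *args, **kwargs):
    # Stand-in for task_import_call_method.delay: record the call.
    calls.append((class_name, method_name))


def as_task(method):
    # __qualname__ is fixed when the function is defined; the part
    # before the method name is the defining class.
    class_name = method.__qualname__.split('.')[-2]
    method.as_task = partial(fake_delay, class_name, method.__name__)
    return method


class Base:
    @as_task
    def import_single(self):
        pass


class Child(Base):
    pass  # inherits import_single, including its as_task attribute


# Called on a Child instance, but the recorded class name is 'Base'.
Child().import_single.as_task()
```

On a worker, the registry lookup would therefore create a `Base` instance, and anything `Child` overrides would be ignored.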

## The Metaclass

### Idea

- Use decorator `as_task` as a simple annotation of async methods
- Check if a parent class has a method decorated with `as_task`
- Use metaprogramming to
  - Add the function attribute to decorated methods
  - Do the same if the class overrides a decorated method of a parent class
  - If it does not override it, add a method (see below) with the same name and attribute
- Implementation of method:

In [None]:
def foo(self, *args, **kwargs):
    return super().foo(*args, **kwargs)
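
Zero-argument `super()` only works for functions defined inside a class body, so a method attached afterwards has to name the class explicitly. A minimal sketch of such a forwarding method, assuming made-up classes `Parent` and `Child` and a hypothetical helper `add_forwarder`:

```python
class Parent:
    def foo(self, value):
        return 'Parent.foo({})'.format(value)


class Child(Parent):
    pass


def add_forwarder(cls, name):
    # Hypothetical helper: give `cls` its own method `name` that
    # forwards to the next implementation in the MRO.
    def forwarder(self, *args, **kwargs):
        # Two-argument super(): there is no __class__ cell here.
        return getattr(super(cls, self), name)(*args, **kwargs)

    forwarder.__name__ = name
    setattr(cls, name, forwarder)


add_forwarder(Child, 'foo')

result = Child().foo(42)
owns_foo = 'foo' in vars(Child)
```

Because `Child` now owns a function object of its own, an attribute such as `as_task` can be set on it without touching the parent's method.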

### Implementation

In [2]:
from abc import ABCMeta
import inspect
from functools import partial

registry = dict()

def as_task(method):
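    # Placeholder only; the metaclass replaces it with the real dispatcher.
    def raiser(*args, **kwargs):
        raise TypeError('as_task requires a class created by MetaImporter')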

    method.as_task = raiser

    return method


def method_caller(class_name, func_name, *args, **kwargs):
    cls = registry[class_name]
    obj = cls()
    func = getattr(obj, func_name)

    print('CALLING {}\n'.format(func))
    func(*args, **kwargs)

In [3]:
def print_info(mcls, name, bases, namespace):  # Signature of __new__()
    print('\nMETA INFO')

    print('  mcls: {}'.format(mcls))    # Metaclass
    print('  name: {}'.format(name))    # Name of class
    print('  bases: {}'.format(bases))  # Parent classes

    print('  namespace:')               # Attributes
    for attr_name, attr_value in namespace.items():
        if not attr_name.startswith('__'):
            print('    {name}: {value}'.format(
                name=attr_name,
                value=attr_value,
            ))

In [5]:
class MetaImporter(ABCMeta):

    def __new__(mcls, name, bases, namespace):
        new = super().__new__(mcls, name, bases, namespace)
        print_info(mcls, name, bases, namespace)

        registry[name] = new

        prepared = set()
        
        task_methods = ((attr_name, attr)  # All decorated methods
            for attr_name, attr in namespace.items()
            if inspect.isfunction(attr) and hasattr(attr, 'as_task'))

        for attr_name, attr in task_methods:
            mcls.prep_as_task(name, attr)
            prepared.add(attr_name)

        for base in bases:
            mcls.process_base(new, name, base, prepared)

        return new

    @staticmethod
    def prep_as_task(class_name, func):
        print('\nprep func\n  {}\n  [{}]'.format(func, func.__name__))
        func.as_task = partial(method_caller, class_name, func.__name__)

    @staticmethod
    def process_base(cls, class_name, base, prepared):
        # Delegates to the module-level function of the same name.
        process_base(cls, class_name, base, prepared)

In [4]:
def task_methods_of_base(base, prepared):
    attrs = (
        (attr_name, getattr(base, attr_name))
        for attr_name in dir(base)
        if not attr_name.startswith('__')
    )
    return (
        (attr_name, attr)
        for attr_name, attr in attrs
        if inspect.isfunction(attr) and hasattr(attr, 'as_task')
        if attr_name not in prepared
    )

def process_base(cls, class_name, base, prepared):
    print('\nprocess base {}'.format(base))

    for func_name, attr in task_methods_of_base(base, prepared):
        # hasattr(cls, ...) is always true for inherited names, so look
        # only at what cls defines itself.
        if func_name in vars(cls):
            attr = getattr(cls, func_name)
            if not inspect.ismethod(attr) and not inspect.isfunction(attr):
                raise ValueError('not a method/function')
            attr.as_task = partial(method_caller, class_name, func_name)
            prepared.add(func_name)
            continue

        # Bind func_name now; a plain closure would see the loop
        # variable's last value.
        def super_caller(self, *args, _func_name=func_name, **kwargs):
            parent = super(cls, self)
            parent_func = getattr(parent, _func_name)
            return parent_func(*args, **kwargs)
        super_caller.as_task = partial(method_caller, class_name, func_name)

        setattr(cls, func_name, super_caller)
        # print('task method:', func_name)
        prepared.add(func_name)

In [6]:
class BaseImporter(metaclass=MetaImporter):

    @as_task
    def do_something(self):
        print('Done something in {}, defined in BaseImporter'.format(
            self.__class__.__name__))


META INFO
  mcls: <class '__main__.MetaImporter'>
  name: BaseImporter
  bases: ()
  namespace:
    do_something: <function BaseImporter.do_something at 0x7f5230e5ae18>

prep func
  <function BaseImporter.do_something at 0x7f5230e5ae18>
  [do_something]


In [7]:
importer = BaseImporter()
importer.do_something.as_task()

CALLING <bound method BaseImporter.do_something of <__main__.BaseImporter object at 0x7f5230e850b8>>

Done something in BaseImporter, defined in BaseImporter


In [8]:
class ManInTheMiddle(BaseImporter):

    @as_task
    def do_something(self):
        print('  Done something in {}, defined in ManInTheMiddle'.format(
            self.__class__.__name__))


META INFO
  mcls: <class '__main__.MetaImporter'>
  name: ManInTheMiddle
  bases: (<class '__main__.BaseImporter'>,)
  namespace:
    do_something: <function ManInTheMiddle.do_something at 0x7f5230e63840>

prep func
  <function ManInTheMiddle.do_something at 0x7f5230e63840>
  [do_something]

process base <class '__main__.BaseImporter'>


In [9]:
importer = ManInTheMiddle()
importer.do_something.as_task()

CALLING <bound method ManInTheMiddle.do_something of <__main__.ManInTheMiddle object at 0x7f5230e85898>>

  Done something in ManInTheMiddle, defined in ManInTheMiddle


In [10]:
class ConcreteImporter(ManInTheMiddle):

    @as_task
    def do_something(self):
        print('Done something in {}, defined in ConcreteImporter'.format(
            self.__class__.__name__))


META INFO
  mcls: <class '__main__.MetaImporter'>
  name: ConcreteImporter
  bases: (<class '__main__.ManInTheMiddle'>,)
  namespace:
    do_something: <function ConcreteImporter.do_something at 0x7f5230e63b70>

prep func
  <function ConcreteImporter.do_something at 0x7f5230e63b70>
  [do_something]

process base <class '__main__.ManInTheMiddle'>


In [11]:
importer = ConcreteImporter()
importer.do_something.as_task()

CALLING <bound method ConcreteImporter.do_something of <__main__.ConcreteImporter object at 0x7f5234c0eef0>>

Done something in ConcreteImporter, defined in ConcreteImporter


In [140]:
for name, cls in registry.items():
    print('{name:17} ->   {cls}'.format(name=name, cls=cls))

BaseImporter      ->   <class '__main__.BaseImporter'>
ManInTheMiddle    ->   <class '__main__.ManInTheMiddle'>
ConcreteImporter  ->   <class '__main__.ConcreteImporter'>


# Thank you!

Further questions?