# Item 39: Use `@classmethod` Polymorphism to Construct Objects Generically

Python supports polymorphism for both classes and objects. 

Polymorphism enables multiple classes in a hierarchy to implement their own unique versions of a method. This means that many classes can fulfill the same interface or abstract base class while providing different functionality.
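
As a quick illustration of instance polymorphism, here is a minimal sketch that is separate from the MapReduce example. The `Shape`, `Square`, `Rectangle`, and `total_area` names are hypothetical, and the sketch assumes the standard `abc` module for the abstract base class rather than the `raise NotImplementedError` style used in the cells that follow.

```python
from abc import ABC, abstractmethod


class Shape(ABC):
    """Hypothetical interface: every shape must report its area."""

    @abstractmethod
    def area(self):
        ...


class Square(Shape):
    def __init__(self, side):
        self.side = side

    def area(self):
        return self.side ** 2


class Rectangle(Shape):
    def __init__(self, width, height):
        self.width = width
        self.height = height

    def area(self):
        return self.width * self.height


def total_area(shapes):
    # The caller relies only on the shared interface, not on concrete types.
    return sum(shape.area() for shape in shapes)


print(total_area([Square(2), Rectangle(2, 3)]))  # 4 + 6 = 10
```

`total_area` never checks which subclass it received; each object supplies its own `area` implementation.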

In [1]:
# Say we want to write a MapReduce implementation and want a common class to represent the input data. We can
# define such a class with a read method that must be defined by subclasses
class InputData:
    def read(self):
        raise NotImplementedError

In [2]:
# We also have a concrete subclass of InputData that reads data from a file on disk
class PathInputData(InputData):
    def __init__(self, path):
        super().__init__()
        self.path = path

    def read(self):
        with open(self.path) as f:
            return f.read()

In [3]:
# We'd want a similar abstract interface for the MapReduce worker that consumes the input data in
# a standard way
class Worker:
    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None

    def map(self):
        raise NotImplementedError

    def reduce(self, other):
        raise NotImplementedError
    

In [4]:
# Here, we define a concrete subclass of Worker to implement the specific MapReduce function we want to apply
class LineCountWorker(Worker):
    def map(self):
        data = self.input_data.read()
        self.result = data.count('\n')
    
    def reduce(self, other):
        self.result += other.result

It may look like the implementation is going great, but we've reached the biggest hurdle in all of this. What connects all of these pieces? We have a nice set of classes with reasonable interfaces and abstractions, but that's only useful once the objects are constructed. What's responsible for building the objects and orchestrating MapReduce?

The simplest approach is to manually build and connect the objects with some helper functions.

In [5]:
# Here, we list the contents of a directory and construct a PathInputData instance for each file it contains
import os

def generate_inputs(data_dir):
    for name in os.listdir(data_dir):
        yield PathInputData(os.path.join(data_dir, name))

In [12]:
# Next, we create the LineCountWorker instances by using the InputData instances returned by generate_inputs
def create_workers(input_list):
    workers = []
    for input_data in input_list:
        workers.append(LineCountWorker(input_data))
    return workers

In [13]:
# We execute these Worker instances by fanning out the map step to multiple threads. Then, we call reduce
# repeatedly to combine the results into one final value
from threading import Thread

def execute(workers):
    threads = [Thread(target=w.map) for w in workers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    first, *rest = workers
    for worker in rest:
        first.reduce(worker)
    return first.result

In [14]:
# Finally, we connect all the pieces together in a function to run each step
def mapreduce(data_dir):
    inputs = generate_inputs(data_dir)
    workers = create_workers(inputs)
    return execute(workers)

## *Make sure to run the following block of code to generate test input files*

In [15]:
# Test this on some test input files
import os
import random

def write_test_files(tmpdir):
    # exist_ok=True lets this cell be re-run without raising FileExistsError
    os.makedirs(tmpdir, exist_ok=True)
    for i in range(100):
        with open(os.path.join(tmpdir, str(i)), 'w') as f:
            f.write('\n' * random.randint(0, 100))

tmpdir = 'test_inputs'
write_test_files(tmpdir)

result = mapreduce(tmpdir)
print(f'There are {result} lines')

There are 5210 lines


The problem with the code above is that the `mapreduce` function is not generic at all. If we wanted to create another `InputData` or `Worker` subclass, we'd also have to rewrite the `generate_inputs`, `create_workers`, and `mapreduce` functions to match the new implementation.

This problem boils down to needing a generic way to construct objects. In other languages, you'd solve it with constructor polymorphism, requiring each `InputData` subclass to provide a special constructor that the orchestrating helpers can call generically. Python, however, allows only a single constructor method (`__init__`), and it's unreasonable to require every `InputData` subclass to have a compatible constructor.

The best way to solve this issue is with *class method* polymorphism.
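
Before applying this to MapReduce, here is a minimal sketch of the idea in isolation. The `Source`, `FileSource`, `RangeSource`, and `build` names are hypothetical and chosen only for illustration. Each subclass keeps whatever `__init__` signature suits it and exposes a class method with a shared signature, so generic code can construct any of them the same way.

```python
class Source:
    @classmethod
    def from_config(cls, config):
        # Shared construction interface; subclasses must override it.
        raise NotImplementedError


class FileSource(Source):
    def __init__(self, path):
        self.path = path

    @classmethod
    def from_config(cls, config):
        return cls(config['path'])


class RangeSource(Source):
    # A different __init__ signature is fine: from_config adapts to it.
    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    @classmethod
    def from_config(cls, config):
        return cls(config['start'], config['stop'])


def build(source_class, config):
    # Generic code: works for any class that provides from_config.
    return source_class.from_config(config)


file_source = build(FileSource, {'path': 'a.txt'})
range_source = build(RangeSource, {'start': 0, 'stop': 3})
print(type(file_source).__name__, file_source.path)
print(type(range_source).__name__, range_source.start, range_source.stop)
```

`build` takes the class itself as an argument and never calls `__init__` directly, so the incompatible constructors stay hidden behind `from_config`.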

In [16]:
# Here we apply the mentioned idea to the MapReduce classes. We extend the InputData class with a generic
# @classmethod that's responsible for creating new InputData instances using a common interface
class GenericInputData:
    def read(self):
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, config):
        raise NotImplementedError

In [28]:
# Here, we use the config dictionary to find the directory to list for input files
class PathInputData(GenericInputData):
    def __init__(self, path):
        super().__init__()
        self.path = path

    def read(self):
        with open(self.path) as f:
            return f.read()

    @classmethod
    def generate_inputs(cls, config):
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            yield cls(os.path.join(data_dir, name))

In [29]:
# Similarly, we can make the create_workers helper part of the GenericWorker class
class GenericWorker:
    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None

    def map(self):
        raise NotImplementedError

    def reduce(self, other):
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, config):
        workers = []
        for input_data in input_class.generate_inputs(config):
            workers.append(cls(input_data))
        return workers

The call to `input_class.generate_inputs` is the class polymorphism this item is about: the method is looked up on whichever `GenericInputData` subclass is passed in. Likewise, `create_workers` calling `cls()` provides an alternative way to construct `GenericWorker` objects besides using the `__init__` method directly.
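
To see what `cls` resolves to, the following sketch uses hypothetical stand-ins (`StubInput`, `DemoWorker`, `DemoLineWorker`) that mirror the shape of the classes above without reading any files. The names are deliberately different so that running the sketch does not overwrite the notebook's own classes.

```python
class StubInput:
    """Hypothetical stand-in for an input class; holds text in memory."""

    def __init__(self, text):
        self.text = text

    def read(self):
        return self.text

    @classmethod
    def generate_inputs(cls, config):
        for text in config['texts']:
            yield cls(text)


class DemoWorker:
    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None

    @classmethod
    def create_workers(cls, input_class, config):
        # cls is the class the method was called on, not DemoWorker itself.
        return [cls(data) for data in input_class.generate_inputs(config)]


class DemoLineWorker(DemoWorker):
    def map(self):
        self.result = self.input_data.read().count('\n')


workers = DemoLineWorker.create_workers(StubInput, {'texts': ['a\n', 'b\nc\n']})
print([type(w).__name__ for w in workers])
```

Because `create_workers` was called on `DemoLineWorker`, every element of `workers` is a `DemoLineWorker`, even though the method is defined on the base class.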

In [30]:
# The effect on our concrete GenericWorker subclass is nothing more than changing its parent
class LineCountWorker(GenericWorker):
    def map(self):
        data = self.input_data.read()
        self.result = data.count('\n')
    
    def reduce(self, other):
        self.result += other.result

In [31]:
# Finally, we can rewrite the mapreduce function to be completely generic by calling create_workers
def mapreduce(worker_class, input_class, config):
    workers = worker_class.create_workers(input_class, config)
    return execute(workers)

In [32]:
# Running the new worker on a set of test files produces the same result as the old implementation
config = {'data_dir': tmpdir}
result = mapreduce(LineCountWorker, PathInputData, config)
print(f'There are {result} lines')

There are 5210 lines
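
The payoff of the generic version is that new input sources and workers can be plugged in without touching `mapreduce`. The sketch below is self-contained, so it repeats condensed versions of the generic classes and helpers from above. `StringInputData` and `WordCountWorker` are hypothetical additions, not part of the original example, and the `'texts'` config key is an assumption made for this illustration.

```python
from threading import Thread


class GenericInputData:
    def read(self):
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, config):
        raise NotImplementedError


class GenericWorker:
    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None

    def map(self):
        raise NotImplementedError

    def reduce(self, other):
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, config):
        return [cls(data) for data in input_class.generate_inputs(config)]


def execute(workers):
    threads = [Thread(target=w.map) for w in workers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    first, *rest = workers
    for worker in rest:
        first.reduce(worker)
    return first.result


def mapreduce(worker_class, input_class, config):
    workers = worker_class.create_workers(input_class, config)
    return execute(workers)


# Hypothetical input class: reads from strings held in memory, not files.
class StringInputData(GenericInputData):
    def __init__(self, text):
        super().__init__()
        self.text = text

    def read(self):
        return self.text

    @classmethod
    def generate_inputs(cls, config):
        for text in config['texts']:
            yield cls(text)


# Hypothetical worker: counts whitespace-separated words instead of lines.
class WordCountWorker(GenericWorker):
    def map(self):
        self.result = len(self.input_data.read().split())

    def reduce(self, other):
        self.result += other.result


config = {'texts': ['one two\n', 'three\nfour five\n']}
print(mapreduce(WordCountWorker, StringInputData, config))  # 2 + 3 = 5
```

Only the two new subclasses and the config dictionary changed; `mapreduce`, `execute`, and `create_workers` are reused as they are.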
