Item 39 Use @classmethod Polymorphism to Construct Objects Generically

Things to Remember
- Python only supports a single constructor per class: the __init__ method.
- Use @classmethod to define alternative constructors for your classes.
- Use class method polymorphism to provide generic ways to build and connect many concrete subclasses.   

Background
- MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster.

- A MapReduce program is composed of a map procedure, which performs filtering and sorting (such as sorting students by first name into queues, one queue for each name), and a reduce method, which performs a summary operation (such as counting the number of students in each queue, yielding name frequencies).

In [None]:
# you are writing a MapReduce implementation

class InputData:
    """Common interface that represents the input data of a MapReduce job.

    Concrete subclasses must override :meth:`read`.
    """

    def read(self):
        """Return the data of this input; must be overridden by subclasses."""
        raise NotImplementedError()


In [None]:
# a concrete subclass that reads data from a file on disk
class PathInputData(InputData):
    """Input data backed by a file on disk."""

    def __init__(self, path):
        """Remember ``path``, the location of the file to read."""
        super().__init__()
        self.path = path

    def read(self):
        """Open the file at ``self.path`` and return its whole contents."""
        with open(self.path) as handle:
            contents = handle.read()
        return contents

In [None]:
# - abstract interface for the MapReduce worker
#   that consumes the input data in a standard
#   way 

class Worker:
    """Abstract MapReduce worker that consumes one piece of input data.

    Subclasses must override :meth:`map` and :meth:`reduce`.
    """

    def __init__(self, input_data):
        """Store ``input_data`` and start out with no result."""
        self.result = None
        self.input_data = input_data

    def map(self):
        """Process the input data; must be overridden by subclasses."""
        raise NotImplementedError()

    def reduce(self, other):
        """Combine ``other`` into this worker; must be overridden."""
        raise NotImplementedError()

In [None]:
# a simple newline counter
class LineCountWorker(Worker):
    """Worker that counts the newline characters in its input data."""

    def map(self):
        """Read the input and store its newline count in ``self.result``."""
        text = self.input_data.read()
        newline_total = text.count('\n')
        self.result = newline_total

    def reduce(self, other):
        """Add ``other.result`` onto this worker's ``result``."""
        self.result += other.result

Now we need something that is responsible for building the objects and orchestrating the MapReduce. There are two ways to do this:
- using helper functions
- using class method polymorphism

In [None]:
import os

def generate_inputs(data_dir):
    """Lazily yield a PathInputData for every entry listed in ``data_dir``."""
    entry_paths = (
        os.path.join(data_dir, entry) for entry in os.listdir(data_dir)
    )
    yield from map(PathInputData, entry_paths)

In [None]:
def create_workers(input_list):
    """Return a list holding one LineCountWorker per item of ``input_list``."""
    return [LineCountWorker(item) for item in input_list]


In [None]:
# fan out the map step (check Item 53)
from threading import Thread

def execute(workers):
    """Run every worker's map step concurrently, then reduce the results.

    Each worker's ``map`` runs on its own thread. Once all threads have
    finished, every remaining worker is folded into the first one through
    ``reduce``.

    Args:
        workers: Iterable of objects that provide ``map()``,
            ``reduce(other)`` and a ``result`` attribute. It is copied
            into a list, so one-shot iterables such as generators work.

    Returns:
        The ``result`` attribute of the first worker after all reductions.

    Raises:
        ValueError: If ``workers`` is empty.
    """
    # The workers are traversed twice (thread creation, then reduce); a
    # generator would be exhausted after the first pass, so snapshot them.
    workers = list(workers)
    if not workers:
        raise ValueError('execute() requires at least one worker')

    threads = [Thread(target=worker.map) for worker in workers]
    for thread in threads:
        thread.start()
    # join: make the calling thread wait until every map step is done.
    for thread in threads:
        thread.join()

    first, *rest = workers
    for worker in rest:
        first.reduce(worker)
    return first.result


In [None]:
# connect all the pieces together
def mapreduce(data_dir):
    """Build inputs from ``data_dir``, wrap them in workers and execute them.

    Returns whatever ``execute`` returns for the created workers.
    """
    return execute(create_workers(generate_inputs(data_dir)))

In [None]:
import os
import random
import shutil


def write_test_files(tmpdir):
    """Recreate ``tmpdir`` and fill it with 100 files of random length.

    An existing directory at ``tmpdir`` is removed first. The files are
    named ``0`` through ``99``; each one is written with between 0 and 100
    newline characters.
    """
    if os.path.isdir(tmpdir):
        shutil.rmtree(tmpdir)
    os.makedirs(tmpdir)

    for index in range(100):
        file_path = os.path.join(tmpdir, str(index))
        with open(file_path, 'w') as out:
            # The file content is just a random number of newlines.
            newline_count = random.randint(0, 100)
            out.write('\n' * newline_count)



In [None]:
# Write the sample files into tmpdir, run the directory-based mapreduce
# over them and print the returned result.
tmpdir = 'test_inputs'
write_test_files(tmpdir)
result = mapreduce(tmpdir)
print(f'There are {result} lines') 

Problems with the above approach
- The mapreduce function is not generic: if you add a new InputData or Worker subclass, you have to rewrite generate_inputs, create_workers, and mapreduce to match.
- You can't solve the problem with constructor polymorphism, because Python only allows a single constructor method, \__init\__, and it is unreasonable to require every InputData subclass to have a compatible constructor.

In [None]:
# class method polymorphism 

# - move generate_inputs into
#   the base class
class GenericInputData:
    """Base class for input data that also knows how to build its instances."""

    def read(self):
        """Return this input's data; must be overridden by subclasses."""
        raise NotImplementedError()

    @classmethod
    def generate_inputs(cls, config):
        """Create instances of ``cls`` from ``config``; must be overridden.

        This is the static factory hook: each subclass decides how its
        instances are created and how the contents of the ``config`` dict
        are interpreted, with ``cls`` acting as the generic constructor.
        """
        raise NotImplementedError()

In [None]:
class PathInputData(GenericInputData):
    """Input data backed by a file on disk."""

    def __init__(self, path):
        """Remember ``path``, the location of the file to read."""
        super().__init__()
        self.path = path

    def read(self):
        """Open the file at ``self.path`` and return its whole contents."""
        with open(self.path) as handle:
            contents = handle.read()
        return contents

    @classmethod
    def generate_inputs(cls, config):
        """Yield one ``cls`` instance per entry in ``config['data_dir']``."""
        directory = config['data_dir']
        for entry in os.listdir(directory):
            entry_path = os.path.join(directory, entry)
            yield cls(entry_path)

In [None]:
class GenericWorker:
    """Abstract MapReduce worker that can also build its own instances.

    Subclasses must override :meth:`map` and :meth:`reduce`.
    """

    def __init__(self, input_data):
        """Store ``input_data`` and start out with no result."""
        self.result = None
        self.input_data = input_data

    def map(self):
        """Process the input data; must be overridden by subclasses."""
        raise NotImplementedError()

    def reduce(self, other):
        """Combine ``other`` into this worker; must be overridden."""
        raise NotImplementedError()

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list with one ``cls`` instance per generated input.

        The inputs come from ``input_class.generate_inputs(config)``, so
        both the worker type (``cls``) and the input type are chosen by the
        caller through class method polymorphism.
        """
        return [cls(item) for item in input_class.generate_inputs(config)]

In [None]:
# change the parent class
class LineCountWorker(GenericWorker):
    """Worker that counts the newline characters in its input data."""

    def map(self):
        """Read the input and store its newline count in ``self.result``."""
        text = self.input_data.read()
        newline_total = text.count('\n')
        self.result = newline_total

    def reduce(self, other):
        """Add ``other.result`` onto this worker's ``result``."""
        self.result += other.result

In [None]:
# - the generic version
# - you can now add more subclasses without
#   the need to modify this method
def mapreduce(worker_class, input_class, config):
    """Create workers via ``worker_class`` and return the executed result.

    ``worker_class.create_workers(input_class, config)`` builds the
    workers, which are then handed to ``execute``.
    """
    return execute(worker_class.create_workers(input_class, config))


In [None]:
# Run the generic version: the worker and input classes are passed in
# explicitly, and the input directory is handed over in the config dict.
tmpdir = 'test_inputs'
write_test_files(tmpdir)
config = { 'data_dir' : tmpdir}
result = mapreduce(LineCountWorker, PathInputData, config)
print(f'There are {result} lines')
