# Use @classmethod Polymorphism to Construct Objects Generically

#### Say that i'm writing a MapReduce implementation, and i want a commom class to represent the input data.

In [21]:
class InputData:
    def read(self):
        raise NotImplementedError

#### I also have a concrete subclass of InputData that reads data from a file on disk

In [22]:
class PathInputData(InputData):
    def __init__(self, path):
        super().__init__()
        self.path = path
    
    def read(self):
        with open(self.path) as f:
            return f.read()

#### I could have any number of InputData subclasses, like PathInputData, and each of them could implement the standard interfaces for read to return the data ro process

In [23]:
class Worker:
    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None
    
    def map(self):
        raise NotImplementedError
    
    def reduce(self, other):
        raise NotImplementedError

#### Here, i define a concrete subclass of Worker to implement the specific MapReduce function i want to apply-a simple newline counter

In [24]:
class LineCountWorker(Worker):
    def map(self):
        data = self.input_data.read()
        self.result = data.count('\n')
    
    def reduce(self, other):
        self.result += other.result

In [25]:
import os

In [26]:
def generate_inputs(data_dir):
    for name in os.listdir(data_dir):
        yield PathInputData(os.path.join(data_dir, name))

In [27]:
def create_workers(input_list):
    workers = []
    for input_data in input_list:
        workers.append(LineCountWorker(input_data))
    return workers

In [28]:
from threading import Thread

In [29]:
def execute(workers):
    threads = [Thread(target=w.map) for w in workers]
    for thread in threads: thread.start()
    for thread in threads: thread.join()
    
    first, *rest = workers
    for worker in rest:
        first.reduce(worker)
    return first.result

#### Finally, i connect all the pieces together in a function to run each step

In [30]:
def mapreduce(data_dir):
    inputs = generate_inputs(data_dir)
    workers = create_workers(inputs)
    return execute(workers)

#### Run this function on a set of test input files works great

In [31]:
import os
import random

In [32]:
def write_test_files(tmpdir):
    os.makedirs(tmpdir)
    for i in range(100):
        with open(os.path.join(tmpdir, str(i)), 'w') as f:
            f.write('\n' * random.randint(0, 100))

In [33]:
tmpdir = 'test_inputs'
write_test_files(tmpdir)

FileExistsError: [Errno 17] File exists: 'test_inputs'

In [34]:
result = mapreduce(tmpdir)
print(f'There are {result} lines')

There are 4682 lines


In [35]:
class GenericInputData:
    def read(self):
        raise NotImplementedError
    
    @classmethod
    def generate_inputs(cls, config):
        raise NotImplementedError

In [36]:
class PathInputData(GenericInputData):
    ...
    @classmethod
    def generate_inputs(cls, config):
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            yield cls(os.path.join(data_dir, name))

In [37]:
class GenericWorker:
    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None
    
    def map(self):
        raise NotImplementedError
    
    def reduce(self, other):
        raise NotImplementedError
        
    @classmethod
    def create_workers(cls, input_class, config):
        workers = []
        for input_data in input_class.generate_inputs(config):
            workers.append(cls(input_data))
        return workers

In [38]:
class LineCountWorker(GenericWorker):
    ...

In [39]:
def mapreduce(worker_class, input_class, config):
    workers = worker_class.create_workers(input_class, config)
    return execute(workers)

In [40]:
config = {'data_dir': tmpdir}
result = mapreduce(LineCountWorker, PathInputData, config)
print(f'There are {result} lines')

TypeError: PathInputData() takes no arguments