*** 맵리듀스를 구현하면서 파이썬의 다형성 공부하기

다형성을 이용하면 여러 클래스가 같은 인터페이스나 추상 클래스를 충족하면서도 다른 기능을 제공할 수 있다.

In [1]:
# 입력 데이터를 표현할 공통 클래스 이 인터페이스를 상속하려면 read 메서드를 구현해야한다.
class InputData(object):
    def read(self):
        raise NotImplementedError

In [4]:
# 디스크에 있는 파일에서 데이터를 읽어오도록 구현한 InputData의 서브 클래스
class PathInputData(InputData):
    def __init__(self, path):
        super().__init__()
        self.path = path
    
    def read(self):
        return open(self.path).read()

PathInputData와 같은 InputData 서브클래스가 몇 개든 있을 수 있다,
각 서브클래스에서는 read()를 구현할 것이다. 다른 InputData 서브클래스는
네트워크에서 데이터를 읽어오거나 데이터의 압축을 해제하는 기능 등을 할 수 있다.

입력 데이터를 처리하는 맵리듀스 작업 클래스에서도 비슷한 추상 인터페이스가 필요하다

In [6]:
#맵리듀스 작업을 할 작업클래스의 서브클래스
class Worker(object):
        def __init__(self, input_data):
            self.input_data = input_data
            self.result = None
            
        def map(self):
            raise NotImplementedError
            
        def reduce(self, other):
            raise NotImplementedError

In [7]:
# Worker의 구체 서브클래스
class LineCountWorker(Worker):
    def map(self):
        data = self.input_data.read()
        self.result = data.count('\n')
        
    def reduce(self, other):
        self.result += other.result

자 이제 무엇으로 객체를 만들고 맵리듀스를 조율할까???

가장 간단한 방법은 헬퍼 함수로 직접 객체를 만들고 연결하는 것이다.

In [8]:
def generate_input(data_dir):
    for name in os.listdir(data_dir):
        yield PathInputData(os.path.join(data_dir, name))

In [10]:
# generate_inputs 함수에서 반환한 InputData 인스턴스를 사용하는 LineCountWorker 인스턴스를 생성
def create_workers(input_list):
    workers = []
    for input_data in input_list:
        workers.append(LineCountWorker(input_data))
    return workers

map 단계를 여러 스레드로 나눠서 이 Worker 인스턴스들을 실행한다. 그런 다음 reduce를 반복적으로 호출해서 결과를 최종값 하나로 합친다.

In [11]:
def execute(workers):
    threads = [Thread(target=w.map) for w in workers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
        
    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result

In [13]:
# 마지막으로 단계별로 실행하려고 mapreduce 함수에서 모든 조각을 연결한다.
def mapreduce(data_dir):
    inputs = generate_input(data_dir)
    workers = create_workers(inputs)
    return execute(workers)

In [16]:
from tempfile import TemporaryDirectory

def wirte_test_files(tmpdir):
    with TemporaryDirectory() as tmpdir:
        write_test_files(tmpdir)
        result = mapreduce(tmpdir)
print('There are', result, 'lines')



여기서 만약 다른 InputData나 Worker 서브클래스를 작성한다면 generate_inputs, create_workers, mapreduce 함수를 알맞게 다시 작성해야 한다.

@classmethod 다형성을 이용하면 이 문제를 해결할 수 있다.
@classmethod 다형성은 생성된 객체가 아니라 전체 클래스에 적용된다는 점만 빼면, InputData.read에 사용한 인스턴스 메서드 다형성과 똑같다.

In [19]:
class GenericInputData(object):
    def read(self):
        raise NotImplementedError
    
    @classmethod
    def generate_inputs(cls, config):
        raise NotImplementedError
        

class PathInputData(GenericInputData):
    def __init__(self, path):
        super().__init__()
        self.path = path
    
    def read(self):
        return open(self.path).read()
    
    @classmethod
    def generate_inputs(cls, config):
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            yield cls(os.path.join(data_dir, name))

위의 코드에서 generate_inputs 메서드는 GenericInputData를 구현하는 서브클래스가 해석할 설정 파라미터들을 담은 딕셔너리를 받는다.
입력 파일들을 얻어올 디렉터리를 config로 알아냈다.

비슷하게 GenericWorker 클래스에 create_workers 헬퍼를 작성해보자, input_class 파라미터는 GenericInputData 의 서브클래스여야 한다.)
cls()를 범용 생성자로 사용해서 GenericWorker를 구현한 서브클래스의 인스턴스를 생성한다.

In [20]:
class GenericWorker(object):
    def __init__(self, input_data):
            self.input_data = input_data
            self.result = None
            
    def map(self):
        raise NotImplementedError
            
    def reduce(self, other):
        raise NotImplementedError
        
    @classmethod
    def create_workers(cls, input_class, config):
        workers = []
        for input_data in input_class.generate_inputs(config):
            workers.append(cls(input_data))
        return workers

In [21]:
def mapreduce(worker_class, input_class, config):
    workers = worker_class.create_wokers(input_class, config)
    return execute(workers)

이렇게 하면 GenericInputData 와 GenericWorker의 다른 서브클래스를 원하는 대로 만들어도 글루코드를 작성할 필요가 없다.