<a href="https://colab.research.google.com/github/kousiknandy/pycolab/blob/main/MapReduce_Mode.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [25]:
from collections import Counter, defaultdict
import glob
from concurrent.futures import ThreadPoolExecutor

class Mapper:
    """Map stage: tally how often each value occurs in one input stream."""

    def __init__(self, itrable):
        """Consume ``itrable`` immediately and store the per-value counts."""
        tally = Counter()
        tally.update(itrable)
        self.counts = tally

    def __call__(self):
        """Return the ``Counter`` that was built at construction time."""
        return self.counts

class Emitter:
    """Read stage: stream the integers stored one per line in a text file."""

    def __init__(self, filename):
        """Remember the path; the file is not opened until iteration starts."""
        self.filename = filename

    def __call__(self):
        """Yield each non-blank line of the file converted with ``int``.

        Blank or whitespace-only lines are skipped.

        Raises:
            ValueError: if a non-blank line is not a valid integer literal.
        """
        with open(self.filename) as f:
            for line in f:
                # int() rejects empty/whitespace-only text, so a stray blank
                # line would otherwise abort the whole file.
                if not line.strip():
                    continue
                yield int(line)

class Readers:
    """Input discovery: enumerate the files matching a glob pattern."""

    def __init__(self, glob):
        # The parameter shadows the ``glob`` module only inside this method.
        self.glob = glob

    def __call__(self):
        """Yield every path ``glob.glob`` returns for the stored pattern."""
        for path in glob.glob(self.glob):
            yield path

class Partition:
    """Shuffle stage: route per-key counts from the mappers into buckets.

    Each bucket is a ``defaultdict(list)`` mapping a key to the list of counts
    reported for that key by the individual mappers.  A key always lands in
    bucket ``key % partitions``, so all counts for one key share one bucket.
    """

    def __init__(self, partitions=1):
        """Create ``partitions`` empty, independent buckets.

        Raises:
            ValueError: if ``partitions`` is smaller than 1.
        """
        if partitions < 1:
            raise ValueError("partitions must be at least 1")
        # Build one dict per bucket.  ``[defaultdict(list)] * n`` would repeat
        # a reference to a single dict, so every bucket would hold every key.
        self.partitions = [defaultdict(list) for _ in range(partitions)]
        self.n = partitions

    def __call__(self, iterables):
        """Distribute the counts and then yield the buckets one by one.

        Args:
            iterables: iterable of mappings ``key -> count`` (integer keys,
                since the bucket index is ``key % n``).
        """
        for counters in iterables:
            for k, v in counters.items():
                self.partitions[k % self.n][k].append(v)
        yield from self.partitions


class Reducer:
    """Reduce stage: total the counts of each key in one bucket, find the mode."""

    def __init__(self, counters):
        """Store ``counters``, a mapping ``key -> list of partial counts``."""
        self.counters = counters

    def sum_up(self):
        """Compute ``self.sums`` (``key -> total``) and return ``self`` for chaining."""
        self.sums = {k: sum(v) for k, v in self.counters.items()}
        return self

    def mode_with_count(self):
        """Return ``(key, total)`` for the key with the highest total.

        Must be called after ``sum_up``.  The first key reaching the highest
        total wins ties; ``(0, 0)`` is returned when no total is positive.
        """
        mode_f, mode_v = 0, 0
        for k, v in self.sums.items():
            if v > mode_v:
                mode_f, mode_v = k, v
        return mode_f, mode_v

    def mode(self):
        """Return only the key from ``mode_with_count``."""
        return self.mode_with_count()[0]


class MapReducer:
    """Drive the pipeline: read files, count values, partition, reduce, print the mode."""

    def __init__(self, glob, parts=1):
        """Store the input glob pattern and the number of reduce partitions."""
        self.glob = glob
        self.parts = parts

    def map(self, filename):
        """Return the value counts of a single input file."""
        e = Emitter(filename)
        m = Mapper(e())
        return m()

    def reduce(self, counters):
        """Return the most frequent key of one partition."""
        r = Reducer(counters)
        return r.sum_up().mode()

    def _reduce_with_count(self, counters):
        """Return ``(key, total)`` of the most frequent key of one partition."""
        return Reducer(counters).sum_up().mode_with_count()

    def __call__(self):
        """Run the whole pipeline and print the most frequent value."""
        r = Readers(self.glob)
        with ThreadPoolExecutor(max_workers=2) as executor:
            mapped = list(executor.map(self.map, r()))
        p = Partition(partitions=self.parts)
        with ThreadPoolExecutor(max_workers=2) as executor:
            winners = list(executor.map(self._reduce_with_count, p(mapped)))
        # Each partition reports its local winner.  The global mode is the
        # winner with the highest total, not the numerically largest key,
        # so compare on the count rather than on the key.
        mode_key, _ = max(winners, key=lambda pair: pair[1])
        print(mode_key)



In [26]:
m = MapReducer("/home/data*.txt")
m()

26
