<a href="https://colab.research.google.com/github/s34836/BIG_DATA/blob/main/lab5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [25]:
!pip install multiprocess
import re
import sys
from collections import defaultdict
from typing import Callable, List, Tuple, Any

from multiprocess import Pool, cpu_count

class MapReduce:
    """
    Small MapReduce driver that runs user-supplied callables in process pools.

    Every input item is passed to the mapper, the emitted pairs are grouped
    by key, and each (key, values) group is passed to the reducer.
    """

    def __init__(self, mapper: Callable[[Any], List[Tuple[Any, Any]]],
                 reducer: Callable[[Any, List[Any]], Any]):
        """
        Remember the mapper and reducer callables for later execution.

        :param mapper: Called with one input item; returns a list of (key, value) pairs.
        :param reducer: Called with a key and the list of values collected for it.
        """
        self.mapper = mapper
        self.reducer = reducer

    def _map_worker(self, data_chunk):
        """Apply the stored mapper to one input item and return what it emits."""
        emitted = self.mapper(data_chunk)
        return emitted

    def _reduce_worker(self, kv_pairs):
        """Unpack one (key, values) group and apply the stored reducer to it."""
        group_key, group_values = kv_pairs
        reduced_value = self.reducer(group_key, group_values)
        return reduced_value

    def execute(self, data: List[Any], num_workers: int = None) -> List[Tuple[Any, Any]]:
        """
        Run the map, group and reduce phases over the input.

        :param data: Input items; each one is handed to the mapper separately.
        :param num_workers: Pool size for both phases (defaults to cpu_count()).
        :return: List holding the reducer's result for every distinct key.
        """
        worker_total = cpu_count() if num_workers is None else num_workers

        # Map phase: one mapper call per input item, spread over the pool.
        with Pool(worker_total) as map_pool:
            per_item_pairs = map_pool.map(self._map_worker, data)

        # Group phase: collect every emitted value under its key.
        grouped = defaultdict(list)
        flat_pairs = (
            pair
            for emitted in per_item_pairs
            for pair in emitted
        )
        for pair_key, pair_value in flat_pairs:
            grouped[pair_key].append(pair_value)

        # Reduce phase: one reducer call per distinct key, in a fresh pool.
        with Pool(worker_total) as reduce_pool:
            outcome = reduce_pool.map(self._reduce_worker, grouped.items())

        return outcome

def word_count_mapper(document: str) -> List[Tuple[str, int]]:
    """
    Emit a (word, 1) pair for every whitespace-separated token in the document.

    :param document: Text to tokenise with ``str.split()``.
    :return: One (word, 1) pair per token, in order of appearance.
    """
    pairs = []
    for token in document.split():
        pairs.append((token, 1))
    return pairs

def word_count_reducer(word: str, counts: List[int]) -> Tuple[str, int]:
    """
    Collapse all counts recorded for a word into a single total.

    :param word: The key the counts were grouped under.
    :param counts: Individual counts emitted for that word.
    :return: A (word, total) pair.
    """
    total = sum(counts)
    return (word, total)

def read_file_to_array(filename):
    """
    Read a text file and return its lines as a list.

    The file is decoded as UTF-8 explicitly, so the result does not depend
    on the default encoding of the platform the code runs on.

    :param filename: Path of the file to read.
    :return: The file's lines (line endings included), or an empty list if
        the file does not exist or could not be read.
    """
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            return file.readlines()
    except FileNotFoundError:
        print(f"Error: File '{filename}' not found.")
        return []
    except Exception as e:
        # Deliberate best-effort: any other failure (permissions, decoding,
        # bad path type) is reported and treated as "no input".
        print(f"An error occurred: {e}")
        return []

def main():
    """
    Count the words of ``short_story.txt`` and print one "word: count" line per word.

    Every line of the file is lower-cased and stripped of punctuation and
    special characters first, so that e.g. "Dad" and "dad" are counted as
    the same word. The counting itself is done by MapReduce with the word
    count mapper and reducer, using 4 workers.
    """
    documents = read_file_to_array("short_story.txt")
    if not documents:
        # Nothing was read (missing, unreadable or empty file): there are no
        # words to count, so do not start any worker processes.
        return

    # Anything that is neither a word character nor whitespace is removed.
    punctuation = re.compile(r'[^\w\s]')
    documents = [punctuation.sub('', line.lower()) for line in documents]

    # Initialize MapReduce with the custom mapper and reducer.
    map_reduce = MapReduce(mapper=word_count_mapper, reducer=word_count_reducer)

    # Execute MapReduce on the documents.
    result = map_reduce.execute(data=documents, num_workers=4)

    # Display the results.
    for word, count in result:
        print(f"{word}: {count}")


# Entry-point guard: main() runs only when this file is executed directly,
# not when it is imported as a module.
# NOTE(review): process pools generally need this guard so that worker
# processes importing the module do not re-run main() — confirm for the
# start method in use.
if __name__ == "__main__":
  main()



death: 6
was: 21
accompanied: 1
by: 12
the: 122
patter: 1
of: 51
nurses: 2
rubbersoled: 1
shoes: 1
marching: 1
briskly: 1
along: 1
linoleum: 1
floors: 1
smell: 1
antiseptic: 1
solution: 1
a: 43
gurney: 1
wheel: 1
in: 41
need: 1
oil: 1
squeaking: 1
rhythmically: 1
as: 21
another: 3
patient: 1
rushed: 1
past: 3
starchy: 1
white: 2
sheets: 2
and: 61
sharp: 1
angular: 1
shapes: 1
headdresses: 1
my: 41
body: 6
revolted: 1
against: 4
crisp: 1
order: 1
violent: 2
expulsions: 1
browns: 1
greens: 1
yellows: 1
bleached: 1
hospital: 3
whites: 1
veins: 1
stood: 3
out: 4
on: 19
small: 5
pale: 1
neck: 2
spittle: 1
flew: 1
from: 14
mouth: 4
i: 66
screamed: 2
cool: 5
bed: 2
tucked: 1
with: 22
neat: 2
stern: 1
corners: 1
were: 10
twisted: 3
into: 8
angry: 1
damp: 3
knots: 1
writhing: 1
am: 7
ordinarily: 2
demure: 1
person: 1
so: 4
you: 2
can: 4
imagine: 2
cringing: 1
horror: 1
at: 13
this: 11
loud: 2
messy: 1
departure: 1
shudder: 1
when: 8
mind: 2
invariably: 1
drifts: 2
to: 41
remember: 4
it: 9
happi