# **Comparativa Map Reduce**

En este ejemplo, ejemplo ampliamente utilizado al mostrar en docencia los diferentes formatos de Map Reduce, realizaremos una descarga de una gran base de datos pública (Proyecto Gutenberg - El Proyecto Gutenberg fue desarrollado por Michael Hart en 1971 con el fin de crear una biblioteca de libros electrónicos gratuitos a partir de libros que ya existen físicamente.​ Estos libros electrónicos se encuentran disponibles desde entonces en Internet), implementar un formato de counting en map reduce de forma secuencial, y tratar de paralelizarlo, donde veremos una limitación de Python.

No se trata de la implementación más eficiente, ya sabemos que al final estas implementaciones ya están disponibles en formato

**Descripción del problema**: realizar un word counting de todas las ocurrencias que aparecen del autor Leo Tolstoy disponibles bajo el [proyecto Gutenberg](https://jovian.com/outlink?url=https%3A%2F%2Fwww.gutenberg.org).

### Descarga y pre-procesamiento de los ficheros:

Tomamos la [referencia](https://www.gutenberg.org/files/59195/59195-h/59195-h.htm) donde se encuentran estos fucheros. Primero liberamos el entorno de ejecuciones anteriores y obtendremos el listado de libros y su contenido.*texto en cursiva*

#### Liberamos el entorno

In [38]:
!rm -r books map-output reduce-output merged.pickle results*json

zsh:1: no matches found: results*json


#### Obtenemos los libros y su contenido

Para ello empleamos las librerías de Python requests y BeautifulSoup (esta librería está ampliamente extendida en el uso de WebScraping sobre webs dinámicas en su código fuente). Con ello, parseamos el contenido para obtener un listado de los títulos de los libros y sus directorios (la versión URL donde encontrarlos).

In [39]:
import requests
from bs4 import BeautifulSoup
import string
from urllib.parse import urlparse

Función **get_book_lines**:

* Realiza una solicitud HTTP GET a la página de índice del autor especificada por author_index.

*Utiliza BeautifulSoup para analizar el documento HTML de la página obtenida.

* Encuentra la primera tabla en la página, que se espera que contenga los enlaces a los libros en formato HTML.

* Itera sobre los enlaces de la tabla y procesa cada uno. Para cada enlace de libro, se realiza lo siguiente:

    a. Se limpia el nombre del libro eliminando la puntuación, convirtiéndolo en minúsculas y reemplazando los espacios con guiones.

    b. Se extrae el identificador único del libro a partir de la URL.

    c. Se construye la URL del directorio de archivos del libro en el Proyecto Gutenberg utilizando el identificador.

    d. Se realiza otra solicitud HTTP GET a la URL del directorio de archivos del libro.

    e. Se analiza el documento HTML de la página del directorio del libro.

    f. Se filtran los enlaces para encontrar el archivo de texto del libro (que tiene 'txt' en su URL).

    g. Se selecciona el primer archivo de texto encontrado y se construye la URL completa del libro.

    h. Se almacena el nombre del libro (limpiado) como clave y la URL completa del libro como valor en un diccionario all_books.

Se devuelve el diccionario all_books que contiene los nombres de los libros y sus respectivas URL completas en formato de texto.

Además, maneja casos de error:

* Si la solicitud a la página de índice del autor no tiene éxito (código de estado diferente de 200), se genera una excepción.

* Si no se puede recuperar la información del directorio de archivos del libro (código de estado diferente de 200), se genera una excepción.

In [76]:
# Purpose: Perform a word count on all occurrences from the author Leo Tolstoy available under the Gutenberg Project.


def get_book_links(author_index):
    """
    Retrieves links to books by a specific author from the Gutenberg Project.

    Args:
        author_index (str): URL to the author's index page on the Gutenberg project.

    Returns:
        dict: A dictionary where keys are book titles (with punctuation removed and words joined by hyphens) and values are URLs to the text files.

    Raises:
        Exception: If unable to retrieve the author index page or specific book directory listing.
    """
    # Create a translation table to replace all punctuation in a string with spaces.
    # Utilizes the Unicode ordinal of each punctuation character for the replacement.
    punctuation_table = dict.fromkeys((ord(i) for i in string.punctuation), " ")

    response = requests.get(author_index)  # Attempt to fetch the author's index page.
    if response.status_code == 200:
        document = BeautifulSoup(response.text, "html.parser")
        # Extract links from the first table found, assuming it contains book links. May need adjustment for page structure changes.
        book_index = document.find("table").find_all("a")
        all_books = {}  # Initialize a dictionary to store book titles and their URLs.

        print("Retrieving ", end="")
        for link in book_index:
            if not link.text.startswith("#"):  # Exclude anchor links.
                print(".", end="")  # Visual feedback for progress.
                # Normalize the book title: remove punctuation, convert to lowercase, split into words, and rejoin with hyphens.
                book = "-".join(link.text.translate(punctuation_table).lower().split())

                url = urlparse(link["href"])
                book_id = url.path.split("/")[
                    2
                ]  # Extract the book ID from the URL path.
                book_dir_url = f"https://www.gutenberg.org/files/{book_id}/"  # Construct the URL to the book's directory.

                # Fetch the directory listing for the book to find the .txt file.
                book_dir_request = requests.get(book_dir_url)
                if book_dir_request.status_code == 200:
                    book_dir = BeautifulSoup(book_dir_request.text, "html.parser")
                    # Find the first .txt file in the directory listing, assuming it's the book text.
                    # Note: Assumes there are no duplicates or prioritizes the first if sorted alphabetically.
                    book_file = sorted(
                        [
                            a.text
                            for a in filter(
                                lambda a: "txt" in a["href"], book_dir.find_all("a")
                            )
                        ]
                    )[0]
                    book_url = "".join([book_dir_url, book_file])
                    all_books[book] = (
                        book_url  # Map the normalized book title to its URL.
                    )
                else:
                    raise Exception("Unable to retrieve: " + book_dir_url)

        print("Done.")
        return all_books
    else:
        raise Exception("Unable to retrieve: " + author_index)

In [42]:
index_url = "https://www.gutenberg.org/files/59195/59195-h/59195-h.htm"
all_books = get_book_links(index_url)

Retrieving ...............................Done.


#### Obtenemos y almacenamos los libros en el entorno local

In [43]:
import os
from io import StringIO
import re

Función **retrieve_books**:

* Inicializa expresiones regulares para marcar el inicio y el final de un libro en formato de texto.

* Intenta crear un directorio especificado por el parámetro directory para almacenar los libros. Si el directorio ya existe y exists_ok es True, no se produce ningún error. De lo contrario, se genera una excepción FileExistsError.

* Itera sobre los elementos del diccionario all_books, que contiene nombres de libros y sus respectivas URL completas en formato de texto.

* Para cada libro, se realiza lo siguiente:

    a. Realiza una solicitud HTTP GET a la URL del libro.

    b. Si la solicitud es exitosa (código de estado 200), se procesa el contenido del libro.

    c. Se establece la codificación del texto como UTF-8.

    d. Se crea un objeto StringIO para leer el texto obtenido.

    e. Se abre un archivo de salida en modo de escritura con codificación UTF-8 en el directorio especificado.

    f. Se busca el inicio del libro dentro del texto y se omite todo antes de ese punto.

    g. Se escribe el contenido del libro en el archivo de salida hasta encontrar el final del libro.

    h. Se cierran los archivos.

Se imprime "Hecho." cuando se han procesado todos los libros.

Además, maneja casos de error:

* Si el directorio especificado por directory ya existe y exists_ok es False, se genera una excepción FileExistsError.

In [44]:
def retrieve_books(all_books, directory="books", exists_ok=True):
    # Start of the ebook - In the file they are marked by: *** START OF THE PROJECT GUTENBERG EBOOK THE FORGED COUPON AND OTHER STORIES ***
    # ? Regex function which filters that at the beginning of a string, there has to be exactly three *,
    # ? allowing any number of spaces and then followed by the literal word START
    start = re.compile(r"^[*]{3}\s*START")

    # End of the ebook - In the file marked by: *** END OF THE PROJECT GUTENBERG EBOOK THE FORGED COUPON AND OTHER STORIES ***
    # ? Same as above but for the end.
    end = re.compile(r"^[*]{3}\s*END")

    try:
        os.mkdir(directory)  # Create the directory named books
    except FileExistsError:
        if not exists_ok:
            raise

    for book, url in all_books.items():
        print("Retrieving", book, end=" ")
        r = requests.get(url)
        print(r.status_code)
        if r.status_code == 200:
            # * Once connection has been established, we pass the text from the file intro the input file.
            # * We create within our book folder a text file where we will save the book locally - write.
            r.encoding = "utf-8"
            input_file = StringIO(
                r.text
            )  # ? - StringIO helps treat strings as file-like objects.
            output_file = open(
                "".join([directory, "/", book, ".txt"]), "w", encoding="utf-8"
            )

            # * Find the start of the book applying the RegEx Condition
            for line in input_file:
                if start.match(line):
                    break

            # * Write in the book file until you've found the end of the book using the 2nd RegEx condition
            for line in input_file:
                if end.match(line):
                    break
                output_file.write(line)

            input_file.close()
            output_file.close()
    print("Done.")

In [45]:
retrieve_books(all_books)

Retrieving the-forged-coupon-and-others 200
Retrieving the-kreutzer-sonata-and-others 200
Retrieving father-sergius 200
Retrieving master-and-man 200
Retrieving anna-karenina 200
Retrieving resurrection 200
Retrieving childhood 200
Retrieving boyhood 200
Retrieving youth 200
Retrieving war-and-peace 200
Retrieving census-and-what-to-do 200
Retrieving on-the-significance-of-science-and-art 200
Retrieving the-cossacks 200
Retrieving what-men-live-by-and-other-tales 200
Retrieving a-letter-to-a-hindu 200
Retrieving the-awakening 200
Retrieving plays-complete-edition 200
Retrieving the-power-of-darkness 200
Retrieving bethink-yourselves 200
Retrieving tolstoy-on-shakespeare 200
Retrieving fables-for-children-stories-for-children 200
Retrieving where-love-is-there-god-is-also 200
Retrieving a-russian-proprietor-and-others 200
Retrieving what-shall-we-do 200
Retrieving my-religion 200
Retrieving katia 200
Retrieving journal-of-leo-tolstoi-vol-i-1895-1899 200
Retrieving sevastopol 200
Retriev

In [77]:
!du -sh books

 14M	books


^^Comando que nos dice cuando espacio en el equipo ocupa la carpeta books. La cual hemos averiguado que son 14Mb

### Implementación **tradicional** de MapReduce

Vamos a realizar una implementación tradicional y secuencial del word count:

*   Para cada fichero .txt en el directorio obtenido, abrimos el archivo y lo leemos
*   Pre-procesamos su contenido
*   Contamos la ocurrencia de palabras
*   Resultado por palabra




#### Pre-procesado

Hacemos un pre-procesado básico donde quitamos signos de puntuación, convertimos en minúsculas y separamos las palabras por espacios

In [48]:
import sys
import unicodedata

The objective of the following function is generating a dict. called punctuation containing Unicode punctuation characters with their replacements. 
We then update to replace the dashes and hypens with spaces. Then using a function, we will remove all punctuation from the input text. Followed by lowercase + split

In [49]:
# Generate a translation table of Unicode punctuation, replace dashes with space, and everything else with None
# ? - This creates a dictionary from unicode characters, where for each element in unicode, if it's start with P. It's classified as punctuation.
# ? - This means that we are creating a dictionary for the punctuation varaibles - Set to none, initially.
punctuation = dict.fromkeys(
    i for i in range(sys.maxunicode) if unicodedata.category(chr(i)).startswith("P")
)
# * Then we are updating the existing dictionary, if any of those characters are in the section of dashes or hypens, we substitute them into spaces.
punctuation.update(
    dict.fromkeys((i for i in punctuation if unicodedata.category(chr(i)) == "Pd"), " ")
)


# Removes all punctuation from the text, including dashes and hypens, make lowercase, and split into words
def tokenize(text):
    return text.translate(punctuation).lower().split()

### Unicode is used as it has a standardised way of representing characters from different scripts and languages. It also is more efficient, with o(n) efficiency as we're iterating through a list, however, it's O(1) access to verify. 

#### Implementación tradicional

La complejidad de este algoritmo es O(n), donde en este caso n sería el número total de palabras a leer en todos los ficheros.

¿**Qué es la complejidad en un algoritmo**?

La complejidad de un algoritmo es una medida de la cantidad de recursos que consumo en relación al tamaño del input que procesa. Se puede analizar en términos de tiempo o espacio. La complejidad temporal, por ejemplo, se indica en términos de la cantidad de operaciones que realiza el algoritmo.

Para determinar la complejidad de un algoritmo, se analiza como aumenta su tiempo de ejecución o uso de memoria a medida que el tamaño de la entrada crece. Por lo general se busca expresar esta relación en términos asintóticos, utilizando la notación O(n), O(log(n))...

Un algoritmo de wordcount secuencial en Python tiene orden n porque para contar las palabras en un texto, el algoritmo simplemente recorre cada palabra del texto una vez y aumenta el contador por cada vez que encuentra la palavra. La cantidad de operaciones realizadas es proporcional al número de palabras en el texto, por lo que su complejidad es lineal.

Ejemplos de complejidad:

*   Orden constante: O(1)
*   Orden logarítmico: O(log(n))
*   Orden lineal: O(n)
*   Orden cuadrático: O(n^2)
*   Orden polinomial: O(n^a) a>2
*   Orden exponencial: O(a^n) a>2
*   Orden factorial: O(n!)





In [50]:
import os

Función **count_words**:

* Enumera los archivos en el directorio especificado por directory que tienen la extensión ".txt".

* Inicializa un diccionario counts para almacenar el recuento de palabras.
Para cada archivo en la lista de archivos:

    a. Si verbose es True, imprime el nombre del archivo que está siendo procesado.
    b. Abre el archivo en modo de lectura.

    c. Lee cada línea del archivo y tokeniza el contenido multiplicando cada línea por el factor factor.

    d. Para cada palabra tokenizada, actualiza su conteo en el diccionario counts.

* Devuelve el diccionario counts que contiene el recuento de palabras en todos los archivos.

In [51]:
def count_words(directory="books", verbose=True, factor=1):
    # * Initialise a dict, and then registers all files in /books that end with .txt
    files = [file for file in os.listdir(directory) if file.endswith(".txt")]
    counts = {}
    for file in files:
        if (
            verbose
        ):  # * Si verbose es verdadero, imprime que archivo esta siendo procesado.
            print("processing", file)
        with open("".join([directory, "/", file])) as f:
            # * Opens file and then for each line processes it into a dictionary.
            # * Multiplicando cada linea por el factor -> ¿Porque?
            for line in f:
                for word in tokenize(line * factor):
                    # ? - Cuenta la palabra, y si se repite, cogera el diccionario, y le añadirá un valor de +1 a la clave
                    # ? - Esto sería como el Reduce. En caso de que no esté la palabra en el diccionario, lo inicializa a 1.
                    counts[word] = counts.get(word, 0) + 1
    return counts

Obtenemos y ordenamos las palabras por ocurrencias, de mayor a menor.

In [52]:
%%time
total_counts = count_words(verbose=False)
# ? - For every key, value pair in the dictionary, we sort them.
# ? - They will be sorted first by the value, and then by key.
results = sorted(
    [(k, v) for k, v in total_counts.items()], key=lambda x: (x[1], x[0]), reverse=True
)

CPU times: user 860 ms, sys: 13 ms, total: 873 ms
Wall time: 898 ms


* CPU times: Estos son los tiempos de CPU utilizados por el proceso durante su ejecución. La salida generalmente incluye dos valores:

  * user: El tiempo de CPU utilizado por el código del usuario, es decir, el tiempo que la CPU pasó ejecutando el código del programa.

  * sys: El tiempo de CPU utilizado por el sistema durante la ejecución del proceso, como para operaciones de E/S.

* Total: Esta es la suma de los tiempos de CPU del usuario y del sistema. Representa el tiempo total de CPU utilizado por el proceso.

* Wall time: Este es el tiempo real de la ejecución del proceso, también conocido como tiempo de pared. Representa el tiempo total transcurrido desde el inicio hasta el final de la ejecución del proceso, incluyendo tiempos de espera y otros factores externos al CPU.

In [53]:
results[:10]

[('the', 152564),
 ('and', 97142),
 ('to', 75919),
 ('of', 67755),
 ('a', 47462),
 ('in', 42594),
 ('he', 41209),
 ('that', 34880),
 ('his', 30863),
 ('i', 30778)]

Almacenamos los resultados en formato json

In [54]:
import json

In [55]:
with open("results-traditional.json", "w", encoding="utf-8") as f:
    f.write(json.dumps(results, indent=4))

### Proceso Map Reduce (sin paralelizar)

Como vimos, generaremos una función map, donde a cada palabra se le asigna una ocurrencia, en formato clave, valor. El formato es: (word, 1)

Función **mapper**:

La función primero tokeniza el texto multiplicando cada línea por el factor dado. Esto significa que si factor es mayor que 1, cada línea del texto se repetirá esa cantidad de veces antes de ser tokenizada. Luego, itera sobre cada palabra tokenizada. Por cada palabra, la función emite un par clave-valor utilizando yield. La clave es la palabra misma y el valor es siempre 1.

In [56]:
"""
This is not your typical function, it's called a generator function. 
So the purpose in our program is to iterate over this initialised dictionary, later.  
"""


def mapper(text, factor=1):
    for word in tokenize(text * factor):
        # * Genera el diccionario pero inicializa todo a uno => Pero en realidad lo esta añadiendo a una tupla
        yield (word, 1)

Como estamos buscando hacer un count, nuestra función reduce toma el formato que ya vimos en la teoría de (clave, valor), en orden, y lo reduciremos de tal manera que agregamos por clave. Ejemplo:

input: [('and', 1), ('and', 1), ('the', 1), ('the', 1), ('the', 1)]

output: [('and', 2), ('the', 3)]

Función **reducer**:

* La función inicializa variables current_key y word_count.

* Itera sobre los pares clave-valor en key_value_data.

* Por cada par:

    * Si la clave es la misma que la current_key, se suma el valor al conteo de palabras (word_count).

    * Si la clave es diferente de current_key, se agrega una tupla a la lista results que contiene la current_key y su word_count, luego se actualiza current_key y word_count con los valores de la nueva clave y su valor respectivamente.

    * Después del bucle, si hay una current_key (es decir, si quedó algún par sin procesar), se agrega a results la tupla final que contiene la current_key y su word_count.

* Finalmente, devuelve la lista results, que contiene las claves y sus conteos combinados.

El resultado de esta función es una lista de tuplas donde cada tupla contiene una clave (por ejemplo, una palabra) y su conteo total en el conjunto de datos.

In [57]:
def reducer(key_value_data):
    # * Initialize current_key and word_count with the first key-value pair from the input data.
    results = []
    current_key = None
    word_count = 0
    # ! - This is assuming that the keys will indeed be sorted before we apply reduce.
    # ! - If they were random the algorithm wouldn't work.
    for key, value in key_value_data:
        if current_key == key:
            word_count += value
        else:
            # If the current key does not match the key of the current iteration,
            # append the current_key and its aggregated word_count to the results list.
            results.append((current_key, word_count))
            # Then, reset current_key and word_count with the new key and its value
            # to start aggregating counts for the new key.
            current_key, word_count = key, value

    # After exiting the loop, append the last key and its aggregated count to the results.
    # ! - This step is necessary because the loop's logic only appends to results upon encountering a new key.
    # !- The final key-value pair is processed inside the loop but is appended after it.
    results.append((current_key, word_count))
    return results

#### Map Reduce Framework:

Como vimos, el esquema de ejecución de Map Reduce tiene 3 partes: map, shuffle y reduce.

In [58]:
import pickle
import os

Construimos la función, que apoyándose en la función de map, lee el archivo y la ejecuta, de forma secuencial. Toma el input inicial, lo lleva por la función map, y escribe el output intermedio.

Función **map_file**:

* Esta función procesa un archivo de entrada (posiblemente un archivo de texto) ubicado en el directorio de entrada (input_dir).

* Utiliza una función mapper para mapear cada línea del archivo de entrada, generando pares clave-valor.

* Los resultados del mapeo se almacenan en un archivo de salida (con extensión '.pickle') en el directorio de salida (output_dir).

In [59]:
def map_file(filename, input_dir, output_dir="map-output", factor=1):
    results = []
    input_file = "".join([input_dir, "/", filename])
    output_file = "".join(
        [output_dir, "/", filename.split(".")[0], "-map.pickle"]
    )  # * For serialization.
    with open(input_file) as f:
        for line in f:
            results += mapper(
                line, factor
            )  # * Generator function, all initialised to 1.
    with open(
        output_file, "wb"
    ) as f:  # * Write the sorted results into our pickle output file.
        pickle.dump(sorted(results), f)

!!! - La ventaja que trae pickle, es que serializa y deserializa las estructuras de Python. Convirtiendolas en un byte-stream. Es almacenamiento mucho más eficiente y serializado. Mucho más rapido que lectura o escritura de archivos. 

---

La función shuffle se utilizará simplemente para pasar el nombre de un archivo intermedio a las funciones reduce. En frameworks a gran escala como Apache Hadoop, la función shuffle también se asegurará de que los **pares clave-valor estén ordenados** y se **encargará de distribuir los datos a los reducers utilizando diversas técnicas.**

Función **shuffle**:

* Enumera los archivos en el directorio especificado (map_dir) que tienen la extensión '.pickle'.

* Itera sobre cada archivo encontrado.

* Para cada archivo, emite el nombre del archivo utilizando yield. Esto significa que la función actúa como un generador, devolviendo un archivo a la vez a medida que avanza la iteración.

In [60]:
def shuffle(map_dir="map-output"):  # * This takes in a folder as parameter.
    # * For every file in the folder, if the file ends with .pickle, add it to the list.
    files = [file for file in os.listdir(map_dir) if file.endswith(".pickle")]
    # * Then for every file in the list, instead of returning it, we're going to place this as a generator function.
    for filename in files:
        yield filename

Función **reduce_maps**:

* Construye las rutas de archivo de entrada y salida utilizando el nombre del archivo y los directorios especificados.

* Abre el archivo de entrada en modo de lectura binaria ('rb').

* Utiliza pickle.load para cargar los datos del archivo de entrada, que se espera que sean resultados del mapeo serializados.

* Aplica la función reducer a los datos cargados para realizar la operación de reducción.

* Abre el archivo de salida en modo de escritura binaria ('wb').

* Utiliza pickle.dump para escribir los resultados reducidos en el archivo de salida.

El resultado de esta función es un archivo de salida que contiene los resultados reducidos de los datos del archivo de entrada. Estos resultados reducidos están listos para su posterior procesamiento o análisis.

Esta función ejecuta la etapa de reduce, tomamos el input intermedio y llevamos a cabo la agregación.

In [61]:
def reduce_maps(filename, input_dir="map-output", output_dir="reduce-output"):
    input_file = "".join([input_dir, "/", filename])
    output_file = "".join(
        [output_dir, "/", filename.split(".")[0], "-reduce.pickle"]
    )  # * We're going to serialise the reduced data and dump it on a pickle file.

    with open(
        input_file, "rb"
    ) as input_fd:  # * We will open the input file from the maps to get the serialised data, and we will open the out to write.
        with open(output_file, "wb") as output_fd:
            # * Here it's saying - Write to the reduced file, the mapped file and apply the reduce func. to it.
            pickle.dump(reducer(pickle.load(input_fd)), output_fd)

Unificamos todos los outputs de las ejecuciones de reduce, y las ordenamos.

Función **merge_results**:

* Enumera los archivos en el directorio especificado por input_dir que tienen la extensión '.pickle'.

* Itera sobre cada archivo encontrado.

* Para cada archivo, abre el archivo en modo de lectura binaria ('rb').

* Utiliza pickle.load para cargar los datos del archivo, que se espera que sean resultados de operaciones de reducción.

* Agrega los datos cargados a la lista results.

* Ordena los elementos de la lista results utilizando la función sorted.

* Abre el archivo de salida en modo de escritura binaria ('wb').

* Utiliza pickle.dump para escribir los resultados fusionados en el archivo de salida.

In [66]:
def merge_results(input_dir="reduce-output", output_file="merged.pickle"):
    # * Once we have our reduced file, we wish to merge.
    files = [file for file in os.listdir(input_dir) if file.endswith(".pickle")]

    results = []
    for filename in files:
        # * Get the reduced words from ALL OF THE FILE, and then sort them, then dump them into a single pickle file.
        with open("".join([input_dir, "/", filename]), "rb") as f:
            file_data = pickle.load(f)
            if None in file_data:
                print(f"Found None in {filename}")  # Debugging line
            results += file_data

    # Filter out None values before sorting
    results = [r for r in results if r is not None]
    results = sorted(results)

    with open(output_file, "wb") as f:
        pickle.dump(list(results), f)

Implementación en función con todos los pasos anteriores. En este caso, al ser una ejecución del algoritmo en formato secuencial, al ser más complejo que la forma tradicional (que por ser un problema sencillo ya es eficiente), será un proceso más costoso en tiempo de ejecución. Aún así, veremos que el resultado es el mismo.

La complejidad temporal de esta implementación del algoritmo MapReduce es aproximadamente O(3n+nlog2(n)) donde n es el número total de palabras en todos los archivos de entrada. En el peor de los casos, en el que cada palabra es única, procesamos todas las palabras en el map y en la reducción final.

In [63]:
def map_reduce_sequential(input_dir="books", factor=1):
    # Create intermediate directories, raise exception if they already exist.
    intermediate_dirs = ["map-output", "reduce-output"]
    # * We prepare the folders to store the outputs.
    for directory in intermediate_dirs:
        os.makedirs(directory, exist_ok=False)

    # * Map the input text files, which will dump these in a pickle folder to be read.
    files = [file for file in os.listdir(input_dir) if file.endswith(".txt")]
    for filename in files:
        map_file(filename, input_dir, factor=factor)

    # * Shuffle and apply the reducer
    for (
        filename
    ) in (
        shuffle()
    ):  # ? -> Remember the generator function, how this was going to give us the mapper pickle files all sorted?
        reduce_maps(filename)

    # Merge the results
    merge_results()  # ? - Such that it's all placed in a single file.

    # Final reduce
    with open("merged.pickle", "rb") as f:  # ! - Read the final results.
        results = reducer(pickle.load(f))
    return results

In [68]:
!rm -r map-output reduce-output merged.pickle # * Removes the files so we can re-execute from the start.

rm: merged.pickle: No such file or directory


In [69]:
%%time
total_counts = map_reduce_sequential()
results = sorted(
    [(k, v) for k, v in total_counts], key=lambda x: (x[1], x[0]), reverse=True
)

TypeError: '<' not supported between instances of 'str' and 'NoneType'

In [None]:
results[:10]

[('the', 152564),
 ('and', 97142),
 ('to', 75919),
 ('of', 67755),
 ('a', 47462),
 ('in', 42594),
 ('he', 41209),
 ('that', 34880),
 ('his', 30863),
 ('i', 30778)]

In [71]:
with open("results-map-reduce-sequential.json", "w", encoding="utf-8") as f:
    f.write(json.dumps(results, indent=4))

In [70]:
!diff -u results-traditional.json results-map-reduce-sequential.json

diff: results-map-reduce-sequential.json: No such file or directory


#### Conclusiones:

Tenemos un mismo output, sin ninguna diferencia, pero se ejecuta con un tiempo ~4 veces más lento que la anterior implementación. En este caso, como vimos, guarda relación con la complejidad del algoritmo.

Todo esto se debe a que no estamos trabajando de forma paralelizada, en entorno distribuido, estamos ejecutando todas las tareas una tras otra.

### MapReduce empleando **threads**

Los threads nos permiten aprovechar las capacidades multiprocesador de nuestras máquinas para ejecutar varias instrucciones a la vez, como subprocesos independientes. Se crean instanciando la clase Thread. Es una forma de optimizar el rendimiento de ejecución.

La complejidad global de nuestro algoritmo sigue siendo la misma. La implementación será más rápida al contar con la capacidad de ejecutar tareas en paralelo en las fases de reduce y map.

In [72]:
import concurrent.futures

Función **map_reduce_threads**:

La función map_reduce_threads(input_dir='books', factor=1, workers=None) es una implementación de procesamiento distribuido utilizando hilos (threads) en Python. Realiza una operación de map-reduce en una colección de archivos de texto ubicados en un directorio de entrada (input_dir).

* Crea los directorios intermedios necesarios para almacenar los resultados temporales de la operación map y reduce (map-output y reduce-output).

* Itera sobre los archivos en el directorio de entrada y ejecuta la operación de mapeo (map_file) en cada archivo utilizando un ThreadPoolExecutor con el número máximo de hilos especificado por workers.

* Luego, inicia una operación de barajado (shuffle) y ejecuta la operación de reducción (reduce_maps) en los archivos obtenidos del barajado utilizando un ThreadPoolExecutor.

* Fusiona los resultados de la operación de reducción utilizando la función merge_results.

* Finalmente, realiza una operación final de reducción en los resultados fusionados utilizando la función reducer.

* El resultado final de la función es el resultado de la operación de reducción final, que es un objeto Python generado por la función reducer.

La función utiliza hilos (THREADS) para ejecutar las operaciones de map y reduce en paralelo, lo que puede mejorar el rendimiento en sistemas con múltiples CPUs o en operaciones de entrada/salida intensivas donde el procesamiento se bloquea esperando el sistema de entrada y salida (el sistema E/S es la interfaz que tiene el computador con el exterior y el objetivo que tiene es facilitar las operaciones de E/S entre los periféricos y la memoria o los registros del procesador.).

In [73]:
def map_reduce_threads(input_dir="books", factor=1, workers=None):
    # Create intermediate directories
    intermediate_dirs = ("map-output", "reduce-output")
    for directory in intermediate_dirs:
        os.makedirs(directory, exist_ok=True)

    # Map the data
    files = [file for file in os.listdir(input_dir) if file.endswith(".txt")]
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        for filename in files:
            executor.submit(map_file, filename, input_dir, **{"factor": factor})

    # Shuffle and reduce
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        for filename in shuffle():
            executor.submit(reduce_maps, filename)

    # Merge results
    merge_results()

    # Final reduce
    with open("merged.pickle", "rb") as f:
        results = reducer(pickle.load(f))
    return results

In [74]:
!rm -r map-output reduce-output merged.pickle

rm: merged.pickle: No such file or directory


In [75]:
%%time
total_counts = map_reduce_threads(workers=9)
results = sorted(
    [(k, v) for k, v in total_counts], key=lambda x: (x[1], x[0]), reverse=True
)  # ! - Look at that. So fast!!

TypeError: '<' not supported between instances of 'str' and 'NoneType'

In [None]:
results[:10]

[(None, 0)]

In [None]:
with open("results-map-reduce-threads.json", "w", encoding="utf-8") as f:
    f.write(json.dumps(results, indent=4))

In [None]:
!diff -u results-traditional.json results-map-reduce-threads.json

--- results-traditional.json	2024-02-03 16:18:24
+++ results-map-reduce-threads.json	2024-02-03 17:07:51
@@ -1,151566 +1,6 @@
 [
     [
-        "the",
-        152564
-    ],
-    [
-        "and",
-        97142
-    ],
-    [
-        "to",
-        75919
-    ],
-    [
-        "of",
-        67755
-    ],
-    [
-        "a",
-        47462
-    ],
-    [
-        "in",
-        42594
-    ],
-    [
-        "he",
-        41209
-    ],
-    [
-        "that",
-        34880
-    ],
-    [
-        "his",
-        30863
-    ],
-    [
-        "i",
-        30778
-    ],
-    [
-        "was",
-        30427
-    ],
-    [
-        "it",
-        27302
-    ],
-    [
-        "with",
-        23876
-    ],
-    [
-        "not",
-        22224
-    ],
-    [
-        "had",
-        19539
-    ],
-    [
-        "is",
-        19482
-    ],
-    [
-        "her",
-        18976
-    ],
-    [
-        "but",
-        18932
-    ],
-    [
-        "you",
-        18272
-    ],
-   

**Conclusiones**:

El algoritmo MapReduce con threads se ejecuta aproximadamente en el mismo tiempo que la versión secuencial. La razón principal por la que no vemos una gran aceleración, si es que vemos alguna, se debe a la arquitectura de Python. Python limita por defecto la ejecución de un thread simultáneamente.

La forma de paralelizar correctamente sin limitaciones sería con la librería de multiprocessing, lo cual no se puede ejecutar desde el notebook.

Python utiliza el Global Interpreter Lock (GIL), que es un mecanismo que garantiza que solo un hilo ejecute código de Python al mismo tiempo en un proceso de Python. Esto significa que, aunque se utilicen múltiples hilos para ejecutar operaciones en paralelo, solo un hilo puede ejecutar código de Python en un momento dado. Como resultado, los hilos no pueden aprovechar completamente los sistemas con múltiples núcleos de CPU para realizar cálculos en paralelo.

Debido a esta limitación del GIL, los beneficios de utilizar hilos para paralelizar operaciones en Python pueden ser limitados, especialmente en operaciones intensivas en CPU donde se necesita realizar un trabajo significativo en código de Python. En tales casos, otras soluciones como el procesamiento paralelo basado en procesos (por ejemplo, utilizando la biblioteca multiprocessing).