# Template for the BDA02 online assignment

# Student name: Victoria Jiménez Martín

In this assignment you must complete the cells that are incomplete. The expected result of each execution is shown, and the goal is to implement a MapReduce process that produces that result. You may implement the MapReduce process with the language and library of your choice (`Bash`, Python, `mrjob`...). The input data are only examples: the process you implement should work with them and with any other input file that has the same structure.

## 1.- Starting from the `notas.txt` file, compute the highest grade obtained by each student with a MapReduce process.

That is, given the grades file:

In [1]:
%%writefile notas.txt
pedro 6 7
luis 0 4
ana 7
pedro 8 1 3
ana 5 6 7
ana 10
luis 3

Overwriting notas.txt


The expected result is the following:

![solution 1](./img/1.png)

In [14]:
%%writefile notas.py

from mrjob.job import MRJob

class MaxGrades(MRJob):
    # Split each line on whitespace, assigning the first field to student and the rest to grades.
    # Then emit each student with the highest grade on that line.
    def mapper(self, _, line):
        fields = line.split()
        # Skip empty lines and students without grades (max() of an empty list fails)
        if len(fields) < 2:
            return
        student, *grades = fields
        yield student, max(int(grade) for grade in grades)

    # For each student, take all the per-line maxima emitted by the mapper and compute the overall maximum.
    def reducer(self, student, grades):
        yield student, max(grades)

if __name__ == '__main__':
    MaxGrades.run()

Overwriting notas.py


In [15]:
! chmod ugo+x notas.py

In [16]:
! python3 notas.py -r hadoop notas.txt

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/notas.root.20240117.173229.083758
uploading working dir files to hdfs:///user/root/tmp/mrjob/notas.root.20240117.173229.083758/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/notas.root.20240117.173229.083758/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar2317807799491329889/] [] /tmp/streamjob2711160104362418343.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.18.0.3:8032
  Connecting to ResourceManager at yarnmaster/172.18.0.3:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1705510300565_0005
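
To check the mapper and reducer logic without a Hadoop cluster, the following plain-Python sketch simulates the map, shuffle/sort and reduce phases in memory on the sample `notas.txt` data. The `mapper` and `reducer` functions mirror `MaxGrades.mapper` and `MaxGrades.reducer`; `run_local` is a hypothetical helper written for this illustration, not part of `mrjob`. Lines without grades are skipped, since `max()` of an empty list raises `ValueError`.

```python
from itertools import groupby
from operator import itemgetter


def mapper(line):
    """Map: emit (student, highest grade on this line)."""
    fields = line.split()
    if len(fields) < 2:  # empty line or student without grades
        return
    student, *grades = fields
    yield student, max(int(grade) for grade in grades)


def reducer(student, grades):
    """Reduce: keep the highest of the per-line maxima."""
    yield student, max(grades)


def run_local(lines):
    """Simulate map -> shuffle/sort -> reduce in memory (no Hadoop involved)."""
    mapped = [pair for line in lines for pair in mapper(line)]
    mapped.sort(key=itemgetter(0))  # the shuffle phase groups records by key
    result = {}
    for student, group in groupby(mapped, key=itemgetter(0)):
        for key, value in reducer(student, (grade for _, grade in group)):
            result[key] = value
    return result


notas = """pedro 6 7
luis 0 4
ana 7
pedro 8 1 3
ana 5 6 7
ana 10
luis 3
"""

print(run_local(notas.splitlines()))  # {'ana': 10, 'luis': 4, 'pedro': 8}
```

`mrjob` itself can also run the job without Hadoop through its default inline runner, e.g. `python3 notas.py notas.txt`.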
  

## 2.- Using a MapReduce process, show the 10 most frequently used words in `El Quijote`.

The first step is to download El Quijote:

In [6]:
! wget -O '2000-0.txt' https://www.gutenberg.org/files/2000/2000-0.txt

--2024-01-07 23:29:34--  https://www.gutenberg.org/files/2000/2000-0.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47, 2610:28:3090:3000:0:bad:cafe:47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2226045 (2.1M) [text/plain]
Saving to: ‘2000-0.txt’


2024-01-07 23:29:38 (761 KB/s) - ‘2000-0.txt’ saved [2226045/2226045]



As in the first practical exercise, we remove the lines that are metadata and not part of the book itself, and overwrite the file without them.

In [10]:
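with open('2000-0.txt', encoding='utf-8') as f:
    lines = f.readlines()

# Number of metadata lines at the start (head) and at the end (tail) of the file.
# Note: running this cell twice trims the already-trimmed file again.
head = 24
tail = 360
book = lines[head:-tail]

with open('2000-0.txt', 'w', encoding='utf-8') as f:
    # readlines() keeps each line's '\n', so write the lines unchanged
    f.writelines(book)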
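
The fixed `head`/`tail` offsets only work for this particular edition of the file. A possibly more robust sketch, assuming the file contains the usual Project Gutenberg marker lines beginning with `*** START OF` and `*** END OF`, locates those markers instead of counting lines. `strip_gutenberg_metadata` is a hypothetical helper name.

```python
def strip_gutenberg_metadata(lines):
    """Return only the lines between the Project Gutenberg START and END markers.

    Assumes the markers start with '*** START OF' and '*** END OF'.
    A missing marker leaves that end of the text untouched.
    """
    start, end = 0, len(lines)
    for i, line in enumerate(lines):
        if line.startswith('*** START OF'):
            start = i + 1  # the book begins right after the START marker
        elif line.startswith('*** END OF'):
            end = i  # the book ends right before the END marker
            break
    return lines[start:end]


sample = [
    "Title line\n",
    "*** START OF THE PROJECT GUTENBERG EBOOK X ***\n",
    "En un lugar de la Mancha\n",
    "*** END OF THE PROJECT GUTENBERG EBOOK X ***\n",
    "License text\n",
]
print(strip_gutenberg_metadata(sample))  # ['En un lugar de la Mancha\n']
```

To apply it, read the lines with `f.readlines()` as in the cell above and write back `strip_gutenberg_metadata(lines)` with `f.writelines(...)`.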




The result should be the same as the one obtained in the first practical exercise.

![solution 2](./img/2.png)

In [20]:
%%writefile palabrasQuijote.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")

class MostUsedWords(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_top_words)
        ]

    # Split each line into words and emit each one (lower-cased) with a count of 1.
    def mapper_get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

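    # Add up all the counts for each word and emit the (count, word) pair under a single key (None),
    # so that one reducer receives every word in the next step.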
    def reducer_count_words(self, word, counts):
        yield None, (sum(counts), word)

    # Receive all (count, word) pairs, sort them in descending order and return the 10 most frequent words.
    def reducer_find_top_words(self, _, word_count_pairs):
        top_words = sorted(word_count_pairs, reverse=True)[:10]
        for count, word in top_words:
            yield word, count

if __name__ == '__main__':
    MostUsedWords.run()

Overwriting palabrasQuijote.py


In [21]:
! chmod ugo+x palabrasQuijote.py

In [22]:
! python3 palabrasQuijote.py -r hadoop 2000-0.txt

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/palabrasQuijote.root.20240117.173636.260181
uploading working dir files to hdfs:///user/root/tmp/mrjob/palabrasQuijote.root.20240117.173636.260181/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/palabrasQuijote.root.20240117.173636.260181/files/
Running step 1 of 2...
  packageJobJar: [/tmp/hadoop-unjar2934571464954495916/] [] /tmp/streamjob1139812660036353079.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.18.0.3:8032
  Connecting to ResourceManager at yarnmaster/172.18.0.3:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.sta
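
The second step of `palabrasQuijote.py` sends every `(count, word)` pair to a single reducer, which sorts the whole vocabulary only to keep 10 entries. A possible refinement, sketched here in plain Python so it can be tried without `mrjob` or Hadoop, is `heapq.nlargest`, which returns the same list as `sorted(..., reverse=True)[:n]` while only keeping `n` items in memory. `top_words` is a hypothetical helper that simulates both steps in memory.

```python
import heapq
import re
from collections import defaultdict

# Same word pattern as WORD_RE in palabrasQuijote.py
WORD_RE = re.compile(r"[\w']+")


def mapper_get_words(line):
    """Map: emit (word, 1) for every lower-cased word in the line."""
    for word in WORD_RE.findall(line):
        yield word.lower(), 1


def top_words(lines, n=10):
    """Simulate both steps of the job in memory and return the n most frequent words."""
    # Step 1: map, then shuffle (group the 1s by word), then reduce (sum them)
    grouped = defaultdict(list)
    for line in lines:
        for word, one in mapper_get_words(line):
            grouped[word].append(one)
    pairs = ((sum(counts), word) for word, counts in grouped.items())
    # Step 2: one reducer sees every (count, word) pair; nlargest keeps only n of them
    return [(word, count) for count, word in heapq.nlargest(n, pairs)]


print(top_words(["la casa, la mesa y la silla", "y la casa"], n=2))
# [('la', 4), ('y', 2)]
```

As with the original `sorted(..., reverse=True)`, ties in the count are broken by the word in descending order.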

## 3.- Show the 2021/2022 La Liga standings, counting only the points earned as the away team.

On [this website](https://resultados.as.com/resultados/futbol/primera/2021_2022/clasificacion/) you can check how many points each team earned away from home.

We start by downloading the results file for the 2021/2022 season and renaming it to `laliga2122.csv`.

In [8]:
! wget -O laliga2122.csv https://www.football-data.co.uk/mmz4281/2122/SP1.csv

--2024-01-07 23:47:25--  https://www.football-data.co.uk/mmz4281/2122/SP1.csv
Resolving www.football-data.co.uk (www.football-data.co.uk)... 217.160.0.246
Connecting to www.football-data.co.uk (www.football-data.co.uk)|217.160.0.246|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 172174 (168K) [text/csv]
Saving to: ‘laliga2122.csv’


2024-01-07 23:47:26 (722 KB/s) - ‘laliga2122.csv’ saved [172174/172174]



The expected result is:

![solution 3](./img/3.png)

In [38]:
%%writefile clasificacion.py

from mrjob.job import MRJob
import csv

class VisitorPointsMRJob(MRJob):

    # Extract the away team and the points it earned from each line (one match) of the CSV file.
    def mapper(self, _, line):
        # Parse the line with csv.reader (it handles quoted fields)
        fields = next(csv.reader([line]), [])
        # Skip the header row and empty or malformed lines
        if len(fields) < 8 or fields[0] == 'Div':
            return
        # Name the first columns of the football-data.co.uk CSV
        row = dict(zip(['Div', 'Date', 'Time', 'HomeTeam', 'AwayTeam', 'FTHG', 'FTAG', 'FTR'], fields))

        visitor_team = row['AwayTeam']
        # Away win = 3 points, draw = 1, home win = 0
        points = 0
        if row['FTR'] == 'A':
            points = 3
        elif row['FTR'] == 'D':
            points = 1

        yield visitor_team, points

    # Add up the points of each away team.
    def reducer(self, team, points):
        yield team, sum(points)

if __name__ == '__main__':
    VisitorPointsMRJob.run()

Overwriting clasificacion.py


In [39]:
! chmod ugo+x clasificacion.py

In [40]:
! python3 clasificacion.py -r hadoop laliga2122.csv

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/clasificacion.root.20240117.180628.915861
uploading working dir files to hdfs:///user/root/tmp/mrjob/clasificacion.root.20240117.180628.915861/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/clasificacion.root.20240117.180628.915861/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar4181290725929563182/] [] /tmp/streamjob6530032604275923716.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.18.0.3:8032
  Connecting to ResourceManager at yarnmaster/172.18.0.3:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/j
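
The job above outputs one line per team but does not order the teams by points. A plain-Python sketch of that ordering step (in `mrjob` it could become a second `MRStep` whose reducer receives every `(team, points)` pair under a single key) might look like this. It reads the header with `csv.DictReader`, so column positions are not hard-coded. The alphabetical tie-break is an assumption, `away_table` is a hypothetical helper, and the sample rows and team names are invented.

```python
import csv
import io
from collections import defaultdict

# Points earned by the away team for each full-time result (FTR) code
AWAY_POINTS = {'A': 3, 'D': 1, 'H': 0}


def away_table(csv_text):
    """Sum away points per team and return (team, points) pairs, best first."""
    points = defaultdict(int)
    # DictReader uses the header row, so column positions are not hard-coded
    for row in csv.DictReader(io.StringIO(csv_text)):
        points[row['AwayTeam']] += AWAY_POINTS.get(row['FTR'], 0)
    # Most points first; ties broken alphabetically (an assumption: the real
    # La Liga tiebreakers, such as head-to-head results, are not modelled)
    return sorted(points.items(), key=lambda item: (-item[1], item[0]))


# Invented sample rows with the first eight columns of laliga2122.csv
sample = """Div,Date,Time,HomeTeam,AwayTeam,FTHG,FTAG,FTR
SP1,01/01/2022,20:00,Alpha,Beta,1,0,H
SP1,02/01/2022,20:00,Gamma,Alpha,1,1,D
SP1,03/01/2022,20:00,Beta,Gamma,0,2,A
SP1,04/01/2022,20:00,Alpha,Gamma,2,2,D
"""

for position, (team, pts) in enumerate(away_table(sample), start=1):
    print(position, team, pts)
```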

## 4.- Show the goal difference between the team that scored the most goals and the team that scored the fewest in the 2021/2022 La Liga season.

The MapReduce process is expected to produce output similar to the following:

![solution 4](./img/4.png)

In [1]:
%%writefile goaldifference.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class GoalDifferenceMRJob(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(reducer=self.reducer_find_goal_difference)
        ]

    # Emit (team, goals) for both the home and the away team of each match.
    def mapper(self, _, line):
        # Skip the CSV header
        if line.startswith('Div'):
            return

        try:
            row = list(csv.reader([line]))[0]
            # Skip empty or malformed lines
            if len(row) < 7:
                return

            home_team = row[3]
            away_team = row[4]
            home_goals = int(row[5])
            away_goals = int(row[6])

            yield home_team, home_goals
            yield away_team, away_goals

        except ValueError:
            pass  # Skip lines whose goal columns are not integers

    # Add up each team's goals and send every (team, total) pair under the same key (None),
    # so that a single reducer in the next step can compare all the teams.
    def reducer(self, team, goals):
        total_goals = sum(goals)
        yield None, (team, total_goals)

    # Find the teams with the most and the fewest goals and the difference between them.
    def reducer_find_goal_difference(self, _, team_goals):
        min_goals = float('inf')
        max_goals = float('-inf')
        team_min_goals = None
        team_max_goals = None

        for team, goals in team_goals:
            if goals < min_goals:
                min_goals = goals
                team_min_goals = team
            if goals > max_goals:
                max_goals = goals
                team_max_goals = team

        # Emit nothing if no valid team was found
        if team_max_goals is None:
            return
        yield "Equipo con mas goles", (team_max_goals, max_goals)
        yield "Equipo con menos goles", (team_min_goals, min_goals)
        yield "Diferencia de goles", max_goals - min_goals

if __name__ == '__main__':
    GoalDifferenceMRJob.run()

Overwriting goaldifference.py


In [2]:
! chmod ugo+x goaldifference.py

In [3]:
! python3 goaldifference.py -r hadoop laliga2122.csv

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/goaldifference.root.20240119.190711.723555
uploading working dir files to hdfs:///user/root/tmp/mrjob/goaldifference.root.20240119.190711.723555/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/goaldifference.root.20240119.190711.723555/files/
Running step 1 of 2...
  packageJobJar: [/tmp/hadoop-unjar8299926449389177728/] [] /tmp/streamjob7856076941581919361.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.18.0.4:8032
  Connecting to ResourceManager at yarnmaster/172.18.0.4:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.stagin
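
The same two-step logic (sum the goals per team, then compare all totals in one place) can be sketched in plain Python with `collections.Counter` and `max`/`min` with a `key` function, which is handy for checking the job's output on a small sample. The function name `goal_difference` and the tuple layout of `matches` are assumptions for this illustration, and the sample matches are invented. When several teams share the highest or lowest total, `max`/`min` return the first one encountered.

```python
from collections import Counter


def goal_difference(matches):
    """matches: iterable of (home_team, away_team, home_goals, away_goals) tuples.

    Raises ValueError if matches is empty.
    """
    # Step 1 equivalent: total goals per team (teams with 0 goals are kept as 0)
    goals = Counter()
    for home_team, away_team, home_goals, away_goals in matches:
        goals[home_team] += home_goals
        goals[away_team] += away_goals
    # Step 2 equivalent: compare every total in one place
    most = max(goals.items(), key=lambda item: item[1])
    fewest = min(goals.items(), key=lambda item: item[1])
    return most, fewest, most[1] - fewest[1]


# Invented sample matches
matches = [("Alpha", "Beta", 3, 0), ("Beta", "Gamma", 1, 1), ("Gamma", "Alpha", 0, 2)]
print(goal_difference(matches))
# (('Alpha', 5), ('Beta', 1), 4)
```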

## 5.- Compute the form over the last five matches of each team in the final 2021/2022 La Liga standings.

[Notice](https://www.google.com/search?q=clasificacion+liga+2021+2022&oq=clasificacion+liga+2021+2022#sie=lg) that the last columns of the standings show the result of each team's last 5 matches.

![standings](./img/clasificacion.png)

The task is to show the final standings together with the results of the last 5 matches. This exercise is somewhat harder and more laborious than the others. If you use `mrjob`, you will probably find [secondary sort by value](https://mrjob.readthedocs.io/en/latest/job.html#secondary-sort) useful, although it can also be solved without it.

The expected result is:

![solution 5](./img/5.png)

In [4]:
%%writefile clasificationMR.py
#!/usr/bin/env python3
from mrjob.job import MRJob


class TeamPointsMRJob(MRJob):

    def mapper(self, _, line):
        # Split the line and extract the required fields
        row = line.split(',')
        # Skip the header and malformed lines
        if row[0] == 'Div' or len(row) < 7:
            return
        try:
            # Build a sortable "yyyy-mm-dd HH:MM" timestamp from Date (dd/mm/yyyy) and Time
            day, month, year = row[1].split('/')
            timestamp = f"{year}-{month}-{day} {row[2]}"
            home_team = row[3]
            away_team = row[4]
            home_goals = int(row[5])
            away_goals = int(row[6])
        except ValueError:
            return  # Skip malformed lines

        # Emit date, result and points for the home and the away team
        if home_goals > away_goals:
            yield home_team, (timestamp, 'W', 3)
            yield away_team, (timestamp, 'L', 0)
        elif home_goals == away_goals:
            yield home_team, (timestamp, 'D', 1)
            yield away_team, (timestamp, 'D', 1)
        else:
            yield away_team, (timestamp, 'W', 3)
            yield home_team, (timestamp, 'L', 0)

    def reducer(self, team, values):
        # Hadoop does not guarantee the order of the values, so sort the matches by date
        matches = sorted(values)
        total_points = sum(points for _, _, points in matches)
        # Results of the last five matches as numeric values (W = 3, D = 1, L = 0)
        last_five = [points for _, _, points in matches[-5:]]
        yield team, (total_points, last_five)


if __name__ == '__main__':
    TeamPointsMRJob.run()


Overwriting clasificationMR.py


In [5]:
! chmod ugo+x clasificationMR.py

In [7]:
! python3 clasificationMR.py -r hadoop laliga2122.csv

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/clasificationMR.root.20240119.190845.765610
uploading working dir files to hdfs:///user/root/tmp/mrjob/clasificationMR.root.20240119.190845.765610/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/clasificationMR.root.20240119.190845.765610/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar8071806759620557945/] [] /tmp/streamjob6371129225352331386.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.18.0.4:8032
  Connecting to ResourceManager at yarnmaster/172.18.0.4:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.sta
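
Hadoop does not guarantee the order in which a reducer receives the values for a key, so the last five results must be picked by match date rather than by arrival order. A plain-Python sketch of that selection, assuming the `Date` column uses `dd/mm/yyyy` and `Time` uses `HH:MM` as in `laliga2122.csv`, is shown below; `last_five` is a hypothetical helper and the results are invented. In `mrjob`, the secondary sort linked above (`SORT_VALUES = True`, according to its documentation) is another way to receive the values in order; since it sorts by the encoded value, the date would need to come first in a sortable format.

```python
from datetime import datetime


def last_five(matches):
    """Return the results of the five most recent matches, oldest first.

    matches: (date, time, result) tuples in any order, with date as dd/mm/yyyy
    and time as HH:MM (assumed to match the Date and Time columns of the CSV).
    """
    def kickoff(match):
        date, time, _ = match
        return datetime.strptime(f"{date} {time}", "%d/%m/%Y %H:%M")

    # A reducer may receive values in any order, so sort by kickoff time first
    ordered = sorted(matches, key=kickoff)
    return [result for _, _, result in ordered[-5:]]


# Invented results for one team, deliberately out of order
matches = [
    ("15/05/2022", "20:00", "W"),
    ("01/05/2022", "18:30", "L"),
    ("22/05/2022", "19:00", "D"),
    ("08/05/2022", "21:00", "W"),
    ("24/04/2022", "16:15", "D"),
    ("10/04/2022", "14:00", "L"),
]
print(last_five(matches))  # ['D', 'L', 'W', 'W', 'D']
```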