<a href="https://colab.research.google.com/github/nadienuncanada/BigData/blob/main/BigData_TP2_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# BigData 2024 - Spark

## Inicializacion

In [None]:
from google.colab import drive
drive.mount('/content/drive')
ruta = '/content/drive/MyDrive/BigData'
GENERALinputPath= ruta + '/input'
ruta_heroes = GENERALinputPath + '/Heroes/Heroes.txt'

# Instalamos Spark para Python
!pip install pyspark

import os

# Instalamos Java SDK 8
!apt-get install -y openjdk-8-jdk -qq > /dev/null
!echo $(/usr/libexec/java_home -v 1.8)

#set environment variable
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!echo 2 | update-alternatives --config java

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
sc = SparkContext("local", "My program")
sqlContext = SQLContext(sc)

### Datos del enunciado


**Pautas generales**

• La entrega consiste en la implementación de un script con Spark, resolviendo todas las consignas presentes en este enunciado. Se deberá entregar el código fuente implementado y un documento con la comparación solicitada.

• Los alumnos pueden conformar grupo de no más de dos integrantes y hacer una única
entrega grupal.

• La entrega se realiza por la mensajería del curso en IDEAS.

• La fecha límite de entrega es el 25 de noviembre de 2024.

**Aclaración**

Suponga que la cantidad de jugadores, de misiones y de héroes es Big Data.
En cada job planteado en la solución piense si una función combiner contribuye o no para la
optimización del job. En caso de contribuir implemente dicha función

**Enunciado**

Una empresa ha desarrollado un videojuego online donde millones de jugadores se conectan a
diario para realizar diferentes misiones. Cada jugador elije una misión en la cual participar y de
su participación obtiene un puntaje y el tiempo en segundo que duró la misión. El jugador para
participar de una misión debe elegir cinco héroes de un conjunto de héroes disponibles. Por
cada participación, los servidores del juego almacenan la siguiente información:


• ID_Jugador

• ID_Misión

• ID_Héroe_1

• ID_Héroe_2

• ID_Héroe_3

• ID_Héroe_4

• ID_Héroe_5

• Puntaje obtenido

• Tiempo de la misión en segundos

Donde los cinco ID de los héroes son IDs de un conjunto de héroes que el jugador tiene para
elegir y no guardan relación alguna entre ellos.
Todos los jugadores pueden participar en la misión que deseen cuántas veces deseen y por cada
misión obtienen un puntaje y también se registra el tiempo que duró la misión. Un jugador
puede jugar todas las veces que quiera.
Cada vez que un jugador decide jugar una misión, debe elegir cinco héroes, los cuales pueden
ser totalmente distintos de una partida a la siguiente, aún jugando en la misma misión.
Por ejemplo, el jugador con ID 231 juega a la misión 492 con los héroes 42, 901, 72, 831 y 4.
Luego el mismo jugador puede jugar a la misma misión con otros héroes o los mismos, o repetir
solo algunos de ellos.

Semanalmente los administradores del juego quieren obtener algunas estadísticas para
regalarle premios a los mejores jugadores y hacer mejoras al juego según la preferencia de los
jugadores.


Basado en el problema y dataset presentados en el TP1, implemente una solución en Spark a las
siguientes consignas:

**1) La misión más jugada (independientemente de la cantidad de jugadores que la jugaron, ni cuántas veces la jugó cada uno).**

**2) El héroe más elegido en cada una de las misiones (el primer puesto podría ser
compartido por más de un héroe, de hecho, todos los héroes podrían haber participado de la misión la misma cantidad de veces).**

**3) Todos los jugadores que usaron más de H héroes distintos (H es un parámetro de la consulta).**

**4) Todos los jugadores que participaron al menos N veces de la misión más jugada y el puntaje total del jugador en esa misión sea mayor a P. La misión más jugada es aquella en la que participaron más jugadores distintos, aunque sea una vez (N y P son parámetros de la consulta). Suponga que la misión más jugada es única (no hay empate en el primer puesto).**

**5) Calcular para cada héroe su índice P. Un héroe tiene índice P si al menos P jugadores distintos lo eligieron para jugar P misiones distintas.**
Ejemplos:

Un héroe fue elegido para jugar 4 misiones distintas, al héroe lo eligieron 4 jugadores distintos. Su índice P es 4.

Un héroe fue elegido para jugar 3 misiones distintas, pero lo eligieron 7 jugadores distintos. Su índice P es 3.

Un héroe fue elegido para jugar 5 misiones distintas, pero lo eligieron 2 jugadores distintos. Su índice P es 2.

Con lo cual, el índice P de un héroe puede calcularse como MIN(Mh, Jh), dónde Mh es la cantidad de misiones distintas en que el héroe fue elegido (no importa por que jugadores) y Jh es la cantidad de jugadores que eligieron al héroe (no importa en cuántas misiones).

In [None]:
heroesTXT = sc.textFile(ruta_heroes)

heroes = heroesTXT.map(lambda t : t.split("\t")) \
                    .map(lambda t: Row( id_jugador = int(t[0]),
                        id_mision = int(t[1]),
                        id_heroe_1 = int(t[2]),
                        id_heroe_2 = int(t[3]),
                        id_heroe_3 = int(t[4]),
                        id_heroe_4 = int(t[5]),
                        id_heroe_5 = int(t[6]),
                        puntaje = int(t[7]) ,
                        tiempo = int(t[8])
                  ) )

heroesDF = sqlContext.createDataFrame(heroes)
heroesDF.registerTempTable("Heroe")

flat_heroes = heroesTXT.map(lambda t : t.split("\t")) \
                  .flatMap(lambda t: [(int(t[0]), int(t[1]), int(hero), int(t[7]), int(t[8]))
                                      for hero in [int(t[2]), int(t[3]), int(t[4]), int(t[5]), int(t[6])]]) \
                    .map(lambda t: Row( id_jugador = int(t[0]),
                        id_mision = int(t[1]),
                        id_heroe = int(t[2]),
                        puntaje = int(t[3]) ,
                        tiempo = int(t[4])
                  ) )

flat_heroesDF = sqlContext.createDataFrame(flat_heroes)
flat_heroesDF.registerTempTable("FlatHeroe")




### EJ1
La misión más jugada (independientemente de la cantidad de jugadores que la jugaron,
ni cuántas veces la jugó cada uno).

In [None]:
result = sqlContext.sql(
    "SELECT id_mision, count(*) as total_jugadas \
     FROM Heroe \
     GROUP BY id_mision \
     ORDER BY total_jugadas desc"
)
result.first()

### EJ2
El héroe más elegido en cada una de las misiones (el primer puesto podría ser compartido por más de un héroe, de hecho, todos los héroes podrían haber participado de la misión la misma cantidad de veces).

In [None]:
cantidad_repeticiones_heroe_por_mision = sqlContext.sql(
    "SELECT id_mision, id_heroe, count(*) as cantidad_repeticiones  \
     FROM FlatHeroe \
     GROUP BY id_mision, id_heroe "
)

cantidad_repeticiones_heroe_por_mision.registerTempTable("CantRepeticionHeroePorMision")

max_heroe_por_mision = sqlContext.sql(
    "SELECT a.id_mision, a.id_heroe, a.cantidad_repeticiones \
     FROM CantRepeticionHeroePorMision a \
     JOIN ( \
         SELECT id_mision, MAX(cantidad_repeticiones) as max_repeticiones \
         FROM CantRepeticionHeroePorMision \
         GROUP BY id_mision \
     ) b ON a.id_mision = b.id_mision AND a.cantidad_repeticiones = b.max_repeticiones \
     ORDER BY a.id_mision asc, a.id_heroe asc"
)

max_heroe_por_mision.collect()

### EJ3
Todos los jugadores que usaron más de H héroes distintos (H es un parámetro de la consulta).

In [None]:
H = 20

result = sqlContext.sql(
    f"SELECT id_jugador, count(distinct(id_heroe)) as cantidad_heroes \
     FROM FlatHeroe \
     GROUP BY id_jugador \
     HAVING cantidad_heroes > {H} "
)

result.collect()

### EJ4
Todos los jugadores que participaron al menos N veces de la misión más jugada y el puntaje total del jugador en esa misión sea mayor a P. La misión más jugada es aquella en la que participaron más jugadores distintos, aunque sea una vez (N y P son parámetros de la consulta). Suponga que la misión más jugada es única (no hay empate en el primer puesto).

In [None]:
H = 20

result = sqlContext.sql(
    f"SELECT id_mision, count(distinct(id_jugador)) as cantidad_jugadores \
     FROM Heroe \
     GROUP BY id_mision \
     ORDER BY cantidad_jugadores desc, id_mision asc"
)

mision_mas_jugada = result.first()

print(mision_mas_jugada)

In [None]:
N = 1
P = 5000
mision_mas_jugada_id = mision_mas_jugada.id_mision

result = sqlContext.sql(
    f"SELECT id_jugador, count(*) as cantidad_jugadas, sum(puntaje) as puntaje_total \
     FROM Heroe \
     WHERE id_mision = {mision_mas_jugada_id} \
     GROUP BY id_jugador \
     HAVING cantidad_jugadas >= {N} AND puntaje_total > {P} \
     ORDER BY id_jugador"
)

res = result.collect()
res

### EJ5
Calcular para cada héroe su índice P. Un héroe tiene índice P si al menos P jugadores distintos lo eligieron para jugar P misiones distintas. Ejemplos:

Un héroe fue elegido para jugar 4 misiones distintas, al héroe lo eligieron 4 jugadores distintos. Su índice P es 4.

Un héroe fue elegido para jugar 3 misiones distintas, pero lo eligieron 7 jugadores distintos. Su índice P es 3.

Un héroe fue elegido para jugar 5 misiones distintas, pero lo eligieron 2 jugadores distintos. Su índice P es 2.

Con lo cual, el índice P de un héroe puede calcularse como MIN(Mh, Jh), dónde Mh es la cantidad de misiones distintas en que el héroe fue elegido (no importa por que jugadores) y Jh es la cantidad de jugadores que eligieron al héroe (no importa en cuántas misiones).

In [None]:
cantidad_misiones = sqlContext.sql(
    f"SELECT id_heroe, count(distinct(id_mision)) as cantidad_misiones \
     FROM FlatHeroe \
     GROUP BY id_heroe "
)
cantidad_jugadores = sqlContext.sql(
    f"SELECT id_heroe, count(distinct(id_jugador)) as cantidad_jugadores \
     FROM FlatHeroe \
     GROUP BY id_heroe "
)
cantidad_misiones.registerTempTable("HeroeCantMisiones")
cantidad_jugadores.registerTempTable("HeroeCantJugaores")


result = sqlContext.sql(
    f"SELECT jh.id_heroe, LEAST(cantidad_misiones, cantidad_jugadores) as P \
     FROM HeroeCantMisiones as mh inner join HeroeCantJugaores as jh on mh.id_heroe = jh.id_heroe \
     ORDER BY id_heroe asc"
)

result.collect()