In [19]:
from functools import reduce
from operator import concat
from datetime import date
from random import choice, sample, randint
from faker import Faker
faker = Faker(locale='es_MX')
date_format = '%Y-%m-%d'

In [139]:
# (ID_JUGADOR, NOMBRE, NACIMIENTO, PAIS)
# (ID_PARTIDO, ID_JUGADOR, TIEMPO, DISTANCIA, GOLES)
# (ID_PARTIDO, ANO, SEMESTRE, FECHA, LOCAL, VISITANTE)

CANTIDAD_JUGADORES = 40
CANTIDAD_PARTIDOS = 20
JUGADORES_POR_PARTIDO = 22
CANTIDAD_EQUIPOS = 12
EQUIPOS = [faker.city() for equipo in range(CANTIDAD_EQUIPOS)]

def jugador_random(id_jugador):
    """
        Devuelve un jugador random con el id dado.
    """
    fecha_nacimiento = faker.date(pattern=date_format, end_datetime=date(1999,12,31))
    paises = ('ARGENTINA', 'BRASIL', 'PARAGUAY', 'CHILE')
    return (
        id_jugador,
        faker.name(),
        fecha_nacimiento,
        choice(paises)
    )

def estadisticas_partido(id_partido):
    """
        Devuelve una lista con JUGADORES_POR_PARTIDO estadisticas
        random para el id_partido dado.
    """
    jugadores = sample(range(CANTIDAD_JUGADORES), JUGADORES_POR_PARTIDO)
    return [
        (id_partido, jugador, randint(1,90), randint(10,500), randint(0,3) ) for jugador in jugadores
    ]

def partido_random(id_partido):
    """
        Devuelve un partido random con el id dado.
    """
    local, visitante = tuple(sample(EQUIPOS, 2))
    return (
        id_partido,
        str(faker.date_between('-5y').year),
        str(randint(1,2)),
        randint(1,12),
        local,
        visitante
    )

data_jugadores = [jugador_random(id_jugador) for id_jugador in range(CANTIDAD_JUGADORES)]
data_estadisticas = reduce(concat, [estadisticas_partido(partido) for partido in range(CANTIDAD_PARTIDOS)] , [])
data_partidos = [partido_random(id_partido) for id_partido in range(CANTIDAD_PARTIDOS)]

In [143]:
# (ID_JUGADOR, NOMBRE, NACIMIENTO, PAIS)
jugadores = sc.parallelize(data_jugadores)
# (ID_PARTIDO, ID_JUGADOR, TIEMPO, DISTANCIA, GOLES)
estadisticas = sc.parallelize(data_estadisticas)
# (ID_PARTIDO, ANO, SEMESTRE, FECHA, LOCAL, VISITANTE)
partidos = sc.parallelize(data_partidos)

jugadores = jugadores.filter(lambda x: x[-1]=='ARGENTINA')\
    .map(lambda x: (x[0], x[1]))
# argentinos, (ID, NOMBRE)
estadisticas = estadisticas.map(lambda x: (x[1], (x[0],x[2])))
# (ID_JUGADOR, (ID_PARTIDO, TIEMPO))
estadisticas_argentinos = estadisticas.join(jugadores)
# (ID_JUGADOR, ((ID_PARTIDO, TIEMPO), NOMBRE))
estadisticas_argentinos = estadisticas_argentinos.map(lambda x: (x[1][0][0], (x[0],x[1][1], x[1][0][1])))
# (ID_PARTIDO, (ID_JUGADOR, NOMBRE, TIEMPO))
partidos = partidos.map(lambda x: (x[0], (x[1],x[2])))
# (ID_PARTIDO, (ANO,SEMESTRE))
estadisticas_argentinos = estadisticas_argentinos.join(partidos)
# (ID_PARTIDO, ((ID_JUGADOR, NOMBRE, TIEMPO_PARTIDO), (ANO,SEMESTRE))
estadisticas_argentinos = estadisticas_argentinos.map(lambda x: ((x[1][0][0],x[1][1][0],x[1][1][1]),(x[1][0][1],x[1][0][2])))
# ((ID_JUGADOR, ANO, SEMESTRE), (NOMBRE, TIEMPO_PARTIDO))
tiempos_jugador_semestre = estadisticas_argentinos.reduceByKey(lambda x,y: (x[0], x[1]+y[1]))
# ((ID_JUGADOR, ANO, SEMESTRE), (NOMBRE, TIEMPO_SEMESTRE))
tiempos_jugador_semestre = tiempos_jugador_semestre.sortByKey().map(lambda x: (x[0][0], (x[0][1],x[0][2],*x[1], False)))
# (ID_JUGADOR, (ANO, SEMESTRE, NOMBRE, TIEMPO_SEMESTRE, False))
def es_consecutivo(x_ano, x_sem, y_ano, y_sem):
    return x_ano == y_ano and x_sem < y_sem or y_ano > x_ano and y_sem < x_sem 
    
def reducer(x,y):
    x_ano, x_sem, x_nombre, x_tiempo, x_aumento = x
    if x_aumento:
        return x
    y_ano, y_sem, y_nombre, y_tiempo, y_aumento = y
    if not es_consecutivo(x_ano, x_sem, y_ano, y_sem):
        return y
    if (y_tiempo/x_tiempo) < 1.5:
        return y
    return (y_ano, y_sem, y_nombre, y_tiempo, True)
superaron = tiempos_jugador_semestre.reduceByKey(reducer)\
    .filter(lambda x: x[1][-1])\
    .map(lambda x: x[1][2])