# RDDs + Programación Funcional

# Creamos un contexo para crear RDDs

In [None]:
import pandas as pd
!pip install pyspark --quiet
from pyspark import SparkContext
sc = SparkContext(master = "local", appName = "Transformaciones sobre un RDD")

# Cargamos un RDDs

Para la realzación del ejercicio, con ayuda del archivo paises, realizaremos el equivalente de operaciones 'select','count','group by' y 'filter / where'

Cambia el valor de la ruta para que apunte a la ruta donde tienes los datos

In [None]:
from google.colab import drive
drive.mount('/content/drive/')
#LEER CSV
equiposOlimpicosRDD = sc.textFile("/content/paises.csv").map(lambda line : line.split(","))

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
equiposOlimpicosRDD.take(10)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR']]

### Deshacemos un RDD

Con ayuda del método `collect()`, permite desparalelizar un RDD.


In [None]:
equiposOlimpicosRDD.collect()

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR'],
 ['10', 'Alcyon-6', 'FRA'],
 ['11', 'Alcyon-7', 'FRA'],
 ['12', 'Aldebaran', 'ITA'],
 ['13', 'Aldebaran II', 'ITA'],
 ['14', 'Aletta', 'IRL'],
 ['15', 'Algeria', 'ALG'],
 ['16', 'Ali-Baba II', 'SWE'],
 ['17', 'Ali-Baba IV', 'SUI'],
 ['18', 'Ali-Baba IX', 'SUI'],
 ['19', 'Ali-Baba VI', 'SUI'],
 ['20', 'Allegro', 'FRA'],
 ['21', 'Almaz', 'URS'],
 ['22', 'Aloha II', 'SWE'],
 ['23', 'Amateur Athletic Association', 'AUS'],
 ['24', 'American Samoa', 'ASA'],
 ['25', 'Amolgavar', 'ESP'],
 ['26', 'Amstel Amsterdam', 'NED'],
 ['27', 'Amulet-3', 'FRA'],
 ['28', 'Amulet-7', 'FRA'],
 ['29', 'Ancora', 'GBR'],
 ['30', 'Andorinha', 'BRA'],
 ['31', 'Andorra', 'AND'],
 ['32', 'Andromeda', 'GBR'],
 ['33', 'Angelita', 'USA'

Creamos un dataframe a partir de un RDD mediante `pd.DataFrame()`

In [None]:
pd.DataFrame(equiposOlimpicosRDD.collect()[1:], columns = equiposOlimpicosRDD.collect()[0] )

Unnamed: 0,id,equipo,sigla
0,1,30. Februar,AUT
1,2,A North American Team,MEX
2,3,Acipactli,MEX
3,4,Acturus,ARG
4,5,Afghanistan,AFG
...,...,...,...
1179,1180,Yugoslavia-2,YUG
1180,1181,Zambia,ZAM
1181,1182,Zefyros,GRE
1182,1183,Zimbabwe,ZIM


# Operaciones sobre un RDDs

Para la realzación del ejercicio, con ayuda del archivo paises, realizaremos el equivalente de operaciones `select`,`count`,`group by` , `filter` y `where`

### Ejercicio 1:
**Mostrar los países sin repetidos**

*   Seleccionar la columna de las siglas PISTA: Usa un map y quedate con la columna con indice 2.
*   Usamos el `distinct()`



In [None]:
#Inserta aquí tu código
columna_siglas = equiposOlimpicosRDD.map(lambda fila: fila[2])
# We have to eliminate repetitions
columnas_siglas_sin_repetidos = columna_siglas.distinct()
#columnas_siglas_sin_repetidos.collect()
columnas_siglas_sin_repetidos.take(5)

['sigla', 'AUT', 'MEX', 'ARG', 'AFG']

### Ejercicio 2
**Contar la cantidad de siglas diferentes de los equipos olímpicos existentes**

PISTA: Puedes usar `count()`

In [None]:
#Inserta aquí tu código
count_unique_siglas = columnas_siglas_sin_repetidos.count()
print(count_unique_siglas)


231


### Ejercicio 3 (Díficil, puedes hacerlo el último)
**Agrupamos datos para poder determinar cuantos equipos posee un pais**

Recuerda que los RDDs poseen una estructura de `clave-valor`, por lo cual debemos poner primero el valor 'clave' (la sigla del país).

Con 'mapValues', al componente 'valor' le indicamos que operación deseamos que se le aplique.

**Objetivo 1**: Tener un diccionario con `clave` la sigla del país y `valor` la lista de equipos de ese país

Con `groupByKey` podemos agrupar por `clave`, en este caso por sigla del país  

In [None]:
#Inserta aquí tu código
#agrupando por pais = count (como SQL)
equiposOlimpicosRDD.map( lambda fila : (fila[2],fila[1]) ).groupByKey().mapValues(list).take(5)
#se puede encontrar hecho y hay que hacer razonamiento
#se busca que seamos capaces de leerlo

[('sigla', ['equipo']),
 ('AUT',
  ['30. Februar',
   'Austria',
   'Austria-1',
   'Austria-2',
   'Breslau',
   'Brigantia',
   'Donar III',
   'Evita VI',
   'May-Be 1960',
   '"R.-V. Germania; Leitmeritz"',
   'Surprise']),
 ('MEX',
  ['A North American Team',
   'Acipactli',
   'Chamukina',
   'Mexico',
   'Mexico-1',
   'Mexico-2',
   'Nausikaa 4',
   'Tlaloc',
   'Xolotl']),
 ('ARG',
  ['Acturus',
   'Antares',
   'Arcturus',
   'Ardilla',
   'Argentina',
   'Argentina-1',
   'Argentina-2',
   'Blue Red',
   'Covunco III',
   'Cupidon III',
   'Djinn',
   'Gullvinge',
   'Matrero II',
   'Mizar',
   'Pampero',
   'Rampage',
   'Tango',
   'Wiking']),
 ('AFG', ['Afghanistan'])]

**Objetivo 2**: Tener un diccionario con `clave` la sigla del país y `valor` la cantidad de equipos de ese país

In [None]:
#Inserta aquí tu código


### Operación filter para obtener un subconjunto

Con el método 'filter', reducimos el conjuntos de equipos.

Nos quedamos con la sigla equivalente de argentina

In [None]:
equiposArgentinos = equiposOlimpicosRDD.filter(lambda l : "ARG" in l)
equiposArgentinos.collect()

[['4', 'Acturus', 'ARG'],
 ['37', 'Antares', 'ARG'],
 ['42', 'Arcturus', 'ARG'],
 ['43', 'Ardilla', 'ARG'],
 ['45', 'Argentina', 'ARG'],
 ['46', 'Argentina-1', 'ARG'],
 ['47', 'Argentina-2', 'ARG'],
 ['119', 'Blue Red', 'ARG'],
 ['238', 'Covunco III', 'ARG'],
 ['252', 'Cupidon III', 'ARG'],
 ['288', 'Djinn', 'ARG'],
 ['436', 'Gullvinge', 'ARG'],
 ['644', 'Matrero II', 'ARG'],
 ['672', 'Mizar', 'ARG'],
 ['774', 'Pampero', 'ARG'],
 ['843', 'Rampage', 'ARG'],
 ['1031', 'Tango', 'ARG'],
 ['1162', 'Wiking', 'ARG']]

# Accciones sobre RDDs

In [None]:
deportistaOlimpicoRDD = sc.textFile("/content/deportista.csv").map(lambda line : line.split(","))
deportistaOlimpico2RDD = sc.textFile("/content/deportista2.csv").map(lambda line : line.split(","))

In [None]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpico2RDD)

### Formas de visualizar datos de un RDDs

La operación 'take' nos devuelve 'N' valores que encuentre spark.

La operación 'top', previo ordena respecto al valor llave y nos devuelve 'N' valores.

La operacion 'takeSample', nos devuelve una muestra aleatoria de los valores, Observa que recibe tres parametros

| Orden | Argumento | Descripción | Valor
|-------|--------|-----|--------|
|1|withReplacement|Indica si la muetra podrá traer replicados|Bool|
|2| num| Cantidad de valores a retornar|int|
|3|seed|semilla para el generador aleatorio|int|

Nota: Si encuentras complicado leer el código en los segmentos donde usamos indices en las listas, ejecutalo por partes para que visualizes que componentes seleccionamos

Nota: Para hacer join con RDDs, debemos tener selecionada la llave al inicio del RDD para poder realizar el cruce.

In [None]:
deportistaOlimpicoRDD.take(3)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199']]

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).take(3)

[['equipo_id',
  ['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso']],
 ['199', ['1', 'A Dijiang', '1', '24', '180', '80']],
 ['199', ['2', 'A Lamusi', '1', '23', '170', '60']]]

In [None]:
equiposOlimpicosRDD.map(lambda x : [x[0],x[2]]).take(3)

[['id', 'sigla'], ['1', 'AUT'], ['2', 'MEX']]

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).take(6)

[('199', (['1', 'A Dijiang', '1', '24', '180', '80'], 'CHN')),
 ('199', (['2', 'A Lamusi', '1', '23', '170', '60'], 'CHN')),
 ('199', (['602', 'Abudoureheman', '1', '22', '182', '75'], 'CHN')),
 ('199', (['1463', 'Ai Linuer', '1', '25', '160', '62'], 'CHN')),
 ('199', (['1464', 'Ai Yanhan', '2', '14', '168', '54'], 'CHN')),
 ('199', (['3605', 'An Weijiang', '1', '22', '178', '72'], 'CHN'))]

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).top(5)

[('999', (['92679', 'Trygve Bjarne Pedersen', '1', '35', '0', '0'], 'NOR')),
 ('999', (['1144', 'Henrik Agersborg', '1', '47', '0', '0'], 'NOR')),
 ('999', (['10765', 'Einar Berntsen', '1', '28', '0', '0'], 'NOR')),
 ('998',
  (['111659', 'G. Bernard Bernie Skinner', '1', '34', '182', '82'], 'CAN')),
 ('996', (['116030', 'Edward Eddy Stutterheim', '1', '39', '0', '0'], 'NED'))]

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).takeSample(False,4,10)

[('93',
  (['76502', 'Yelena Viktorovna Matoshko', '2', '30', '177', '80'], 'BLR')),
 ('487', (['110997', 'Ajit Singh', '1', '23', '185', '73'], 'IND')),
 ('249',
  (['41654', 'Toms Pedro Gonzlez Barrios', '1', '21', '178', '75'], 'CUB')),
 ('259', (['7157', 'Jlius Bal', '1', '22', '0', '0'], 'TCH'))]

### Ejercicio 4
**Muestra 17 filas del RDD deportistaOlimpicoRDD**


In [None]:
#Inserta aquí tu código
deportistaOlimpicoRDD.take(18)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705'],
 ['6', 'Per Knut Aaland', '1', '31', '188', '75', '1096'],
 ['7', 'John Aalberg', '1', '31', '183', '72', '1096'],
 ['8', 'Cornelia Cor Aalten Strannood ', '2', '18', '168', '0', '705'],
 ['9', 'Antti Sami Aalto', '1', '26', '186', '96', '350'],
 ['10', 'Einar Ferdinand Einari Aalto', '1', '26', '0', '0', '350'],
 ['11', 'Jorma Ilmari Aalto', '1', '22', '182', '76.5', '350'],
 ['12', 'Jyri Tapani Aalto', '1', '31', '172', '70', '350'],
 ['13', 'Minna Maarit Aalto', '2', '30', '159', '55.5', '350'],
 ['14', 'Pirjo Hannele Aalto Mattila ', '2', '32', '171', '65', '350'],
 ['15', 'Arvo Ossian Aaltonen', '1', '22', '0', '0', '35

## Importancia de countAprox

Debido a la cantidad de datos no siempre es recomendable hacer operaciones tipo count.

Por lo que 'countAprox' es la solución mas viable cuando solo queremos darnos una idea de cuantos datos podemos leer durante un tiempo determinado. Nota: el parametro está en milisegundos

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                            .map(lambda x : [x[0],x[2]])).count()

135427

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).countApprox(20)

135427

# Acciones de modificacion

### Obtenemos el equipo y el deportista

Guardamos el RDD resultante en equipoDeportista

In [None]:
equipoDeportista = deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                                            .map(lambda x : [x[0],x[2]]))

In [None]:
equipoDeportista.take(1)


[('199', (['1', 'A Dijiang', '1', '24', '180', '80'], 'CHN'))]

In [None]:
equipoDeportista.map(lambda x : (x[1][0][0],x[1][0][1:],x[1][1]) )

PythonRDD[73] at RDD at PythonRDD.scala:53

### Carga de resultados

Esta tabla posee las medallas que los jugadores han ganado

In [None]:
resultado = sc.textFile("/content/resultados.csv").map(lambda line : line.split(","))

Eliminamos todas las medallas no ganadoras y nos quedamos con el valor de la medalla y  del deportista_id

In [None]:
resultadoGanador = resultado.filter(lambda l : 'NA' not in l[1])
resultadoGanador = resultadoGanador.map(lambda l : [l[2],l[1]])

In [None]:
resultadoGanador.take(15)

[['deportista_id', 'medalla'],
 ['4', 'Gold'],
 ['15', 'Bronze'],
 ['15', 'Bronze'],
 ['16', 'Bronze'],
 ['17', 'Bronze'],
 ['17', 'Gold'],
 ['17', 'Gold'],
 ['17', 'Gold'],
 ['17', 'Bronze'],
 ['20', 'Gold'],
 ['20', 'Bronze'],
 ['20', 'Silver'],
 ['20', 'Bronze'],
 ['20', 'Silver']]

### Obtenemos la relación buscada: deportista,pais y medalla.

In [None]:
jugadoresMedalla =  equipoDeportista.join(resultadoGanador)
jugadoresMedalla.take(1)

[('716',
  ((['553', 'John Charles Abrams', '1', '22', '183', '0'], 'NZL'), 'Gold'))]

### Agrupamos las medallas respecto a la sigla del pais jugador

In [None]:
d = {'Gold':7, 'Silver':5, 'Bronze':4}
paisesMedallas = jugadoresMedalla.map(lambda x : (x[1][0][-1],d[x[1][1]]) )
paisesMedallas.takeSample(False,10)

[('MAS', 7),
 ('AUS', 5),
 ('ITA', 5),
 ('ITA', 5),
 ('GER', 5),
 ('NED', 5),
 ('NED', 5),
 ('ALG', 4),
 ('NED', 7),
 ('NED', 7)]

### Obtenemos los valores de los puntuajes históricos de los paises jugadores

In [None]:
from operator import add
conclusion = paisesMedallas.reduceByKey((add)).sortBy(lambda x : x[1],ascending=False)
conclusion.take(10)

[('ITA', 74920),
 ('NED', 65560),
 ('GER', 22323),
 ('JPN', 19950),
 ('NZL', 9220),
 ('TCH', 8160),
 ('BLR', 5012),
 ('RSA', 4735),
 ('TUR', 3965),
 ('GHA', 3430)]

# Estadística básica sobre los RDDs

Spark posee una suite integrada de forma natural para poder obtener estadísticas básicas.

In [None]:
conclusion.map(lambda l : l[1]).stats()

(count: 65, mean: 3735.6153846153843, stdev: 12505.143665022015, max: 74920.0, min: 5.0)

In [None]:
conclusion.map(lambda l : l [1]).mean()

3735.6153846153843

In [None]:
conclusion.map(lambda l : l [1]).sum()

242815

In [None]:
conclusion.map(lambda l : l [1]).histogram(10)

([5.0,
  7496.5,
  14988.0,
  22479.5,
  29971.0,
  37462.5,
  44954.0,
  52445.5,
  59937.0,
  67428.5,
  74920],
 [59, 2, 2, 0, 0, 0, 0, 0, 1, 1])

## Ejercicios Programación funcional

### Ejercicio 5
**Escribir una función que aplique el IVA a un precio.**
* Nombre de la función: apply_iva
* Argumentos: precio, iva
* Resultado: precio con iva aplicado


* Prueba a llamar a la función


In [None]:
#Inserta aquí tu código
#This needs some reasoning skills; and so we have to take a generic situations
#If im paying 150eur for a product, thats the total cost
#what part of the price is IVA and base imponible? IVA 21% and the 21% is on top of the 100% value of the product

# Make a generic situation and make a code that can be used for whenever we need to
def apply_iva(base_imponible):
 return base_imponible * 121 / 100



### Ejercicio 6
** Escribe una función que aplica la función cuadrado() a todos los elementos de una lista.**

    Parámetros:
        funcion: Recibe la función a aplicar.
        lista: Es una lista con valores que se pasarán como argumentos a funcion.
    Devuelve:
        Una lista con el resultado de aplicar la función a los valores de la lista.

In [None]:
def cuadrado(n):
    return n ** 2

In [None]:
#Inserta aquí tu código
#This code is essentially doing a map and making a transformation in question

lista = [1,2,3,4,5,]
#To be able to save the list:
resultado = []

#We've created a new function
def ejercicios6(lista):
  resultado = []
#make a loop over list SUPER NECESSARY
  for numero in lista:
    resultado.append(numero ** 2)
    #resultado.append(cuadrado (numero))
  return resultado # to be able to save the list

#print (resultado)


In [None]:
def multx3(numero)
  return numero * 3

In [None]:
def ejercicios6(lista,funcion):
  resultado = []
  for numero in lista:
    resultado.append(funcion(numero))
  return resultado # to be able to save the list

In [None]:
ejercicio6([1,2,3,4,5,6], multx3)

In [None]:
#list(map(multx3,[1,2,3,4,5,6]))
list(map( lambda numero : numero * 3 , [1,2,3,4,5,6]))

In [None]:
rdd_lista = sc.parallelize([1,2,3,4,5,6])
rdd_lista.map (lambda numero : numero * 3 ).collect()

In [None]:
def cuadrado(n):
    return n * n

print(aplica_funcion_lista(cuadrado, [1, 2, 3, 4]))

### Ejercicio 7
**Detectar y corregir los errores del siguiente programa que aplica el iva a una factura:**


In [None]:
#Debugging:
#We write quickly what we believe to be the solution and then we fix the errors
#callback: palindrome

def aplica_iva(base, iva = 21):
    base = base * ( (iva+100) / 100 )
    return base

#one of 2 things:


base = float(input('Introduce la base imponible de la factura: '))
iva = int(input('Introduce la IVA de la factura: '))
print(aplica_iva(base, iva))

#Notebook: types of errors and what needs to be done
#initially logical error

Introduce la base imponible de la factura: 50
Introduce la IVA de la factura: 21
60.5
