# RDDs

# Creamos un contexo para crear RDDs

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext(master = "local", appName = "Transformaciones sobre un RDD") 

In [2]:
sc

In [114]:
# sc.stop()

# Cargamos un RDDs

Para la realzación del ejercicio, con ayuda del archivo paises, realizaremos el equivalente de operaciones 'select','count','group by' y 'filter / where'

Cambia el valor de la ruta para que apunte a la ruta donde tienes los datos

In [3]:
path = "files/"
equiposOlimpicosRDD = sc.textFile(path+"paises.csv")\
                        .map(lambda line : line.split(","))

In [4]:
equiposOlimpicosRDD.take(10)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR']]

### Determinamos la cantidad de siglas para los equipos olímpicos existentes

Con ayuda del método 'take', podrás identificar que en la ultima posisión tenemos las siguas para cada pais/equipo. 

Con esto obtenremos la cantidad precisa de equipos jugadores en los olimpicos

In [24]:
equiposOlimpicosRDD.map(lambda x : (x[2])).distinct().count()

231

### Agrupamos datos para poder determinar cuantos equipos posee un pais

Recuerda que los RDDs poseen una estructura de llave-valor, por lo cual debemos poner primero el valor 'llave'.

Con 'mapValues', al componente 'valor' le indicamos que operación deseamos que se le aplique.

In [31]:
equiposOlimpicosRDD.map(lambda x : (x[2], x[1] ))\
                    .groupByKey() \
                    .mapValues(list).take(2)

[('sigla', ['equipo']),
 ('AUT',
  ['30. Februar',
   'Austria',
   'Austria-1',
   'Austria-2',
   'Breslau',
   'Brigantia',
   'Donar III',
   'Evita VI',
   'May-Be 1960',
   '"R.-V. Germania; Leitmeritz"',
   'Surprise'])]

In [32]:
equiposOlimpicosRDD.map(lambda x :  (x[2], x[1] ))\
                    .groupByKey()\
                    .mapValues(len)\
                    .take(5)

[('sigla', 1), ('AUT', 11), ('MEX', 9), ('ARG', 18), ('AFG', 1)]

### Operación filter para obtener un subconjunto

Con el método 'filter', reducimos el conjuntos de equipos.

Nos quedamos con la sigla equivalente de argentina

In [35]:
equiposArgentinos = equiposOlimpicosRDD.filter(lambda l : "ARG" in l)
equiposArgentinos.collect()  

[['4', 'Acturus', 'ARG'],
 ['37', 'Antares', 'ARG'],
 ['42', 'Arcturus', 'ARG'],
 ['43', 'Ardilla', 'ARG'],
 ['45', 'Argentina', 'ARG'],
 ['46', 'Argentina-1', 'ARG'],
 ['47', 'Argentina-2', 'ARG'],
 ['119', 'Blue Red', 'ARG'],
 ['238', 'Covunco III', 'ARG'],
 ['252', 'Cupidon III', 'ARG'],
 ['288', 'Djinn', 'ARG'],
 ['436', 'Gullvinge', 'ARG'],
 ['644', 'Matrero II', 'ARG'],
 ['672', 'Mizar', 'ARG'],
 ['774', 'Pampero', 'ARG'],
 ['843', 'Rampage', 'ARG'],
 ['1031', 'Tango', 'ARG'],
 ['1162', 'Wiking', 'ARG']]

In [39]:
equiposOlimpicosRDD.countApprox(20)

1185

# Accciones sobre RDDs

In [5]:
deportistaOlimpicoRDD = sc.textFile(path+"deportista.csv").map(lambda line : line.split(","))
deportistaOlimpico2RDD = sc.textFile(path+"deportista2.csv").map(lambda line : line.split(","))

In [6]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpico2RDD)

In [53]:
deportistaOlimpicoRDD.count()

135572

### Formas de visualizar datos de un RDDs

La operación 'take' nos devuelve 'N' valores que encuentre spark.

La operación 'top', previo ordena respecto al valor llave y nos devuelve 'N' valores.

La operacion 'takeSample', nos devuelve una muestra aleatoria de los valores, Observa que recibe tres parametros

| Orden | Argumento | Descripción | Valor
|-------|--------|-----|--------|
|1|withReplacement|Indica si la muetra podrá traer replicados|Bool|
|2| num| Cantidad de valores a retornar|int|
|3|sed|semilla para el generador aleatorio|int|

Nota: Si encuentras complicado leer el código en los segmentos donde usamos indices en las listas, ejecutalo por partes para que visualizes que componentes seleccionamos 

Nota: Para hacer join con RDDs, debemos tener selecionada la llave al inicio del RDD para poder realizar el cruce.

In [7]:
equiposOlimpicosRDD.top(5)

[['id', 'equipo', 'sigla'],
 ['999', 'Stella-2', 'NOR'],
 ['998', 'State VI', 'CAN'],
 ['997', 'Starlight III', 'GBR'],
 ['996', 'Starita', 'NED']]

In [8]:
deportistaOlimpicoRDD.top(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['99999', 'Alexander Grant Alick Rennie', '1', '32', '182', '71', '967'],
 ['99998', 'Robert John Bob Renney', '1', '21', '178', '90', '66'],
 ['99997', 'Thomas Renner', '1', '24', '183', '86', '71'],
 ['99996', 'Sara Renner', '2', '21', '168', '63', '174']]

In [9]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]])\
            .join(equiposOlimpicosRDD.map(lambda x : [x[0],x[2]]))\
                        .take(6)

[('199', (['1', 'A Dijiang', '1', '24', '180', '80'], 'CHN')),
 ('199', (['2', 'A Lamusi', '1', '23', '170', '60'], 'CHN')),
 ('199', (['602', 'Abudoureheman', '1', '22', '182', '75'], 'CHN')),
 ('199', (['1463', 'Ai Linuer', '1', '25', '160', '62'], 'CHN')),
 ('199', (['1464', 'Ai Yanhan', '2', '14', '168', '54'], 'CHN')),
 ('199', (['3605', 'An Weijiang', '1', '22', '178', '72'], 'CHN'))]

In [69]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]) \
        .join(equiposOlimpicosRDD.map(lambda x : [x[0],x[2]])).top(5)

[('999', (['92679', 'Trygve Bjarne Pedersen', '1', '35', '0', '0'], 'NOR')),
 ('999', (['1144', 'Henrik Agersborg', '1', '47', '0', '0'], 'NOR')),
 ('999', (['10765', 'Einar Berntsen', '1', '28', '0', '0'], 'NOR')),
 ('998',
  (['111659', 'G. Bernard Bernie Skinner', '1', '34', '182', '82'], 'CAN')),
 ('996', (['116030', 'Edward Eddy Stutterheim', '1', '39', '0', '0'], 'NED'))]

In [72]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]])\
                        .join(equiposOlimpicosRDD.map(lambda x : [x[0],x[2]]))\
                        .takeSample(False,6,25)

[('362', (['131505', 'Steven Woodburn', '1', '24', '185', '90'], 'FRA')),
 ('967', (['13626', 'Jill Brresen', '2', '22', '170', '57'], 'RSA')),
 ('482', (['44299', 'Gumundur Gumundsson', '1', '23', '174', '77'], 'ISL')),
 ('970', (['68062', 'Lee MinHui', '2', '28', '174', '65'], 'KOR')),
 ('794', (['92442', 'Luis Paz Zoldan', '1', '19', '187', '82'], 'PER')),
 ('413', (['26822', 'Jared Mark Deacon', '1', '24', '185', '77'], 'GBR'))]

## Importancia de countAprox

Debido a la cantidad de datos no siempre es recomendable hacer operaciones tipo count.

Por lo que 'countAprox' es la solución mas viable cuando solo queremos darnos una idea de cuantos datos podemos leer durante un tiempo determinado. Nota: el parametro está en milisegundos

In [10]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                            .map(lambda x : [x[0],x[2]])).count()

135427

In [74]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).countApprox(20)

135427

# Acciones de modificacion

### Obtenemos el equipo y el deportista

Guardamos el RDD resultante en equipoDeportista

In [11]:
equipoDeportista = deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                                            .map(lambda x : [x[0],x[2]]))

In [93]:
equipoDeportista.top(1)


[('999', (['92679', 'Trygve Bjarne Pedersen', '1', '35', '0', '0'], 'NOR'))]

In [79]:
equipoDeportista.map(lambda x : (x[1][0][0],x[1][0][1:],x[1][1]) ).top(5)

[('99999', ['Alexander Grant Alick Rennie', '1', '32', '182', '71'], 'RSA'),
 ('99998', ['Robert John Bob Renney', '1', '21', '178', '90'], 'AUS'),
 ('99997', ['Thomas Renner', '1', '24', '183', '86'], 'AUT'),
 ('99996', ['Sara Renner', '2', '21', '168', '63'], 'CAN'),
 ('99995', ['Robert Renner', '1', '22', '182', '75'], 'SLO')]

### Carga de resultados

Esta tabla posee las medallas que los jugadores han ganado

In [12]:
resultado = sc.textFile(path+"resultados.csv").map(lambda line : line.split(","))

In [13]:
resultado.take(5)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['1', 'NA', '1', '39', '1'],
 ['2', 'NA', '2', '49', '2'],
 ['3', 'NA', '3', '7', '3'],
 ['4', 'Gold', '4', '2', '4']]

Eliminamos todas las medallas no ganadoras y nos quedamos con el valor de la medalla y  del deportista_id

In [14]:
resultadoGanador = resultado.filter(lambda l : 'NA' not in l[1])
resultadoGanador = resultadoGanador.map(lambda l : [l[2],l[1]])

In [15]:
resultadoGanador.take(15)

[['deportista_id', 'medalla'],
 ['4', 'Gold'],
 ['15', 'Bronze'],
 ['15', 'Bronze'],
 ['16', 'Bronze'],
 ['17', 'Bronze'],
 ['17', 'Gold'],
 ['17', 'Gold'],
 ['17', 'Gold'],
 ['17', 'Bronze'],
 ['20', 'Gold'],
 ['20', 'Bronze'],
 ['20', 'Silver'],
 ['20', 'Bronze'],
 ['20', 'Silver']]

In [50]:
resultadoGanador.count()

39784

### Obtenemos la relación buscada: deportista,pais y medalla.

In [16]:
deportistaOlimpicoRDD.top(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['99999', 'Alexander Grant Alick Rennie', '1', '32', '182', '71', '967'],
 ['99998', 'Robert John Bob Renney', '1', '21', '178', '90', '66'],
 ['99997', 'Thomas Renner', '1', '24', '183', '86', '71'],
 ['99996', 'Sara Renner', '2', '21', '168', '63', '174']]

In [17]:
equipoDeportista.filter(lambda x: '716' in x[1][0]).take(2)

[('45', (['716', 'Lautaro Germn Acosta', '1', '20', '168', '72'], 'ARG'))]

In [18]:
equipoDeportista_c = equipoDeportista.map(lambda x: (x[1][0][0],(x[0],x[1][0][1:],x[1][1])))

In [113]:
resultadoGanador.filter(lambda x: x[0]=='17282').take(2)

[['17282', 'Bronze']]

In [94]:
equipoDeportista.top(4)

[('999', (['92679', 'Trygve Bjarne Pedersen', '1', '35', '0', '0'], 'NOR')),
 ('999', (['1144', 'Henrik Agersborg', '1', '47', '0', '0'], 'NOR')),
 ('999', (['10765', 'Einar Berntsen', '1', '28', '0', '0'], 'NOR')),
 ('998',
  (['111659', 'G. Bernard Bernie Skinner', '1', '34', '182', '82'], 'CAN'))]

In [19]:
equipoDeportista_c.take(1)

[('1', ('199', ['A Dijiang', '1', '24', '180', '80'], 'CHN'))]

In [25]:
jugadoresMedalla_c =  equipoDeportista_c.join(resultadoGanador)
test_list = jugadoresMedalla_c.take(2)

In [22]:
jugadoresMedalla =  equipoDeportista.join(resultadoGanador)
jugadoresMedalla.take(2)

[('716',
  ((['553', 'John Charles Abrams', '1', '22', '183', '0'], 'NZL'), 'Gold')),
 ('716',
  ((['698', 'Paul Douglas Ackerley', '1', '27', '179', '68'], 'NZL'), 'Gold'))]

In [51]:
jugadoresMedalla.count()

45100

In [36]:
test_list[0][1][0][-1]

'CHN'

In [42]:
jugadoresMedalla_c.count()

39783

### Agrupamos las medallas respecto a la sigla del pais jugador

In [40]:
d = {'Gold':7, 'Silver':5, 'Bronze':4}
paisesMedallas = jugadoresMedalla_c.map(lambda x : (x[1][0][-1],d[x[1][1]]) )
paisesMedallas.takeSample(False,10)

[('ITA', 7),
 ('GER', 5),
 ('HUN', 5),
 ('GBR', 4),
 ('URS', 5),
 ('AUS', 5),
 ('ARG', 5),
 ('URS', 5),
 ('FRA', 4),
 ('RUS', 5)]

In [41]:
paisesMedallas.count()

39783

In [43]:
from operator import add

In [None]:
paisesMedallas

### Obtenemos los valores de los puntuajes históricos de los paises jugadores

In [45]:
from operator import add
conclusion = paisesMedallas.reduceByKey((add))\
            .sortBy(lambda x : x[1],ascending=False)
conclusion.take(10)

[('USA', 32137),
 ('URS', 14834),
 ('GBR', 10925),
 ('GER', 10896),
 ('FRA', 9265),
 ('ITA', 8755),
 ('SWE', 8110),
 ('CAN', 7209),
 ('AUS', 6755),
 ('HUN', 6142)]

# Estadística básica sobre los RDDs

Spark posee una suite integrada de forma natural para poder obtener estadísticas básicas.

In [46]:
conclusion.map(lambda l : l[1]).stats()

(count: 150, mean: 1415.7599999999995, stdev: 3532.7790301309997, max: 32137.0, min: 4.0)

In [47]:
conclusion.map(lambda l : l [1]).mean()

1415.7599999999995

In [48]:
conclusion.map(lambda l : l [1]).sum()

212364

In [49]:
conclusion.map(lambda l : l [1]).histogram(10)

([4.0,
  3217.3,
  6430.6,
  9643.900000000001,
  12857.2,
  16070.5,
  19283.800000000003,
  22497.100000000002,
  25710.4,
  28923.7,
  32137],
 [130, 11, 5, 2, 1, 0, 0, 0, 0, 1])

In [52]:
sc.stop()