In [1]:
from pyspark import SparkContext

In [2]:
sc = SparkContext(master='local', appName="TransformacionesYAcciones")

In [3]:
rdd1 = sc.parallelize([1,2,3])
type(rdd1)

pyspark.rdd.RDD

In [4]:
# Para visualizar usamos funcion collect()
rdd1.collect()

[1, 2, 3]

In [5]:
sc

In [6]:
pwd

'/home/rb/Platzi/Escuela de Data Science/22.- Curso Introductorio de Spark'

In [7]:
!ls ./files/

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista.csv	 evento.csv	      paises.csv
deportista2.csv  juegos.csv	      resultados.csv


In [8]:
path = '/home/rb/Platzi/Escuela de Data Science/22.- Curso Introductorio de Spark/files/'

equiposOlimpicosRDD = sc.textFile(path+'paises.csv') \
    .map(lambda line: line.split(","))

In [9]:
# muestra la data con  take(n), evita usar collect() 
equiposOlimpicosRDD.take(15)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR'],
 ['10', 'Alcyon-6', 'FRA'],
 ['11', 'Alcyon-7', 'FRA'],
 ['12', 'Aldebaran', 'ITA'],
 ['13', 'Aldebaran II', 'ITA'],
 ['14', 'Aletta', 'IRL']]

### Acciones de modificacion sobre RDDs

In [10]:
# Contar cuantas Siglas tenemos
equiposOlimpicosRDD.map(lambda x: (x[2])).distinct().count()

231

In [11]:
# Agrupacion a partir de siglas y obtener cantidad de equipos por pais
equiposOlimpicosRDD \
    .map(lambda x: (x[2], x[1]))\
    .groupByKey() \
    .mapValues(len).take(5)

[('sigla', 1), ('AUT', 11), ('MEX', 9), ('ARG', 18), ('AFG', 1)]

In [12]:
# Agrupacion a partir de siglas y obtener cantidad de equipos por pais
equiposOlimpicosRDD \
    .map(lambda x: (x[2], x[1])) \
    .groupByKey() \
    .mapValues(list).take(3)

[('sigla', ['equipo']),
 ('AUT',
  ['30. Februar',
   'Austria',
   'Austria-1',
   'Austria-2',
   'Breslau',
   'Brigantia',
   'Donar III',
   'Evita VI',
   'May-Be 1960',
   '"R.-V. Germania; Leitmeritz"',
   'Surprise']),
 ('MEX',
  ['A North American Team',
   'Acipactli',
   'Chamukina',
   'Mexico',
   'Mexico-1',
   'Mexico-2',
   'Nausikaa 4',
   'Tlaloc',
   'Xolotl'])]

In [13]:
# Obtener los datos solo de un pais o contenido con filter()
equiposArgentinos = equiposOlimpicosRDD.filter(lambda l: "ARG" in l)
equiposArgentinos.collect()

[['4', 'Acturus', 'ARG'],
 ['37', 'Antares', 'ARG'],
 ['42', 'Arcturus', 'ARG'],
 ['43', 'Ardilla', 'ARG'],
 ['45', 'Argentina', 'ARG'],
 ['46', 'Argentina-1', 'ARG'],
 ['47', 'Argentina-2', 'ARG'],
 ['119', 'Blue Red', 'ARG'],
 ['238', 'Covunco III', 'ARG'],
 ['252', 'Cupidon III', 'ARG'],
 ['288', 'Djinn', 'ARG'],
 ['436', 'Gullvinge', 'ARG'],
 ['644', 'Matrero II', 'ARG'],
 ['672', 'Mizar', 'ARG'],
 ['774', 'Pampero', 'ARG'],
 ['843', 'Rampage', 'ARG'],
 ['1031', 'Tango', 'ARG'],
 ['1162', 'Wiking', 'ARG']]

**Por que no usar collect()**, si tuviéramos un servidor y su cluster correspondiente collect() hace que maquina server **driver** pida a todos cluster **ejecutores** que envíen la información que estaba distribuida respecto a esa consulta y la centralicen en el driver, imagina que tienes 10 millones de registros, pero tu computadora quizá solo soporte 1000, o peor aun sobrecargues la red, y consumas recursos criticos en produccion. **NUNCA USES COLLECT()** a menos de que los datos sepas de antemano que son pocos y nunca en produccion, mejor toma muestras con **take()**

In [14]:
# cuenta todos los valores, nada recomentable para datos extensos
equiposOlimpicosRDD.count()

1185

In [15]:
# CountApprox(n) toma un valor en milisegundos y devuelve los valores contados si termina,
# si el tiempo fue insuficiente la ejecucion termina y devuelve la cuenta hasta la terminacion.
equiposOlimpicosRDD.countApprox(20)

1185

### Acciones de conteo sobre RDDs

In [16]:
# Visualizamos los csv disponibles
!ls ./files/

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista.csv	 evento.csv	      paises.csv
deportista2.csv  juegos.csv	      resultados.csv


In [17]:
# importamos ambos
deportistaOlimpicoRDD = sc.textFile(path+'deportista.csv') \
    .map(lambda line: line.split(","))

deportistaOlimpicoRDD2 = sc.textFile(path+'deportista2.csv') \
    .map(lambda line: line.split(","))

deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpicoRDD2)

In [18]:
# Unimos los RDD con union(). Spark solo tiene esta operacion
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpicoRDD2)

Como realizamos una operacion tipo modelo de caja negra es buena practica usar un **count()** para determinar la calidad de los datos que poseen los archivos, esto fuerza a utilizar todo el contenido del RDD.

Si los archivos vienen sanos o tienen la calidad minima para que spark pueda utilizarlos esta operacion se realizara sin ningun problema.

In [19]:
deportistaOlimpicoRDD.count()

135572

In [20]:
# ver contenido en equipos olimpicos
equiposOlimpicosRDD.top(2)

[['id', 'equipo', 'sigla'], ['999', 'Stella-2', 'NOR']]

In [21]:
deportistaOlimpicoRDD.top(2)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['99999', 'Alexander Grant Alick Rennie', '1', '32', '182', '71', '967']]

revisamos el modelo relacional

<img src="files/modelo_relacional.jpg">

In [22]:
# Seleccionamos las columnas ID para hacer un join
deportistaOlimpicoRDD \
    .map(lambda deportista: [deportista[-1], deportista[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda equipo: [equipo[0], equipo[2]]))\
    .take(6)

[('199', (['1', 'A Dijiang', '1', '24', '180', '80'], 'CHN')),
 ('199', (['2', 'A Lamusi', '1', '23', '170', '60'], 'CHN')),
 ('199', (['602', 'Abudoureheman', '1', '22', '182', '75'], 'CHN')),
 ('199', (['1463', 'Ai Linuer', '1', '25', '160', '62'], 'CHN')),
 ('199', (['1464', 'Ai Yanhan', '2', '14', '168', '54'], 'CHN')),
 ('199', (['3605', 'An Weijiang', '1', '22', '178', '72'], 'CHN'))]

In [23]:
# Utilisando takeSample( repetidos, muestra, semilla)
deportistaOlimpicoRDD \
    .map(lambda deportista: [deportista[-1], deportista[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda equipo: [equipo[0], equipo[2]]))\
    .takeSample(False, 6, 25)

[('362', (['131505', 'Steven Woodburn', '1', '24', '185', '90'], 'FRA')),
 ('967', (['13626', 'Jill Brresen', '2', '22', '170', '57'], 'RSA')),
 ('482', (['44299', 'Gumundur Gumundsson', '1', '23', '174', '77'], 'ISL')),
 ('970', (['68062', 'Lee MinHui', '2', '28', '174', '65'], 'KOR')),
 ('794', (['92442', 'Luis Paz Zoldan', '1', '19', '187', '82'], 'PER')),
 ('413', (['26822', 'Jared Mark Deacon', '1', '24', '185', '77'], 'GBR'))]

In [24]:
resultado = sc.textFile(path+'resultados.csv') \
    .map(lambda line: line.split(","))

In [25]:
resultadoGanador = resultado.filter(lambda w: 'NA' not in w[1])

In [26]:
resultadoGanador.take(2)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['4', 'Gold', '4', '2', '4']]

**Reto**: hacer un join con equipos y deportista para obtener valores importantes

In [27]:
# Approach 1: En el supuesto de un deportista solo tiene una medalla
deportistaOlimpicoRDD \
    .map(lambda deportista: [deportista[-1], deportista[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda equipo: [equipo[0], equipo[2]]))\
    .join(resultadoGanador.map(lambda resultado: [resultado[2], resultado[1]])) \
    .takeSample(False, 6, 25)

[('507',
  ((['18909', 'Enrico Castelli', '1', '27', '0', '0'], 'ITA'), 'Silver')),
 ('705',
  ((['99600', 'John Philip Ernest Marie Flip Regout', '1', '21', '0', '0'],
    'NED'),
   'Silver')),
 ('399',
  ((['15937', 'Magdalena Brzeska Peschel Sabolocka ', '2', '18', '173', '48'],
    'GER'),
   'Silver')),
 ('705',
  ((['13408',
     'Anna Johanna Geertruida Maria Annie Borckink',
     '2',
     '24',
     '0',
     '0'],
    'NED'),
   'Bronze')),
 ('705',
  ((['48310', 'Johannes Hendricus Heuckelbach', '1', '27', '0', '0'], 'NED'),
   'Gold')),
 ('619',
  ((['59062', 'Noraseela Mohd Khalid', '2', '32', '166', '55'], 'MAS'),
   'Gold'))]

### Solucion al reto por el profesor

In [28]:
deportistaPaises = deportistaOlimpicoRDD \
    .map(lambda l: [l[-1], l[:-1]]) \
    .join(equiposOlimpicosRDD.map(lambda x: [x[0], x[2]]))

In [29]:
deportistaPaises.join(resultadoGanador).take(6)

[('74',
  ((['65', 'Patimat Abakarova', '2', '21', '165', '49'], 'AZE'), 'Gold')),
 ('74', ((['129', 'Ruslan Abbasov', '1', '22', '181', '74'], 'AZE'), 'Gold')),
 ('74', ((['130', 'Tural Abbasov', '1', '18', '182', '76'], 'AZE'), 'Gold')),
 ('74', ((['131', 'Tran Abbasova', '2', '33', '159', '53'], 'AZE'), 'Gold')),
 ('74',
  ((['335', 'Abdulqdir Abdullayev', '1', '28', '188', '91'], 'AZE'), 'Gold')),
 ('74',
  ((['336', 'Arif Yadulla Abdullayev', '1', '27', '164', '60'], 'AZE'),
   'Gold'))]

### Operaciones Numéricas

In [30]:
# el valor de puntaje de paises a lo largo del tiempo
valoresMedallas = {'Gold': 7,
                   'Silver': 5,
                   'Bronze': 4}

In [31]:
paisesMedallas = deportistaPaises.join(resultadoGanador)

In [32]:
paisesMedallas = paisesMedallas \
    .map(lambda x: (x[1][0][-1], valoresMedallas[x[1][1] ] ) ) 

In [33]:
paisesMedallas.take(5)

[('AZE', 7), ('AZE', 7), ('AZE', 7), ('AZE', 7), ('AZE', 7)]

In [34]:
from operator import add

conclusion = paisesMedallas.reduceByKey((add)) \
    .sortBy(lambda x:x[1], ascending=False)

In [35]:
conclusion.take(10)

[('CAN', 32538),
 ('ARG', 12520),
 ('HUN', 10860),
 ('MEX', 6124),
 ('RSA', 3788),
 ('BLR', 3580),
 ('LTU', 1535),
 ('MGL', 1460),
 ('USA', 1342),
 ('AZE', 1218)]

## MI INTERPRETACION

In [36]:
resultado.take(10)
# utilizar deportista_id y medalla

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['1', 'NA', '1', '39', '1'],
 ['2', 'NA', '2', '49', '2'],
 ['3', 'NA', '3', '7', '3'],
 ['4', 'Gold', '4', '2', '4'],
 ['5', 'NA', '5', '36', '5'],
 ['6', 'NA', '5', '36', '6'],
 ['7', 'NA', '5', '38', '5'],
 ['8', 'NA', '5', '38', '6'],
 ['9', 'NA', '5', '40', '5']]

In [37]:
deportistaOlimpicoRDD.take(10)
#utilizar deportista_id y equipo_id

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705'],
 ['6', 'Per Knut Aaland', '1', '31', '188', '75', '1096'],
 ['7', 'John Aalberg', '1', '31', '183', '72', '1096'],
 ['8', 'Cornelia Cor Aalten Strannood ', '2', '18', '168', '0', '705'],
 ['9', 'Antti Sami Aalto', '1', '26', '186', '96', '350']]

In [73]:
equiposOlimpicosRDD \
.sortBy(lambda x: x[0])\
.take(10)

[['1', '30. Februar', 'AUT'],
 ['10', 'Alcyon-6', 'FRA'],
 ['100', 'Belgium-4', 'BEL'],
 ['1000', 'Struten', 'NOR'],
 ['1001', 'Studenternes Roklub-2', 'NOR'],
 ['1002', 'Subbnboana', 'GER'],
 ['1003', 'Sudan', 'SUD'],
 ['1004', 'Sunrise', 'NED'],
 ['1005', 'Sunshine', 'SWE'],
 ['1006', 'Suriname', 'SUR']]

In [39]:
# el valor de puntaje de paises a lo largo del tiempo
valoresMedallas = {'Gold': 7,
                   'Silver': 5,
                   'Bronze': 4}

In [97]:
def deportistaMedallas(resultadoGanador, valoresMedallas):
    """Obtiene la cantidad de Puntos por medallas de los depostistas Olimpicos"""
    #Obtengo el header del RDD
    headerMedallas = resultadoGanador \
            .map(lambda x: (x[2], valoresMedallas.get(x[1], 'Medallas')))\
            .filter(lambda x: x[1] == 'Medallas')
    
    # Obtengo y proceso la data numerica del RDD
    deportistaData = resultadoGanador \
        .map(lambda x: (x[2], valoresMedallas.get(x[1], 'Medallas')))\
        .filter(lambda x: x[1] != 'Medallas') \
        .map(lambda x: [int(x[0]), int(x[1])])\
        .foldByKey(1, add) \
        .sortBy(lambda x: x[0])
    
    
    deportistaMedallas = headerMedallas + deportistaData
  
    return deportistaMedallas

deportistaMedallas = deportistaMedallas(resultadoGanador, valoresMedallas)
deportistaMedallas.collect()

[('deportista_id', 'Medallas'),
 (4, 8),
 (15, 9),
 (16, 5),
 (17, 30),
 (20, 47),
 (21, 8),
 (25, 6),
 (29, 5),
 (30, 6),
 (37, 5),
 (38, 6),
 (40, 12),
 (42, 8),
 (56, 8),
 (62, 5),
 (63, 5),
 (65, 5),
 (67, 6),
 (72, 8),
 (73, 20),
 (76, 8),
 (80, 8),
 (84, 6),
 (85, 8),
 (86, 5),
 (90, 6),
 (93, 8),
 (99, 6),
 (100, 5),
 (103, 5),
 (106, 22),
 (107, 20),
 (108, 20),
 (139, 5),
 (143, 5),
 (145, 5),
 (150, 8),
 (153, 6),
 (165, 6),
 (250, 5),
 (259, 5),
 (297, 8),
 (298, 6),
 (316, 8),
 (337, 8),
 (339, 13),
 (351, 8),
 (357, 6),
 (359, 8),
 (367, 8),
 (379, 5),
 (385, 6),
 (391, 6),
 (392, 8),
 (394, 6),
 (395, 5),
 (399, 6),
 (401, 8),
 (404, 15),
 (411, 8),
 (416, 10),
 (423, 8),
 (424, 5),
 (428, 6),
 (443, 8),
 (454, 8),
 (455, 24),
 (460, 10),
 (465, 5),
 (495, 6),
 (507, 17),
 (509, 5),
 (514, 6),
 (519, 13),
 (521, 8),
 (527, 5),
 (529, 6),
 (543, 5),
 (547, 6),
 (548, 5),
 (576, 5),
 (580, 5),
 (582, 8),
 (583, 15),
 (608, 8),
 (610, 5),
 (618, 5),
 (619, 8),
 (627, 5),
 (6

In [151]:
def paisesEquipos(equiposOlimpicosRDD):
    """Obtiene el performance por Equipo"""
#     header = sc.parallelize([('Pais', 'equipo_id')])   
    
    paisesEquipos = equiposOlimpicosRDD \
                        .map(lambda x: [x[-1], x[0]]) \
                        .filter(lambda x: x[1] != 'id') \
                        .map(lambda x: [x[0], int(x[1])])\

    
    paisesEquipos =  paisesEquipos
    
    return paisesEquipos

paisesEquipos = paisesEquipos(equiposOlimpicosRDD)
paisesEquipos.take(10)

[['AUT', 1],
 ['MEX', 2],
 ['MEX', 3],
 ['ARG', 4],
 ['AFG', 5],
 ['IRL', 6],
 ['SUI', 7],
 ['ALB', 8],
 ['POR', 9],
 ['FRA', 10]]

In [152]:
def performanceEquipos(deportistaMedallas, deportistaOlimpicoRDD):
#     header = sc.parallelize([('deportista_id', 'equipo_id')])  
    
    equipos = deportistaOlimpicoRDD \
                .map(lambda x: [x[0], x[6]]) \
                .filter(lambda x: x[0] != 'deportista_id') \
                .map(lambda x: [int(x[0]), int(x[1])]) \

    
#     performance = equipos.join(deportistaMedallas)
#                     .join(deportistaOlimpicoRDD) \
#                     .map(lambda x: (x[1][1], x[0], x[1][0])) \

    performanceEquipos = equipos
    
    return performanceEquipos

performanceEquipos = performanceEquipos(deportistaMedallas, deportistaOlimpicoRDD)
performanceEquipos.take(10)
 

[[1, 199],
 [2, 199],
 [3, 273],
 [4, 278],
 [5, 705],
 [6, 1096],
 [7, 1096],
 [8, 705],
 [9, 350],
 [10, 350]]