In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar -xvf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import sys
from pyspark.sql.functions import count

In [None]:
# Install ngrok
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

In [None]:
# Configure Spark UI
conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
sc.stop()

In [None]:
# Create a URL through you can access the Spark UI
get_ipython().system_raw('./ngrok http 4050 &')

In [None]:
# Then wait for 10s to access the URL
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[],"uri":"/api/tunnels"}


In [None]:
sc = SparkContext(master = "local", appName="transformaciones")

In [None]:
rdd1 = sc.parallelize([1,2,3])
type(rdd1)

pyspark.rdd.RDD

In [None]:
rdd1.collect()

[1, 2, 3]

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
path = '/content/drive/My Drive/Colab Notebooks/db/spark/{}'
path

'/content/drive/My Drive/Colab Notebooks/db/spark/{}'

In [None]:
equipos_olimpicos_rdd = sc.textFile(path.format('paises.csv')).map(lambda line: line.split(","))

In [None]:
equipos_olimpicos_rdd.take(5)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG']]

In [None]:
equipos_olimpicos_rdd.map(lambda x: (x[2])).distinct().count()

231

In [None]:
equipos_olimpicos_rdd.map(lambda x: (x[2], x[1])).groupByKey().mapValues(len).take(5)

[('sigla', 1), ('AUT', 11), ('MEX', 9), ('ARG', 18), ('AFG', 1)]

In [None]:
equipos_olimpicos_rdd.map(lambda x: (x[2], x[1])).groupByKey().mapValues(list).take(5)

[('sigla', ['equipo']),
 ('AUT',
  ['30. Februar',
   'Austria',
   'Austria-1',
   'Austria-2',
   'Breslau',
   'Brigantia',
   'Donar III',
   'Evita VI',
   'May-Be 1960',
   '"R.-V. Germania; Leitmeritz"',
   'Surprise']),
 ('MEX',
  ['A North American Team',
   'Acipactli',
   'Chamukina',
   'Mexico',
   'Mexico-1',
   'Mexico-2',
   'Nausikaa 4',
   'Tlaloc',
   'Xolotl']),
 ('ARG',
  ['Acturus',
   'Antares',
   'Arcturus',
   'Ardilla',
   'Argentina',
   'Argentina-1',
   'Argentina-2',
   'Blue Red',
   'Covunco III',
   'Cupidon III',
   'Djinn',
   'Gullvinge',
   'Matrero II',
   'Mizar',
   'Pampero',
   'Rampage',
   'Tango',
   'Wiking']),
 ('AFG', ['Afghanistan'])]

In [None]:
equipos_argentinos = equipos_olimpicos_rdd.filter(lambda l: "ARG" in l)
equipos_argentinos.collect()

[['4', 'Acturus', 'ARG'],
 ['37', 'Antares', 'ARG'],
 ['42', 'Arcturus', 'ARG'],
 ['43', 'Ardilla', 'ARG'],
 ['45', 'Argentina', 'ARG'],
 ['46', 'Argentina-1', 'ARG'],
 ['47', 'Argentina-2', 'ARG'],
 ['119', 'Blue Red', 'ARG'],
 ['238', 'Covunco III', 'ARG'],
 ['252', 'Cupidon III', 'ARG'],
 ['288', 'Djinn', 'ARG'],
 ['436', 'Gullvinge', 'ARG'],
 ['644', 'Matrero II', 'ARG'],
 ['672', 'Mizar', 'ARG'],
 ['774', 'Pampero', 'ARG'],
 ['843', 'Rampage', 'ARG'],
 ['1031', 'Tango', 'ARG'],
 ['1162', 'Wiking', 'ARG']]

In [None]:
equipos_olimpicos_rdd.count()

1185

Cuenta en una cantidad fija de milisegundo, si se tarda más corta el proceso

In [None]:
equipos_olimpicos_rdd.countApprox(1000)

1185

In [None]:
deportista_olimpico_rdd = sc.textFile(path.format('deportista.csv')).map(lambda l : l.split(','))
deportista_olimpico_rdd2 = sc.textFile(path.format('deportista2.csv')).map(lambda l : l.split(','))

In [None]:
deportista_olimpico_rdd2.take(5)

[['67787', 'Lee BongJu', '1', '27', '167', '56', '970'],
 ['67788', 'Lee BuTi', '1', '23', '164', '54', '203'],
 ['67789', 'Anthony N. Buddy Lee', '1', '34', '172', '62', '1096'],
 ['67790', 'Alfred A. Butch Lee Porter', '1', '19', '186', '80', '825'],
 ['67791', 'Lee ByeongGu', '1', '22', '175', '68', '970']]

In [None]:
deportista_olimpico_rdd = deportista_olimpico_rdd.union(deportista_olimpico_rdd2)

In [None]:
deportista_olimpico_rdd.countApprox(1000)

135572

In [None]:
deportista_olimpico_rdd.top(2)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['99999', 'Alexander Grant Alick Rennie', '1', '32', '182', '71', '967']]

In [None]:
deportista_olimpico_rdd.map(lambda l: [l[-1], l[:-1]]).join(equipos_olimpicos_rdd.map(lambda x : [x[0], x[2]])).takeSample(False, 5, 25)

[('970', (['68062', 'Lee MinHui', '2', '28', '174', '65'], 'KOR')),
 ('154', (['39161', 'Angel Merdzhanov Gavrilov', '1', '24', '0', '0'], 'BUL')),
 ('1084',
  (['62843', 'Olha Vasylivna Korobka', '2', '18', '181', '167'], 'UKR')),
 ('678', (['97550', 'Puntsagiin Skhbat', '1', '24', '174', '82'], 'MGL')),
 ('1096', (['106789', 'Hugo Scherzer', '1', '43', '0', '0'], 'USA'))]

In [None]:
resultados_rdd = sc.textFile(path.format('resultados.csv')).map(lambda l : l.split(','))

In [None]:
resultados_rdd.countApprox(1000)

271117

In [None]:
resultados_rdd = resultados_rdd.filter(lambda x : 'NA' not in x[1])

In [None]:
resultados_rdd.countApprox(1000)

39784

In [None]:
resultados_rdd.top(5)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['99993', 'Silver', '50600', '39', '207'],
 ['99986', 'Silver', '50597', '45', '199'],
 ['99985', 'Bronze', '50597', '43', '72'],
 ['99963', 'Bronze', '50580', '37', '245']]

In [None]:
deportista_pais = deportista_olimpico_rdd.map(lambda x : [x[-1], x[:-1] ]).join(equipos_olimpicos_rdd.map(lambda y : [y[0], y[2]]))

In [None]:
deportista_pais.take(5)

[('199', (['1', 'A Dijiang', '1', '24', '180', '80'], 'CHN')),
 ('199', (['2', 'A Lamusi', '1', '23', '170', '60'], 'CHN')),
 ('199', (['602', 'Abudoureheman', '1', '22', '182', '75'], 'CHN')),
 ('199', (['1463', 'Ai Linuer', '1', '25', '160', '62'], 'CHN')),
 ('199', (['1464', 'Ai Yanhan', '2', '14', '168', '54'], 'CHN'))]

In [None]:
paises_medallas = deportista_pais.join(resultados_rdd)

In [None]:
paises_medallas.take(5)

[('74',
  ((['65', 'Patimat Abakarova', '2', '21', '165', '49'], 'AZE'), 'Gold')),
 ('74', ((['129', 'Ruslan Abbasov', '1', '22', '181', '74'], 'AZE'), 'Gold')),
 ('74', ((['130', 'Tural Abbasov', '1', '18', '182', '76'], 'AZE'), 'Gold')),
 ('74', ((['131', 'Tran Abbasova', '2', '33', '159', '53'], 'AZE'), 'Gold')),
 ('74',
  ((['335', 'Abdulqdir Abdullayev', '1', '28', '188', '91'], 'AZE'), 'Gold'))]

In [None]:
valores_medallas = {
    'Gold': 7, 'Silver': 5, 'Bronze': 4
}
valores_medallas

{'Bronze': 4, 'Gold': 7, 'Silver': 5}

In [None]:
paises_medallas = paises_medallas.map(lambda x: (x[1][0][-1], 
                               valores_medallas[x[1][1]]
                               ))
paises_medallas.takeSample(False, 10, 10)

[('CAN', 7),
 ('ARG', 7),
 ('ARG', 7),
 ('MEX', 4),
 ('LIE', 7),
 ('ARG', 7),
 ('CAN', 7),
 ('CAN', 7),
 ('CAN', 7),
 ('ARG', 7)]

In [None]:
from operator import add

In [None]:
conclusion_rdd = paises_medallas.reduceByKey((add)).sortBy(lambda x : x[1], ascending = False)
conclusion_rdd.collect()

[('CAN', 32538),
 ('ARG', 12520),
 ('HUN', 10860),
 ('MEX', 6124),
 ('RSA', 3788),
 ('BLR', 3580),
 ('LTU', 1535),
 ('MGL', 1460),
 ('USA', 1342),
 ('AZE', 1218),
 ('BAH', 1057),
 ('LIE', 917),
 ('MDA', 840),
 ('BAR', 575),
 ('BER', 436),
 ('JOR', 300),
 ('NIG', 294),
 ('CHN', 294),
 ('ANT', 280),
 ('CAY', 273),
 ('FRA', 244),
 ('BEN', 216),
 ('GRE', 203),
 ('MAL', 200),
 ('GBR', 186),
 ('ANZ', 172),
 ('VNM', 156),
 ('BHU', 147),
 ('NOR', 143),
 ('PLE', 140),
 ('KOR', 140),
 ('DJI', 140),
 ('ROU', 135),
 ('BUR', 128),
 ('VIN', 112),
 ('AUS', 111),
 ('COM', 105),
 ('BDI', 100),
 ('CPV', 91),
 ('STP', 52),
 ('EUN', 48),
 ('BEL', 48),
 ('SWE', 45),
 ('FIN', 42),
 ('YAR', 40),
 ('DEN', 36),
 ('MAS', 35),
 ('NED', 24),
 ('BRA', 23),
 ('PRK', 21),
 ('RUS', 20),
 ('IRL', 14),
 ('THA', 14),
 ('POR', 12),
 ('RHO', 10),
 ('WIF', 8),
 ('GER', 7),
 ('ITA', 5),
 ('SGP', 4)]