In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

# PARCIAL

Se tiene información estadística de la temporada regular de todos los jugadores de la NBA en un RDD de tuplas con el siguiente formato: (id_jugador, nombre, promedio_puntos, promedio_asistencias, promedio_robos, promedio_bloqueos, promedio_rebotes, promedio_faltas).

Un analista de la cadena ESPN está trabajando con un RDD que corresponde a la primera ronda de playoffs y que tiene el siguiente formato: (id_jugador, id_partido, timestamp, cantidad_puntos, cantidad_rebotes, cantidad_bloqueos, cantidad_robos, cantidad_asistencias, cantidad_faltas).

En base a estos RDDs se quiere programar en PySpark un programa que genere un RDD con los nombres (sin duplicados) de los jugadores que lograron en algún partido de playoffs una cantidad de asistencias mayor a su promedio histórico (el de la temporada regular). Link

Llamaremos: rdd_po: RDD con los datos de los playoffs. rdd_tr: RDD con los datos de temporada regular

Resolucion: https://github.com/idontdomath/datos-spark-lesson/blob/master/2019-02/001-examenes-2015-2016.ipynb

In [3]:
# usamos para simplificar el formato, que puede obtenerse con un map.
# (id_jugador, nombre, promedio_asistencias)
players_all_time_stats = [
    (1, 'Manu Ginobili', 800),
    (2, 'Kobe Bryant', 100),
    (3, 'Marc Gasol', 25),
    (4, 'James Harden', 1000)]

# usamos para simplificar el formato, que puede obtenerse con un map.
# (id_jugador, id_partido, timestamp, cantidad_asistencias)
scores = [
  (1, 1, 1, 100),
  (1, 1, 3, 100),
  (2, 1, 1, 150),
  (2, 1, 3, 150),
  (3, 2, 2, 50),
  (3, 2, 3, 50),      
  (1, 2, 1, 150),
  (1, 2, 3, 150),
]

rdd_jugadores = sc.parallelize(players_all_time_stats)
rdd_po = sc.parallelize(scores)

In [5]:
asist_por_partido = rdd_po.map(lambda x: ((x[0],x[1]),x[3]))\
    .reduceByKey(lambda x, y: x+y)\
    .map(lambda x: (x[0][0],x[1]))
asist_por_partido.take(1)

[(1, 300)]

In [8]:
asist_por_jugador = rdd_jugadores.map(lambda x: (x[0],x))
todo = asist_por_partido.leftOuterJoin(asist_por_jugador)
todo.collect()

[(1, (300, (1, 'Manu Ginobili', 800))),
 (1, (200, (1, 'Manu Ginobili', 800))),
 (2, (300, (2, 'Kobe Bryant', 100))),
 (3, (100, (3, 'Marc Gasol', 25)))]

In [12]:
todo.filter(lambda x: (x[1][0]>x[1][1][2])).map(lambda x: (x[1][1][1]))\
    .distinct().collect()

['Kobe Bryant', 'Marc Gasol']

# 2016-02 Parcial
En este ejercicio queremos programar un sistema que recomiende textos a usuarios en base a sus gustos sobre ciertos términos (palabras).

Se cuenta con un RDD de textos de la forma (docId, texto) donde texto es un string de longitud variable.

Además contamos con un RDD que indica qué términos le gustan o no a cada usuario de la forma (userId, término, score) por ejemplo (23, “calesita”, -2).

Se pide programar en Spark un programa que calcule el score total de cada documento para cada usuario generando un RDD de la forma (userId, docId, score) en donde el score es simplemente la suma de los scores del usuario para los términos que aparecen en el documento.

Puede haber términos en los documentos para los cuales no exista score de algunos usuarios, en estos casos simplemente los consideramos neutros (score=0)

https://github.com/idontdomath/datos-spark-lesson/blob/master/2019-02/001-examenes-2015-2016.ipynb

In [24]:
documents = [
    (1, 'pablo honey'),
    (2, 'the bends'),
    (3, 'ok computer'),
    (4, 'kid a'),
    (5, 'amnesiac'),
    (6, 'hail to the thief'),
    (7, 'in rainbows'),
    (8, 'the king of limbs'),
    (9, 'a moon shaped pool')
]

# (doc_id, text)

scores = [
    ('thom', 'pablo', 1),
    ('thom', 'honey', 1),
    ('martin', 'pablo', -1),
    ('martin', 'honey', -1),
    ('martin', 'ok', 30),
    ('martin', 'computer', 30)
]

# (used_id, termino, score)

documents_rdd = sc.parallelize(documents)
scores_rdd = sc.parallelize(scores)

# Primer Cuatrimestre de 2018. Examen parcial, primera oportunidad.
Nintendo of America (EEUU) tiene información de ventas de videojuegos físicas mensuales totalizadas en EEUU las cuales se realizan en cadenas de tiendas de videojuegos en el siguiente RDD: (id_videojuego, id_tienda, mes, anio, total_ventas_mensuales).

Por otro lado tenemos un RDD con información de las tiendas y de su ubicación (id_tienda, direccion, latitud, longitud, codigo_postal, estado). Con esta información escribir un programa en pySpark para obtener la tienda que realizó menor cantidad de ventas en el estado de "Georgia" en todo el año 2017.

Criterio: Es importante filtrar los datos que son necesarios antes de comenzar a trabajar, si no lo hacen se descuenta un min de 5ptos. Hay descuentos de 3 ptos si realizan operaciones de mas, o ineficientes (por ejemplo realizar un takeordered cuando necesitan solo obtener mínimo). Si los formatos para realizar el join no se corresponde a (K, V) descuento de 5 puntos.

Resolucion: https://github.com/idontdomath/datos-spark-lesson/blob/master/2019-02/003-examenes-2018.ipynb

In [39]:

sales = [
    (1, 1, '01', '2017', 500), # sera la minima
    (1, 2, '01', '2017', 500),
    (1, 2, '01', '2017', 500),
    (1, 2, '01', '2017', 500),
    (1, 2, '01', '2017', 500),
    (1, 1, '01', '2016', 500),
    (1, 2, '01', '2016', 500),
    (1, 2, '01', '2016', 500),
    (1, 2, '01', '2016', 500),
    (1, 2, '01', '2016', 500),
    (2, 3, '01', '2017', 500),
    (2, 3, '01', '2017', 500),
    (2, 3, '01', '2017', 500),
    (2, 3, '01', '2017', 500),
    (2, 3, '01', '2017', 500),
    (4, 3, '01', '2017', 500),
    (4, 3, '01', '2017', 500),
    (4, 3, '02', '2017', 500),
    (4, 3, '03', '2017', 500),    

]

#  (id_videojuego, id_tienda, mes, anio, total_ventas_mensuales).

stores = [
    (1 , 'address 1', -1, -1, '30002', 'Georgia'),
    (2 , 'address 2', -2, -2, '30003', 'Georgia'),
    (3 , 'address 2', -3, -3, '30004', 'Georgia'),
    (4 , 'address 2', -4, -4, '10119', 'New York')    
]

# (id_tienda, direccion, latitud, longitud, codigo_postal, estado)

sales_rdd = sc.parallelize(sales)
stores_rdd = sc.parallelize(stores)

# Primer Cuatrimestre de 2018. Examen parcial, tercera oportunidad
El GCPD (Gotham City Police Dept) recolecta la información de casos policiales que acontecen en Ciudad Gótica. Esta información se encuentra guardada en un archivo con el siguiente formato: (fecha, id_caso, descripción, estado_caso, categoría, latitud, longitud).

Los posibles estados que puede tener un caso son 1: caso abierto, 2: caso resuelto, 3: cerrado sin resolución. Las fechas se encuentran en el formato YYYY-MM-DD.

Por otro lado el comisionado Gordon guarda un registro detallado sobre en cuáles casos fue activada la batiseñal para pedir ayuda del vigilante, Batman. Esta información se encuentra en un archivo con el siguiente formato (id_caso, respuesta), siendo campo respuesta si la señal tuvo una respuesta positiva (1) o negativa (0) de parte de él. El sector encargado de las estadísticas oficiales del GCPD quiere analizar las siguientes situaciones:

a) Las categorías que hayan incrementado su tasa de resolución en al menos un 10% en el último trimestre, con respecto al trimestre anterior. b) Tasa de participación de Batman por categoría, para los delitos contra la propiedad (que enmarcan las categorías incendio intencional, robo, hurto, y robo de vehículos)

Resolución:

Primero filter por fecha ult dos trimestres. Luego map con key compuesta categoria y trimestre y value (estado_caso == resuelto, estado == sin_resolucion or estado == resuelto). ReduceByKey, donde sumes los dos values. Un map cambiando la clave a categoria, y dejando en value el trimestre y los dos valores. Un groupByKey para juntar en la misma categoria los dos valores de cada trimestre. Por ultimo un filter que compare las tasas de cada trimestre, para ver cuales cumplen con la condición. Collect al final. Vale 8 puntos.

Si no filtran al ppio descuento 3 puntos. Map u operaciones innecesarias descuento de 3 puntos mínimo.

Otra solución posible es: se pueden generar dos RDD: uno para cada trimestre. Sobre esos dos RDD, se calculan las tasas de resolución por categoría y luego se los joinea por el campo de categoría para poder verificar la condición de que haya incrementado la resolución en un 10%. Esta segunda alternativa no es escalable, ya que si tuvieramos que trabajar con mas trimestres o hacer el corte por mes o por dia no tendria sentido generar un rdd por dia. (Descuento de 5puntos)

b) Será necesario primero filtrar el RDD por las categorías necesarias para reducir el volumen de información (descuento de 3 puntos si lo hacen después) y luego joinear por id_caso con el segundo RDD utilizando un left join (descuento de 3 puntos si se usa inner, 5 puntos si las claves no coinciden, no sirve). El left join nos dará aquellos casos en los que batman fue llamado y en los que no fue llamados. A partir de ahí, podemos mapear algo del estilo (categoria, (fue_llamado, 1)) y con un reduceByKey calcular la tasa de participación. El campo “fue_llamado” será 1 si respondió a la batiseñal o 0 en caso contrario. El b vale 7 puntos.

Resolucion: https://github.com/idontdomath/datos-spark-lesson/blob/master/2019-02/003-examenes-2018.ipynb

In [66]:
# (fecha, id_caso, descripción, estado_caso, categoría, latitud, longitud).
cases = [("2019-01-01", 1, "case 1", 2, "otro delito", -1, -1), 
         ("2019-06-01", 2, "case 2", 2, "robo", -1, -1),
         ("2019-06-01", 3, "case 2", 3, "robo", -1, -1),         
         ("2019-06-01", 4, "case 2", 1, "robo", -1, -1),         
         ("2019-06-01", 5, "case 2", 2, "robo", -1, -1),
         ("2019-09-01", 6, "case 2", 2, "robo", -1, -1),
         ("2019-09-01", 7, "case 2", 2, "robo", -1, -1),         
         ("2019-09-01", 8, "case 2", 2, "robo", -1, -1),
         ("2019-09-01", 9, "case 2", 2, "robo", -1, -1),
         ("2019-09-01", 10, "case 2", 3, "robo", -1, -1),
         ("2019-09-01", 60, "case 2", 3, "robo", -1, -1),
         ("2019-09-01", 70, "case 2", 3, "robo", -1, -1),         
         ("2019-09-01", 80, "case 2", 1, "robo", -1, -1),
         ("2019-09-01", 90, "case 2", 2, "robo", -1, -1),
         ("2019-09-01", 100, "case 2", 2, "robo", -1, -1),
         ("2019-09-01", 600, "case 2", 2, "robo", -1, -1),
         ("2019-09-01", 700, "case 2", 3, "robo", -1, -1),         
         ("2019-09-01", 800, "case 2", 1, "robo", -1, -1),
         ("2019-09-01", 900, "case 2", 1, "robo", -1, -1),
         ("2019-09-01", 1000, "case 2", 1, "robo", -1, -1),
         ("2019-09-01", 6000, "case 2", 2, "robo", -1, -1),
         ("2019-09-01", 7000, "case 2", 2, "robo", -1, -1),         
         ("2019-09-01", 8000, "case 2", 3, "robo", -1, -1),
         ("2019-09-01", 9000, "case 2", 1, "robo", -1, -1),
         ("2019-09-01", 10000, "case 2", 2, "robo", -1, -1),
         ("2019-06-01", 92, "case 2", 2, "hurto", -1, -1),
         ("2019-06-01", 93, "case 2", 3, "hurto", -1, -1),         
         ("2019-06-01", 94, "case 2", 3, "hurto", -1, -1),         
         ("2019-06-01", 95, "case 2", 3, "hurto", -1, -1),
         ("2019-09-01", 96, "case 2", 2, "hurto", -1, -1),
 
        ]

# (id_caso, respuesta)
batsignal = [(1,1),
         (2,1),
         (3,0),
         (4,0),
         (5,1),
         (6,0),
         (7,1),
         (8,0),
         (9,1),
         (10,0),         
         (60,0),
         (70,1),
         (80,1),
         (90,1),
         (100,1),
         (600,0),
         (700,1),
         (800,0),
         (900,1),
         (1000,1),
         (6000,0),
         (7000,1),
         (8000,0),
         (9000,1),
         (10000,1),
         (92,0),
         (93,0),             
         (94,0),
         (95,0),             
         (96,1)             
        ]

cases_rdd = sc.parallelize(cases)
batsignal_rdd = sc.parallelize(batsignal)

# PARCIAL
A partir de la plataforma online (e-shop) de los países en los que opera, Nintendo tiene información de ventas de videojuegos diarias digitales por país en el siguiente RDD: (id_videojuego, codigo_pais, fecha, visitas_diarias, total_ventas_diarias).

Por otro lado se tienen otro RDD que tiene información de todos los videojuegos que se venden en su plataforma con el siguiente formato (id_videojuego, titulo, rating_pegi, rating_esbr). Tener en cuenta que un mismo videojuego se puede vender en distintos países y esos nos permitirá obtener métricas a nivel global.

Con esta información escribir un programa en pySpark que permita:

a) Obtener el videojuego con más ventas digitales globales (es decir en todos los países) en un RDD con el siguiente formato: (id_videojuego, titulo, total), siendo total la cantidad total de ventas digitales globales

b) Para el videojuego con mas ventas, obtener cual es el país para el cual ser registra una mayor tasa de conversión (es decir, mayor total_ventas_diarias / visitas_diarias)

Resolucion: https://github.com/CrossNox/7506-OD2/blob/master/spark/2017C2_2_VentasDeJuegos.ipynb

In [67]:
datos_ventas_diarias = [
    (1, 'AR', '2018-01-01', 30, 10),
    (1, 'ES', '2018-01-01', 23, 13),
    (2, 'US', '2018-01-04', 45, 5),
    (2, 'MX', '2018-01-04', 20, 10),
    (2, 'US', '2018-01-06', 50, 15),
    (3, 'AR', '2018-01-06', 10, 2),
    (1, 'US', '2018-01-06', 14, 4),
    (3, 'ES', '2018-01-10', 34, 11),
    (4, 'ES', '2018-01-11', 42, 24),
    (4, 'US', '2018-01-11', 83, 34),
    (4, 'AR', '2018-01-11', 27, 20),
    (4, 'MX', '2018-01-11', 47, 18),
    (4, 'AR', '2018-01-20', 10, 0),
    (4, 'US', '2018-01-21', 34, 2),
    (4, 'ES', '2018-01-21', 25, 7)
]

datos_videojuegos = [
    (1, 'Zelda: Breath of the Wild', 9, 8),
    (2, 'Mario Kart', 9, 7),
    (3, 'Splatoon 2', 11, 8),
    (4, 'Monster Hunter Generations Ultimate', 13, 10)
]

# PARCIAL
Se cuenta con un RDD con información sobre patentamientos de autos con la siguiente información (patente, marca, modelo, versión, tipo_vehiculo, provincia, fecha), donde tipo_vehiculo indica si la unidad patentada es auto, pickup, camión o moto.

Se pide generar un programa en pySpark que indique la marca y modelo del auto más patentado por tipo de vehículo en la provincia de Buenos Aires en el mes de Abril de 2017.

Resolucion: https://github.com/CrossNox/7506-OD2/blob/master/spark/2017C2_1_Patentamientos.ipynb

In [69]:

#  Creamos algunos datos para poder hacer el seguimiento de la resolución.
#  El resultado final debería ser ('Chevrolet', 'Sonic'), ('Ford', 'Cargo 712') y ('Honda', 'Hornet 160R').

datos_patentamientos = [
    ('MHG 100', 'Fiat', 'Siena', 1, 'auto', 'Buenos Aires', '2017-03-15'),
    ('MHG 101', 'Ford', 'Cargo 712', 2, 'camion', 'Chaco', '2017-03-19'),
    ('MHG 102', 'Ford', 'Cargo 712', 4, 'camion', 'Buenos Aires', '2017-04-01'),
    ('MHG 103', 'Fiat', 'Siena', 2, 'auto', 'Buenos Aires', '2017-04-02'),
    ('MHG 104', 'Chevrolet', 'Sonic', 1, 'auto', 'Buenos Aires', '2017-04-02'),
    ('MHG 105', 'Fiat', 'Siena', 3, 'auto', 'Uruguay', '2017-04-03'),
    ('MHG 106', 'Fiat', 'Siena', 1, 'auto', 'Buenos Aires', '2017-04-05'),
    ('MHG 107', 'Chevrolet', 'Sonic', 2, 'auto', 'Buenos Aires', '2017-04-17'),
    ('MHG 108', 'Chevrolet', 'Sonic', 1, 'auto', 'Buenos Aires', '2017-04-19'),
    ('MHG 109', 'Ford', 'Cargo 712', 4, 'camion', 'Buenos Aires', '2017-04-19'),
    ('MHG 110', 'Ford', 'Cargo 712', 2, 'camion', 'Buenos Aires', '2017-04-19'),
    ('MHG 111', 'Fiat', 'Siena', 3, 'auto', 'Cordoba', '2017-04-20'),
    ('MHG 112', 'Chevrolet', 'Sonic', 2, 'auto', 'Buenos Aires', '2017-04-21'),
    ('MHG 113', 'Fiat', 'Sedan', 2, 'auto', 'Buenos Aires', '2017-04-23'),
    ('MHG 114', 'Fiat', 'Sedan', 1, 'auto', 'Buenos Aires', '2017-04-24'),
    ('MHG 115', 'Honda', 'Hornet 160R', 1, 'moto', 'Buenos Aires', '2017-04-25'),
    ('MHG 116', 'Honda', 'Hornet 160R', 1, 'moto', 'Buenos Aires', '2017-04-25'),
    ('MHG 117', 'Ducati', 'SuperSport', 1, 'moto', 'Buenos Aires', '2017-04-26'),
    ('MHG 118', 'Scania', '420', 4, 'camion', 'Buenos Aires', '2017-04-26')
]

rdd_pt = sc.parallelize(datos_patentamientos)


# Parcial
Una red social almacena el contenido de los chats entre sus usuarios en un RDD que tiene registros con el siguiente formato: (chat_id, user_id, nickname, text). Queremos saber cuál es el usuario (user_id) que mas preguntas hace en sus chats, contabilizamos una pregunta por cada caracter “?” que aparezca en el campo text. Programar en Spark un programa que identifique al usuario preguntón.

Resolucion: https://colab.research.google.com/drive/1lHA82VFp-yr9sH5ttxOoyzOV--zKhD7y#scrollTo=cicGxWn2xPsg

In [70]:
chats = [
    (1, 1, 'damu', 'Qué es esto?'),
    (2, 2, 'martin', 'Un chat!???'),
    (3, 1, 'damu', 'Ahhh! Y de donde salio? Whatsapp?'),
    (4, 2, 'martin', 'Sí! Cómo sabias????'),
    (5, 1, 'damu', 'Adivine'),
    (6, 3, 'luis', 'Hola!')
]

chats_rdd = sc.parallelize(chats)

# Parcial

UBER almacena en un cluster todos los datos sobre el movimiento y viajes de todos sus vehículos. Existe un proceso que nos devuelve un RDD llamado trip_summary con los siguientes campos: (driver_id, car_id, trip_id, customer_id, date (YYYYMMDD), distance_traveled), Programar usando Py Spark un programa que nos indique cual fue el conductor con mayor promedio de distancia recorrida por viaje para Abril de 2016.

Resolucion: https://colab.research.google.com/drive/1D_I6NcHvpIz3r25MHMY25j1n9Zulba3S#scrollTo=cicGxWn2xPsg

In [71]:
trips = [
    (1, 1, 1, 1, '20160101', 10),
    (2, 2, 2, 2, '20160202', 20),
    (1, 1, 3, 1, '20160402', 15),
    (1, 1, 4, 3, '20160405', 20),
    (2, 2, 5, 4, '20160410', 25),
    (3, 3, 6, 3, '20160415', 15),
    (2, 2, 7, 1, '20160420', 40),
    (3, 3, 8, 2, '20160505', 80)
]

trips_rdd = sc.parallelize(trips)

# Parcial
1) El servicio meteorológico registra datos del tiempo para todas las ciudades donde posee una base de medición.
Esta información se encuentra en dos RDD. 

En el primero se tiene información de las bases de medición: (ID_BASE, NOMBRE, PCIA, CIUDAD, LAT, LON).

El segundo posee información sobre las mediciones en sí: (TIMESTAMP, ID_BASE, TEMPERATURA, HUMEDAD, PRESIÓN, DIRECCIÓN VIENTO, VELOCIDAD VIENTO).

Se desea obtener un reporte para las bases de la Provincia de Buenos Aires. 

El mismo debe informar los ID y nombre de bases de medición que hayan registrado una variación de temperatura promedio mensual mayor al 30% en el año 2018 (dada la temperatura promedio de un mes, esta se debe comparar con el promedio del mes anterior, para determinar la variación porcentual).


In [None]:
# create the Spark Session
spark = SparkSession.builder.getOrCreate()
# create the Spark Context
sc = spark.sparkContext

#Datos aleatorios extraidos de la web para aportar un set ejemplo y utilización de random

#CANTIDAD de datos
m = 1000

provincias_a = ['Álava', 'Albacete','Alicante','Almería', 'Asturias','Ávila','Badajoz','Barcelona','Cordoba','Cordoba','Cordoba','Cordoba','Cordoba','Cordoba','Cordoba','Cordoba','Cordoba','Cordoba','Cordoba','Cordoba','Cordoba']
ciudades_a = ["Andalucía", "Aragón", "Canarias", "Cantabria", "Castilla y León", "Castilla-La Mancha", "Cataluña", "Ceuta", "Comunidad Valenciana", "Comunidad de Madrid", "Extremadura", "Galicia", "Islas Baleares", "La Rioja", "Melilla", "Navarra", "País Vasco", "Principado de Asturias", "Región de Murcia"]
nombres_bases = ["one", "two", "three", "four", "five","six", "seven", "eight", "nine", "ten", "eleven"]
direcciones_a = ["S","E","O","N","SO","SE","NE","NO","h","m","y","x"]

nombres = []
provincias = []
ciudades = []
bases_id = []
direcciones = []

for i in range(m):
    j = random.randint(0,9) #para que se correspondan las mismas id
    bases_id.append(j)
    nombres.append(nombres_bases[j-1])
    ciudades.append(choice(ciudades_a))
    provincias.append(choice(provincias_a))
    direcciones.append(choice(direcciones_a))

#CREACION DE FECHAS===============================================================================================

def str_time_prop(start, end, format, prop):
    stime = time.mktime(time.strptime(start, format))
    etime = time.mktime(time.strptime(end, format))
    ptime = stime + prop * (etime - stime)
    return time.strftime(format, time.localtime(ptime))

def random_date(start, end, prop):
    return str_time_prop(start, end, '%m-%d-%Y', prop)

dates = []

for i in range(m):
    dates.append( datetime.strptime(random_date("1-1-2016", "1-1-2020", random.random()), '%m-%d-%Y'))

#CREACION DEL SET DE DATOS===================================================================================

df = pd.DataFrame({'ID_BASE': bases_id,
                   'NOMBRE': nombres,
                   'PCIA': provincias,
                   'CIUDAD':ciudades,}
                  )

df2 = pd.DataFrame({'TIMESTAMP': sorted(dates),
                   'ID_BASE': bases_id,
                   'TEMPERATURA': random.sample(range(0,m),k=m),
                   'HUMEDAD':random.sample(range(0,m),k=m),
                   'PRESION':random.sample(range(0,m),k=m),
                   'DIRECCION VIENTO':direcciones,
                   'VELOCIDAD VIENTO':random.sample(range(0,m),k=m),}
                  )

#MODIFICACION DE LOS DATOS DE LA COLUMNDA  TIMESTAP A DATE CORRECTAMENTE
df2["TIMESTAMP"] = pd.to_datetime(df2["TIMESTAMP"])

sqlContext = SQLContext(sc)
df1 = sqlContext.createDataFrame(df)
df2 = sqlContext.createDataFrame(df2)
bases = df1.rdd
mediciones = df2.rdd 

# Parcial
1) Dado los acontecimientos en USA, deseamos obtener datos que nos den mayor información sobre las muertes de personas de raza negra por parte de oficiales de policía.

Para ello, tenemos un csv con información sobre las muertes por parte de oficiales de policía en USA desde 2015 hasta 2017: (name, date, race, city, state)

Y otro csv con información sobre el porcentaje de cada raza en las ciudades de USA: (state, city, share_white, share_black, share_native_american, share_asian, share_hispanic) Se pide:

a) Obtener el estado con mayor porcentaje de muertes de personas de raza negra teniendo en cuenta la cantidad total de muertes por parte de oficiales en ese estado. (10 pts)

b) Obtener los 10 estados con mayor diferencia entre el porcentaje de muertes y el porcentaje de gente de raza negra en ese estado. Para ello, considerar el porcentaje de raza de un estado como el promedio de los valores de sus ciudades. Por ejemplo si en Texas el porcentaje de muertes de personas de raza negra por parte de la policía es del 36% y el promedio de share_black para Texas es 24% la diferencia es 0.12. (15 pts) Resolver ambos puntos usando la API de RDDs de PySpark.

In [2]:
muertes = [
    ('Juan', '2016-01-01', 'negra','ciudad1','estado1'),
    ('Gerardo', '2016-01-01', 'negra','ciudad1','estado1'),
    ('Silvia', '2016-01-01', 'blanca','ciudad2','estado2'),
    ('Margarita', '2016-01-01', 'blanca','ciudad3','estado2'),
    ('Andres', '2016-01-01', 'asiatico','ciudad4','estado3'),
    ('Pablo', '2016-01-01', 'asiatico','ciudad5','estado3')
]

ciudades = [
    ('estado1', 'ciudad1', 10,40,20,20,10),
    ('estado2', 'ciudad2', 40,10,20,20,10),
    ('estado2', 'ciudad3', 10,10,20,20,40),
    ('estado3', 'ciudad4', 15,15,15,15,40),
    ('estado3', 'ciudad5', 70,20,0,10,0)
]

muertes_rdd = sc.parallelize(muertes)
ciudades_rdd = sc.parallelize(ciudades)

In [3]:
muertes_rdd.collect()

[('Juan', '2016-01-01', 'negra', 'ciudad1', 'estado1'),
 ('Gerardo', '2016-01-01', 'negra', 'ciudad1', 'estado1'),
 ('Silvia', '2016-01-01', 'blanca', 'ciudad2', 'estado2'),
 ('Margarita', '2016-01-01', 'blanca', 'ciudad3', 'estado2'),
 ('Andres', '2016-01-01', 'asiatico', 'ciudad4', 'estado3'),
 ('Pablo', '2016-01-01', 'asiatico', 'ciudad5', 'estado3')]

In [4]:
ciudades_rdd.collect()

[('estado1', 'ciudad1', 10, 40, 20, 20, 10),
 ('estado2', 'ciudad2', 40, 10, 20, 20, 10),
 ('estado2', 'ciudad3', 10, 10, 20, 20, 40),
 ('estado3', 'ciudad4', 15, 15, 15, 15, 40),
 ('estado3', 'ciudad5', 70, 20, 0, 10, 0)]

###### Punto A

In [9]:
muertes_por_estado = muertes_rdd.map(lambda x: (x[4], (1 if x[2]=='negra' else 0, 1)))\
    .reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1]))\
    .map(lambda x: (x[0], float(x[1][0]/x[1][1])*100))

In [10]:
muertes_por_estado.reduce(lambda x, y: x if x[1]>y[1] else y)

('estado1', 100.0)

###### Punto B

In [11]:
negros_por_ciudad = ciudades_rdd.map(lambda x: (x[0], (x[3], 1)))\
        .reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1]))\
        .map(lambda x: (x[0], float(x[1][0]/x[1][1]))).cache()

In [17]:
agrupo = muertes_por_estado.join(negros_por_ciudad).map(lambda x: (x[0],(x[1][0]-x[1][1]))) #Falta el abs
agrupo.takeOrdered(10, key = lambda x: -x[1])

[('estado1', 60.0), ('estado2', -10.0), ('estado3', -17.5)]

# Parcial 2018 2do cuatri, 1era oport

Tenemos información sobre recetas en 3 RDD de Spark.

Recetas: (id_receta, nombre, tiempo_preparación, dificultad)

Ingredientes: (id_ingrediente, nombre)

Ingredientes por Receta: (id_receta, id_ingrediente, cantidad)

Se pide:

a) Obtener el nombre de todas las recetas que tengan Cordero.

b) Calcular la cantidad total de cada ingrediente si queremos hacer todas las recetas con Cordero que sean fáciles.

In [24]:
recetas = [
    (1, 'receta1', 1, 'facil'),
    (2, 'receta2', 2, 'facil'),
    (3, 'receta3', 2, 'facil'),
    (4, 'receta4', 2, 'medio'),
    (5, 'receta5', 3, 'facil')
]
#id_receta, nombre, tiempo, dificultad

ingredientes = [   
    (1, 'cordero'),
    (2, 'manzana'),
    (3, 'pera'),
    (4, 'tomate'),
    (5, 'aji'),
    (6, 'limon'),
    (7, 'queso')
]

#id ingrediente, ingrediente

ingredientes_por_receta = [
    (1,1,5),
    (1,2,5),
    (1,3,5),
    (1,4,5),
    (2,3,5),
    (2,4,5),
    (2,2,5),
    (3,2,5),
    (3,4,5),
    (3,3,5),
    (3,1,5),
    (3,5,5),
    (4,1,5),
    (4,2,5),
    (5,1,5),
    (5,3,5),
    (5,4,5),
    (5,2,5),
    (5,7,5),
    (5,6,5)
]

#id_receta, id_ingr, cantidad

recetas_rdd = sc.parallelize(recetas)
ingredientes_rdd = sc.parallelize(ingredientes)
ingredientes_por_receta_rdd = sc.parallelize(ingredientes_por_receta)

In [25]:
cordero = ingredientes_rdd.filter(lambda x: x[1]=='cordero')#.map(lambda x: (x[0],x[1]))
ing_por_rec_map = ingredientes_por_receta_rdd.map(lambda x: (x[1],x[0])) #id_ing, (id_receta, cant)
rec_con_cord = cordero.join(ing_por_rec_map).map(lambda x: (x[1][1], x)) #id_rec, (id_ing,nombre_ing,id_rec)
rec_con_cord.collect()

[(1, (1, ('cordero', 1))),
 (3, (1, ('cordero', 3))),
 (4, (1, ('cordero', 4))),
 (5, (1, ('cordero', 5)))]

In [26]:
recetas_con_map = recetas_rdd.map(lambda x: (x[0], (x[1], x[3]))) #id_rec, (nombre, dific)
todo=rec_con_cord.join(recetas_con_map).map(lambda x: (x[1][1][0]))

In [27]:
todo.collect()

['receta1', 'receta3', 'receta4', 'receta5']

###### Parte B

b) Calcular la cantidad total de cada
ingrediente si queremos hacer todas las
recetas con Cordero que sean fáciles. (8
puntos) 


In [28]:
recetas_faciles = recetas_con_map.filter(lambda x: x[1][1]=='facil') #id_rec, (nombre, dificultad)
con_cordero = ingredientes_rdd.filter(lambda x: x[1]=='cordero') #ind_ing, nombre
ing_rec_map = ingredientes_por_receta_rdd.map(lambda x: (x[0], (x[1],x[2]))) #id_rec, (id_ing, cant)

In [29]:
recetas_faciles.collect()
#id receta, nombre, dificultad

[(1, ('receta1', 'facil')),
 (2, ('receta2', 'facil')),
 (3, ('receta3', 'facil')),
 (5, ('receta5', 'facil'))]

In [30]:
recetas_ingredientes_faciles = ing_rec_map.rightOuterJoin(recetas_faciles)
recetas_ingredientes_faciles.collect()
#id_receta, id_ing, cantidad, nombre, dificultad

[(1, ((1, 5), ('receta1', 'facil'))),
 (1, ((2, 5), ('receta1', 'facil'))),
 (1, ((3, 5), ('receta1', 'facil'))),
 (1, ((4, 5), ('receta1', 'facil'))),
 (2, ((3, 5), ('receta2', 'facil'))),
 (2, ((4, 5), ('receta2', 'facil'))),
 (2, ((2, 5), ('receta2', 'facil'))),
 (3, ((2, 5), ('receta3', 'facil'))),
 (3, ((4, 5), ('receta3', 'facil'))),
 (3, ((3, 5), ('receta3', 'facil'))),
 (3, ((1, 5), ('receta3', 'facil'))),
 (3, ((5, 5), ('receta3', 'facil'))),
 (5, ((1, 5), ('receta5', 'facil'))),
 (5, ((3, 5), ('receta5', 'facil'))),
 (5, ((4, 5), ('receta5', 'facil'))),
 (5, ((2, 5), ('receta5', 'facil'))),
 (5, ((7, 5), ('receta5', 'facil'))),
 (5, ((6, 5), ('receta5', 'facil')))]

In [8]:
con_cordero.collect()
#id_ind, nombre

[(1, 'cordero')]

In [39]:
ingredientes_por_receta = ing_rec_map.map(lambda x: (x[1][0], x[0])).join(con_cordero).map(lambda x: (x[1][0],x))
ingredientes_por_receta.collect()
#id_ing, id_rect, nombre_ing

[(1, (1, (1, 'cordero'))),
 (3, (1, (3, 'cordero'))),
 (4, (1, (4, 'cordero'))),
 (5, (1, (5, 'cordero')))]

In [42]:
ing_mas_recetas = ingredientes_por_receta.join(recetas_ingredientes_faciles).map(lambda x: (x[1][1][0][0],x[1][1][0][1])).reduceByKey(lambda x, y: x+y)
ing_mas_recetas.collect()

[(1, 15), (2, 15), (3, 15), (4, 15), (5, 5), (6, 5), (7, 5)]

In [None]:
recetas_faciles = recetas_con_map.filter(lambda x: x[1][1]=='facil') #id_rec, (nombre, dificultad)
con_cordero = ingredientes_rdd.filter(lambda x: x[1]=='cordero') #ind_ing, nombre
ing_rec_map = ingredientes_por_receta_rdd.map(lambda x: (x[0], (x[1],x[2]))) #id_rec, (id_ing, cant)
recetas_ingredientes_faciles = ing_rec_map.rightOuterJoin(recetas_faciles)
ingredientes_por_receta = ing_rec_map.map(lambda x: (x[1][0], x[0])).join(con_cordero).map(lambda x: (x[1][0],x))
ing_mas_recetas = ingredientes_por_receta.join(recetas_ingredientes_faciles).map(lambda x: (x[1][1][0][0],x[1][1][0][1])).reduceByKey(lambda x, y: x+y)


# Parcial 2018 2C Parcial 3

In [7]:
usuarios = [
    (1,"n1","arg",1),
    (2,"n2","arg",2),
    (3,"n3","bra",3),
    (4,"n4","arg",4),
    (5,"n5","chi",5),
    (6,"n6","arg",6)
]]
]

usuarios_rdd = sc.parallelize(usuarios)

comentarios = [
    (1,1,1,'hola soy #pedro #pato #lucas #pato',1),
    (2,2,2,'hola soy #marcos #marcos #carlos #matias',1),
    (3,3,3,'hola soy #juan #pedro #tito #emilio',1),
    (4,4,4,'hola soy #emilio #pato #tito #matias',1),
    (5,5,5,'hola soy #pato #carlos #marcos #pedro',1),
    (6,6,6,'hola soy #emilio #tito #matias #marcos',1),
    (7,1,7,'hola soy #pedro #marcos #pedro #matias',1),
    (8,2,8,'hola soy #carlos #emilio #pedro #lucas',1),
    (9,3,9,'hola soy #emilio #juan #marcos #emilio',1),
    (10,4,10,'hola soy #matias #pedro #carlos #emilio',1),
    (11,5,11,'hola soy #pato #matias #pedro #emilio',1),
    (12,6,12,'hola soy #lucas #emilio #tito #ruben',1),
    (13,1,13,'hola soy #marcos #juan #juan #pedro',1),
    (14,2,14,'hola soy #pato #juan #matias #pedro',1),
    (15,3,15,'hola soy #lucas #emilio #lucas #matias',1),
    (16,4,16,'hola soy #carlos #lucas #ruben #juan',1),
    (17,5,17,'hola soy #pedro #pedro #matias #marcos',1),
    (18,6,18,'hola soy #carlos #tito #emilio #ruben',1),
    (19,1,19,'hola soy #carlos #pedro #tito #matias',1),
    (20,2,20,'hola soy #ruben #marcos #tito #marcos',1),
    (21,3,21,'hola soy #lucas #pedro #carlos #emilio',1),
    (22,4,22,'hola soy #pedro #tito #marcos #tito',1),
    (23,5,23,'hola soy #marcos #juan #marcos #tito',1),
    (24,6,24,'hola soy #pedro #juan #pedro',1),
    (25,1,25,'hola soy #lucas #marcos #carlos',1),
    (26,2,26,'hola soy #juan #ruben #lucas',1),
    (27,3,27,'hola soy #marcos #matias #tito',1),
    (28,4,28,'hola soy #pato #matias #ruben',1),
    (29,5,29,'hola soy #lucas #emilio #lucas',1),
    (30,6,30,'hola soy #pedro #matias #matias',1),
    (31,1,31,'hola soy #tito #ruben #juan',1),
    (32,2,32,'hola soy #pato #ruben #pato',1),
    (33,3,33,'hola soy #pato #tito #marcos',1),
    (34,4,34,'hola soy #marcos #lucas #marcos',1),
    (35,5,35,'hola soy #carlos #carlos #pedro',1),
    (36,6,36,'hola soy #emilio #pedro #ruben',1),
    (37,1,37,'hola soy #emilio #matias #carlos',1),
    (38,2,38,'hola soy #matias #juan #pedro',1),
    (39,3,39,'hola soy #matias #ruben #matias',1),
    (40,4,40,'hola soy #pato #emilio #pato',1),
    (41,5,41,'hola soy #marcos #emilio #tito',1),
    (42,6,42,'hola soy #carlos #matias',1),
    (43,1,43,'hola soy #lucas #matias',1),
    (44,2,44,'hola soy #matias #emilio',1),
    (45,3,45,'hola soy #carlos #carlos',1),
    (46,4,46,'hola soy #carlos #matias',1),
    (47,5,47,'hola soy #carlos #lucas',1),
    (48,6,48,'hola soy #ruben #juan',1),
    (49,1,49,'hola soy #lucas #tito',1),
    (50,2,50,'hola soy #carlos #marcos',1),
    (51,3,51,'hola soy #juan #carlos',1),
    (52,4,52,'hola soy #pedro #marcos',1),
    (53,5,53,'hola soy #pato #juan',1),
    (54,6,54,'hola soy #ruben #ruben',1),
    (55,1,55,'hola soy #marcos #pato',1),
    (56,2,56,'hola soy #emilio #pato',1),
    (57,3,57,'hola soy #lucas #carlos',1),
    (58,4,58,'hola soy #emilio #ruben',1),
    (59,5,59,'hola soy #ruben',1),
    (60,6,60,'hola soy #tito',1),
    (61,1,61,'hola soy #tito',1),
    (62,2,62,'hola soy #juan',1),
    (63,3,63,'hola soy #emilio',1),
    (64,4,64,'hola soy #lucas',1),
    (65,5,65,'hola soy #pato',1),
    (66,6,66,'hola soy #lucas',1),
    (67,1,67,'hola soy #juan',1),
    (68,2,68,'hola soy #pedro',1),
    (69,3,69,'hola soy #pato',1),
    (70,4,70,'hola soy #tito',1),
    (71,5,71,'hola soy #juan',1),
    (72,6,72,'hola soy #lucas',1)
]

comentarios_rdd = sc.parallelize(comentarios)

In [51]:
usuarios_rdd.take(1)

[(1, 'n1', 'arg', 1)]

In [52]:
comentarios_rdd.take(1)

[(1, 1, 1, 'hola soy #pedro #pato #lucas #pato', 1)]

In [53]:
usuarios_arg = usuarios_rdd.filter(lambda x: x[2]=="arg").map(lambda x: (x[0], x[2]))
usuarios_arg.take(1)

[(1, 'arg')]

In [54]:
comentarios_map = comentarios_rdd.map(lambda x: (x[1],x[3]))
comentarios_map.take(1)

[(1, 'hola soy #pedro #pato #lucas #pato')]

In [58]:
union = comentarios_map.join(usuarios_arg).map(lambda x: (x[1][0]))
union.take(2)

['hola soy #pedro #pato #lucas #pato',
 'hola soy #pedro #marcos #pedro #matias']

In [57]:
separados = union.flatMap(lambda x: x.split(" ")).filter(lambda x: x.startswith("#"))\
                .map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)
separados.takeOrdered(5, key= lambda x: -x[1])

[('#pedro', 16),
 ('#matias', 16),
 ('#marcos', 14),
 ('#tito', 13),
 ('#ruben', 13)]

#### Punto B

In [85]:
comentarios_mapeado = comentarios_rdd.flatMap(lambda x: [(word, x[1]) for word in x[3].split()])\
                        .filter(lambda x: x[0].startswith("#"))
join = comentarios_mapeado.join(comentarios_mapeado).filter(lambda x: x[1][0]!=x[1][1])\
                            .map(lambda x: (x[0], (x[1][0], x[1][1])) if (x[1][0]<x[1][1]) else (x[0], (x[1][1], x[1][0])))\
                            .distinct().cache()

# (#palabra, 1)
# (palabra2, 2)
# (#palabra, 2)

# (#palabra, (1, 2))

# (#palabra, (1, 3))
# (#palabra, (2, 3))



In [87]:
join.take(10)

[('#juan', (3, 5)),
 ('#pato', (3, 4)),
 ('#pedro', (1, 5)),
 ('#tito', (4, 5)),
 ('#tito', (5, 6)),
 ('#marcos', (2, 4)),
 ('#emilio', (2, 4)),
 ('#lucas', (1, 4)),
 ('#carlos', (1, 4)),
 ('#ruben', (1, 4))]

# 2019 1C 2op
1) ACARA posee información histórica sobre la venta de autos 0km
en la Argentina. Posee un RDD con información de cada modelo
(marca, modelo, motor, transmisión, origen) y otro con
la información de ventas (marca, modelo, fecha,
concesionario).
Se desea conocer, para los modelos de origen nacional, cuales son
los modelos que ya se discontinuaron (un modelo discontinuado es
aquel que no tuvo ventas en los últimos 12 meses), obteniendo un
nuevo RDD con (marca, modelo, total_vendido,
fecha_inicio_venta, fecha_discontinuación), donde la
fecha de discontinuación es la fecha cuando se vendió el último auto
de ese modelo, ordenado ascendentemente por esta fecha. (***) (
15pts)
Aclaración: Se puede asumir que el primer RDD tiene un único
registro para cada Marca y Modelo.


In [8]:
autos = [
    ('chevrolet',"m1",1,"m","arg"),
    ('chevrolet',"m2",2,"m","arg"),
    ('chevrolet',"m3",3,"m","arg"),
    ('ford',"m4",4,"m","arg"),
    ('ford',"m5",5,"m","bra"),
    ('ford',"m6",6,"m","arg"),
    ('chevrolet',"m7",7,"a","bra"),
    ('chevrolet',"m8",8,"a","arg"),
    ('chevrolet',"m9",9,"a","bra"),
    ('ford',"m10",10,"a","arg"),
    ('ford',"m11",11,"a","arg"),
    ('ford',"m12",12,"a","arg")
]

ventas = [
    ('chevrolet',"m1",8,2),
    ('chevrolet',"m2",9,2),
    ('chevrolet',"m3",12,1),
    ('ford',"m4",8,3),
    ('ford',"m5",6,3),
    ('ford',"m6",1,3),
    ('chevrolet',"m7",1,1),
    ('chevrolet',"m8",11,3),
    ('chevrolet',"m9",3,3),
    ('ford',"m10",9,3),
    ('ford',"m11",2,2),
    ('ford',"m12",2,2),
    ('chevrolet',"m1",5,2),
    ('chevrolet',"m2",13,3),
    ('chevrolet',"m3",2,1),
    ('ford',"m4",7,3),
    ('ford',"m5",9,2),
    ('ford',"m6",2,3),
    ('chevrolet',"m7",1,1),
    ('chevrolet',"m8",2,1),
    ('chevrolet',"m9",11,3),
    ('ford',"m10",10,2),
    ('ford',"m11",1,3),
    ('ford',"m12",3,2),
    ('chevrolet',"m1",6,3),
    ('chevrolet',"m2",1,3),
    ('chevrolet',"m3",8,2),
    ('ford',"m4",12,2),
    ('ford',"m5",2,2),
    ('ford',"m6",2,1),
    ('chevrolet',"m7",12,1),
    ('chevrolet',"m8",6,1),
    ('chevrolet',"m9",9,2),
    ('ford',"m10",12,3),
    ('ford',"m11",1,3),
    ('ford',"m12",9,1)
]

autos_rdd=sc.parallelize(autos)
ventas_rdd = sc.parallelize(ventas)

In [9]:
autos_rdd.collect()

[('chevrolet', 'm1', 1, 'm', 'arg'),
 ('chevrolet', 'm2', 2, 'm', 'arg'),
 ('chevrolet', 'm3', 3, 'm', 'arg'),
 ('ford', 'm4', 4, 'm', 'arg'),
 ('ford', 'm5', 5, 'm', 'bra'),
 ('ford', 'm6', 6, 'm', 'arg'),
 ('chevrolet', 'm7', 7, 'a', 'bra'),
 ('chevrolet', 'm8', 8, 'a', 'arg'),
 ('chevrolet', 'm9', 9, 'a', 'bra'),
 ('ford', 'm10', 10, 'a', 'arg'),
 ('ford', 'm11', 11, 'a', 'arg'),
 ('ford', 'm12', 12, 'a', 'arg')]

In [11]:
ventas_rdd.take(5)

[('chevrolet', 'm1', 8, 2),
 ('chevrolet', 'm2', 9, 2),
 ('chevrolet', 'm3', 12, 1),
 ('ford', 'm4', 8, 3),
 ('ford', 'm5', 6, 3)]

In [13]:
autos_arg = autos_rdd.filter(lambda x: x[4]=="arg").map(lambda x: ((x[0],x[1]),x[4]))
autos_arg.collect()

[(('chevrolet', 'm1'), 'arg'),
 (('chevrolet', 'm2'), 'arg'),
 (('chevrolet', 'm3'), 'arg'),
 (('ford', 'm4'), 'arg'),
 (('ford', 'm6'), 'arg'),
 (('chevrolet', 'm8'), 'arg'),
 (('ford', 'm10'), 'arg'),
 (('ford', 'm11'), 'arg'),
 (('ford', 'm12'), 'arg')]

In [16]:
ventas_modelos = ventas_rdd.map(lambda x: ((x[0],x[1]), x[2]))
ventas_modelos.take(5)

[(('chevrolet', 'm1'), 8),
 (('chevrolet', 'm2'), 9),
 (('chevrolet', 'm3'), 12),
 (('ford', 'm4'), 8),
 (('ford', 'm5'), 6)]

In [22]:
autos_y_ventas = autos_arg.join(ventas_modelos).map(lambda x: (x[0], (x[1][1], x[1][1])))
autos_y_ventas.take(5)

[(('chevrolet', 'm3'), (12, 12)),
 (('chevrolet', 'm3'), (2, 2)),
 (('chevrolet', 'm3'), (8, 8)),
 (('chevrolet', 'm8'), (11, 11)),
 (('chevrolet', 'm8'), (2, 2))]

In [31]:
autos_reducido = autos_y_ventas.reduceByKey(lambda x,y: (x[0] if x[0]<y[0] else y[0], x[1] if x[1]>y[1] else y[1]))\
            .filter(lambda x: x[1][1]<=2)#.map()
autos_reducido.collect()

[(('ford', 'm11'), (1, 2)), (('ford', 'm6'), (1, 2))]