Tenemos información sobre recetas en 3 RDD de Spark.

Recetas: (id_receta, nombre, tiempo_preparación, dificultad)
Ingredientes: (id_ingrediente, nombre)
Ingredientes por Receta: (id_receta, id_ingrediente, cantidad)

Se pide:
a) Obtener el nombre de todas las recetas que tengan Cordero. (7 puntos)
b) Calcular la cantidad total de cada ingrediente si queremos hacer todas las recetas con Cordero que sean fáciles.


In [1]:
import pyspark

try: 
    type(sc)
except NameError:
    sc = pyspark.SparkContext('local[*]')

In [2]:
recetas = [
    (1, "milanesas con ensalada", 60, "facil"),
    (4, "sushi", 100, "difícil"),
    (5, "asado", 110, "facil" ),
    (7, "cordero a la cerveza", 60, "facil"),
    (8, "cordero al disco", 130, "dificil")
]

In [3]:
ingredientes = [
    (1, "pescado"),
    (2, "cordero"),
    (3, "cerveza"),
    (4, "morron"),
    (5, "sal"),
    (6, "vino tinto"),
    (7, "lechuga")
]

In [4]:
ingred_x_receta = [
    (5, 2, 1000),
    (5, 5, 10),
    (4, 1, 500),
    (1, 7, 100),
    (7, 2, 500),
    (7, 3, 1000),
    (8, 2, 800),
    (8, 4, 100)
]

### Resolveremos primero el punto (a)
Obtener el nombre de todas las recetas que tengan Cordero

In [5]:
recetas_rdd = sc.parallelize(recetas)
ingred_recetas_rdd = sc.parallelize(ingred_x_receta)
ingredientes_rdd = sc.parallelize(ingredientes)

In [6]:
cordero = ingredientes_rdd.filter(lambda x: x[1] == "cordero")

In [7]:
rec_con_cordero = ingred_recetas_rdd.map(lambda x: (x[1], x[0]))

In [8]:
rec_con_cordero.collect()  #en cada tupla tenemos (id_ingrediente, id_receta)

[(2, 5), (5, 5), (1, 4), (7, 1), (2, 7), (3, 7), (2, 8), (4, 8)]

In [9]:
cordero.collect()

[(2, 'cordero')]

In [10]:
join_a = cordero.join(rec_con_cordero)

In [11]:
join_a.collect() #ahora tengo tuplas de la forma (id_ingrediente, ('cordero', id_receta))

[(2, ('cordero', 5)), (2, ('cordero', 7)), (2, ('cordero', 8))]

##### Ahora hacemos todo el join en una sola linea

In [12]:
ids_recetas_con_cordero = cordero.join(rec_con_cordero).map(lambda x: (x[1][1], None))

In [13]:
ids_recetas_con_cordero.collect() #lo que hicimos fue mapear un None a cada id para luego poder joinear

[(5, None), (7, None), (8, None)]

In [14]:
resul = recetas_rdd.map(lambda x: (x[0], x[1])).join(ids_recetas_con_cordero)
#lo que hicimos fue mapear una tupla clave valor en el rdd de recetas (para poder hacer el join) de la forma (id_receta, nombre_receta)
#luego joineamos con los ids de las recetas

In [15]:
recetas_con_cordero = resul.map(lambda x: x[1][0])

In [16]:
recetas_con_cordero.collect()

['asado', 'cordero a la cerveza', 'cordero al disco']

### Resolveremos ahora el punto (b)
Calcular la cantidad total de cada ingrediente si queremos hacer todas las recetas con Cordero que sean fáciles.

In [17]:
ids_recetas_faciles_con_cordero = recetas_rdd.filter(lambda x: x[3] == "facil")\
.map(lambda x: (x[1],x[0]))\
.join(recetas_con_cordero.map(lambda x: (x, None)))\
.map(lambda x: (x[1][0], None))

In [18]:
ids_recetas_faciles_con_cordero.collect()

[(5, None), (7, None)]

In [19]:
ingredientes_utilizados_faciles_cordero = ids_recetas_faciles_con_cordero.join(\
ingred_recetas_rdd.map(lambda x: ( x[0], (x[1], x[2]))))\
.map(lambda x: (x[1][1][0], x[1][1][1] ))

In [20]:
# hasta antes del ultimo map tengo (id_receta, (None, (id_ingrediente,cantidad))) entonces mapeo para que me quede todo ordenado
# para realizar el reduceByKey

In [21]:
ingredientes_utilizados_faciles_cordero.collect()

[(2, 1000), (5, 10), (2, 500), (3, 1000)]

In [22]:
#Hasta antes del map tengo (id_ingrediente , (cant_utilizada, nombre_ingrediente))

In [23]:
ingredientes_cantidad = ingredientes_utilizados_faciles_cordero.reduceByKey(lambda x,y: x + y).join(ingredientes_rdd)\
.map(lambda x: (x[1][1], x[1][0]))

ingredientes_cantidad.collect()

[('cordero', 1500), ('cerveza', 1000), ('sal', 10)]

In [24]:
#Dato: El takeOrdered ordena de menor a mayor! 