In [14]:
import pyspark

try: 
    type(sc)
except NameError:
    sc = pyspark.SparkContext('local[*]')

# Broadcast Join (map-side join)

## Variable Broadcast

Una variable Broadcast nos permite mantener una variable solo lectura cacheada en cada una de las maquinas del cluster en vez de enviar esa informacion con cada una de las tareas que se envian al cluster.

Esto es particularmente util cuando cuando tareas a partir de multiples etapas (stages) necesitan la misma informacion o cuando cachear informacion de forma deserealizada es importante.

Tener en cuenta que esto **es posible** cuando uno de los data sets o conjunto de datos **es lo suficientemente pequeño para ser broadcasteado a todos los nodos/workers del cluster**.

In [15]:
# Vamos a suponer que tenemos un RDD de productos por sus IDs identificando ventas de los mismos
prodsList = [1,11,1,4,5,11,2,3,4,5,6,4,5,4,3,2,1,11,2,3,4,5,6,4,3,2,1,1]
prods = sc.parallelize(prodsList,3)

In [16]:
# Un hash con los productos y sus nombres
productNames = {1:'papas',
                2:'cebollas',
                3:'tomates',
                4:'zanahorias',
                5:'batatas',
                6:'peras',
                7:'cilantro',
                8:'apio',
                9:'morrones',
                10:'manzanas',
                11:'naranjas'}

#hacemos un broadcast de la variable
bproductNames = sc.broadcast(productNames)

In [18]:
# Buscamos los productos que se vendieron mas de 4 veces
popularProds = prods.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).filter(lambda x:x[1]>=4)
print popularProds.collect()

[(3, 4), (1, 5), (4, 6), (2, 4), (5, 4)]


## Realizand un map-side join

El join se realiza de forma implicita usando un map y dentro del mismo accediendo a la informacion de la variable a la que se realizo el broadcast via ```.value```

In [12]:
# Buscamos los nombres
# map side-join
# a partir del broadcast la informacion de los nombres de los productos fueron distribuidos
# a los nodos en el cluster y son consultados en el map (haciendo el join implicito)
popularProds = popularProds.map(lambda x:(bproductNames.value[x[0]],x[0],x[1]))
print popularProds.collect()                

[('tomates', 3, 4), ('papas', 1, 5), ('zanahorias', 4, 6), ('cebollas', 2, 4), ('batatas', 5, 4)]


## Ventajas

Cuando un valor es broadcasted al cluster, este es copiado a los nodos/workers **solo una vez** (en vez de multiples veces si la informacion fuera a enviarse en cada task). De esta forma se hara que la aplicacion sea mas rapida. 


### Algunas referencias extras interesantes
- [https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables](https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables)
- [http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/](http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/)
- [https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-broadcast.html](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-broadcast.html)