# Sistemas distribuidos 3: SPARK STREAMING con KAFKA

M.C. Gálvez Ortiz



## Proceso inicial y setup:

1. Se crea el entorno de Anaconda Python para Python v3.5. 
2. En el entorno, se ejecuta zookeeeper y después Kafka.
3. Se crea un directorio `checkpoint`, en este caso dentro del subdirectorio `data` donde también se copia el fichero con los tweets a analizar, **DATASET-Twitter-23-26-Mar-2014-MotoGP-Qatar.csv**.
4. Se ejecuta el programa que hace de productor leyendo el fichero de tweets. Se ha usado el 5-kafka_producer.py o el 3-kafka_producer.py proporcionado en los ejemplos de clase.
`python 5-kafka_producer.py 0.1 0.3 Quatar_GP_2014 data/DATASET-Twitter-23-26-Mar-2014-MotoGP-Qatar.csv`.
5. Se lanza el jupyter-notebook. 

## Practica

In [1]:
# Importación de dependencias y funciones.
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
from operator import sub

In [2]:
# Carga de paquetes externos.
import os
packages = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1"

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)

In [3]:
# Activación del contexto spark.
sc = SparkContext(appName="PracticaSD3_Quatar_GP_2014")

In [4]:
# Creación del contexto de Spark Streaming.
ssc = StreamingContext(sc, 5)

In [5]:
#Parseo de datos.
import csv

def parseOrder(line):
    #print(line)
    s=next(csv.reader([line]))
    
       
    try:
        return [{"Id": s[0], "Parent_sys_id": s[1], "Source": s[2],
               "Mentions": s[3],"Target": s[4], "Name_source": s[5],
               "Body": s[6],"Pub_date": s[7],
               "URLs": s[8],"Tipe_action": s[9],
              "Link": s[10], "Has_link": s[11], "Has_picture": s[12],
              "Website": s[13],
              "Country": s[14],"Activity": s[15], "Followers": s[16],
               "Following": s[17],"Location": s[18]}]
    
    
    except Exception as err:
        print("Wrong line format (%s): " % line)
        return []
    

In [6]:
#Envio de datos a kafka con la misma dirección que usa el producer para mandar los datos.

kafkaBrokerIPPort = "127.0.0.1:9092"


In [7]:
# Kafka: lectura de datos con el mismo topic lanzado en el producer.
kafkaParams = {"metadata.broker.list": kafkaBrokerIPPort}
stream = KafkaUtils.createDirectStream(ssc, ["Quatar_GP_2014"], kafkaParams)
stream = stream.map(lambda o: str(o[1]))


## a) Calcular el número total de menciones recibidas por cada cuenta de usuario durante el intervalo de 5 segundos.

In [8]:
#Lectura de la lista que va llegando en el streaming. 

lista = stream.flatMap(parseOrder)

#Como dentro del campo "Menciones" las diferentes menciones estan separadas por comas,
# se hace el mapeo dividiendo el campo, se asigna un valor 1 a cada mención encontrada
# creando la tupla clave-valor, y se hace la suma por clave en el "reduceByKey".

menciones = lista.flatMap(lambda o: (o['Mentions'].split(','))) \
                  .map(lambda u: (u, 1)).reduceByKey(lambda a, b: a+b)
     
menciones.pprint()


## b) Calcular la frecuencia total acumulada de apariciones de cada hashtag en el campo body, actualizando un ranking con los 5 hashtags con mayor frecuencia de aparición.

In [9]:
#Se define una función suma.
def update_func(new_val, last_sum):
    return sum(new_val) + (last_sum or 0)

In [10]:
#Lectura de la lista que va llegando en el streaming. 
lista = stream.flatMap(parseOrder)

#Igual que en el ejercicio anterior, se mapea el campo "Body" donde se separan las componentes
# por espacio en blanco y se seleccionan las que empiecen por "#" que señalan hastags.

cuerpo_hasht = lista.flatMap(lambda o: (o['Body'].split())).filter(lambda x: x.startswith('#'))      

#Se mapea asignando un valor 1 a cada hastags y se hace la suma acumulada usando el 
#"updateStateByKey" con la función suma definida antes.

hasht_ac=cuerpo_hasht.map(lambda u: (u,1)).updateStateByKey(update_func)

#Para sacar el top 5, se ordenan el resultado de la suma acumulada de mayor a menor, 
#tupla (hashtag,valor suma acumulada), se mapea seleccionando la componente primera de 
#la tupla, esto es el hastags, se indexa y se filtra solo a los primeros 5 índices.

topS5 = hasht_ac.transform(lambda a: a.sortBy(lambda x: x[1], False).map(lambda x: x[0])\
                                     .zipWithIndex().filter(lambda x: x[1] < 5))
                                     
topS5.pprint()

ssc.checkpoint("data/checkpoint/") 

## c) Calcular en una ventana temporal 20 segundos con offset de 10 segundos la frecuencia de aparición de cada uno de los 3 posibles tipos de tweets (TW-RT-MT).

In [11]:
#Lectura de la lista que va llegando en el streaming. 
lista = stream.flatMap(parseOrder)

#Se mapea la lista en el campo "Tipe_action", asignando valor 1 y se hace la suma por clave 
#y por ventana, donde en el offset del 10 segundos se suman los tweets de cada tipo que van
# entrando en la ventana y se restan los que van saliendo.

tweetsPerWindow = lista.map(lambda x: (x['Tipe_action'],1)).reduceByKeyAndWindow(add, sub, windowDuration=20,slideDuration=10)

tweetsPerWindow.repartition(1).pprint()

sc.setCheckpointDir("data/checkpoint/")

# Comienzo de spark-streaming


In [12]:
ssc.start()


-------------------------------------------
Time: 2018-07-15 12:14:55
-------------------------------------------
('motogp', 66)
('andreaiannone29', 4)
('laureussport', 8)
('marcmarquez93', 47)
('lorenzo99', 4)
('valeyellow46', 33)
('aleixespargaro', 4)
('19bautista', 5)

-------------------------------------------
Time: 2018-07-15 12:14:55
-------------------------------------------
('#motogp', 0)
('#qatar', 1)
('#lwsa14', 2)
('#fb', 3)

-------------------------------------------
Time: 2018-07-15 12:15:00
-------------------------------------------
('motogp', 26)
('marcmarquez93', 26)
('valeyellow46', 26)

-------------------------------------------
Time: 2018-07-15 12:15:00
-------------------------------------------
('#motogp', 0)
('#qatar', 1)
('#lwsa14', 2)
('#fb', 3)

-------------------------------------------
Time: 2018-07-15 12:15:00
-------------------------------------------
('MT', 92)

-------------------------------------------
Time: 2018-07-15 12:15:05
------------------

# Parada de spark-streaming

In [13]:
ssc.stop(False)

-------------------------------------------
Time: 2018-07-15 12:15:25
-------------------------------------------
('motogp', 21)
('26_danipedrosa', 21)
('marcmarquez93', 21)
('valeyellow46', 21)

-------------------------------------------
Time: 2018-07-15 12:15:25
-------------------------------------------
('#motogp', 0)
('#qatar', 1)
("#motogp'", 2)
('#lwsa14', 3)
('#fb', 4)

