# Importación y creación del contexto

In [1]:
# Importación de dependencias y funciones
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
from operator import sub

In [2]:
# Load external packages programatically
import os
packages = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1"
# os.environ["PYSPARK_PYTHON"] = ""
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)

In [3]:
sc = SparkContext(appName="streaming")

In [4]:
# Crear el contexto de Spark Streaming
ssc = StreamingContext(sc, 5)

# Parseo de datos de entrada

In [5]:
from datetime import datetime
import re

def parseOrder(line):
    
    s = re.split(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''', line)
    
    try:
       
      return [{"ID": s[0],
               "PARENT-SYS-ID": s[1],
               "Source": s[2],
               "Mentions":s[3],
               "Target":s[4],
               "Name_source":s[5],
               "Body": s[6],
               "Pub_date":s[7],
               "URLs":s[8],
               "Tipe_action":s[9],
               "Link":s[10],
               "Has_link":s[11],
               "Has_picture":s[12],
               "Website":s[13],
               "Country":s[14],
               "Activity":s[15],
               "Followers":s[16],
               "Following":s[17],
                "Location":s[18]
               }]
    except Exception as err:
        print("Wrong line format (%s): " % line)
        return []
    

  

# Envio de datos a Kafka

In [6]:
# Configura el endpoint para localizar el broker de Kafka
# kafkaBrokerIPPort = "172.20.1.21:9092"
kafkaBrokerIPPort = "127.0.0.1:9092"

# Productor simple (Singleton!)
# from kafka import KafkaProducer
import kafka
class KafkaProducerWrapper(object):
  producer = None
  @staticmethod
  def getProducer(brokerList):
    if KafkaProducerWrapper.producer != None:
      return KafkaProducerWrapper.producer
    else:
      KafkaProducerWrapper.producer = kafka.KafkaProducer(bootstrap_servers=brokerList,
                                                          key_serializer=str.encode,
                                                          value_serializer=str.encode)
      return KafkaProducerWrapper.producer

# Envía métricas a Kafka! (salida)  
def sendMetrics(itr):
  prod = KafkaProducerWrapper.getProducer([kafkaBrokerIPPort])
  for m in itr:
    prod.send("metrics", key=m[0], value=m[0]+","+str(m[1]))
  prod.flush()

# Lectura de datos 

In [7]:
# Fichero de texto: Lectura de fuente de datos de fichero (no se usa en este ejemplo, en su lugar 
# enviamos los datos a Kafka para crear una simulación más realista)
#stream = ssc.textFileStream("data/input")

# Kafka: Lectura de datos
kafkaParams = {"metadata.broker.list": kafkaBrokerIPPort}
stream = KafkaUtils.createDirectStream(ssc, ["Quatar_GP_2014"], kafkaParams)
stream = stream.map(lambda o: str(o[1]))

# Ejercicio 1

Calcular el número total de menciones recibidas por cada cuenta de usuario durante el intervalo de 5 segundos.

In [None]:
orders = stream.flatMap(parseOrder)
#Filtramos aquellos mensajes que sean menciones
orders_filter = orders.filter((lambda x: True if 'M' in x['Tipe_action'] else False ))
#Un vez tenemos la información filtrada eliminamos aquellos que tengan el campo vacio.
orders_filter_not = orders_filter.filter((lambda x: True if x['Tipe_action'] != '' else False ))
#Split y explode para separar cada uno por la coma.

orders_alone = (orders_filter_not.filter((lambda x: x['Mentions']))
                                 .flatMap(lambda x: x['Mentions'].split(","))
                                 .map(lambda word: (word, 1)) 
                                 .reduceByKey(lambda a, b: a + b))


orders_alone.foreachRDD(lambda rdd: rdd.foreachPartition(sendMetrics))


# Ejercicio 2

Calcular la frecuencia total acumulada de apariciones de cada hastag en el campo body, actulizando un ranking con los 5 hastags con mayor frecuencia de aparicion.

In [None]:
orders = stream.flatMap(parseOrder)

#Cálculo de frecuencias.
frecuencias = (orders.filter((lambda x: x['Body']))
                                 .flatMap(lambda o: o['Body'].split(" "))
                                 .filter(lambda word: True if len(word) != 0 else False)
                                 .filter(lambda word: True if word[0] == '#' else False)
                                 .map(lambda word: (word, 1)) 
                                 .reduceByKey(lambda a, b: a + b))

#Top 5 hastags
FrecuencyState = frecuencias.updateStateByKey(lambda vals, totalOpt: sum(vals) + totalOpt if totalOpt != None else sum(vals))

top5 = (FrecuencyState.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False)
                   .map(lambda x: x[0]).zipWithIndex().filter(lambda x: x[1] < 5)))

top5values = (FrecuencyState.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False)
                   .map(lambda x: x[1]).zipWithIndex().filter(lambda x: x[1] < 5)))

top5List = top5values.repartition(1).map(lambda x: str(x[0])).glom().map(lambda arr: ("Top5values", arr))

top5clList = top5.repartition(1).map(lambda x: str(x[0])).glom().map(lambda arr: ("Top5", arr))


finalStream = FrecuencyState.union(top5clList)

finalStream_2 = FrecuencyState.union(top5List)


finalStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendMetrics))

finalStream_2.foreachRDD(lambda rdd: rdd.foreachPartition(sendMetrics))


sc.setCheckpointDir("data_entrega/checkpoint/")



# Ejercicio 3

Calcular en una ventana temporal 20 segundos con offset de 10 segundos la frecuencia de aparición de cada uno de los 3 posibles tipos de tweets (TW-RT-MT)

In [8]:
def clase(m):
    
    if 'TW' in m:
        return('TW')
    
    elif 'Rt' in m:
        
        return('RT')
    
    else:
        return('MT')
        

In [9]:
orders = stream.flatMap(parseOrder)

orders_filter = orders.filter(lambda x: True if x['Tipe_action'] != '' else False ).filter((lambda x: x['Tipe_action']))

change = (orders_filter.map((lambda x: clase(x['Tipe_action'])))
                              .map(lambda word: (word, 1)) 
                              .reduceByKeyAndWindow(add, sub, 20, 10))
                              
    


change.foreachRDD(lambda rdd: rdd.foreachPartition(sendMetrics))

sc.setCheckpointDir("data_entrega/checkpoint/")


# Start Streaming context

In [10]:
ssc.start()
ssc.awaitTerminationOrTimeout(10)# Espera 10 segs. antes de acabar

False

# Stop Streaming context

In [None]:
ssc.stop(False)