# Information Consumer

In [None]:
# empecemos importando las instancias
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests

In [None]:
# cuenta cuantas veces aparece un hashtag o lo agrega a la lista
def aggregate_tags_count(new_values, total_sum):
    return sum(new_values) + (total_sum or 0)

In [None]:
def get_sql_context_instance(spark_context):
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']

In [None]:
def process_rdd(time, rdd):
    print("----------- %s -----------" % str(time))
    #try:
    # obtén el contexto spark sql singleton desde el contexto actual
    sql_context = get_sql_context_instance(rdd.context)
    print("Get spark sql singleton context from the current context ----------- %s -----------" % str(time))

    # convierte el RDD a Row RDD
    row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))

    # crea un DF desde el Row RDD
    hashtags_df = sql_context.createDataFrame(row_rdd)
    print(hashtag_df)

    # Registra el DF como tabla
    hashtags_df.registerTempTable("hashtags")

    # obtenemos la información
    hashtag_counts_df = sql_context.sql(
        '''SELECT hashtag, hashtag_count FROM hashtags ORDER BY hashtag_count DESC LIMIT 10'''
    )# obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos
    hashtag_counts_df.show()
    hashtag_counts_df.coalesce(1)\
        .write.format('com.databricks.spark.csv')\
        .mode('overwrite')\
        .option("header", "true")\
        .csv("hashtag_file.csv") 

    country_counts_df = sql_context.sql(
        '''select word as country_code, 
        word_count as tweet_count from hashtags where word like 'CC%'order by word_count desc limit 10'''
    )# obtén los 10 mejores paises de la tabla utilizando SQL e imprímelos
    country_counts_df.show()
    country_counts_df.coalesce(1)\
        .write.format('com.databricks.spark.csv')\
        .mode('overwrite')\
        .option("header", "true")\
        .csv("country_file.csv")
    #except:
    #    pass

In [None]:
# A continuación creemos la configuración de spark:
conf = SparkConf()
conf.setAppName("Actividad_7")

In [None]:
# creemos el Spark Context
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

In [None]:
# Creamos el Streming Context con intervalo de refresco de 5 segundos
ssc = StreamingContext(sc, 2)

In [None]:
# establecemos un punto de control para la recuperacion de RDD
ssc.checkpoint("checkpoint_TwitterApp")

In [None]:
#por ultimo, leemos los datos:
dataStream = ssc.socketTextStream("localhost",9009)

***

In [None]:
# divide cada Tweet en palabras
words = dataStream.flatMap(lambda line: line.split(" "))

# filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))

# agrega la cuenta de cada hashtag a su última cuenta
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)

# procesa cada RDD generado en cada intervalo
tags_totals.foreachRDD(process_rdd)

# comienza la computación de streaming
ssc.start()

# espera que la transmisión termine
ssc.awaitTermination()