In [None]:
# Programa unha aplicación que vixile un directorio onde entrarán os ficheiros JSON 
# cos diferentes tweets.
# Interésanos saber os usuarios máis activos na rede en intervalos dunha hora
# executada cada 15 minutos.

In [None]:
# No caso de de lanzar a aplicación desde consola debemos configurar a session
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StreamingTwitterFilesWindowed") \
    .getOrCreate() 

In [None]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
from pyspark.sql.types import StructType
from pyspark.sql.functions import *

filepath = "file:///home/hduser/input/csv"

# A pesar de que se trata dun JSON, ao ser un streaming temos que pasarlle un Schema
userSchema = StructType().add("created_at", "string").add("screen_name", "string").add("text", "string")

orixe = spark \
    .readStream \
    .schema(userSchema) \
    .json(filepath)

# Convirto o string da data nun timestamp
orixe_datado = orixe.withColumn('created_at',to_timestamp(orixe.created_at, 'EEE MMM d HH:mm:ss z yyyy'))

procesado = orixe_datado \
    .groupBy(window(orixe_datado.created_at, "60 minutes", "30 minutes"),'screen_name') \
    .count() \
    .orderBy('window','count',ascending=False)


#procesado = orixe_datado.groupBy(window(orixe_datado.created_at, "3600 minutes", "1800 minutes"),'screen_name').count().orderBy('window','count',ascending=False)

consulta = procesado \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate",False) \
    .start()

consulta.awaitTermination()

In [None]:
## Como exercicio extra podes repetir o exercicio mais en lugar dunha lista dos usuarios máis activos
#  trataríase de calcular o TT ou Trending Topic, tendo en conta o número de aparicións dos HASHTAGS, 
#  palabras precedidas por un símbolo #

In [None]:
#Nunha primeira aproximación estudamos como contar os #hashtags

from pyspark.sql.functions import *
orixe = spark.read.json("file:///home/hduser/input/tweets/tweets.json")

#orixe.printSchema()

# Usamos a mesma estratexia que no wordcount, pero filtranso despois os #hashtags
palabras = orixe.select(explode(split(orixe.text, " ")).alias("palabra"))
hashtags = palabras.filter(palabras.palabra.startswith('#'))
hashtags.groupBy('palabra').count().orderBy('count',ascending=False).show()

In [None]:
# Agora que xa sabemos que funciona, trátase de aplicar un código similar pero con streaming

In [None]:
from pyspark.sql.types import StructType
from pyspark.sql.functions import *

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

filepath = "file:///home/hduser/input/csv"

# A pesar de que se trata dun JSON, ao ser un streaming temos que pasarlle un Schema
userSchema = StructType().add("created_at", "string").add("screen_name", "string").add("text", "string")

orixe = spark \
    .readStream \
    .schema(userSchema) \
    .json(filepath)

# Convirto o string da data nun timestamp
orixe_datado = orixe.withColumn('created_at',to_timestamp(orixe.created_at, 'EEE MMM d HH:mm:ss z yyyy'))

palabras = orixe_datado.select('created_at',explode(split(orixe.text, " ")).alias("palabra"))
hashtags = palabras.filter(palabras.palabra.startswith('#'))

procesado = hashtags \
    .groupBy(window(orixe_datado.created_at, "60 minutes", "30 minutes"),'palabra') \
    .count() \
    .orderBy('window','count',ascending=False)


consulta = procesado \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate",False) \
    .start()

consulta.awaitTermination()