# PROGRAMA DE CIENCIAS DE LOS DATOS 
# **Curso: Big Data**
## **PROYECTO FINAL:**

## <font color='red'>Streaming de tendencias con #hashtags en Twitter</font>. 
## <font color='blue'>Parte 2: Procesamiento de DataStreaming en APACHE SPARK</font>. 

#### **Profesor: MSc. Felipe Meza Obando**


#### Alumnos: 
  
####  **Lester Salazar Viales.**
####  **Randal Salazar Viales.**

El objetivo de esta parte, es poder efectuar el procesamiento de los datos Streaming que son adquiridos del servidor tipo socket. Mediante una conexión tipo cliente, Spark logra obtener los datos de TWITTER del servidor tipo socket y efectuar el procesado correspondiente de los hashtags en tiempo real de ese momento.

Como criterio de diseño, se escogió poder visualizar el top 10 de hashtags populares en el momento de la corrida (hashtags que más se repitieran en ese momento).

## <font color='blue'>Configuración de Apache Spark Streaming</font>.

### **Importación de Librerías** a ser utilizadas para el Streaming de datos.

In [1]:
# Importación de librería PySpark
import pyspark
from pyspark import SparkFiles

# Otras Librerias PySpark
import pyspark.sql.functions as F
from pyspark.sql.types import *

from pyspark.sql.functions import col, date_format, udf 
from pyspark.sql.functions import explode, split, size, from_json
from pyspark.sql.types import DateType

# Librerías del Sistema Operativo
import os

# Librerías Numéricas
import pandas as pd
import numpy as np
from datetime import datetime

# Librerías de Preprocesamiento de datos Twitter
import preprocessor as p
import json

### Importación de FindSpark

In [2]:
import findspark

# Ruta de Apache Spark para sistema Mac/OS
#findspark.init('/opt/spark')

# Ruta de Apache Spark para sistema Windows/OS
findspark.init('C:\Spark')

### Creación de SparkSession

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, size

spark = SparkSession.builder.appName("TwitterSpark").getOrCreate()

### Definición del Schema de datos TWITTER a utilizar

La definición de datos del esquema a emplear, debe ser coincidente con los datos que se envían por streaming desde el servidor socket, de lo contrario, los datos serán almacenados erróneamente y el procesamiento no será lo requerido.

In [4]:
# Definir schema para el input data
schema = StructType([ \
                     StructField("created_at", StringType()),\
                     StructField("text", StringType()),\
                     StructField("userid", StringType()),\
                     StructField("username", StringType()),\
                     StructField("userlocation", StringType()),\
                     StructField("retweet_count", IntegerType()),\
                     StructField("entities", ArrayType(StringType())),\
])

### Creación del StreamDataframe de datos TWITTER

Se debe crear el streamdataframe  que almacena los datos de TWITTER, que son enviados desde el servidor tipo socket configurado anteriormente. Los datos de TWITTER recibidos, son acorde a lo requerido por el schema de datos escogido previamente.

In [5]:
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 5555).load()

* **Visualización de si el dataframe definido anteriormente es un streamdataframe**

In [6]:
# Identificación de si el DataFrame tiene datos Streaming o NO
lines.isStreaming

True

* **Visualización del Schema  del streamdataframe**

In [7]:
lines.printSchema()

root
 |-- value: string (nullable = true)



* **Otra forma de visualización las columnas del streamdataframe**

In [8]:
print(lines)

DataFrame[value: string]


### Operaciones

* **Creación de streamdatafrme que almacenará los datos de TWITTER**

La creación de este dataframe es en función del schema definido, y valiéndose de que los datos TWITTER son creados en archivos JSON.

In [9]:
df_twitter = lines.select(
    from_json('value', schema).created_at.alias('created_at'),
    from_json('value', schema).text.alias('text'),
    from_json('value', schema).userid.alias('userid'),
    from_json('value', schema).username.alias('username'),
    from_json('value', schema).userlocation.alias('userlocation'),
    from_json('value', schema).retweet_count.alias('retweet_count'),
    from_json('value', schema).entities.alias('entities')
)

* **Visualización del Schema  del streamdataframe de datos TWITTER**

In [10]:
df_twitter.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- text: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- username: string (nullable = true)
 |-- userlocation: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- entities: array (nullable = true)
 |    |-- element: string (containsNull = true)



* **Ampliar el Array Entities para poder extraer el elemento "Hashtag" y almacenarlo en el streamdataframe de datos TWITTER**

In [11]:
exp_entities = df_twitter.withColumn('hashtag', explode(df_twitter.entities))

* **Visualización de si el dataframe definido anteriormente es un streamdataframe**

In [12]:
# Identificación de si el DataFrame tiene datos Streaming o NO
exp_entities.isStreaming

True

* **Otra forma de visualización las columnas del streamdataframe**

In [13]:
print(exp_entities)

DataFrame[created_at: string, text: string, userid: string, username: string, userlocation: string, retweet_count: int, entities: array<string>, hashtag: string]


### **Operación de agrupamiento de hashtags iguales**

Con esta operación, se agruparán todos los hashtags que son iguales, así mismo se agregarán dos columnas adicionales:

- count(hashtag): almacenará la cantidad de hashtag que se repiten

- sum(retweeet_count): almacena la suma de retweets de los hashtag que se repiten

Luego de esto, se ordenarán los datos en orden descendente de acuerdo a la cantidad de hashtag repetidos (de mayor a menor)

In [14]:
total_tweets = exp_entities.select('hashtag', 'retweet_count', 'userlocation', 'userid') \
    .groupBy('hashtag') \
    .agg({'hashtag':'count', 'retweet_count':'sum'}) \
    .sort('count(hashtag)', ascending = False)

* **Creación del query para poder accesar a los datos del streamdataframe**

In [15]:
query=total_tweets.writeStream.queryName('query_hashtag').outputMode('complete').format('memory').start()

* **Descripción del streamdataframe del query final**

In [16]:
spark.sql("Describe query_hashtag").show()

+------------------+---------+-------+
|          col_name|data_type|comment|
+------------------+---------+-------+
|           hashtag|   string|   null|
|sum(retweet_count)|   bigint|   null|
|    count(hashtag)|   bigint|   null|
+------------------+---------+-------+



* **Visualización de cantidad de datos procesados en el streamdataframe del query**

In [17]:
spark.sql("select count(*) from query_hashtag ").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



* **Actualización de cantidad de datos procesados en el streamdataframe del query**

In [18]:
spark.sql("select count(*) from query_hashtag ").show()

+--------+
|count(1)|
+--------+
|      68|
+--------+



* **Visualización de los 5 primeros datos procesados de la columna hashtag del streamdataframe del query**

In [19]:
spark.sql("Select hashtag from query_hashtag ").show(5)

+------------+
|     hashtag|
+------------+
|SeraKutlubey|
|CemreKaraçay|
|      CenCem|
| OzanDolunay|
| CenkKaraçay|
+------------+
only showing top 5 rows



## Selecionar los 10 Hashtags más populares

### Seleccionando el top 10 por Consulta SQL al StreamdataFrame:

- **Visualización del resultado como DataFrame de Pandas para el comando basico Spark SQL.**

In [20]:
spark.sql("Select * from query_hashtag ").toPandas().head(10)

Unnamed: 0,hashtag,sum(retweet_count),count(hashtag)
0,SeraKutlubey,0,25
1,CemreKaraçay,0,25
2,CenCem,0,25
3,OzanDolunay,0,25
4,CenkKaraçay,0,25
5,بدايتِي,0,21
6,لي,0,21
7,KUWTacha,0,8
8,ReleaseSharjeelImmediately,0,8
9,이달의소녀,0,6


- **Otra manera de obtener el resultado como DataFrame de Pandas para el comando basico Spark SQL.**

In [21]:
spark.sql("Select * from query_hashtag desc limit 10").toPandas().head(10)

Unnamed: 0,hashtag,sum(retweet_count),count(hashtag)
0,SeraKutlubey,0,25
1,CemreKaraçay,0,25
2,CenCem,0,25
3,OzanDolunay,0,25
4,CenkKaraçay,0,25
5,بدايتِي,0,21
6,لي,0,21
7,KUWTacha,0,8
8,ReleaseSharjeelImmediately,0,8
9,이달의소녀,0,6


* **Visualización de cantidad de datos procesados en el streamdataframe del query (para ver si se está actualizando con más datos)**

In [22]:
spark.sql("select count(*) from query_hashtag ").show()

+--------+
|count(1)|
+--------+
|     370|
+--------+



- **Comando de espera por si se termina la comunicación  entre el servidor tipo socket y Apache Spark**

In [None]:
query.awaitTermination()