In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Crear un SparkContext con un nombre de aplicación
sc = SparkContext(appName="MessageStreamingApp")

# Crear un StreamingContext con un intervalo de batch de 1 segundo
ssc = StreamingContext(sc, 1)

# Crear un DStream para recibir los datos del servidor
lines = ssc.socketTextStream("localhost", 12345)  # reemplazar localhost y 9999 con la dirección IP y el puerto del servidor

# Procesar los datos
"""
En este ejemplo, se aplica un procesamiento básico a los datos. 
Primero, se divide cada línea en palabras utilizando flatMap. 
Luego, se asigna un valor de 1 a cada palabra mediante map. 
A continuación, se agrupan las palabras y se cuentan las ocurrencias mediante reduceByKey. 
Finalmente, se imprime el resultado utilizando pprint (pretty print) para mostrar los recuentos de palabras.
"""
word_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
word_counts.pprint()

# Comenzar el procesamiento de streaming
ssc.start()

# Esperar a que se complete el procesamiento
ssc.awaitTermination()

In [None]:
###VADER Y KEYWORDS

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, BooleanType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Crear el SparkSession y StreamingContext
spark = SparkSession.builder.appName("NetworkWordCount").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 1)

# Ajustar el nivel de registro a ERROR para evitar advertencias
sc.setLogLevel("ERROR")

# Crear un DStream que se conectará al hostname:port, por ejemplo localhost:9999
lines = ssc.socketTextStream("localhost", 12345)

# Crear el analizador de sentimientos Vader
analyser = SentimentIntensityAnalyzer()

# Lista de palabras clave que podrían indicar que un niño está pidiendo ayuda
keywords = ["help", "need", "trouble", "danger", "unsafe", "scared", "fear", "worried", "hurt", "abuse"]

# Función que busca coincidencias con las keywords.
def check_keywords(message):
    # Convertir el mensaje a minúsculas
    message = message.lower()
    
    # Comprobar si alguna palabra clave está en el mensaje
    for keyword in keywords:
        if keyword in message:
            return True
    
    return False

# Definir la UDF a partir de la función anterior para poder aplicarla a un DataFrame.
check_keywords_udf = udf(check_keywords, BooleanType())

# Definir una función para calcular la puntuación de sentimiento de un mensaje
def get_sentiment_score(message):
    score = analyser.polarity_scores(message)
    return score['compound']

# Definir otra UDF
get_sentiment_score_udf = udf(get_sentiment_score, FloatType())

# Función para procesar los datos recibidos (procesar cada RDD)
def process(time, rdd):
    print("========= %s =========" % str(time))
    print("RDD count: ", rdd.count())  # Imprimir el conteo del RDD
    
    try:
        # Crear un DataFrame a partir del RDD
        df = spark.createDataFrame(rdd, "string").toDF("message")
        
        # Verificar si hay al menos una fila con ese mensaje para parar el contexto de streaming si lo hay
        if df.filter(df.message == "SCRAPING_DONE").count() > 0:
            ssc.stop()
        
        # Crear una nueva columna "is_help" que será True si el mensaje está pidiendo ayuda y False en caso contrario
        df = df.withColumn("is_help", check_keywords_udf(df["message"]))
        
        # Crear una nueva columna "sentiment_score" que será la puntuación de sentimiento del mensaje
        df = df.withColumn("sentiment_score", get_sentiment_score_udf(df["message"]))
        
        df.show()
        
    
    except Exception as e:
        print("Error:", e)
        

# Procesar cada RDD en el DStream
lines.foreachRDD(process)

# Iniciar el streaming
ssc.start()

# Esperar a que se complete el procesamiento de los datos
ssc.awaitTermination()

In [None]:
###NLP NO SUPERVISADO

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline

# Crear el SparkSession y StreamingContext
spark = SparkSession.builder.appName("NetworkWordCount").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 1)

# Ajustar el nivel de registro a ERROR para evitar advertencias
sc.setLogLevel("ERROR")

# Crear un DStream que se conectará al hostname:port para recibir los datos
lines = ssc.socketTextStream("localhost", 12345)

# Crear el analizador de sentimientos
analyser = SentimentIntensityAnalyzer()

# Definir una función para calcular la puntuación de sentimiento de un mensaje
def get_sentiment_score(message):
    score = analyser.polarity_scores(message)
    return score['compound']

# Crear una función UDF para usar en el DataFrame
get_sentiment_score_udf = udf(get_sentiment_score, FloatType())

# Definir una función para procesar los datos recibidos (cada RDD)
def process(time, rdd):
    print("========= %s =========" % str(time))

    try:
        # Crear un DataFrame a partir del RDD
        df = spark.createDataFrame(rdd, "string").toDF("message")

        # Verificar si hay al menos una fila con ese mensaje para parar el contexto de streaming si lo hay
        if df.filter(df.message == "SCRAPING_DONE").count() > 0:
            ssc.stop()

        # Crear una nueva columna "sentiment_score" que será la puntuación de sentimiento del mensaje
        df = df.withColumn("sentiment_score", get_sentiment_score_udf(df["message"]))

        # Aplicar NLP no supervisado creando instancias con Tokenizer, StopWordsRemover, CountVectorizer y LDA
        tokenizer = Tokenizer(inputCol="message", outputCol="words")
        remover = StopWordsRemover(inputCol="words", outputCol="filtered")
        cv = CountVectorizer(inputCol="filtered", outputCol="features")
        lda = LDA(k=2, maxIter=10, featuresCol="features")  # 2 tópicos

        # Crear un pipeline que encapsula estas etapas del procesamiento.
        pipeline = Pipeline(stages=[tokenizer, remover, cv, lda])
        model = pipeline.fit(df)
        result = model.transform(df)

        result.show()

        # Guardar los resultados en un archivo CSV
        #result.select("sentiment_score").coalesce(1).write.csv("apartadoC.csv", header=True)

    except Exception as e:
        print("Error:", e)

# Procesar cada RDD en el DStream
lines.foreachRDD(process)

# Iniciar el streaming
ssc.start()

# Esperar a que se complete el procesamiento de los datos
ssc.awaitTermination()
