Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [None]:
NAME = "XAVIER MALTAS TARRIDAS"
COLLABORATORS = "OSCAR BUISAN VINIEGRA"

---

# PAC 6 - Introducció a Spark Streaming i Structured Streaming

## Introducció
A l'activitat anterior, vas aprendre sobre dues eines populars per adquirir dades en temps real: Flume i Kafka. En aquesta activitat, treballarem amb un conjunt d'eines de streaming de Spark i aprendràs com processar dades en temps real utilitzant Spark Streaming. Exploraràs el poder de Spark Streaming i Structured Streaming, dues eines potents disponibles a la biblioteca Spark.

### [Spark Streaming](https://spark.apache.org/docs/2.4.0/streaming-programming-guide.html)
Spark Streaming és un motor de processament d'streams escalable i tolerant a fallades construït sobre Apache Spark. Permet el processament de dades en temps real amb un alt rendiment i baixa latència. Amb Spark Streaming, pots realitzar analítiques en temps real, aprenentatge automàtic i processament de gràfics sobre dades en streaming.

### [Spark Structured Streaming](https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html)
Structured Streaming és una API de nivell alt per al processament d'streams a Spark. Proporciona una interfície declarativa i similar a SQL per al processament de dades estructurades en streaming. Amb Structured Streaming, pots escriure consultes en temps real que s'integren fàcilment amb el processament per lots, permetent-te construir pipelines de processament de dades end-to-end.

## Objectius
En els diferents exercicis pràctics, aprendràs a:
- Configurar una aplicació de Spark Streaming
- Processar i analitzar dades en temps real
- Construir pipelines de processament de dades de punta a punta

## Començar
L'activitat es basarà en la xarxa social Mastodon per processar les seves dades en streaming. És una bona idea conèixer l'estructura JSON d'un 'toot' - consulta el [web de l'API de Mastodon](https://docs.joinmastodon.org/entities/Status/) per facilitar la comprensió dels diferents exercicis que proposem.

A més, has de conèixer [Spark Streaming](https://spark.apache.org/docs/2.4.0/streaming-programming-guide.html) i [Spark Structured Streaming](https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html). A més de treballar amb els materials que compartim al campus, és interasant llegir la documentació oficial de la versió 2.4.0, que és la que està instal·lada al servidor.

## Notes Importants
- L'activitat s'ha de realitzar en **grups de 2 membres**. Assegura't de conèixer qui és el teu company abans de començar l'activitat.
- Tot i que és possible completar les activitats en aquest quadern, **desaconsellem fer-ho** degut a possibles problemes de rendiment del servidor. Veuràs que cada activitat està autocontinguda dins de la seva cel·la, permetent-te copiar-la fàcilment en un fitxer de Python. Aquest fitxer es pot executar al servidor mitjançant SSH o VSCode. Després d'haver executat i provat l'script amb èxit, simplement **copieu-lo de nou a la cel·la corresponent del quadern**. Aquest enfocament garanteix una execució més suau i una millor gestió dels recursos del servidor.
- **Només has d'utilitzar les llibreries proporcionades, llevat que s'indiqui explícitament el contrari.**
- Si us plau, no canvieu el nom del quadern ni els tipus de cel·la.

## Activitat 1: Consumidor de Tòpics de Kafka (0.5 punt)

L'objectiu d'aquest primer exercici és **crear un script que es connecti a un tòpic de Kafka i imprimeixi el seu contingut** a la terminal. Per completar l'exercici, us proporcionem un tòpic Kafka (`mastodon_toots`) que distribueix el contingut de l'stream principal de Mastodon.

**Passos per Completar l'Activitat:**
1. Configura un consumidor de Kafka per connectar-te al tòpic `mastodon_toots`.
2. Consumix els missatges del tema.
3. Imprimeix el contingut del camp (si existeix) de cada toot a la terminal.

Tingues en compte que els toots **s'emmagatzemen com a `JSON`**, així que has de garantir una conversió adequada de les dades per treballar amb elles. A més, tingues en compte que proporcionem una funció, `extract_text_from_html`. Pots utilitzar aquesta funció auxiliar per *renderitzar* el contingut del toot, que està codificat en HTML.

Comencem!

In [None]:
# a1_kafkaConsumer.py
# python3 a1_kafkaConsumer.py

# Create a consumer that subscribes to the Kafka topic and digest the toots
from tokenize import group
from kafka import KafkaConsumer
import json
from bs4 import BeautifulSoup

# Extract the text from the HTML content
def extract_text_from_html(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    text = soup.get_text(separator=' ', strip=True)
    return text

# Kafka Configuration
# kafka_server = 'Cloudera02:9092'  # Kafka server address
kafka_server = ['Cloudera02:9092', 'Cloudera03:9092']  # Kafka server address
kafka_topic = 'mastodon_toots'   # Kafka topic
kafka_group = 'xmaltast'   # Kafka consumer group, first surname of each member of the group separated by an underscore.

# Create a Kafka consumer
consumer = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=kafka_server,
    group_id=kafka_group,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Print the toots to the console
try:
    print("Streaming started.")
    for message in consumer:
        # Convert the message to a JSON object
        toot = message.value

        # Check if 'content' field exists in the toot
        if 'content' in toot:
            # Extract text from HTML content
            toot_text = extract_text_from_html(toot['content'])
            print("Toot Content:", toot_text)
except KeyboardInterrupt:
    # Close the consumer
    print("Streaming stopped.")
    consumer.close()

consumer.close()
#END <FILL IN>

## Activitat 2: Spark Streaming
En aquest exercici, analitzaràs l'activitat a Mastodon comptant els toots en una finestra de temps. Aquests tipus d'estadístics son una de les operacions fonamentals a Spark, i en aquesta activitat, farem servir la biblioteca Spark Streaming per analitzar les dades en aquest aspecte.

### Activitat 2.1: Comptar en Finestres (0.5 punt)

Com potser sabràs, la biblioteca Spark Streaming **processa dades utilitzant el concepte de finestres temporals**, agrupant elements de dades basant-se en el moment en què van ser rebuts. Aquest enfocament permet el processament per lots de dades en streaming, possibilitant anàlisis **en intervals de temps diferents**. Trobaràs que la sintaxi per realitzar operacions en RDDs dins d'aquestes finestres temporals és **pràcticament equivalent a les operacions estàndard de RDD** que ja coneixes.

Completa el codi següent per obtenir el **nombre de toots originals publicats cada cinc segons**. Exclou els retuits del teu recompte. Potser necessitaràs consultar la [API de Mastodon](https://docs.joinmastodon.org/entities/Status/) per entendre com estan estructurats els toots.

In [None]:
# a21_comptarFinestres.py
# python3 a21_comptarFinestres.py

import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

# Initialize SparkContext and StreamingContext with a 1-second batch interval
app_name = "TootCount"  # Name of your application

# Create the SparkContext
try:
    sc = SparkContext("local[2]", appName = app_name)
except ValueError:
    sc.stop()
    sc = SparkContext("local[2]", appName = app_name)
sc.setLogLevel("ERROR")

# Create the StreamingContext
batch_interval = 5  # Batch interval in seconds
ssc = StreamingContext(sc, batch_interval)
ssc.checkpoint("checkpoint")  # Necessary for updateStateByKey operation

# Define Kafka parameters
kafka_server = 'Cloudera02:9092,Cloudera03:9092'  # Kafka server address
kafka_topic = "mastodon_toots"   # Kafka topic
kafka_group = "xmaltast"   # Kafka consumer group, first surname of each member of the group separated by an underscore.

kafkaParams = {
    "metadata.broker.list": kafka_server,
    "group.id": kafka_group
} 

# Create a DStream that connects to Kafka
kafkaStream = KafkaUtils.createDirectStream(ssc, [kafka_topic], kafkaParams)

# Count each toot as 1 and update the total count excluding retweets
tootCounts = kafkaStream\
    .map(lambda x: json.loads(x[1]))\
    .filter(lambda toot: 'retweeted_status' not in toot) \
    .map(lambda x: ("toot", 1))\
    .updateStateByKey(lambda values, total: sum(values) + (total or 0))

# Print the cumulative count every 5 seconds
tootCounts.pprint()

# Start the computation
try:
    ssc.start()
    ssc.awaitTermination()
except KeyboardInterrupt:
    ssc.stop()
    sc.stop()

### Activitat 2.2: Comptar Toots i agrupar per idioma (1 punt)

Com has observat a l'Exercici 2.1, el procés és bastant similar a treballar amb RDDs. Ara, aprofundirem en un anàlisi més complex **comptant el nombre de toots originals per idioma es creen cada 5 segons.** Per a una millor llegibilitat, us demanem que ordeneu els idiomes en ordre descendent segons el nombre de toots i limiteu la sortida als 10 primers idiomes.

In [None]:
# a32_agregacioDadesFlux.py
# python3 a32_agregacioDadesFlux.py

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import from_json, col, window, count

conf = SparkConf()
conf.setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

# Initialize Spark Session for Structured Streaming
app_name = "activity3_2_xmaltast"  # Replace with your Spark app name, must include the username of the members of the group

spark = SparkSession \
    .builder \
    .appName(app_name) \
    .getOrCreate()

# Define Kafka parameters
kafka_topic = 'mastodon_toots'
kafka_bootstrap_servers = 'Cloudera02:9092,Cloudera03:9092'  # Replace with your Kafka bootstrap servers

# Read a small batch of data from Kafka for schema inference!
batch_df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", '{"' + kafka_topic + '":{"0": -1}}') \
    .load()

# Infer schema
schema = spark.read.json(batch_df.selectExpr("CAST(value AS STRING)").rdd.map(lambda x: x[0])).schema

# Create streaming DataFrame by reading data from Kafka
toots = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Parse the value column as JSON and apply the inferred schema. Then select the columns we need.
toots_df = toots \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .filter("parsed_value is not null and parsed_value.language is not null") \
    .select("parsed_value.language") \
    .groupBy("language") \
    .agg(count("*").alias("count")) \
    .orderBy(col("count").desc())  # Order by the count in descending order

try:
    # Open stream to console (you need to execute it in a terminal to see the output)
    query = toots_df \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option("truncate", "false") \
        .trigger(processingTime="10 seconds") \
        .start()

    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    spark.stop()
    sc.stop()

### Activitat 2.3: Acumular els comptatges (1 punt)

Fins ara, hem obtingut resultats específics de l'stream dels micro-lots generats per finestra, que són generalment útils. Però què passa si volem mantenint la informació a través de finestres per, per exemple, acumular tendències amb el temps? En aquest exercici treballarem aquest concepte.

Et convidem a ajustar l'script anterior per **mantenir un recompte de tots els toots originals, categoritzats per idioma**. En lloc de simplement comptar nous toots cada cinc segons, **els anirem afegint de manera contínua**. Pensa-hi com en una puntuació que s'actualitza constantment amb el nombre total de toots originals en cada idioma des del moment en què comencem la transmissió.

Per aconseguir això, jugarem amb les [**transformacions stateful d'Spark Streaming**](https://spark.apache.org/docs/2.4.0/streaming-programming-guide.html#caching--persistence). Aquesta és una una manera sofisticada de dir que recordarem dades anteriors i les utilitzarem en els nostres càlculs actuals. És semblant a mantenir un total en curs en una variable global en lloc de començar des de zero cada vegada.

***Et convidem a completar l'script següent:***

In [None]:
# a32_agregacioDadesFlux.py
# python3 a32_agregacioDadesFlux.py

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import from_json, col, window, count

conf = SparkConf()
conf.setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

# Initialize Spark Session for Structured Streaming
app_name = "activity3_2_xmaltast"  # Replace with your Spark app name, must include the username of the members of the group

spark = SparkSession \
    .builder \
    .appName(app_name) \
    .getOrCreate()

# Define Kafka parameters
kafka_topic = 'mastodon_toots'
kafka_bootstrap_servers = 'Cloudera02:9092,Cloudera03:9092'  # Replace with your Kafka bootstrap servers

# Read a small batch of data from Kafka for schema inference!
batch_df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", '{"' + kafka_topic + '":{"0": -1}}') \
    .load()

# Infer schema
schema = spark.read.json(batch_df.selectExpr("CAST(value AS STRING)").rdd.map(lambda x: x[0])).schema

# Create streaming DataFrame by reading data from Kafka
toots = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Parse the value column as JSON and apply the inferred schema. Then select the columns we need.
toots_df = toots \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .filter("parsed_value is not null and parsed_value.language is not null") \
    .select("parsed_value.language") \
    .groupBy("language") \
    .agg(count("*").alias("count")) \
    .orderBy(col("count").desc())  # Order by the count in descending order

try:
    # Open stream to console (you need to execute it in a terminal to see the output)
    query = toots_df \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option("truncate", "false") \
        .trigger(processingTime="10 seconds") \
        .start()

    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    spark.stop()
    sc.stop()

### Activitat 2.4: Comptatge amb Finestres Temporals (1 punt)

Com has observat, Spark Streaming és increïblement flexible i fàcil d'utilitzar, i aquí tens un truc interessant que pot fer: **et permet trobar un punt òptim entre comptar toots per [finestra de temps](https://spark.apache.org/docs/2.4.0/streaming-programming-guide.html#window-operations) i mantenir un recompte en curs.** Imaginem que volem crear un tauler de control, com un panell de control, que mostri el nombre de toots fets en cada idioma. La particularitat és que **volem aquesta actualització cada 5 segons, però estem seguiment els comptatges al llarg d'un minut complet.**

Així doncs, cada 5 segons, el nostre tauler de control es refresca, proporcionant-nos el darrer recompte d'un minut. És com tenir una puntuació en directe que s'actualitza amb freqüència i també fa un seguiment del que ha passat en els últims 60 segons, no només en els últims 5. D'aquesta manera, obtens tant actualitzacions immediates com una visió més amplia del que està passant, tot al mateix temps. Mostra només els 10 primers idiomes.

***Modifica l'script següent per assolir aquest objectiu:***

In [None]:
# a24_comptarFinestresTemporals.py
# python3 a24_comptarFinestresTemporals.py
# This code provides counts within time-based windows, offering more insights into the distribution of toots over specified intervals. 

import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

# Initialize SparkContext and StreamingContext with a 1-second batch interval
app_name = "TopLanguagesWindowedCounts"  # Name of your application

# Create the SparkContext
try:
    sc = SparkContext("local[2]", appName="app_name")
except ValueError:
    sc.stop()
    sc = SparkContext("local[2]", appName="app_name")

sc.setLogLevel("ERROR")

batch_interval = 5  # Batch interval in seconds
window_duration = 60  # Window duration in seconds
slide_duration = 5  # Slide duration in seconds

ssc = StreamingContext(sc, batch_interval)
ssc.checkpoint("checkpoint")  # Necessary for updateStateByKey operation

# Define Kafka parameters
kafka_server = 'Cloudera02:9092,Cloudera03:9092'  # Kafka server address
kafka_topic = 'mastodon_toots'   # Kafka topic
kafka_group = 'xmaltast'   # Kafka consumer group, first surname of each member of the group separated by an underscore.

kafkaParams = {
    "metadata.broker.list": kafka_server,
    "group.id": kafka_group
} 

# Create a DStream that connects to Kafka
kafkaStream = KafkaUtils.createDirectStream(ssc, [kafka_topic], kafkaParams)

# Count each toot as 1 and update the total count. Use a 60-second window with a 5-second slide
tootCounts = kafkaStream\
    .map(lambda x: json.loads(x[1]))\
    .filter(lambda toot: "language" in toot and toot["language"] is not None)\
    .map(lambda toot: (toot["language"], 1))\
    .updateStateByKey(lambda new_values, running_count: sum(new_values) + (running_count or 0))\
    .window(windowDuration=window_duration, slideDuration=slide_duration)\
    .reduceByKey(lambda x, y: x + y)\
    .transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))

# Print the cumulative count
tootCounts.pprint()

# Start the computation
try:
    ssc.start()
    ssc.awaitTermination()
except KeyboardInterrupt:
    ssc.stop()
    sc.stop()

### Activitat 2.5: Potenciant (1 punt)

D'acord, ja sabem que els RDDs a Spark són increïblement versàtils: pots fer pràcticament qualsevol operació amb ells. No obstant això, **a mesura que les coses es fan més complexes, el repte augmenta**.

Ara bé, donem-li més força al nostre tauler de control. En lloc de simplement mostrar el nombre de toots per minut, **afegim algunes característiques noves i interessants**. No seria interessant **fer un seguiment de la longitud mitjana d'aquests toots?** I encara millor: **descobrim qui és l'usuari més seguit entre els toots publicats en aquest minut**.

Però espera, **encara hi ha més!** Per fer tota aquesta informació molt fàcil de llegir i entendre, la **presentarem en** un format de **taula neta**. No només es tracta de les dades, sinó de fer-ho fàcil d'usar i visualment digestible.

La taula resultant s'ha d'actualitzar en intervals de 5 segons, i les finestres mitjanes han de ser de 60 segons. Les columnes d'aquesta taula han de ser:
- **lang:** Idioma
- **num_toots:** Nombre de toots originals en aquest idioma
- **avg_len_content:** Longitud mitjana (en caràcters) del contingut del toot
- **user:** Usuari més seguit
- **followers:** Nombre de seguidors d'aquest usuari

Per fer la sortida més llegible, limita el nombre de files a 10.

**SUGGERIMENT:** Hi ha un exemple molt útil a la [Guia de Programació en Streaming de Spark](https://spark.apache.org/docs/2.4.0/streaming-programming-guide.html#dataframe-and-sql-operations). Busca'l!

In [None]:
<FILL IN>

## Activitat 3: Structured Streaming

Com has vist a l'últim exercici, depenent de les operacions, l'API de Spark Streaming pot no ser tan convenient, especialment perquè has de treballar amb APIs de baix nivell. Afortunadament, **Spark proporciona una API de nivell alt anomenada [Spark Structured Streaming](https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html)** que et permet **realitzar càlculs en streaming de la mateixa manera que expressaries un càlcul en lot sobre dades estructurades estàtiques**; com les que pots utilitzar en processament per lots.

En aquest conjunt d'exercicis, et submergiràs en el fascinant món de Spark Structured Streaming.

### Activitat 3.1: Obtenir l'Esquema (1 punt)

Una de les coses més interessants sobre Spark Structured Streaming és **com gestiona les dades estructurades**. Per exemple, el flux de dades al nostre tòpic de Kafka, on **cada toot arriba en un format JSON ordenat**.

Similar a treballar amb els DataFrames de Spark, **Structured Streaming utilitza esquemes de dades per analitzar i formatejar aquestes dades estructurades**. En el processament per lots, Spark sovint pot deduir aquesta estructura directament de les dades. No obstant això, amb les dades en streaming, és una mica diferent: **necessitem definir aquesta estructura prèviament**.

En els exercicis següents, farem servir un truc convenient: **enlloc de definir manualment** l'estructura complexa d'un toot, inicialment **extreurem alguns toots de Kafka i els analitzarem en lot per aprendre el seu esquema**. És com fer una ullada per entendre com estan organitzades les coses. Un cop tinguem l'esquema, l'aplicarem al nostre pipeline de streaming.

La teva tasca en aquest exercici és fer que aquesta transformació es realitzi. Després, fent servir les operacions de DataFrame amb les quals ja estàs familiaritzat, es demana crear una taula amb les següents columnes que ens permetran veure els toots individualment a mesura que són digerits:
- **id:** Identificador únic per a cada toot.
- **created_at:** Marca de temps quan es va crear el toot.
- **content:** Què diu realment el toot.
- **language:** Idioma del toot.
- **username:** Maneta de l'autor del toot.
- **followers_count:** Nombre de seguidors que té l'autor.

Recorda que **ens interessen els toots originals!** Filtra aquells que corresponguin a retuits.

Un altre aspecte fonamental que has de gestionar aquí és **seleccionar el mode de sortida adequat**! Mira la [documentació](https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html#output-modes) i tria el que millor s'adapti a aquest exercici.

In [None]:
# a31_obtenirEsquema.py
# python3 a31_obtenirEsquema.py

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import json_tuple, from_json, col
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

# Initialize Spark Session for Structured Streaming
app_name = "activity3_1_xmaltast"  # Replace with your Spark app name must include the username of the members of the group

spark = SparkSession \
    .builder \
    .appName(app_name) \
    .getOrCreate()

# Define Kafka parameters
kafka_topic = 'mastodon_toots'
kafka_bootstrap_servers = 'Cloudera02:9092,Cloudera03:9092'  # Replace with your Kafka bootstrap servers

# Read a small batch of data from Kafka for schema inference!
batch_df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Determine the ending offset dynamically based on the available data
ending_offset = batch_df.selectExpr("max(CAST(offset AS LONG)) as max_offset").collect()[0]["max_offset"]

# Use the dynamically determined ending offset for the batch read
batch_df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "{\"" + kafka_topic + "\":{\"0\":" + str(ending_offset) + "}}") \
    .load()

# Infer schema
schema = spark.read.json(batch_df.selectExpr("CAST(value AS STRING)").rdd.map(lambda x: x[0])).schema

# Create streaming DataFrame by reading data from Kafka
toots = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "latest") \
    .load()

# Parse the value column as JSON and apply the inferred schema. Then select the columns we need.
toots_df = toots\
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .select(
        col("parsed_value.id").alias("id"),
        col("parsed_value.created_at").alias("created_at"),
        col("parsed_value.content").alias("content"),
        col("parsed_value.language").alias("language"),
        col("parsed_value.account.username").alias("USERNAME"),
        col("parsed_value.account.followers_count").alias("followers_count")
    )

try:
    # Open stream to console (you need to execute it in a terminal to see the output)
    query = toots_df \
            .writeStream \
            .outputMode("append") \
            .format("console")\
            .start()

    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    spark.stop()
    sc.stop()

### Activitat 3.2: Agregació de Dades d'un Flux (1 punt)

Spark Structured Streaming és realment potent, especialment quan realitzes operacions sobre un flux continu de dades. En aquest exercici, ens aprofundirem en Structured Spark Streaming, **centrant-nos específicament en l'agregació de dades d'un flux de Kafka**. És semblant al que hem fet a l'Exercici 2. La teva missió és **comptar el nombre de toots originals en cada idioma**.

Així és com ha de ser la **sortida:**

- **language:** Aquesta columna mostra l'idioma dels toots.
- **count:** Aquí és on mostraràs el nombre de toots per a cada idioma.

La teva taula ha **d'acumular continuament aquestes comptatges cada 10 segons**. A més, per facilitar al visualizació us demanem **ordenar els idiomes pel nombre de toots, amb els idiomes més parlants a la part superior**.

Ara bé, aquí tens una **part clau** d'aquest exercici: **has de triar el mode de sortida adequat** per a la teva consulta en streaming. Recorda, **el mode de sortida determina com es escriu cada lot resultant de dades en l'origen de sortida**. Les teves opcions són 'Complete', 'Append' i 'Update'. Pensa en quin encaixa millor amb el nostre escenari de comptatge acumulatiu i ordenat. I **no oblidis escriure la teva raonament en forma de comentari al codi**.

In [None]:
# a32_agregacioDadesFlux.py
# python3 a32_agregacioDadesFlux.py

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import from_json, col, window, count

conf = SparkConf()
conf.setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

# Initialize Spark Session for Structured Streaming
app_name = "activity3_2_xmaltast"  # Replace with your Spark app name, must include the username of the members of the group

spark = SparkSession \
    .builder \
    .appName(app_name) \
    .getOrCreate()

# Define Kafka parameters
kafka_topic = 'mastodon_toots'
kafka_bootstrap_servers = 'Cloudera02:9092,Cloudera03:9092'  # Replace with your Kafka bootstrap servers

# Read a small batch of data from Kafka for schema inference!
batch_df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", '{"' + kafka_topic + '":{"0": -1}}') \
    .load()

# Infer schema
schema = spark.read.json(batch_df.selectExpr("CAST(value AS STRING)").rdd.map(lambda x: x[0])).schema

# Create streaming DataFrame by reading data from Kafka
toots = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Parse the value column as JSON and apply the inferred schema. Then select the columns we need.
toots_df = toots \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .filter("parsed_value is not null and parsed_value.language is not null") \
    .select("parsed_value.language") \
    .groupBy("language") \
    .agg(count("*").alias("count")) \
    .orderBy(col("count").desc())  # Order by the count in descending order

try:
    # Open stream to console (you need to execute it in a terminal to see the output)
    query = toots_df \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option("truncate", "false") \
        .trigger(processingTime="10 seconds") \
        .start()

    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    spark.stop()
    sc.stop()

### Activitat 3.3: Comptatge per Finestres (1 punt)

Bona feina! Has après com realitzar agregacions i fer un seguiment dels comptatges al llarg del temps. Com has vist a l'Exercici 2.4, a vegades és més efectiu mantenir aquests **comptatges dins de finestres de temps específiques**. Ara volem que apliquis aquesta tècnica utilitzant les [funcions disponibles a Spark Structured Streaming](https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html#window-operations-on-event-time). Tingues en compte que Spark Structured Streaming **gestiona el temps de manera diferent que Spark Streaming**, tingueu en compte a l'hora d'analitzar i interpretar els resultats.

La teva tasca és **crear una taula que mostri el comptatge del nombre de toots originals (recorda filtrar els retuits) fets en cada idioma, segmentats dins d'un marc de temps específic**. Per a aquest exercici, has de fer servir una finestra lliscant d'un minut, amb les dades actualitzant-se cada 5 segons. Aquest enfocament et permetrà monitorar de prop la freqüència dels toots en diferents idiomes durant intervals breus i superposats.

Et demanem que proporcionis una **taula amb la següent estructura**:
- **window:** Mostra l'interval de temps.
- **language:** Aquesta columna mostra l'idioma dels toots.
- **count:** Aquí és on mostraràs el nombre de toots per a cada idioma.

Els resultats han d'estar **ordenats per finestra de temps i count en ordre descendent**.

In [1]:
# a33_comptatgeFinestres.py
# python3 a33_comptatgeFinestres.py

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import json_tuple, from_json, col, window, count

conf = SparkConf()
conf.setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

# Initialize Spark Session for Structured Streaming
app_name = "activity3_3_xmaltast"  # Replace with your Spark app name, must include the username of the members of the group

spark = SparkSession \
    .builder \
    .appName(app_name) \
    .getOrCreate()

# Define Kafka parameters
kafka_topic = 'mastodon_toots'
kafka_bootstrap_servers = 'Cloudera02:9092,Cloudera03:9092'  # Replace with your Kafka bootstrap servers

# Read a small batch of data from Kafka for schema inference!
batch_df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Infer schema
schema = spark.read.json(batch_df.selectExpr("CAST(value AS STRING)").rdd.map(lambda x: x[0])).schema
# Create streaming DataFrame by reading data from Kafka
toots = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "latest") \
    .load()

# Parse the value column as JSON and apply the inferred schema. Then select the columns we need.
toots_df = toots\
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

# Print the schema to understand the structure
toots_df.printSchema()

# Filter based on the correct nested field
filtered_toots_df = toots_df \
    .filter(col("parsed_value.quote.account.display_name") == "some_value") \
    .select(
        "parsed_value.text",            # Replace with the actual text field
        "parsed_value.created_at",      # Replace with the actual created_at field
        "parsed_value.group_by_column"  # Replace with the actual group by column
    ) \
    .groupBy(
        window("parsed_value.created_at", "5 minutes"),
        "parsed_value.group_by_column"
    ) \
    .count() \
    .orderBy("window.start", "window.end", ascending=False)

try:
    # Open stream to console (you need to execute it in a terminal to see the output)
    query = filtered_toots_df \
            .writeStream \
            .outputMode("complete")\
            .format("console")\
            .option("truncate", "false")\
            .trigger(processingTime='5 seconds')\
            .start()

    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    spark.stop()
    sc.stop()

AnalysisException: 'No such struct field is_reblog in account, activity_pub_type, akkoma, all_emojis, application, bookmarked, card, communities, content, content_type, conversation_id, created_at, edited_at, emoji_reactions, emoji_reactions_count, emojis, favourited, favourites_count, filtered, friendica, id, in_reply_to_account_id, in_reply_to_id, in_reply_to_status, is_meta_preview, is_only_for_followers, is_rss_content, iscn_id, language, limited, limited_scope, local_only, markdown, media_attachments, mentions, meta_title, muted, nyaize_content, pinned, plain_content, pleroma, poll, processing, profile_emojis, quote, quote_id, reactions, reactions_count, reblog, reblogged, reblogs_count, replies_count, rss_host_url, rss_link, searchability, sensitive, spoiler_text, status_reference_ids, status_references_count, status_referred_by_count, support_likers, tags, text, text_count, translated_text, updated_at, uri, url, visibility, visibility_ex; line 1 pos 71'

### Activitat 3.4: Unió de Fluxos (1 punt)

En aquest darrer exercici, explorarem una característica molt interessant de Spark Streaming que et permet ***[unir dos fluxos](https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html#stream-stream-joins) i analitzar-los!***

Per simplificar les coses, **ja et proporcionem dos fluxos de dades preagregades**. El primer, al tòpic Kafka **`mastodon_toots_original_domain`**, mostra el recompte de toots originals per diverses instàncies de Mastodon (recorda que Mastodon és una federació d'instàncies) en una **finestra fixa d'un minut**. El segon flux, al tòpic **`mastodon_toots_retoot_domain`**, presenta dades similars per als toots que són retoots d'altres toots. Les dades emmagatzemades als tòpics Kafka tenen la mateixa **estructura en format JSON**:
- Una estructura `window` amb dos elements de tipus `string`: `start` i `end`
- Un component `string` anomenat `mastodon_instance` amb el domini
- Un element de tipus `integer` anomenat `count` amb el nombre de toots realitzats en aquell domini en la finestra de temps específica.

Ja que l'estructura de les dades és bastant senzilla, **et demanem que la defineixis l'esquema manualment aquesta vegada**. Un cop hagis configurat les estructures, **hauràs d'obrir un flux per a cada origen Kafka**. El teu següent pas és unir aquests fluxos. Volem que facis un **left-join del flux de toots originals amb el flux de toots retuits**. Després de completar el join, la teva sortida **ha de incloure**:
- **window:** l'interval de temps
- **mastodon_instance:** el domini de la instància de Mastodon
- **original_count:** nombre de toots originals publicats en aquell domini durant aquell interval de temps
- **retweet_count:** nombre de toots retuits publicats en aquell domini durant aquell interval de temps

**SUGGERIMENT:** Realitzar una unió en línia de dos fluxos no és una tasca fàcil, i hi ha moltes restriccions que has de respectar. **Llegeix [la documentació](https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html#stream-stream-joins) detingudament** i recorda que estem utilitzant la versió 2.4.0. A més, recorda que estem fent un **join en el temps, i això és una component clau**. Conceptes com els que has après sobre les finestres són fonamentals aquí, **juntament amb conceptes com [la marca d'aigua (watermarking)](https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html#stream-stream-joins)**. I recorda, **els modes de sortida**, has de triar-ne un que sigui adequat per a la tasca que vols fer.

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import from_json, col, window, to_timestamp, struct
from pyspark.sql import SparkSession

# Initialize Spark Context
conf = SparkConf()
conf.setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

# Initialize Spark Session for Structured Streaming
app_name = "activity3_4" + <FILLIN> # Replace with your Spark app name must include the username of the members of the group

spark = SparkSession \
    .builder \
    .appName(app_name) \
    .getOrCreate()

# Define schema for the incoming data
schema = StructType(<FILLIN>)

# Define Kafka parameters
toots_original_topic = <FILLIN>
toots_retoot_topic = <FILLIN>
kafka_bootstrap_servers = <FILLIN>  # Replace with your Kafka bootstrap servers

# Create streaming DataFrame by reading original toots data from Kafka
toots_original = spark \
    .readStream \
    .<FILLIN>
    ...
    .<FILLIN>
    .load()

# Parse the value column as JSON and apply the infered schema. Then select the columns we need.
toots_original_df = toots_original\
    .<FILLIN>
    ...
    .<FILLIN>

# Create streaming DataFrame by reading retoots data from Kafka
toots_retoot = spark \
    .readStream \
    .<FILLIN>
    ...
    .<FILLIN>
    .load()

# Parse the value column as JSON and apply the infered schema. Then select the columns we need.
toots_retoot_df = toots_retoot\
    .<FILLIN>
    ...
    .<FILLIN>

# Join the two streams
toots_join_df = toots_original_df.join(<FILLIN>...<FILLIN>)

try:
    # Start running the query that prints the running counts to the console
    query = toots_join_df\
            .writeStream \
            <FILLIN>
            ...
            <FILLIN>
            .option("numRows", 100)\
            .start()\

    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    spark.stop()
    sc.stop()

## Video de Resposta (1 punt)

En aquesta secció, hauràs de respondre a les següents preguntes en un vídeo de resposta **d'entre 3 i 6 minuts**. Has de fer una gravació en què el teu rostre sigui visible mentre respones a les preguntes amb les teves pròpies paraules. Has de seguir els punts següents per fer el vídeo (4 punts):

1. Al principi del vídeo, **has de dir el teu nom i DNI mentre mostres el document** (que ha de ser clarament visible i llegible).
2. Has de respondre les preguntes **en l'ordre que es donen**, comentant sempre a l'inici de cada resposta a quina pregunta et refereixes.
3. En cas que no responguis a una pregunta, **has d'indicar el número de la pregunta que no estàs contestant i per què**.
4. **Cada membre del grup ha de tenir una contribució comparable al vídeo**.

### Preguntes:

1. Durant els exercicis fets amb Spark Streaming, has utilitzat el paràmetre `group_id` per llegir les dades de Kafka. Explica què volen dir els grups a Kafka i per què necessites utilitzar aquest paràmetre per completar els exercicis.
2. Has après com Spark Structured Streaming és eficaç per treballar amb dades estructurades. No obstant això, hi ha escenaris on una API de baix nivell com Spark Streaming pot ser més avantatjosa. **Proporciona un exemple utilitzant el mateix conjunt de dades d'aquesta activitat on Spark Streaming demostra ser més convenient que Structured Streaming**. Això ha de il·lustrar els avantatges de Spark Streaming en determinades situacions.
3. Has practicat l'ús de **finestres per a agregacions basades en el temps**, però és important destacar que el mètode de definir aquestes finestres difereix entre les dues llibreries que hem estudiat. **Detalla aquestes diferències**, centrant-te en com cada llibreria descriu i implementa finestres temporals per a les agregacions.
4. En aquest exercici, has estat mostrant els resultats a la consola, però Spark Structured Streaming ofereix una varietat d'altres formats de sortida. **Si us plau, descriu aquests formats**, destacant la gamma d'opcions disponibles per presentar i utilitzar els resultats generats per Spark Structured Streaming.