**Third Assignment for the Large-Scale Data Processing Course**

**Students:**

*   **Valéria Cristina A. R. de Figueredo -- vcarf@cesar.school**
*   **Manuela de Lacerda Bezerra Carvalho -- mlbc@cesar.school**


**Instructor: Anderson Neves**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install --upgrade pyspark



## About the data

The CSV files contain timestamped 'click' or 'view' events generated by users on ads that belong to specific campaigns.

**Columns:**  
timestamp,user_id,action,adId,campaignId

**Sample:**  
2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01  
2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02  
2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02  
2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03

**Folder containing the CSV files:**  
data/ad_action_assignment
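
To make the column layout concrete, the sketch below parses the four sample rows with the Python standard library only. It is an illustration of the schema, not part of the streaming pipeline; the names `SAMPLE`, `COLUMNS`, and `parse_events` are hypothetical helpers introduced here.

```python
import csv
from datetime import datetime
from io import StringIO

# The four sample rows listed above (the files have no header row).
SAMPLE = """\
2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01
2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02
2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02
2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03
"""

# Column order as given in the column description.
COLUMNS = ["timestamp", "user_id", "action", "adId", "campaignId"]


def parse_events(text):
    """Parse header-less CSV text into dicts with a datetime timestamp."""
    events = []
    for row in csv.DictReader(StringIO(text), fieldnames=COLUMNS):
        row["timestamp"] = datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S")
        events.append(row)
    return events


events = parse_events(SAMPLE)
for event in events:
    print(event["timestamp"], event["action"], event["adId"], event["campaignId"])
```

Every row carries exactly one action, so counting clicks or views comes down to filtering on the `action` column.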

## About the questions

Use Structured Streaming code in your answers.

Please do not change the code that creates the inputStream, so that all answers follow the same pattern.

Even if you cannot finish a question, please submit it, because partial code may still earn points.
## ----------------------

In [None]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.streaming import DataStreamReader
from pyspark import StorageLevel


In [None]:
# Create a local cluster with 2 workers, 1 core per worker, and 3 GB of RAM per worker

spark = SparkSession.builder\
    .master('local-cluster[2, 1, 3072]')\
    .getOrCreate()
spark

In [None]:
AD_ACTION_CSV_PATH = "drive/MyDrive/data/ad_action_assignment"

In [None]:
# Uncomment and run to stop the SparkSession
# spark.stop()

## The notebook `assignment_split_ad_action_data.ipynb` was used to create the streaming data by writing CSV files to the `data/ad_action_assignment` folder

## 1) How many events did the campaigns generate in the last 10 seconds? Sort ascending by window.start, then descending by event count, and compute every 3 seconds. (4 points)

In [None]:
inputStream = spark.readStream.csv(
    AD_ACTION_CSV_PATH,
    schema="timestamp TIMESTAMP, \
        user_id STRING, \
        action STRING, \
        adId STRING, \
        campaignId STRING"
)

# WRITE YOUR CODE HERE
# Count events per campaign in 10-second windows that slide every 3 seconds
inputStream = inputStream.groupBy(
      F.window("timestamp", "10 seconds", "3 seconds"),
      "campaignId"
    ).count()

def foreach_batch_function(df, epoch_id):
    # Sort by window start (ascending), then by event count (descending)
    df = df.orderBy(F.asc('window.start'), F.desc('count'))
    print(epoch_id)
    print(df.toPandas())

query = inputStream\
    .writeStream\
    .outputMode('complete')\
    .foreachBatch(foreach_batch_function)\
    .start()
query.awaitTermination(60)

0
                                        window     campaignId  count
0   (2023-09-01 03:30:51, 2023-09-01 03:31:01)  campaignId_02   1428
1   (2023-09-01 03:30:51, 2023-09-01 03:31:01)  campaignId_03   1418
2   (2023-09-01 03:30:51, 2023-09-01 03:31:01)  campaignId_01   1400
3   (2023-09-01 03:30:54, 2023-09-01 03:31:04)  campaignId_03   5923
4   (2023-09-01 03:30:54, 2023-09-01 03:31:04)  campaignId_02   5786
..                                         ...            ...    ...
67  (2023-09-01 03:31:57, 2023-09-01 03:32:07)  campaignId_03   5884
68  (2023-09-01 03:31:57, 2023-09-01 03:32:07)  campaignId_01   4918
69  (2023-09-01 03:32:00, 2023-09-01 03:32:10)  campaignId_03   1561
70  (2023-09-01 03:32:00, 2023-09-01 03:32:10)  campaignId_02   1525
71  (2023-09-01 03:32:00, 2023-09-01 03:32:10)  campaignId_01   1159

[72 rows x 3 columns]


False

In [None]:
# Stop job
query.stop()
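
With a 10-second window that slides every 3 seconds, each event is counted in three or four overlapping windows, which is why the same campaign appears in consecutive rows of the output. The plain-Python sketch below lists the windows that contain a given timestamp. It assumes window starts are aligned to multiples of the slide counted from the Unix epoch (no start offset) and that timestamps are in UTC; `sliding_windows` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(ts, window_s=10, slide_s=3):
    """Return the (start, end) windows that contain ts, earliest first.

    Assumption: window starts fall on multiples of slide_s seconds since
    the Unix epoch, and ts is a naive datetime expressed in UTC.
    """
    t = int((ts - EPOCH).total_seconds())
    start = t - (t % slide_s)          # latest window start that is <= t
    starts = []
    while start + window_s > t:        # [start, start + window_s) still covers t
        starts.append(start)
        start -= slide_s
    return [
        (EPOCH + timedelta(seconds=s), EPOCH + timedelta(seconds=s + window_s))
        for s in reversed(starts)
    ]


for start, end in sliding_windows(datetime(2023, 9, 1, 3, 31, 0)):
    print(start, "->", end)
```

Under these assumptions, an event at 03:31:00 lands in four windows, the earliest being 03:30:51 to 03:31:01, which matches the first window in the recorded output.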

## 2) Which 5 ad and campaign pairs generated the fewest clicks in the last minute? Sort ascending by window.start, then ascending by click count, and compute every 30 seconds. (3 points)

In [None]:
inputStream = spark.readStream.csv(
    AD_ACTION_CSV_PATH,
    schema="timestamp TIMESTAMP, \
        user_id STRING, \
        action STRING, \
        adId STRING, \
        campaignId STRING"
)

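# WRITE YOUR CODE HERE
# Keep only clicks and count them per (campaign, ad) pair
# in 60-second windows that slide every 30 seconds
inputStream = inputStream.where(F.col('action') == "click")\
    .groupBy(
      F.window("timestamp", "60 seconds", "30 seconds"),
      "campaignId", "adId"
    ).count()

def foreach_batch_function(df, epoch_id):
    # Rank the pairs inside each window by click count, fewest first
    window = Window.partitionBy('window.start')\
        .orderBy(F.asc('count'))
    # Keep the 5 lowest-ranked pairs per window, then sort the final result
    df = df.withColumn('rank', F.row_number().over(window))\
        .where(F.col('rank') <= 5)\
        .drop('rank')\
        .orderBy(F.asc('window.start'), F.asc('count'))
    print(epoch_id)
    print(df.toPandas())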

query = inputStream\
    .writeStream\
    .outputMode('complete')\
    .foreachBatch(foreach_batch_function)\
    .start()
query.awaitTermination(50)


0
                                        window     campaignId     adId  count
0   (2023-09-01 03:30:30, 2023-09-01 03:31:30)  campaignId_03  adId_03   2433
1   (2023-09-01 03:30:30, 2023-09-01 03:31:30)  campaignId_01  adId_10   2460
2   (2023-09-01 03:30:30, 2023-09-01 03:31:30)  campaignId_01  adId_04   2462
3   (2023-09-01 03:30:30, 2023-09-01 03:31:30)  campaignId_01  adId_02   2505
4   (2023-09-01 03:30:30, 2023-09-01 03:31:30)  campaignId_01  adId_07   2518
5   (2023-09-01 03:31:00, 2023-09-01 03:32:00)  campaignId_01  adId_04   4950
6   (2023-09-01 03:31:00, 2023-09-01 03:32:00)  campaignId_01  adId_02   4995
7   (2023-09-01 03:31:00, 2023-09-01 03:32:00)  campaignId_01  adId_10   5010
8   (2023-09-01 03:31:00, 2023-09-01 03:32:00)  campaignId_01  adId_01   5076
9   (2023-09-01 03:31:00, 2023-09-01 03:32:00)  campaignId_03  adId_03   5109
10  (2023-09-01 03:31:30, 2023-09-01 03:32:30)  campaignId_01  adId_04   2563
11  (2023-09-01 03:31:30, 2023-09-01 03:32:30)  campaignId_01 

False

In [None]:
# Stop job
query.stop()
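
The ranking step in this answer (partition by window, order by count, keep the first five) can be sketched in plain Python as follows. The rows are made-up illustrative values, not data from the stream, and `bottom_n_per_window` is a hypothetical helper. One difference worth noting: Python's `sorted` is stable, so ties keep their input order, whereas `row_number` gives no such guarantee for equal counts.

```python
from collections import defaultdict


def bottom_n_per_window(rows, n=5):
    """Keep the n rows with the smallest count in each window.

    rows: iterable of (window_start, campaign_id, ad_id, count) tuples.
    The result is ordered by window_start, then by count ascending.
    """
    by_window = defaultdict(list)
    for row in rows:
        by_window[row[0]].append(row)

    result = []
    for window_start in sorted(by_window):
        ranked = sorted(by_window[window_start], key=lambda r: r[3])
        result.extend(ranked[:n])
    return result


# Hypothetical counts, for illustration only.
rows = [
    ("03:31:00", "campaignId_01", "adId_01", 7),
    ("03:30:30", "campaignId_01", "adId_02", 5),
    ("03:30:30", "campaignId_02", "adId_01", 2),
    ("03:31:00", "campaignId_03", "adId_03", 1),
    ("03:30:30", "campaignId_03", "adId_03", 9),
    ("03:31:00", "campaignId_02", "adId_02", 4),
]

for row in bottom_n_per_window(rows, n=2):
    print(row)
```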

## 3) What is the running total of clicks and the running total of views? Compute them as data arrives in the stream. (3 points)

In [None]:
inputStream = spark.readStream.csv(
    AD_ACTION_CSV_PATH,
    schema="timestamp TIMESTAMP, \
        user_id STRING, \
        action STRING, \
        adId STRING, \
        campaignId STRING"
)

# WRITE YOUR CODE HERE
# Count events per action in tumbling 10-second windows
inputStream = inputStream.groupBy(
      F.window("timestamp", "10 seconds", "10 seconds"),
      "action"
    ).count()

def foreach_batch_function(df, epoch_id):
    df_pandas = df.toPandas()
    # For each action type (click, view), sum the per-window counts to get the running total
    total_clicks = df_pandas[df_pandas['action'] == 'click']['count'].sum()
    total_views = df_pandas[df_pandas['action'] == 'view']['count'].sum()

    print(df_pandas)
    print(f"Total acumulado de clicks: {total_clicks}")
    print(f"Total acumulado de views: {total_views}")

query = inputStream\
    .writeStream\
    .outputMode('complete')\
    .foreachBatch(foreach_batch_function)\
    .start()
query.awaitTermination(35)

                                        window action  count
0   (2023-09-01 03:31:00, 2023-09-01 03:31:10)   view  12807
1   (2023-09-01 03:31:50, 2023-09-01 03:32:00)  click  29712
2   (2023-09-01 03:32:00, 2023-09-01 03:32:10)  click   3001
3   (2023-09-01 03:31:40, 2023-09-01 03:31:50)  click  29597
4   (2023-09-01 03:31:00, 2023-09-01 03:31:10)  click  29653
5   (2023-09-01 03:31:30, 2023-09-01 03:31:40)  click  29897
6   (2023-09-01 03:31:10, 2023-09-01 03:31:20)   view  12715
7   (2023-09-01 03:31:10, 2023-09-01 03:31:20)  click  29738
8   (2023-09-01 03:31:20, 2023-09-01 03:31:30)  click  26707
9   (2023-09-01 03:31:40, 2023-09-01 03:31:50)   view  12853
10  (2023-09-01 03:31:30, 2023-09-01 03:31:40)   view  12553
11  (2023-09-01 03:31:20, 2023-09-01 03:31:30)   view  11498
12  (2023-09-01 03:31:50, 2023-09-01 03:32:00)   view  12738
13  (2023-09-01 03:32:00, 2023-09-01 03:32:10)   view   1244
Total acumulado de clicks: 178305
Total acumulado de views: 76408


False

In [None]:
# Stop job
query.stop()
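
Because the query runs in `complete` output mode, every batch contains all windows seen so far, so summing the per-window counts gives the running total. The sketch below redoes that sum in plain Python using the per-window counts from the recorded output above; `WINDOW_COUNTS` and `running_totals` are names introduced here for illustration. Since the windows only serve as an intermediate grouping, grouping the stream by `action` alone would likely give the same totals, although that variant was not run here.

```python
from collections import Counter

# (window start, action, count) rows copied from the recorded output above.
WINDOW_COUNTS = [
    ("03:31:00", "view", 12807),
    ("03:31:50", "click", 29712),
    ("03:32:00", "click", 3001),
    ("03:31:40", "click", 29597),
    ("03:31:00", "click", 29653),
    ("03:31:30", "click", 29897),
    ("03:31:10", "view", 12715),
    ("03:31:10", "click", 29738),
    ("03:31:20", "click", 26707),
    ("03:31:40", "view", 12853),
    ("03:31:30", "view", 12553),
    ("03:31:20", "view", 11498),
    ("03:31:50", "view", 12738),
    ("03:32:00", "view", 1244),
]


def running_totals(window_counts):
    """Sum the per-window counts into one total per action."""
    totals = Counter()
    for _window_start, action, count in window_counts:
        totals[action] += count
    return totals


totals = running_totals(WINDOW_COUNTS)
print("clicks:", totals["click"])
print("views:", totals["view"])
```

The two sums agree with the totals printed by the notebook (178305 clicks and 76408 views).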

## 4) What percentage of users both viewed and clicked the same ad and campaign at least once in the last 10 seconds? Compute every 10 seconds. (1 extra point on the final grade)

Example:
20% of 5 users both clicked and viewed the same ad and campaign at least once. In this case, 1 of the 5 users has click and view events for at least one ad and campaign pair, while the other 4 users do not show this pattern. Note that each 10-second window has its own percentage value.

```
user_id   adId   campaignId   action
U1        A1     C1           click
U1        A1     C1           view
U1        A2     C1           click
U1        A2     C1           view
U2        A1     C1           view
U2        A2     C1           click
U3        A1     C1           view
U4        A2     C1           click
U5        A1     C1           click
```
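
The 20% figure in the example can be checked with a few lines of plain Python. This is a sketch of the per-user logic for a single window, using exactly the rows of the example table; `EVENTS` and `pct_users_with_click_and_view` are hypothetical names, and the streaming version would need to repeat the calculation for each 10-second window.

```python
from collections import defaultdict

# Rows of the example table: (user_id, adId, campaignId, action).
EVENTS = [
    ("U1", "A1", "C1", "click"),
    ("U1", "A1", "C1", "view"),
    ("U1", "A2", "C1", "click"),
    ("U1", "A2", "C1", "view"),
    ("U2", "A1", "C1", "view"),
    ("U2", "A2", "C1", "click"),
    ("U3", "A1", "C1", "view"),
    ("U4", "A2", "C1", "click"),
    ("U5", "A1", "C1", "click"),
]


def pct_users_with_click_and_view(events):
    """Percentage of distinct users with both a click and a view
    on at least one (adId, campaignId) pair."""
    users = set()
    actions_by_key = defaultdict(set)
    for user, ad, campaign, action in events:
        users.add(user)
        actions_by_key[(user, ad, campaign)].add(action)

    # A user qualifies as soon as one of their pairs has both actions.
    users_with_both = {
        key[0]
        for key, actions in actions_by_key.items()
        if {"click", "view"} <= actions
    }
    return 100.0 * len(users_with_both) / len(users) if users else 0.0


print(pct_users_with_click_and_view(EVENTS))
```

Only U1 has both actions on the same pair. U2 viewed A1 and clicked A2, so U2 does not count, which gives 1 of 5 users.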

In [None]:
# Read the data stream
inputStream = spark.readStream.csv(
    AD_ACTION_CSV_PATH,
    schema="""timestamp TIMESTAMP,
              user_id STRING,
              action STRING,
              adId STRING,
              campaignId STRING"""
)

# Group into 10-second windows (used for the view and click calculations)
inputStream = inputStream.groupBy(
    F.window('timestamp', '10 seconds'),  # tumbling 10-second window
    'user_id', 'adId', 'campaignId', 'action'
).count()

# Function called for each batch of data
def foreach_batch_function(df, epoch_id):
    # Split views and clicks
    views_df = df.filter(F.col('action') == 'view')  # views only
    clicks_df = df.filter(F.col('action') == 'click')  # clicks only

    # Find users who both viewed and clicked the same ad/campaign in the same window
    views_and_clicks_df = views_df.join(clicks_df,
        on=['user_id', 'adId', 'campaignId', 'window'], how='inner')

    # Count the distinct users who performed any action (view or click).
    # Note: this is computed per (window, adId, campaignId), not per window alone
    total_users_df = df.groupBy('window', 'adId', 'campaignId').agg(
        F.countDistinct('user_id').alias('total_users')
    )

    # Count the distinct users who performed both actions (view and click)
    users_with_both_actions_df = views_and_clicks_df.groupBy('window', 'adId', 'campaignId').agg(
        F.countDistinct('user_id').alias('users_with_both_actions')
    )

    # Join the two DataFrames to compute the percentage
    result_df = total_users_df.join(users_with_both_actions_df,
                                     on=['window', 'adId', 'campaignId'], how='left')

    # Percentage of users who performed both actions (0 when no user did)
    result_df = result_df.withColumn(
        'percentage',
        F.coalesce(F.col('users_with_both_actions') / F.col('total_users') * 100, F.lit(0))
    ).select('window', 'adId', 'campaignId', F.round('percentage', 2).alias('percentage'))

    # Print the results to the console
    print(f"Epoch {epoch_id}:")
    print(result_df.toPandas())

# Streaming query
query = inputStream \
    .writeStream \
    .outputMode('complete') \
    .foreachBatch(foreach_batch_function) \
    .start()

query.awaitTermination(180)


Epoch 0:
                                         window     adId     campaignId  \
0    (2023-09-01 03:32:00, 2023-09-01 03:32:10)  adId_04  campaignId_01   
1    (2023-09-01 03:31:10, 2023-09-01 03:31:20)  adId_04  campaignId_02   
2    (2023-09-01 03:31:10, 2023-09-01 03:31:20)  adId_10  campaignId_01   
3    (2023-09-01 03:31:40, 2023-09-01 03:31:50)  adId_07  campaignId_02   
4    (2023-09-01 03:31:50, 2023-09-01 03:32:00)  adId_08  campaignId_03   
..                                          ...      ...            ...   
198  (2023-09-01 03:31:50, 2023-09-01 03:32:00)  adId_04  campaignId_01   
199  (2023-09-01 03:32:00, 2023-09-01 03:32:10)  adId_08  campaignId_03   
200  (2023-09-01 03:31:10, 2023-09-01 03:31:20)  adId_09  campaignId_02   
201  (2023-09-01 03:32:00, 2023-09-01 03:32:10)  adId_10  campaignId_01   
202  (2023-09-01 03:31:40, 2023-09-01 03:31:50)  adId_04  campaignId_03   

     percentage  
0         20.29  
1         67.39  
2         60.95  
3         68.30  


False

In [None]:
# Stop job
query.stop()