<a href="https://colab.research.google.com/github/rgprado/processamento_larga_escala/blob/main/spark_assignment_RodrigoPrado.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## About the data

The CSV file contains time-stamped 'click' and 'view' events generated by users on ads that belong to specific campaigns.

**Column description:**  
timestamp,user_id,action,adId,campaignId 

**Sample:**  
2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01  
2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02  
2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02  
2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03

**CSV file name:**  
data/ad_action.csv

## About the questions

The questions must be answered using one of the Spark APIs, except the "Pandas API on Spark".

When you use a Spark action, take care to avoid running out of memory: always assume the code will be executed against a large volume of data.

Even if you cannot finish a question, please submit it anyway, since partial code may still earn some points.

In [1]:
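# The wget/tar steps try to fetch a standalone Spark distribution. In this run the
# download evidently failed (see the tar error in the output below), so the notebook
# relies on the pip-installed pyspark package instead.
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark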

tar: spark-3.0.1-bin-hadoop2.7.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '\
    --driver-memory 2G \
    --executor-memory 2G \
    pyspark-shell'

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [3]:
# For those using Spark with Map-Reduce

# conf = SparkConf().setMaster("local[*]")
# sc = SparkContext.getOrCreate(conf=conf)
# data_sc = sc.textFile('ad_action.csv')
# data_sc.take(5)

In [4]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import col, desc, asc, lag
from pyspark.sql.window import Window

In [5]:
# For those using Spark SQL

spark = SparkSession.builder\
    .master("local[*]")\
    .getOrCreate()
data_spark = spark.read.csv('ad_action.csv', header=False, inferSchema=True)
# Name all five columns at once instead of chaining withColumnRenamed calls
data_spark = data_spark.toDF('timestamp', 'user_id', 'action', 'adId', 'campaignId')
data_spark = data_spark.withColumn('timestamp', to_timestamp('timestamp'))
data_spark.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- action: string (nullable = true)
 |-- adId: string (nullable = true)
 |-- campaignId: string (nullable = true)



In [6]:
SparkSession.getActiveSession()

In [7]:
data_spark.show(5)

+-------------------+--------------------+------+-------+-------------+
|          timestamp|             user_id|action|   adId|   campaignId|
+-------------------+--------------------+------+-------+-------------+
|2016-09-21 22:11:00|7c74953c-66cc-48b...| click|adId_09|campaignId_01|
|2016-06-25 18:29:00|676a083e-2f8e-4ff...|  view|adId_09|campaignId_02|
|2016-02-14 19:03:00|77158997-0dfa-48b...| click|adId_02|campaignId_02|
|2016-03-26 06:27:00|78aa2467-b502-413...| click|adId_07|campaignId_03|
|2016-01-02 04:57:00|fef9a98c-d73e-48e...|  view|adId_02|campaignId_02|
+-------------------+--------------------+------+-------+-------------+
only showing top 5 rows



In [8]:
# Uncomment to shut down the cluster

#sc.stop()
# spark.stop()

## 1) What are the top 3 campaigns that generated the most events? Order by number of events (2.5 points)

In [9]:
%%time
most_popular_cp = data_spark.groupby('campaignId')\
    .count()\
    .orderBy(desc('count'))\
    .take(3)

print('Top 3 campaigns with the most events:')
for row in most_popular_cp:
  print(row['campaignId'])

Top 3 campaigns with the most events:
campaignId_02
campaignId_03
campaignId_01
CPU times: user 26.5 ms, sys: 7.95 ms, total: 34.4 ms
Wall time: 3.22 s


## 2) Which campaign had the most clicks? (2.5 points)

In [10]:
data_spark.groupBy('campaignId', 'action')\
    .count()\
    .orderBy(desc('count'))\
    .where(col("action") == 'click')\
    .show()

+-------------+------+-----+
|   campaignId|action|count|
+-------------+------+-----+
|campaignId_02| click|63983|
|campaignId_03| click|60947|
|campaignId_01| click|53375|
+-------------+------+-----+



In [11]:
most_click_campain = data_spark.groupBy('campaignId', 'action')\
    .count()\
    .orderBy(desc('count'))\
    .where(col("action") == 'click')\
    .take(1)

print('Campaign "{}" was the most clicked, with {} clicks'.format(most_click_campain[0]['campaignId'], most_click_campain[0]['count']))

Campaign "campaignId_02" was the most clicked, with 63983 clicks


## 3) Of the 12 months of the year, which had the largest total number of events accumulated over the years? (2.5 points)

In [12]:
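# Import only what is needed; a wildcard import would shadow Python builtins such as sum, min and max
from pyspark.sql.functions import date_format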
data_spark.select(date_format(col("timestamp"),'MMMM').alias('month'), 'action')\
  .groupby('month')\
  .count()\
  .orderBy(desc('count'))\
  .show()

+---------+-----+
|    month|count|
+---------+-----+
|  January|25800|
|    March|21377|
|  October|21363|
|   August|21362|
|      May|21224|
|     July|21183|
|     June|20657|
| November|20646|
|September|20627|
|    April|20558|
| December|20297|
| February|19619|
+---------+-----+



In [13]:
most_event_month = data_spark.select(date_format(col("timestamp"),'MMMM').alias('month'), 'action')\
  .groupby('month')\
  .count()\
  .orderBy(desc('count'))\
  .take(1)
print('The month with the largest accumulated total of events was "{}", with {} events.'.format(most_event_month[0]['month'], most_event_month[0]['count']))

The month with the largest accumulated total of events was "January", with 25800 events.


In [14]:
most_event_month

[Row(month='January', count=25800)]

## 4) In cases where a view event is followed by a click event, both created by the same user on the same ad and campaign, which are the 5 ad/campaign pairs with the lowest average time between the two events? (2.5 points)

In [56]:
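# Shift timestamp, user_id, action, adId and campaignId up by one row (the next event in global time order)
# Note: with an empty partitionBy(), Spark moves all rows into a single partition to evaluate this window
w = Window().partitionBy().orderBy(col("timestamp"))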
df_spark = data_spark.select("*", lag("timestamp",-1).over(w).alias("timestamp_next"), \
                             lag("user_id",-1).over(w).alias("user_id_next"),\
                             lag("action",-1).over(w).alias("action_next"),\
                             lag("adId",-1).over(w).alias("adId_next"),\
                             lag("campaignId",-1).over(w).alias("campaignId_next")\
                             )

df_spark.show()

+-------------------+--------------------+------+-------+-------------+-------------------+--------------------+-----------+---------+---------------+
|          timestamp|             user_id|action|   adId|   campaignId|     timestamp_next|        user_id_next|action_next|adId_next|campaignId_next|
+-------------------+--------------------+------+-------+-------------+-------------------+--------------------+-----------+---------+---------------+
|2016-01-01 00:00:00|c577e717-1be3-4e6...| click|adId_06|campaignId_02|2016-01-01 00:07:00|0fb092f2-6721-4dd...|      click|  adId_10|  campaignId_03|
|2016-01-01 00:07:00|0fb092f2-6721-4dd...| click|adId_10|campaignId_03|2016-01-01 00:07:00|4fde5dfa-ccde-4c2...|       view|  adId_07|  campaignId_02|
|2016-01-01 00:07:00|4fde5dfa-ccde-4c2...|  view|adId_07|campaignId_02|2016-01-01 00:11:00|2dbd9392-768e-4ba...|      click|  adId_03|  campaignId_03|
|2016-01-01 00:11:00|2dbd9392-768e-4ba...| click|adId_03|campaignId_03|2016-01-01 00:12:00|cd3

In [67]:
# Keep only the rows that match the requested criteria:
# a view event followed by a click event created by the same user on the same ad and campaign
df_pares = df_spark.filter(
    (col('user_id') == col('user_id_next')) &
    (col('action') == 'view') & (col('action_next') == 'click') &
    (col('adId') == col('adId_next')) &
    (col('campaignId') == col('campaignId_next'))
)

df_pares.printSchema()
df_pares.show()

root
 |-- timestamp: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- action: string (nullable = true)
 |-- adId: string (nullable = true)
 |-- campaignId: string (nullable = true)
 |-- timestamp_next: timestamp (nullable = true)
 |-- user_id_next: string (nullable = true)
 |-- action_next: string (nullable = true)
 |-- adId_next: string (nullable = true)
 |-- campaignId_next: string (nullable = true)

+-------------------+--------------------+------+-------+-------------+-------------------+--------------------+-----------+---------+---------------+
|          timestamp|             user_id|action|   adId|   campaignId|     timestamp_next|        user_id_next|action_next|adId_next|campaignId_next|
+-------------------+--------------------+------+-------+-------------+-------------------+--------------------+-----------+---------+---------------+
|2016-01-12 07:21:00|796ea1b7-7b9c-4e0...|  view|adId_02|campaignId_01|2016-01-12 07:32:00|796ea1b7-7b9c-4e0...|      

In [17]:
df_results = df_pares.withColumn('tempo_eventos', col("timestamp_next").cast("long") - col('timestamp').cast("long"))
df_results.show()

+-------------------+--------------------+------+-------+-------------+-------------------+--------------------+-----------+---------+---------------+-------------+
|          timestamp|             user_id|action|   adId|   campaignId|     timestamp_next|        user_id_next|action_next|adId_next|campaignId_next|tempo_eventos|
+-------------------+--------------------+------+-------+-------------+-------------------+--------------------+-----------+---------+---------------+-------------+
|2016-01-12 07:21:00|796ea1b7-7b9c-4e0...|  view|adId_02|campaignId_01|2016-01-12 07:32:00|796ea1b7-7b9c-4e0...|      click|  adId_02|  campaignId_01|          660|
|2016-01-17 10:06:00|1d625585-afe1-44e...|  view|adId_04|campaignId_02|2016-01-17 10:06:00|1d625585-afe1-44e...|      click|  adId_04|  campaignId_02|            0|
|2016-01-20 20:58:00|5b5c2595-2af7-44c...|  view|adId_06|campaignId_01|2016-01-20 20:58:00|5b5c2595-2af7-44c...|      click|  adId_06|  campaignId_01|            0|
|2016-02-0

In [18]:
df_results.select('adId', 'campaignId', 'tempo_eventos')\
  .groupby('adId', 'campaignId')\
  .avg()\
  .orderBy(asc('avg(tempo_eventos)'))\
  .take(10)

[Row(adId='adId_07', campaignId='campaignId_03', avg(tempo_eventos)=0.0),
 Row(adId='adId_04', campaignId='campaignId_02', avg(tempo_eventos)=0.0),
 Row(adId='adId_10', campaignId='campaignId_03', avg(tempo_eventos)=0.0),
 Row(adId='adId_05', campaignId='campaignId_01', avg(tempo_eventos)=0.0),
 Row(adId='adId_01', campaignId='campaignId_02', avg(tempo_eventos)=0.0),
 Row(adId='adId_02', campaignId='campaignId_02', avg(tempo_eventos)=30.0),
 Row(adId='adId_09', campaignId='campaignId_02', avg(tempo_eventos)=30.0),
 Row(adId='adId_09', campaignId='campaignId_01', avg(tempo_eventos)=45.0),
 Row(adId='adId_06', campaignId='campaignId_01', avg(tempo_eventos)=60.0),
 Row(adId='adId_01', campaignId='campaignId_03', avg(tempo_eventos)=60.0)]

In [36]:
df_results.select('adId', 'campaignId', 'tempo_eventos')\
  .orderBy(asc('adId'), asc('campaignId'))\
  .show()

+-------+-------------+-------------+
|   adId|   campaignId|tempo_eventos|
+-------+-------------+-------------+
|adId_01|campaignId_01|          900|
|adId_01|campaignId_01|           60|
|adId_01|campaignId_01|            0|
|adId_01|campaignId_01|          120|
|adId_01|campaignId_02|            0|
|adId_01|campaignId_03|           60|
|adId_01|campaignId_03|           60|
|adId_02|campaignId_01|          660|
|adId_02|campaignId_02|           60|
|adId_02|campaignId_02|            0|
|adId_02|campaignId_03|           60|
|adId_02|campaignId_03|            0|
|adId_02|campaignId_03|          660|
|adId_03|campaignId_02|           60|
|adId_03|campaignId_02|          120|
|adId_04|campaignId_01|           60|
|adId_04|campaignId_01|          120|
|adId_04|campaignId_02|            0|
|adId_04|campaignId_02|            0|
|adId_05|campaignId_01|            0|
+-------+-------------+-------------+
only showing top 20 rows

