<a href="https://colab.research.google.com/github/niltontac/EspAnalise-EngDados/blob/master/DASK_and_SPARK_AnalysisEvents_Clicks%26Views_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## About the data

The CSV file contains timestamped 'click' or 'view' events generated by users on ads belonging to specific campaigns.

**Columns:**  
timestamp,user_id,action,adId,campaignId 

**Sample:**  
2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01  
2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02  
2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02  
2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03

**CSV file name:**  
ad_action.csv
## ----------------------
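To make the column layout concrete, the sketch below parses the four sample rows with Python's standard `csv` module. The `COLUMNS` list and the `parse_rows` helper are illustrative names and are not part of the original notebook.

```python
import csv
import io
from datetime import datetime

# Column names in the order given in the data description
COLUMNS = ["timestamp", "user_id", "action", "adId", "campaignId"]

# The four sample rows from the data description (the file has no header row)
SAMPLE = """\
2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01
2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02
2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02
2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03
"""

def parse_rows(text):
    """Hypothetical helper: turn header-less CSV text into a list of dicts."""
    rows = []
    for record in csv.reader(io.StringIO(text)):
        row = dict(zip(COLUMNS, record))
        # Convert the first column into a datetime object
        row["timestamp"] = datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S")
        rows.append(row)
    return rows

rows = parse_rows(SAMPLE)
print(rows[0]["action"], rows[0]["campaignId"], rows[0]["timestamp"].month)
```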

In [None]:
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)
client

Client: Scheduler: tcp://127.0.0.1:64047, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 8, Memory: 8.49 GB


In [None]:
data_dask = dd.read_csv('C:/Users/niltontac/Documents/Cesar_School/Análise_e_Engenharia_de_Dados/09._GDBD_-_Gerenciamento_de_Dados_para_BigData/Spark/data/ad_action.csv', header=None, parse_dates=[0])
data_dask.columns = ['timestamp','user_id','action','adId','campaignId']
data_dask

,timestamp,user_id,action,adId,campaignId
npartitions=1,,,,,
,datetime64[ns],object,object,object,object
,...,...,...,...,...


In [None]:
data_dask.head(5)

,timestamp,user_id,action,adId,campaignId
0,2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01
1,2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02
2,2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02
3,2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03
4,2016-01-02 04:57:00,fef9a98c-d73e-48ef-b2cb-766ba85dc3e3,view,adId_02,campaignId_02


#### Using Dask to list the top 3 campaigns by number of events (clicks and views), sorted by event count

In [None]:
# Count all events (clicks and views) per campaign and sort in descending order
more_events = data_dask[['campaignId', 'action']]\
    .groupby('campaignId')['action']\
    .count()\
    .compute()\
    .sort_values(ascending=False)\
    .head(3)  # keep only the top 3 campaigns

print('The top 3 campaigns are:', more_events)

The top 3 campaigns are: campaignId
campaignId_02    91216
campaignId_03    87036
campaignId_01    76461
Name: action, dtype: int64
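As a plain-Python illustration of the same group, count, and sort logic, the sketch below uses `collections.Counter` on the campaign and action values of the five rows shown by `data_dask.head(5)`. It is a minimal local sketch for intuition only and does not replace the Dask computation.

```python
from collections import Counter

# (campaignId, action) pairs taken from the first five rows of the file
events = [
    ("campaignId_01", "click"),
    ("campaignId_02", "view"),
    ("campaignId_02", "click"),
    ("campaignId_03", "click"),
    ("campaignId_02", "view"),
]

# Count every event (click or view) per campaign, like groupby(...).count()
events_per_campaign = Counter(campaign for campaign, _ in events)

# most_common(3) returns the three largest counts in descending order
top3 = events_per_campaign.most_common(3)
print(top3)
```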


#### Listing with Dask which campaign had the most clicks

In [None]:
# Campaign with the most events overall (clicks + views)
campaign_more_clicks = data_dask[['campaignId', 'action']].groupby('campaignId')['action']\
    .count()\
    .nlargest(1)\
    .compute()\
    .index[0]

# Number of clicks recorded for that campaign.
# Note: this is the campaign with the most clicks only if the campaign
# with the most events also leads in clicks.
number_clicks = data_dask[data_dask['campaignId'] == campaign_more_clicks]['action']\
    .value_counts()\
    .compute()\
    .loc['click']

print(f'The campaign with the most clicks is {campaign_more_clicks} with {number_clicks} clicks')

The campaign with the most clicks is campaignId_02 with 63983 clicks
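The cell above first selects the campaign with the most events overall and then counts its clicks. Those two campaigns need not coincide in general. The sketch below uses made-up data (not taken from `ad_action.csv`) to show the difference between the two rankings; `campaign_with_most` is a hypothetical helper name.

```python
from collections import Counter

# Made-up events: campaign_A has the most events, campaign_B the most clicks
events = (
    [("campaign_A", "view")] * 4
    + [("campaign_A", "click")] * 1
    + [("campaign_B", "click")] * 3
)

def campaign_with_most(events, action):
    """Hypothetical helper: campaign with the most events of one action type."""
    counts = Counter(campaign for campaign, act in events if act == action)
    return counts.most_common(1)[0]

# Ranking by all events versus ranking by clicks only
most_events = Counter(campaign for campaign, _ in events).most_common(1)[0]
most_clicks = campaign_with_most(events, "click")
print(most_events)
print(most_clicks)
```

With Dask, the same idea would be to filter the rows first, for example `data_dask[data_dask['action'] == 'click']`, and only then group by `campaignId`.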


#### Setting up the environment for Spark

In [None]:
# So that Jupyter can load Spark correctly in the notebook
import sysconfig
import findspark
# sysconfig replaces distutils.sysconfig, which was removed in Python 3.12
findspark.init(f"{sysconfig.get_paths()['purelib']}/pyspark")
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [None]:
# For those using Spark with MapReduce
conf = SparkConf().setMaster("local[*]")
sc = SparkContext(conf=conf)
#data_sc = sc.textFile('ad_action.csv')
#data_sc.take(5)

In [None]:
# For those using Spark SQL
from pyspark.sql.functions import to_timestamp
spark = SparkSession.builder\
    .master("local[*]")\
    .getOrCreate()
data_spark = spark.read.csv('C:/Users/niltontac/Documents/Cesar_School/Análise_e_Engenharia_de_Dados/09._GDBD_-_Gerenciamento_de_Dados_para_BigData/Spark/data/ad_action.csv', header=False, inferSchema=True)
# Rename the default columns (_c0 ... _c4) in a single call
data_spark = data_spark.toDF('timestamp', 'user_id', 'action', 'adId', 'campaignId')
data_spark = data_spark.withColumn('timestamp', to_timestamp('timestamp'))
data_spark.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- action: string (nullable = true)
 |-- adId: string (nullable = true)
 |-- campaignId: string (nullable = true)



In [None]:
data_spark.show(5)

+-------------------+--------------------+------+-------+-------------+
|          timestamp|             user_id|action|   adId|   campaignId|
+-------------------+--------------------+------+-------+-------------+
|2016-09-21 22:11:00|7c74953c-66cc-48b...| click|adId_09|campaignId_01|
|2016-06-25 18:29:00|676a083e-2f8e-4ff...|  view|adId_09|campaignId_02|
|2016-02-14 19:03:00|77158997-0dfa-48b...| click|adId_02|campaignId_02|
|2016-03-26 06:27:00|78aa2467-b502-413...| click|adId_07|campaignId_03|
|2016-01-02 04:57:00|fef9a98c-d73e-48e...|  view|adId_02|campaignId_02|
+-------------------+--------------------+------+-------+-------------+
only showing top 5 rows



#### Using Spark to find the month (out of the 12 months of the year) with the highest total number of events accumulated across the years

In [None]:
from pyspark.sql.functions import month, year

data_spark = data_spark.withColumn('Year', year('timestamp'))\
    .withColumn('Month', month('timestamp'))

# Count events per month number (all years combined) and keep the busiest month
data_spark.groupBy('Month')\
    .count()\
    .withColumnRenamed('count', 'MonthEvents')\
    .toPandas()\
    .sort_values(by='MonthEvents', ascending=False)\
    .head(1)

,Month,MonthEvents
1,1,25800
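Grouping by the month number alone is what accumulates events across years: January of every year falls into the same bucket. The sketch below illustrates this with a handful of made-up timestamps (not taken from `ad_action.csv`) in plain Python.

```python
from collections import Counter
from datetime import datetime

# Made-up timestamps spanning two years (not taken from ad_action.csv)
timestamps = [
    "2015-01-10 08:00:00",
    "2016-01-05 12:30:00",
    "2016-02-14 19:03:00",
    "2015-02-01 00:15:00",
    "2016-01-20 23:59:00",
]

# Group by month number only, so January 2015 and January 2016 share a bucket
events_per_month = Counter(
    datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").month for ts in timestamps
)

# Month with the highest accumulated total
best_month, total = events_per_month.most_common(1)[0]
print(best_month, total)
```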


#### Analyzing with Spark which campaign had the most views

In [None]:
AD_ACTION_CSV_PATH = 'C:/Users/niltontac/Documents/Cesar_School/Análise_e_Engenharia_de_Dados/09._GDBD_-_Gerenciamento_de_Dados_para_BigData/Spark/data/ad_action.csv'

In [None]:
def parse_campaign(line):
    # Map a CSV line to (campaignId, 1) so events can be counted per campaign
    fields = line.split(",")
    return (fields[4], 1)

def parse_campaign_action(line):
    # Map a CSV line to (campaignId, action)
    fields = line.split(",")
    return (fields[4], fields[2])

rdd = sc.textFile(AD_ACTION_CSV_PATH)

# Count events per campaign and take the campaign with the most events overall
campaigns_count = rdd.map(parse_campaign).reduceByKey(lambda x, y: x + y)
campaigns_sorted = campaigns_count.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)
most_popular_campaign = campaigns_sorted.take(1)[0][1]

# Keep only the 'view' events of that campaign.
# Note: this is the campaign with the most views only if the campaign
# with the most events also leads in views.
best_campaign_views = rdd.map(parse_campaign_action)\
    .filter(lambda x: x[0] == most_popular_campaign and x[1] == "view")

print("The campaign with the most views was {} with {} views".format(most_popular_campaign, best_campaign_views.count()))

The campaign with the most views was campaignId_02 with 27233 views
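For readers new to the RDD API, the sketch below emulates the map and `reduceByKey` steps in plain Python and counts views per campaign directly. The input lines are made up in the same layout as `ad_action.csv`, and `reduce_by_key` is a hypothetical local stand-in, not a Spark function.

```python
# Made-up CSV lines in the same layout as ad_action.csv
lines = [
    "2016-01-01 10:00:00,u1,view,adId_01,campaignId_01",
    "2016-01-01 10:01:00,u2,view,adId_02,campaignId_02",
    "2016-01-01 10:02:00,u3,click,adId_02,campaignId_02",
    "2016-01-01 10:03:00,u4,view,adId_03,campaignId_02",
]

def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey: merge values per key."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# map: keep only 'view' events and emit (campaignId, 1)
pairs = []
for line in lines:
    fields = line.split(",")
    if fields[2] == "view":
        pairs.append((fields[4], 1))

# reduce: sum the ones per campaign, then pick the largest total
views_per_campaign = reduce_by_key(pairs, lambda x, y: x + y)
best_campaign = max(views_per_campaign, key=views_per_campaign.get)
print(best_campaign, views_per_campaign[best_campaign])
```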


#### Spark Streaming

In [None]:
import numpy
import operator

from pyspark.streaming import StreamingContext

#### Configuring streaming to split the data into 25471 mini-batches and send 1 per second

In [None]:
ad_action_rdd = sc.textFile("C:/Users/niltontac/Documents/Cesar_School/Análise_e_Engenharia_de_Dados/09._GDBD_-_Gerenciamento_de_Dados_para_BigData/Spark/data/ad_action.csv")
ad_action = ad_action_rdd.collect()
print(f"Total records: {len(ad_action)}")
ad_action_stream = numpy.array_split(ad_action, 25471)
rddQueue = [sc.parallelize(mini_batch) for mini_batch in ad_action_stream]
print(f"Total mini-batches: {len(rddQueue)}")
print(f"Total records in the first mini-batch: {len(ad_action_stream[0])}")
print(ad_action_stream[:3])

Total records: 254713
Total mini-batches: 25471
Total records in the first mini-batch: 11
[array(['2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01',
       '2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02',
       '2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02',
       '2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03',
       '2016-01-02 04:57:00,fef9a98c-d73e-48ef-b2cb-766ba85dc3e3,view,adId_02,campaignId_02',
       '2016-03-04 09:14:00,6ba65af9-4d83-4567-b580-a34f177bb788,view,adId_09,campaignId_01',
       '2016-07-09 21:42:00,be3befb9-ee08-4311-89f4-430d23ee63f1,click,adId_09,campaignId_01',
       '2016-07-18 21:15:00,d37832ae-546f-4b9b-94f1-ab2ddfc0f49d,click,adId_05,campaignId_02',
       '2016-04-20 13:30:00,1e5b575e-19e7-44fa-b5c2-f9aa6953de8a,click,adId_03,campaignId_01',
       '2016-02-23 19:55:00,ed1b4467-8581-459
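The first mini-batch holds 11 records because 254713 is not evenly divisible by 25471. `numpy.array_split` gives the first `total % n` sections one extra element and the rest `total // n` elements. The arithmetic can be checked without NumPy, as in this small sketch:

```python
total_records = 254713   # record count printed by the cell above
num_batches = 25471      # number of sections requested from array_split

# array_split gives the first (total % n) sections one extra element
base_size, remainder = divmod(total_records, num_batches)
sizes = [base_size + 1] * remainder + [base_size] * (num_batches - remainder)

print(base_size, remainder)             # 10 3
print(sizes[0], sizes[-1], sum(sizes))  # 11 10 254713
```

So three mini-batches carry 11 records and the remaining 25468 carry 10.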

#### Using Spark Streaming to list the top 3 campaigns that generated the most events in the last 10 seconds, sorted by event count and recomputed every 3 seconds

In [None]:
ssc = StreamingContext(sc, 1)

# Input data
inputStream = ssc.queueStream(rddQueue)

# Process streaming
def parse_action(line):
    fields = line.split(",")
    return (fields[4], fields[2])

def events(action):
    return 1

def campaign_top(rdd):
    print(rdd.top(3, lambda x: x[1]))

inputStream.window(10, 3)\
    .map(parse_action)\
    .mapValues(events)\
    .reduceByKey(operator.add)\
    .foreachRDD(campaign_top)


# Start job
ssc.start()

In [None]:
# Stop job
ssc.stop(stopSparkContext=False, stopGraceFully=False)
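To build intuition for `window(10, 3)` with a 1-second batch interval, the sketch below emulates the sliding window locally: every 3 seconds it counts campaigns over the batches received in the last 10 seconds. The batches are made up, and `top_campaigns` is a hypothetical helper that only approximates what the streaming job above computes.

```python
from collections import Counter

# Made-up stream: one list of campaign ids per 1-second mini-batch
batches = [["c1", "c2"], ["c1"], ["c3"], ["c2", "c2"], ["c2"], ["c3"]]

WINDOW = 10  # seconds of data covered by each evaluation
SLIDE = 3    # seconds between evaluations

def top_campaigns(batches, window=WINDOW, slide=SLIDE, n=3):
    """Hypothetical local emulation of window(10, 3) + reduceByKey + top(3)."""
    results = {}
    for t in range(slide, len(batches) + 1, slide):
        # Batches that arrived in the last `window` seconds up to time t
        recent = batches[max(0, t - window):t]
        counts = Counter(c for batch in recent for c in batch)
        results[t] = counts.most_common(n)
    return results

results = top_campaigns(batches)
for t, top in results.items():
    print(t, top)
```

Because the window (10 s) is longer than the slide (3 s), consecutive evaluations overlap, so the same event can contribute to several consecutive results.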