# Extração de Dados do Parquet para Dataframes

Esse notebook é usado para servir de base para um script de extração de eventos do Flurry para criar dataframes que serão usados para analise de dados e extração de features.

A origem do evento é o Flurry, uma plataforma mobile app analytics, que registra os eventos realizados pelas pessoas que estão usando o aplicativo em produção. O primeiro evento registrado foi em Setembro de 2017. Diariamente são extraídos da plataforma em um formato *json*. Os arquivos tem o seguinte formato:

<img src="imagens/json-format-flurry.png">


Atualmente esses arquivos são exportados para um parquet (um formato de armazenamento colunar) bastante eficiente em espaço. No atual parquet são coletados apenas o nome do evento, o id do usuário, o modelo do dispositivo do usuário e o país onde foi registrado o evento.

Os arquivos que são criados tem como finalidade apoiar os processos de analise de dados e feature engineering.

**USERS**

* nome do arquivo - *users_YYYY-MM-DD.csv* - 
Eventos com dados de todos os usuários que já registraram algum evento pelo Flurry. A observação é o *userId* e as variáveis são: *deviceModel* (o principal modelo de aparelho em que foi registrado o evento do app) e *countryISO* (o principal país onde o usuário usou o app).

<img src="imagens/dataframe-users.png">

**EVENTS**
* nome do arquivo - *events_YYYY-MM-DD.csv* - 
Informações sobre todos os eventos registrados pelo flurry. A observação é o nome do evento: *eventName*. As seguintes features foram coletadas dos eventos: *firstOccurrence* (primeira data de ocorrência do evento), *lastOccurrence* (última data de ocorrência do evento), *intervalOccurrence* (diferença em números de dias entre as data anteriores), *aaaa-mm* dezenas de colunas anotando a quantidade de eventos registrados nos meses desde o lançamento do app.

<img src="imagens/dataframe-events.png">

**EVENTS_PER_USER**
* nome do arquivo - *events_per_user_YYYY-MM-DD.csv* - 
Informações resumidas sobre todos os eventos registrados dos usuários. As linhas são a tupla (*userId* e *eventDate*), ou seja, o usuário a data em que ele tem algum evento registrado. As *colunas são todos os eventos*, onde para cada chave (userId, eventData) é totalizado a quantidade de eventos realizados no dia.

<img src="imagens/dataframe-events-per-user.png">

## Inicialização

Definindo biblotecas e variáveis

Um pre-requisito para esse notebook é a instalação do fastparquet ou outra biblioteca similar que possa ler o arquivo parquet. As demais bibliotecas são inicializadas a seguir.

As varíaveis iniciais informam o nome dos arquivos anteriores e o nome do arquivo parquet do qual serão extraídos os novos eventos. 

In [1]:
import numpy as np
import pandas as pd
import os
from datetime import date, datetime
from pathlib import Path
import locale
locale.setlocale(locale.LC_ALL, '')

'LC_CTYPE=pt_BR.UTF-8;LC_NUMERIC=pt_BR.UTF-8;LC_TIME=pt_BR.UTF-8;LC_COLLATE=en_US.UTF-8;LC_MONETARY=pt_BR.UTF-8;LC_MESSAGES=en_US.UTF-8;LC_PAPER=pt_BR.UTF-8;LC_NAME=pt_BR.UTF-8;LC_ADDRESS=pt_BR.UTF-8;LC_TELEPHONE=pt_BR.UTF-8;LC_MEASUREMENT=pt_BR.UTF-8;LC_IDENTIFICATION=pt_BR.UTF-8'

In [2]:
# Definindo os diretórios e arquivos
path_dados   = Path('/home/wesley/data/churn-prediction')
path_parquet = path_dados/'parquet'
path_csv     = path_dados/'csv'

file_parquet             = path_parquet/'flurry.parquet'
file_csv_users           = path_csv/'cp_users.csv'
file_csv_events          = path_csv/'cp_events.csv'
file_csv_events_per_user = path_csv/'cp_events_per_user.csv'

In [3]:
# Carregando o arquivo parquet para um dataframe: df
df = pd.read_parquet(file_parquet)

In [4]:
# transformando a coluna sessionTimestamp em formato de data
df['sessionTimestamp'] = pd.to_datetime(df.sessionTimestamp, unit='ms')

# vendo características importantes do dataframe
print(df.shape, df.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 59877344 entries, 0 to 59877343
Data columns (total 7 columns):
 #   Column            Dtype         
---  ------            -----         
 0   sessionTimestamp  datetime64[ns]
 1   deviceModel       object        
 2   countryISO        object        
 3   eventName         object        
 4   eventOffset       object        
 5   eventParameters   object        
 6   userId            object        
dtypes: datetime64[ns](1), object(6)
memory usage: 3.1+ GB
(59877344, 7) None


In [5]:
# Analise das primeiras linhas do dataframe
df.head()

Unnamed: 0,sessionTimestamp,deviceModel,countryISO,eventName,eventOffset,eventParameters,userId
0,2017-08-30 21:22:02.235,Moto G4 Plus,BR,Opened,48.0,{},aff3df3b88b1e955
1,2017-08-30 21:14:19.712,Moto G4 Plus,BR,,,{},
2,2017-08-28 20:02:01.905,Moto G4 Plus,BR,ativacao,63.0,{},1111
3,2017-08-30 19:10:48.562,Moto G4 Plus,BR,Opened,50.0,{},aff3df3b88b1e955
4,2017-08-29 13:49:46.685,Moto G4 Plus,BR,,,{},


### Limpeza de Dados

In [6]:
qtd_registros_inicial = df.shape[0]
print(f"Número de registros no dataframe antes da limpeza: {qtd_registros_inicial:n}")

Número de registros no dataframe antes da limpeza: 59.877.344


**Removendo linhas com eventos ou userId nulos**

In [7]:
# analisando q quantidade de registros com eventos nulos ou userId nulo
print(f'Numero de eventos Nulos:  {df.eventName.isnull().sum()}')
print(f'Numero de userId Nulos:   {df.userId.isnull().sum()}')
print(f'Numero de registros com eventos ou Ids nulos: {df[df.eventName.isnull() | df.userId.isnull()].shape[0]:n}')

Numero de eventos Nulos:  536167
Numero de userId Nulos:   350389
Numero de registros com eventos ou Ids nulos: 593.040


In [8]:
# removendo os registros com informações nulas no evento e userId
df = df[df.eventName.notnull() & df.userId.notnull()]
print(f"Número de registros no dataframe após a remoção de eventos e userId nulos: {df.shape[0]:n}")

Número de registros no dataframe após a remoção de eventos e userId nulos: 59.284.304


**Removendo linhas userId igual ao valor 0**

In [9]:
# Analisando o número de usuários com userId igual a 0
print(f"Nro registros com userID = 0 : {df.loc[df['userId'] == '0'].shape[0]:n}")

Nro registros com userID = 0 : 966.551


In [10]:
# Removendo os registros cujo userId é igual a zero.
# A premissa é que isso deve ser um erro
df = df.loc[df['userId'] != '0']
print(f"Número de registros no dataframe após a remoção de userId com valor 0: {df.shape[0]:n}")

Número de registros no dataframe após a remoção de userId com valor 0: 58.317.753


**Removendo linhas com userId com valores não numéricos**

In [11]:
# Verificando o número de registros cujo o userId não é exclusivamente numérico
print("Número de linhas com userId com valores não numéricos: {:n}".format(df.userId.str.contains('\D+').sum()))

Número de linhas com userId com valores não numéricos: 70.712


In [12]:
# Removendo registros cujo o userId não é valor numérico
df = df[df.userId.str.contains('^\d+$')]
print(f"Número de registros no dataframe após a remoção de userId com valores não numéricos: {df.shape[0]:n}")

Número de registros no dataframe após a remoção de userId com valores não numéricos: 58.247.041


In [13]:
print(f"Número de registros removidos após a limpeza: {qtd_registros_inicial-df.shape[0]:n}")

Número de registros removidos após a limpeza: 1.630.303


In [14]:
#calculando o horário do primeiro e último evento 
first_event = df.sort_values('sessionTimestamp').iloc[0]['sessionTimestamp']
last_event  = df.sort_values('sessionTimestamp').iloc[-1]['sessionTimestamp']

In [15]:
# Número total de usuários distintos na plataforma durante todo o período
print(f"Número de userIds únicos:   {df.userId.nunique():n}")
print(f"Número de eventos únicos:   {df.eventName.nunique():n}")
print("Horário do primeiro evento: {}".format(first_event.strftime("%d-%b-%Y %H:%M:%S")))
print("Horário do último evento:   {}".format(last_event.strftime("%d-%b-%Y %H:%M:%S")))

Número de userIds únicos:   210.251
Número de eventos únicos:   201
Horário do primeiro evento: 28-ago-2017 20:02:01
Horário do último evento:   16-abr-2020 23:59:58


In [16]:
# Analisando o dataframe após a limpeza
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 58247041 entries, 2 to 59877343
Data columns (total 7 columns):
 #   Column            Dtype         
---  ------            -----         
 0   sessionTimestamp  datetime64[ns]
 1   deviceModel       object        
 2   countryISO        object        
 3   eventName         object        
 4   eventOffset       object        
 5   eventParameters   object        
 6   userId            object        
dtypes: datetime64[ns](1), object(6)
memory usage: 3.5+ GB


### Criação do dataframe de eventos por usuário por data

In [17]:
%%time
# criando a feature eventDate usando apenas o dia e descartando a hora do sessionTimestamp
df['eventDate'] = df.sessionTimestamp.dt.strftime("%Y-%m-%d")

CPU times: user 3min 37s, sys: 2.16 s, total: 3min 39s
Wall time: 3min 39s


In [18]:
df.head()

Unnamed: 0,sessionTimestamp,deviceModel,countryISO,eventName,eventOffset,eventParameters,userId,eventDate
2,2017-08-28 20:02:01.905,Moto G4 Plus,BR,ativacao,63,{},1111,2017-08-28
10,2017-08-28 21:00:18.964,Moto G4 Plus,BR,ativacao,107,{},1111,2017-08-28
11,2017-08-28 21:00:18.964,Moto G4 Plus,BR,uncaught,66356,{},1111,2017-08-28
12,2017-08-28 21:00:18.964,Moto G4 Plus,BR,ativacao,8106,{},1111,2017-08-28
13,2017-08-28 21:00:18.964,Moto G4 Plus,BR,ativacao,14168,{},1111,2017-08-28


In [19]:
events_per_user = df[['eventDate','userId','eventName', 'eventOffset']].\
                  groupby(['eventDate', 'userId', 'eventName'], as_index=False).agg({'eventOffset': 'count'})
events_per_user.columns = ['eventDate', 'userId', 'eventName', 'eventOccurrences']

In [20]:
events_per_user.head()

Unnamed: 0,eventDate,userId,eventName,eventOccurrences
0,2017-08-28,1111,ativacao,6
1,2017-08-28,1111,uncaught,2
2,2017-10-02,123,Opened,9
3,2017-10-02,123,donate,2
4,2017-10-02,123,onBoardingStage,3


In [21]:
events_per_user = events_per_user.\
                  pivot_table(index=['eventDate', 'userId'], columns='eventName', values='eventOccurrences').\
                  reset_index().fillna(0)

In [29]:
events_per_user.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3805376 entries, 0 to 3805375
Columns: 203 entries, eventDate to uncaught
dtypes: float64(201), object(2)
memory usage: 5.8+ GB


In [32]:
events_per_user.head()

eventName,eventDate,userId,ActivedNotificationIOS,ApparedMessageWrongLogin,AssociatedWithFacebook,ClickedAdAfterDonate,ClickedAdBeforeDonate,ClickedAdMoreInfo,ClickedBadgesList,ClickedBadgesUnupdated,...,rcdloi-ok,referrer,reportError,requestDeleteAccount,selectImpact,skipTutorialFromOnBoarding,test-referral-64,test-referral-70,test-referral-87,uncaught
0,2017-08-28,1111,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0
1,2017-10-02,123,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0
2,2017-10-02,6,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
3,2017-10-02,7,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.0
4,2017-10-03,6,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [31]:
events_per_user.userId.nunique()

210251

In [33]:
# Analisando as primeiras linhas, mas de um evento específico
events_per_user[['userId', 'eventDate', 'donate']].head(10)

eventName,userId,eventDate,donate
0,1111,2017-08-28,0.0
1,123,2017-10-02,2.0
2,6,2017-10-02,0.0
3,7,2017-10-02,0.0
4,6,2017-10-03,0.0
5,7,2017-10-03,0.0
6,4,2017-10-04,1.0
7,6,2017-10-04,0.0
8,7,2017-10-04,0.0
9,9,2017-10-04,0.0


In [34]:
# salvando o arquivo de eventos por usuário
events_per_user.to_csv(file_csv_events_per_user, index=False)

### Criando o dataframe de usuários

In [35]:
%%time
# Serão usados as colunas deviceModel e countryISO.
# Será capturada para cada usuário a moda de cada coluna, ou seja, o valor mais recorrente
users = df[['userId','deviceModel', 'countryISO']].groupby(['userId']).agg(pd.Series.mode)
print(users.shape)
users.head()

(210251, 2)
CPU times: user 1min 12s, sys: 5.92 s, total: 1min 18s
Wall time: 1min 23s


Unnamed: 0_level_0,deviceModel,countryISO
userId,Unnamed: 1_level_1,Unnamed: 2_level_1
1,Galaxy J5 Prime,BR
10,iOS Emulator,BR
100,Moto Z (2) Play,BR
1000,Nexus 5,BR
10000,Q6,BR


In [36]:
# Salavando em um arquivo de usuários
users.to_csv(file_csv_users)

### Criando o dataframe de eventos

In [37]:
# definindo duas matrizes uma com as colunas que são de eventos e a outra que não são
NO_EVENTS = ['userId', 'eventDate']
EVENTS = [c for c in events_per_user.columns if c not in NO_EVENTS]
len(EVENTS)

201

In [39]:
del df

In [44]:
# transformando a coluna sessionTimestamp em formato de data
events_per_user['eventDate'] = pd.to_datetime(events_per_user.eventDate)

In [46]:
# Vamos criar 3 features para cada evento: firstOccurrence', 'lastOccurrence', 'intervalOccurrence'

# iniciando o DataFrame de eventos, com as colunas selecionadas para a analise
events = pd.DataFrame(columns=['eventName', 'firstOccurrence', 'lastOccurrence', 'intervalOccurrence'])

# para cada evento é criado uma linha com o calculo das datas e do intervalo
for e in EVENTS:
    
    firstOccurrence = events_per_user.eventDate[events_per_user[e] > 0].sort_values().iloc[0]
    lastOccurrence = events_per_user.eventDate[events_per_user[e] > 0].sort_values().iloc[-1]
    
    events = events.append({'eventName': e,
                              'firstOccurrence': firstOccurrence,
                              'lastOccurrence': lastOccurrence,
                              'intervalOccurrence': lastOccurrence - firstOccurrence},
                              ignore_index=True)
    
# Converte o intervalo de Timedelta para float
events.intervalOccurrence = events.intervalOccurrence.apply(lambda x: float(x.days))

events.shape, events.head()

((201, 4),
                   eventName firstOccurrence lastOccurrence  intervalOccurrence
 0    ActivedNotificationIOS      2019-09-27     2019-10-17                20.0
 1  ApparedMessageWrongLogin      2019-10-14     2019-12-03                50.0
 2    AssociatedWithFacebook      2018-09-13     2020-03-14               548.0
 3      ClickedAdAfterDonate      2018-09-18     2020-04-16               576.0
 4     ClickedAdBeforeDonate      2018-09-17     2020-04-16               577.0)

In [47]:
# definindo uma matriz para definir as colunas que serão usadas (filtradas) antes do resample
# basicamente adicionando o eventDate com a matriz EVENTS
colunas = EVENTS
colunas.append('eventDate')

# cria um novo dataframe com a somatória mensal de eventos.
# de forma que os eventos ficam nas linhas e os meses na coluna
eventos_mes = events_per_user[colunas].resample('M', on='eventDate').sum().transpose()

# renomeia as colunas do dataframa só com os dados do ano e mês
eventos_mes.columns = [str(m.date())[:7] for m in eventos_mes.columns]

eventos_mes.shape, eventos_mes.columns, eventos_mes.head()

((201, 33),
 Index(['2017-08', '2017-09', '2017-10', '2017-11', '2017-12', '2018-01',
        '2018-02', '2018-03', '2018-04', '2018-05', '2018-06', '2018-07',
        '2018-08', '2018-09', '2018-10', '2018-11', '2018-12', '2019-01',
        '2019-02', '2019-03', '2019-04', '2019-05', '2019-06', '2019-07',
        '2019-08', '2019-09', '2019-10', '2019-11', '2019-12', '2020-01',
        '2020-02', '2020-03', '2020-04'],
       dtype='object'),
                           2017-08  2017-09  2017-10  2017-11  2017-12  \
 eventName                                                               
 ActivedNotificationIOS        0.0      0.0      0.0      0.0      0.0   
 ApparedMessageWrongLogin      0.0      0.0      0.0      0.0      0.0   
 AssociatedWithFacebook        0.0      0.0      0.0      0.0      0.0   
 ClickedAdAfterDonate          0.0      0.0      0.0      0.0      0.0   
 ClickedAdBeforeDonate         0.0      0.0      0.0      0.0      0.0   
 
                           2018-

In [48]:
# juntando os dataframes eventos e o eventos_mes
events = events.sort_values(by=['firstOccurrence'], ascending=True).join(eventos_mes, on= 'eventName')
events.shape, events.head()

((201, 37),
            eventName firstOccurrence lastOccurrence  intervalOccurrence  \
 200         uncaught      2017-08-28     2020-04-16               962.0   
 145         ativacao      2017-08-28     2017-08-28                 0.0   
 78            Opened      2017-10-02     2020-04-13               924.0   
 164  onBoardingStage      2017-10-02     2019-05-27               602.0   
 169      openCurtain      2017-10-02     2020-01-16               836.0   
 
      2017-08  2017-09  2017-10  2017-11  2017-12  2018-01  ...  2019-07  \
 200      2.0      0.0     89.0    220.0    164.0     39.0  ...  17201.0   
 145      6.0      0.0      0.0      0.0      0.0      0.0  ...      0.0   
 78       0.0      0.0   3811.0   5496.0   7167.0   8718.0  ...     48.0   
 164      0.0      0.0    969.0    653.0   2547.0   3124.0  ...      0.0   
 169      0.0      0.0   2262.0   2611.0   3513.0   3444.0  ...     14.0   
 
      2019-08  2019-09  2019-10   2019-11  2019-12  2020-01  2020-02  20

In [52]:
# salvando o arquivo de eventos 
events.to_csv(file_csv_events, index=False)