In [1]:
#@title Após executar esse parágrafo, irá aparecer um link para permitir o acesso ao seu drive. Basta copiar e colocar no local indicado e apertar 'ENTER'. { display-mode: "both" }
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col, lit, udf, split, desc, asc

from unicodedata import normalize
import re

In [4]:
spark = SparkSession.builder.config("spark.executor.memory", "1gb").getOrCreate()

In [5]:
#@title Fiz uma visão agrupada das colunas para entender melhor os dados do dataframe. Exclui as colunas que não considerei necessárias para atingir o objetivo proposto e também as que não fui capaz de entender o que significavam. Essas são as colunas que decidi utilizar. Vale ressaltar que assim como no primeiro teste, em um cenário real, eu entraria em contato com o time parceiro para validar os campos e obter insights. { display-mode: "both" }
listColumnsToSelect = ['Page Category',
                       'Page Category 1',
                       'Page Category 2',
                       'Page Category 3',
                       'at',
                       'browser',
                       'carrier',
                       'custom_4',
                       'first-accessed-page',
                       'marketing_campaign',
                       'marketing_medium',
                       'marketing_source',
                       'model',
                       'platform',
                       'studentId_clientType',
                       'uuid']

In [6]:
#@title Leitura dos arquivos json. Também separo da coluna studentId_clientType as informações de StudentId para usar como chave nos dataframes da Base A e o ClientType para análise. Vale destacar que não utilizei nenhum dataframe da Base A pois não achei nenhuma informação relevante para o objetivo proposto. O dataframe sessions.json era o mais promissor mas não tinha nenhum dado relevante. { display-mode: "both" }
inputPath = '/content/drive/MyDrive/passeiDireto/BASE B/'
dfPageViewEvents = spark.read.json(inputPath).select(listColumnsToSelect)\
                                             .withColumn('StudentId',
                                                          split(col("studentId_clientType"), "@").getItem(0))\
                                             .withColumn('ClientType', 
                                                          split(col("studentId_clientType"), "@").getItem(1))\
                                              .drop('studentId_clientType')

In [7]:
#@title Função similar a cleanString da primeira parte, com a diferença que aqui incluo ela no UDF. { display-mode: "both" }
def cleanString(value):
    value = re.sub(' +', ' ', str(value))
    return normalize('NFKD', value).encode('ASCII', 'ignore').decode('ASCII').upper().strip()
cleanStringUDF = udf(cleanString)

In [8]:
#@title Função similar a normalizeColumnsValues da primeira parte. { display-mode: "both" }
def normalizeColumnsValues(dataframe):
    for column in dataframe.columns:
        dataframe = dataframe.withColumn(column, cleanStringUDF(column))
    return dataframe 

In [9]:
dfPageViewEvents = normalizeColumnsValues(dfPageViewEvents)

In [10]:
#@title Dicionário para padronizar os nomes das colunas. { display-mode: "both" }
DictColumnToRename  = {'Page Category':'PageCategory',
                      'Page Category 1':'PageCategory_1',
                      'Page Category 2':'PageCategory_2',
                      'Page Category 3':'PageCategory_3',
                      'at':'AccessDateTime',
                      'browser':'Browser',
                      'carrier':'InternetProvider',
                      'custom_4':'UserAccessCategory',
                      'first-accessed-page':'FirstAccessedPage',
                      'marketing_campaign':'MarketingCampaign',
                      'marketing_medium':'MarketingMedium',
                      'marketing_source':'MarketingSource',
                      'model':'Model',
                      'platform':'OS',
                      'uuid':'UuidPageViewEvents'}

In [11]:
for columnAsIs,columnToBe in DictColumnToRename.items():
  dfPageViewEvents = dfPageViewEvents.withColumnRenamed(columnAsIs, columnToBe) 

In [12]:
#@title Foquei a análise em entender os valores 'NONE' e 'UNDEFINED' que existem na colunas que eu selecionei. Após realizar algumas simulações de filtros e resultados, decidi apenas por remover os StudentId que possuíam 'NONE'. As PageCategory geraram bastante dúvida, mas decidi deixa-las por entender que alguns comportamentos podem ser identificados utilizando as 4 categorias. 
dfPageViewEvents = dfPageViewEvents.where(col('StudentId') != 'NONE')

In [13]:
#@title Transformei essa operação em função para não impactar no tempo/custo da execução do ETL, e também para manter no notebook de onde tirei os percentuais de 'NONE' para passar para a área cliente. Achei que o conteúdo das colunas Marketing pode ser relevante, mas é bom destacar elas só possuem informação em 4% da base.
# def percentageOfNoneInEachColumn():
listColumns = dfPageViewEvents.columns
sizeDataframe = dfPageViewEvents.count()
for column in listColumns: 
  print(column, round((dfPageViewEvents.where(col(column) == 'NONE').count() / sizeDataframe),2))

PageCategory 0.0
PageCategory_1 0.13
PageCategory_2 0.13
PageCategory_3 0.14
AccessDateTime 0.0
Browser 0.0
InternetProvider 0.0
UserAccessCategory 0.07
FirstAccessedPage 0.28
MarketingCampaign 0.96
MarketingMedium 0.96
MarketingSource 0.96
Model 0.0
OS 0.0
UuidPageViewEvents 0.0
StudentId 0.0
ClientType 0.0


In [14]:
#@title Salva o arquivo no seu drive em formato json.
outputPath = '/content/drive/MyDrive/passeiDireto/page_view_events.json'
dfPageViewEvents.write.format('json').mode('overwrite').json(outputPath)