# 1. Cadastrar usuário para obter token de acesso da API
# {"user": "contato@gmaxti.com.br", "API Token": "BGX34YFN3FDY"}

# Instalar Bibliotecas

In [None]:
pip install --upgrade pandas-gbq

In [None]:
pip install --upgrade reverse_geocoder

In [None]:
pip install fernet

# Importar Bibliotecas

In [1]:
from datetime import datetime

import pandas as pd
import requests
import reverse_geocoder as rg
from cryptography.fernet import Fernet
from google.cloud import bigquery
from google.oauth2 import service_account

# 2. Consumir dados de usuário da API

In [2]:
dados = requests.get("https://begrowth.deta.dev/token=BGX34YFN3FDY")
dados = dados.json()
df = pd.DataFrame(dados)

# Para o campo utm (Urchin Traffic Monitor) temos os seguintes dominios validos, que sempre terá a estrutura [país]-[source]-[vertical]:

In [3]:
utm = df['utm'].str.split('-', expand=True)
utm.columns = ['utm_country', 'utm_source', 'utm_vertical']
utm.loc[(utm.utm_country == 'br'), 'utm_country'] = 'Brasil'
utm.loc[(utm.utm_source == 'gg'), 'utm_source'] = 'Google'
utm.loc[(utm.utm_source == 'fb'), 'utm_source'] = 'Facebook'
utm.loc[(utm.utm_source == 'wpp'), 'utm_source'] = 'WhatsApp'
utm.loc[(utm.utm_vertical == 'emp'), 'utm_vertical'] = 'Empréstimo'
utm.loc[(utm.utm_vertical == 'cc'), 'utm_vertical'] = 'Cartão de Crédito'
utm.loc[(utm.utm_vertical == 'fin'), 'utm_vertical'] = 'Financiamento'
df.insert(7, "utm_country", utm['utm_country'], True)
df.insert(8, "utm_source", utm['utm_source'], True)
df.insert(9, "utm_vertical", utm['utm_vertical'], True)

# 2.1 - Precisamos saber o estado que a pessoa acessou nossos serviços, com as coordenadas  address.geo_latitude e address.geo_longitude dos usuários devemos criar uma coluna de estado chamada address_state, processo denominado reverse geocode

In [4]:
list_value = df['address'].tolist()
address = pd.DataFrame(list_value)


def reverseGeocode(coordinates):
    result = rg.search(coordinates, mode=1)
    return result
    # result é uma lista contendo um dicionário ordenado.


if __name__ == "__main__":
    coordinates = (
        list(zip(address['geo_latitude'], address['geo_longitude'])))
    states = pd.DataFrame(reverseGeocode(coordinates))

states = pd.DataFrame((states['admin1'].tolist()), columns=["address_state"])
df.insert(6, "address_state", states['address_state'], True)

Loading formatted geocoded file...


# 2.2 - a API nos devolve o CPF do usuário, mas temos um problema: o CPF está criptografado! 🔒 Utilizando a chave de criptografia Fernet passada por email, abra esse CPF para que possamos analisar o próximo requisito;

In [5]:
key = Fernet(b'ekkxXo0uHWRkIbHqHrLS4gaMj2hWTYMJyPTAbi9INGI=')
cpf = ''
for i in df['cpf'].index:
    cpf += ((key.decrypt(df['cpf'].__getitem__(i)).decode('utf-8'))+', ')

cpf_list = cpf.split(",")
df['cpf'] = pd.DataFrame(cpf_list, columns=["cpf"])

# 2.3 - Para ajudar a identificar registros mais atualizados e para nosso controle de auditoria, precisamos que o dataframe tenha as colunas dt_insert que contenha data/hora de inclusão do registro e candidate_name que contenha seu nome

In [6]:
data_hora = datetime.now()
data_hora_string = data_hora.strftime('%d/%m/%Y %H:%M')
df['dt_insert'] = data_hora_string
df['candidate_name'] = 'Aisllan Max Caldeira'
df.to_csv('aisllan_data.csv', index=False)

# 3. Inserir esse dataframe dentro de uma tabela no BigQuery

In [7]:
def load_data_from_file(dataset_id, table_id, source_file_name):
    key_path = "svc-data-engineer-test.json"
    credentials = service_account.Credentials.from_service_account_file(key_path,)
    credentials = credentials.with_scopes(
        [
            'https://www.googleapis.com/auth/drive',
            'https://www.googleapis.com/auth/cloud-platform',
        ],
    )
    bigquery_client = bigquery.Client(credentials=credentials, project = credentials.project_id,)
    dataset_ref = bigquery_client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)

    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.CSV, skip_leading_rows=1, autodetect=True,)
    
    with open(source_file_name, 'rb') as source_file:
        job = bigquery_client.load_table_from_file(source_file, table_ref, job_config=job_config)        

    job.result()  # Waits for job to complete

    print('Loaded {} rows into {}:{}.'.format(
        job.output_rows, credentials.project_id, table_ref))

In [8]:
load_data_from_file('bg_users', 'bg_data_enginner_test_aisllan_max_caldeira', 'aisllan_data.csv')

Loaded 1020 rows into begrowth-user-api-demo:begrowth-user-api-demo.bg_users.bg_data_enginner_test_aisllan_max_caldeira.
