# Open Brewery DB API Extract (API → Datalake)

**Goal:** 
* Read the breweries listing available from the API endpoint and persist the data into the data lake.

**Source:**
* *api_endpoint:* the endpoint of the API from where the data will be extracted

**Output:** 
* *output_file_name:* the name of the file where the data will be stored
* *output_path:* the path in the datalake where the output files must be saved

In [0]:
import datetime
import time
import os
import pandas as pd
import requests
import pyspark.sql.functions as F
import json
from typing import Union
from requests import Response
from pyspark.sql.functions import lower

: 

### Widgets

In [0]:
# Execution environment selector: offers 'QA' and 'PROD', defaulting to 'QA'.
dbutils.widgets.dropdown('ENV', 'QA', ['QA', 'PROD'])

# Free-text inputs for the schema, the main view and the "unsubscribed" view.
# Each one is created with empty strings for its second and third arguments.
for _widget_name in ("schema", "view", "view_unsubscribed"):
    dbutils.widgets.text(_widget_name, "", "")

In [0]:
# Read the values supplied through the notebook widgets.
# dbutils.widgets.get is used instead of the legacy bare getArgument() helper.
ENV = dbutils.widgets.get("ENV")
schema = dbutils.widgets.get("schema")
view = dbutils.widgets.get("view")
view_unsubscribed = dbutils.widgets.get("view_unsubscribed")

# Fail fast with a clear message: `settings` only defines the 'QA' and 'PROD'
# environments, so any other value would otherwise surface later as a KeyError.
if ENV not in ('QA', 'PROD'):
    raise ValueError(f"Unsupported ENV value: {ENV!r} (expected 'QA' or 'PROD')")

### Integração com Data Warehouse

In [0]:
# Connection settings per environment, keyed by the ENV value ('QA' / 'PROD').
# Each environment carries two sections:
#   'DW'   - SQL Data Warehouse server, database, user, password and port
#   'Blob' - storage account name, account key and container name
# Passwords and keys are fetched from the Databricks secret scope.
settings = {
    'QA': {
        'DW': {
            'servername': "cba-sqldb-edm-qa-eastus2",
            'databasename': "cba-sqldw-edm-qa-eastus2",
            'username': "sasqldbedm",
            'password': dbutils.secrets.get(
                scope='SCT-SCP-CBA-DCB-EDM-EASTUS2',
                key="SCT-PW-CBA-SQLDW-EDM-EASTUS2",
            ),
            'port': "1433",
        },
        'Blob': {
            'storage_account_name': 'cbastedmqabrazilsouth',
            'storage_account_key': dbutils.secrets.get(
                scope='SCT-SCP-CBA-DCB-EDM-EASTUS2',
                key="SCT-PW-CBASTEDMBRAZILSOUTH",
            ),
            'storage_container_name': 'cbastedmbrazilsouth',
        },
    },
    'PROD': {
        'DW': {
            'servername': "cba-sqldb-edm-prod-eastus2",
            'databasename': "cba-sqldw-edm-prod-eastus2",
            'username': "sasqldbedm",
            # NOTE(review): same secret key as the QA entry - confirm this is intended.
            'password': dbutils.secrets.get(
                scope='SCT-SCP-CBA-DCB-EDM-EASTUS2',
                key="SCT-PW-CBA-SQLDW-EDM-EASTUS2",
            ),
            'port': "1433",
        },
        'Blob': {
            'storage_account_name': 'cbastedmprodbrazilsouth',
            'storage_account_key': dbutils.secrets.get(
                scope='SCT-SCP-CBA-DCB-EDM-EASTUS2',
                key="SCT-PW-CBASTEDMBRAZILSOUTH",
            ),
            'storage_container_name': 'cbastedmbrazilsouth',
        },
    },
}

In [0]:
# Blob storage: build the wasbs:// temp directory URL (the container name is used
# both as the container and as the path) and register the storage account key
# in the Spark configuration.
temp_dir_url = (
    "wasbs://{storage_container_name}@{storage_account_name}"
    ".blob.core.windows.net/{storage_container_name}"
).format(**settings[ENV]['Blob'])
spark_config_key = (
    "fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
).format(**settings[ENV]['Blob'])
spark_config_value = settings[ENV]['Blob']['storage_account_key']
spark.conf.set(spark_config_key, spark_config_value)


In [0]:
# JDBC connection string for the Data Warehouse of the selected environment.
# The placeholders are filled from settings[ENV]['DW']; the resulting text is
# character-for-character the same as the single-line f-string form.
sql_dw_connection_string = (
    "jdbc:sqlserver://{servername}.database.windows.net:{port};"
    "database={databasename};"
    "user={username}@{servername};"
    "password={password};"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30;;"
).format(**settings[ENV]['DW'])

### Obtenção dos dados da View

In [0]:
# Select every column of the view named by the `schema` and `view` parameters.
query = f"SELECT * FROM {schema}.{view}"

# Load the query result through the Databricks SQL DW connector, passing the
# JDBC URL and the Blob temp directory configured earlier.
df_spark = (
    spark.read
    .format("com.databricks.spark.sqldw")
    .option("url", sql_dw_connection_string)
    .option("tempdir", temp_dir_url)
    .option("forward_spark_azure_storage_credentials", "true")
    .option("query", query)
    .load()
)

# Cache the DataFrame
df_spark.cache()

# Replace ";" with "," inside every column, since ";" is the CSV separator used later
df_spark = df_spark.select(
    [
        F.regexp_replace(F.col(column_name), ';', ',').alias(column_name)
        for column_name in df_spark.columns
    ]
)

### Testando o conteúdo da View

In [0]:
# Status message for the run; starts as success and is replaced when a step fails.
resultados = 'SUCCESS'

# Trigger an action on the DataFrame; if it raises, leave the notebook with an ERROR message.
try:
    df_spark.count()
except Exception as read_error:
    resultados = f"Ocorreu um erro: (ERROR) {read_error}"
    dbutils.notebook.exit(resultados)

# A view with no rows ends the notebook with a WARNING message.
if df_spark.isEmpty():
    resultados = "O DataFrame está vazio (WARNING)"
    dbutils.notebook.exit(resultados)

In [0]:
# Record the number of rows in the DataFrame (printed again after the contacts import)
qtde_registros = df_spark.count()

In [0]:
# Display the row count recorded for the view
print(qtde_registros)

### Convertendo o DataFrame Spark em arquivo CSV

In [0]:
# Data Lake access information, read from the secret scope in this order:
# client id, client secret, storage account.
clientId, clientSecret, storageAccount = (
    dbutils.secrets.get(scope="SCT-SCP-CBA-DCB-EDM-EASTUS2", key=secret_key)
    for secret_key in (
        "SCT-ID-CBA-CLIENT-DATALAKE",
        "SCT-PW-CLIENT-DATALAKE",
        "SCT-ID-CBA-STORAGEACCOUNT-DATALAKE",
    )
)

In [0]:
# Folder inside the mounted Data Lake under which the output directory is created.
outputRawPath = "RAW/DHO/DINAMIZE"
# Name used both for the /mnt/<TemplateName> mount point and for the output sub-folder.
TemplateName = "MailingDinamize"
# Name placed before the "@" in the abfss:// source URL when the Data Lake is mounted.
storageName = "cbadlsedmbrazilsouth"

In [0]:
# If /mnt/<TemplateName> is already among the current mounts, unmount it.
if f'/mnt/{TemplateName}' in [mounted.mountPoint for mounted in dbutils.fs.mounts()]:
    dbutils.fs.unmount(f"/mnt/{TemplateName}")

In [0]:
# Mount the Data Lake on the Databricks file system at /mnt/<TemplateName>,
# authenticating through OAuth client credentials read from the secret scope.
dbutils.fs.mount(
    source=f"abfss://{storageName}@{storageAccount}.dfs.core.windows.net/",
    mount_point=f"/mnt/{TemplateName}",
    extra_configs={
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": clientId,
        "fs.azure.account.oauth2.client.secret": clientSecret,
        "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/votorantimindustrial.onmicrosoft.com/oauth2/token",
    },
)

In [0]:
# Create the CSV file in the Data Lake

# Destination directory: created through dbutils using the dbfs: form, then
# rewritten to the /dbfs form that is handed to pandas below.
outputDir = f'dbfs:/mnt/{TemplateName}/{outputRawPath}/{TemplateName}'
dbutils.fs.mkdirs(outputDir)
outputDir = outputDir.replace('dbfs:', '/dbfs')

# File name: <base name>_<current date as dd-mm-YYYY>.CSV
nome_base = "contatos"
data_atual = f"{datetime.datetime.now():%d-%m-%Y}"
fileName = "{}_{}.CSV".format(nome_base, data_atual)

# Full path of the CSV file (reused later for the upload)
csv_file = f'{outputDir}/{fileName}'

# Convert the Spark DataFrame to pandas and write it ';'-separated, with header and without index
df_pandas = df_spark.toPandas()
df_pandas.to_csv(csv_file, sep=';', index=False, header=True)

# API

### Get Token

In [0]:
# API credentials from the Key Vault-backed secret scope, read in this order:
# base URL, user, password, client code.
api_url, api_usr, api_pwd, api_cc = (
    dbutils.secrets.get(scope='SCT-SCP-CBA-DCB-EDM-EASTUS2', key=secret_key)
    for secret_key in (
        "SCT-CS-CBA-DINAMIZE-URL",
        "SCT-CS-CBA-DINAMIZE-USR",
        "SCT-CS-CBA-DINAMIZE-PWD",
        "SCT-CS-CBA-DINAMIZE-CC",
    )
)

In [0]:
from urllib.parse import urlencode

# Authentication endpoint: base API URL followed by the 'auth' action.
action = 'auth'
url_token = api_url + action

# Percent-encode the credentials before placing them in the query string, so that
# characters such as '&', '#', '+' or '=' in a secret cannot truncate or corrupt
# the parameters.
# NOTE(review): assumes the secrets are stored as raw (not already URL-encoded) values - confirm.
auth_query = urlencode({
    'user': api_usr,
    'password': api_pwd,
    'client_code': api_cc,
})
url = f"{url_token}?{auth_query}"
token = None

def get_token(url, params=None, timeout=30):
    """Request an authentication token with a POST to *url*.

    Args:
        url: Full authentication URL (credentials included by the caller).
        params: Optional query parameters forwarded to ``requests.post``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The value of ``body['auth-token']`` when the reply's ``code`` is
        ``'480001'``. Otherwise a parenthesised message string containing
        ``(FAIL)`` (unexpected reply) or ``(ERROR)`` (request failure).
        NOTE: failures are reported through the return value, not raised, so
        callers receive that message in place of a token.
    """
    try:
        # Forward the caller's params and bound the wait with a timeout.
        response = requests.post(url, params=params, timeout=timeout)
    except requests.exceptions.RequestException as e:
        return f"(Erro ao tentar obter o token (ERROR): {e})"

    # Decode the body once; a body that is not JSON counts as an unexpected reply.
    try:
        payload = response.json()
    except ValueError:
        payload = None

    if isinstance(payload, dict) and payload.get('code') == '480001':
        return payload['body']['auth-token']
    return f"(Falha ao tentar obter o token (FAIL):, {response.status_code}, {response.text})"

## API Action

### Lista de Valores - Função

In [0]:
def lista_valores(column, field_code):
    """Send every distinct value of a DataFrame column to the list-of-values endpoint.

    Reads the distinct values of *column* from the module-level ``df_spark`` and
    POSTs one request per value to ``url_lista``. After every 55 requests it
    pauses for 60 seconds and requests a new token.

    Args:
        column: Name of the ``df_spark`` column whose distinct values are sent.
        field_code: Value sent as ``field_code`` in each request body.

    Side effects:
        Prints a message for each reply whose HTTP status is not 200 and stores
        an ``(ERROR)`` message in the module-level ``resultados``.
    """
    # Without this declaration the assignment below would only create a local
    # variable and the error would never reach the notebook status.
    global resultados

    lista = df_spark.select(column).distinct().collect()
    token = get_token(url, None)
    num_reqs = 0
    time.sleep(60)

    # Request headers
    headers = {
        "Content-Type": "application/json; charset=utf-8",
        "auth-token": token
    }

    for row in lista:
        valor = getattr(row, column)
        data = {
            "contact-list_code": "4",
            "field_code": field_code,
            "value": valor
        }
        response = requests.post(url=url_lista, headers=headers, json=data)
        num_reqs += 1
        if num_reqs == 55:
            # Pause for 60 seconds, then start a new batch with a new token
            time.sleep(60)
            num_reqs = 0
            token = get_token(url, None)
            # The headers dict must carry the new token, otherwise every later
            # request keeps sending the old one.
            headers["auth-token"] = token

        # Check the reply
        if response.status_code != 200:
            print(f"Erro na requisição de Lista de Valores | Valor: {valor}:", response.status_code, response.text)
            resultados = f"Erro na requisição de Lista de Valores (ERROR) | Valor: {valor} | 'status_code': {response.status_code} | 'text': {response.text}"

### Lista de Valores (Dados Mestres) - Envio

In [0]:
# Default parameters - list-of-values request
action_lista = 'emkt/field-lov/add'
url_lista = api_url + action_lista

# (DataFrame column, field code) pairs, sent in this order.
for _coluna, _codigo_campo in (
    ("Unidade", "10"),
    ("Cargo", "11"),
    ("Geracao", "31"),
    ("Genero", "15"),
    ("Status", "16"),
    ("GS", "19"),
    ("Nivel", "20"),
    ("Diretoria", "21"),
    ("Area", "22"),
    ("Gerencia", "23"),
    ("Coordenacao", "32"),
    ("Classificacao", "29"),
    ("Publico", "30"),
):
    lista_valores(_coluna, _codigo_campo)

### Envio dos Contatos
CONTATOS - IMPORTAÇÃO - ADICIONAR <br>
Conforme documentação: https://panel.dinamize.com/apidoc/#api-5ContactImportGroup-ContactImportAdd

In [0]:
# Upload the CSV file to the contact import endpoint and report the outcome.
action_send = 'emkt/contact'
url_action = api_url + action_send
token = get_token(url, None)

# Request headers
headers = {"auth-token": token}

# Import command: contact list "4", ';'-separated file with a header row, and the
# mapping of each CSV column position to a field code.
params = {
    "command":"import",
    "parameters": {
        "contact-list_code": "4", 
        "separator": ";", 
        "header": True, 
        "file_columns": [
            {"Position": "0", "Field": "2", "Rule": "3"}, 
            {"Position": "1", "Field": "1", "Rule": "3"}, 
            {"Position": "2", "Field": "9", "Rule": "3"}, 
            {"Position": "3", "Field": "10", "Rule": "3"}, 
            {"Position": "4", "Field": "11", "Rule": "3"}, 
            {"Position": "5", "Field": "12", "Rule": "3"}, 
            {"Position": "6", "Field": "13", "Rule": "3"}, 
            {"Position": "7", "Field": "31", "Rule": "3"}, 
            {"Position": "8", "Field": "15", "Rule": "3"}, 
            {"Position": "9", "Field": "16", "Rule": "3"}, 
            {"Position": "10", "Field": "17", "Rule": "3"}, 
            {"Position": "11", "Field": "18", "Rule": "3"}, 
            {"Position": "12", "Field": "19", "Rule": "3"}, 
            {"Position": "13", "Field": "20", "Rule": "3"}, 
            {"Position": "14", "Field": "30", "Rule": "3"}, 
            {"Position": "15", "Field": "21", "Rule": "3"}, 
            {"Position": "16", "Field": "22", "Rule": "3"}, 
            {"Position": "17", "Field": "23", "Rule": "3"}, 
            {"Position": "18", "Field": "32", "Rule": "3"}, 
            {"Position": "19", "Field": "25", "Rule": "3"}, 
            {"Position": "20", "Field": "29", "Rule": "3"}
        ]
    }
}

# Form fields: the "parameters" section is sent as a JSON-encoded string.
data = {"command": "import", "parameters": json.dumps(params["parameters"])}

# Send the POST request
time.sleep(60) # Wait so the per-minute request limit is not exceeded
# Open the CSV inside a context manager so the file handle is always closed,
# even when the request raises.
with open(csv_file, "rb") as csv_handle:
    files = {"file": csv_handle}
    response = requests.post(url=url_action, headers=headers, data=data, files=files)

# A reply that is not JSON, or has no 'code', is treated as a failed import
# instead of aborting the cell before the diagnostics below are printed.
try:
    response_code = response.json()['code']
except (ValueError, KeyError, TypeError):
    response_code = None

if response_code == '480001':
    resultados = "Contatos enviados com sucesso (SUCCESS)!"
else:
    resultados =  "Falha ao tentar enviar contatos (FAIL)!"

print(resultados)
print(f"Arquivo: {csv_file}\nTotal de registros enviados: {qtde_registros}")

print("\n$response:")
print(response.text)

print("\nResponse Info:")
print("status_code:", response.status_code)
print(response.headers)

### Invalidar Contatos -> Alterar status para 'Inválido'

Documentação - Consulta do "contact_code": https://panel.dinamize.com/apidoc/#api-4ContactGroup-ContactSearch

Documentação : https://panel.dinamize.com/apidoc/#api-4ContactGroup-ContactUpdate


In [0]:
# Select every column of the "unsubscribed" view named by the notebook parameters.
query_unsubscribed = f"SELECT * FROM {schema}.{view_unsubscribed}"

# Load it through the SQL DW connector with the same URL and temp directory
# used for the main view.
df_unsubscribed = (
    spark.read
    .format("com.databricks.spark.sqldw")
    .option("url", sql_dw_connection_string)
    .option("tempdir", temp_dir_url)
    .option("forward_spark_azure_storage_credentials", "true")
    .option("query", query_unsubscribed)
    .load()
)

# Cache the DataFrame
df_unsubscribed.cache()

In [0]:
# Check the "unsubscribed" view

# Only start from SUCCESS when no earlier step recorded a problem; resetting
# unconditionally would discard a previous FAIL/ERROR message.
if not any(marker in resultados for marker in ('ERROR', 'FAIL')):
    resultados = 'SUCCESS'

# Trigger an action on the DataFrame; if it raises, leave the notebook with an ERROR message.
try:
    df_unsubscribed.count()
except Exception as e:
    resultados = f"Ocorreu um erro: (ERROR) {e}"
    dbutils.notebook.exit(resultados)

# A view with no rows ends the notebook with a WARNING message.
if df_unsubscribed.isEmpty():
    resultados = "O DataFrame está vazio (WARNING)"
    dbutils.notebook.exit(resultados)

# Record the number of rows
qtde_registros = df_unsubscribed.count()

# Collect the e-mails, lower-cased, into a list. Rows with a NULL/empty E_mail
# are skipped: calling .lower() on None would raise AttributeError.
emails_desligados = [
    row.E_mail.lower()
    for row in df_unsubscribed.select("E_mail").collect()
    if row.E_mail
]

In [0]:
# Display the row count recorded for the "unsubscribed" view
print(qtde_registros)

In [0]:
# Display how many e-mails were collected
print(len(emails_desligados))

In [0]:
def _dinamize_post(action, payload):
    """POST *payload* as JSON to ``api_url + action`` and return the decoded reply.

    A new token is requested with ``get_token`` for every call, and the request
    is preceded by a one-second pause. Returns the decoded JSON object, or
    ``None`` when the body is not a JSON object.
    """
    request_headers = {
        "Content-Type": "application/json; charset=utf-8",
        "auth-token": get_token(url, None),
    }
    # Pause meant to stay under the limit of 60 requests per minute.
    # NOTE(review): each call also issues a token request, so the effective
    # request rate is higher than one per second - confirm against the API limit.
    time.sleep(1)
    reply = requests.post(url=api_url + action, headers=request_headers, data=json.dumps(payload))
    try:
        decoded = reply.json()
    except ValueError:
        return None
    return decoded if isinstance(decoded, dict) else None


def _dinamize_ok(reply):
    """Return True when *reply* is a decoded reply whose ``code`` is ``'480001'``."""
    return reply is not None and reply.get('code') == '480001'


contatos_fail = []
contatos_success = []

for email in emails_desligados:
    # Step 1: look up the contact code by e-mail.
    search_reply = _dinamize_post('emkt/contact/search', {
        "contact-list_code": "4",
        "page_number": "1",
        "page_size": "10",
        "search": [
            {
                "field": "email",
                "operator": "=",
                "value": [email]
            }
        ]
    })
    items = []
    if _dinamize_ok(search_reply):
        items = (search_reply.get('body') or {}).get('items') or []
    if not items:
        # Search failed or returned no contact: nothing to update.
        contatos_fail.append(email)
        continue
    codigo_registro = items[0]['code']

    # Step 2: update the contact found.
    update_reply = _dinamize_post('emkt/contact/update', {
        "contact-list_code": "4",
        "contact_code": codigo_registro,
        "custom_fields": {
            "cmp16": "15" # Sets the status to "Desligado"
        },
        "date_rest": "9999-12-31", # Sets the rest date to 9999-12-31
        "status_email": "I" # Sets the e-mail status to "Inválido"
    })
    if _dinamize_ok(update_reply):
        contatos_success.append(email)
    else:
        contatos_fail.append(email)

# The run status reflects every e-mail, not only the last one processed:
# any failure marks the run as FAIL, unless an ERROR is already recorded.
if contatos_fail and 'ERROR' not in resultados:
    resultados = "Falha ao tentar enviar contatos (FAIL)!"

if contatos_fail:
    print("Os seguinte contatos falharam:")
    print(contatos_fail)
if contatos_success:
    print("Os seguintes contatos foram alterados com sucesso:")
    print(contatos_success)



In [0]:
# Display how many contacts were updated successfully
print(len(contatos_success))

In [0]:
# Display how many contacts failed
print(len(contatos_fail))

### Resultados

In [0]:
# Translate the accumulated status message into the notebook's exit value.
# Markers are tested in priority order and the first one found in `resultados`
# decides the value; a message with none of them yields 'ERROR_QUERIES'.
EXIT = next(
    (
        exit_value
        for marker, exit_value in (
            ('ERROR', 'ERROR_QUERIES'),
            ('WARNING', 'WARNING'),
            ('FAIL', 'FAIL'),
            ('SUCCESS', 'SUCCESS'),
        )
        if marker in resultados
    ),
    'ERROR_QUERIES',
)

dbutils.notebook.exit(EXIT)