### Importação de Módulos

In [1]:
# Builtins
import logging
import os
import random
import threading
from datetime import date, datetime

# Third party
import pandas as pd
import requests
import json
import time
import psycopg2

In [2]:
version = 'HOMOLOG'
keyversion = 'HOM'

### Configuração de Logs

In [3]:
logger = logging.getLogger(f"{version}")
today = str(date.today()) + '-' + str(int(time.time()))
logging.basicConfig(filename=f'/work/execution_logs/{today}.txt', encoding='utf-8', level=logging.INFO)

## Classes

### Conexão do Astrea_Data

In [4]:
class PostgresqlDB:
    def __init__(self, host, port, database, user, password):
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.connection = self.create_connection()
        logger.info(f"{__class__.__name__}: Conexão com o Astrea_Data iniciada")



    def create_connection(self):
        try:
            connection = psycopg2.connect(
                host=self.host,
                port=self.port,
                database=self.database,
                user=self.user,
                password=self.password
            )
            return connection
        except (Exception, psycopg2.Error) as error:
            print(f"Error while connecting to PostgreSQL: {error}")

    def insert_row(self, table, values):
        try:
            cursor = self.connection.cursor()
            columns = ', '.join(values.keys())
            placeholders = ', '.join(['%s'] * len(values))
            insert_query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
            cursor.execute(insert_query, list(values.values()))
            self.connection.commit()
            logger.info(f"{__class__.__name__}: registro inserido com sucesso na tabela {table}")
        except (Exception, psycopg2.Error) as error:
            print(f"Error while inserting row in PostgreSQL: {error}")
            self.connection.rollback() # Added code to rollback the transaction if an error is raised
        finally:
            if cursor:
                cursor.close()

### API do Astrea

In [5]:
class AstreaPlanApi:

    def __init__(self, auth):
        """
        Inicializa a API de mudança de planos.
        """
        self.headers = {
            'Authorization': auth,
            'Content-Type': 'application/json'
            }
        if version == 'HOMOLOG':
            self.url = 'https://change-plan-dot-astreav2.appspot.com/api/v2/dev/change-plan'
        elif version == 'PRODUCAO':
            self.url = 'https://app.astrea.net.br/api/v2/dev/change-plan'
        else:
            raise Exception(f'Versão de API inválida: {version}')
            
        self.url = 'https://change-plan-dot-astreav2.appspot.com/api/v2/dev/change-plan'
        self.semaphore = threading.Semaphore(3) # Limit the number of concurrent requests
        self.request_queue = [] # Queue for requests that exceed the concurrent limit
        self.successful_requests = 0 # Track number of successful requests
        self.error_count = 0 # Track number of errors
        logger.info(f"{__class__.__name__}: API de mudança de planos inicializada")

    def get_payload(self, customer):
        """
        Cria o payload para a requisição de mudança de plano

        Params:
        - Customer: Dicionário contendo a informação do cliente a ser integrado

        Retorna:
        - Dicionário com o payload formatado para a api do Astrea.
        """
        payload = json.dumps({
            "tenantId": customer['api_key'],
            "editionType": customer['plano_destino'],
            "baseValue": customer['vr_bruto_destino'],
            "discount": customer['desconto_destino']
        })
        logger.info(f'{__class__.__name__}: Payload obtido para o customer: {customer["namespace"]} :' + str(payload))
        return payload

    def make_change_plan_request(self, customer):
        """
        Realiza uma requisição POST para a API de mudança de planos.

        Params:
        - Customer: Dicionário contendo a informação do cliente a ser integrado

        Retorna:
        - O objeto Response da requisição.
        """
        payload = self.get_payload(customer)
        
        response = requests.post(self.url, headers= self.headers, data=payload)
            
        if response.status_code in (200, 201):
            logger.info(f'{__class__.__name__}: Resposta para customer: {customer["namespace"]}: ' + str(response.status_code) + ' - ' + str(response.content) + f' ({response.elapsed.total_seconds()}s)')
            self.successful_requests += 1
        else:
            self.error_count += 1
            logger.info(f'{__class__.__name__}: Resposta para customer: {customer["namespace"]}: ' + str(response.status_code) + ' - ' + str(response.content) + f' ({response.elapsed.total_seconds()}s)')
            
        return response 

### Api do Intercom

In [6]:
class IntercomAPI:
    def __init__(self, auth):
        self.headers = {
            "accept": "application/json",
            "content-type": "application/json",
            "authorization": auth
        }
        logger.info(f"{__class__.__name__}: API do Intercom Inicializada")


    def get_company_by_name(self, company_name):
        url = f"https://api.intercom.io/companies?name={company_name}"
        response = requests.get(url, headers=self.headers)
        if response.ok:
            logger.info(f"{__class__.__name__}: Company {company_name} encontrada.")
        else:
            logger.info(f"{__class__.__name__}: Company {company_name} não encontrada.")
        return response.json()
    
    def get_contacts_by_company_id(self, company_id):
        url = f"https://api.intercom.io/companies/{company_id}/contacts"
        response = requests.get(url, headers=self.headers)
        if response.ok:
                logger.info(f"{__class__.__name__}: {len(response.json()['data'])} contatos encontrados para a company {company_id}.")
        else:
            logger.info(f"{__class__.__name__}: A busca de contatos falhou para a company {company_id}.")
        return response.json()["data"]
    
    def send_event(self, company_id, user, event_name, timestamp):
        url = "https://api.intercom.io/events"
        payload = {
            "user_id": user['email'],
            "created_at": timestamp,
            "event_name": f"{event_name}",
            "id": user['id']
        }
        response = requests.post(url, json=payload, headers=self.headers)
        if response.ok:
            logger.info(f"{__class__.__name__}: Evento {event_name} enviado para o usuário {user['email']}.")
        else:
            logger.info(f"{__class__.__name__}: Evento {event_name} não enviado para o usuário {user['email']}. Código de erro: {response.status_code}")
        return response

    def notify_company_users(self, company_name, event_name, timestamp):
        company = self.get_company_by_name(company_name)
        users = self.get_contacts_by_company_id(company['id'])
        errors = 0
        success = 0
        for user in users:
            response = self.send_event(company['id'], user, event_name, timestamp)
            if response.ok:
                success += 1
            else:
                errors += 1
        logger.info(f"{__class__.__name__}: {len(users)} usuários na company {company_name} foram notificados com {errors} erros.")
        return {"errors": errors, "success": success}

## Funções Personalizadas

### Notificações de Execução

In [7]:
# Slack Notify: Envia um webhook para um zap que realiza a integração de mensagens para o slack
def slack_notify(service,status,message):
    url = f'https://hooks.zapier.com/hooks/catch/52679/3d690x6/?service={service}&status={status}&message={message}'
    response = requests.request("GET", url)
    logger.info(f"{__name__}: {service}'s {status} message sent to zapier: {message}")
    return True

## Extrações de Dados

In [8]:
slack_notify(f'Virada de Planos - {version}','Start','Iniciando Extração de dados para Virada de Planos')

True

### Consulta de clientes no astrea_data

In [9]:
df_clientes = _deepnote_execute_sql('select vsn.api_key,\n       vcc.namespace,\n       vsn.periodicidade_atual as periodicidade_origem,\n       vcc.plano as plano_origem,\n\n       case\n        when vsn.minimo_plano_aumento < 0 then \'VIP\'\n        when vsn.minimo_plano_aumento >= 0 then \'COMPANY\'\n        else Null\n       end as plano_destino,\n\n       vcc.vr_contrato_total_com_cupons as vr_liquido_origem,\n\n       vcc.vr_contrato_total_sem_cupons as vr_bruto_origem,\n\n       case\n        when vsn.minimo_plano_aumento < 0 then vsn.prox_plano_contrato_sem_cupons\n        when vsn.minimo_plano_aumento >= 0 then np.vr_contrato_total_sem_cupons\n        else Null\n       end as vr_bruto_destino,\n\n       vcc.vr_cupons_ativos_total as desconto_origem,\n\n       case\n        when vsn.minimo_plano_aumento < 0 then vsn.prox_plano_contrato_sem_cupons - vcc.vr_contrato_total_com_cupons\n        when vsn.minimo_plano_aumento >= 0 then 0\n        else Null\n       end as desconto_destino,\n\n       vcc.parcela_dt_proxima,\n\n       vcc.parcela_dt_proxima::date - current_date as dias_ate_renovacao\n\nfrom cx_data.v_simulacao_novos_planos vsn\njoin cx_data.novos_planos np on np.id = vsn.minimo_plano_id\njoin oroboro2.v_contratos_consolidado vcc on vcc.api_key = vsn.api_key\nleft join oroboro2.contratos_custom cc on vcc.id = cc.contrato_id\nwhere vsn.minimo_plano_destino = \'Company\'', 'SQL_EB56FC30_B625_4AA0_83D2_B8F22F562A50', audit_sql_comment='', sql_cache_mode='cache_disabled')
df_clientes

Unnamed: 0,api_key,namespace,periodicidade_origem,plano_origem,plano_destino,vr_liquido_origem,vr_bruto_origem,vr_bruto_destino,desconto_origem,desconto_destino,parcela_dt_proxima,dias_ate_renovacao
0,5633421678215168,Soares_e_Souza,Mensal,Astrea Pro 217 / 50,COMPANY,443.40,443.40,557.0,0.00,0.0,2023-09-10 00:00:00+00:00,18
1,5721310499962880,Claro_e_DC_Fonseca,Mensal,Astrea Pro 237 / 60,COMPANY,391.81,391.81,557.0,0.00,0.0,2023-09-26 00:00:00+00:00,34
2,5665395394478080,Laurindo_Freitas_e_Selva,Mensal,Astrea Starter Padrão 50 / 25,VIP,561.60,561.60,997.0,0.00,435.4,2023-09-10 00:00:00+00:00,18
3,5180660218068992,Aguiar_Toledo_Advogados_Associados,Mensal,Astrea Starter Padrão 50 / 25,COMPANY,185.44,185.44,557.0,0.00,0.0,2023-09-10 00:00:00+00:00,18
4,5390824133099520,giovannidilion,Mensal,Astrea Starter Padrão 60 / 25,COMPANY,385.33,469.25,557.0,83.92,0.0,2023-09-20 00:00:00+00:00,28
...,...,...,...,...,...,...,...,...,...,...,...,...
2502,5834965021753344,contato118874,Mensal,Astrea Pro 277 / 70,COMPANY,420.00,577.00,557.0,157.00,0.0,2023-08-23 00:00:00+00:00,0
2503,5115392389677056,ollier.adv,Anual,Astrea Pro 277 / 70,COMPANY,4500.00,4661.40,5844.0,161.40,0.0,2024-08-24 00:00:00+00:00,367
2504,5172001582219264,otavioalday,Mensal,Astrea Pro 277 / 70,COMPANY,497.00,497.00,557.0,0.00,0.0,2023-08-23 00:00:00+00:00,0
2505,6098603280302080,joaovitor24,Anual parcelado,Astrea Pro 277 / 70,COMPANY,4000.00,4406.40,5844.0,406.40,0.0,2023-08-23 00:00:00+00:00,0


In [10]:
df_notified_tenants = _deepnote_execute_sql('select api_key from cx_data.virada_planos_notified_customers', 'SQL_EB56FC30_B625_4AA0_83D2_B8F22F562A50', audit_sql_comment='', sql_cache_mode='cache_disabled')
df_notified_tenants

Unnamed: 0,api_key
0,6748020348682240


## Fluxo de Controle 1: Atualização de Planos

In [11]:
control_flow_begin = int(time.time())
slack_notify(f'Virada de Planos - {version}','Info','Base de Clientes extraída, filtrando clientes para atualização')

True

### Filtro de Clientes passíveis de atualização

In [12]:
df_clientes_execucao = _deepnote_execute_sql('select * from df_clientes\nwhere  (periodicidade_origem like \'%Anual%\' \n    or (periodicidade_origem like \'%Mensal%\'\n        and parcela_dt_proxima::date >= \'2023-10-05\'::date))\nand api_key in (select api_key from df_notified_tenants)\nand dias_ate_renovacao = 30', 'SQL_DEEPNOTE_DATAFRAME_SQL', audit_sql_comment='', sql_cache_mode='cache_disabled')
df_clientes_execucao

Unnamed: 0,api_key,namespace,periodicidade_origem,plano_origem,plano_destino,vr_liquido_origem,vr_bruto_origem,vr_bruto_destino,desconto_origem,desconto_destino,parcela_dt_proxima,dias_ate_renovacao


In [13]:
extracted_on = int(time.time())

### Instanciamento de Recursos

In [14]:
# Instanciando API do Astrea
apiastrea = AstreaPlanApi(auth = os.environ[f"ASTREAV2_{keyversion}_AUTH"])

# Instanciando a API do Intercom
apiintercom = IntercomAPI(auth = os.environ["INTERCOM_AUTH"])

# Instanciando PostgreSQL
con = PostgresqlDB(
    host = os.environ["ASTREA_DATA_HOST"],
    port = 5432,
    database = 'astrea_data',
    user = 'cxops',
    password = os.environ["CXOPS_PASS"], 
)

### Execução da Atualização

In [15]:
clientes_exec = len(df_clientes_execucao)

if clientes_exec == 0:
    slack_notify(f'Virada de Planos - {version}','Info','Nenhum cliente encontrado para atualização')

elif clientes_exec > 0:
    slack_notify(f'Virada de Planos - {version}','Info',f'{clientes_exec} clientes encontrados para atualização')
    logger.info('Fluxo de Controle: Iniciando o Loop de integração')
    for index, row in df_clientes_execucao.iterrows():
        customer = row.to_dict()
        request_at = int(time.time())
        logger.info(f'{__name__}: Integrando customer: {customer["namespace"]} em {request_at}')
        response = apiastrea.make_change_plan_request(customer=customer)
        response_at = int(time.time())

        if response.status_code in range(200, 299):
            apiintercom.notify_company_users(
                company_name= customer["namespace"],
                event_name="virada-planos-automacao-concluida",
                timestamp=response_at
            )
        else:
            apiintercom.notify_company_users(
                company_name= customer["namespace"],
                event_name="virada-planos-automacao-falhou",
                timestamp=response_at
            )
        customer_summary = {
            "script_ver": version,
            "api_key": customer['api_key'],
            "namespace": customer['namespace'],
            "periodicidade": customer['periodicidade'],
            "plano_origem": customer['plano_origem'],
            "plano_destino": customer['plano_destino'],
            "vr_bruto_origem": customer['vr_bruto_origem'], # 10 - VR Bruto Origem - Decimal
            "vr_bruto_destino": customer['vr_bruto_destino'], # 11 - VR Bruto Destino - Decimal
            "vr_liquido_origem": customer['vr_liquido_origem'], # 12 - VR Liquido Origem - Decimal
            "desconto_origem": customer['desconto_origem'], # 13 - Desconto Origem - Decimal
            "desconto_destino": customer['desconto_destino'], # 14 - Desconto Destino - Decimal
            "parcela_dt_proxima": customer['parcela_dt_proxima'], # 15 - Parcela Dt Proxima - Date
            "dias_ate_renovacao": customer['dias_ate_renovacao'], # 16 - Dias Ate Renovacao - Integer
            "extracted_on": extracted_on, # 17 - Extracted On - Bigint
            "request_at": request_at, # 18 - Request At - Bigint
            "response_at": response_at, # 19 - Response At - Bigint
            "response_elapsed": response.elapsed.total_seconds(), # 20 - Response Elapsed - Decimal
            "response_status_code": response.status_code, # 21 - Response Status Code - Integer
            "response_content": str(response.content), # 22 - Response Content - Text
            "error_count_so_far": apiastrea.error_count # 23 - Error Count - Integer
        }
        con.insert_row(
            table='cx_data.virada_planos_customer_summary',
            values=customer_summary
        )

control_flow_end = int(time.time())
total_elapsed_time = control_flow_end - control_flow_begin
logger.info(f'Atualização finalizada ({total_elapsed_time}s), {apiastrea.error_count} erros, {apiastrea.successful_requests} sucessos.')
slack_notify(f'Virada de Planos - {version}','Finish',f'Atualização finalizada ({total_elapsed_time}s), {apiastrea.error_count} erros, {apiastrea.successful_requests} sucessos.')
logger.info(f"Fluxo de Controle Finalizado")

## Fluxo de Controle 2: Notificação de clientes 45d

In [16]:
df_clientes_notificacao = _deepnote_execute_sql('select * from df_clientes\nwhere  (periodicidade_origem like \'%Anual%\' and dias_ate_renovacao = 45\n    or (periodicidade_origem like \'%Mensal%\'\n        and (parcela_dt_proxima::date + interval\'1 month\') - current_date::date = 45))\nand not api_key in (select api_key from df_notified_tenants)\nlimit 1', 'SQL_DEEPNOTE_DATAFRAME_SQL', audit_sql_comment='', sql_cache_mode='cache_disabled')
df_clientes_notificacao

Unnamed: 0,api_key,namespace,periodicidade_origem,plano_origem,plano_destino,vr_liquido_origem,vr_bruto_origem,vr_bruto_destino,desconto_origem,desconto_destino,parcela_dt_proxima,dias_ate_renovacao
0,4556297799335936,mila.rodriigues,Anual parcelado,Astrea Starter Padrão 129 / 50,COMPANY,2468.4,2468.4,5844.0,0.0,0.0,2023-10-07,45


In [18]:
clientes_notify = len(df_clientes_notificacao)

if clientes_notify == 0:
    slack_notify(f'Virada de Planos - {version}','Info','Nenhum cliente encontrado para notificação de 45d')

elif clientes_notify > 0:
    slack_notify(f'Virada de Planos - {version}','Info',f'{clientes_notify} clientes encontrados para notificação de 45d')
    logger.info('Fluxo de Controle: Iniciando o Loop de notificação')
    for index, row in df_clientes_notificacao.iterrows():
        customer = row.to_dict()
        notification = apiintercom.notify_company_users(
            company_name=customer["namespace"],
            event_name="virada-planos-notificacao-45d",
            timestamp = int(time.time())
        )
        
        customer_summary = {
            "script_ver": version,
            "api_key": customer['api_key'],
            "namespace": customer['namespace'],
            "periodicidade": customer['periodicidade_origem'],
            "plano_origem": customer['plano_origem'],
            "plano_destino": customer['plano_destino'],
            "vr_bruto_origem": customer['vr_bruto_origem'], # 10 - VR Bruto Origem - Decimal
            "vr_bruto_destino": customer['vr_bruto_destino'], # 11 - VR Bruto Destino - Decimal
            "vr_liquido_origem": customer['vr_liquido_origem'], # 12 - VR Liquido Origem - Decimal
            "desconto_origem": customer['desconto_origem'], # 13 - Desconto Origem - Decimal
            "desconto_destino": customer['desconto_destino'], # 14 - Desconto Destino - Decimal
            "parcela_dt_proxima": customer['parcela_dt_proxima'], # 15 - Parcela Dt Proxima - Date
            "dias_ate_renovacao": customer['dias_ate_renovacao'], # 16 - Dias Ate Renovacao - Integer
            "users_notified": notification['success']
            }
        con.insert_row(
            table='cx_data.virada_planos_notified_customers',
            values=customer_summary
        )
logger.info('Fluxo de Controle: Loop de notificação finalizado.')

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=a6aa03ff-9a9e-4b1a-a1ec-ff555e98a018' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>