# Importação Retroativa de Dados
Objetivo: Esse pipeline tem como objetivo realizar a importação retroativa dos dados de Pesquisa do Tally Forms. 

Campanha: lcto-ofan-jan26
Conversion types: 2

# 0. Importando Dependências

In [1]:
# Importações básicas
import math
import pandas as pd
import numpy as np
import sys
from pathlib import Path
import os
sys.path.insert(0, os.path.join(os.path.dirname(os.getcwd()), 'src'))

# Adiciona src ao path
sys.path.append('../src')


# Utilitários de dados
from data_utils import (
    load_raw_data,
    save_processed_data,
    remove_duplicates,
    handle_missing_values,
    detect_outliers,
    normalize_column,
    process_phone_string,
    process_phone_number,
    clean_and_lower_column,
    flatten_list_to_df,
    remove_buyers_from_dataframe
)

CRONOGRAMA_SUBDOMAIN = 'cronogramadosfluentes-xwamel'

# Utilitários SQL
from sql_utils import DatabaseConnection as Dbc, load_query_from_file

# Utilitários de visualização
import matplotlib.pyplot as plt
import seaborn as sns

# Utilitários de API
from api_utils import (
    make_request,
    get_json,
    post_json,
    paginated_request,
    response_to_dataframe
)

# utilitários hotmart
from hotmart_utils import Hotmart

# utilitários tmb
from tmb_utils import TMB   

# utilitários tally
from tally_utils import Tally  

# Configurações pandas
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

# Load Database Driver
db = Dbc()

# Inicializar API Hotmart
hotmart = Hotmart()
# Inicializar API TMB
tmb = TMB()

# Inicializar API Tally Forms
tally = Tally()

# 1. Recuperando Submissões via API Tally Forms

In [2]:
## Constates do Código
CAMPAIGN_ID = 'lcto-ofan-jan26'
CONVERSION_TYPE_ID = "2"
FORM_ID = "Y5P1AN"

# Configurações de Importação
DESTINATION_QUEUE = 'S.NORM.PROD.NEW'
SKIP_ORQUESTRATION = True
SKIP_CONVERSION = False

## Recuperando Submissões via API Export
submissions = load_raw_data("tally_responses.csv")

## 1.1 Formatando Dataframe Submissions

In [3]:
def clean_data(submissions):
    # Drop rows with missing data in columns: 'email', 'phone'
    submissions = submissions.dropna(subset=['email', 'phone'])
    submissions['phone'] = submissions['phone'].astype(int).astype(str) 
    def to_iso_z_apply(value, tz_local: str = "America/Sao_Paulo") -> str | None:
        if pd.isna(value):
            return None
        dt = pd.to_datetime(value, errors="coerce")
        if pd.isna(dt):
            return None
        # se vier tz-naive, assume GMT-3 (America/Sao_Paulo)
        if dt.tzinfo is None:
            dt = dt.tz_localize(tz_local)
        # converte pra UTC e formata
        dt_utc = dt.tz_convert("UTC")
        return dt_utc.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    submissions["conversion_date"] = submissions["Submitted at"].apply(to_iso_z_apply)
    return submissions

submissions_clean = clean_data(submissions.copy())
submissions_clean.head()

Unnamed: 0,Submission ID,Respondent ID,Submitted at,utm_source,utm_campaign,utm_medium,utm_content,utm_term,active_id,campaign_id,score,name,email,phone,country,gender,age_range,current_occupation,biggest_fluency_desire,monthy_income,english_level,took_english_course,conversion_date
1648,ja4j8px,pbO0RjV,2025-12-30 13:58:13,whatsapp,lcto-ofan-jan26,direct,Onboarding,,,lcto-ofan-jan26,58,Fernanda Damasceno do Amaral Mello,fernanda.amaralmello@gmail.com,5541991635720,Brasil,Feminino,De 25 a 30 anos,Empregado(a),Conseguir um aumento/promoção no meu emprego a...,"De R$ 3001,00 a R$ 5.000,00",Intermediário.,Não.,2025-12-30T16:58:13.000Z
1649,ja4j8YE,Gxloqvj,2025-12-30 13:58:32,whatsapp,lcto-ofan-jan26,direct,Onboarding,,,lcto-ofan-jan26,93,Marcos Paulo Mendonça,marcosspt2@gmail.com,5511918831009,Brasil,Masculino,De 31 a 34 anos,Empreendedor(a),Morar no exterior.,"De R$ 10.000,00 a R$ 30.000,00",Básico,Sim.,2025-12-30T16:58:32.000Z
1650,ja4jAla,1AVbNE1,2025-12-30 14:01:23,whatsapp,lcto-ofan-jan26,direct,Onboarding,,,lcto-ofan-jan26,48,Fabíola Soares do Monte,fabiolamonte@gmail.com,5531984619327,Brasil,Feminino,De 45 a 54 anos,Autônomo,Dominar a gramática da lingua inglesa.,"De R$ 3001,00 a R$ 5.000,00",Começando do zero.,Não.,2025-12-30T17:01:23.000Z
1651,Y5B1APv,q4VYJ9g,2025-12-30 14:06:30,whatsapp,lcto-ofan-jan26,direct,Onboarding,,,lcto-ofan-jan26,88,Rodrigo Lima Corrêa,rodriguinhogda@gmail.com,351966675494,Portugal,Masculino,De 25 a 30 anos,Empregado(a),Conquistar um emprego melhor.,"De R$ 5001,00 a R$ 10.000,00",Básico,Sim.,2025-12-30T17:06:30.000Z
1652,ja4jVLx,7RxJpRP,2025-12-30 14:07:55,whatsapp,lcto-ofan-jan26,direct,Onboarding,,,lcto-ofan-jan26,33,Raquel Lopes dos Santos,raquellopezsmile@hotmail.com,5517981485015,Brasil,Feminino,De 25 a 30 anos,Dona de casa,Morar no exterior.,Não tenho nenhuma renda,Básico,Não.,2025-12-30T17:07:55.000Z


## 1.2 Formatando Dataframe em formato Conversions

In [4]:
conversions = []

def is_nan_or_none(val):
    return val is None or (isinstance(val, float) and math.isnan(val))

def clean_json_value(val):
    """
    Clean a single value to be JSON compliant, replacing nan/None with None and forcing types
    """
    if is_nan_or_none(val):
        return None
    if isinstance(val, float):
        # JSON does not support inf
        if math.isinf(val):
            return None
    return val

def make_json_compliant_dict(item):
    """
    Recursively replace values that are not JSON compliant (e.g., nan/None/inf) with None.
    """
    if isinstance(item, dict):
        return {k: make_json_compliant_dict(v) for k, v in item.items()}
    elif isinstance(item, list):
        return [make_json_compliant_dict(val) for val in item]
    else:
        return clean_json_value(item)

for idx, row in submissions_clean.iterrows():

    # name
    name = clean_json_value(row.get("name"))
    # Defensive get and clean for all fields that might be nan
    email = row.get("email")
    email = "" if is_nan_or_none(email) else str(email).lower().strip()

    phone_val = row.get("phone")
    phone_str = "" if is_nan_or_none(phone_val) else str(phone_val).strip()
    phone = process_phone_number(phone_str)

    phone['raw_phone'] =  phone['raw_phone_input']

    conversion_date = clean_json_value(row.get("conversion_date"))

    utm_source = clean_json_value(row.get("utm_source"))
    utm_campaign = clean_json_value(row.get("utm_campaign"))
    utm_medium = clean_json_value(row.get("utm_medium"))
    utm_content = clean_json_value(row.get("utm_content"))
    submission_id = clean_json_value(row.get("Submission ID"))

    # Criando Conversion Raw Info Dictionary
    conversion_raw_info = {
        "form_id": FORM_ID,
        "submission_id": clean_json_value(row.get("Submission ID"))
    }
    labels = ["country", "gender", "age_range", "current_occupation", "biggest_fluency_desire", "monthy_income", "english_level", "took_english_course"]
    for label in labels:
        conversion_raw_info[label] = clean_json_value(row.get(label))

    conversion_dict = { 
        "lead_data": {
            "email": {"value": email},
            "phone": phone,
            "name": name
        }, 
        "conversion_data": {
            "conversion_date": conversion_date,
            "campaign_id": CAMPAIGN_ID,
            "conversion_type_id": CONVERSION_TYPE_ID,
            "utm_source": utm_source,
            "utm_campaign": utm_campaign,
            "utm_medium": utm_medium,
            "utm_content": utm_content,
            "conversion_raw_info": conversion_raw_info
        },
        "flow_settings": {
            "skip_conversion": SKIP_CONVERSION,
            "skip_orquestration": SKIP_ORQUESTRATION,
            "destination_queue": DESTINATION_QUEUE
        }
    }

    # Recursively clean before append to eliminate any possible nan or non-JSON-compliant values
    conversion_dict = make_json_compliant_dict(conversion_dict)
    conversions.append(conversion_dict)

conversions


[{'lead_data': {'email': {'value': 'fernanda.amaralmello@gmail.com'},
   'phone': {'raw_phone_input': '5541991635720',
    'formatted_phone': '+5541991635720',
    'whatsapp_format': '+554191635720',
    'isValid': True,
    'type': 'Nacional',
    'ddd': 41,
    'ddi': 55,
    'region': 'BR',
    'carrier': 'Vivo',
    'location': 'Paraná',
    'timezone': ['America/Sao_Paulo'],
    'number_type': 'Mobile',
    'raw_phone': '5541991635720'},
   'name': 'Fernanda Damasceno do Amaral Mello'},
  'conversion_data': {'conversion_date': '2025-12-30T16:58:13.000Z',
   'campaign_id': 'lcto-ofan-jan26',
   'conversion_type_id': '2',
   'utm_source': 'whatsapp',
   'utm_campaign': 'lcto-ofan-jan26',
   'utm_medium': 'direct',
   'utm_content': 'Onboarding',
   'conversion_raw_info': {'form_id': 'Y5P1AN',
    'submission_id': 'ja4j8px',
    'country': 'Brasil',
    'gender': 'Feminino',
    'age_range': 'De 25 a 30 anos',
    'current_occupation': 'Empregado(a)',
    'biggest_fluency_desire': 'C

In [7]:
import requests
import time
import json
import os

BASE_URL = "https://webhooks.aloud.com.br/data/ce7d3845-3acf-4606-835c-63485eb5fe70"

responses = []

PERSISTENCE_FILE = "imported_leads.json"

# Função auxiliar para obter valor de chave profunda (key_path)
def get_deep_key(d, key_path, default=""):
    """Busca valor em d pelo key_path em formato dot (ex: 'conversion_data.conversion_raw_info.submission_id')"""
    try:
        for key in key_path.split("."):
            d = d[key]
        return d
    except (KeyError, TypeError):
        return default

# Permite selecionar qual chave (via path) será usada para controle de importação
IMPORT_CONTROL_KEY_PATH = "conversion_data.conversion_raw_info.submission_id"

# Carregue os registros persistidos previamente já enviados, se existir
if os.path.exists(PERSISTENCE_FILE):
    with open(PERSISTENCE_FILE, "r", encoding="utf-8") as f:
        imported_keys = set(json.load(f))
else:
    imported_keys = set()

total_to_import = len(conversions)
print(f"Total de registros para importar: {total_to_import}")

for idx, item in enumerate(conversions, start=1):
    import_key = str(get_deep_key(item, IMPORT_CONTROL_KEY_PATH, "")).strip()
    if not import_key:
        print(f"Registro {idx} ignorado: chave de controle '{IMPORT_CONTROL_KEY_PATH}' ausente ou inválida.")
        continue
    if import_key in imported_keys:
        print(f"Registro {idx} já foi importado anteriormente para a chave '{import_key}', ignorando.")
        continue
    
    print(f"Importando registro {idx} de {total_to_import}... (key: {import_key})")
    response = requests.post(BASE_URL, json=item)
    try:
        resp_json = response.json()
    except Exception:
        resp_json = response.text
    responses.append(resp_json)
    
    # Atualize a persistência local imediatamente
    imported_keys.add(import_key)
    with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f:
        json.dump(sorted(list(imported_keys)), f, ensure_ascii=False, indent=2)

Total de registros para importar: 2962
Registro 1 já foi importado anteriormente para a chave 'ja4j8px', ignorando.
Registro 2 já foi importado anteriormente para a chave 'ja4j8YE', ignorando.
Registro 3 já foi importado anteriormente para a chave 'ja4jAla', ignorando.
Registro 4 já foi importado anteriormente para a chave 'Y5B1APv', ignorando.
Registro 5 já foi importado anteriormente para a chave 'ja4jVLx', ignorando.
Registro 6 já foi importado anteriormente para a chave 'gDPEo1J', ignorando.
Registro 7 já foi importado anteriormente para a chave 'lb2xxrp', ignorando.
Registro 8 já foi importado anteriormente para a chave 'dWGOO4A', ignorando.
Registro 9 já foi importado anteriormente para a chave '440MMZ5', ignorando.
Registro 10 já foi importado anteriormente para a chave 'kdj2Lgo', ignorando.
Registro 11 já foi importado anteriormente para a chave 'vG6lRbD', ignorando.
Registro 12 já foi importado anteriormente para a chave 'EkV29Nq', ignorando.
Registro 13 já foi importado anter

ConnectTimeout: HTTPSConnectionPool(host='webhooks.aloud.com.br', port=443): Max retries exceeded with url: /data/ce7d3845-3acf-4606-835c-63485eb5fe70 (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x12342d160>, 'Connection to webhooks.aloud.com.br timed out. (connect timeout=None)'))