In [24]:
import os
import requests
import pandas as pd
from io import StringIO
from dotenv import load_dotenv
from datetime import datetime

load_dotenv()

def export_ALL_Partial_payments(year: int, month: int, 
                        out_dir: str = r"..\data\All_Partial_payments") -> tuple[pd.DataFrame, str]:
    """
    Descarga, filtra y exporta a CSV los contratos premium con Abo Unique = 1.

    Parámetros
    ----------
    year : int
        Año de la consulta (ejemplo: 2025).
    month : int
        Mes de la consulta en formato numérico (ejemplo: 8 o 12).
    out_dir : str, opcional
        Carpeta donde se guardará el archivo. Por defecto ../data/Debt_premium

    Retorna
    -------
    tuple[pd.DataFrame, str]
        - DataFrame filtrado.
        - Ruta completa del archivo generado.
    """
    # Formatear año y mes
    year_str = str(year)
    month_str = f"{month:02d}"  # asegura siempre 2 dígitos (ej: 08, 12)

    base_url = "https://billing.izoswap.fr/admin/export/payments_subs_attempts/ALL"
    token = os.getenv("PAYMENTS_TOKEN")
    url = f"{base_url}/{year_str}-{month_str}/{token}/csv"

    # Descargar CSV en memoria
    response = requests.get(url)
    response.raise_for_status()

    df = pd.read_csv(StringIO(response.text), sep=";")

    # Filtro
    mask = (df["Role name"] == "Error Subscription") & (df['Partner Code']=='118')
    columns = ["UID", "Date Transaction", "Campaign Type"]
    df_filtered = df.loc[mask, columns]

    # --- Mantener la fila más reciente por UID ---
    # 1) Parsear fechas en formato día-mes-año (para evitar warnings)
    df_filtered["__dt"] = pd.to_datetime(
        df_filtered["Date Transaction"], errors="coerce", dayfirst=True
    )

    # 2) Ordenar por UID y fecha descendente
    df_filtered = df_filtered.sort_values(["UID", "__dt"], ascending=[True, False], na_position="last")

    # 3) Quitar duplicados por UID, manteniendo la fila más reciente
    df_filtered = df_filtered.drop_duplicates(subset=["UID"], keep="first")

    # 4) Normalizar formato de fecha a ISO estándar
    df_filtered["Date Transaction"] = df_filtered["__dt"].dt.strftime("%Y-%m-%d %H:%M:%S")

    # 5) Limpiar columna auxiliar
    df_filtered = df_filtered.drop(columns="__dt")

    # Crear carpeta si no existe
    os.makedirs(out_dir, exist_ok=True)

    # Definir nombre del archivo
    file_name = f"All_Partial_payments{year_str}_{month_str}.csv"
    file_path = os.path.join(out_dir, file_name)

    # Exportar
    df_filtered.to_csv(file_path, index=False)

    return df_filtered, file_path

In [25]:
df_premium, ruta = export_ALL_Partial_payments(2025, 8)
print(df_premium.shape)
print(f"Archivo generado en: {ruta}")

(652, 3)
Archivo generado en: ..\data\All_Partial_payments\All_Partial_payments2025_08.csv


In [26]:
from pathlib import Path
def build_potential_All_Partial_payments_user(
    dir_path: str = r"..\data\All_Partial_payments",
    pattern: str = "All_Partial_payments",
    clients_filename: str = "Debt_clients.csv",
    out_filename: str = "Potential_All_Partial_payments_user.csv",
    sep: str = ","
) -> str:
    """
    Concatena CSVs cuyo nombre contenga `pattern`, agrega flag de pertenencia
    a Debt_clients.csv y guarda el resultado en `out_filename`.
    """
    base = Path(dir_path)
    if not base.exists():
        raise FileNotFoundError(f"No existe la carpeta: {base}")

    # CSVs candidatos (evitar el archivo de salida y el Debt_clients)
    csvs = sorted([
        f for f in base.glob("*.csv")
        if pattern.lower() in f.name.lower()
        and f.name.lower() != clients_filename.lower()
        and out_filename.lower() not in f.name.lower()
    ])
    if not csvs:
        raise FileNotFoundError(f"No hay CSVs con '{pattern}' en {base}")

    # Leer y concatenar SIN quitar duplicados
    dfs = [pd.read_csv(f, sep=sep, dtype={'UID': "string"}, low_memory=False) for f in csvs]
    df_all = pd.concat(dfs, ignore_index=True, sort=False)

    # Leer Debt_clients.csv
    clients_path = base / clients_filename
    if not clients_path.exists():
        raise FileNotFoundError(f"No se encontró {clients_filename} en {base}")
    df_clients = pd.read_csv(clients_path, sep=sep, dtype={'UID': "string"}, low_memory=False)

    # Validaciones mínimas
    if "UID" not in df_all.columns:
        raise KeyError("La columna 'UID' no existe en los CSV concatenados.")
    if "UID" not in df_clients.columns:
        raise KeyError("La columna 'UID' no existe en Debt_clients.csv.")

    # Flag 1/0 si el UID está en Debt_clients
    df_all["Potential_All_Partial_payments_user"] = df_all["UID"].isin(df_clients["UID"]).astype(int)

    # Guardar
    out_path = base / out_filename
    df_all.to_csv(out_path, index=False, sep=sep)
    return df_all


In [28]:
# Ejemplo de uso:
df_all=build_potential_All_Partial_payments_user()
print(df_all.shape)

(652, 4)
