<a href="https://colab.research.google.com/github/joquifer2/Carga-y-sincronizci-n-de-los-contactos-entre-Systeme.io-y-BigQuery/blob/main/Carga_y_sincronizci%C3%B3n_de_los_contactos_entre_Systeme_io_y_BigQuery.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Carga y sincronizción de los contactos entre Systeme.io y BigQuery

Este código garantiza que los contactos en BigQuery estén sincronizados con los de Systeme.io, eliminando los contactos que ya no existen en Systeme.io y agregando los nuevos.

**Resumen del Flujo de Trabajo**

1. Inicio: Inicializa el cliente de BigQuery y configura los parámetros de la API.
2. Obtención de Contactos: Itera a través de las páginas de contactos de Systeme.io, almacenando los correos electrónicos y determinando los nuevos contactos.
3. Carga de Contactos Nuevos: Transforma y carga los contactos nuevos en BigQuery.
4. Sincronización: Identifica y elimina los contactos que ya no están presentes en Systeme.io.
5.Finalización: Indica la finalización del proceso de sincronización.

https://developer.systeme.io/reference/api

**Detalles del Código**

1. Inicialización y Configuración:

 * Se definen la URL base y los encabezados necesarios para realizar solicitudes a la API de Systeme.io.
 * Se inicializa el cliente de BigQuery.
2. Obtención de Correos Electrónicos Existentes en BigQuery:

 * Se consulta la tabla de BigQuery para obtener los correos electrónicos de los contactos existentes.
 * Estos correos se almacenan en un conjunto existing_contact_emails para facilitar las comparaciones posteriores.
3. Iteración a través de las Páginas de la API de Systeme.io:

 * Se utiliza un bucle while para iterar a través de las páginas de contactos de la API de Systeme.io.
 * En cada iteración, se solicita una página de contactos (con un límite de 100 contactos por página).
4. Procesamiento de Contactos de la API:

 * Para cada página de contactos recibida:
Se almacenan los correos electrónicos en all_systeme_contacts.
 * Se filtran los contactos para obtener solo aquellos que no están ya presentes en existing_contact_emails (contactos nuevos).
 * Se agrega el correo de cada contacto nuevo a existing_contact_emails para futuras comparaciones.
5. Carga de Nuevos Contactos en BigQuery:

 * Si hay contactos nuevos, se transforman en un DataFrame.
Se verifica que el esquema del DataFrame coincida con el esquema de la tabla en BigQuery.
 * Los contactos nuevos se cargan en la tabla de BigQuery.
6. Verificación de Páginas Adicionales:

 * Se verifica si hay más páginas disponibles en la API de Systeme.io.
 * Si no hay más páginas, se sale del bucle.
7. Identificación y Eliminación de Contactos Eliminados:

 * Después de procesar todas las páginas, se identifican los contactos que están en BigQuery pero no en all_systeme_contacts (contactos eliminados en Systeme.io).
 * Se crea una consulta SQL para eliminar estos contactos de BigQuery.
 * La consulta se ejecuta utilizando client.query.
8. Finalización:

 * Se imprime un mensaje indicando que el proceso de obtención, carga y sincronización de contactos se ha completado.

In [None]:
import requests
import pandas as pd
from google.cloud import bigquery
import time

# Define la URL base y los encabezados para la solicitud API que obtiene los datos de contacto
base_url = "https://api.systeme.io/api/contacts"
headers = {
    "accept": "application/json",
    "X-API-Key": "Introducir manualmente"
}

# Inicializa el cliente de BigQuery
client = bigquery.Client()
new_table_id = 'table_id'

# Obtiene los emails de los contactos ya existentes en BigQuery
query = f"SELECT email FROM `{new_table_id}`"
existing_contact_emails = set(row["email"] for row in client.query(query).result())
print(f"Número de contactos existentes en BigQuery: {len(existing_contact_emails)}")

page = 1
all_systeme_contacts = set()
has_more = True

while has_more:
    params = {
        "page": page,
        "limit": 100  # Solicita un máximo de 100 contactos por página
    }

    print(f"Solicitando la página {params['page']} con límite de {params['limit']} contactos...")
    response = requests.get(base_url, headers=headers, params=params)

    print(f"Respuesta recibida con código de estado: {response.status_code}")
    if response.status_code == 200:
        data_values = response.json()
        num_contacts = len(data_values.get('items', []))
        print(f"Número de contactos recibidos en esta página: {num_contacts}")

        if num_contacts == 0:
            print("No se encontraron más datos de contacto válidos en la respuesta.")
            break

        contacts = data_values['items']
        all_systeme_contacts.update(contact["email"] for contact in contacts)

        # Filtra los contactos para obtener solo los nuevos
        new_contacts = [contact for contact in contacts if contact["email"] not in existing_contact_emails]
        print(f"Número de nuevos contactos a cargar: {len(new_contacts)}")

        if new_contacts:
            # Agrega los nuevos emails al conjunto existente
            existing_contact_emails.update(contact["email"] for contact in new_contacts)

            # Función para transformar los datos y cargarlos en BigQuery
            def transform_and_load_to_bigquery(contacts, table_id, bq_client):
                print("Iniciando la transformación y carga de datos en BigQuery...")
                contact_list = []

                # Extrae los datos del campo 'fields' y agrégalos al diccionario de contactos
                for contact in contacts:
                    contact_dict = {
                        "email": contact.get("email"),
                        "registeredAt": contact.get("registeredAt"),
                        "locale": contact.get("locale"),
                        "sourceURL": contact.get("sourceURL"),
                        "unsubscribed": contact.get("unsubscribed"),
                        "bounced": contact.get("bounced"),
                        "needsConfirmation": contact.get("needsConfirmation")
                    }
                    for field in contact['fields']:
                        contact_dict[field['slug']] = field['value']
                    contact_list.append(contact_dict)

                # Crea un DataFrame a partir de la lista de contactos
                df_update = pd.DataFrame(contact_list)

                # Asegúrate de que el esquema coincide
                query = f"SELECT * FROM `{table_id}` LIMIT 1"
                schema_df = bq_client.query(query).to_dataframe()

                for column in schema_df.columns:
                    if column not in df_update.columns:
                        df_update[column] = None

                # Filtra el DataFrame para que solo contenga las columnas que están en el esquema de la tabla
                df_update = df_update[schema_df.columns]

                # Verifica que el DataFrame contiene el campo 'email' y que no es nulo
                if 'email' not in df_update.columns or df_update['email'].isnull().any():
                    raise ValueError("El campo 'email' debe estar presente y no puede ser nulo.")

                # Print the DataFrame to be updated to check the result
                print(df_update)

                # Carga el DataFrame con los nuevos datos en la tabla existente
                job_update = bq_client.load_table_from_dataframe(df_update, table_id)

                # Espera a que el trabajo se complete
                job_update.result()

                print(f'Actualizados {job_update.output_rows} filas en {table_id}.')

            # Llama a la función para cargar los datos en BigQuery
            transform_and_load_to_bigquery(new_contacts, new_table_id, client)
        else:
            print("No hay nuevos contactos para cargar.")

        # Verificar si hay más páginas para procesar
        has_more = data_values.get('hasMore', False)
        if not has_more:
            print("No hay más páginas disponibles.")
            break

    else:
        print(f"Error: {response.status_code}")
        break

    page += 1
    # Pausa para evitar superar el límite de la API
    time.sleep(1)

# Identificar contactos eliminados
contacts_to_delete = existing_contact_emails - all_systeme_contacts
print(f"Número de contactos a eliminar: {len(contacts_to_delete)}")

if contacts_to_delete:
    emails_to_delete = list(contacts_to_delete)
    # Crear una consulta SQL para eliminar los contactos en BigQuery
    delete_query = f"""
    DELETE FROM `{new_table_id}`
    WHERE email IN UNNEST(@emails_to_delete)
    """
    # Ejecutar la consulta SQL
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("emails_to_delete", "STRING", emails_to_delete)
        ]
    )
    query_job = client.query(delete_query, job_config=job_config)
    query_job.result()  # Espera a que la consulta se complete
    print(f"Eliminados {len(emails_to_delete)} contactos de {new_table_id}.")

print("Proceso de obtención, carga y sincronización de contactos completado.")