In [None]:
%run NB_Credentials

StatementMeta(, a0f11fff-7c23-4b99-9ef1-d3ee8375ec5b, 10, Finished, Available, Finished)

In [68]:
import nest_asyncio
import asyncio
import aiohttp
import pandas as pd
import time
import requests

StatementMeta(, a0f11fff-7c23-4b99-9ef1-d3ee8375ec5b, 11, Finished, Available, Finished)

In [69]:
# Start the wall-clock timer; the final cell reports the elapsed time since this point.
start_time = time.time()

StatementMeta(, a0f11fff-7c23-4b99-9ef1-d3ee8375ec5b, 12, Finished, Available, Finished)

In [70]:
# Delta table names under BRONZE_PATH: the source table the post authors are read from,
# and the destination table the user profiles are written to.
# NOTE(review): BRONZE_PATH is not defined in this notebook — presumably it comes from `%run NB_Credentials`.
TABLA_ORIGEN = "bronze_fact_post"
TABLA_DESTINO = "bronze_dim_users"

StatementMeta(, a0f11fff-7c23-4b99-9ef1-d3ee8375ec5b, 13, Finished, Available, Finished)

In [71]:
# Read bronze_fact_post and collect the distinct post authors whose profiles we will look up.
# NOTE(review): these values are sent to getProfiles as `actors`, while the dimension's `author`
# column is filled from the profile handle — presumably fact_post stores handles (not DIDs); confirm.
df_fact_post = spark.read.format("delta").load(f"{BRONZE_PATH}/{TABLA_ORIGEN}").select("author").distinct()
# Drop null/empty authors (they cannot be looked up) and sort so the API batches are
# reproducible between runs (row order after distinct()/collect() is not guaranteed).
authors = sorted(row["author"] for row in df_fact_post.collect() if row["author"])

StatementMeta(, a0f11fff-7c23-4b99-9ef1-d3ee8375ec5b, 14, Finished, Available, Finished)

In [72]:
# Fetch user profiles in batches via app.bsky.actor.getProfiles (at most 25 actors per request).
users_data = []
BATCH_SIZE = 25
PROFILES_URL = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfiles"
REQUEST_TIMEOUT_S = 30  # seconds; without a timeout a stalled connection would block the cell indefinitely
BATCH_PAUSE_S = 2  # seconds between batches, to avoid getting blocked by the API


def _profile_to_record(profile):
    """Map one profile object returned by getProfiles to a bronze_dim_users row.

    Missing text fields default to "" and missing counters to 0; ``followers_list``
    starts empty and is filled in later by the followers step.
    """
    return {
        "author": profile.get("handle", ""),
        "display_name": profile.get("displayName", ""),
        "description": profile.get("description", ""),
        "avatar": profile.get("avatar", ""),
        "created_at": profile.get("createdAt", ""),
        "followers_count": profile.get("followersCount", 0),
        "follows_count": profile.get("followsCount", 0),
        "posts_count": profile.get("postsCount", 0),
        "followers_list": "",
    }


for start in range(0, len(authors), BATCH_SIZE):
    if start:
        # Pause before every batch except the first — including right after a failed
        # batch, where backing off matters most — and never after the last one.
        time.sleep(BATCH_PAUSE_S)

    batch = authors[start:start + BATCH_SIZE]
    batch_no = start // BATCH_SIZE
    try:
        response = requests.get(PROFILES_URL, params={"actors": batch}, timeout=REQUEST_TIMEOUT_S)
    except requests.RequestException as e:
        # Best effort, same as the non-200 case: report and skip this batch.
        print(f"❌ Error obteniendo perfiles (lote {batch_no}): {e}")
        continue

    if response.status_code != 200:
        # Report the skipped batch instead of silently losing its profiles.
        print(f"❌ Error obteniendo perfiles (lote {batch_no}): HTTP {response.status_code}")
        continue

    users_data.extend(_profile_to_record(p) for p in response.json().get("profiles", []))

StatementMeta(, a0f11fff-7c23-4b99-9ef1-d3ee8375ec5b, 15, Finished, Available, Finished)

StatementMeta(, dd6a3301-9d99-48d6-aff2-86eff0ec5928, 13, Finished, Available, Finished)

In [73]:
# Fetch each user's followers asynchronously via app.bsky.graph.getFollowers (cursor-paginated).
async def get_followers_async(session, author, max_pages=10, sleep_time=0.1):
    """Return up to ``max_pages`` pages of ``author``'s followers as one string.

    Args:
        session: an ``aiohttp.ClientSession`` (or any object whose ``get(url, params=...)``
            returns an async context manager yielding a response with ``status`` and ``json()``).
        author: actor identifier sent as the ``actor`` query parameter.
        max_pages: maximum number of page requests (``limit=100`` followers each).
        sleep_time: seconds to wait between page requests.

    Returns:
        The follower handles, de-duplicated, sorted (so the stored value is stable between
        runs) and joined with ", ". On a non-200 response or an exception the error is
        printed and the handles gathered so far are returned (possibly "").
    """
    followers_set = set()
    cursor = None

    for _ in range(max_pages):
        params = {"actor": author, "limit": 100}
        if cursor:
            params["cursor"] = cursor

        try:
            async with session.get(
                "https://public.api.bsky.app/xrpc/app.bsky.graph.getFollowers", params=params
            ) as response:
                if response.status != 200:
                    # Error payloads (e.g. a rate-limit response) carry no "followers";
                    # report them instead of mistaking them for "no more followers".
                    print(f"❌ Error obteniendo seguidores de {author}: HTTP {response.status}")
                    break
                data = await response.json()
        except Exception as e:
            print(f"❌ Error obteniendo seguidores de {author}: {e}")
            break

        followers = data.get("followers", [])
        cursor = data.get("cursor")
        # Skip entries without a handle so "" never ends up in the list.
        followers_set.update(f["handle"] for f in followers if f.get("handle"))

        if not followers or not cursor:
            break
        await asyncio.sleep(sleep_time)  # throttle between pages

    return ", ".join(sorted(followers_set))


async def update_followers_async(users_data, max_concurrency=10, request_timeout=30):
    """Fill ``followers_list`` for every user and return the updated records.

    Args:
        users_data: list of user dicts with at least an ``"author"`` key. The dicts are
            updated in place; for duplicate authors the last record wins.
        max_concurrency: maximum number of authors whose followers are fetched at the
            same time. Without a bound, one concurrent request per author floods the
            public API. TODO(review): tune against the observed rate limits.
        request_timeout: total timeout in seconds for each HTTP request.

    Returns:
        One dict per distinct author, in first-seen order.
    """
    users_dict = {user["author"]: user for user in users_data}
    semaphore = asyncio.Semaphore(max_concurrency)

    async def _fetch_bounded(session, author):
        # Hold one slot for the author's whole pagination.
        async with semaphore:
            return await get_followers_async(session, author)

    timeout = aiohttp.ClientTimeout(total=request_timeout)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        results = await asyncio.gather(*(_fetch_bounded(session, author) for author in users_dict))

    # gather() preserves input order, so results line up with users_dict's keys.
    for author, followers in zip(users_dict, results):
        users_dict[author]["followers_list"] = followers

    return list(users_dict.values())

# Enable `nest_asyncio`: it patches asyncio so that `asyncio.run(...)` can be called even
# though the notebook kernel (Fabric) already has an event loop running.
nest_asyncio.apply()

StatementMeta(, a0f11fff-7c23-4b99-9ef1-d3ee8375ec5b, 16, Finished, Available, Finished)

In [74]:
# Run the concurrent follower enrichment to completion and keep the updated user records.
followers_job = update_followers_async(users_data)
users_data_updated = asyncio.run(followers_job)

StatementMeta(, a0f11fff-7c23-4b99-9ef1-d3ee8375ec5b, 17, Finished, Available, Finished)

In [75]:
# Convert the enriched records to a Spark DataFrame and overwrite the destination Delta table.
# Explicit schema: types no longer depend on inference (which fails when a field is null in
# every row), and the alphabetical column order matches what PySpark produces when it
# infers a schema from dicts, so the existing table layout is unchanged.
USERS_SCHEMA_DDL = (
    "author STRING, avatar STRING, created_at STRING, description STRING, "
    "display_name STRING, followers_count BIGINT, followers_list STRING, "
    "follows_count BIGINT, posts_count BIGINT"
)

if not users_data_updated:
    # Fail loudly instead of overwriting the table with zero rows (e.g. when every API batch failed).
    raise RuntimeError("No se obtuvieron perfiles de usuario; no se sobrescribe la tabla destino.")

df_users = spark.createDataFrame(users_data_updated, schema=USERS_SCHEMA_DDL)

df_users.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save(f"{BRONZE_PATH}/{TABLA_DESTINO}")

StatementMeta(, a0f11fff-7c23-4b99-9ef1-d3ee8375ec5b, 21, Finished, Available, Finished)

In [76]:
# Report the wall-clock time elapsed since `start_time` as whole minutes plus fractional seconds.
end_time = time.time()
execution_time = end_time - start_time
whole_minutes, seconds = divmod(execution_time, 60)
minutes = int(whole_minutes)
print(f"⏳ Tiempo total de ejecución: {minutes} min {seconds:.2f} seg")

StatementMeta(, a0f11fff-7c23-4b99-9ef1-d3ee8375ec5b, 22, Finished, Available, Finished)

⏳ Tiempo total de ejecución: 2 min 56.91 seg
