In [0]:
%sql

-- Bootstrap the namespace used by this notebook; IF NOT EXISTS keeps the cell safe to re-run.
CREATE CATALOG IF NOT EXISTS analytics;
CREATE SCHEMA IF NOT EXISTS analytics.animes;
-- NOTE(review): no cell visible here reads from or writes to this volume — confirm it is still needed.
CREATE VOLUME IF NOT EXISTS analytics.animes.animes_data;



In [0]:
import json
import re
import time
from typing import Optional

import pandas as pd
import pyspark.pandas as ps
import requests



def normalize_text(text):
    """Return ``text`` lower-cased with leading and trailing whitespace removed."""
    lowered = text.lower()
    return lowered.strip()

def match_title(user_input, anime):
    """Report whether ``user_input`` occurs in any of the anime's titles.

    The comparison is a substring test on text normalized by
    ``normalize_text`` (lower-cased and stripped).

    Args:
        user_input: Search text to look for.
        anime: Mapping describing one anime. Its optional ``"titles"`` entry
            is read as a list of mappings, each carrying a ``"title"`` string.

    Returns:
        True if at least one title contains the normalized ``user_input``;
        False otherwise, including when the anime has no titles.
    """
    # `or []` also covers a "titles" key that is present but null.
    titles = anime.get("titles") or []
    if not titles:
        return False

    # Normalize the query once instead of once per candidate title.
    wanted = normalize_text(user_input)
    return any(
        wanted in normalize_text(entry["title"])
        for entry in titles
        # Entries with a missing or null title cannot match; skip them
        # instead of raising.
        if entry.get("title") is not None
    )

def fetch_anime_id_by_name(anime_name: str) -> Optional[int]:
    """Look up an anime's ``mal_id`` through the Jikan search endpoint.

    Requests up to 10 candidates from ``/v4/anime`` and returns the ``mal_id``
    of the first one accepted by ``match_title``. When no candidate is
    accepted, the first search result is used as a fallback.

    Args:
        anime_name: Name to search for.

    Returns:
        The ``mal_id`` of the chosen candidate, or None when the search
        returns no results.

    Raises:
        Exception: If the API answers with HTTP 429 (rate limit).
        requests.HTTPError: For any other unsuccessful HTTP status.
        requests.Timeout: If the API does not answer within the timeout.
    """
    # Short pause so consecutive ID lookups do not hammer the API.
    time.sleep(1)
    resp = requests.get(
        "https://api.jikan.moe/v4/anime",
        params={"q": anime_name, "limit": 10},
        # Without a timeout, requests waits indefinitely on a stalled connection.
        timeout=30,
    )

    if resp.status_code == 429:
        raise Exception("Erro 429: Rate Limit atingido. Diminua a concorrência.")

    resp.raise_for_status()
    data = resp.json().get("data", [])

    if not data:
        print(f"Anime {anime_name} não encontrado, pulando...")
        return None

    for anime in data:
        if match_title(anime_name, anime):
            return anime["mal_id"]

    # No title contained the query: fall back to the first search result.
    return data[0]["mal_id"]
def fetch_jikan_characters_logic(
    anime_name: str,
    catalog: str,
    schema: str,
    layer: str = "silver",
    max_characters: Optional[int] = None
) -> Optional[str]:
    """Download an anime's characters from Jikan and save them as a Delta table.

    Resolves the anime id with ``fetch_anime_id_by_name``, fetches
    ``/v4/anime/{id}/characters``, flattens the payload with
    ``pd.json_normalize`` and overwrites the table
    ``{catalog}.{schema}.{anime_name}_characters``. Uses the notebook-provided
    ``spark`` session.

    Args:
        anime_name: Name used both for the API search and for the table name.
        catalog: Target catalog.
        schema: Target schema inside ``catalog``.
        layer: Not used by this function; kept so existing callers still work.
        max_characters: Keep at most this many characters. ``None`` (or 0)
            keeps all of them.

    Returns:
        The fully qualified table name, or None when the anime was not found
        or the API returned no characters.

    Raises:
        requests.HTTPError: If the characters request is unsuccessful.
        requests.Timeout: If the API does not answer within the timeout.
    """
    anime_id = fetch_anime_id_by_name(anime_name)
    if not anime_id:
        return None

    print(f"Buscando personagens para ID: {anime_id}...")
    # Pause between the search call and the characters call.
    time.sleep(1)

    resp = requests.get(
        f"https://api.jikan.moe/v4/anime/{anime_id}/characters",
        # Without a timeout, requests waits indefinitely on a stalled connection.
        timeout=30,
    )
    resp.raise_for_status()
    # `or []` also covers a "data" key that is present but null.
    items = resp.json().get("data") or []

    if max_characters:
        items = items[:max_characters]

    if not items:
        # An empty pandas frame has no columns, so Spark cannot infer a schema
        # from it; skip the write instead of failing.
        print(f"Nenhum personagem encontrado para {anime_name}, pulando...")
        return None

    df_pd = pd.json_normalize(items)

    # If flattening produced a 'voice_actors.name' column, store it as plain strings.
    # NOTE(review): confirm this column actually appears in the flattened payload.
    if 'voice_actors.name' in df_pd.columns:
        df_pd['voice_actors.name'] = df_pd['voice_actors.name'].apply(str)

    df_spark = spark.createDataFrame(df_pd)

    # Unquoted Spark identifiers only accept letters, digits and underscores,
    # so every other character (spaces, ':', '-', "'", ...) becomes '_'.
    safe_name = re.sub(r"[^a-z0-9_]", "_", anime_name.lower())
    table_name = f"{catalog}.{schema}.{safe_name}_characters"

    (
        df_spark.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )

    return table_name



In [0]:
import numpy as np
# Candidate anime titles to ingest.
animes_list_names = [
    "naruto",
    "bleach",
    "one piece",
    "dragon ball z",
    "dragon ball gt",
    "dragon ball super",
    "boruto",
    "death note",
    "full metal alchemist brotherhood",
    "attack on titan",
    "my hero academia",
    "one punch man",
]

# With replace=False, np.random.choice raises ValueError when asked for more
# items than the population holds, so cap the sample at the list length.
SAMPLE_SIZE = min(10, len(animes_list_names))
animes_list = np.random.choice(animes_list_names, SAMPLE_SIZE, replace=False)
# A bare expression in the middle of a cell is never displayed; print instead.
print(animes_list)

# NOTE(review): fetch_jikan_characters_logic resolves the same ids again, so
# these lookups double the number of search requests — drop this list if
# nothing else reads `dados_animes`.
dados_animes = [
    fetch_anime_id_by_name(anime_name)
    for anime_name in animes_list
]

# Ingest each sampled anime and remember which tables were actually written.
created_tables = []
for anime_name in animes_list:
    table_name = fetch_jikan_characters_logic(
        anime_name,
        catalog="analytics",
        schema="animes",
        max_characters=1000,
    )
    # None means the anime was skipped (not found / nothing to write).
    if table_name:
        created_tables.append(table_name)

created_tables

 

In [0]:
%sql
-- List every table currently registered in the analytics.animes schema.
SHOW TABLES IN analytics.animes;

-- Inspect the column names of one characters table.
-- NOTE(review): the ingestion cell samples its titles at random, so bleach_characters
-- exists only if "bleach" was sampled in this run or an earlier one.
SHOW COLUMNS IN analytics.animes.bleach_characters


In [0]:
# Remove exact duplicate rows from every table of analytics.animes, in place.
# SHOW TABLES also lists session-scoped temporary views (isTemporary = true);
# they do not live in the schema, so resolving them as analytics.animes.<name>
# would fail. Skip them.
# NOTE(review): this rewrites every table in the schema, not only the
# *_characters tables — confirm that is intended.
tables = [
    row.tableName
    for row in spark.sql("SHOW TABLES IN analytics.animes").collect()
    if not row.isTemporary
]

for table in tables:
    full_name = f"analytics.animes.{table}"
    df = spark.table(full_name)
    df_clean = df.dropDuplicates()
    (
        df_clean.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(full_name)
    )