In [0]:
pip install OpenAI

In [0]:
%pip install -U mlflow
# Restart the Python process after the upgrade so that later imports pick up the
# newly installed mlflow (presumably the reason for the restart — confirm).
dbutils.library.restartPython()

In [0]:
import json, math, time, re
import os
from typing import List, Dict, Any

import mlflow
from openai import OpenAI
from pyspark.sql.functions import col, lit, coalesce, substring_index, rand

mlflow.openai.autolog()

In [0]:
# The API key must never be written into the notebook source. It is read from the
# DEEPSEEK_API_KEY environment variable (set it on the cluster, or populate it
# from whatever secrets manager the workspace offers).
API_KEY = os.environ.get("DEEPSEEK_API_KEY", "")
if not API_KEY:
    # Fail here with a clear message instead of an authentication error on the
    # first request several cells later.
    raise ValueError(
        "DEEPSEEK_API_KEY não definido: configure a variável de ambiente antes de executar."
    )

API_URL = 'https://api.deepseek.com'
# The OpenAI SDK client is pointed at the DeepSeek endpoint through base_url.
client = OpenAI(api_key=API_KEY, base_url=API_URL)

In [0]:
# Load the tracks table and keep a random sample of at most 50 rows whose style
# is still "unknown". rand() is called without a seed, so each evaluation of this
# DataFrame may return a different sample.
df_estilos = spark.table("gold.spotify.spotify_musicas")
df_estilos_filtrados = (
    df_estilos
    .filter(col("style") == "unknown")
    .orderBy(rand())
    .limit(50)
)
display(df_estilos_filtrados)

In [0]:
# Fields the system prompt documents as the model's input. They are also the
# columns used later in the notebook to match classified rows back to the table.
INPUT_FIELDS = [
    "master_metadata_track_name",
    "master_metadata_album_artist_name",
    "master_metadata_album_album_name",
]

# Collect only the documented fields to the driver. Serialising every column
# would put the existing "style": "unknown" value into the prompt (clashing with
# the "Style" key the model is asked to add) and would make json.dumps fail on
# any column whose type is not JSON-serialisable, if the table has such columns.
json_data = (
    df_estilos_filtrados
    .select(*INPUT_FIELDS)
    .toPandas()
    .to_dict(orient="records")
)

total = len(json_data)
print(f"Total de registros: {total}")

In [0]:
# Show a short, readable preview instead of dumping the whole payload as a Python
# repr. default=str keeps the preview working even if a value is not
# JSON-serialisable.
print(json.dumps(json_data[:5], ensure_ascii=False, indent=2, default=str))

In [0]:
# System prompt passed as the "system" message by classify_with_deepseek.
# It defines the reply contract: a single JSON object whose "data" array mirrors
# the input records (same length and order) and adds a "Style" genre to each one.
# Leading/trailing whitespace of the literal is removed by .strip().
system_prompt = """
You are an expert in music genres. Return ONLY valid JSON.

INPUT: a JSON array of objects with fields:
- "master_metadata_track_name"
- "master_metadata_album_artist_name"
- "master_metadata_album_album_name"

REQUIREMENTS:
- Return a SINGLE JSON OBJECT with exactly one key: "data".
- "data" MUST be a JSON array with the same length and order as the input.
- Each object MUST preserve the original fields and add:
  "Style": the most appropriate single genre in English (e.g., "Brazilian Popular Music", "Samba", "Rock", "Hip Hop").
- No extra keys besides "data". No markdown/code fences. No text outside JSON.

EXAMPLE INPUT (array):
[
  {"master_metadata_track_name":"Cabeça Vazia","master_metadata_album_artist_name":"Djavan","master_metadata_album_album_name":"D"}
]

EXAMPLE OUTPUT (object):
{"data":[
  {"master_metadata_track_name":"Cabeça Vazia","master_metadata_album_artist_name":"Djavan","master_metadata_album_album_name":"D","Style":"Brazilian Popular Music"}
]}
""".strip()

In [0]:
# DeepSeek model name used for the classification requests.
MODEL = "deepseek-chat"
# Temperature 0.0 for maximum consistency between answers.
TEMPERATURE = 0.0

In [0]:
def classify_with_deepseek(items: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Classify a batch of tracks with DeepSeek in JSON mode.

    Serialises ``items`` as the user message and sends it together with the
    module-level ``system_prompt`` through ``client``, using ``MODEL`` and
    ``TEMPERATURE``. The reply is validated and decoded.

    Args:
        items: Records to classify; must be JSON-serialisable.

    Returns:
        The JSON object decoded from the model reply (the prompt asks for the
        classified records under the ``"data"`` key).

    Raises:
        ValueError: If the reply was cut off by the token limit, is empty, is
            not valid JSON (``json.JSONDecodeError`` is a ``ValueError``), or
            decodes to something other than a JSON object.
    """
    payload = json.dumps(items, ensure_ascii=False)
    response = client.chat.completions.create(
        model=MODEL,
        temperature=TEMPERATURE,
        response_format={"type": "json_object"},  # request a JSON object reply
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": payload}
        ]
    )
    choice = response.choices[0]
    content = choice.message.content

    # A reply stopped by the token limit is an incomplete JSON document; report
    # that explicitly instead of surfacing a confusing parse error.
    if choice.finish_reason == "length":
        raise ValueError(
            "Resposta do DeepSeek truncada pelo limite de tokens; reduza o tamanho do lote."
        )
    # JSON mode can come back with no content at all; json.loads(None) would
    # raise an opaque TypeError.
    if not content or not content.strip():
        raise ValueError("DeepSeek retornou conteúdo vazio.")

    parsed = json.loads(content)
    if not isinstance(parsed, dict):
        raise ValueError("DeepSeek não retornou um objeto JSON.")
    return parsed


In [0]:
def parsed_to_spark_df(parsed: Dict[str, Any]):
    """
    Normalise the model reply into a Spark DataFrame.

    Args:
        parsed: Decoded model reply; the classified records are read from its
            ``"data"`` entry.

    Returns:
        A DataFrame with the three ``master_metadata_*`` string columns plus a
        ``style`` string column (``"unknown"`` when the model gave no usable
        ``"Style"`` value).

    Raises:
        ValueError: If ``"data"`` is missing, null or empty, or if no entry
            carries all three identifying fields.
    """
    key_fields = (
        "master_metadata_track_name",
        "master_metadata_album_artist_name",
        "master_metadata_album_album_name",
    )

    # `or []` also covers an explicit "data": null, which .get's default does not.
    entries = parsed.get("data") or []

    rows = []
    skipped = 0
    for obj in entries:
        # One malformed entry must not abort the whole batch: skip and count it.
        if not isinstance(obj, dict) or any(name not in obj for name in key_fields):
            skipped += 1
            continue

        style = obj.get("Style")
        # A null, blank or non-text "Style" is treated the same as a missing one.
        if not isinstance(style, str) or not style.strip():
            style = "unknown"

        # Keep nulls as nulls, but coerce any other value to text so every row
        # fits the explicit string schema below.
        key_values = tuple(
            None if obj[name] is None else str(obj[name]) for name in key_fields
        )
        rows.append(key_values + (style.strip(),))

    if skipped:
        print(f"Aviso: {skipped} registro(s) ignorado(s) por falta de campos obrigatórios.")
    if not rows:
        raise ValueError("DeepSeek retornou vazio ou sem chave 'data'.")

    # An explicit schema avoids type inference, which cannot determine a column
    # type when every value in that column is null.
    schema = ", ".join(f"{name} string" for name in key_fields + ("style",))
    return spark.createDataFrame(rows, schema=schema)

In [0]:
def run_classification(records: List[Dict[str, Any]]):
    """
    Run one classification pass and preview its outcome.

    Sends ``records`` to ``classify_with_deepseek``, prints the returned JSON,
    converts it with ``parsed_to_spark_df`` and displays the resulting
    DataFrame. No table is written here.

    Returns:
        The DataFrame produced by ``parsed_to_spark_df``.
    """
    model_reply = classify_with_deepseek(records)

    # Step 1: echo the raw reply, pretty-printed.
    print("JSON retornado pelo modelo:")
    pretty_reply = json.dumps(model_reply, ensure_ascii=False, indent=2)
    print(pretty_reply)

    # Step 2: normalised DataFrame built from the reply.
    classified_df = parsed_to_spark_df(model_reply)
    print("Prévia do DataFrame normalizado:")
    display(classified_df)

    return classified_df


In [0]:
if total == 0:
    print("Nada para classificar. Encerrando.")
    # Actually end the run, as the message says. Without this, the following
    # cells still execute and fail with NameError because df_out was never created.
    dbutils.notebook.exit("Nada para classificar.")
else:
    df_out = run_classification(json_data)

In [0]:
from delta.tables import DeltaTable

# Columns that identify a track when matching classified rows to the table.
keys = [
    "master_metadata_track_name",
    "master_metadata_album_artist_name",
    "master_metadata_album_album_name",
]

if total == 0:
    # Nothing was classified, so df_out does not exist; skip the write.
    print("Nenhuma classificação disponível; MERGE ignorado.")
else:
    # Keep only what the update needs:
    #  - drop null styles and rows still labelled "unknown" (a no-op update);
    #  - keep one row per key, because a Delta MERGE fails when several source
    #    rows match and try to update the same target row.
    updates = (
        df_out
        .select(*keys, col("style").alias("new_style"))
        .where(col("new_style").isNotNull() & (col("new_style") != "unknown"))
        .dropDuplicates(keys)
    )

    # Equality on every key column, built from `keys` so the two cannot drift apart.
    merge_condition = " AND ".join(f"t.{k} = u.{k}" for k in keys)

    # MERGE INTO through the DeltaTable API.
    tgt = DeltaTable.forName(spark, "gold.spotify.spotify_musicas")

    (
        tgt.alias("t")
          .merge(updates.alias("u"), merge_condition)
          .whenMatchedUpdate(
              # Only rows still unclassified are overwritten.
              condition="t.style = 'unknown'",
              set={"style": "u.new_style"}
          )
          .execute()
    )

# Show the rows of the table that currently have a style assigned.
display(
    spark.table("gold.spotify.spotify_musicas")
         .where(col("style") != "unknown")
)
