<a href="https://colab.research.google.com/github/mateus-miguel/case-dadosfera/blob/main/Batch%20LLM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install boto3 (AWS SDK for Python); the next cell uses it to create the S3 client.
!pip install boto3

In [None]:
import os
import boto3
import json
from tqdm import tqdm

# AWS access keys (none are present in this file).
# NOTE(review): presumably credentials come from the environment / boto3 config — confirm.

# Bucket and object keys inside AWS S3
dados = []
bucket, file_json, schema_json = (
    'dadosfera-datalake',
    'silver/products_clean_10000rows.json',
    'metadata/schema.json',
)
s3 = boto3.client('s3')

# Load the attribute schema and show its first attribute as a sanity check
obj_schema = s3.get_object(Bucket=bucket, Key=schema_json)
schema = json.load(obj_schema["Body"])
print(schema['attributes'][0])

# Load the cleaned products
obj_product = s3.get_object(Bucket=bucket, Key=file_json)
products = json.load(obj_product["Body"])

# Show one sample product and the total count
total_items = len(products)
print(products[0])
print(f'Total de produtos: {total_items}')

# Split the products into consecutive batches of <batch_size> items each
batch_size = 10

# Ceiling division: a trailing partial batch must be counted (and processed),
# otherwise the last `total_items % batch_size` products would be silently dropped.
total_files = (total_items + batch_size - 1) // batch_size
print(f'Quantidade de batches: {total_files}')

# Stepping by batch_size yields every slice, including a shorter final one
batch_list = [
    products[start:start + batch_size]
    for start in range(0, total_items, batch_size)
]

# Optional local persistence of each batch (disabled):
# for i, batch_json in enumerate(batch_list):
#   with open(f'./batch/products_batch_{i:03d}.json', 'w', encoding='utf-8') as f:
#     json.dump(batch_json, f, ensure_ascii=False, separators=(",", ":"))

# Using separators=(",", ":") instead of indent=2 saves space in the JSON,
# shrinking it by ~30-40% per the original author's estimate (good for LLM prompts)

{'name': 'docid', 'type': 'int', 'description': 'document ID for product'}
{'docid': 1, 'title': 'FYY Leather Case with Mirror for Samsung Galaxy S8 Plus, Leather Wallet Flip Folio Case with Mirror and Wrist Strap for Samsung Galaxy S8 Plus Black', 'text': '  Premium PU Leather Top quality. Made with Premium PU Leather. Receiver design. Accurate cut-out for receiver. Convenient to Answer the phone without open the case. Hand strap makes it easy to carry around. RFID Technique RFID Technique: Radio Frequency Identification technology, through radio signals to identify specific targets and to read and copy electronic data. Most Credit Cards, Debit Cards, ID Cards are set-in the RFID chip, the RFID reader can easily read the cards information within 10 feet(about 3m) without touching them. This case is designed to protect your cards information from stealing with blocking material of RFID shielding technology. 100% Handmade 100% Handmade. Perfect craftmanship and reinforced stitching make

In [None]:
# --- LLM MODEL ACCESS ---
import os

from openai import OpenAI

# OpenAI API key: read it from the environment so the secret is never stored
# in the notebook (the name was previously used without ever being defined).
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
if not OPENAI_API_KEY:
  raise RuntimeError(
      'OPENAI_API_KEY is not set; export it in the environment before running this cell.'
  )

# Create the OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

In [None]:
def enrich_product(products, schema):
  """Extract the schema attributes for every product of one batch via the LLM.

  The schema and the batch are serialised as compact JSON into a single
  chat-completion request sent through the module-level ``client``; the reply
  is parsed as JSON and checked against the batch size.

  Args:
    products: JSON-serialisable list of products (one batch).
    schema: JSON-serialisable attribute schema, embedded verbatim in the prompt.

  Returns:
    The parsed JSON array returned by the model, one object per input product.

  Raises:
    ValueError: if the reply is empty or not valid JSON, is not a JSON array,
      or does not contain exactly ``len(products)`` items.
  """

  prompt=f"""
  SCHEMA:
  {json.dumps(schema, ensure_ascii=False, separators=(",", ":"))}

  LIST OF PRODUCTS:
  {json.dumps(products, ensure_ascii=False, separators=(",", ":"))}

  For each product in the list:
  - Return ONLY the attributes defined in the schema
  - Always preserve the same docid
  - Output a JSON array with EXACTLY {len(products)} objects
  """

  # One rule per line. Joining explicitly avoids the sentences running together
  # (adjacent string literals concatenate with no separator), and the escape
  # sequences are written with doubled backslashes so the model reads the text
  # "\n, \r, \t" rather than receiving real control characters.
  system_rules = "\n".join([
      "You are an information extraction system.",
      "Use only the schema attributes.",
      "Do not hallucinate.",
      "Infer the main category and country from the product title and description",
      "If an attribute cannot be inferred, return null (empty string, empty list). Don't use 'unknown', 'not specified' or ['none'] for a list",
      "Return valid JSON only. No 'json' before, starting at '{' or '['",
      "All string values must be single-line.",
      "NEVER include \\n, \\r, \\t characters.",
  ])

  response = client.chat.completions.create(
      model='gpt-4o-mini',
      temperature=0,  # most deterministic output for extraction
      messages=[
          {'role': 'system', 'content': system_rules},
          {'role': 'user', 'content': prompt}
      ]
  )

  # message.content may be None; treat that as an empty (invalid) reply
  content = (response.choices[0].message.content or '').strip()

  try:
    result = json.loads(content)
  except json.JSONDecodeError as e:
    raise ValueError(f"Invalid JSON returned by model: {e}\nContent:\n{content}") from e

  # Check the type before len(): a JSON number/null has no length and would
  # otherwise surface as a TypeError instead of the documented ValueError.
  if not isinstance(result, list):
    raise ValueError(
        f'Invalid response type returned: expected a JSON array, got {type(result).__name__}'
    )

  if len(result) != len(products):
    raise ValueError(
        f'Invalid batch size returned: expected {len(products)}, got {len(result)}'
    )

  return result

In [None]:
# --- TEST SNIPPET FOR A SINGLE BATCH ---

# To test from a batch file saved locally instead, load it first:
# arquivo = './batch/products_batch_000.json'
# with open(arquivo, 'r', encoding='utf-8') as f:
#   batch = json.load(f)

resultado = enrich_product(batch_list[0], schema)

# open() does not create missing directories, so make sure ./output exists
os.makedirs('./output', exist_ok=True)

with open('./output/products_output_000.json', 'w', encoding='utf-8') as f:
  json.dump(resultado, f, ensure_ascii=False, indent=2)

In [None]:
# --- Processing all batches ---
import os

# Indices of batches that raised, kept so they can be retried afterwards
failed_batches = []

# Indexed loop over the batches split from the original file; try/except makes
# a failing batch get reported and skipped so the remaining ones still run.
for i, batch in enumerate(tqdm(batch_list)):
  try:
    enriched_products = enrich_product(batch, schema)

    # Persist the batch result to S3
    path_s3 = f'gold/products_batch_{batch_size}rows_{i:03d}.json'

    s3.put_object(
        Bucket=bucket,
        Key=path_s3,
        Body=json.dumps(enriched_products, ensure_ascii=False).encode("utf-8")
    )

    # Optional local persistence (disabled):
    # path = f'./output/products_output_{i:03d}.json'
    # with open(path, 'w', encoding='utf-8') as f:
    #   json.dump(enriched_products, f, ensure_ascii=False, indent=2)
    #   f.flush()                 # Python-level flush
    #   os.fsync(f.fileno())      # OS-level flush

  except Exception as e:
    # Deliberate best-effort boundary: record the failure and keep going
    failed_batches.append(i)
    print(f'Erro no batch {i:03d}: {e}')

# Summarise failures so they are not lost among the progress output
if failed_batches:
  print(f'Batches com erro ({len(failed_batches)}): {failed_batches}')

In [None]:
# Delete everything under the local batch/ directory (the only writer of ./batch/ in this file is the commented-out json.dump in the batching cell).
!rm -rf batch/*