In [1]:
import os
import glob
import polars as pl

# Defina o caminho para a sua pasta "Dados"
# Ajuste este caminho se a pasta "Dados" não estiver no mesmo diretório do notebook
DATA_DIR = "./Dados/" # Certifique-se que esta pasta contém seus arquivos TSV grandes

# Verificar se o diretório existe
if not os.path.exists(DATA_DIR):
    print(f"Diretório '{DATA_DIR}' não encontrado. Por favor, verifique o caminho.")
else:
    print(f"Diretório de dados configurado: {os.path.abspath(DATA_DIR)}")


Diretório de dados configurado: c:\projects\VDBMS\Dados


In [2]:
print(f"Procurando por arquivos .tsv em: {os.path.abspath(DATA_DIR)}")

# Listar apenas arquivos TSV
tsv_files = glob.glob(os.path.join(DATA_DIR, "*.tsv"))

if not tsv_files:
    print("Nenhum arquivo .tsv encontrado na pasta 'Dados'.")
    print("Por favor, adicione seus arquivos TSV grandes à pasta.")
else:
    print(f"\nArquivos TSV encontrados ({len(tsv_files)}):")
    for i, f_path in enumerate(tsv_files):
        file_name = os.path.basename(f_path)
        try:
            file_size_bytes = os.path.getsize(f_path)
            file_size_gb = file_size_bytes / (1024**3)
            print(f"  {i+1}. {file_name} ({file_size_gb:.2f} GB)")
        except OSError:
            print(f"  {i+1}. {file_name} (tamanho não pôde ser lido)")


Procurando por arquivos .tsv em: c:\projects\VDBMS\Dados

Arquivos TSV encontrados (21):
  1. amazon_reviews_multilingual_US_v1_00.tsv (3.38 GB)
  2. amazon_reviews_us_Apparel_v1_00.tsv (1.84 GB)
  3. amazon_reviews_us_Automotive_v1_00.tsv (1.26 GB)
  4. amazon_reviews_us_Beauty_v1_00.tsv (2.00 GB)
  5. amazon_reviews_us_Books_v1_02.tsv (3.02 GB)
  6. amazon_reviews_us_Camera_v1_00.tsv (1.02 GB)
  7. amazon_reviews_us_Digital_Ebook_Purchase_v1_01.tsv (3.00 GB)
  8. amazon_reviews_us_Digital_Video_Download_v1_00.tsv (1.20 GB)
  9. amazon_reviews_us_Electronics_v1_00.tsv (1.61 GB)
  10. amazon_reviews_us_Health_Personal_Care_v1_00.tsv (2.26 GB)
  11. amazon_reviews_us_Mobile_Apps_v1_00.tsv (1.29 GB)
  12. amazon_reviews_us_Music_v1_00.tsv (3.42 GB)
  13. amazon_reviews_us_Office_Products_v1_00.tsv (1.16 GB)
  14. amazon_reviews_us_PC_v1_00.tsv (3.40 GB)
  15. amazon_reviews_us_Pet_Products_v1_00.tsv (1.14 GB)
  16. amazon_reviews_us_Shoes_v1_00.tsv (1.46 GB)
  17. amazon_reviews_us_Sport

In [5]:
# Escolha um arquivo da lista acima pelo seu número (ex: 1 para o primeiro arquivo)
# ou defina o nome do arquivo diretamente.
# file_to_explore_name = "seu_arquivo_grande.tsv" # Ou use o índice

FILE_INDEX = 0

if not tsv_files:
    print("Nenhum arquivo TSV para explorar. Execute a célula anterior.")
else:
    # Vamos pegar o primeiro arquivo como exemplo, você pode mudar isso
    selected_file_index = FILE_INDEX # Mude para o índice do arquivo que quer explorar
    
    if 0 <= selected_file_index < len(tsv_files):
        file_path_to_explore = tsv_files[selected_file_index]
        file_name_to_explore = os.path.basename(file_path_to_explore)
        print(f"\n--- Explorando o arquivo: {file_name_to_explore} ---")

        # Quantas linhas ler para a amostragem inicial e inferência de esquema.
        # Para arquivos de 1GB+, 1000-10000 linhas devem ser suficientes para uma boa inferência inicial.
        # Ajuste conforme necessário.
        N_ROWS_SAMPLE = 1000
        
        print(f"Tentando ler as primeiras {N_ROWS_SAMPLE} linhas para inferir o esquema...")

        try:
            # Use read_csv com n_rows para ler apenas uma parte do arquivo.
            # separator='\t' é crucial para arquivos TSV.
            # infer_schema_length: Polars usa este número de linhas para inferir os tipos.
            # Se os tipos não parecerem corretos, você pode aumentar este valor ou
            # especificar os tipos manualmente com o parâmetro `dtypes`.
            df_sample = pl.read_csv(
                file_path_to_explore,
                separator='\t',
                quote_char=None,
                n_rows=N_ROWS_SAMPLE,
                infer_schema_length=N_ROWS_SAMPLE, # Usar as mesmas N linhas para inferir
                ignore_errors=True # Tenta pular linhas com erros de parsing na amostra
            )

            print("\n1. Nomes das Colunas:")
            print(df_sample.columns)

            print("\n2. Tipos de Dados Inferidos (Esquema):")
            # O esquema mostra o nome da coluna e o tipo de dado inferido pelo Polars
            for col_name, dtype in df_sample.schema.items():
                print(f"   - Coluna: '{col_name}', Tipo Inferido: {dtype}")

            print(f"\n3. Primeiras 5 linhas da amostra de {N_ROWS_SAMPLE} linhas:")
            print(df_sample.head(5))
            
            print(f"\n4. Formato da amostra lida: {df_sample.shape} (linhas, colunas)")

            print("\nObservações:")
            print("- Se os tipos de dados (Dtypes) não parecerem corretos, você pode precisar:")
            print(f"  a) Aumentar `N_ROWS_SAMPLE` e `infer_schema_length`.")
            print(f"  b) Especificar os tipos manualmente no `pl.read_csv` usando o parâmetro `dtypes`.")
            print(f"     Ex: dtypes = {{'coluna_A': pl.Utf8, 'coluna_B': pl.Int64, 'coluna_data': pl.Date}}")
            print("- `ignore_errors=True` foi usado para a amostra. Para processamento completo, decida como tratar erros.")

        except Exception as e:
            print(f"Erro ao tentar ler uma amostra do arquivo {file_name_to_explore}: {e}")
            print("Verifique se o arquivo é um TSV válido e se o separador está correto.")
    else:
        print(f"Índice de arquivo inválido. Por favor, escolha um número entre 0 e {len(tsv_files)-1}.")



--- Explorando o arquivo: amazon_reviews_multilingual_US_v1_00.tsv ---
Tentando ler as primeiras 1000 linhas para inferir o esquema...

1. Nomes das Colunas:
['marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent', 'product_title', 'product_category', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase', 'review_headline', 'review_body', 'review_date']

2. Tipos de Dados Inferidos (Esquema):
   - Coluna: 'marketplace', Tipo Inferido: String
   - Coluna: 'customer_id', Tipo Inferido: Int64
   - Coluna: 'review_id', Tipo Inferido: String
   - Coluna: 'product_id', Tipo Inferido: String
   - Coluna: 'product_parent', Tipo Inferido: Int64
   - Coluna: 'product_title', Tipo Inferido: String
   - Coluna: 'product_category', Tipo Inferido: String
   - Coluna: 'star_rating', Tipo Inferido: Int64
   - Coluna: 'helpful_votes', Tipo Inferido: Int64
   - Coluna: 'total_votes', Tipo Inferido: Int64
   - Coluna: 'vine', Tipo Inferido: String
   - Coluna: 've

In [7]:
# Esta célula é opcional e demonstra como usar scan_csv para operações em arquivos grandes.
# Requer que a Célula 3 tenha sido executada e `file_path_to_explore` esteja definido.

if 'file_path_to_explore' in locals() and file_path_to_explore:
    file_name_to_scan = os.path.basename(file_path_to_explore)
    print(f"\n--- Usando scan_csv para análise lazy de: {file_name_to_scan} ---")
    
    try:
        # scan_csv não carrega os dados imediatamente, ele cria um "plano" (LazyFrame).
        # Os dados são processados apenas quando você chama .collect() ou .fetch().
        lazy_df = pl.scan_csv(
            file_path_to_explore,
            separator='\t',
            quote_char=None,
            infer_schema_length=10000, # Pode usar um valor maior para inferência em scans
            ignore_errors=False # Para scans completos, é melhor estar ciente dos erros
        )

        print("\n1. Esquema inferido pelo scan_csv (sem carregar dados):")
        # O esquema é inferido com base nas primeiras `infer_schema_length` linhas.
        for col_name, dtype in lazy_df.schema.items():
            print(f"   - Coluna: '{col_name}', Tipo Inferido: {dtype}")

        # Exemplo: Contar valores nulos por coluna no arquivo inteiro
        # Esta operação será executada de forma eficiente em termos de memória.
        print("\n2. Contando valores nulos por coluna (executando no arquivo inteiro):")
        # null_counts = lazy_df.select(
        #     [pl.col(c).is_null().sum().alias(f"{c}_null_count") for c in lazy_df.columns]
        # ).collect()
        # print(null_counts)
        
        # Uma forma mais simples de obter contagens de nulos:
        print(lazy_df.select(pl.all().null_count()).collect())


        # Exemplo: Obter contagens de valores para uma coluna específica (se for categórica)
        # Substitua 'nome_da_sua_coluna_categorica' pelo nome real de uma coluna
        # target_column_for_value_counts = 'nome_da_sua_coluna_categorica'
        # if target_column_for_value_counts in lazy_df.columns:
        #     print(f"\n3. Contagem de valores para a coluna '{target_column_for_value_counts}' (top 10):")
        #     value_c = lazy_df.group_by(target_column_for_value_counts).agg(
        #         pl.count().alias("counts")
        #     ).sort("counts", descending=True).limit(10).collect()
        #     print(value_c)
        # else:
        #     print(f"\nColuna '{target_column_for_value_counts}' não encontrada para contagem de valores.")
            
        print("\nLembre-se: operações com `lazy_df` só são executadas ao chamar `.collect()` ou `.fetch(N)`.")

    except Exception as e:
        print(f"Erro ao tentar escanear o arquivo {file_name_to_scan}: {e}")
else:
    print("Execute a Célula 3 primeiro para selecionar um arquivo para exploração.")




--- Usando scan_csv para análise lazy de: amazon_reviews_multilingual_US_v1_00.tsv ---

1. Esquema inferido pelo scan_csv (sem carregar dados):
   - Coluna: 'marketplace', Tipo Inferido: String
   - Coluna: 'customer_id', Tipo Inferido: Int64
   - Coluna: 'review_id', Tipo Inferido: String
   - Coluna: 'product_id', Tipo Inferido: String
   - Coluna: 'product_parent', Tipo Inferido: Int64
   - Coluna: 'product_title', Tipo Inferido: String
   - Coluna: 'product_category', Tipo Inferido: String
   - Coluna: 'star_rating', Tipo Inferido: Int64
   - Coluna: 'helpful_votes', Tipo Inferido: Int64
   - Coluna: 'total_votes', Tipo Inferido: Int64
   - Coluna: 'vine', Tipo Inferido: String
   - Coluna: 'verified_purchase', Tipo Inferido: String
   - Coluna: 'review_headline', Tipo Inferido: String
   - Coluna: 'review_body', Tipo Inferido: String
   - Coluna: 'review_date', Tipo Inferido: String

2. Contando valores nulos por coluna (executando no arquivo inteiro):


  for col_name, dtype in lazy_df.schema.items():


shape: (1, 15)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ marketpla ┆ customer_ ┆ review_id ┆ product_i ┆ … ┆ verified_ ┆ review_he ┆ review_bo ┆ review_d │
│ ce        ┆ id        ┆ ---       ┆ d         ┆   ┆ purchase  ┆ adline    ┆ dy        ┆ ate      │
│ ---       ┆ ---       ┆ u32       ┆ ---       ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---      │
│ u32       ┆ u32       ┆           ┆ u32       ┆   ┆ u32       ┆ u32       ┆ u32       ┆ u32      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 0         ┆ 0         ┆ 0         ┆ 0         ┆ … ┆ 0         ┆ 11        ┆ 71        ┆ 0        │
└───────────┴───────────┴───────────┴───────────┴───┴───────────┴───────────┴───────────┴──────────┘

Lembre-se: operações com `lazy_df` só são executadas ao chamar `.collect()` ou `.fetch(N)`.


In [2]:
import polars as pl
from pymilvus import connections, utility, Collection, DataType, FieldSchema, CollectionSchema
from sentence_transformers import SentenceTransformer
import os

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
DATA_DIR = "./Dados/"

# --- Configurações ---
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
COLLECTION_NAME = "reviews_data_collection"
ID_FIELD_NAME = "pk_id"
REVIEW_HEADLINE_FIELD = "review_headline"
REVIEW_BODY_FIELD = "review_body"
EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'
EMBEDDING_DIMENSION = 384
BATCH_SIZE = 128
MAX_LEN_REVIEW_TEXT = 65530 # Um pouco menos que 65535 para segurança
MAX_LEN_OTHER_VARCHAR = 1000 # Ajuste conforme necessário para suas outras colunas
TSV_FILEPATH = DATA_DIR + "amazon_reviews_multilingual_US_v1_00.tsv" # !!! SUBSTITUA PELO CAMINHO DO SEU ARQUIVO TSV !!!

def connect_to_milvus():
    """Conecta-se à instância Milvus."""
    print(f"Conectando ao Milvus em {MILVUS_HOST}:{MILVUS_PORT}...")
    try:
        connections.connect(alias="default", host=MILVUS_HOST, port=MILVUS_PORT)
        print("Conectado ao Milvus com sucesso!")
    except Exception as e:
        print(f"Falha ao conectar ao Milvus: {e}")
        raise

def create_collection_if_not_exists(all_tsv_columns):
    """
    Cria a coleção no Milvus se ela não existir.
    O esquema incluirá um campo ID, os campos de texto originais,
    seus campos de embedding correspondentes, e todos os outros campos do DataFrame.
    """
    if utility.has_collection(COLLECTION_NAME):
        print(f"Coleção '{COLLECTION_NAME}' já existe.")
        return Collection(COLLECTION_NAME)

    print(f"Criando coleção '{COLLECTION_NAME}'...")
    
    fields = [
        FieldSchema(name=ID_FIELD_NAME, dtype=DataType.INT64, is_primary=True, auto_id=True, description="Primary key"),
        FieldSchema(name=REVIEW_HEADLINE_FIELD, dtype=DataType.VARCHAR, max_length=MAX_LEN_REVIEW_TEXT, description="Original review headline text"),
        FieldSchema(name=f"{REVIEW_HEADLINE_FIELD}_embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIMENSION, description="Embedding of review headline"),
        FieldSchema(name=REVIEW_BODY_FIELD, dtype=DataType.VARCHAR, max_length=MAX_LEN_REVIEW_TEXT, description="Original review body text"),
        FieldSchema(name=f"{REVIEW_BODY_FIELD}_embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIMENSION, description="Embedding of review body")
    ]

    defined_field_names = {field.name for field in fields}
    for col_name in all_tsv_columns:
        if col_name not in defined_field_names:
            fields.append(FieldSchema(name=col_name, dtype=DataType.VARCHAR, max_length=MAX_LEN_OTHER_VARCHAR, description=f"Other data column: {col_name}"))

    schema = CollectionSchema(fields=fields, description="Collection for reviews data with embeddings for headline and body")
    collection = Collection(COLLECTION_NAME, schema=schema)
    print(f"Coleção '{COLLECTION_NAME}' criada com sucesso com o seguinte esquema:")
    for field in collection.schema.fields:
        print(f"  - {field.name}: {field.dtype}, Primary: {field.is_primary}, MaxLen: {getattr(field, 'params', {}).get('max_length', 'N/A')}")
    
    print("Criando índices para os campos de embedding...")
    index_params = {
        "metric_type": "L2", # Ou "IP" para produto interno
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128},
    }
    collection.create_index(field_name=f"{REVIEW_HEADLINE_FIELD}_embedding", index_params=index_params)
    collection.create_index(field_name=f"{REVIEW_BODY_FIELD}_embedding", index_params=index_params)
    print("Índices criados.")
    
    return collection

def process_and_insert_data(collection, filepath, embedding_model):
    """Lê dados do arquivo, gera embeddings e insere no Milvus."""
    try:
        df = pl.read_csv(filepath, separator='\t', quote_char=None)
    except FileNotFoundError:
        print(f"Erro: Arquivo '{filepath}' não encontrado.")
        return False
    except Exception as e:
        print(f"Erro ao ler o arquivo TSV '{filepath}': {e}")
        return False

    print(f"Arquivo '{filepath}' carregado. Total de {df.height} linhas.")
    
    if REVIEW_HEADLINE_FIELD not in df.columns or REVIEW_BODY_FIELD not in df.columns:
        print(f"Erro: As colunas '{REVIEW_HEADLINE_FIELD}' ou '{REVIEW_BODY_FIELD}' (ou ambas) não foram encontradas no TSV.")
        print(f"Colunas disponíveis: {df.columns}")
        return False

    # Lidar com NaNs nas colunas de texto antes do embedding e garantir que são strings
    df = df.with_columns([
        pl.col(REVIEW_HEADLINE_FIELD).fill_null("").cast(pl.String),
        pl.col(REVIEW_BODY_FIELD).fill_null("").cast(pl.String)
    ])
    
    print(f"Processando e inserindo dados em lotes de {BATCH_SIZE}...")
    schema_field_names_for_insert = [field.name for field in collection.schema.fields if not field.is_primary]
    num_batches = (df.height - 1) // BATCH_SIZE + 1

    for i in range(num_batches):
        batch_df = df.slice(i * BATCH_SIZE, BATCH_SIZE)
        if batch_df.is_empty():
            continue

        print(f"Processando lote {i+1}/{num_batches} ({batch_df.height} linhas)...")

        headline_texts = batch_df[REVIEW_HEADLINE_FIELD].to_list()
        body_texts = batch_df[REVIEW_BODY_FIELD].to_list()

        headline_embeddings = embedding_model.encode(headline_texts, show_progress_bar=False)
        body_embeddings = embedding_model.encode(body_texts, show_progress_bar=False)

        data_to_insert = []
        for row_dict in batch_df.to_dicts(): # Iterar sobre as linhas do batch_df
            current_row_values = []
            
            for field_name in schema_field_names_for_insert:
                if field_name == f"{REVIEW_HEADLINE_FIELD}_embedding":
                    current_row_values.append(headline_embeddings[data_to_insert.__len__() % BATCH_SIZE]) # Usar índice relativo ao lote
                elif field_name == f"{REVIEW_BODY_FIELD}_embedding":
                    current_row_values.append(body_embeddings[data_to_insert.__len__() % BATCH_SIZE]) # Usar índice relativo ao lote
                elif field_name == REVIEW_HEADLINE_FIELD:
                    text_val = str(row_dict.get(field_name, ""))
                    current_row_values.append(text_val[:MAX_LEN_REVIEW_TEXT])
                elif field_name == REVIEW_BODY_FIELD:
                    text_val = str(row_dict.get(field_name, ""))
                    current_row_values.append(text_val[:MAX_LEN_REVIEW_TEXT])
                elif field_name in row_dict: # Outras colunas do TSV
                    value = row_dict[field_name]
                    # Polars usa None para missing, converter para string
                    str_value = str(value) if value is not None else ""
                    current_row_values.append(str_value[:MAX_LEN_OTHER_VARCHAR])
                else:
                    print(f"Aviso: Campo '{field_name}' no esquema mas não nos dados da linha. Usando string vazia.")
                    current_row_values.append("")
            
            data_to_insert.append(current_row_values)

        try:
            if data_to_insert: # Certifique-se de que há dados para inserir
                # Transpor os dados do formato de lista de linhas para lista de colunas
                # Milvus espera uma lista de colunas, onde cada coluna é uma lista de valores para esse campo.
                # data_to_insert é List[List[Any]] (BATCH_SIZE linhas, 17 colunas)
                # columnar_data será List[List[Any]] (17 colunas, BATCH_SIZE linhas)
                columnar_data = [list(col) for col in zip(*data_to_insert)]
                collection.insert(columnar_data)
            else:
                print(f"Lote {i+1} não continha dados para inserir após o processamento.")
        except Exception as e:
            print(f"Erro ao inserir o lote {i+1}: {e}")
             # Para depuração, você pode querer inspecionar as dimensões dos dados:
            if 'columnar_data' in locals() and columnar_data:
                print(f"Debug: columnar_data tem {len(columnar_data)} colunas. A primeira coluna tem {len(columnar_data[0]) if columnar_data else 0} linhas.")
            elif data_to_insert:
                print(f"Debug: data_to_insert (antes da transposição) tem {len(data_to_insert)} linhas. A primeira linha tem {len(data_to_insert[0]) if data_to_insert else 0} campos.")
            return False # Retornar False em caso de falha na inserção

    collection.flush()
    print(f"Todos os dados processados. Total de entidades na coleção '{COLLECTION_NAME}': {collection.num_entities}")
    return True

def main():
    print(f"Carregando modelo de embedding '{EMBEDDING_MODEL_NAME}'...")
    embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME)
    print("Modelo de embedding carregado.")

    try:
        connect_to_milvus()

        try:
            temp_df = pl.read_csv(TSV_FILEPATH, separator='\t', n_rows=1, quote_char=None) 
            all_tsv_columns = temp_df.columns
        except FileNotFoundError:
            print(f"Erro: Arquivo TSV '{TSV_FILEPATH}' não encontrado. Verifique o caminho.")
            return
        except Exception as e:
            print(f"Não foi possível ler as colunas do arquivo TSV '{TSV_FILEPATH}': {e}")
            return

        collection = create_collection_if_not_exists(all_tsv_columns)
        
        print(f"Carregando coleção '{COLLECTION_NAME}' na memória...")
        collection.load()
        print("Coleção carregada.")
        
        success = process_and_insert_data(collection, TSV_FILEPATH, embedding_model)

        if success:
            print(f"Dados inseridos com sucesso. Deletando arquivo original: {TSV_FILEPATH}")
            try:
                os.remove(TSV_FILEPATH)
                print(f"Arquivo '{TSV_FILEPATH}' deletado com sucesso.")
            except OSError as e:
                print(f"Erro ao deletar o arquivo '{TSV_FILEPATH}': {e}")
        else:
            print(f"Processo de inserção de dados do arquivo '{TSV_FILEPATH}' falhou. O arquivo não será deletado.")

    except Exception as e:
        print(f"Ocorreu um erro geral no processo: {e}")
    finally:
        # connections.disconnect("default") # Descomente se desejar desconectar explicitamente
        print("Processo finalizado.")

if __name__ == "__main__":
    main()


Carregando modelo de embedding 'all-MiniLM-L6-v2'...
Modelo de embedding carregado.
Conectando ao Milvus em localhost:19530...
Conectado ao Milvus com sucesso!
Coleção 'reviews_data_collection' já existe.
Carregando coleção 'reviews_data_collection' na memória...
Coleção carregada.
Arquivo './Dados/amazon_reviews_multilingual_US_v1_00.tsv' carregado. Total de 6931166 linhas.
Processando e inserindo dados em lotes de 128...
Processando lote 1/54150 (128 linhas)...
Processando lote 2/54150 (128 linhas)...
Processando lote 3/54150 (128 linhas)...
Processando lote 4/54150 (128 linhas)...
Processando lote 5/54150 (128 linhas)...
Processando lote 6/54150 (128 linhas)...
Processando lote 7/54150 (128 linhas)...
Processando lote 8/54150 (128 linhas)...
Processando lote 9/54150 (128 linhas)...
Processando lote 10/54150 (128 linhas)...
Processando lote 11/54150 (128 linhas)...
Processando lote 12/54150 (128 linhas)...
Processando lote 13/54150 (128 linhas)...
Processando lote 14/54150 (128 linha

KeyboardInterrupt: 

In [None]:
import polars as pl
from pymilvus import connections, utility, Collection, DataType, FieldSchema, CollectionSchema
from sentence_transformers import SentenceTransformer
import os

DATA_DIR = "./Dados/"

# --- Configurações ---
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
COLLECTION_NAME = "reviews_data_collection"
ID_FIELD_NAME = "pk_id" # Este será nosso ID primário gerenciado manualmente
REVIEW_HEADLINE_FIELD = "review_headline"
REVIEW_BODY_FIELD = "review_body"
EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'
EMBEDDING_DIMENSION = 384
BATCH_SIZE = 128
MAX_LEN_REVIEW_TEXT = 65530
MAX_LEN_OTHER_VARCHAR = 1000
TSV_FILEPATH = DATA_DIR + "amazon_reviews_multilingual_US_v1_00.tsv"
PROGRESS_FILE = "milvus_ingestion_progress.txt"

def connect_to_milvus():
    """Conecta-se à instância Milvus."""
    print(f"Conectando ao Milvus em {MILVUS_HOST}:{MILVUS_PORT}...")
    try:
        connections.connect(alias="default", host=MILVUS_HOST, port=MILVUS_PORT)
        print("Conectado ao Milvus com sucesso!")
    except Exception as e:
        print(f"Falha ao conectar ao Milvus: {e}")
        raise

def create_collection_if_not_exists(all_tsv_columns):
    """
    Cria a coleção no Milvus se ela não existir.
    O ID primário será gerenciado manualmente.
    """
    if utility.has_collection(COLLECTION_NAME):
        print(f"Coleção '{COLLECTION_NAME}' já existe.")
        collection = Collection(COLLECTION_NAME)
        # Verificar se o esquema existente corresponde ao esperado (especialmente auto_id para o PK)
        pk_field_schema = next((f for f in collection.schema.fields if f.name == ID_FIELD_NAME), None)
        if pk_field_schema and pk_field_schema.auto_id:
            print(f"AVISO: A coleção '{COLLECTION_NAME}' existente tem auto_id=True para o campo '{ID_FIELD_NAME}'.")
            print("Para usar IDs gerenciados manualmente, a coleção pode precisar ser recriada com auto_id=False.")
            # Poderia adicionar lógica para dropar e recriar, mas é mais seguro alertar.
        return collection

    print(f"Criando coleção '{COLLECTION_NAME}'...")
    
    fields = [
        FieldSchema(name=ID_FIELD_NAME, dtype=DataType.INT64, is_primary=True, auto_id=False, description="Primary key (managed manually)"),
        FieldSchema(name=REVIEW_HEADLINE_FIELD, dtype=DataType.VARCHAR, max_length=MAX_LEN_REVIEW_TEXT, description="Original review headline text"),
        FieldSchema(name=f"{REVIEW_HEADLINE_FIELD}_embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIMENSION, description="Embedding of review headline"),
        FieldSchema(name=REVIEW_BODY_FIELD, dtype=DataType.VARCHAR, max_length=MAX_LEN_REVIEW_TEXT, description="Original review body text"),
        FieldSchema(name=f"{REVIEW_BODY_FIELD}_embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIMENSION, description="Embedding of review body")
    ]

    defined_field_names = {field.name for field in fields}
    for col_name in all_tsv_columns:
        if col_name not in defined_field_names:
            fields.append(FieldSchema(name=col_name, dtype=DataType.VARCHAR, max_length=MAX_LEN_OTHER_VARCHAR, description=f"Other data column: {col_name}"))

    schema = CollectionSchema(fields=fields, description="Collection for reviews data with manually managed PK and embeddings")
    collection = Collection(COLLECTION_NAME, schema=schema)
    print(f"Coleção '{COLLECTION_NAME}' criada com sucesso com o seguinte esquema:")
    for field in collection.schema.fields:
        print(f"  - {field.name}: {field.dtype}, Primary: {field.is_primary}, AutoID: {field.auto_id if hasattr(field, 'auto_id') else 'N/A'}, MaxLen: {getattr(field, 'params', {}).get('max_length', 'N/A')}")
    
    print("Criando índices para os campos de embedding...")
    index_params = {
        "metric_type": "L2",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128},
    }
    collection.create_index(field_name=f"{REVIEW_HEADLINE_FIELD}_embedding", index_params=index_params)
    collection.create_index(field_name=f"{REVIEW_BODY_FIELD}_embedding", index_params=index_params)
    print("Índices criados.")
    
    return collection

def process_and_insert_data(collection, filepath, embedding_model):
    """Lê dados, gera IDs e embeddings, e insere no Milvus."""
    try:
        df_original = pl.read_csv(filepath, separator='\t', quote_char=None)
    except FileNotFoundError:
        print(f"Erro: Arquivo '{filepath}' não encontrado.")
        return False
    except Exception as e:
        print(f"Erro ao ler o arquivo TSV '{filepath}': {e}")
        return False
    
    print(f"Arquivo '{filepath}' carregado. Total de {df_original.height} linhas no arquivo original.")

    start_row_offset = 0
    try:
        with open(PROGRESS_FILE, "r") as f:
            content = f.read().strip()
            if content:
                start_row_offset = int(content)
                print(f"Arquivo de progresso encontrado. Retomando a partir da linha offset: {start_row_offset}")
    except FileNotFoundError:
        print("Nenhum arquivo de progresso encontrado. Iniciando do começo.")
    except ValueError:
        print(f"Arquivo de progresso '{PROGRESS_FILE}' corrompido. Iniciando do começo.")
        start_row_offset = 0

    if start_row_offset >= df_original.height:
        print("Todos os dados já foram processados anteriormente conforme arquivo de progresso.")
        if os.path.exists(PROGRESS_FILE): os.remove(PROGRESS_FILE)
        return True
        
    df_to_process = df_original.slice(start_row_offset, None)
    
    if REVIEW_HEADLINE_FIELD not in df_to_process.columns or REVIEW_BODY_FIELD not in df_to_process.columns:
        print(f"Erro: As colunas '{REVIEW_HEADLINE_FIELD}' ou '{REVIEW_BODY_FIELD}' não foram encontradas.")
        print(f"Colunas disponíveis: {df_to_process.columns}")
        return False

    df_to_process = df_to_process.with_columns([
        pl.col(REVIEW_HEADLINE_FIELD).fill_null("").cast(pl.String),
        pl.col(REVIEW_BODY_FIELD).fill_null("").cast(pl.String)
    ])
    
    if df_to_process.is_empty():
        print("Nenhum dado restante para processar (após considerar o progresso).")
        if start_row_offset >= df_original.height and os.path.exists(PROGRESS_FILE):
             os.remove(PROGRESS_FILE)
        return True
        
    print(f"Processando {df_to_process.height} linhas restantes em lotes de {BATCH_SIZE}...")
    # Campos para inserir, EXCLUINDO o primário (que será adicionado primeiro)
    schema_field_names_for_insert_non_pk = [field.name for field in collection.schema.fields if not field.is_primary]
    num_batches = (df_to_process.height - 1) // BATCH_SIZE + 1

    for i in range(num_batches):
        batch_df = df_to_process.slice(i * BATCH_SIZE, BATCH_SIZE)
        if batch_df.is_empty():
            continue

        current_batch_global_start_line = start_row_offset + (i * BATCH_SIZE)
        print(f"Processando lote {i+1}/{num_batches} (globalmente a partir da linha {current_batch_global_start_line}, {batch_df.height} linhas neste lote)...")
        
        headline_texts = batch_df[REVIEW_HEADLINE_FIELD].to_list()
        body_texts = batch_df[REVIEW_BODY_FIELD].to_list()

        headline_embeddings = embedding_model.encode(headline_texts, show_progress_bar=False)
        body_embeddings = embedding_model.encode(body_texts, show_progress_bar=False)
        
        ids_for_batch = []
        data_for_non_pk_fields_rows = [] # Lista de linhas, cada linha é uma lista de valores para campos não-PK
        
        for idx_in_batch, row_dict in enumerate(batch_df.to_dicts()):
            # Gerar ID primário: offset global da linha no arquivo original
            pk_value = current_batch_global_start_line + idx_in_batch
            ids_for_batch.append(pk_value)

            current_row_non_pk_values = []
            for field_name in schema_field_names_for_insert_non_pk:
                if field_name == f"{REVIEW_HEADLINE_FIELD}_embedding":
                    current_row_non_pk_values.append(headline_embeddings[idx_in_batch])
                elif field_name == f"{REVIEW_BODY_FIELD}_embedding":
                    current_row_non_pk_values.append(body_embeddings[idx_in_batch])
                elif field_name == REVIEW_HEADLINE_FIELD:
                    text_val = str(row_dict.get(field_name, ""))
                    current_row_non_pk_values.append(text_val[:MAX_LEN_REVIEW_TEXT])
                elif field_name == REVIEW_BODY_FIELD:
                    text_val = str(row_dict.get(field_name, ""))
                    current_row_non_pk_values.append(text_val[:MAX_LEN_REVIEW_TEXT])
                elif field_name in row_dict:
                    value = row_dict[field_name]
                    str_value = str(value) if value is not None else ""
                    current_row_non_pk_values.append(str_value[:MAX_LEN_OTHER_VARCHAR])
                else:
                    # print(f"Aviso: Campo '{field_name}' no esquema mas não nos dados da linha. Usando string vazia.")
                    current_row_non_pk_values.append("")
            data_for_non_pk_fields_rows.append(current_row_non_pk_values)

        try:
            if ids_for_batch: # Se há IDs, deve haver dados
                # Transpor os dados dos campos não primários para formato colunar
                columnar_data_non_pk = [list(col) for col in zip(*data_for_non_pk_fields_rows)]
                
                # Montar os dados finais para inserção: [pk_column_data, non_pk_col1_data, ...]
                # A ordem deve corresponder ao esquema da coleção.
                final_columnar_data_to_insert = [ids_for_batch] + columnar_data_non_pk
                
                collection.insert(final_columnar_data_to_insert)
                collection.flush()

                processed_rows_in_this_batch_run = (i * BATCH_SIZE) + batch_df.height
                next_global_start_row = start_row_offset + processed_rows_in_this_batch_run
                try:
                    with open(PROGRESS_FILE, "w") as f:
                        f.write(str(next_global_start_row))
                except IOError as e_progress:
                    print(f"Aviso: Não foi possível salvar o progresso após o lote {i+1}: {e_progress}")
            else:
                print(f"Lote {i+1} não continha dados para inserir após o processamento.")
        except Exception as e:
            print(f"Erro ao inserir o lote {i+1} (índice de lote na execução atual): {e}")
            # Se a exceção for de ID duplicado (ex: pymilvus.exceptions.MilvusException e "primary key ... already exist" na msg)
            # o script irá parar aqui, e o PROGRESS_FILE não será atualizado.
            # Isso significa que na próxima execução, ele tentará este mesmo lote novamente.
            return False

    print(f"Todos os dados processados. Total de entidades na coleção '{COLLECTION_NAME}': {collection.num_entities}")
    if os.path.exists(PROGRESS_FILE):
        try:
            os.remove(PROGRESS_FILE)
            print(f"Processamento concluído. Arquivo de progresso '{PROGRESS_FILE}' removido.")
        except OSError as e_rm_progress:
            print(f"Aviso: Não foi possível remover o arquivo de progresso '{PROGRESS_FILE}': {e_rm_progress}")
    return True

def main():
    print(f"Carregando modelo de embedding '{EMBEDDING_MODEL_NAME}'...")
    embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME)
    print("Modelo de embedding carregado.")

    try:
        connect_to_milvus()

        try:
            temp_df = pl.read_csv(TSV_FILEPATH, separator='\t', n_rows=1, quote_char=None) 
            all_tsv_columns = temp_df.columns
        except FileNotFoundError:
            print(f"Erro: Arquivo TSV '{TSV_FILEPATH}' não encontrado. Verifique o caminho.")
            return
        except Exception as e:
            print(f"Não foi possível ler as colunas do arquivo TSV '{TSV_FILEPATH}': {e}")
            return

        collection = create_collection_if_not_exists(all_tsv_columns)
        
        print(f"Carregando coleção '{COLLECTION_NAME}' na memória...")
        collection.load()
        print("Coleção carregada.")
        
        success = process_and_insert_data(collection, TSV_FILEPATH, embedding_model)

        if success:
            print(f"Dados inseridos com sucesso. Deletando arquivo original: {TSV_FILEPATH}")
            try:
                os.remove(TSV_FILEPATH)
                print(f"Arquivo '{TSV_FILEPATH}' deletado com sucesso.")
            except OSError as e:
                print(f"Erro ao deletar o arquivo '{TSV_FILEPATH}': {e}")
        else:
            print(f"Processo de inserção de dados do arquivo '{TSV_FILEPATH}' falhou. O arquivo não será deletado.")

    except Exception as e:
        print(f"Ocorreu um erro geral no processo: {e}")
    finally:
        # connections.disconnect("default")
        print("Processo finalizado.")

if __name__ == "__main__":
    main()
