# In[None]:
import pandas as pd
import numpy as np # type: ignore
import time
import os
import concurrent.futures
from tqdm import tqdm # type: ignore
from openai import OpenAI # type: ignore

# Read the API key from the environment rather than embedding it in the
# source. The empty-string fallback keeps the previous behaviour when the
# variable is unset: the client object is still created, but requests will
# fail to authenticate until OPENAI_API_KEY is provided.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", ""))

# Input CSV (must contain a "preprocessed" column) and the two output files.
input_csv = "/content/drive/MyDrive/Doukt/Goftino/NLP/colab/embedd/preproces.csv"
output_csv = "/content/drive/MyDrive/Doukt/Goftino/NLP/colab/embedd/embeddings.csv"
output_parquet = "/content/drive/MyDrive/Doukt/Goftino/NLP/colab/embedd/embeddings.parquet"

# Number of input rows read (and embedded) per iteration of the main loop
chunksize = 1000
# True until the first batch of embeddings (and the CSV header) is written
first_chunk = True
chunk_counter = 0
total_processed = 0

# Number of concurrent requests
max_workers = 5

# Remove output files if they already exist. The CSV is written in append
# mode, so it must not carry rows over from a previous run; the Parquet file
# is removed as well so a stale copy cannot outlive a run that produces no
# embeddings.
for stale_path in (output_csv, output_parquet):
    if os.path.exists(stale_path):
        os.remove(stale_path)

print(f"Starting to process file {input_csv} with chunk size {chunksize} and {max_workers} concurrent requests...")

def get_embedding(text_data, max_retries=3, base_delay=1):
    """Request an embedding for one row, retrying failed API calls.

    Args:
        text_data: ``(text, idx, original_idx)`` tuple. ``idx`` and
            ``original_idx`` are not interpreted here; they are returned
            unchanged so the caller can match the result to its row.
        max_retries: Maximum number of API attempts.
        base_delay: Initial back-off in seconds. It doubles after every
            failed attempt that is not recognised as a rate-limit error.

    Returns:
        ``(embedding, idx, original_idx)``. ``embedding`` is ``None`` when
        the text is missing or blank, or when every attempt failed.
    """
    text, idx, original_idx = text_data

    # Validate once, before the retry loop: a missing or blank text can never
    # succeed, so it must not be retried (or slept on).
    if pd.isna(text):
        return None, idx, original_idx
    if not isinstance(text, str):
        # pandas may parse a purely numeric cell as int/float; calling
        # ``.strip()`` on it would raise and be mistaken for an API failure.
        text = str(text)
    if text.strip() == "":
        return None, idx, original_idx

    for attempt in range(1, max_retries + 1):
        try:
            response = client.embeddings.create(
                model="text-embedding-3-large",
                input=text,
            )
            return response.data[0].embedding, idx, original_idx
        except Exception as e:
            # Deliberately broad: this runs in a worker thread and a single
            # bad row must not abort the whole batch.
            if attempt >= max_retries:
                print(f"Final error at index {original_idx} after {max_retries} attempts: {e}")
                return None, idx, original_idx

            # Rate limits get a long fixed pause; anything else backs off
            # exponentially. The status-code check covers errors whose text
            # does not contain the "rate_limit" marker.
            rate_limited = (
                "rate_limit" in str(e).lower()
                or getattr(e, "status_code", None) == 429
            )
            delay = 30 if rate_limited else base_delay * (2 ** (attempt - 1))

            print(f"Error at index {original_idx} (attempt {attempt} of {max_retries}): {e}")
            print(f"Retrying after {delay} seconds...")
            time.sleep(delay)

    # Only reached when max_retries <= 0 (no attempt was made).
    return None, idx, original_idx

# Number of CSV rows consumed by earlier chunks. Added to a row's position
# inside the current chunk, it gives the row's position in the whole CSV.
base_index = 0

for chunk in pd.read_csv(input_csv, chunksize=chunksize):
    chunk_counter += 1
    if "preprocessed" not in chunk.columns:
        print("'preprocessed' column not found.")
        # `exit` is the interactive helper installed by `site`/IPython and is
        # not a dependable way to stop a script; raising SystemExit is.
        raise SystemExit(1)

    # One work item per row:
    # (preprocessed_text, index_label_in_chunk, original_position_in_csv).
    # The position comes from enumerate(), so it does not depend on the index
    # labels being consecutive integers.
    texts_with_indices = [
        (text, label, base_index + position)
        for position, (label, text) in enumerate(chunk["preprocessed"].items())
    ]

    # Filled by position so the output keeps the input row order even though
    # the futures complete in arbitrary order.
    results = [None] * len(texts_with_indices)
    original_indices_for_results = [None] * len(texts_with_indices)

    print(f"Processing chunk {chunk_counter} with {len(texts_with_indices)} records...")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_idx = {
            executor.submit(get_embedding, item): position
            for position, item in enumerate(texts_with_indices)
        }

        for future in tqdm(concurrent.futures.as_completed(future_to_idx), total=len(texts_with_indices), desc=f"Processing chunk {chunk_counter}"):
            idx = future_to_idx[future]
            try:
                embedding, _, original_idx = future.result()
            except Exception as e:
                print(f"Error while processing: {e}")
                continue
            if embedding is not None:
                results[idx] = embedding
                original_indices_for_results[idx] = original_idx

    # Remove None results while keeping original indices
    valid_results = [
        (emb, orig_idx)
        for emb, orig_idx in zip(results, original_indices_for_results)
        if emb is not None
    ]

    # Rows without an embedding are left out of the output; report how many
    # so the gap in `original_index` values is not silent.
    skipped = len(results) - len(valid_results)
    if skipped:
        print(f"Chunk {chunk_counter}: {skipped} records skipped (blank text or failed requests)")

    if valid_results:
        embeddings, indices = zip(*valid_results)

        # Convert list of embeddings to DataFrame with numeric columns
        arr = np.array(embeddings, dtype=np.float32)
        df_emb = pd.DataFrame(arr, columns=[f"dim_{i}" for i in range(arr.shape[1])])

        # Add original index column
        df_emb.insert(0, 'original_index', indices)

        # The first write creates the file with a header; later writes append
        # rows only.
        df_emb.to_csv(
            output_csv,
            index=False,
            mode='w' if first_chunk else 'a',
            header=first_chunk,
        )
        first_chunk = False

        total_processed += len(embeddings)
        print(f"✅ Chunk {chunk_counter} with {len(embeddings)} records processed and appended to CSV")

        # Free memory
        del arr, df_emb, embeddings, indices, valid_results

    # Advance by the full chunk length (including skipped rows) so that
    # original_index keeps referring to positions in the input CSV.
    base_index += len(chunk)

print(f"\nProcessing of {total_processed} records in {chunk_counter} chunks completed")
print(f"CSV file saved at {output_csv}")

# Convert CSV file to Parquet
print("\nConverting CSV to Parquet...")

# Rows parsed per read while loading the CSV back
csv_chunksize = 5000

# The CSV is only created once at least one embedding has been written, so it
# may legitimately be missing here.
if os.path.exists(output_csv):
    csv_chunks = list(
        tqdm(pd.read_csv(output_csv, chunksize=csv_chunksize), desc="Converting to Parquet")
    )
else:
    csv_chunks = []

if csv_chunks:
    # Concatenate once and write once. Re-reading and rewriting the whole
    # Parquet file for every chunk did the same work repeatedly and still
    # ended with the complete dataset in memory on the last iteration.
    combined_df = pd.concat(csv_chunks, ignore_index=True)
    del csv_chunks

    # Write to a temporary file and swap it in, so a failed write does not
    # leave a truncated file at the final path.
    temp_parquet = output_parquet + ".temp"
    combined_df.to_parquet(temp_parquet, compression="snappy", index=False)
    os.replace(temp_parquet, output_parquet)

    # Clean up
    del combined_df

    print(f"✅ Successfully converted to Parquet. Final file saved at {output_parquet}")
    print(f"\n✅ All done! {total_processed} records processed. Both CSV and Parquet files saved:")
    print(f"- CSV file: {output_csv}")
    print(f"- Parquet file: {output_parquet}")
else:
    print(f"No embeddings found at {output_csv}; skipping Parquet conversion.")
