In [0]:
%pip install openai
%pip install unidecode
%restart_python
%load_ext autoreload
%autoreload 2 

In [0]:
import pandas as pd
import json
import random
import openai
from openai import OpenAI
from pathlib import Path
from unidecode import unidecode

from pyspark.sql.functions import *

In [0]:
from translation import *
from config import *

**NOTE:** This is the extended version of the notebook, which covers the additional languages. It will be merged later into a single version for all languages in scope.

In [0]:
%run "./authentication script"

In [0]:
# Load the input table through Spark and materialize it as a pandas DataFrame.
# The pandas index of `df` is used further down as `row_num` to join the
# translations back onto the inputs.
# NOTE(review): no explicit ordering is applied here, so the row order (and
# therefore row_num) may differ if this cell is re-run - confirm it is stable.
input_sdf = spark.table(INPUT_TABLE)
df = input_sdf.toPandas()

In [0]:
output_path = './batch_files/batch_v0_ext.jsonl'

# Make sure the target directory exists before anything tries to write to it.
Path(output_path).parent.mkdir(parents=True, exist_ok=True)

# Convert the input rows to the JSONL batch-request file.
# convert_df_to_jsonl is defined outside this notebook; it is called with the
# input frame, the extended target languages and the destination path.
convert_df_to_jsonl(df, TARGET_LANGUAGES_EXT, output_path)

# Upload the input file. The context manager guarantees the file handle is
# closed once the upload call returns (or raises).
with open(output_path, "rb") as batch_input_file:
    upload = client.files.create(
        file=batch_input_file,
        purpose="batch"
    )

# Keep the uploaded file id; it is needed to create the batch job.
input_file_id = upload.id
print(f"Uploaded input file: {input_file_id}")

In [0]:
# Submit the batch job against the chat-completions endpoint, using the
# previously uploaded input file.
batch_metadata = {"description": "aso_translation_ext_may_2025"}
batch = client.batches.create(
    input_file_id=input_file_id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata=batch_metadata,
)

# Remember the batch id so later cells can poll the job and fetch its output.
batch_id = batch.id
print(f"Created batch job: {batch_id}")

In [0]:
# Poll the batch job and report its current state.
# Re-run this cell until the reported status is 'completed'.
status = client.batches.retrieve(batch_id)
print(f"Batch status: {status.status}", f"Batch id: {batch_id}", sep="\n")

#### Proceed once status == 'completed'

In [0]:

result_path = './results/batch_v0_ext_results.json'

# Re-fetch the batch so this cell never acts on a stale `status` object left
# over from an earlier poll.
status = client.batches.retrieve(batch_id)
if status.status != 'completed':
    raise RuntimeError(
        f"Batch {batch_id} is not completed yet (status: {status.status}); "
        "re-run this cell once the batch has finished."
    )

# Without this check a missing output file surfaces as an obscure error from
# client.files.content(None).
result_file_id = status.output_file_id
if result_file_id is None:
    raise RuntimeError(
        f"Batch {batch_id} has no output file "
        f"(error_file_id: {status.error_file_id})."
    )

file_response = client.files.content(result_file_id)
result = file_response.content

# Write the raw response bytes to the results file, creating the results
# directory first if it does not exist yet.
Path(result_path).parent.mkdir(parents=True, exist_ok=True)
with open(result_path, "wb") as f:
    f.write(result)

In [0]:
# Parse the results file: one JSON document per line.
# The file was written as raw bytes, so decode it explicitly as UTF-8 instead
# of relying on the platform default encoding (which can corrupt non-ASCII
# translated text).
results = []
with open(result_path, 'r', encoding='utf-8') as file:
    for line in file:
        line = line.strip()
        if not line:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            continue
        # Parse the JSON string into a dict and collect it.
        results.append(json.loads(line))

In [0]:
# Post-process the raw batch results into flat records of
# {'lang', 'result', 'row_num'}.
# Requests that did not come back with HTTP 200 are collected in
# `failed_requests` (and reported below) instead of crashing the whole cell.
json_res_parsed = []
failed_requests = []
for res in results:
    custom_id = res['custom_id']
    # `response` can be missing/null for a failed request.
    response = res.get('response') or {}
    response_status = response.get('status_code')

    if response_status != 200:
        failed_requests.append({
            'custom_id': custom_id,
            'status_code': response_status,
            'error': res.get('error'),
        })
        continue

    # custom_id has three '_'-separated parts: language name, a middle part
    # (ignored) and the row number. Splitting from the right keeps this working
    # even if the language name itself contains an underscore.
    lang_full, _, row_num = custom_id.rsplit('_', 2)
    result = response['body']['choices'][0]['message']['content']

    # Map the language name to its LANG_MAP_EXT value and store the record.
    res_dict = {'lang': LANG_MAP_EXT[lang_full], 'result': result, 'row_num': row_num}
    json_res_parsed.append(res_dict)

if failed_requests:
    print(
        f"WARNING: {len(failed_requests)} of {len(results)} batch requests failed "
        "and were skipped; inspect `failed_requests` for details."
    )

In [0]:
# Long-format results as a Spark DataFrame: one record per parsed response,
# with the fields lang, result and row_num.
long_parsed_results = spark.createDataFrame(data=json_res_parsed)

In [0]:
# Split the long results into one DataFrame per language. Each frame keeps
# row_num (cast to int) plus the translated text in a column named after the
# language value from LANG_MAP_EXT. The frames are joined on row_num afterwards
# to obtain the wide format.
row_num_as_int = col('row_num').cast('int')

df_holder = [
    long_parsed_results
    .where(long_parsed_results.lang == lang_code)
    .select(row_num_as_int, col('result').alias(lang_code))
    .orderBy('row_num')
    for lang_code in LANG_MAP_EXT.values()
]

In [0]:
# Sanity check: number of per-language DataFrames collected in df_holder.
len(df_holder)

In [0]:
# Join all per-language results into one wide DataFrame keyed on row_num, with
# one column per language abbreviation.
# The frames are folded together in a loop so this works for any number of
# languages in df_holder instead of a hard-coded chain of exactly four joins.
if not df_holder:
    raise ValueError("df_holder is empty - there are no per-language results to join")

df_merged = df_holder[0]
for lang_df in df_holder[1:]:
    df_merged = df_merged.join(lang_df, on='row_num')


In [0]:
#df_merged.write.saveAsTable('ds.aso_may_2025_results_merged')

In [0]:
# Work on a copy of the inputs and expose the pandas index as a row_num column,
# which is the key shared with the translation results.
df_in_out_together = df.assign(row_num=df.index)

# Convert the inputs to Spark and attach the translations via row_num.
inputs_with_row_num = spark.createDataFrame(df_in_out_together)
all_together = inputs_with_row_num.join(df_merged, on='row_num')

# Persist the combined inputs + translations as a table.
all_together.write.saveAsTable('ds.aso_may_2025_results_in_out_together_extended')

In [0]:
# Show the combined inputs and translations, sorted by row_num.
all_together.sort('row_num').display()