# Translation (FA to EN)

In [None]:
# Read the Persian source sentences, one list entry per line (the trailing
# newline of each line is kept by readlines()).
# The context manager closes the file even if reading fails, and the explicit
# encoding stops the Persian text from depending on the platform default.
with open('/content/seqfa.txt', 'r', encoding='utf-8') as f:
    fl = f.readlines()

In [None]:
!pip install googletrans==3.1.0a0

In [None]:
import googletrans
from googletrans import Translator
translator = Translator()

In [None]:
# Translate each Persian line to English, keeping the translated text in `res`
# and echoing it next to its line number as progress output.
res = []
for i, my_text in enumerate(fl):
  my_translation = translator.translate(my_text, src='fa', dest='en')
  res.append(my_translation.text)
  print(i, my_translation.text)

In [None]:
# Write the English translations, one per line.
# NOTE: mode 'a' appends, so re-running this cell adds the same lines again;
# delete the file first when a fresh output is wanted.
# The context manager guarantees the file is flushed and closed, and the
# explicit encoding makes the output independent of the platform default.
with open('/content/seqen.txt', 'a', encoding='utf-8') as f_w:
    f_w.writelines(text + '\n' for text in res)

# Retrieve top-*k* Examples

In [None]:
import json
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import torch
import numpy as np

In [None]:
# === Setup ===
# Run on the GPU when torch can see one, otherwise on the CPU.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'
model = SentenceTransformer("BAAI/bge-large-en", device=device)

# === Load data ===
# Both files are read line by line; surrounding whitespace is stripped and
# lines that end up empty are dropped.
with open("/content/train-en.txt", "r", encoding="utf-8") as f:
    train_samples = [s for s in map(str.strip, f) if s]

with open("/content/seqen.txt", "r", encoding="utf-8") as f:
    test_samples = [s for s in map(str.strip, f) if s]

# Lowercased copies, used later to filter out exact matches
train_lc = list(map(str.lower, train_samples))
test_lc = list(map(str.lower, test_samples))

# === Prepare dense ===
def prep_dense(text):
    """Return *text* prefixed with the retrieval instruction fed to the dense encoder."""
    return "Represent this sentence for retrieval: {}".format(text)

# Instruction-prefixed inputs for the dense encoder
train_dense_inputs = list(map(prep_dense, train_samples))
test_dense_inputs = list(map(prep_dense, test_samples))

# === Encode train set once
# Embeddings are L2-normalized and returned as a tensor on `device`.
train_dense = model.encode(
    train_dense_inputs,
    normalize_embeddings=True,
    convert_to_tensor=True,
    device=device,
)

# === Fit sparse
# The TF-IDF vocabulary is fitted on the raw (non-prefixed) train sentences.
vectorizer = TfidfVectorizer()
tfidf_train = vectorizer.fit_transform(train_samples)

# === Parameters ===
top_k = 5
alpha = 0.7  # Dense-TFIDF mix
output_path = "retrieved_alpha0.7_unique.jsonl"

# For every test sentence, score all train sentences with a dense/TF-IDF mix,
# drop exact (case-insensitive) matches, keep the top_k best-scoring distinct
# texts and write one JSON line per test sentence.
with open(output_path, "w", encoding="utf-8") as fout:
    for idx, (test_raw, test_lc_text, dense_input) in tqdm(
        enumerate(zip(test_samples, test_lc, test_dense_inputs)), total=len(test_samples)
    ):
        # === Dense: cosine similarity of the query embedding to every train embedding
        test_dense = model.encode(dense_input, normalize_embeddings=True, convert_to_tensor=True, device=device)
        dense_scores = torch.nn.functional.cosine_similarity(test_dense, train_dense).cpu().numpy()

        # === Sparse: TF-IDF cosine similarity on the raw text
        tfidf_test = vectorizer.transform([test_raw])
        sparse_scores = cosine_similarity(tfidf_test, tfidf_train)[0]

        # === Hybrid score
        hybrid_scores = alpha * dense_scores + (1 - alpha) * sparse_scores

        # === Filter out exact matches (mask is True for usable train samples)
        mask = np.array([lc != test_lc_text for lc in train_lc])
        filtered_scores = np.where(mask, hybrid_scores, -np.inf)

        # === Sort indices by score (best first)
        sorted_indices = filtered_scores.argsort()[::-1]

        # === Select top-k with UNIQUE text
        seen_texts = set()
        top_indices = []
        for i in sorted_indices:
            if not mask[i]:
                # Exact matches only received a -inf score; without this check
                # they would still be picked whenever fewer than top_k other
                # unique texts are available.
                continue
            txt = train_samples[i]
            if txt not in seen_texts:
                seen_texts.add(txt)
                top_indices.append(i)
            if len(top_indices) == top_k:
                break

        retrieved_texts = [train_samples[i] for i in top_indices]

        # === Save (numpy integers are converted so json can serialize them)
        fout.write(json.dumps({
            "test_id": idx,
            "test_en": test_raw,
            "retrieved_indices": [int(i) for i in top_indices],
            "retrieved_examples": retrieved_texts
        }, ensure_ascii=False) + "\n")

print(f"✅ Done. Retrieved top-{top_k} unique examples per test sample → {output_path}")

# Translate Back To Persian (EN to FA)

In [None]:
import googletrans
from googletrans import Translator
translator = Translator()

In [None]:
import json
from tqdm import tqdm
import googletrans
from googletrans import Translator
translator = Translator()

# === Load English Examples
# Blank lines are dropped exactly as the retrieval step does when it builds
# `train_samples`; otherwise `retrieved_indices` would point at the wrong
# sentence whenever train-en.txt contains an empty line.
with open("/content/train-en.txt", encoding="utf-8") as f:
    english_examples = [line.strip() for line in f if line.strip()]

# === Input and Output JSONL Files
input_jsonl = "/content/retrieved_alpha0.7_unique.jsonl"
output_jsonl = "translated_examples.jsonl"

resume_from_id = 0  # Replace this with actual test_id where it stopped
resume = False  # Will switch to True once we find the starting point
started = False

# A full run (resume_from_id == 0) starts a fresh file; a resumed run appends,
# so the translations written before the interruption are not truncated.
output_mode = "a" if resume_from_id else "w"

with open(input_jsonl, encoding="utf-8") as fin, open(output_jsonl, output_mode, encoding="utf-8") as fout:
    for line in tqdm(fin):
        data = json.loads(line)

        # Skip records until the one to resume from has been reached
        test_id = data["test_id"]
        if not resume:
            if test_id != resume_from_id:
                continue
            resume = True

        if not started:
            print(f"Resuming from test_id: {test_id}")
            started = True

        # Get corresponding English examples
        indices = data["retrieved_indices"]
        examples_to_translate = [english_examples[i] for i in indices]

        # Translate the examples one by one, echoing each result
        res = []
        for i, my_text in enumerate(examples_to_translate):
            my_translation = translator.translate(my_text, src='en', dest='fa')
            res.append(my_translation.text)
            print(i, my_translation.text)

        # Save new JSONL line
        fout.write(json.dumps({
            "test_id": data["test_id"],
            "translations": res
        }, ensure_ascii=False) + "\n")


# Tokenization (Hazm)

In [None]:
!pip install hazm

In [None]:
from hazm import word_tokenize
from hazm import Normalizer

In [None]:
import json
from tqdm import tqdm

In [None]:
normalizer = Normalizer()

In [None]:
# === Input and Output JSONL Files
input_jsonl = "/content/translated_examples.jsonl"
output_jsonl = "/content/tokenized_examples.jsonl"

# === Function to tokenize Persian text using Hazm
def tokenize_persian(text):
    """Normalize *text* with the module-level Hazm `normalizer` and return its word tokens."""
    return word_tokenize(normalizer.normalize(text))

# === Tokenize every translated example and write the result as JSONL
with open(input_jsonl, encoding="utf-8") as fin, open(output_jsonl, "w", encoding="utf-8") as fout:
    # total=500 only sizes the progress bar; adjust it to the input line count
    for raw_line in tqdm(fin, total=500):
        record = json.loads(raw_line)

        # 'translations' holds the list of Persian strings for this test_id
        persian_text = record["translations"]
        print(persian_text)

        # One token list per translated example
        tokenized_text = list(map(tokenize_persian, persian_text))
        print(tokenized_text)

        # Same test_id, tokens instead of raw strings
        tokenized_record = {
            "test_id": record["test_id"],
            "tokenized_translations": tokenized_text,
        }
        fout.write(json.dumps(tokenized_record, ensure_ascii=False) + "\n")


# Alignment (SimAlign)

In [None]:
!pip install simalign

In [None]:
import json
from simalign import SentenceAligner

In [None]:
# === Load English tokens (one Python literal per line) ===
import ast
with open("/content/english-atis-tokens-train.txt", "r", encoding="utf-8") as f:
    # Each stripped line is parsed with ast.literal_eval
    english_tokens = list(map(ast.literal_eval, map(str.strip, f)))

In [None]:
# === Load Persian translations ===
# Maps each test_id to its list of tokenized translations.
with open("/content/tokenized_examples.jsonl", "r", encoding="utf-8") as f:
    persian_translations = {
        record["test_id"]: record["tokenized_translations"]
        for record in map(json.loads, f)
    }

In [None]:
# === Initialize SimAlign ===
from simalign import SentenceAligner
aligner = SentenceAligner(model="bert", token_type="bpe", matching_methods="mai")

In [None]:
# === Prepare output file ===
output_file = open("aligned_output.jsonl", "w", encoding="utf-8")

In [None]:
# === Process main file ===
# The retrieval file must be the one whose examples were translated and
# tokenized into tokenized_examples.jsonl (the alpha = 0.7 run): position i of
# `retrieved_indices` is paired with Persian translation i of the same test_id.
try:
    with open("/content/retrieved_alpha0.7_unique.jsonl", "r", encoding="utf-8") as f:
        for line in f:
            sample = json.loads(line)
            test_id = sample["test_id"]
            retrieved_indices = sample["retrieved_indices"]

            sample_alignments = []  # list of alignments per retrieved index

            for i, idx in enumerate(retrieved_indices):
                try:
                    en_sentence = english_tokens[idx]
                    fa_tokens = persian_translations[test_id][i]
                except (KeyError, IndexError):
                    print(f"Skipping index {idx} for test_id {test_id} due to missing data.")
                    sample_alignments.append([])  # empty list keeps positions in step
                    continue

                # Source = English tokens, target = Persian tokens; only the
                # "itermax" alignment is kept.
                alignment_result = aligner.get_word_aligns(en_sentence, fa_tokens)
                aligned_pairs = alignment_result["itermax"]
                sample_alignments.append(aligned_pairs)

            # Write the sample output
            output_data = {
                "test_id": test_id,
                "retrieved_indices": retrieved_indices,
                "alignments": sample_alignments
            }
            print(output_data)
            output_file.write(json.dumps(output_data, ensure_ascii=False) + "\n")
finally:
    # `output_file` was opened in an earlier cell; close it even when the
    # alignment loop fails so the lines written so far reach the disk.
    output_file.close()

# Slot Labels

In [None]:
import json
import ast
from tqdm import tqdm

# Load English slot labels (one Python literal per line)
with open("/content/labels-english-atis-train.txt", "r", encoding="utf-8") as f:
    english_slot_labels = [ast.literal_eval(line.strip()) for line in f]

# Load Persian translations (tokenized)
with open("/content/tokenized_examples.jsonl", "r", encoding="utf-8") as f:
    persian_data = [json.loads(line) for line in f]

# Load test file with retrieved indices.
# This has to be the retrieval file the Persian translations were produced
# from (the alpha = 0.7 run); with any other file the indices would no longer
# correspond to the translated examples.
with open("/content/retrieved_alpha0.7_unique.jsonl", "r", encoding="utf-8") as f:
    test_data = [json.loads(line) for line in f]

# Load precomputed alignments
with open("/content/aligned_output.jsonl", "r", encoding="utf-8") as f:
    alignment_data = [json.loads(line) for line in f]

In [None]:
# Project English slot labels onto the Persian tokens through the word
# alignments; one list of label sequences is collected per test sample.
all_mapped_labels = []

for test_id, test_sample in tqdm(enumerate(test_data), total=len(test_data)):
    retrieved_indices = test_sample["retrieved_indices"]
    persian_sentences = persian_data[test_id]["tokenized_translations"]
    test_alignments = alignment_data[test_id]["alignments"]  # list of alignments per retrieved sample

    test_labels = []

    for i, idx in enumerate(retrieved_indices):
        # Reset per example: if the Persian sentence itself is missing, the
        # fallback below must not reuse the previous example's tokens (or hit
        # an unbound name on the first iteration).
        fa_tokens = []
        try:
            fa_tokens = persian_sentences[i]
            en_slot_labels = english_slot_labels[idx]
            alignment_pairs = test_alignments[i]
        except (IndexError, KeyError):
            print(f"Skipping test_id {test_id}, idx {idx} due to missing data")
            test_labels.append(["O"] * len(fa_tokens))
            continue

        # Initialize all Persian labels as "O"
        fa_labels = ["O"] * len(fa_tokens)

        # Copy the label of each aligned English token; out-of-range pairs are ignored
        for en_idx, fa_idx in alignment_pairs:
            if fa_idx < len(fa_labels) and en_idx < len(en_slot_labels):
                fa_labels[fa_idx] = en_slot_labels[en_idx]

        test_labels.append(fa_labels)

    all_mapped_labels.append(test_labels)

# Save output: one JSON line per test sample
with open("mapped_persian_slot_labels.jsonl", "w", encoding="utf-8") as f:
    for label_list in all_mapped_labels:
        json.dump(label_list, f, ensure_ascii=False)
        f.write("\n")

print("✅ Done. Output saved to mapped_persian_slot_labels.jsonl")


# Set API Key

In [None]:
#!pip install openai

In [None]:
import time
import openai
openai.api_key = 'your_API_Key'

# Slot Filling (SF)

## Prompt

In [None]:
import json

def build_prompt_for_sample(test_id, input_tokens, tokenized_dict, mapped_labels_dict, slot_names, k=1):
    """Build the few-shot slot-filling prompt for one test utterance.

    Args:
        test_id: Key used to look up the examples and labels of this utterance.
        input_tokens: Tokens of the utterance to label (serialized as JSON).
        tokenized_dict: Maps test_id to a list of tokenized example utterances.
        mapped_labels_dict: Maps test_id to the label sequences of those examples.
        slot_names: Slot names listed in the prompt header.
        k: Number of few-shot examples to include (default 1).

    Returns:
        The prompt as a single string.

    Raises:
        AssertionError: If the number of examples and label sequences differ.
    """
    examples = tokenized_dict[test_id][:k]
    labels = mapped_labels_dict[test_id][:k]

    assert len(examples) == len(labels), f"Mismatch in examples and labels for test_id {test_id}"

    # Header. The JSON skeleton uses single braces: these are plain strings,
    # not str.format templates, so doubled braces would reach the model as-is.
    lines = [
        "You are a language model trained to perform slot filling.\n",
        f"Here are the possible slots: {json.dumps(slot_names, ensure_ascii=False)}\n",
        "I will provide you with an utterance. Your task is to extract and return the slot values in a structured JSON format using BIO tags for slot filling.\n",
        "The output structure should be:",
        '   {\n       "slots": [\n           {"token": "word1", "label": "B-slot_name"},\n           {"token": "word2", "label": "I-slot_name"},\n           ...\n       ]\n   }\n',
    ]

    # Few-shot examples
    for tok_list, label_list in zip(examples, labels):
        lines.append("Here is one Example:\n")
        lines.append(f"    Utterance: {json.dumps(tok_list, ensure_ascii=False)}")
        lines.append("    Response:")
        lines.append('    {\n        "slots": [')
        # json.dumps escapes quotes/backslashes so the example stays valid JSON
        entries = [
            f'            {{"token": {json.dumps(tok, ensure_ascii=False)}, "label": {json.dumps(lbl, ensure_ascii=False)}}}'
            for tok, lbl in zip(tok_list, label_list)
        ]
        if entries:
            # Comma between entries, none after the last one
            lines.append(",\n".join(entries))
        lines.append("        ]\n    }")

    # Final utterance
    lines.append("\nNow, process this utterance:\n")
    lines.append(f"Utterance: {json.dumps(input_tokens, ensure_ascii=False)}")

    return "\n".join(lines)


In [None]:
# Test utterances: one Python literal per line
with open("/content/sampled_utterances.txt", "r", encoding="utf-8") as f:
    sampled_utterances = list(map(ast.literal_eval, map(str.strip, f)))


# test_id -> tokenized translations of its retrieved examples
tokenized_dict = {}
with open("/content/tokenized_examples.jsonl", "r", encoding="utf-8") as f:
    for record in map(json.loads, f):
        tokenized_dict[record["test_id"]] = record["tokenized_translations"]


# 0-based line number -> mapped label sequences stored on that line
with open("/content/mapped_persian_slot_labels.jsonl", "r", encoding="utf-8") as f:
    mapped_labels_dict = dict(enumerate(map(json.loads, f)))

In [None]:
# Persian-ATIS
slot_names = ['cost_relative',
 'class_type',
 'toloc.state_code',
 'fromloc.city_name',
 'toloc.state_name',
 'return_date.today_relative',
 'fare_amount',
 'meal',
 'return_date.day_name',
 'restriction_code',
 'flight_time',
 'arrive_time.time',
 'depart_time.start_time',
 'fromloc.airport_name',
 'depart_date.day_number',
 'fare_basis_code',
 'depart_date.today_relative',
 'return_date.date_relative',
 'toloc.country_name',
 'depart_time.period_mod',
 'toloc.airport_name',
 'day_name',
 'arrive_time.period_of_day',
 'toloc.airport_code',
 'depart_time.period_of_day',
 'days_code',
 'return_time.period_of_day',
 'arrive_time.end_time',
 'time',
 'toloc.city_name',
 'arrive_time.time_relative',
 'state_name',
 'depart_date.day_name',
 'fromloc.state_code',
 'flight_number',
 'fromloc.state_name',
 'flight',
 'arrive_date.day_number',
 'depart_time.time',
 'airport_name',
 'stoploc.state_code',
 'depart_time.time_relative',
 'meal_description',
 'round_trip',
 'return_time.period_mod',
 'arrive_time.start_time',
 'state_code',
 'fromloc.airport_code',
 'return_date.month_name',
 'stoploc.city_name',
 'flight_stop',
 'airline_name',
 'O',
 'depart_time.end_time',
 'compartment',
 'period_of_day',
 'transport_type',
 'stoploc.airport_name',
 'today_relative',
 'mod',
 'booking_class',
 'arrive_date.date_relative',
 'airport_code',
 'economy',
 'meal_code',
 'depart_date.date_relative',
 'arrive_time.period_mod',
 'time_relative',
 'arrive_date.today_relative',
 'depart_date.year',
 'day_number',
 'flight_mod',
 'depart_date.month_name',
 'city_name',
 'return_date.day_number',
 'arrive_date.day_name',
 'airline_code',
 'connect',
 'or',
 'flight_days',
 'aircraft_code',
 'month_name',
 'arrive_date.month_name',
 'stoploc.airport_code']

In [None]:
output_file_path = "Output_SF.txt"

In [None]:
# Query the model once per sampled utterance and append each reply to the
# output file.
results = []
# Replies contain Persian tokens, so the file encoding is set explicitly
# instead of relying on the platform default.
with open(output_file_path, 'w', encoding='utf-8') as output_file:
    for test_id, input_tokens in enumerate(sampled_utterances):
        prompt = build_prompt_for_sample(test_id, input_tokens, tokenized_dict, mapped_labels_dict, slot_names)
        messages = [
            {"role": "system", "content": "You are a language model trained to perform slot filling."},
            {"role": "user", "content": prompt},
        ]
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )
        reply = response.choices[0].message.content
        print(f"ChatGPT: {reply}")
        print('-' * 60)

        # Write the response to the output file
        output_file.write(f"{reply}\n")
        # Fixed pause between consecutive API requests
        time.sleep(2)

# Intent Detection (ID)

In [None]:
# Persian-ATIS
intent_labels = [
'flight_no',
'airfare+flight',
'capacity',
'airfare+flight_time',
'quantity',
'airfare',
'ground_service+ground_fare',
'city',
'flight_no+airline',
'flight',
'flight+airfare',
'airport',
'abbreviation',
'cheapest',
'aircraft+flight+flight_no',
'distance',
'restriction',
'meal',
'aircraft',
'flight_time',
'flight+airline',
'ground_fare',
'airline+flight_no',
'airline',
'ground_service',
'day_name'
]

In [None]:
# Test utterances: one Python literal per line
with open("/content/sampled_utterances.txt", "r", encoding="utf-8") as f:
    sampled_utterances = list(map(ast.literal_eval, map(str.strip, f)))

# test_id -> tokenized translations of its retrieved examples
with open("/content/tokenized_examples.jsonl", "r", encoding="utf-8") as f:
    tokenized_dict = {
        record["test_id"]: record["tokenized_translations"]
        for record in map(json.loads, f)
    }

In [None]:
# One list of retrieved train indices per line of the retrieval output,
# kept in file order.
with open("/content/retrieved_alpha0.7_unique.jsonl", "r", encoding="utf-8") as f:
    retrieved_indices_list = [json.loads(raw_line)["retrieved_indices"] for raw_line in f]

In [None]:
# Intent labels of the training set, one per line (trailing newlines are kept;
# they are stripped where the labels are used).
# The context manager closes the handle, which the bare open() never did.
with open('/content/intent-persian-atis-train.txt', 'r', encoding='utf-8') as f:
    intent_list = f.readlines()

## Prompt

In [None]:
def build_intent_prompt(test_id, input_tokens, tokenized_dict, retrieved_indices_list, intent_list, intent_labels, k=1):
    """Build the few-shot intent-detection prompt for one test utterance.

    Args:
        test_id: Index/key of the test utterance in the lookup structures.
        input_tokens: Tokens of the utterance to classify (serialized as JSON).
        tokenized_dict: Maps test_id to a list of tokenized example utterances.
        retrieved_indices_list: Per test_id, the train indices of those examples.
        intent_list: Intent strings indexed by train index (whitespace is stripped).
        intent_labels: Candidate intents listed in the prompt header.
        k: Number of few-shot examples to include (default 1).

    Returns:
        The prompt as a single string.

    Raises:
        AssertionError: If the number of examples and retrieved indices differ.
    """
    examples = tokenized_dict[test_id][:k]
    example_indices = retrieved_indices_list[test_id][:k]
    assert len(examples) == len(example_indices), f"Mismatch in examples and labels for test_id {test_id}"

    lines = [
        "You are a language model trained to perform intent detection.\n",
        f"Here are the possible intents: {json.dumps(intent_labels, ensure_ascii=False)}\n",
        "I will provide you with an utterance. Your task is to identify the intent of the input text by selecting the most relevant intent from the list.\n",
    ]

    # Few-shot examples: the intent of an example is looked up by its train index
    for ex_no, (tok_list, train_idx) in enumerate(zip(examples, example_indices), 1):
        lines.append(f"Example {ex_no}:\n")
        lines.append(f"    Utterance: {json.dumps(tok_list, ensure_ascii=False)}")
        lines.append(f'    Intent: "{intent_list[train_idx].strip()}"\n')

    lines.append("Now, process this utterance (only write the intent):\n")
    # Serialized like the example utterances so the prompt uses one format
    lines.append(f"Utterance: {json.dumps(input_tokens, ensure_ascii=False)}")

    return "\n".join(lines)

In [None]:
output_file_path = "Output_ID.txt"

In [None]:
# Query the model once per sampled utterance and append each predicted intent
# to the output file.
results = []
# Explicit encoding so the output does not depend on the platform default.
with open(output_file_path, 'w', encoding='utf-8') as output_file:
    for test_id, input_tokens in enumerate(sampled_utterances):
        prompt = build_intent_prompt(test_id, input_tokens, tokenized_dict, retrieved_indices_list, intent_list, intent_labels)
        messages = [
            # The system message names the task of this loop (intent detection),
            # matching the first line of the user prompt.
            {"role": "system", "content": "You are a language model trained to perform intent detection."},
            {"role": "user", "content": prompt},
        ]
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )
        reply = response.choices[0].message.content
        print(f"ChatGPT: {reply}")
        print('-' * 60)

        # Write the response to the output file
        output_file.write(f"{reply}\n")
        # Fixed pause between consecutive API requests
        time.sleep(2)

# Metrics

## Intent Detection (ID)

In [None]:
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, classification_report

# Accuracy plus precision/recall/F1 under three averaging schemes for the
# predicted intents (`test_intent`) against the references (`gold_intent`).
averages = ["micro", "macro", "weighted"]
acc = accuracy_score(gold_intent, test_intent)

metrics = {}
for avg in averages:
    scores = precision_recall_fscore_support(gold_intent, test_intent, average=avg, zero_division=0)
    # The first three entries are precision, recall and F1; the fourth is unused
    metrics[avg] = dict(zip(("precision", "recall", "f1"), scores))

print(f"Accuracy: {acc:.3f}\n")

for avg in averages:
    title = avg.capitalize()
    scores = metrics[avg]
    print(f"{title} Precision: {scores['precision']:.4f}")
    print(f"{title} Recall:    {scores['recall']:.4f}")
    print(f"{title} F1-score:  {scores['f1']:.4f}")
    print()

## Slot Filling (SF)

In [None]:
import json

# Function to extract labels from JSON strings
def extract_labels_from_json(file_path):
    """Collect the slot labels of every JSON object with a 'slots' list in a file.

    The file may contain several JSON objects mixed with other text (code
    fences, prose). Objects are located by scanning for '{' and decoding from
    there, so they are found whether or not each one is preceded by a "json"
    marker.

    Args:
        file_path: Path of the UTF-8 text file to read.

    Returns:
        A list with one list of labels per decoded object that has a 'slots'
        key, in file order.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    decoder = json.JSONDecoder()
    labels_list = []

    pos = content.find('{')
    while pos != -1:
        try:
            # raw_decode parses one JSON value starting at `pos` and reports
            # where it ended, leaving any trailing text untouched.
            data, end = decoder.raw_decode(content, pos)
        except json.JSONDecodeError:
            print(f"Error decoding JSON at offset {pos}")
            # Not a valid object at this brace; try the next one
            pos = content.find('{', pos + 1)
            continue

        if 'slots' in data:
            labels_list.append([slot['label'] for slot in data['slots']])
            print(len(labels_list))  # running count of extracted objects
        else:
            print(f"Skipping JSON without 'slots': {content[pos:end]}")

        # Continue after the object just decoded (nested braces are skipped)
        pos = content.find('{', end)

    return labels_list

# Function to write the labels to a file
def write_labels_to_file(labels, output_file):
    """Write each entry of *labels* to *output_file* on its own line.

    Every entry is formatted with an f-string, so a list of strings is written
    in its Python list notation. The file is overwritten.
    """
    with open(output_file, 'w', encoding='utf-8') as handle:
        handle.writelines(f"{label_list}\n" for label_list in labels)

In [None]:
!pip install seqeval

In [None]:
# NOTE(review): the only `classification_report` imported in this file is
# sklearn's (ID metrics cell). seqeval, installed in the cell just above,
# presumably provides the sequence-labeling report intended here — confirm and
# import it explicitly. `gold_slot` and `test_slot` are not defined in any
# visible cell.
print(classification_report(gold_slot, test_slot))