# Translation

This notebook serves as a testing playground for dataset translation.

In [None]:
import functools
import itertools
import os
import random
import time
from typing import Any, Callable, Dict, Iterable, Optional, Union

import pandas as pd
import requests
from transformers import pipeline

## Config

Notebook-level config:
* `OUTPUT_DATA_SIZE_LIMIT`: maximum number of rows to be translated per language (set to `None` for no limit)
* `BATCH_SIZE`: number of records to be translated at once
* `BACKUP_SIZE`: number of records backed up at once

In [None]:
# Row limit handed to `load_data` for every language; `None` means no limit.
OUTPUT_DATA_SIZE_LIMIT = None  # there are 76879 rows in total
# Default `batch_size` of `create_paraphrase_dataset`.
BATCH_SIZE = 50
# Default `backup_size` of `create_paraphrase_dataset` (rows per backup chunk).
BACKUP_SIZE = 500

# Default directory prefix under which `_create_backup` writes its CSV files.
BACKUP_PATH = "./backups/"

In [None]:
def _generate_hex_id(id_length: int = 8):
    hex_string = '0123456789abcdef'
    return ''.join([random.choice(hex_string) for x in range(id_length)])

In [None]:
# Random identifier of this notebook run; `_create_backup` puts it at the start
# of every backup file name.
RUN_ID = _generate_hex_id()
RUN_ID  # last expression of the cell, so the notebook displays the value

## HuggingFace API

### Settings

In [None]:
# Default request headers for the HuggingFace API calls made by `_call_hf_api`.
# The access token is read from the `HF_TOKEN` environment variable so that no
# secret is stored in the notebook; any token that was ever committed here must
# be considered leaked and should be revoked.
# When the variable is unset or empty, no Authorization header is sent.
_HF_TOKEN = os.environ.get("HF_TOKEN")
HEADERS_DFLT = {"Authorization": f"Bearer {_HF_TOKEN}"} if _HF_TOKEN else {}

In [None]:
# Inference endpoint per language key; each URL ends in the model identifier.
# The "EN" entry points at the German -> English model, the others translate
# from English.
API_URLS = {
    "CS": "https://api-inference.huggingface.co/models/Helsinki-NLP/opus-mt-en-cs",
    "DE": "https://api-inference.huggingface.co/models/Helsinki-NLP/opus-mt-en-de",
    "SL": "https://api-inference.huggingface.co/models/Helsinki-NLP/opus-mt-en-sla",
    "EN": "https://api-inference.huggingface.co/models/Helsinki-NLP/opus-mt-de-en",
}

### Definitions

In [None]:
# Signature of a translation backend as `translate_text_api` invokes it: it is
# called with the text to translate and returns the decoded API response.
TranslationFunction = Callable[[str], Any]


def _call_hf_api(
    payload: Union[str, Dict[str, Any]],
    api_url: str,
    headers: Optional[Dict[str, Any]] = None,
    timeout: float = 60.0,
) -> Any:
    """Performs one HuggingFace API call and returns the decoded JSON response.

    Args:
        payload: Request body. A plain string is wrapped as ``{"inputs": payload}``;
            a dict is sent unchanged.
        api_url: Endpoint the request is POSTed to.
        headers: Request headers; `HEADERS_DFLT` is used when omitted.
        timeout: Seconds to wait for the server before `requests` gives up.

    Returns:
        Whatever JSON document the server answered with.

    Raises:
        requests.RequestException: On connection problems or when `timeout` expires.
        ValueError: When the response body is not valid JSON.
    """
    if headers is None:
        headers = HEADERS_DFLT
    if isinstance(payload, str):
        # `translate_text_api` passes the raw text; give it the request shape
        payload = {"inputs": payload}

    # without an explicit timeout `requests` may wait forever on a stalled server
    response = requests.post(api_url, headers=headers, json=payload, timeout=timeout)
    return response.json()


def translate_text_api(
    text: str,
    api_translation_function: TranslationFunction,
    n_attempts: int = 10,
    sleep_time: float = 1.0,
    verbose: bool = True,
) -> str:
    """Uses the API translation function to translate plain text.

    The function is called up to `n_attempts` times. An attempt succeeds when the
    response can be read as ``response[0]["translation_text"]``; any other
    response shape counts as a failed attempt.

    Args:
        text: Text to translate.
        api_translation_function: Callable invoked as ``f(text)``.
        n_attempts: Maximum number of calls made before giving up.
        sleep_time: Seconds to wait after a failed attempt before the next one.
        verbose: When true, prints progress for every attempt.

    Returns:
        The translated text.

    Raises:
        requests.Timeout: When no attempt produced a translation.
    """
    for attempt in range(1, n_attempts + 1):
        if verbose:
            print(f"(attempt={attempt}): Attempting to translate `{text}`...")
        response = api_translation_function(text)
        try:
            result = response[0]["translation_text"]
        except (KeyError, IndexError, TypeError):
            # a dict (no key 0), an empty list, or a non-subscriptable response:
            # pause before retrying, but do not wait after the final attempt
            if attempt < n_attempts:
                time.sleep(sleep_time)
            continue
        if verbose:
            print(f"successfully translated: `{text}` -> `{result}`")
        return result

    raise requests.Timeout(f"Text could not be translated: `{text}`")

In [None]:
# One API caller per translation direction: `_call_hf_api` with the endpoint fixed.
api_translate_en_to_cs = functools.partial(_call_hf_api, api_url=API_URLS["CS"])
api_translate_en_to_sl = functools.partial(_call_hf_api, api_url=API_URLS["SL"])
api_translate_en_to_de = functools.partial(_call_hf_api, api_url=API_URLS["DE"])
# German -> English caller; `translate_de_to_en_api` is built from this name.
api_translate_de_to_en = functools.partial(_call_hf_api, api_url=API_URLS["EN"])

## HuggingFace Pipelines

### Settings

In [None]:
# Model identifier per language key, as passed to `create_pipeline_translator`.
MODELS_DICT = dict(
    CS="Helsinki-NLP/opus-mt-en-cs",
    DE="Helsinki-NLP/opus-mt-en-de",
    SL="Helsinki-NLP/opus-mt-en-sla",
    EN="Helsinki-NLP/opus-mt-de-en",
)

### Definitions

In [None]:
def create_pipeline_translator(model_name: str):
    """Builds a text-translating function on top of a HF `translation` pipeline.

    The pipeline for `model_name` is created once, when this factory is called,
    and reused by the returned function. That function returns the
    `translation_text` of the first prediction, or `None` when an `IndexError`
    or `KeyError` occurs while predicting or reading the result.
    """
    translator = pipeline("translation", model_name)

    def _translate_text(text):
        try:
            predictions = translator.predict(text)
            first_prediction = predictions[0]
            return first_prediction["translation_text"]
        except (IndexError, KeyError):
            return None

    return _translate_text

In [None]:
# One pipeline-backed translator per direction; each call below constructs its
# pipeline immediately (see `create_pipeline_translator`).
# NOTE: the `transate` spelling is part of these names as used by later cells.
pipeline_transate_en_to_cs = create_pipeline_translator(model_name=MODELS_DICT["CS"])
pipeline_transate_en_to_de = create_pipeline_translator(model_name=MODELS_DICT["DE"])
pipeline_transate_en_to_sl = create_pipeline_translator(model_name=MODELS_DICT["SL"])
pipeline_transate_de_to_en = create_pipeline_translator(model_name=MODELS_DICT["EN"])

## Paraphrase data

Translation of bi-texts so that we obtain paraphrasing datasets.

### Settings

In [None]:
# Directory prefix of the filtered bitext CSV files.
DATA_DIR_PATH = "./data/data_filtered/"

# CSV path per language key.
DATA_PATHS = {
    "CS": f"{DATA_DIR_PATH}cz_en_filtered.csv",
    "DE": f"{DATA_DIR_PATH}de_en_filtered.csv",
    "SL": f"{DATA_DIR_PATH}sl_en_filtered.csv",
}

### Definitions

In [None]:
def load_data(path: str, limit: int = None):
    """Reads the CSV at `path` into a DataFrame, optionally truncated.

    With `limit` set to a value below the number of rows read, the rows labelled
    `limit` up to the last row number are dropped; otherwise the full frame is
    returned.
    """
    frame = pd.read_csv(path)
    n_rows = frame.shape[0]
    if limit is not None and limit < n_rows:
        surplus_labels = pd.RangeIndex(start=limit, stop=n_rows)
        return frame.drop(surplus_labels)
    return frame

In [None]:
def _batched(iterable: Iterable, n: int):
    "Batches data into tuples of length n. The last batch may be shorter."
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    while batch := tuple(itertools.islice(it, n)):
        yield batch
        
        
def _create_backup(
    data: pd.DataFrame,
    file_name: str,
    backup_dir_path: str = BACKUP_PATH,
    verbose: bool = False,
):
    """Writes the dataframe to a CSV backup file.

    The file is stored as ``<backup_dir_path>/<RUN_ID>_<file_name>.csv``.

    Args:
        data: Frame to store.
        file_name: Base name of the backup, without run id and extension.
        backup_dir_path: Target directory; created when it does not exist yet.
            An empty string writes into the current working directory.
        verbose: When true, prints the number of rows and the path written.
    """
    if backup_dir_path:
        # the backup directory is not guaranteed to exist on a fresh checkout
        os.makedirs(backup_dir_path, exist_ok=True)
    # join instead of concatenating, so a missing trailing slash is harmless
    path_save = os.path.join(backup_dir_path, f"{RUN_ID}_{file_name}.csv")
    data.to_csv(path_save)
    if verbose:
        print(f"[BACKUP] saving {data.shape[0]} rows @ `{path_save}`")
        
        
def _translate_in_batches(
    indexed_texts,
    api_translation_function: Callable[[str], str],
    batch_size: int,
    verbose: bool,
) -> pd.Series:
    """Translates `(index, text)` pairs batch by batch, skipping batches that raise.

    Returns a Series of translations labelled with the index of the text each one
    came from; texts of a skipped batch are absent from the result.
    """
    translated_batches = []
    for batch in _batched(indexed_texts, n=batch_size):
        batch_indices = [index for index, _ in batch]
        batch_texts = [text for _, text in batch]
        try:
            translated_batches.append(
                pd.Series(batch_texts, index=batch_indices, dtype="str").apply(api_translation_function)
            )
        except Exception as e:
            # best effort: one failing batch must not lose the rest of the chunk
            if verbose:
                print(f"[Skipping current batch] Caught: {e}")
    if not translated_batches:
        return pd.Series(dtype="object")
    return pd.concat(translated_batches)


def create_paraphrase_dataset(
    df_bitexts: pd.DataFrame,
    api_translation_function: Callable[[str], str],
    source_col: str,
    original_col: str,
    batch_size: int = BATCH_SIZE,
    backup_size: int = BACKUP_SIZE,
    inplace: bool = False,
    verbose: bool = False,
) -> pd.DataFrame:
    """Translates a source column of a bitext dataset, outputting a paraphrasing dataset.

    The rows are processed in chunks of `backup_size`; inside a chunk the texts are
    translated in batches of `batch_size`. Every finished chunk is backed up via
    `_create_backup`. The function is best-effort: a batch that raises is skipped
    (its rows keep a missing `Paraphrase`), a chunk that cannot be assembled is
    skipped entirely, and a failing backup does not discard the chunk.

    Args:
        df_bitexts: Frame holding both `source_col` and `original_col`; its index
            labels are expected to be unique.
        api_translation_function: Callable translating one text.
        source_col: Column whose texts are translated.
        original_col: Column copied to the output as `Original`.
        batch_size: Number of texts translated per batch (at least one).
        backup_size: Number of rows per backup chunk (at least one).
        inplace: Kept for backward compatibility; `df_bitexts` is never modified.
        verbose: When true, displays previews and prints progress and timings.

    Returns:
        A frame with the columns `Original` and `Paraphrase`, indexed like the
        processed rows of `df_bitexts`.

    Raises:
        ValueError: When `batch_size` or `backup_size` is smaller than one.
    """
    if batch_size < 1 or backup_size < 1:
        raise ValueError("batch_size and backup_size must be at least one")

    output_columns = ["Original", "Paraphrase"]
    time_total = 0.0

    if verbose:
        display(df_bitexts.head())
        print(f"--------\nParaphrase dataset creation started: {df_bitexts.shape[0]} rows\n--------")

    chunks = []
    for chunk_id, chunk_rows in enumerate(_batched(df_bitexts.iterrows(), n=backup_size)):
        try:
            start = time.perf_counter()

            chunk_indices = [index for index, _ in chunk_rows]
            indexed_texts = [(index, row[source_col]) for index, row in chunk_rows]
            col_translated = _translate_in_batches(
                indexed_texts,
                api_translation_function,
                batch_size=batch_size,
                verbose=verbose,
            )

            if verbose:
                time_diff = (time.perf_counter() - start)
                time_total += time_diff
                print(f"[create_paraphrase_dataset] chunk {chunk_id}: {col_translated.size:,} / {len(chunk_indices)} records translated in {time_diff:.2f} s.")

            # make the chunk into a DF; rows of skipped batches get a missing value
            df_out = pd.DataFrame(
                {
                    "Original": df_bitexts.loc[chunk_indices, original_col],
                    "Paraphrase": col_translated.reindex(chunk_indices),
                },
                columns=output_columns,
            )
        except Exception as e:
            if verbose:
                print(f"[SKIPPING] Caught: {e}")
            # nothing usable was produced for this chunk
            continue

        chunks.append(df_out)

        try:
            _create_backup(
                df_out,
                file_name=f"translate_from_{source_col}_to_{original_col}_{chunk_id}",
                verbose=verbose,
            )
        except Exception as e:
            # the chunk is already part of the result; only the backup is lost
            if verbose:
                print(f"[BACKUP FAILED] Caught: {e}")

    # a single concat at the end avoids re-copying the result for every chunk
    if chunks:
        df_final = pd.concat(chunks, axis=0)
    else:
        df_final = pd.DataFrame(columns=output_columns)

    if verbose:
        display(df_final.head())
        print(f"[create_paraphrase_dataset] TOTAL TIME: {time_total:.2f} s.")

    return df_final

In [None]:
# Zero-argument loaders: `load_data` bound to each language's CSV path and the
# notebook-wide row limit.
load_data_cs = functools.partial(load_data, path=DATA_PATHS["CS"], limit=OUTPUT_DATA_SIZE_LIMIT)
load_data_de = functools.partial(load_data, path=DATA_PATHS["DE"], limit=OUTPUT_DATA_SIZE_LIMIT)
load_data_sl = functools.partial(load_data, path=DATA_PATHS["SL"], limit=OUTPUT_DATA_SIZE_LIMIT)

### Load data

In [None]:
# Read the three bitext CSV files into DataFrames.
data_cs = load_data_cs()
data_de = load_data_de()
data_sl = load_data_sl()

### Translate

In [None]:
# API-backed text translators: `translate_text_api` (which retries) bound to one
# API caller per direction.
# NOTE(review): every `api_translate_*` name used below, including
# `api_translate_de_to_en`, must be defined before this cell runs - confirm.
translate_en_to_cs_api = functools.partial(
    translate_text_api, 
    api_translation_function=api_translate_en_to_cs
)
translate_en_to_de_api = functools.partial(
    translate_text_api, 
    api_translation_function=api_translate_en_to_de
)
translate_en_to_sl_api = functools.partial(
    translate_text_api, 
    api_translation_function=api_translate_en_to_sl
)
translate_de_to_en_api = functools.partial(
    translate_text_api, 
    api_translation_function=api_translate_de_to_en
)

In [None]:
# German paraphrases: the English column of the DE bitexts is translated with the
# en->de pipeline translator; the German column is kept as the original.
df_paraphrase_de = create_paraphrase_dataset(
    df_bitexts=data_de,
    api_translation_function=pipeline_transate_en_to_de,
    source_col="English",
    original_col="German",
    verbose=True,
)

In [None]:
# Slovenian paraphrases: the English column of the SL bitexts is translated with
# the `opus-mt-en-sla` pipeline translator; the Slovenian column is the original.
# NOTE(review): the model name suggests a multi-target (Slavic) model; confirm
# whether the input needs a target-language token to select Slovenian.
df_paraphrase_sl = create_paraphrase_dataset(
    df_bitexts=data_sl,
    api_translation_function=pipeline_transate_en_to_sl,
    source_col="English",
    original_col="Slovenian",
    verbose=True,
)

In [None]:
# Czech paraphrases: the English column of the CS bitexts is translated with the
# en->cs pipeline translator; the Czech column is kept as the original.
df_paraphrase_cs = create_paraphrase_dataset(
    df_bitexts=data_cs,
    api_translation_function=pipeline_transate_en_to_cs,
    source_col="English",
    original_col="Czech",
    verbose=True,
)

In [None]:
# English paraphrases: reuses the DE bitexts in the opposite direction - the
# German column is translated with the de->en pipeline translator and the
# English column is kept as the original.
df_paraphrase_en = create_paraphrase_dataset(
    df_bitexts=data_de,
    api_translation_function=pipeline_transate_de_to_en,
    source_col="German",
    original_col="English",
    verbose=True,
)


In [None]:
# Write each paraphrase dataset to a CSV file in the current directory.
df_paraphrase_de.to_csv("./paraphrases_de.csv")
df_paraphrase_sl.to_csv("./paraphrases_sl.csv")
df_paraphrase_cs.to_csv("./paraphrases_cs.csv")
df_paraphrase_en.to_csv("./paraphrases_en.csv")