## Processing Parallel Corpora

All data processing in this project is built on the **datatrove** library from **Hugging Face**.
In datatrove, a single data sample is stored in the `Document` class:

```python
class Document:
    text: str
    id: str
    media: list[Media] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
```

When you set up a reader, you specify which column of your dataset you want to process. The value of that column becomes `Document.text`, and all other columns go into `Document.metadata`.

**Data Transformation Example:**

Original Dataset:
```python
{
    "ru": "Привет, мир!",
    "chv": "Сывлăх, тĕнче!",
    "random_shit": 42
}
```

When selecting "ru" as the key column:

```python
doc.text = "Привет, мир!"
doc.metadata = {
    'chv': "Сывлăх, тĕнче!",
    'random_shit': 42
}
```

Visual representation:
```
┌───────────────────────┐                 ┌───────────────────────────────┐
│      Original Data    │                 │          Document             │
│                       │                 │                               │
│  "ru": "Привет, мир!" ┼─────┐           │  text: "Привет, мир!"         │
│                       │     │           │                               │
│  "chv": "Сывлăх, ..." │     │           │  metadata: {                  │
│                       │     └──────────▶│    "chv": "Сывлăх, тĕнче!",   │
│  "random_shit": 42    │                 │    "random_shit": 42          │
│                       │                 │  }                            │
└───────────────────────┘                 └───────────────────────────────┘
```
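
The mapping above can be sketched in a few lines of plain Python. This is a simplified illustration, not datatrove's reader code: `SimpleDocument` and `row_to_document` are hypothetical names, and the `media` field is omitted.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SimpleDocument:
    """Hypothetical stand-in for datatrove's Document (media omitted)."""
    text: str
    id: str
    metadata: dict[str, Any] = field(default_factory=dict)


def row_to_document(row: dict[str, Any], text_key: str, doc_id: str) -> SimpleDocument:
    """Move the selected column into `text` and everything else into `metadata`."""
    metadata = {key: value for key, value in row.items() if key != text_key}
    return SimpleDocument(text=row[text_key], id=doc_id, metadata=metadata)


row = {"ru": "Привет, мир!", "chv": "Сывлăх, тĕнче!", "random_shit": 42}
doc = row_to_document(row, text_key="ru", doc_id="0")
print(doc.text)
print(doc.metadata)
```

Choosing `"chv"` as `text_key` instead would swap the roles: the Chuvash sentence would become `text`, and `"ru"` would move into `metadata`.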

In most cases, datatrove cannot process multiple columns at once: some of its base classes are hardcoded to operate exclusively on `Document.text`.

Eeve builds on some of datatrove's blocks and provides its own blocks that let you specify directly which columns of your dataset you want to transform.
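
One way such column-aware blocks could work is sketched below. This is not Eeve's implementation: `apply_to_columns` and its `keys` parameter are hypothetical, and the document is modeled as a plain dictionary with `text` and `metadata` entries to keep the example free of dependencies.

```python
from typing import Any, Callable


def apply_to_columns(
    doc: dict[str, Any], func: Callable[[str], str], keys: list[str]
) -> dict[str, Any]:
    """Apply `func` to `text` and/or to selected metadata columns.

    Hypothetical helper: the key "text" addresses doc["text"],
    any other key addresses doc["metadata"][key].
    """
    for key in keys:
        if key == "text":
            doc["text"] = func(doc["text"])
        else:
            doc["metadata"][key] = func(doc["metadata"][key])
    return doc


doc = {
    "text": "  Привет, мир! ",
    "metadata": {"chv": " Сывлăх, тĕнче!  ", "random_shit": 42},
}
apply_to_columns(doc, str.strip, keys=["text", "chv"])
print(doc)
```

Columns that are not listed in `keys` are left untouched, so non-text values such as the integer above pass through unchanged.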

In [None]:
import re
import random
import pandas as pd

from datatrove.pipeline.readers.csv import CsvReader
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.executor.local import LocalPipelineExecutor

from eeve.utils.datatrove import (
    _reader_adapter_with_column_info,
    _writer_adapter_with_column_restore
)

from eeve.data.formatters.callable import CallableFormatter
from eeve.data.formatters.quote import QuoteReplacer

In [None]:
def print_random_rows(df_list, k):
    """Print the same k random rows from every DataFrame, column by column."""
    # Sample only row positions that exist in every DataFrame
    m = min(len(df) for df in df_list)
    idx = random.sample(range(m), min(k, m))
    # Compare only the columns that every DataFrame has
    n = min(df.shape[1] for df in df_list)
    for c in range(n):
        for df in df_list:
            print(list(df.iloc[idx, c]))
        print()

In [None]:
inputs = pd.read_csv("./files/parallel_corpus_process.csv")
inputs.head(7)

In [None]:
print_random_rows([inputs], k=2)

By default, datatrove writes the transformed dataset in its own format. To get the output in the original format instead, with all columns and their names preserved, you can use adapters. The `_reader_adapter_with_column_info` adapter is applied when reading data and stores the original columns and their names in the metadata, while the `_writer_adapter_with_column_restore` adapter uses this information when writing data.

NB: to get output in the same format as the input, you need to use **both the reader and the writer adapter** in the pipeline. An example will be shown below.
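
The round trip performed by the two adapters can be sketched roughly as follows. This is an assumption about the idea, not Eeve's code: the function names, the `_columns` and `_text_key` metadata keys, and the plain-dictionary document are all hypothetical.

```python
from typing import Any


def reader_adapter_sketch(row: dict[str, Any], text_key: str) -> dict[str, Any]:
    """Build a document dict and remember the original column layout."""
    metadata = {k: v for k, v in row.items() if k != text_key}
    # Hypothetical bookkeeping keys, used later to rebuild the row
    metadata["_columns"] = list(row.keys())
    metadata["_text_key"] = text_key
    return {"text": row[text_key], "metadata": metadata}


def writer_adapter_sketch(doc: dict[str, Any]) -> dict[str, Any]:
    """Rebuild the original row: same column names, same order."""
    metadata = dict(doc["metadata"])
    columns = metadata.pop("_columns")
    text_key = metadata.pop("_text_key")
    return {c: (doc["text"] if c == text_key else metadata[c]) for c in columns}


row = {"text_1": "  Привет  ", "text_2": "  Салам  "}
doc = reader_adapter_sketch(row, text_key="text_1")
doc["text"] = doc["text"].strip()  # some transformation in between
restored = writer_adapter_sketch(doc)
print(restored)
```

Without the reader-side bookkeeping, the writer in this sketch has no way of knowing which column was renamed to `text`, which illustrates why the two adapters only make sense as a pair.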

In [None]:
example_reader = CsvReader("./files/parallel_corpus_process.csv", text_key="text_1")
gen = example_reader.run()

doc = next(iter(gen))
print(doc) # as we can see, all values from the dataset, except text_key='text_1', have been transferred to doc.metadata

The demonstration dataset above has several text formatting issues:
- chevron quotes need to be properly placed;
- runs of multiple spaces need to be collapsed into a single space;
- some sentences have leading or trailing whitespace.

Let's implement a function that fixes the whitespace issues and use it, together with Eeve's `QuoteReplacer`, to transform the columns of our corpus.

In [None]:
def process_string(s: str) -> str:
    s = re.sub(r'\s+', ' ', s)
    return s.strip()
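
The quote issue is presumably what `QuoteReplacer` addresses in the pipeline; its implementation is not shown in this notebook. As a rough illustration of the kind of transformation involved, the sketch below turns paired straight double quotes into chevron quotes. The function name and the assumption that quotes are balanced and not nested are mine, not Eeve's.

```python
import re


def replace_straight_quotes(s: str) -> str:
    """Turn each pair of straight double quotes into chevron quotes."""
    # Assumption: quotes are balanced and not nested
    return re.sub(r'"([^"]*)"', r'«\1»', s)


print(replace_straight_quotes('Он сказал "привет" и ушёл.'))
```

A function like this has the same `str -> str` shape as `process_string`, so it could be applied per column in the same way.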

In [None]:
executor = LocalPipelineExecutor(
    pipeline=[
        CsvReader(
            "./files/parallel_corpus_process.csv",
            text_key="text_1",
            adapter=_reader_adapter_with_column_info
        ),
        CallableFormatter(func=process_string, list_path=["text", "metadata['text_2']"]),
        QuoteReplacer(list_path=["text", "metadata['text_2']"]),
        JsonlWriter(
            output_folder="./files",
            output_filename='result_${rank}',
            adapter=_writer_adapter_with_column_restore
        )
    ],
    tasks=1,
    workers=1,
    skip_completed=False
)

In [None]:
executor.run()

In [None]:
result = pd.read_json(path_or_buf='./files/result_00000.gz', lines=True)
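
The `.gz` suffix indicates that the output is a gzip-compressed JSON Lines file. If pandas is not available, such a file can be read with the standard library alone. The sketch below first writes a small stand-in file to a temporary directory, so the rows in it are made up for illustration and are not the pipeline's actual output.

```python
import gzip
import json
import tempfile
from pathlib import Path

rows = [{"text_1": "Привет, мир!", "text_2": "Сывлăх, тĕнче!"}]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "result_00000.gz"
    # Write one JSON object per line, gzip-compressed
    with gzip.open(path, "wt", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")
    # Read the file back line by line
    with gzip.open(path, "rt", encoding="utf-8") as f:
        loaded = [json.loads(line) for line in f]

print(loaded)
```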

In [None]:
print_random_rows([inputs, result], k=2)