## 🧠 What is Ray?

> **Ray** is a Python framework for **distributed computing**. It makes it easy to scale Python code across **multiple cores**, **GPUs**, **machines**, or **clusters**, often with only **small changes to your existing code**.

---

## 🍽 Real-World Analogy: Ray is Like a Restaurant Kitchen

Imagine you’re a chef managing a busy restaurant kitchen:

### Without Ray:

You’re the **only cook**, making every dish yourself — from slicing onions to baking pizzas. Even if you have 8 burners, you’re only using one.

### With Ray:

You become the **head chef** who assigns tasks to multiple **line cooks**:

* One handles pasta
* Another bakes pizzas
* One prepares sauces
* All **in parallel** ⚡

You manage the workflow — **they cook the food**.

✅ Result: Everything gets done **faster**, **cleaner**, and **at scale**

---

## 🧑‍💻 Programming Analogy: Regular Python vs. Ray

### 🔁 Regular Python

```python
import time

def slow_task(x):
    time.sleep(1)
    return x * 2

results = [slow_task(x) for x in range(5)]
```

⏱️ Takes \~5 seconds (runs sequentially)

---

### ⚡ Ray Parallel Version

```python
import ray
import time

ray.init()

@ray.remote
def slow_task(x):
    time.sleep(1)
    return x * 2

futures = [slow_task.remote(x) for x in range(5)]
results = ray.get(futures)
```

✅ Takes \~1 second plus a small scheduling overhead, provided at least 5 CPU cores are free (each Ray task reserves 1 CPU by default)
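
The speedup depends on how many cores are available. As a rough sketch (plain Python, not a Ray API; `estimate_wall_time` is a hypothetical helper), equal-length tasks can be thought of as running in waves of at most `num_cpus` tasks at a time:

```python
import math

def estimate_wall_time(num_tasks, num_cpus, task_seconds=1.0):
    """Rough lower bound: tasks run in waves of at most num_cpus at a time."""
    waves = math.ceil(num_tasks / num_cpus)
    return waves * task_seconds

# The 5 one-second tasks from the example, on machines of different sizes
for cpus in (1, 2, 5, 8):
    print(cpus, "CPUs ->", estimate_wall_time(5, cpus), "s")
```

Real runs add scheduling and serialization overhead on top of this estimate, so treat it as a floor rather than a prediction.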

---

## ✅ Why Ray Is Used in Enterprise Systems

| Feature                  | Why It Matters                          |
| ------------------------ | --------------------------------------- |
| 🔄 Distributed Execution | Scale across CPUs, GPUs, clusters       |
| 🧠 ML-native             | Supports HuggingFace, PyTorch, Sklearn  |
| 🔌 Pluggable             | Use Ray with Airflow, Spark, Dask, etc. |
| 📊 Job Monitoring        | Has dashboards for profiling & tracing  |
| 🚀 Easy to Start         | Works locally with 1 line: `ray.init()` |

---

## 🔧 Real-World Use in Your Project

You're already using it like this:

```python
@ray.remote
def preprocess_and_save():
    ...
```

Here, you're:

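* Turning `preprocess_and_save()` into a **Ray task**: calling `preprocess_and_save.remote()` returns an object reference immediately
* Running it in a **separate worker process** that Ray schedules, locally or on a cluster
* Even though it runs on your laptop now, the same function can run on a **multi-node cluster** later with little or no change, as long as the input and output paths are reachable from the worker nodes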

---

## 🔨 When to Use Ray

| Task Type                       | Ray Use? | Why                             |
| ------------------------------- | -------- | ------------------------------- |
| Preprocessing large datasets    | ✅ Yes    | Parallel tokenization           |
| Training models                 | ✅ Yes    | Ray Train (e.g. `TorchTrainer`) |
| Running experiments/grid search | ✅ Yes    | Ray Tune (hyperparameter search) |
| Small script with 100 rows      | ❌ No     | Too much overhead               |
| RAG chatbot backend             | ⚠️ Maybe | Use if scaling is needed        |

---

## 📚 Example: Training on Ray with HuggingFace

```python
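from transformers import Trainer, TrainingArguments
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from ray.train.huggingface.transformers import RayTrainReportCallback, prepare_trainer

# Ray 2.7+ pattern; older releases exposed a TransformersTrainer class with a different signature.
def train_func():
    # Runs on every worker. model, train_ds, eval_ds, and tokenizer are assumed
    # to be built or loaded inside this function.
    trainer = Trainer(
        model=model,
        args=TrainingArguments(...),
        train_dataset=train_ds,
        eval_dataset=eval_ds,
        tokenizer=tokenizer,
    )
    trainer.add_callback(RayTrainReportCallback())  # report metrics and checkpoints to Ray
    trainer = prepare_trainer(trainer)
    trainer.train()

ray_trainer = TorchTrainer(train_func, scaling_config=ScalingConfig(num_workers=4))
result = ray_trainer.fit()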
```

✅ Ray will:

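* Start 4 training workers
* Run data-parallel training, so each worker processes a different slice of the data
* Keep the model replicas in sync across workers
* Collect the metrics and checkpoints that are reported (which checkpoint counts as "best" depends on your `TrainingArguments` and checkpoint settings)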

---

## ✅ TL;DR Summary

| Concept          | Ray Explanation                                |
| ---------------- | ---------------------------------------------- |
| Remote function  | Run a function in parallel on a worker         |
| Object store     | Shared-memory store; Ray tracks object locations and avoids extra copies |
| Cluster manager  | Run on 1 machine, a cloud cluster, or hybrid   |
| Seamless scaling | Local dev → Enterprise deployment (no rewrite) |




In [2]:
import json
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict
from transformers import AutoTokenizer
import ray  
from ray import train as ray_train


In [3]:
LABEL_PATH = "/Users/tanmay/Downloads/US_Sol_LLM/email-policy-ai/data/raw/policy_labels.jsonl"
MODEL_NAME = "distilroberta-base"
TEST_SIZE = 0.2
MAX_LENGTH = 512
SEED = 42
OUTPUT_DIR = "/Users/tanmay/Downloads/US_Sol_LLM/email-policy-ai/data/processed"



In [4]:
tokenizer=AutoTokenizer.from_pretrained(MODEL_NAME)


In [10]:
import unicodedata

def normalize_text(text):
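    # NFKD splits accented and compatibility characters into base forms (é -> e + accent);
    # the ASCII round-trip then drops whatever is still non-ASCII. Curly quotes and emoji
    # are removed, not converted to ASCII equivalents.
    text = unicodedata.normalize("NFKD", text)
    return text.encode("ascii", "ignore").decode("ascii").strip()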

def load_jsonl(path):
    """Read labeled emails from a JSONL file.

    Returns (df, label2id, id2label); df["label"] holds integer class IDs.
    """
    records = []
    labels_set = set()

    with open(path, "r", encoding="utf-8") as f:
        for i, line in enumerate(f, start=1):
            if not line.strip():
                continue  # skip blank lines instead of reporting a parse error
            try:
                obj = json.loads(line)
                text = normalize_text(obj.get("text", ""))
                label = obj.get("label", "").strip()
                # Keep only rows that have both text and a label
                if text and label:
                    labels_set.add(label)
                    records.append({"text": text, "label": label})
            except json.JSONDecodeError as e:
                print(f"❌ JSON parse error on line {i}: {e}")

    df = pd.DataFrame(records)

    # Sort the labels so the label -> ID mapping is the same on every run
    label2id = {label: idx for idx, label in enumerate(sorted(labels_set))}
    id2label = {v: k for k, v in label2id.items()}
    df["label"] = df["label"].map(label2id)

    return df, label2id, id2label
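
To see what these two helpers produce without touching the real label file, here is a small stand-alone sketch. It repeats the same normalization and label-mapping logic on three made-up records (the sample texts and label names are invented for illustration) and leaves out pandas:

```python
import json
import unicodedata

def normalize_text(text):
    # Same logic as the notebook helper: decompose, then drop non-ASCII characters
    text = unicodedata.normalize("NFKD", text)
    return text.encode("ascii", "ignore").decode("ascii").strip()

# Hypothetical records in the same {"text": ..., "label": ...} shape as the JSONL file
sample_records = [
    {"text": "It\u2019s in the caf\u00e9 ", "label": "compliant"},  # curly apostrophe, accented e
    {"text": "Send me the card number", "label": "violation"},
    {"text": "   ", "label": "violation"},  # empty after stripping, so it is dropped
]
sample_lines = [json.dumps(r) for r in sample_records]

records, labels = [], set()
for line in sample_lines:
    obj = json.loads(line)
    text = normalize_text(obj.get("text", ""))
    label = obj.get("label", "").strip()
    if text and label:
        labels.add(label)
        records.append({"text": text, "label": label})

# Sorted labels give a deterministic label -> ID mapping
label2id = {label: idx for idx, label in enumerate(sorted(labels))}
id2label = {idx: label for label, idx in label2id.items()}
encoded = [{"text": r["text"], "label": label2id[r["label"]]} for r in records]

print(encoded)
print(label2id)
```

Note that the curly apostrophe is removed rather than turned into `'`, so `It’s` becomes `Its`, while `café` becomes `cafe`.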



## 🧠 What Does This Line Do?

```python
tokenizer(batch["text"], padding="max_length", truncation=True, max_length=512, return_tensors="np")
```

This takes raw email text and returns a **machine-readable representation** for the transformer model. Let’s dive deeper:

---

## 🔧 Step-by-Step Breakdown

### 1️⃣ **Tokenization**

* The tokenizer breaks the raw string into **subword tokens**.

  * For example:

    ```
    "Please email me your credit card number."
    ⟶ ['Please', 'Ġemail', 'Ġme', 'Ġyour', 'Ġcredit', 'Ġcard', 'Ġnumber', '.']
    ```
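
The `Ġ` prefix is how byte-level BPE tokenizers such as RoBERTa's mark a token that was preceded by a space. A minimal sketch, using the token list from the example above and plain string operations rather than the tokenizer itself (`detokenize` is a hypothetical helper), shows that the original text can be rebuilt from the tokens:

```python
tokens = ['Please', 'Ġemail', 'Ġme', 'Ġyour', 'Ġcredit', 'Ġcard', 'Ġnumber', '.']

def detokenize(tokens):
    # Join the pieces and turn each leading-space marker back into a space.
    # This shortcut only holds for plain ASCII text like this example.
    return "".join(tokens).replace("Ġ", " ")

text = detokenize(tokens)
print(text)
```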

### 2️⃣ **Input IDs**

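* Each token is mapped to an integer from the model’s vocabulary (the IDs below are illustrative). The tokenizer also adds the special tokens `<s>` at the start and `</s>` at the end.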

  ```python
  [1423, 3412, 115, 2468, 5123, 872, 9321, 4]
  ```

### 3️⃣ **Attention Mask**

* A binary mask:

  * `1` = this token should be attended to (actual content)
  * `0` = this token is just padding
  * Used during model’s attention calculations

### 4️⃣ **Truncation**

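* If the email is **longer than 512 tokens**, only the first tokens are kept, so that the final sequence (including the special tokens `<s>` and `</s>`) is exactly 512 long.
* This keeps the input within the model's maximum sequence length; longer inputs would fail inside the model.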

### 5️⃣ **Padding**

* If the email is **shorter than 512 tokens**, it's padded with the tokenizer's padding token. For `distilroberta-base` that is `<pad>` with ID `1`; BERT-style tokenizers use `[PAD]` with ID `0`.
* Ensures every input is **the same length** — required for efficient batching on GPUs.
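
Steps 3 to 5 can be sketched in plain Python. This is an illustration of the idea, not Hugging Face's implementation: `pad_or_truncate` is a hypothetical helper, the short `max_length` is chosen for readability, and the IDs 0, 2, and 1 for `<s>`, `</s>`, and `<pad>` follow the RoBERTa vocabulary used by `distilroberta-base`.

```python
BOS_ID, EOS_ID, PAD_ID = 0, 2, 1  # <s>, </s>, <pad> in the RoBERTa vocabulary

def pad_or_truncate(token_ids, max_length):
    """Return (input_ids, attention_mask), both exactly max_length long."""
    # Truncation: leave room for the two special tokens
    body = token_ids[: max_length - 2]
    input_ids = [BOS_ID] + body + [EOS_ID]
    attention_mask = [1] * len(input_ids)
    # Padding: fill the rest with <pad> and mark it as ignorable
    pad_count = max_length - len(input_ids)
    input_ids += [PAD_ID] * pad_count
    attention_mask += [0] * pad_count
    return input_ids, attention_mask

short_ids, short_mask = pad_or_truncate([1423, 3412, 115], max_length=8)
long_ids, long_mask = pad_or_truncate(list(range(100, 120)), max_length=8)
print(short_ids, short_mask)
print(long_ids, long_mask)
```

The short input gains three padding positions with mask value 0, while the long input loses everything past its first six tokens and has a mask of all 1s.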

### 6️⃣ **return\_tensors="np"**

* Returns NumPy arrays instead of plain Python lists.
* `Dataset.map` stores the result in Arrow format either way, and the later `set_format(type="torch", ...)` call exposes the columns as torch tensors for training.

---

## 🏭 Why This Matters in Enterprise NLP Systems

| Feature           | Why It’s Enterprise-Standard                                  |
| ----------------- | ------------------------------------------------------------- |
| ⚖️ Uniform length | Makes batched training possible (no dynamic shape headaches)  |
| 🧠 Attention mask | Allows the model to **ignore padding** tokens during training |
| ⚡ Max efficiency  | Fixed-shape inputs batch well on GPUs (512 tokens is roughly 350-400 English words) |
| 🧹 Clean design   | Avoids runtime errors from overly long emails                 |
| 📥 Email-safe     | Works even on long enterprise emails, disclaimers, threads    |

---

## 🧠 Final Output (Example)

For one email:

```json
{
  "input_ids": [0, 1423, 3412, 115, 2468, ..., 2, 1, 1, 1],
  "attention_mask": [1, 1, 1, 1, 1, ..., 1, 0, 0, 0],
  "label": 1
}
```

* `input_ids` → tokenized and padded text
* `attention_mask` → 1s for real text, 0s for padding
* `label` → integer class ID from `label2id`, for example 1 (violation) or 0 (compliant); the exact numbers depend on the sorted label names

This is now ready for `torch.utils.data.DataLoader` or direct `Trainer` API usage.

---

## ✅ TL;DR

| Part                 | Purpose                              |
| -------------------- | ------------------------------------ |
| `input_ids`          | What the model sees                  |
| `attention_mask`     | Which positions are real (1) and which are padding to **ignore** (0) |
| `padding/truncation` | Ensures every email is 512 tokens    |
| NumPy/Torch format   | Enables efficient training, batching |



In [6]:
def tokenize_function(batch):
    # Pad or truncate every email in the batch to exactly MAX_LENGTH tokens
    return tokenizer(batch["text"], padding="max_length", truncation=True,
                     max_length=MAX_LENGTH, return_tensors="np")

In [8]:
def prepare_dataset():
    # load_jsonl returns (df, label2id, id2label); unpack it before using the DataFrame
    df, label2id, id2label = load_jsonl(LABEL_PATH)
    # Stratify so train and test keep the same label proportions
    train_df, test_df = train_test_split(df, test_size=TEST_SIZE, random_state=SEED, stratify=df["label"])
    dataset = DatasetDict({
        "train": Dataset.from_pandas(train_df.reset_index(drop=True)),
        "test": Dataset.from_pandas(test_df.reset_index(drop=True))
    })
    tokenized_dataset = dataset.map(tokenize_function, batched=True)
    # Expose only the model inputs and the label as torch tensors
    tokenized_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    return tokenized_dataset

In [9]:
@ray.remote
def preprocess_and_save():
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    dataset=prepare_dataset()
    dataset["train"].to_json(os.path.join(OUTPUT_DIR, "train_tokenized.json"))
    dataset["test"].to_json(os.path.join(OUTPUT_DIR, "test_tokenized.json"))
    print("Preprocessing and saving completed.")

In [None]:
"""if __name__ == "__main__":
    ray.init(ignore_reinit_error=True)
    ray.get(preprocess_and_save.remote())
    ray.shutdown()
"""