In [4]:
%%writefile translation_evaluator.py
import pandas as pd
from sacrebleu import sentence_bleu, sentence_chrf, corpus_bleu, corpus_chrf
from typing import List, Optional, Union
import warnings
from pathlib import Path

class TranslationEvaluator:
    """
    A complete translation evaluation system that handles:
    - Loading data from files
    - Calculating metrics (BLEU, CHRF)
    - Generating detailed and summary reports
    - Exporting to Excel with professional formatting

    Typical use is ``load_data()`` followed by ``evaluate()``. The results are
    written to ``translation_results.xlsx`` in the working directory and can
    also be read back with ``get_detailed_results()`` / ``get_model_metrics()``.
    """

    # Metrics computed when the caller does not pass ``metrics``.
    _DEFAULT_METRICS = ("BLEU", "CHRF")

    def __init__(self):
        # NOTE: this installs a process-wide filter, so warnings from every
        # library are silenced, not only sacrebleu's.
        warnings.simplefilter('ignore')  # Suppress sacrebleu warnings
        self._data = None
        self._detailed_results = None
        self._model_metrics = None

    def load_data(self, file_path: Union[str, Path]) -> None:
        """Load translation data from file (CSV or Excel).

        Args:
            file_path: Path to a .csv, .xlsx or .xls file (extension is
                matched case-insensitively).

        Raises:
            ValueError: If the file extension is not supported.
        """
        file_path = Path(file_path)
        suffix = file_path.suffix.lower()
        if suffix == '.csv':
            self._data = pd.read_csv(file_path)
        elif suffix in ('.xlsx', '.xls'):
            self._data = pd.read_excel(file_path)
        else:
            raise ValueError("Unsupported file format. Use .csv or .xlsx")

    def evaluate(
        self,
        prediction_cols: List[str],
        reference_col: str = "English",
        metrics: Optional[List[str]] = None,
        keep_cols: Optional[List[str]] = None
    ) -> None:
        """
        Run complete evaluation pipeline

        Args:
            prediction_cols: List of prediction columns to evaluate
            reference_col: Reference translation column
            metrics: List of metrics to compute; ``None`` means BLEU and CHRF
            keep_cols: Additional columns to preserve in output

        Raises:
            ValueError: If no data is loaded or a named column is missing.
        """
        if self._data is None:
            raise ValueError("No data loaded. Call load_data() first.")

        # A fresh list per call avoids sharing one mutable default object.
        if metrics is None:
            metrics = list(self._DEFAULT_METRICS)
        if keep_cols is None:
            keep_cols = []

        # Validate columns exist
        self._validate_columns(reference_col, prediction_cols, keep_cols)

        # 1. Compute detailed metrics
        self._compute_detailed_metrics(reference_col, prediction_cols, metrics, keep_cols)

        # 2. Compute model metrics
        self._compute_model_metrics(reference_col, prediction_cols, metrics)

        # 3. Generate report
        self._generate_report()

    def _validate_columns(self, reference_col: str, prediction_cols: List[str], keep_cols: List[str]) -> None:
        """Validate all required columns exist, raising ValueError otherwise."""
        missing = [col for col in [reference_col] + prediction_cols + keep_cols
                   if col not in self._data.columns]
        if missing:
            raise ValueError(f"Columns not found in data: {missing}")

    @staticmethod
    def _as_text(values) -> List[str]:
        """Return the cells as strings, mapping missing values to ''.

        sacrebleu only accepts ``str`` inputs, while pandas yields float NaN
        for empty cells and numbers for numeric-looking cells.
        """
        return ["" if pd.isna(value) else str(value) for value in values]

    def _compute_detailed_metrics(self, reference_col: str, prediction_cols: List[str],
                                  metrics: List[str], keep_cols: List[str]) -> None:
        """Calculate per-row metrics for all predictions.

        The stored frame is ordered as keep_cols followed, for each prediction
        column, by the prediction itself and its requested metric columns.
        """
        scored = self._data.copy()
        want_bleu = "BLEU" in metrics
        want_chrf = "CHRF" in metrics
        references = self._as_text(scored[reference_col])

        new_columns = keep_cols.copy()
        for pred_col in prediction_cols:
            hypotheses = self._as_text(scored[pred_col])
            new_columns.append(pred_col)
            if want_bleu:
                scored[f"{pred_col} BLEU"] = [
                    sentence_bleu(hyp, [ref]).score
                    for hyp, ref in zip(hypotheses, references)
                ]
                new_columns.append(f"{pred_col} BLEU")
            if want_chrf:
                scored[f"{pred_col} CHRF"] = [
                    sentence_chrf(hyp, [ref]).score
                    for hyp, ref in zip(hypotheses, references)
                ]
                new_columns.append(f"{pred_col} CHRF")

        self._detailed_results = scored[new_columns]

    def _compute_model_metrics(self, reference_col: str, prediction_cols: List[str],
                               metrics: List[str]) -> None:
        """Calculate aggregate (corpus-level) metrics for each model."""
        # The reference list is identical for every model, so build it once.
        references = self._as_text(self._data[reference_col])
        results = []

        for pred_col in prediction_cols:
            predictions = self._as_text(self._data[pred_col])

            model_results = {"Model": pred_col}
            if "BLEU" in metrics:
                model_results["BLEU"] = corpus_bleu(predictions, [references]).score
            if "CHRF" in metrics:
                model_results["CHRF"] = corpus_chrf(predictions, [references]).score

            results.append(model_results)

        self._model_metrics = pd.DataFrame(results)

    def _generate_report(self) -> None:
        """Write both result frames to translation_results.xlsx.

        The "Model Metrics" sheet is additionally turned into a styled Excel
        table with columns sized to their longest value.
        """
        with pd.ExcelWriter("translation_results.xlsx", engine='xlsxwriter') as writer:
            # Detailed Results sheet
            self._detailed_results.to_excel(writer, sheet_name="Detailed Results", index=False)

            # Model Metrics sheet with formatted table
            self._model_metrics.to_excel(writer, sheet_name="Model Metrics", index=False)

            worksheet = writer.sheets["Model Metrics"]
            (max_row, max_col) = self._model_metrics.shape

            # An Excel table needs at least one data row and one column;
            # skip the table (but keep the sheet) when there is nothing in it.
            if max_row and max_col:
                column_settings = [{'header': col} for col in self._model_metrics.columns]
                worksheet.add_table(0, 0, max_row, max_col - 1, {
                    'columns': column_settings,
                    'style': 'Table Style Medium 9',
                    'name': 'ModelMetrics'
                })

            # Auto-adjust columns
            for i, col in enumerate(self._model_metrics.columns):
                # .max() of an empty column is NaN, so fall back to 0 there.
                longest = int(self._model_metrics[col].astype(str).map(len).max()) if max_row else 0
                worksheet.set_column(i, i, max(longest, len(col)) + 2)

    def get_detailed_results(self) -> pd.DataFrame:
        """Get a copy of the detailed results DataFrame.

        Raises:
            ValueError: If evaluate() has not produced results yet.
        """
        if self._detailed_results is None:
            raise ValueError("No results available. Call evaluate() first.")
        return self._detailed_results.copy()

    def get_model_metrics(self) -> pd.DataFrame:
        """Get a copy of the model metrics DataFrame.

        Raises:
            ValueError: If evaluate() has not produced results yet.
        """
        if self._model_metrics is None:
            raise ValueError("No results available. Call evaluate() first.")
        return self._model_metrics.copy()

Writing translation_evaluator.py


In [6]:
%%writefile translation_evaluator.py
import time
import pandas as pd
from sacrebleu import sentence_bleu, sentence_chrf, corpus_bleu, corpus_chrf
from typing import List, Optional, Union
import warnings
from pathlib import Path

class TranslationEvaluator:
    """
    A translation evaluation system that handles:
    - Loading data
    - Computing BLEU/CHRF per row
    - Tracking response time per row
    - Aggregating model metrics and average response times
    - Exporting to Excel with three sheets:
      'Detailed Results', 'Model Metrics', 'Response Times'

    "Response time" here is the wall-clock time spent scoring one row, as
    measured around the sacrebleu calls.
    """

    # Metrics computed when the caller does not pass ``metrics``.
    _DEFAULT_METRICS = ("BLEU", "CHRF")

    def __init__(self):
        # NOTE: process-wide filter; silences warnings from all libraries.
        warnings.simplefilter('ignore')
        self._data = None
        self._detailed_results = None
        self._model_metrics = None
        # Accumulate across runs
        self._all_response_times = pd.DataFrame([], columns=["Model", "Run", "Average Response Time"])

    def load_data(self, file_path: Union[str, Path]) -> None:
        """Load the translation table from a .csv, .xlsx or .xls file.

        Raises:
            ValueError: If the file extension is not supported.
        """
        file_path = Path(file_path)
        suffix = file_path.suffix.lower()
        if suffix == '.csv':
            self._data = pd.read_csv(file_path)
        elif suffix in ('.xlsx', '.xls'):
            self._data = pd.read_excel(file_path)
        else:
            raise ValueError("Unsupported file format. Use .csv or .xlsx")

    def evaluate(
        self,
        prediction_cols: List[str],
        reference_col: str = "English",
        metrics: Optional[List[str]] = None,
        keep_cols: Optional[List[str]] = None,
        run_id: int = 1
    ) -> None:
        """
        Run a full evaluation pass.
        - prediction_cols: names of model output columns
        - reference_col: name of the reference column
        - metrics: which metrics to compute (None means BLEU and CHRF)
        - keep_cols: extra columns to carry through to detailed results
        - run_id: an integer label for this run (e.g. 1 or 2)

        Raises ValueError when no data is loaded or a named column is missing.
        """
        if self._data is None:
            raise ValueError("No data loaded. Call load_data() first.")
        # A fresh list per call avoids sharing one mutable default object.
        if metrics is None:
            metrics = list(self._DEFAULT_METRICS)
        keep_cols = keep_cols or []
        self._validate_columns(reference_col, prediction_cols, keep_cols)

        # 1. Detailed per-row metrics + response times
        self._compute_detailed_metrics(reference_col, prediction_cols, metrics, keep_cols)
        # 2. Aggregate BLEU/CHRF
        self._compute_model_metrics(reference_col, prediction_cols, metrics)
        # 3. Record avg response times for this run
        self._record_response_times(prediction_cols, run_id)
        # 4. Write out all three sheets
        self._generate_report()

    def _validate_columns(self, reference_col, prediction_cols, keep_cols):
        """Raise ValueError naming every requested column absent from the data."""
        missing = [col for col in [reference_col] + prediction_cols + keep_cols
                   if col not in self._data.columns]
        if missing:
            raise ValueError(f"Columns not found in data: {missing}")

    @staticmethod
    def _as_text(values) -> List[str]:
        """Return the cells as strings, mapping missing values to ''.

        sacrebleu only accepts ``str`` inputs, while pandas yields float NaN
        for empty cells and numbers for numeric-looking cells.
        """
        return ["" if pd.isna(value) else str(value) for value in values]

    def _compute_detailed_metrics(
        self, reference_col: str, prediction_cols: List[str],
        metrics: List[str], keep_cols: List[str]
    ) -> None:
        """Score every row for every model and time each row's scoring.

        The stored frame is ordered as keep_cols followed, for each model, by
        [model, model BLEU, model CHRF, model Response Time] (metric columns
        only when requested).
        """
        df = self._data.copy()
        want_bleu = "BLEU" in metrics
        want_chrf = "CHRF" in metrics
        references = self._as_text(df[reference_col])

        columns = list(keep_cols)
        for m in prediction_cols:
            bleu_scores, chrf_scores, elapsed = [], [], []
            for hyp, ref in zip(self._as_text(df[m]), references):
                # perf_counter is monotonic and high resolution; time.time()
                # can be too coarse to resolve a single sentence.
                start = time.perf_counter()
                if want_bleu:
                    bleu_scores.append(sentence_bleu(hyp, [ref]).score)
                if want_chrf:
                    chrf_scores.append(sentence_chrf(hyp, [ref]).score)
                elapsed.append(time.perf_counter() - start)

            columns.append(m)
            if want_bleu:
                df[f"{m} BLEU"] = bleu_scores
                columns.append(f"{m} BLEU")
            if want_chrf:
                df[f"{m} CHRF"] = chrf_scores
                columns.append(f"{m} CHRF")
            df[f"{m} Response Time"] = elapsed
            columns.append(f"{m} Response Time")

        self._detailed_results = df[columns]

    def _compute_model_metrics(
        self, reference_col: str, prediction_cols: List[str],
        metrics: List[str]
    ) -> None:
        """Compute corpus-level BLEU/CHRF for each model column."""
        # The reference list is identical for every model, so build it once.
        refs = self._as_text(self._data[reference_col])
        rows = []
        for m in prediction_cols:
            hyps = self._as_text(self._data[m])
            res = {"Model": m}
            if "BLEU" in metrics:
                res["BLEU"] = corpus_bleu(hyps, [refs]).score
            if "CHRF" in metrics:
                res["CHRF"] = corpus_chrf(hyps, [refs]).score
            rows.append(res)
        self._model_metrics = pd.DataFrame(rows)

    def _record_response_times(self, prediction_cols: List[str], run_id: int) -> None:
        """Append this run's per-model average response time to the run table."""
        rows = []
        for m in prediction_cols:
            times = self._detailed_results[f"{m} Response Time"].tolist()
            # With no rows there is no average; record NaN instead of
            # failing with ZeroDivisionError.
            avg = sum(times) / len(times) if times else float("nan")
            rows.append({"Model": m, "Run": run_id, "Average Response Time": avg})
        df_rt = pd.DataFrame(rows)
        if self._all_response_times.empty:
            # Concatenating onto the empty, untyped placeholder can change the
            # resulting dtypes, so the first run simply becomes the table.
            if not df_rt.empty:
                self._all_response_times = df_rt
        else:
            self._all_response_times = pd.concat([self._all_response_times, df_rt], ignore_index=True)

    @staticmethod
    def _format_as_table(worksheet, frame: pd.DataFrame) -> None:
        """Turn an already-written sheet into a styled table with fitted columns."""
        n_rows, n_cols = frame.shape
        # An Excel table needs at least one data row and one column.
        if n_rows and n_cols:
            worksheet.add_table(0, 0, n_rows, n_cols - 1, {
                'columns': [{'header': h} for h in frame.columns],
                'style': 'Table Style Medium 9'
            })
        for i, h in enumerate(frame.columns):
            # .max() of an empty column is NaN, so fall back to 0 there.
            longest = int(frame[h].astype(str).map(len).max()) if n_rows else 0
            worksheet.set_column(i, i, max(longest, len(h)) + 2)

    def _generate_report(self) -> None:
        """Write the three result sheets to translation_results.xlsx."""
        with pd.ExcelWriter("translation_results.xlsx", engine='xlsxwriter') as writer:
            # Detailed Results
            self._detailed_results.to_excel(writer, sheet_name="Detailed Results", index=False)
            # Model Metrics
            self._model_metrics.to_excel(writer, sheet_name="Model Metrics", index=False)
            # Response Times
            self._all_response_times.to_excel(writer, sheet_name="Response Times", index=False)

            self._format_as_table(writer.sheets["Model Metrics"], self._model_metrics)
            self._format_as_table(writer.sheets["Response Times"], self._all_response_times)

    def get_detailed_results(self) -> pd.DataFrame:
        """Get a copy of the per-row results; ValueError before evaluate()."""
        if self._detailed_results is None:
            raise ValueError("No results available. Call evaluate() first.")
        return self._detailed_results.copy()

    def get_model_metrics(self) -> pd.DataFrame:
        """Get a copy of the per-model metrics; ValueError before evaluate()."""
        if self._model_metrics is None:
            raise ValueError("No results available. Call evaluate() first.")
        return self._model_metrics.copy()

    def get_response_times(self) -> pd.DataFrame:
        """Get the average response-time table for all runs"""
        return self._all_response_times.copy()


Overwriting translation_evaluator.py


In [7]:
from translation_evaluator import TranslationEvaluator

# Smoke check: constructing the evaluator confirms that the module written by
# the previous cell imports and that its __init__ runs.
evaluator = TranslationEvaluator()
print("Successfully imported TranslationEvaluator!")

AttributeError: type object 'object' has no attribute 'dtype'

In [2]:
#pip install translation_evaluator

Collecting translation_evaluator
Note: you may need to restart the kernel to use updated packages.


Error processing line 7 of C:\Users\Hi\Anaconda3\lib\site-packages\pywin32.pth:

  Traceback (most recent call last):
    File "C:\Users\Hi\Anaconda3\lib\site.py", line 168, in addpackage
      exec(line)
    File "<string>", line 1, in <module>
  ModuleNotFoundError: No module named 'pywin32_bootstrap'

Remainder of file ignored
  ERROR: Could not find a version that satisfies the requirement translation_evaluator (from versions: none)
ERROR: No matching distribution found for translation_evaluator


In [7]:
# Guarded smoke check: import the module and build an evaluator, printing the
# failure message instead of a traceback if either step raises.
try:
    from translation_evaluator import TranslationEvaluator
    evaluator = TranslationEvaluator()
    print("✅ Library imported successfully!")
except Exception as e:
    # Deliberately broad: any failure is reported as a one-line message.
    print(f"❌ Error: {str(e)}")

✅ Library imported successfully!


In [11]:
from translation_evaluator import TranslationEvaluator

# Build the evaluator and load the workbook holding the translations
# (load_data also accepts a .csv path)
evaluator = TranslationEvaluator()
evaluator.load_data("C:/Users/Hi/inputdata.xlsx")

# One column per model being compared
model_columns = [
    "base_mad1ad400_translation",
    "finetuned_mad1ad400_translation",
    "finetuned_helsinki_translation",
]

# Score each model column against the "en" column, carrying "es" through
evaluator.evaluate(
    prediction_cols=model_columns,
    reference_col="en",
    metrics=["BLEU", "CHRF"],
    keep_cols=["es"],
)

# Show the per-model aggregate scores
print("Model Metrics:")
print(evaluator.get_model_metrics())

Model Metrics:
                             Model      BLEU       CHRF
0       base_mad1ad400_translation  6.376716  17.207060
1  finetuned_mad1ad400_translation  0.000000  16.187283
2   finetuned_helsinki_translation  0.000000  12.292971


In [2]:
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import FileResponse
from pydantic import BaseModel
import evaluate
import pandas as pd
import os
import tempfile
import traceback

app = FastAPI(
    title="Roberta large BERTScore API",
    description="API for calculating BERTScore between reference and candidate texts",
    version="1.0.0"
)

# MODEL_PATH is read from the BERT environment variable (with a default) and
# written straight back to it. In this cell it is only reported by the health
# endpoint: the scoring calls pass the model ID string "roberta-large", and
# neither TRANSFORMERS_CACHE nor HF_HOME is set here.
# NOTE(review): if the local checkpoint is meant to be used, confirm how the
# metric is expected to find it.
MODEL_PATH = os.getenv("BERT", "/appdata/cortex/dev4/shared/libs/huggingface/roberta-large")
os.environ["BERT"] = MODEL_PATH
# "cuda" whenever CUDA_VISIBLE_DEVICES is set to a non-empty value, else "cpu";
# no check is made that a GPU is actually usable.
DEVICE = "cuda" if os.environ.get("CUDA_VISIBLE_DEVICES") else "cpu"

# Load the Hugging Face `evaluate` BERTScore metric at import time. On failure
# the module still imports; bscore is None and the endpoints answer 500.
try:
    bscore = evaluate.load("bertscore")
    print("✅ Loaded HuggingFace BERTScore metric")
except Exception as e:
    print("❌ Failed to load BERTScore metric:", e)
    bscore = None

# Request body for POST /bertscore/: two parallel lists of texts. The endpoint
# rejects the request with 400 when the lists differ in length.
class ScoreRequest(BaseModel):
    reference: list[str]  # reference texts
    candidate: list[str]  # candidate texts scored against the references

# Response body for POST /bertscore/: one precision/recall/F1 value per input
# pair, plus fixed model and version labels.
class ScoreResponse(BaseModel):
    precision: list[float]
    recall:    list[float]
    f1:        list[float]
    model_type: str = "roberta-large"
    # NOTE(review): presumably the bert_score package version - confirm.
    version:    str = "0.3.12"

@app.get("/")
async def health_check():
    # Report whether the metric loaded at import time, together with the
    # configured model location and compute device.
    service_state = "failed"
    if bscore:
        service_state = "healthy"

    report = {}
    report["status"] = service_state
    report["model"] = "HuggingFace BERTScore"
    report["location"] = MODEL_PATH
    report["device"] = DEVICE
    return report

@app.post("/bertscore/", response_model=ScoreResponse)
async def calculate_score(request: ScoreRequest):
    # Score each candidate against the reference at the same position and
    # return precision/recall/F1 rounded to six decimals.
    #   500 - the metric failed to load, or scoring raised
    #   400 - the two lists differ in length
    if bscore is None:
        raise HTTPException(500, "Metric failed to load")

    pair_counts_match = len(request.reference) == len(request.candidate)
    if not pair_counts_match:
        raise HTTPException(400, "`reference` and `candidate` lists must be same length")

    try:
        scores = bscore.compute(
            predictions=request.candidate,
            references=request.reference,
            model_type="roberta-large",  # just the ID
            lang="en",
            device=DEVICE,
        )
        precision_vals = scores["precision"]
        recall_vals = scores["recall"]
        f1_vals = scores["f1"]
    except Exception as exc:
        # Keep the full traceback in the server log; the client only gets
        # the exception message.
        print(traceback.format_exc())
        raise HTTPException(500, str(exc))

    payload = {}
    for field, values in (("precision", precision_vals),
                          ("recall", recall_vals),
                          ("f1", f1_vals)):
        payload[field] = [round(value, 6) for value in values]
    payload["model_type"] = "roberta-large"
    payload["version"] = "0.3.12"
    return payload

@app.get("/batch-local/")
async def batch_local():
    # Score a fixed workbook on the server's disk and return the scored copy.
    # The input must contain 'reference' and 'candidate' columns; the output
    # gains 'precision', 'recall' and 'f1' columns rounded to six decimals.
    if bscore is None:
        raise HTTPException(500, "Metric failed to load")

    inp = "/appdata/cortex/dev4/shobha/input_data.xlsx"
    out = "/appdata/cortex/dev4/shobha/output_data.xlsx"

    input_present = os.path.exists(inp)
    if not input_present:
        raise HTTPException(404, f"Input not found: {inp}")

    try:
        df = pd.read_excel(inp)
    except Exception as e:
        raise HTTPException(500, f"Error reading input: {e}")

    has_required_columns = "reference" in df.columns and "candidate" in df.columns
    if not has_required_columns:
        raise HTTPException(400, "Excel must have 'reference' and 'candidate' columns")

    try:
        candidate_texts = df["candidate"].astype(str).tolist()
        reference_texts = df["reference"].astype(str).tolist()
        results = bscore.compute(
            predictions=candidate_texts,
            references=reference_texts,
            model_type="roberta-large",
            lang="en",
            device=DEVICE,
        )
        P = results["precision"]
        R = results["recall"]
        F1 = results["f1"]
    except Exception as e:
        raise HTTPException(500, f"Scoring error: {e}")

    for column_name, raw_scores in (("precision", P), ("recall", R), ("f1", F1)):
        df[column_name] = [round(score, 6) for score in raw_scores]

    try:
        df.to_excel(out, index=False)
    except Exception as e:
        raise HTTPException(500, f"Cannot write Excel: {e}")

    xlsx_mime = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    return FileResponse(out, filename=os.path.basename(out), media_type=xlsx_mime)

@app.post("/batch-bert/")
async def batch_bert(file: UploadFile = File()):
    # Score an uploaded workbook and return it with 'precision', 'recall' and
    # 'f1' columns added (rounded to six decimals).
    #   500 - the metric failed to load, scoring raised, or the result could
    #         not be written
    #   400 - not an .xls/.xlsx upload, unreadable workbook, or missing the
    #         'reference' / 'candidate' columns
    # Function-scope import: the module only imports FastAPI, HTTPException,
    # UploadFile and File from fastapi.
    from fastapi import BackgroundTasks

    if bscore is None:
        raise HTTPException(500, "Metric failed to load")
    # filename can be None for some uploads; treat that as a wrong extension
    # instead of failing on None.lower().
    if not (file.filename or "").lower().endswith((".xls", ".xlsx")):
        raise HTTPException(400, "Upload an .xls or .xlsx file")

    try:
        df = pd.read_excel(file.file)
    except Exception as e:
        raise HTTPException(400, f"Cannot read Excel: {e}")
    if "reference" not in df.columns or "candidate" not in df.columns:
        raise HTTPException(400, "Excel must have 'reference' and 'candidate' columns")

    try:
        results = bscore.compute(
            predictions=df["candidate"].astype(str).tolist(),
            references=df["reference"].astype(str).tolist(),
            model_type="roberta-large",
            lang="en",
            device=DEVICE,
        )
        P, R, F1 = results["precision"], results["recall"], results["f1"]
    except Exception as e:
        raise HTTPException(500, f"Scoring error: {e}")

    df["precision"] = [round(x, 6) for x in P]
    df["recall"]    = [round(x, 6) for x in R]
    df["f1"]        = [round(x, 6) for x in F1]

    # Reserve a unique path, then close the handle before pandas reopens the
    # file by name (a still-open temp file cannot be reopened on Windows).
    tmp = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False)
    tmp.close()
    try:
        df.to_excel(tmp.name, index=False)
    except Exception as e:
        os.remove(tmp.name)
        raise HTTPException(500, f"Cannot write Excel: {e}")

    # delete=False means nothing removes the file automatically; delete it
    # once the response has been sent so results do not pile up on disk.
    cleanup = BackgroundTasks()
    cleanup.add_task(os.remove, tmp.name)
    return FileResponse(
        tmp.name,
        filename="bert_results.xlsx",
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        background=cleanup,
    )

# Start the API with uvicorn when this file is executed directly.
if __name__ == "__main__":
    import uvicorn
    # host 0.0.0.0 listens on every network interface, on port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)


Overwriting translation_evaluator.py


In [None]:
# translation_evaluator.py

import time
import pandas as pd
from sacrebleu import sentence_bleu, sentence_chrf, corpus_bleu, corpus_chrf
from typing import List, Optional, Union
import warnings
from pathlib import Path

class TranslationEvaluator:
    """
    A translation evaluation system that handles:
    - Loading data
    - Computing BLEU/CHRF per row
    - Tracking response time per row per run
    - Aggregating model metrics and average response times
    - Exporting to Excel with three sheets:
      'Detailed Results', 'Model Metrics', 'Response Times'

    The first run (run_id == 1, or any run made before results exist) builds
    the detailed table including the scores. Later runs re-score only to time
    the work and append one "<model> Response Time Run <run_id>" column per
    model to the existing table.
    """

    # Metrics computed when the caller does not pass ``metrics``.
    _DEFAULT_METRICS = ("BLEU", "CHRF")

    def __init__(self):
        # NOTE: process-wide filter; silences warnings from all libraries.
        warnings.simplefilter('ignore')
        self._data = None
        self._detailed_results = None
        self._model_metrics = None
        # Accumulate across runs
        self._all_response_times = pd.DataFrame([], columns=["Model", "Run", "Average Response Time"])

    def load_data(self, file_path: Union[str, Path]) -> None:
        """Load the translation table from a .csv, .xlsx or .xls file.

        Raises:
            ValueError: If the file extension is not supported.
        """
        file_path = Path(file_path)
        suffix = file_path.suffix.lower()
        if suffix == '.csv':
            self._data = pd.read_csv(file_path)
        elif suffix in ('.xlsx', '.xls'):
            self._data = pd.read_excel(file_path)
        else:
            raise ValueError("Unsupported file format. Use .csv or .xlsx")

    def evaluate(
        self,
        prediction_cols: List[str],
        reference_col: str = "English",
        metrics: Optional[List[str]] = None,
        keep_cols: Optional[List[str]] = None,
        run_id: int = 1
    ) -> None:
        """
        Run a full evaluation pass.
        - prediction_cols: names of model output columns
        - reference_col: name of the reference column
        - metrics: which metrics to compute (None means BLEU and CHRF)
        - keep_cols: extra columns to carry through to detailed results
        - run_id: an integer label for this run (e.g. 1 or 2)

        The Excel report is not written after run 1; every other run writes
        it, so it always holds the timing columns gathered so far.

        Raises ValueError when no data is loaded or a named column is missing.
        """
        if self._data is None:
            raise ValueError("No data loaded. Call load_data() first.")
        # A fresh list per call avoids sharing one mutable default object.
        if metrics is None:
            metrics = list(self._DEFAULT_METRICS)
        keep_cols = keep_cols or []
        self._validate_columns(reference_col, prediction_cols, keep_cols)

        # 1. Detailed per-row metrics + response times for this run
        self._compute_detailed_metrics(reference_col, prediction_cols, metrics, keep_cols, run_id)
        # 2. Aggregate BLEU/CHRF
        self._compute_model_metrics(reference_col, prediction_cols, metrics)
        # 3. Record avg response times for this run
        self._record_response_times(prediction_cols, run_id)
        # 4. Write the full report after every run except the first
        if run_id != 1:
            self._generate_report()

    def _validate_columns(self, reference_col, prediction_cols, keep_cols):
        """Raise ValueError naming every requested column absent from the data."""
        missing = [col for col in [reference_col] + prediction_cols + keep_cols
                   if col not in self._data.columns]
        if missing:
            raise ValueError(f"Columns not found in data: {missing}")

    @staticmethod
    def _as_text(values) -> List[str]:
        """Return the cells as strings, mapping missing values to ''.

        sacrebleu only accepts ``str`` inputs, while pandas yields float NaN
        for empty cells and numbers for numeric-looking cells.
        """
        return ["" if pd.isna(value) else str(value) for value in values]

    def _compute_detailed_metrics(
        self, reference_col: str, prediction_cols: List[str],
        metrics: List[str], keep_cols: List[str], run_id: int
    ) -> None:
        """Score every row for every model and time each row's scoring.

        First run: stores keep_cols followed, for each model, by
        [model, model BLEU, model CHRF, model Response Time Run <run_id>]
        (metric columns only when requested).
        Later runs: keeps the existing table and adds (or overwrites) the
        "<model> Response Time Run <run_id>" columns. The row count must match
        the existing table, otherwise pandas raises ValueError.
        """
        previous = self._detailed_results
        first_pass = run_id == 1 or previous is None

        df = self._data.copy()
        want_bleu = "BLEU" in metrics
        want_chrf = "CHRF" in metrics
        references = self._as_text(df[reference_col])

        columns = list(keep_cols)   # column order, used on the first pass
        timings = {}                # timing column name -> per-row seconds
        for m in prediction_cols:
            bleu_scores, chrf_scores, elapsed = [], [], []
            for hyp, ref in zip(self._as_text(df[m]), references):
                # perf_counter is monotonic and high resolution; time.time()
                # can be too coarse to resolve a single sentence.
                start = time.perf_counter()
                if want_bleu:
                    bleu_scores.append(sentence_bleu(hyp, [ref]).score)
                if want_chrf:
                    chrf_scores.append(sentence_chrf(hyp, [ref]).score)
                elapsed.append(time.perf_counter() - start)

            timing_col = f"{m} Response Time Run {run_id}"
            timings[timing_col] = elapsed

            if first_pass:
                columns.append(m)
                if want_bleu:
                    df[f"{m} BLEU"] = bleu_scores
                    columns.append(f"{m} BLEU")
                if want_chrf:
                    df[f"{m} CHRF"] = chrf_scores
                    columns.append(f"{m} CHRF")
                df[timing_col] = elapsed
                columns.append(timing_col)

        if first_pass:
            self._detailed_results = df[columns]
        else:
            # Extend the earlier table rather than re-selecting its columns
            # from a fresh copy of the data, which does not contain them.
            merged = previous.copy()
            for timing_col, elapsed in timings.items():
                merged[timing_col] = elapsed
            self._detailed_results = merged

    def _compute_model_metrics(
        self, reference_col: str, prediction_cols: List[str],
        metrics: List[str]
    ) -> None:
        """Compute corpus-level BLEU/CHRF for each model column."""
        # The reference list is identical for every model, so build it once.
        refs = self._as_text(self._data[reference_col])
        rows = []
        for m in prediction_cols:
            hyps = self._as_text(self._data[m])
            res = {"Model": m}
            if "BLEU" in metrics:
                res["BLEU"] = corpus_bleu(hyps, [refs]).score
            if "CHRF" in metrics:
                res["CHRF"] = corpus_chrf(hyps, [refs]).score
            rows.append(res)
        self._model_metrics = pd.DataFrame(rows)

    def _record_response_times(self, prediction_cols: List[str], run_id: int) -> None:
        """Append this run's per-model average response time to the run table."""
        rows = []
        for m in prediction_cols:
            col = f"{m} Response Time Run {run_id}"
            times = self._detailed_results[col].tolist()
            # With no rows there is no average; record NaN instead of
            # failing with ZeroDivisionError.
            avg = sum(times) / len(times) if times else float("nan")
            rows.append({"Model": m, "Run": run_id, "Average Response Time": avg})
        df_rt = pd.DataFrame(rows)
        if self._all_response_times.empty:
            # Concatenating onto the empty, untyped placeholder can change the
            # resulting dtypes, so the first run simply becomes the table.
            if not df_rt.empty:
                self._all_response_times = df_rt
        else:
            self._all_response_times = pd.concat([self._all_response_times, df_rt], ignore_index=True)

    @staticmethod
    def _format_as_table(worksheet, frame: pd.DataFrame) -> None:
        """Turn an already-written sheet into a styled table with fitted columns."""
        n_rows, n_cols = frame.shape
        # An Excel table needs at least one data row and one column.
        if n_rows and n_cols:
            worksheet.add_table(0, 0, n_rows, n_cols - 1, {
                'columns': [{'header': h} for h in frame.columns],
                'style': 'Table Style Medium 9'
            })
        for i, h in enumerate(frame.columns):
            # .max() of an empty column is NaN, so fall back to 0 there.
            longest = int(frame[h].astype(str).map(len).max()) if n_rows else 0
            worksheet.set_column(i, i, max(longest, len(h)) + 2)

    def _generate_report(self) -> None:
        """Write the three result sheets to translation_results.xlsx."""
        with pd.ExcelWriter("translation_results.xlsx", engine='xlsxwriter') as writer:
            # Detailed Results
            self._detailed_results.to_excel(writer, sheet_name="Detailed Results", index=False)
            # Model Metrics
            self._model_metrics.to_excel(writer, sheet_name="Model Metrics", index=False)
            # Response Times
            self._all_response_times.to_excel(writer, sheet_name="Response Times", index=False)

            self._format_as_table(writer.sheets["Model Metrics"], self._model_metrics)
            self._format_as_table(writer.sheets["Response Times"], self._all_response_times)

    def get_detailed_results(self) -> pd.DataFrame:
        """Get a copy of the per-row results; ValueError before evaluate()."""
        if self._detailed_results is None:
            raise ValueError("No results available. Call evaluate() first.")
        return self._detailed_results.copy()

    def get_model_metrics(self) -> pd.DataFrame:
        """Get a copy of the per-model metrics; ValueError before evaluate()."""
        if self._model_metrics is None:
            raise ValueError("No results available. Call evaluate() first.")
        return self._model_metrics.copy()

    def get_response_times(self) -> pd.DataFrame:
        """Get the average response-time table for all runs"""
        return self._all_response_times.copy()


In [None]:
# run_evaluation.py

from translation_evaluator import TranslationEvaluator

def main():
    """Evaluate the example models twice and print the collected results."""
    # Load the translations (an .xlsx or .csv path both work)
    evaluator = TranslationEvaluator()
    evaluator.load_data("your_translations.xlsx")

    # Settings shared by both passes: model columns, reference column and
    # the extra columns carried through to the detailed table
    shared_settings = {
        "prediction_cols": ["model_A", "model_B"],
        "reference_col": "English",
        "keep_cols": ["Sentence ID"],
    }

    # Two passes, labelled run 1 and run 2
    for run_number in (1, 2):
        evaluator.evaluate(run_id=run_number, **shared_settings)

    # Console preview of the per-row table
    print("\n=== Detailed Results ===")
    preview = evaluator.get_detailed_results().head()
    print(preview, "\n")

    # Per-run average timings
    print("=== Run Averages ===")
    run_averages = evaluator.get_response_times()
    print(run_averages, "\n")


if __name__ == "__main__":
    main()


In [None]:
#Score

# batch_bertscore.py

import os
import tempfile

from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import FileResponse
from pydantic import BaseModel

import pandas as pd
import evaluate

# ──────────────────────────────────────────────────────────────────────────────
# CONFIGURATION
# Path to your locally-saved roberta-large checkpoint
# Location of the roberta-large checkpoint: taken from the BERT environment
# variable, falling back to the shared path below. The scoring endpoints pass
# this value to the metric as `model_type`.
MODEL_PATH = os.getenv(
    "BERT",
    "/appdata/cortex/dev4/shared/libs/huggingface/roberta-large"
)

# "cuda" whenever CUDA_VISIBLE_DEVICES is set to a non-empty value, else "cpu";
# no check is made that a GPU is actually usable.
DEVICE = "cuda" if os.getenv("CUDA_VISIBLE_DEVICES") else "cpu"

# Load the 🤗 Evaluate BERTScore metric (no `from bert_score import …`).
# The call is not guarded, so a load failure stops the module from importing.
bertscore = evaluate.load("bertscore")

# ──────────────────────────────────────────────────────────────────────────────
# FASTAPI SETUP
app = FastAPI(
    title="RoBERTa-Large BERTScore API",
    description="Compute BERTScore with your local RoBERTa-large checkpoint",
    version="1.0.0"
)

# Request body for POST /bertscore/: two parallel lists of texts. The endpoint
# rejects the request with 400 when the lists differ in length.
class ScoreRequest(BaseModel):
    reference: list[str]  # reference texts
    candidate: list[str]  # candidate texts scored against the references

# Response body for POST /bertscore/: one precision/recall/F1 value per input
# pair, plus model and version labels. Neither label has a default, so the
# endpoint must supply both.
class ScoreResponse(BaseModel):
    precision: list[float]
    recall:    list[float]
    f1:        list[float]
    model_type: str
    version:    str

@app.get("/")
async def health_check():
    # Static liveness reply that also names the configured checkpoint path.
    model_label = f"roberta-large @ {MODEL_PATH}"
    reply = {"status": "healthy"}
    reply["model"] = model_label
    return reply

@app.post("/bertscore/", response_model=ScoreResponse)
async def calculate_score(request: ScoreRequest):
    # Score each candidate against the reference at the same position using
    # the local checkpoint at MODEL_PATH.
    #   400 - the two lists differ in length
    #   500 - scoring raised
    if len(request.reference) != len(request.candidate):
        raise HTTPException(
            status_code=400,
            detail="`reference` and `candidate` must be the same length"
        )

    try:
        results = bertscore.compute(
            predictions=request.candidate,
            references=request.reference,
            model_type=MODEL_PATH,
            # The metric looks up its default layer by model name, which
            # cannot work for a filesystem path, so the layer is set here.
            # NOTE(review): 17 is understood to be bert_score's default layer
            # for roberta-large - confirm against the installed version.
            num_layers=17,
            device=DEVICE,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"scoring error: {e}")

    return ScoreResponse(
        precision=results["precision"],
        recall=results["recall"],
        f1=results["f1"],
        model_type="roberta-large",
        version="1.0"
    )

@app.get("/batch-local/")
async def batch_local():
    # Serves the fixed input workbook from the shared drive as a download,
    # or answers 404 when that file is not present on this host.
    xlsx_mime = (
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )
    workbook_path = "/appdata/cortex/dev4/shobha/input_data.xlsx"

    if os.path.exists(workbook_path):
        return FileResponse(
            path=workbook_path,
            filename=os.path.basename(workbook_path),
            media_type=xlsx_mime,
        )

    raise HTTPException(
        status_code=404, detail=f"input not found: {workbook_path}"
    )

@app.post("/batch-bertscore/")
async def batch_bertscore(file: UploadFile = File(...)):
    """
    Score every row of an uploaded Excel sheet and return the augmented sheet.

    The workbook must contain 'reference' and 'candidate' columns. Three
    columns ('precision', 'recall', 'f1') are added and the result is sent
    back as an .xlsx download. Blank cells are scored as empty strings.

    Raises:
        HTTPException 400: the upload cannot be parsed as Excel, or the
            required columns are missing.
        HTTPException 500: scoring or writing the result workbook failed.
    """
    # Imported locally so this handler does not depend on the module's
    # import block being changed.
    from fastapi import BackgroundTasks

    # 1) Read the uploaded Excel file. An unreadable upload is the client's
    #    problem, so it is reported as 400 rather than 500.
    try:
        df = pd.read_excel(file.file)
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"cannot read Excel: {e}"
        ) from e

    # 2) Validate required columns
    if "reference" not in df.columns or "candidate" not in df.columns:
        raise HTTPException(
            status_code=400,
            detail="Excel must have 'reference' and 'candidate' columns"
        )

    # Blank cells arrive as NaN; turn them into "" so they are not scored
    # as the literal text "nan".
    candidates = ["" if pd.isna(v) else str(v) for v in df["candidate"]]
    references = ["" if pd.isna(v) else str(v) for v in df["reference"]]

    # 3) Compute BERTScore in batch
    try:
        if candidates:
            results = bertscore.compute(
                predictions=candidates,
                references=references,
                model_type=MODEL_PATH,
                # The metric derives its default layer from a table keyed by
                # model *name*; MODEL_PATH is a filesystem path and is not
                # in that table. 17 is the layer BERTScore uses by default
                # for roberta-large.
                num_layers=17,
                device=DEVICE
            )
        else:
            # Sheet has headers but no rows: nothing to send to the model.
            results = {"precision": [], "recall": [], "f1": []}
        df["precision"] = results["precision"]
        df["recall"]    = results["recall"]
        df["f1"]        = results["f1"]
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"scoring error: {e}"
        ) from e

    # 4) Write results to a temporary Excel file. delete=False is required
    #    because the file must outlive this function while it is streamed.
    tmp = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False)
    out = tmp.name
    tmp.close()

    try:
        df.to_excel(out, index=False)
    except Exception as e:
        # Do not leave a half-written temp file behind.
        if os.path.exists(out):
            os.remove(out)
        raise HTTPException(
            status_code=500, detail=f"cannot write Excel: {e}"
        ) from e

    # Remove the temp file once the response has been sent; previously every
    # request left one file behind on disk.
    cleanup = BackgroundTasks()
    cleanup.add_task(os.remove, out)

    return FileResponse(
        path=out,
        filename="roberta_results.xlsx",
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        background=cleanup
    )

# Script entry point: serve the app with uvicorn when run directly.
if __name__ == "__main__":
    # uvicorn is imported only on this path, not when the module is imported.
    import uvicorn
    # host "0.0.0.0" listens on every network interface, on port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)


In [None]:
# translation_evaluator.py

import time
import pandas as pd
from sacrebleu import sentence_bleu, sentence_chrf, corpus_bleu, corpus_chrf
from typing import List, Optional, Union
import warnings
from pathlib import Path

class TranslationEvaluator:
    """
    Evaluate translation outputs stored as columns of a table.

    Typical use: ``load_data()`` once, then ``evaluate()`` once per run
    (``run_id`` 1, 2, ...). The evaluator handles:
    - Loading data from CSV or Excel
    - Computing sentence-level BLEU/CHRF per row (stored on the first run)
    - Timing the metric computation per row for every run
    - Aggregating corpus-level metrics and per-run average times
    - Exporting to Excel with four sheets:
      'Detailed Results', 'Model Metrics', 'Response Times', 'Run Averages'

    The Excel report is written automatically when ``run_id == 2`` finishes.
    """

    # Metrics used when the caller does not pass ``metrics``.
    _DEFAULT_METRICS = ("BLEU", "CHRF")
    # Column layout of the per-run timing summary.
    _RESPONSE_TIME_COLUMNS = ("Model", "Run", "Average Response Time")

    def __init__(self):
        # Intended to hide sacrebleu warnings; note that this silences
        # every Python warning in the process.
        warnings.simplefilter('ignore')
        self._data = None
        self._reset_results()

    def _reset_results(self) -> None:
        """Forget all results computed so far."""
        self._detailed_results = None
        self._model_metrics = None
        # Holds per-run averages across runs
        self._all_response_times = pd.DataFrame(
            [], columns=list(self._RESPONSE_TIME_COLUMNS)
        )

    @staticmethod
    def _as_text(value) -> str:
        """Return a cell value as text; missing values become ''.

        sacrebleu only accepts ``str`` inputs, while blank spreadsheet cells
        arrive as NaN/None and numeric cells as numbers.
        """
        if isinstance(value, str):
            return value
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return ""
        return str(value)

    def load_data(self, file_path: Union[str, Path]) -> None:
        """
        Load translation data from a .csv, .xlsx or .xls file.

        Loading new data discards results from earlier evaluations, since
        they no longer describe the loaded rows.

        Raises:
            ValueError: the file extension is not supported.
        """
        file_path = Path(file_path)
        suffix = file_path.suffix.lower()
        if suffix == '.csv':
            self._data = pd.read_csv(file_path)
        elif suffix in ('.xlsx', '.xls'):
            self._data = pd.read_excel(file_path)
        else:
            raise ValueError("Unsupported file format. Use .csv or .xlsx")
        self._reset_results()

    def evaluate(
        self,
        prediction_cols: List[str],
        reference_col: str = "English",
        metrics: Optional[List[str]] = None,
        keep_cols: Optional[List[str]] = None,
        run_id: int = 1
    ) -> None:
        """
        Run one evaluation pass over the loaded data.

        Args:
            prediction_cols: Columns holding the model outputs to score.
            reference_col: Column holding the reference translation.
            metrics: Metrics to compute ("BLEU" and/or "CHRF");
                defaults to both.
            keep_cols: Extra columns to carry into the detailed results.
            run_id: Run number; used to label this run's timing columns.
                Run 1 (re)builds the detailed table, later runs extend it.

        Raises:
            ValueError: no data is loaded, or a named column is missing.
        """
        if self._data is None:
            raise ValueError("No data loaded. Call load_data() first.")
        # Explicit None check so that an empty list stays an empty list.
        metrics = (
            list(self._DEFAULT_METRICS) if metrics is None else list(metrics)
        )
        prediction_cols = list(prediction_cols)
        keep_cols = list(keep_cols) if keep_cols else []
        self._validate_columns(reference_col, prediction_cols, keep_cols)

        # 1. Detailed per-row metrics + response times
        self._compute_detailed_metrics(
            reference_col, prediction_cols, metrics, keep_cols, run_id
        )
        # 2. Aggregate corpus-level metrics
        self._compute_model_metrics(reference_col, prediction_cols, metrics)
        # 3. Record per-run averages
        self._record_response_times(prediction_cols, run_id)
        # 4. After run 2, write out the full Excel report
        if run_id == 2:
            self._generate_report()

    def get_detailed_results(self) -> pd.DataFrame:
        """Return a copy of the per-row results table.

        Raises:
            ValueError: ``evaluate()`` has not been run yet.
        """
        if self._detailed_results is None:
            raise ValueError("No results available. Call evaluate() first.")
        return self._detailed_results.copy()

    def get_model_metrics(self) -> pd.DataFrame:
        """Return a copy of the corpus-level metrics table.

        Raises:
            ValueError: ``evaluate()`` has not been run yet.
        """
        if self._model_metrics is None:
            raise ValueError("No results available. Call evaluate() first.")
        return self._model_metrics.copy()

    def get_response_times(self) -> pd.DataFrame:
        """Return a copy of the per-model, per-run average timing table.

        The table is empty until ``evaluate()`` has been run.
        """
        return self._all_response_times.copy()

    def _validate_columns(self, reference_col, prediction_cols, keep_cols):
        """Raise ValueError listing any requested column absent from the data."""
        requested = [reference_col] + list(prediction_cols) + list(keep_cols)
        missing = [col for col in requested if col not in self._data.columns]
        if missing:
            raise ValueError(f"Columns not found in data: {missing}")

    def _compute_detailed_metrics(
        self, reference_col: str, prediction_cols: List[str],
        metrics: List[str], keep_cols: List[str], run_id: int
    ) -> None:
        """
        Score every row for every model and time the scoring.

        On run 1 (or when no results exist yet) the detailed table is built
        from scratch: keep columns, then for each model its text, its
        sentence scores and this run's timing column. On later runs the
        scores are recomputed only to be timed, and one timing column per
        model is appended to the existing table.
        """
        want_bleu = "BLEU" in metrics
        want_chrf = "CHRF" in metrics
        build_base = run_id == 1 or self._detailed_results is None

        if build_base:
            results = self._data.copy()
            ordered_cols = list(keep_cols)
        else:
            # Extend the table from the earlier run(s). Starting again from
            # the raw data here would lose the run-1 score/timing columns.
            results = self._detailed_results.copy()
            ordered_cols = results.columns.tolist()

        references = [self._as_text(v) for v in self._data[reference_col]]

        for m in prediction_cols:
            hypotheses = [self._as_text(v) for v in self._data[m]]
            bleu_scores, chrf_scores, timings = [], [], []

            for hyp, ref in zip(hypotheses, references):
                # perf_counter is monotonic and high resolution, which suits
                # measuring short durations better than wall-clock time.
                start = time.perf_counter()
                if want_bleu:
                    bleu_scores.append(sentence_bleu(hyp, [ref]).score)
                if want_chrf:
                    chrf_scores.append(sentence_chrf(hyp, [ref]).score)
                timings.append(time.perf_counter() - start)

            if build_base:
                ordered_cols.append(m)
                if want_bleu:
                    results[f"{m} BLEU"] = bleu_scores
                    ordered_cols.append(f"{m} BLEU")
                if want_chrf:
                    results[f"{m} CHRF"] = chrf_scores
                    ordered_cols.append(f"{m} CHRF")

            # append this run's timing column
            timing_col = f"{m} Response Time Run {run_id}"
            results[timing_col] = timings
            if timing_col not in ordered_cols:
                ordered_cols.append(timing_col)

        self._detailed_results = results[ordered_cols]

    def _compute_model_metrics(
        self, reference_col: str, prediction_cols: List[str],
        metrics: List[str]
    ) -> None:
        """Compute corpus-level BLEU/CHRF for each model, one row per model."""
        refs = [self._as_text(v) for v in self._data[reference_col]]
        rows = []
        for m in prediction_cols:
            hyps = [self._as_text(v) for v in self._data[m]]
            entry = {"Model": m}
            if "BLEU" in metrics:
                entry["BLEU"] = corpus_bleu(hyps, [refs]).score
            if "CHRF" in metrics:
                entry["CHRF"] = corpus_chrf(hyps, [refs]).score
            rows.append(entry)
        self._model_metrics = pd.DataFrame(rows)

    def _record_response_times(
        self, prediction_cols: List[str], run_id: int
    ) -> None:
        """
        Store each model's mean per-row time for the given run.

        An existing entry for the same model and run is replaced, so every
        (Model, Run) pair stays unique. With no rows the average is NaN.
        """
        rows = []
        for m in prediction_cols:
            col = f"{m} Response Time Run {run_id}"
            times = self._detailed_results[col].tolist()
            avg = sum(times) / len(times) if times else float("nan")
            rows.append({
                "Model": m,
                "Run": run_id,
                "Average Response Time": avg
            })
        df_rt = pd.DataFrame(rows, columns=list(self._RESPONSE_TIME_COLUMNS))

        previous = self._all_response_times
        # Duplicate (Model, Run) pairs would make the pivot in the report
        # fail, so drop the entries that are about to be re-recorded.
        stale = (
            (previous["Run"] == run_id)
            & previous["Model"].isin(list(prediction_cols))
        )
        previous = previous[~stale]

        if previous.empty:
            # Skip concatenating with an empty frame, which would turn the
            # numeric columns into object dtype.
            self._all_response_times = df_rt.reset_index(drop=True)
        else:
            self._all_response_times = pd.concat(
                [previous, df_rt], ignore_index=True
            )

    def _build_run_averages(self) -> pd.DataFrame:
        """Return one row per model: a column per run plus the overall mean."""
        run_avgs = (
            self._all_response_times
                .pivot(index="Model", columns="Run", values="Average Response Time")
                .reset_index()
        )
        # Average over whichever runs were recorded, not a fixed pair.
        run_cols = [c for c in run_avgs.columns if c != "Model"]
        run_avgs["Overall Avg (s)"] = run_avgs[run_cols].mean(axis=1)
        return run_avgs

    def _generate_report(
        self, output_path: Union[str, Path] = "translation_results.xlsx"
    ) -> None:
        """Write all result tables to an Excel workbook with four sheets."""
        run_avgs = self._build_run_averages()

        with pd.ExcelWriter(str(output_path), engine='xlsxwriter') as writer:
            # 1) Detailed Results
            self._detailed_results.to_excel(
                writer, sheet_name="Detailed Results", index=False
            )
            # 2) Corpus-level Metrics
            self._model_metrics.to_excel(
                writer, sheet_name="Model Metrics", index=False
            )
            # 3) Per-run averages
            self._all_response_times.to_excel(
                writer, sheet_name="Response Times", index=False
            )
            # 4) Overall Run Averages
            run_avgs.to_excel(writer, sheet_name="Run Averages", index=False)

            # TODO: (Optional) add table formatting for each sheet.


In [None]:
# run_evaluation.py

from translation_evaluator import TranslationEvaluator

def main():
    """Evaluate the configured model columns twice and print a summary."""
    models = ["model_A", "model_B"]   # your prediction columns
    keep_cols = ["Sentence ID"]       # any extra columns to carry through
    ref_col = "English"

    evaluator = TranslationEvaluator()
    evaluator.load_data("your_translations.xlsx")  # or .csv

    # Two identical passes, labelled run 1 and run 2.
    for run_number in (1, 2):
        evaluator.evaluate(
            prediction_cols=models,
            reference_col=ref_col,
            keep_cols=keep_cols,
            run_id=run_number,
        )

    # (Optional) print to console
    detailed_preview = evaluator.get_detailed_results().head()
    print("=== Detailed Results ===")
    print(detailed_preview, "\n")

    timing_summary = evaluator.get_response_times()
    print("=== Run Averages ===")
    print(timing_summary, "\n")

# Script entry point: run the evaluation when executed directly.
if __name__ == "__main__":
    main()

    # Required packages: pip install pandas sacrebleu xlsxwriter
