# Code Improvement + Test Runner Workflow

Este notebook implementa un flujo completo en **Python** para:
1) Respaldar archivos de código, leer su contenido y enviarlo a una API de mejora.
2) Ejecutar pruebas `unittest` asociadas y agregar los resultados **en la misma fila** del DataFrame.

## Entrada esperada
Una lista de objetos con la forma:
```python
items = [
    {"file": "path/al/archivo.py", "test": "ruta/al/test_module_sin_extension", "iterations": 3},
    {"file": "otro/archivo.py", "test": "otro/test_module", "iterations": 1},
]
```
- **file**: ruta al archivo de código a mejorar (incluye `.py`).
- **test**: ruta al *módulo de pruebas* **sin** la extensión `.py` (por ejemplo `src/excercise1-fibonacci/fibonacci_test`).
- **iterations** *(opcional)*: número de veces que se envía el mismo archivo a la API. Cada envío genera **una fila** en el DataFrame.

## Salida
Un `DataFrame` con una fila por (archivo, test, iteración) incluyendo:
- Metadatos del archivo, respaldo y contenido original.
- Código mejorado y análisis devueltos por la API.
- Métricas antes/después si la API las provee.
- Resultados de pruebas (tests pasados/total, % éxito, tiempo, etc.).

---
⚙️ **Nota**: Configura `API_URL` con la URL de tu servicio (por defecto `http://127.0.0.1:8000/improve`).


In [30]:
# Imports
import os
import shutil
import json
from pathlib import Path
from datetime import datetime
import requests
import pandas as pd
import unittest
import sys
import time
import importlib
from io import StringIO
import contextlib
import inspect

API_URL = 'http://127.0.0.1:8000/improve'  # Cambia si tu API vive en otra URL
BACKUP_DIR = Path('backups')
BACKUP_DIR.mkdir(parents=True, exist_ok=True)


## Utilidades: respaldo de archivos y helpers


In [31]:
def backup_file(src_path: str | Path, backup_root: Path = BACKUP_DIR) -> Path:
    """Crea un respaldo timestamped del archivo y devuelve la ruta del respaldo."""
    src_path = Path(src_path)
    if not src_path.exists():
        raise FileNotFoundError(f"No existe el archivo: {src_path}")
    ts = datetime.now().strftime('%Y%m%d-%H%M%S-%f')
    dst_name = f"{src_path.stem}__{ts}{src_path.suffix}"
    dst_path = backup_root / dst_name
    shutil.copy2(src_path, dst_path)
    return dst_path

def read_text_file(path: str | Path) -> str:
    with open(path, 'r', encoding='utf-8') as f:
        return f.read()


In [32]:
def restore_from_backup(backup_path: str, original_path: str) -> None:
    """Restaura un archivo original desde un backup específico."""
    from pathlib import Path
    import shutil
    bkp = Path(backup_path)
    orig = Path(original_path)
    if not bkp.exists():
        print(f"[WARN] Backup no encontrado: {bkp}")
        return
    try:
        shutil.copy2(bkp, orig)
    except Exception as e:
        print(f"[WARN] No se pudo restaurar {orig} desde {bkp}: {e}")


## Paso 1: Envío a API de mejora y construcción de DataFrame (por iteración)
Esta celda sigue tu contrato de API tal como fue provisto.

In [33]:
def improve_one_file(file_path: str, test_module: str, iteration: int, api_url: str = API_URL) -> dict:
    """
    Respaldar, leer y enviar un archivo a la API de mejora.
    ⚠️ Ahora REEMPLAZA el archivo original con el `improved_code` retornado por la API.
    Devuelve un diccionario listo para ser convertido a fila de DataFrame.
    """
    # 1) Respaldo y lectura
    backup_path = backup_file(file_path)
    code_content = read_text_file(file_path)
    filename = os.path.basename(file_path)

    # 2) Envío a la API
    headers = {'Content-Type': 'application/json'}
    payload = {'Code': code_content}
    resp = requests.post(api_url, headers=headers, json=payload)

    # 3) Procesamiento de la respuesta según tu contrato
    if resp.status_code == 200:
        data = resp.json()
        improved_code = data.get('Code')

        # === NUEVO: escribir el código mejorado al archivo original ===
        if isinstance(improved_code, str) and improved_code.strip():
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(improved_code)

        row = {
            'code_file': filename,
            'code_file_path': file_path,
            'test_file': test_module,
            'iteration': iteration,
            'backup_path': str(backup_path),
            'original_code': code_content,
            'improved_code': improved_code,
            'analysis': data.get('Analisis'),
        }
        # Métricas opcionales
        metrics = data.get('metrics')
        if metrics and isinstance(metrics, dict):
            before = metrics.get('before') or {}
            after = metrics.get('after') or {}
            row.update({
                'before_method_number': before.get('method_number'),
                'before_ifs': before.get('number_of_ifs'),
                'before_loops': before.get('number_of_loops'),
                'before_cyclomatic_complexity': before.get('cyclomatic_complexity'),
                'before_avg_method_size': before.get('average_method_size'),
                'after_method_number': after.get('method_number'),
                'after_ifs': after.get('number_of_ifs'),
                'after_loops': after.get('number_of_loops'),
                'after_cyclomatic_complexity': after.get('cyclomatic_complexity'),
                'after_avg_method_size': after.get('average_method_size'),
            })
        # Contexto opcional
        if 'RetrievedContext' in data:
            row['retrieved_context'] = json.dumps(data['RetrievedContext'])
    else:
        row = {
            'code_file': filename,
            'code_file_path': file_path,
            'test_file': test_module,
            'iteration': iteration,
            'backup_path': str(backup_path),
            'error': f"API Error: {resp.status_code}",
            'error_details': resp.text,
        }

    return row


## Paso 2: Ejecutar pruebas `unittest` y agregar resultados a las filas (mismo (file, test, iteration))
La lógica está basada en tu bloque original, adaptada para conservar la clave `(code_file_path, test_file, iteration)`.

In [34]:
def _run_tests_for_file(test_module_path: str, iteration: int = 1) -> pd.DataFrame:
    """Ejecuta unittest para un *módulo* de pruebas (sin .py)."""
    file_dir = os.path.dirname(test_module_path)
    module_name = os.path.basename(test_module_path)

    if file_dir and file_dir not in sys.path:
        sys.path.append(file_dir)

    try:
        test_module = importlib.import_module(module_name)

        test_classes = []
        for name, obj in inspect.getmembers(test_module):
            if inspect.isclass(obj) and issubclass(obj, unittest.TestCase) and obj != unittest.TestCase:
                test_classes.append(obj)

        if not test_classes:
            return pd.DataFrame({
                'test_file': [test_module_path],
                'iteration': [iteration],
                'tests': ['0/0'],
                'percentage_of_success': [0.0],
                'execution_time': [0.0]
            })

        all_results = []
        total_passed = 0
        total_tests = 0
        total_time = 0

        for test_class in test_classes:
            class TimedTestCase(test_class):
                def run(self, result=None):
                    start_time = time.time()
                    super().run(result)
                    self.execution_time = time.time() - start_time

            class TestResultCollector(unittest.TextTestResult):
                def __init__(self, *args, **kwargs):
                    super().__init__(*args, **kwargs)
                    self.test_results = []

                def addSuccess(self, test):
                    super().addSuccess(test)
                    self.test_results.append({
                        'test_name': test.id().split('.')[-1],
                        'status': 'PASS',
                        'execution_time': getattr(test, 'execution_time', 0),
                        'error_message': None
                    })

                def addFailure(self, test, err):
                    super().addFailure(test, err)
                    self.test_results.append({
                        'test_name': test.id().split('.')[-1],
                        'status': 'FAIL',
                        'execution_time': getattr(test, 'execution_time', 0),
                        'error_message': str(err[1])
                    })

                def addError(self, test, err):
                    super().addError(test, err)
                    self.test_results.append({
                        'test_name': test.id().split('.')[-1],
                        'status': 'ERROR',
                        'execution_time': getattr(test, 'execution_time', 0),
                        'error_message': str(err[1])
                    })

            suite = unittest.TestLoader().loadTestsFromTestCase(TimedTestCase)

            output = StringIO()
            with contextlib.redirect_stdout(output):
                runner = unittest.TextTestRunner(resultclass=TestResultCollector)
                result = runner.run(suite)

            class_results = result.test_results
            all_results.extend(class_results)

            class_passed = len([r for r in class_results if r['status'] == 'PASS'])
            class_total = len(class_results)
            class_time = sum(r['execution_time'] for r in class_results)

            total_passed += class_passed
            total_tests += class_total
            total_time += class_time

        success_percentage = (total_passed / total_tests * 100) if total_tests > 0 else 0.0

        summary_df = pd.DataFrame({
            'test_file': [test_module_path],
            'iteration': [iteration],
            'tests': [f"{total_passed}/{total_tests}"],
            'percentage_of_success': [success_percentage],
            'execution_time': [total_time]
        })
        return summary_df

    except Exception as e:
        return pd.DataFrame({
            'test_file': [test_module_path],
            'iteration': [iteration],
            'tests': ['ERROR'],
            'percentage_of_success': [0.0],
            'execution_time': [0.0],
            'error': [str(e)]
        })

def run_tests_for_items(items: list[dict]) -> pd.DataFrame:
    rows = []
    for it in items:
        test_module = it['test']
        iterations = int(it.get('iterations', 1))
        for k in range(1, iterations + 1):
            rows.append(_run_tests_for_file(test_module, k))
    return pd.concat(rows, ignore_index=True) if rows else pd.DataFrame()


## Orquestador: une Paso 1 + Paso 2 en un único DataFrame
Se hace un `merge` por `(test_file, iteration)` (y se conserva `code_file_path`) para garantizar que cada envío/iteración tenga los resultados de pruebas correspondientes.

In [35]:
def run_full_workflow(items: list[dict], api_url: str = API_URL) -> pd.DataFrame:
    """
    Ejecuta el flujo por **iteración**: mejora + escribe, ejecuta tests, MERGE de resultados,
    y **restaura el archivo original al final de cada iteración** usando el backup de esa iteración.
    """
    merged_rows = []
    for it in items:
        file_path = it['file']
        test_module = it['test']
        iterations = int(it.get('iterations', 1))
        for k in range(1, iterations + 1):
            # Paso 1 (iteración k): mejora y escritura del código
            improve_row = improve_one_file(file_path, test_module, k, api_url)

            # Paso 2 (iteración k): correr tests para este test_module/k
            tests_df = _run_tests_for_file(test_module, k)

            # Fusionar en una sola fila
            import pandas as _pd
            improve_df_k = _pd.DataFrame([improve_row])
            merged_k = improve_df_k.merge(
                tests_df,
                on=['test_file', 'iteration'],
                how='left',
                suffixes=('', '_test')
            )
            merged_rows.append(merged_k)

            # Restaurar inmediatamente al finalizar la iteración
            try:
                if 'backup_path' in improve_row and improve_row.get('backup_path'):
                    restore_from_backup(improve_row['backup_path'], improve_row['code_file_path'])
            except Exception as e:
                print('[WARN] Falló la restauración por iteración:', e)

    # Concatenar todas las filas y ordenar columnas
    if not merged_rows:
        return pd.DataFrame()
    out = pd.concat(merged_rows, ignore_index=True)
    preferred_cols = [
        'code_file', 'code_file_path', 'backup_path', 'test_file', 'iteration',
        'tests', 'percentage_of_success', 'execution_time',
        'original_code', 'improved_code', 'analysis', 'retrieved_context',
        'before_method_number', 'before_ifs', 'before_loops', 'before_cyclomatic_complexity', 'before_avg_method_size',
        'after_method_number', 'after_ifs', 'after_loops', 'after_cyclomatic_complexity', 'after_avg_method_size',
        'error', 'error_details'
    ]
    for col in preferred_cols:
        if col not in out.columns:
            out[col] = None
    return out[preferred_cols]


## Ejemplo de uso
Ajusta las rutas de `items` a tus archivos locales. Recuerda que `test` **no** lleva `.py`.

In [36]:
# Ejemplo (ajusta a tu proyecto):
items = [
    {
        'file': 'src/excercise1-fibonacci/fibonacci.py',
        'test': 'src/excercise1-fibonacci/fibonacci_test',
        'iterations': 3
    }
]

# Ejecuta el workflow completo
try:
    df = run_full_workflow(items, api_url=API_URL)
    display(df)
except Exception as e:
    print('[WARN] No se pudo ejecutar el flujo completo aquí (por ejemplo, si la API no está disponible).')
    print('Error:', e)


.........
----------------------------------------------------------------------
Ran 9 tests in 0.003s

OK
.........
----------------------------------------------------------------------
Ran 9 tests in 0.012s

OK
.........
----------------------------------------------------------------------
Ran 9 tests in 0.013s

OK


Unnamed: 0,code_file,code_file_path,backup_path,test_file,iteration,tests,percentage_of_success,execution_time,original_code,improved_code,...,before_loops,before_cyclomatic_complexity,before_avg_method_size,after_method_number,after_ifs,after_loops,after_cyclomatic_complexity,after_avg_method_size,error,error_details
0,fibonacci.py,src/excercise1-fibonacci/fibonacci.py,backups/fibonacci__20251018-185321-867960.py,src/excercise1-fibonacci/fibonacci_test,1,9/9,100.0,0,"def f(a=0, b=1, n=10, x=None, c=True, *args, *...",import functools\nimport logging\n\nlogging.ba...,...,3,12,38.0,1,5,1,7,26.0,,
1,fibonacci.py,src/excercise1-fibonacci/fibonacci.py,backups/fibonacci__20251018-185336-502094.py,src/excercise1-fibonacci/fibonacci_test,2,9/9,100.0,0,"def f(a=0, b=1, n=10, x=None, c=True, *args, *...",import logging\n\nlogging.basicConfig(level=lo...,...,3,12,38.0,1,4,1,7,25.0,,
2,fibonacci.py,src/excercise1-fibonacci/fibonacci.py,backups/fibonacci__20251018-185349-293261.py,src/excercise1-fibonacci/fibonacci_test,3,9/9,100.0,0,"def f(a=0, b=1, n=10, x=None, c=True, *args, *...",import logging\nfrom dataclasses import datacl...,...,3,12,38.0,2,4,0,6,14.0,,


## Guardar resultados (opcional)


In [37]:
# Descomenta para guardar a CSV
# if 'df' in globals() and isinstance(df, pd.DataFrame):
#     df.to_csv('improvement_and_tests_results.csv', index=False)
#     print('Resultados guardados en improvement_and_tests_results.csv')
