In [None]:
import pandas as pd
import subprocess
import tempfile
import os
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

# Load the table of tasks to run; the task_id, r_code and r_test columns
# are read from it when the jobs are submitted further down.
df = pd.read_parquet('r_code_output_smaller_model.parquet')

# Result accumulators filled while futures complete: task ids that passed,
# and one dict per task that did not.
successful_tasks, failed_tasks = [], []

# Opening banner.
print(f"Starting tests for {len(df)} tasks...")
print(f"{'='*80}\n")

def run_r_test(idx, task_id, r_code, r_test, timeout=30):
    """Run one task's R code followed by its R test with ``Rscript``.

    The code and the test are concatenated into a temporary ``.R`` file,
    which is executed in a separate ``Rscript`` process and removed
    afterwards. Every failure mode, including a failure to write the
    temporary file, is reported through the returned tuple rather than
    raised.

    Args:
        idx: Row label of the task; passed back unchanged in the result.
        task_id: Identifier of the task; passed back unchanged in the result.
        r_code: R source under test.
        r_test: R test code appended after ``r_code``.
        timeout: Seconds to wait for ``Rscript`` before giving up.

    Returns:
        ``("success", task_id, idx, stdout, stderr)`` when ``Rscript`` exits
        with code 0; otherwise a 6-tuple
        ``(status, task_id, idx, stdout, stderr, code)`` where
        ``status``/``code`` is ``"failed"``/the process return code,
        ``"timeout"``/``'TIMEOUT'`` or ``"exception"``/``'EXCEPTION'``.
    """
    full_code = f"{r_code}\n\n{r_test}"
    temp_file = None

    try:
        # Writing happens inside the try block so that an I/O or encoding
        # error is reported as an "exception" result and the file is still
        # cleaned up. UTF-8 keeps non-ASCII R source independent of the
        # platform's default encoding.
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.R', delete=False, encoding='utf-8'
        ) as f:
            temp_file = f.name
            f.write(full_code)

        result = subprocess.run(
            ['Rscript', temp_file],
            capture_output=True,
            text=True,
            # Undecodable output bytes are replaced instead of raising, so a
            # finished run is not misreported as an exception.
            errors='replace',
            timeout=timeout,
        )

        if result.returncode == 0:
            return ("success", task_id, idx, result.stdout, result.stderr)
        return ("failed", task_id, idx, result.stdout, result.stderr, result.returncode)

    except subprocess.TimeoutExpired:
        return ("timeout", task_id, idx, '', f'Test exceeded {timeout} second timeout', 'TIMEOUT')

    except Exception as e:
        return ("exception", task_id, idx, '', str(e), 'EXCEPTION')

    finally:
        # Best-effort cleanup: a failure to delete the temp file must not
        # replace the result being returned.
        if temp_file is not None:
            try:
                os.remove(temp_file)
            except OSError:
                pass


# Number of threads — adjust as needed
MAX_WORKERS = 8

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # Each future remembers (row label, task id) so that a task can still be
    # recorded as failed when its result cannot be obtained or processed.
    future_to_task = {}
    for idx, row in df.iterrows():
        future = executor.submit(run_r_test, idx, row['task_id'], row['r_code'], row['r_test'])
        future_to_task[future] = (idx, row['task_id'])

    for future in as_completed(future_to_task):
        idx, submitted_task_id = future_to_task[future]
        try:
            # Success results carry 5 fields, all other results carry a
            # 6th (the return code), collected in `extra`.
            status, task_id, row_index, stdout, stderr, *extra = future.result()

            if status == "success":
                print(f"✅ SUCCESS: {task_id} (Row {row_index + 1}/{len(df)})")
                successful_tasks.append(task_id)
                continue

            if status == "failed":
                return_code = extra[0]
                print(f"❌ FAILED: {task_id} (Row {row_index + 1})")
                print(f"   Error output: {stderr[:200]}...")
            elif status == "timeout":
                return_code = 'TIMEOUT'
                print(f"⏱️ TIMEOUT: {task_id}")
            else:
                return_code = 'EXCEPTION'
                print(f"💥 ERROR: {task_id} - {stderr}")

            # Appending is the last step of the try body, so a task can
            # never be recorded both here and in the handler below.
            failed_tasks.append({
                'task_id': task_id,
                'row_index': row_index,
                'return_code': return_code,
                'stderr': stderr,
                'stdout': stdout
            })

        except Exception as e:
            print(f"💥 ERROR in future: {e}")
            # Record the task instead of dropping it: otherwise it would be
            # missing from both the summary counts and the failed-tasks CSV.
            failed_tasks.append({
                'task_id': submitted_task_id,
                'row_index': idx,
                'return_code': 'EXCEPTION',
                'stderr': str(e),
                'stdout': ''
            })

# Print summary
total_tasks = len(df)
# Guard against an empty input table, which would otherwise divide by zero.
success_rate = len(successful_tasks) / total_tasks * 100 if total_tasks else 0.0

print(f"\n{'='*80}")
print("SUMMARY")
print(f"{'='*80}")
print(f"Total tasks: {total_tasks}")
print(f"✅ Successful: {len(successful_tasks)}")
print(f"❌ Failed: {len(failed_tasks)}")
print(f"Success rate: {success_rate:.2f}%")

if failed_tasks:
    # The timestamp in the file name keeps the reports of separate runs apart.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    failed_csv = f'failed_r_tasks_{timestamp}.csv'
    failed_df = pd.DataFrame(failed_tasks)
    failed_df.to_csv(failed_csv, index=False)
    print(f"\n❌ Failed tasks saved to: {failed_csv}")

    print("\nFailed task IDs:")
    for task in failed_tasks:
        print(f"  - {task['task_id']} (row {task['row_index']})")
else:
    print("\n🎉 All tasks passed!")

print(f"\n{'='*80}")


In [None]:
import pandas as pd
import glob
from datetime import datetime

# Read the original parquet file
df = pd.read_parquet('r_dataset.parquet')
print(f"Original dataset: {len(df)} rows")

failed_files = glob.glob('failed_r_tasks_*.csv')

if not failed_files:
    print("\n❌ No failed_tasks CSV file found!")
    print("Please run the test script first to generate failed tasks.")
    # `exit()` is a helper installed by the `site` module for interactive
    # use; raising SystemExit stops execution here reliably, so the code
    # below never runs with an empty file list.
    raise SystemExit

# Get the most recent file. The names have the form
# 'failed_r_tasks_<YYYYMMDD>_<HHMMSS>.csv': the prefix is constant and the
# timestamp is fixed-width, so the lexicographically largest name is the
# newest one. (Splitting on '_' and taking field 2 yields the constant
# 'tasks' for every file and cannot order them.)
latest_failed_file = max(failed_files)
print(f"\nUsing failed tasks from: {latest_failed_file}")

# Read the failed tasks
failed_df = pd.read_csv(latest_failed_file)
failed_task_ids = failed_df['task_id'].tolist()

print(f"Failed tasks to remove: {len(failed_task_ids)}")
print(f"\nFailed task IDs:")
for task_id in failed_task_ids:
    print(f"  - {task_id}")

# Remove failed tasks
df_filtered = df[~df['task_id'].isin(failed_task_ids)]

# Count the rows that were actually dropped: this can differ from the number
# of failed ids when an id is repeated or does not occur in the dataset.
removed_rows = len(df) - len(df_filtered)

print(f"\n{'='*80}")
print(f"Original rows: {len(df)}")
print(f"Failed rows removed: {removed_rows}")
print(f"Remaining rows: {len(df_filtered)}")
print(f"{'='*80}")

# Save the filtered dataset
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f'cleaned_file_passed_only_{timestamp}.parquet'
df_filtered.to_parquet(output_file, index=False)

# Guard against an empty dataset, which would otherwise divide by zero.
success_rate = len(df_filtered) / len(df) * 100 if len(df) else 0.0

print(f"\n✅ Filtered dataset saved to: {output_file}")
print(f"Success rate: {success_rate:.2f}%")