In [1]:
import abc
import ast
import asyncio
import json
import os
import re
from collections import defaultdict
from typing import Union

import aiohttp
import datasets
import numpy as np
import torch
from dotenv import load_dotenv
from e2b import TimeoutException
from e2b_code_interpreter import Sandbox
from sentence_transformers import SentenceTransformer
from tenacity import (
    AsyncRetrying,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)
from tqdm.notebook import tqdm

load_dotenv()

True

In [None]:
# Load the 'train' split of the hkust-nlp/CodeIO-PyEdu-Reasoning dataset.
# Each item is expected to carry a "prompt" field, which the extraction cell below parses.
dataset = datasets.load_dataset("hkust-nlp/CodeIO-PyEdu-Reasoning")['train']

## Extract the relevant parts of the prompt

In [None]:
# Every prompt contains three sections of interest, each introduced by a fixed
# sentence: (1) the task description, (2) the input/output requirements and
# (3) the reference code snippet. One capture group per section.
pattern = re.compile(
    r'(?s)'  # DOTALL so . matches newlines
    r'You are given a question that requires some input and output variables as follows:\s*(.*?)'
    r'\s*The input and output requirements are as follows:\s*(.*?)'
    r'\s*Given the following.*?Tip: Here is a reference code snippet for this question\. '
    r'You can refer to this code to guide your reasoning but not copy spans of code directly\.\s*(.*)'
)

seen = set()   # fingerprints of entries already written
duplicate = 0  # number of entries skipped because their fingerprint was seen before

with open("data/codeio-pyedu-extracted.jsonl", "w+") as f:
    for i, item in tqdm(enumerate(dataset), total=len(dataset)):
        match = pattern.search(item["prompt"])
        if match is None:
            print(f"No match found for item {i}")
            continue

        # Pull out the three sections, trimmed of surrounding whitespace
        task_description, input_output_spec, code_sample = (
            section.strip() for section in match.groups()
        )

        # An entry counts as a duplicate only when all three sections match
        hash_entry = "-".join(
            str(hash(section))
            for section in (task_description, input_output_spec, code_sample)
        )
        if hash_entry in seen:
            duplicate += 1
            continue
        seen.add(hash_entry)

        # One JSON object per line
        record = {
            "task_description": task_description,
            "input_output_spec": input_output_spec,
            "code_sample": code_sample
        }
        f.write(json.dumps(record) + "\n")

print(f"There were {duplicate} out of {len(dataset)} duplicate entries")

## Subsample the data

In [None]:
class IdentitySampler:
    """No-op sampler: hands the feature collection back untouched."""

    def run(
        self,
        features: Union[torch.Tensor, np.ndarray],
    ) -> Union[torch.Tensor, np.ndarray]:
        """Return ``features`` as-is (the very same object, no copy).

        Args:
            features: feature collection to "sample".
        """
        return features


class BaseSampler(abc.ABC):
    """Abstract base for samplers that keep a fixed fraction of the input.

    Subclasses implement :meth:`run`. The ``_store_type`` / ``_restore_type``
    helpers let an implementation remember whether its input was a NumPy
    array or a tensor (and on which device) and convert a tensor result back.
    """

    def __init__(self, percentage: float):
        """Store the sampling fraction.

        Args:
            percentage: fraction to keep, strictly between 0 and 1.

        Raises:
            ValueError: if ``percentage`` is not inside the open interval (0, 1).
        """
        inside_open_interval = 0 < percentage < 1
        if not inside_open_interval:
            raise ValueError("Percentage value not in (0, 1).")
        self.percentage = percentage

    @abc.abstractmethod
    def run(
        self, features: Union[torch.Tensor, np.ndarray]
    ) -> Union[torch.Tensor, np.ndarray]:
        """Sample from ``features``; to be provided by subclasses."""
        pass

    def _store_type(self, features: Union[torch.Tensor, np.ndarray]) -> None:
        """Record whether ``features`` is a NumPy array, else remember its device."""
        is_numpy = isinstance(features, np.ndarray)
        self.features_is_numpy = is_numpy
        if is_numpy:
            return
        self.features_device = features.device

    def _restore_type(self, features: torch.Tensor) -> Union[torch.Tensor, np.ndarray]:
        """Convert a tensor back to the container type seen by ``_store_type``."""
        if not self.features_is_numpy:
            return features.to(self.features_device)
        return features.cpu().numpy()


class GreedyCoresetSampler(BaseSampler):
    """Greedy coreset selection over the rows of a feature matrix.

    Features are optionally passed through a random linear projection to
    ``dimension_to_project_features_to`` dimensions. Rows are then picked one
    at a time: each step takes the row with the largest "anchor distance" and
    afterwards lowers every row's anchor distance to its distance to the
    newly picked row whenever that is smaller.

    Note that :meth:`run` returns the *indices* of the selected rows, not the
    rows themselves.
    """

    def __init__(
        self,
        percentage: float,
        device: torch.device,
        dtype: torch.dtype = torch.float32,
        dimension_to_project_features_to=128,
    ):
        """Greedy Coreset sampling base class.

        Args:
            percentage: fraction of rows to select, in (0, 1).
            device: device on which the random projection is computed.
            dtype: dtype of the random projection layer.
            dimension_to_project_features_to: target dimensionality; features
                that already have this many columns are not projected.
        """
        super().__init__(percentage)

        self.device = device
        self.dtype = dtype
        self.dimension_to_project_features_to = dimension_to_project_features_to

    def _reduce_features(self, features):
        """Randomly project ``features`` [N x D] down to the configured dimension.

        Features whose second dimension already matches are returned unchanged
        (and are not moved to ``self.device``).
        """
        if features.shape[1] == self.dimension_to_project_features_to:
            return features
        mapper = torch.nn.Linear(
            features.shape[1], self.dimension_to_project_features_to, bias=False, dtype=self.dtype,
        )
        _ = mapper.to(self.device)
        # The layer only accepts inputs of its own dtype; NumPy arrays are
        # float64 by default and would otherwise fail with a dtype mismatch.
        features = features.to(device=self.device, dtype=self.dtype)
        # The projection is random and never trained: skip autograd so that
        # the distance computations downstream do not retain a graph.
        with torch.no_grad():
            return mapper(features)

    def run(
        self, features: Union[torch.Tensor, np.ndarray]
    ) -> Union[torch.Tensor, np.ndarray]:
        """Selects a coreset of ``features`` and returns the chosen row indices.

        Args:
            features: [N x D]

        Returns:
            int64 tensor with ``int(N * percentage)`` row indices, in the
            order in which they were selected.
        """
        self._store_type(features)
        if isinstance(features, np.ndarray):
            features = torch.from_numpy(features)
        if self.percentage == 1:
            # BaseSampler rejects percentage == 1 at construction time, so this
            # only triggers if the attribute is changed afterwards. Return all
            # indices so the return type matches the regular path.
            return torch.arange(
                len(features), device=features.device, dtype=torch.int64
            )
        reduced_features = self._reduce_features(features)
        sample_indices = self._compute_greedy_coreset_indices(reduced_features)
        return sample_indices

    @staticmethod
    def _compute_batchwise_differences(
        matrix_a: torch.Tensor, matrix_b: torch.Tensor
    ) -> torch.Tensor:
        """Computes batchwise Euclidean distances using PyTorch.

        Args:
            matrix_a: [N x D]
            matrix_b: [M x D]

        Returns:
            [N x M] matrix of distances between rows of ``matrix_a`` and rows
            of ``matrix_b``, via ``|a|^2 - 2 a.b + |b|^2``.
        """
        a_times_a = matrix_a.unsqueeze(1).bmm(matrix_a.unsqueeze(2)).reshape(-1, 1)
        b_times_b = matrix_b.unsqueeze(1).bmm(matrix_b.unsqueeze(2)).reshape(1, -1)
        a_times_b = matrix_a.mm(matrix_b.T)

        # Rounding can push the squared distance slightly below zero; clamp
        # before taking the square root.
        return (-2 * a_times_b + a_times_a + b_times_b).clamp(0, None).sqrt()

    def _compute_greedy_coreset_indices(self, features: torch.Tensor) -> torch.Tensor:
        """Runs iterative greedy coreset selection.

        Builds the full N x N distance matrix up front.

        Args:
            features: [NxD] input feature bank to sample.

        Returns:
            int64 tensor of selected row indices on ``features.device``.
        """
        coreset_indices = []
        num_coreset_samples = int(len(features) * self.percentage)

        with torch.no_grad():
            distance_matrix = self._compute_batchwise_differences(features, features)
            # Starting score of each row: norm of its distances to all rows.
            coreset_anchor_distances = torch.norm(distance_matrix, dim=1)

            for _ in range(num_coreset_samples):
                select_idx = torch.argmax(coreset_anchor_distances).item()
                coreset_indices.append(select_idx)

                # Element-wise minimum with the distances to the new pick;
                # same result as concatenating both columns and reducing.
                coreset_anchor_distances = torch.minimum(
                    coreset_anchor_distances, distance_matrix[:, select_idx]
                )

        return torch.tensor(coreset_indices, device=features.device, dtype=torch.int64)


class ApproximateGreedyCoresetSampler(GreedyCoresetSampler):
    """Greedy coreset selection that avoids the full N x N distance matrix.

    Anchor distances are initialised from the mean distance to a handful of
    randomly chosen starting rows; distances to each newly selected row are
    computed on the fly.
    """

    def __init__(
        self,
        percentage: float,
        device: torch.device,
        dtype: torch.dtype = torch.float32,
        number_of_starting_points: int = 10,
        dimension_to_project_features_to: int = 128,
    ):
        """Approximate Greedy Coreset sampling base class.

        Args:
            percentage: fraction of rows to select, in (0, 1).
            device: device on which the random projection is computed.
            dtype: dtype of the random projection layer.
            number_of_starting_points: number of random rows used to
                initialise the anchor distances (capped at the number of rows).
            dimension_to_project_features_to: target dimensionality of the
                random projection.
        """
        self.number_of_starting_points = number_of_starting_points
        super().__init__(percentage, device, dtype, dimension_to_project_features_to)

    def _compute_greedy_coreset_indices(self, features: torch.Tensor) -> torch.Tensor:
        """Runs approximate iterative greedy coreset selection.

        This greedy coreset implementation does not require computation of the
        full N x N distance matrix and thus requires a lot less memory, however
        at the cost of increased sampling times.

        The starting rows are drawn with ``np.random.choice``, so results
        depend on NumPy's global random state.

        Args:
            features: [NxD] input feature bank to sample.

        Returns:
            int64 tensor of selected row indices on ``features.device``.
        """
        number_of_starting_points = min(self.number_of_starting_points, len(features))
        start_points = np.random.choice(
            len(features), number_of_starting_points, replace=False
        ).tolist()

        coreset_indices = []
        num_coreset_samples = int(len(features) * self.percentage)

        with torch.no_grad():
            approximate_distance_matrix = self._compute_batchwise_differences(
                features, features[start_points]
            )
            # [N x 1]: mean distance of every row to the starting rows.
            approximate_coreset_anchor_distances = torch.mean(
                approximate_distance_matrix, dim=-1, keepdim=True
            )

            # `tqdm` is the progress-bar class itself (imported via
            # `from tqdm.notebook import tqdm`), so it is called directly.
            for _ in tqdm(range(num_coreset_samples), desc="Subsampling..."):
                select_idx = torch.argmax(approximate_coreset_anchor_distances).item()
                coreset_indices.append(select_idx)
                # [N x 1]: distance of every row to the row just selected.
                coreset_select_distance = self._compute_batchwise_differences(
                    features, features[select_idx : select_idx + 1]  # noqa: E203
                )
                approximate_coreset_anchor_distances = torch.minimum(
                    approximate_coreset_anchor_distances, coreset_select_distance
                )

        return torch.tensor(coreset_indices, device=features.device, dtype=torch.int64)


class RandomSampler(BaseSampler):
    """Uniform random selection of a fixed fraction of rows."""

    def __init__(self, percentage: float):
        """Store the sampling fraction (validated by ``BaseSampler``)."""
        super().__init__(percentage)

    def run(
        self, features: Union[torch.Tensor, np.ndarray]
    ) -> Union[torch.Tensor, np.ndarray]:
        """Randomly picks rows of the input feature collection.

        Rows are drawn without replacement using ``np.random.choice``, so the
        result depends on NumPy's global random state.

        Args:
            features: [N x D]

        Returns:
            int64 tensor with ``int(N * percentage)`` row indices (not the
            rows themselves). It lives on ``features.device`` for tensor
            input and on the default device for NumPy input.
        """
        num_random_samples = int(len(features) * self.percentage)
        subset_indices = np.random.choice(
            len(features), num_random_samples, replace=False
        )
        # NumPy input is allowed by the signature; do not rely on ndarray
        # exposing a torch-compatible `.device` attribute.
        device = features.device if isinstance(features, torch.Tensor) else None
        return torch.tensor(subset_indices, device=device, dtype=torch.int64)

In [None]:
# I ran this cell on Google Colab because I don't have a GPU on my local machine,
# hence why you see the Google Drive paths

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SentenceTransformer("nomic-ai/modernbert-embed-base")
print(model)


def get_entry_info(entry) -> str:
    """Pick the text of an entry that gets embedded (its task description)."""
    return entry['task_description']


def get_embeddings(text) -> torch.Tensor:
    """Encode ``text`` with the sentence-transformer model and cast to bfloat16."""
    encoded = model.encode(text)
    return torch.from_numpy(encoded).to(torch.bfloat16)


# One embedding per line of the extracted dataset, in file order
embeddings = []

with open("./drive/MyDrive/reasoning-gym/codeio-pyedu-extracted.jsonl") as f:
    for line in tqdm(f):
        embeddings.append(get_embeddings(get_entry_info(json.loads(line))))

embeddings = torch.stack(embeddings).to(torch.bfloat16).to(device)
print(embeddings.shape)

# Keep 5% of the entries; the projection dimension is set to 768 here
sampler = ApproximateGreedyCoresetSampler(
    percentage=0.05,
    device=device,
    dtype=torch.bfloat16,
    dimension_to_project_features_to=768,
)
subsampled = sampler.run(embeddings)

# The sampler returns row indices; copy exactly those lines to the output file
indices = set(subsampled.cpu().tolist())
with open("./drive/MyDrive/reasoning-gym/codeio-pyedu-extracted.jsonl", "r") as f_in, \
        open("./drive/MyDrive/reasoning-gym/codeio-pyedu-best-coverage.jsonl", "w+") as f_out:
    f_out.writelines(line for i, line in enumerate(f_in) if i in indices)

## Create input generators for each problem separately

In [None]:
# System message sent with every request: tells the model to answer with a
# `generate_input(rng: Random) -> dict` function wrapped in <function>...</function>
# tags, which is the block that fetch_input_generator extracts with a regex.
SYSTEM_PROMPT = """You are a helpful assistant that generates valid Python functions that act as input generators for a given code snippet.

You have access to `random.Random`, therefore you SHOULD NOT import it again. You should use this random number generator to make the input generation process stochastic on each call.

When the user asks you to generate an input for a code snippet, you should strictly respond in the following format:
<function>
def generate_input(rng: Random) -> dict:
    # Your code here
    pass
</function>

The output of the function should be a dictionary where the keys are the variable names and the values are the generated values.

It must contain all the variables that listed in the user's input specification, or more precisely in the `main_solution` function signature. 
"""

# User message template. It is filled with `USER_PROMPT.format(**entry)`, so each
# entry must provide the keys `task_description`, `input_output_spec` and `code_sample`.
USER_PROMPT = """Following are a task description, input/output specification, and relevant code snippet for a Python programming task.

<task_description>
{task_description}
</task_description>

<input_output_spec>
{input_output_spec}
</input_output_spec>

<code_sample>
{code_sample}
</code_sample>

Your task is to write a Python function `def generate_input(rng: Random) -> dict:` that generates valid inputs for the given code snippet, based on the provided information.
"""

# We'll control concurrency with a semaphore: fetch_input_generator wraps its
# request (including retries) in `async with sem`, so at most CONCURRENCY_LIMIT
# of them run at the same time.
CONCURRENCY_LIMIT = 10
sem = asyncio.Semaphore(CONCURRENCY_LIMIT)

async def fetch_input_generator(session: aiohttp.ClientSession, entry: dict) -> dict:
    """
    Sends a POST request to OpenRouter with the system & user prompts,
    extracts the function from the response, and returns the updated entry.

    Transport errors, undecodable bodies and HTTP 429/5xx responses are
    retried up to 5 times with exponential backoff. Every failure mode is
    best-effort: a message is printed and `entry` is returned WITHOUT an
    "input_generator" key, so one bad request cannot abort a whole batch.

    Args:
        session: shared aiohttp session used for the request.
        entry: dict with the keys required by USER_PROMPT; mutated in place
            on success (gets an "input_generator" key).

    Returns:
        The same `entry` dict, with "input_generator" set on success.
    """
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}",
        "Content-Type": "application/json",
    }

    payload = {
        "model": "deepseek/deepseek-chat",
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": USER_PROMPT.format(**entry)
            },
        ],
    }

    retryable_errors = (
        aiohttp.ClientError,
        asyncio.TimeoutError,
        json.JSONDecodeError,
        ValueError,
    )

    try:
        async with sem:
            async for attempt in AsyncRetrying(
                stop=stop_after_attempt(5),
                wait=wait_exponential(multiplier=1, min=1, max=60),
                retry=retry_if_exception_type(retryable_errors),
                # Surface the last underlying error (instead of tenacity's
                # RetryError wrapper) so it can be handled below.
                reraise=True,
            ):
                with attempt:
                    async with session.post(url, headers=headers, json=payload) as response:
                        # Rate limiting and server-side failures are transient:
                        # raise a ClientResponseError so the attempt is retried.
                        # Other statuses fall through to the payload checks.
                        if response.status == 429 or response.status >= 500:
                            response.raise_for_status()

                        data = await response.json()

                        # Basic checks for valid response
                        if not isinstance(data, dict) or not data.get("choices"):
                            print("No choices found in response")
                            return entry

                        message = data["choices"][0].get("message") or {}
                        content = message.get("content")
                        # `content` may be missing or null; re.search would
                        # raise TypeError on a non-string.
                        match = (
                            re.search(r"<function>(.*?)</function>", content, re.DOTALL)
                            if isinstance(content, str)
                            else None
                        )
                        if not match:
                            print("Could not find <function>...</function> block in response")
                            return entry

                        input_generator = match.group(1).strip()
                        entry["input_generator"] = input_generator
                        return entry
    except retryable_errors as exc:
        # All attempts failed; stay best-effort like the other failure paths.
        print(f"Request failed after retries: {exc!r}")
        return entry

    # Defensive only: the retry loop either returns or raises.
    raise Exception("Failed to get valid input generator after retries")

async def process_file(input_file: str, output_file: str):
    """
    Reads each line from `input_file`, processes each entry concurrently,
    and writes augmented entries to `output_file`.

    Results are written as soon as they complete (so the output order follows
    completion order, not input order) and a failing request never discards
    the results of the others: the affected entry is written unchanged, i.e.
    without an "input_generator" key.

    Args:
        input_file: path to a JSONL file with one entry per line.
        output_file: path of the JSONL file to (over)write.
    """
    # Read all entries first (synchronously), ignoring blank lines
    with open(input_file, "r") as f_in:
        entries = [json.loads(line) for line in f_in if line.strip()]

    async def _fetch_or_passthrough(session: aiohttp.ClientSession, entry: dict) -> dict:
        """Run fetch_input_generator; on any error log it and return `entry` as-is."""
        try:
            return await fetch_input_generator(session, entry)
        except Exception as exc:
            print(f"Input generator request failed: {exc!r}")
            return entry

    async with aiohttp.ClientSession() as session:
        # Create a task for each entry; concurrency is limited inside
        # fetch_input_generator.
        tasks = [
            asyncio.create_task(_fetch_or_passthrough(session, entry))
            for entry in entries
        ]

        # Write results while showing progress, so an interrupted run keeps
        # everything that has completed so far.
        with open(output_file, "w") as f_out:
            for finished in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
                result = await finished
                f_out.write(json.dumps(result))
                f_out.write("\n")

# Finally, run the entire pipeline: request an input generator for every entry of
# the subsampled file and write the augmented entries to a new JSONL file.
# Top-level `await` works here because the notebook kernel already runs an event loop.
await process_file(
    input_file="data/codeio-pyedu-best-coverage.jsonl",
    output_file="data/codeio-pyedu-with-input-generator.jsonl"
)

## Filter out invalid input generators

If you need additional packages inside the sandbox, you can build a custom template as described here:

https://e2b.dev/docs/quickstart/install-custom-packages

An example e2b.Dockerfile looks like this:

```Dockerfile
FROM e2bdev/code-interpreter:latest

RUN pip install numpy matplotlib scipy pandas scikit-learn sympy networkx requests pillow bs4 cryptography spacy numba pyyaml regex
```

However, I am going with the default installed libraries: https://e2b.dev/docs/code-interpreting/analyze-data-with-ai/pre-installed-libraries 

In [None]:
# Example usage of the Sandbox class: definitions made by one `run_code` call
# are used by the next call in the same sandbox.
with Sandbox() as sandbox:

    # Step 1: define the helper functions inside the sandbox
    execution = sandbox.run_code("""
from random import Random   # <----- ALWAYS PREPEND THIS LINE TO YOUR CODE SNIPPET

def hello_world():
    return {"a": 5, "b": 10}

def multiple_hello_worlds(rng: Random):
    return [
        {"a": rng.randint(1, 10), "b": rng.randint(10, 20)},
        {"a": 10, "b": 20},
    ]
"""
    )
    try:
        # Step 2: call one of them with a seeded RNG, allowing 5 seconds
        execution = sandbox.run_code("rng = Random(53);multiple_hello_worlds(rng)", timeout=5)
        print(execution)
        if not execution.error:
            # `text` holds the result as a string
            print(type(execution.text))
            print(execution.text)
        else:
            print("[!! FOUND ERROR !!]")
    except TimeoutException as timeout_error:
        print(timeout_error)


In [2]:
# Program loaded into each sandbox: the reference solution, the generated
# `generate_input` function and a driver that produces N (input, output) pairs.
CODE_TEMPLATE = """from random import Random
{code_sample}

{input_generator}

def multiple_eval(num_generations: int, seed: int = 42) -> tuple:
    rng = Random(seed)
    inputs = [generate_input(rng) for _ in range(num_generations)]
    outputs = [main_solution(**inp) for inp in inputs]
    return inputs, outputs
"""

# Expression evaluated in the sandbox after CODE_TEMPLATE has been loaded
SAMPLING_TEMPLATE = "multiple_eval({num_generations})"

WARMUP_GENERATIONS = 5      # cheap smoke test before the full run
TOTAL_GENERATIONS = 1_000   # number of samples in the full run
TIMEOUT_CODE_INIT = 10      # seconds, for loading the code and for the warmup
TIMEOUT_PER_SAMPLE = 0.01   # seconds per sample, scaled by TOTAL_GENERATIONS

GENERATOR_INPUT_FILE = "data/codeio-pyedu-with-input-generator.jsonl"
GENERATOR_OUTPUT_FILE = "data/codeio-pyedu-with-input-generator-filtered.jsonl"


def sample_in_sandbox(sandbox, num_generations: int, timeout) -> tuple:
    """Run `multiple_eval(num_generations)` in `sandbox` and parse its result.

    Args:
        sandbox: sandbox in which CODE_TEMPLATE has already been executed.
        num_generations: number of (input, output) pairs to generate.
        timeout: execution timeout in seconds, forwarded to `run_code`.

    Returns:
        The `(inputs, outputs)` pair reported by the sandbox.

    Raises:
        RuntimeError: if the execution reports an error.
        ValueError: if the result text is empty, is not a Python literal or
            is not a pair.
    """
    execution = sandbox.run_code(
        code=SAMPLING_TEMPLATE.format(num_generations=num_generations),
        timeout=timeout,
    )
    if execution.error:
        raise RuntimeError("Error in input generator")
    if not execution.text:
        raise ValueError("Empty input generator output")
    # The text is produced by LLM-written code, so it must never be passed to
    # eval() on the host: literal_eval only accepts plain Python literals.
    inputs, outputs = ast.literal_eval(execution.text)
    return inputs, outputs


errors = defaultdict(int)
with open(GENERATOR_INPUT_FILE, "r") as f_count:
    total_entries = sum(1 for _ in f_count)

with open(GENERATOR_INPUT_FILE, "r") as f_in, \
        open(GENERATOR_OUTPUT_FILE, "w+") as f_out:

    iterator = tqdm(f_in, total=total_entries)

    for line in iterator:
        iterator.set_description(
            "Failures: "
            + " | ".join(f"{k}: {v}" for k, v in errors.items())
            + f" | total: {sum(errors.values())}"
        )
        entry = json.loads(line)

        if "input_generator" not in entry:
            errors["missing_input_generator"] += 1
            continue

        # A fresh sandbox per entry keeps the entries isolated from each other
        with Sandbox() as sandbox:
            # 1. Initialize the sandbox
            try:
                execution = sandbox.run_code(
                    code=CODE_TEMPLATE.format(**entry),
                    timeout=TIMEOUT_CODE_INIT,
                )
                if execution.error:
                    raise RuntimeError("Error in code snippet")
            except Exception:
                errors["cannot_initialize_code"] += 1
                continue

            # 2. Warmup the sampling
            try:
                sample_in_sandbox(sandbox, WARMUP_GENERATIONS, TIMEOUT_CODE_INIT)
            except Exception:
                errors["warmup_fails"] += 1
                continue

            # 3. Run the full sampling
            try:
                inputs, outputs = sample_in_sandbox(
                    sandbox,
                    TOTAL_GENERATIONS,
                    int(TIMEOUT_PER_SAMPLE * TOTAL_GENERATIONS),
                )
                if len(inputs) != TOTAL_GENERATIONS:
                    raise ValueError("Mismatch in input generations")
                if len(outputs) != TOTAL_GENERATIONS:
                    raise ValueError("Mismatch in output generations")
                # json.dumps doubles as a check that every value is JSON-serializable
                unique_inputs = len({json.dumps(inp, sort_keys=True) for inp in inputs})
                unique_outputs = len({json.dumps(out, sort_keys=True) for out in outputs})
            except Exception:
                # `Exception`, not a bare except: Ctrl-C must still stop the loop
                errors["full_sampling_fails"] += 1
                continue

            # 4. Save the entry
            entry = entry | {
                "unique_inputs": unique_inputs,
                "unique_outputs": unique_outputs,
                "total_generations": TOTAL_GENERATIONS,
            }
            f_out.write(json.dumps(entry))
            f_out.write("\n")

for k, v in errors.items():
    print(f"{k}: {v}")
print(f"Total errors: {sum(errors.values())}")

  0%|          | 0/7053 [00:00<?, ?it/s]

Response 404
Response 404
Response 404
Response 404
Response 404
Response 404
Response 404
Response 404


full_sampling_fails: 913
warmup_fails: 528
missing_input_generator: 36
cannot_initialize_code: 98
Total errors: 1575
