In [1]:
import re
import os
from datasets import Dataset, load_dataset
from random import randint, seed, choice
from typing import List, Tuple
from tqdm import tqdm
import argparse
from datasets import Dataset
from collections import defaultdict

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def get_divisors_below(n: int, limit: int) -> list:
    """Return the positive divisors of ``n`` that are strictly below ``limit``.

    Args:
        n: Integer whose divisors are wanted; its sign is ignored.
        limit: Exclusive integer upper bound on the returned divisors.

    Returns:
        Ascending list of every divisor ``d`` of ``n`` with ``1 <= d < limit``.
        For ``n == 0`` every positive integer is treated as a divisor, so the
        result is ``list(range(1, limit))``.
    """
    n = abs(n)
    if n == 0:
        return list(range(1, limit))

    # Only values below `limit` can ever be returned, so test those directly.
    # This is O(min(n, limit)) rather than a scan up to sqrt(n), which matters
    # when n has grown very large, and it avoids float square roots entirely.
    return [d for d in range(1, min(n + 1, limit)) if n % d == 0]

def gen_dataset(
    num_samples: int,
    num_operands: int = 6,
    max_operand: int = 100,
    max_target: int = 1000,
    operations: List[str] = ['+', '-', '*', '/'],
    seed_value: int = 42,
) -> set:
    """Generate a set of distinct samples for the countdown task.

    Each sample is built backwards: a target is drawn, then
    ``num_operands - 1`` random operations are applied to a running value,
    recording the operand used at each step. The final running value becomes
    the last number of the sample, and the candidate is kept only if that
    value lies in ``[1, max_operand]``. Division always uses an exact divisor
    of the running value, so every step can be undone exactly.

    Args:
        num_samples: Number of distinct samples to generate.
        num_operands: Number of numbers provided in each sample.
        max_operand: Maximum value of any provided number (minimum is 1).
        max_target: Maximum value of the target (minimum is 1).
        operations: Allowed operations, drawn from '+', '-', '*', '/'.
        seed_value: Seed for the global ``random`` generator.

    Returns:
        A set of ``(target, numbers)`` tuples, where ``numbers`` is a tuple of
        ``num_operands`` integers.

    Raises:
        ValueError: If ``operations`` contains an unsupported operation.

    Note:
        Candidates are rejection-sampled until ``num_samples`` distinct samples
        exist; the loop does not terminate if fewer distinct samples are
        possible for the given arguments.
    """
    # Validate up front so a bad operation fails deterministically instead of
    # only when the random draw happens to pick it.
    for op in operations:
        if op not in ('+', '-', '*', '/'):
            raise ValueError(f"Invalid operation: {op}")

    seed(seed_value)
    samples = set()

    while len(samples) < num_samples:
        original_target = randint(1, max_target)
        # Running value that the sampled operations are applied to.
        target = original_target

        numbers = []
        for _ in range(num_operands - 1):
            op = choice(operations)

            if op == '/':
                # Pick an exact divisor so the floor division loses nothing.
                # The helper's bound is exclusive, so max_operand itself is
                # never chosen as a divisor.
                num = choice(get_divisors_below(n=target, limit=max_operand))
                target //= num
            else:
                num = randint(1, max_operand)
                if op == '+':
                    target += num
                elif op == '-':
                    target -= num
                else:
                    target *= num

            assert 1 <= num <= max_operand
            numbers.append(num)

        # Keep the candidate only when the leftover value is itself a legal
        # operand; it becomes the final number of the sample.
        if 1 <= target <= max_operand:
            numbers.append(target)
            samples.add((original_target, tuple(numbers)))

    return samples

In [3]:
def solve_countdown(candidates, target):
    """Exhaustively search for pairwise operations reducing ``candidates`` to ``target``.

    At every level two candidates are combined with +, -, * or / (division
    only when the divisor is non-zero) and replaced by the result, until a
    single value is left.

    Args:
        candidates: List of numbers still available.
        target: Value the final remaining number must match (within 0.001).

    Returns:
        ``(True, steps)`` on success, where each step is a tuple
        ``(candidates_before, "a op b", candidates_after)``.
        With a single candidate the result is ``(matches, [])``; when no
        combination works the result is ``(False, None)``.
    """
    count = len(candidates)
    if count == 1:
        # The tolerance absorbs rounding error introduced by true division.
        return abs(candidates[0] - target) < 0.001, []

    for first in range(count):
        for second in range(first + 1, count):
            a = candidates[first]
            b = candidates[second]

            # Order matters: the first successful attempt is the one returned.
            attempts = [
                (f"{a} + {b}", a + b),
                (f"{a} - {b}", a - b),
                (f"{b} - {a}", b - a),
                (f"{a} * {b}", a * b),
            ]
            if a != 0:
                attempts.append((f"{b} / {a}", b / a))
            if b != 0:
                attempts.append((f"{a} / {b}", a / b))

            # Everything except the two operands just combined.
            remaining = [
                value
                for idx, value in enumerate(candidates)
                if idx not in (first, second)
            ]

            for label, value in attempts:
                solved, later_steps = solve_countdown(remaining + [value], target)
                if solved:
                    this_step = (candidates, label, remaining + [value])
                    return solved, [this_step] + later_steps

    return False, None

In [4]:
# One pool of samples per difficulty level, where difficulty is the number of
# operands (3 through 9). Values are lists so they can be sliced into splits;
# the default factory matches that so a missing key is also sliceable.
difficulty_to_dataset = defaultdict(list)
for difficulty in range(3, 10):
    # gen_dataset returns a set; freeze it into a list to fix an ordering.
    # Every difficulty uses gen_dataset's default seed.
    difficulty_to_dataset[difficulty] = list(gen_dataset(6000, num_operands=difficulty))

In [5]:
def gen(split):
    """Yield ``{'target': ..., 'nums': ...}`` records for one dataset split.

    For every difficulty from 3 to 9, the first 5000 entries of
    ``difficulty_to_dataset[difficulty]`` form the train split and the
    remaining entries form the test split.

    Args:
        split: Either ``'train'`` or ``'test'``.

    Raises:
        ValueError: If ``split`` is neither ``'train'`` nor ``'test'``
            (raised when iteration starts, since this is a generator).
    """
    if split == 'train':
        split_slice = slice(None, 5000)
    elif split == 'test':
        split_slice = slice(5000, None)
    else:
        # An unknown split used to produce an empty dataset silently.
        raise ValueError(f"Unknown split: {split!r}")

    for difficulty in range(3, 10):
        for target, numbers in difficulty_to_dataset[difficulty][split_slice]:
            yield {'target': target, 'nums': numbers}

In [6]:
# Materialise each split by driving gen() with the matching split name.
train = Dataset.from_generator(generator=gen, gen_kwargs=dict(split='train'))
test = Dataset.from_generator(generator=gen, gen_kwargs=dict(split='test'))

Generating train split: 35000 examples [00:00, 212027.51 examples/s]
Generating train split: 7000 examples [00:00, 356355.48 examples/s]


In [7]:
# Hub repository that receives both splits.
hub_dataset_name = "d1shs0ap/countdown-3-4-5-6-7-8-9"

# Upload each split to the 'main' revision of the same repository.
train.push_to_hub(hub_dataset_name, split='train', revision='main', private=True)

# Left as the final expression so the cell displays the returned commit info.
test.push_to_hub(hub_dataset_name, split='test', revision='main', private=True)

Creating parquet from Arrow format: 100%|██████████| 35/35 [00:00<00:00, 3324.29ba/s]
Uploading the dataset shards: 100%|██████████| 1/1 [00:00<00:00,  1.07it/s]
Creating parquet from Arrow format: 100%|██████████| 7/7 [00:00<00:00, 4389.97ba/s]
Uploading the dataset shards: 100%|██████████| 1/1 [00:00<00:00,  2.98it/s]


CommitInfo(commit_url='https://huggingface.co/datasets/d1shs0ap/countdown-3-4-5-6-7-8-9/commit/e1d1b209456fd9710a40212460d3aa3832c7e38b', commit_message='Upload dataset', commit_description='', oid='e1d1b209456fd9710a40212460d3aa3832c7e38b', pr_url=None, repo_url=RepoUrl('https://huggingface.co/datasets/d1shs0ap/countdown-3-4-5-6-7-8-9', endpoint='https://huggingface.co', repo_type='dataset', repo_id='d1shs0ap/countdown-3-4-5-6-7-8-9'), pr_revision=None, pr_num=None)