# Steam Factory Puzzle Generator

Pre-generate and solve puzzles using CPU multiprocessing.
Stop at any size checkpoint to save progress.

In [None]:
import json
import random
import time
from collections import deque
from multiprocessing import Pool, cpu_count
from dataclasses import dataclass, field
from typing import Optional, List, Set, Tuple, Dict
import os

print(f"CPU cores available: {cpu_count()}")

In [None]:
# Direction constants
DIRS = [(0, -1), (0, 1), (-1, 0), (1, 0)]  # up, down, left, right
DIR_NAMES = ['up', 'down', 'left', 'right']

@dataclass
class Level:
    w: int
    h: int
    grid: List[List[str]]
    entry: Optional[Tuple[int, int]] = None
    exit: Optional[Tuple[int, int]] = None
    
    def get(self, x: int, y: int) -> str:
        if x < 0 or y < 0 or x >= self.w or y >= self.h:
            return '#'
        return self.grid[y][x]
    
    def set(self, x: int, y: int, c: str):
        if 0 <= x < self.w and 0 <= y < self.h:
            self.grid[y][x] = c
    
    def copy(self) -> 'Level':
        return Level(
            w=self.w, h=self.h,
            grid=[row[:] for row in self.grid],
            entry=self.entry, exit=self.exit
        )
    
    def to_strings(self) -> List[str]:
        return [''.join(row) for row in self.grid]

In [None]:
# Level creation functions

def create_bordered_room(w: int, h: int) -> Level:
    grid = [['.' for _ in range(w)] for _ in range(h)]
    for x in range(w):
        grid[0][x] = '#'
        grid[h-1][x] = '#'
    for y in range(h):
        grid[y][0] = '#'
        grid[y][w-1] = '#'
    return Level(w=w, h=h, grid=grid)

def create_zigzag_room(w: int, h: int) -> Level:
    lv = create_bordered_room(w, h)
    for y in range(2, h - 2, 2):
        gap_x = 1 + random.randint(0, w - 3)
        for x in range(1, w - 1):
            if x != gap_x:
                lv.set(x, y, '#')
    return lv

def create_random_maze(w: int, h: int) -> Level:
    lv = create_bordered_room(w, h)
    wall_density = 0.2 + random.random() * 0.15
    for y in range(1, h - 1):
        for x in range(1, w - 1):
            if random.random() < wall_density:
                lv.set(x, y, '#')
    return lv

In [None]:
# Placement functions

def place_entry_exit_opposite(lv: Level) -> bool:
    """Place entry at top, exit at bottom on opposite sides"""
    entry_x = 1 + random.randint(0, lv.w - 3)
    exit_x = 1 + random.randint(0, lv.w - 3)
    
    # Ensure they're not directly aligned
    if abs(entry_x - exit_x) < 2:
        exit_x = lv.w - 1 - entry_x
        if exit_x < 1: exit_x = 1
        if exit_x > lv.w - 2: exit_x = lv.w - 2
    
    # Clear path for entry and exit
    lv.set(entry_x, 0, 'S')
    lv.set(entry_x, 1, '.')
    lv.set(exit_x, lv.h - 1, 'X')
    lv.set(exit_x, lv.h - 2, '.')
    
    lv.entry = (entry_x, 0)
    lv.exit = (exit_x, lv.h - 1)
    return True

def find_floor_cells(lv: Level) -> List[Tuple[int, int]]:
    """Find all empty floor cells"""
    cells = []
    for y in range(1, lv.h - 1):
        for x in range(1, lv.w - 1):
            if lv.get(x, y) == '.':
                cells.append((x, y))
    return cells

def place_barrel_blocking(lv: Level) -> bool:
    """Place a barrel that blocks the direct path"""
    cells = find_floor_cells(lv)
    if not cells:
        return False
    random.shuffle(cells)
    for x, y in cells[:10]:
        # Check if barrel can be pushed (has space behind in some direction)
        for dx, dy in DIRS:
            if lv.get(x - dx, y - dy) == '.' and lv.get(x + dx, y + dy) == '.':
                lv.set(x, y, 'B')
                return True
    return False

def place_button_gate(lv: Level) -> bool:
    """Place a button and gate combo"""
    cells = find_floor_cells(lv)
    if len(cells) < 2:
        return False
    
    random.shuffle(cells)
    # Button in upper half, gate in lower half
    button_candidates = [(x, y) for x, y in cells if y < lv.h // 2]
    gate_candidates = [(x, y) for x, y in cells if y >= lv.h // 2]
    
    if not button_candidates or not gate_candidates:
        return False
    
    bx, by = random.choice(button_candidates)
    gx, gy = random.choice(gate_candidates)
    
    lv.set(bx, by, 'O')  # Button
    lv.set(gx, gy, 'G')  # Gate
    return True

def place_valve_and_barrel(lv: Level) -> bool:
    """Place a valve that needs to be sealed with a barrel"""
    cells = find_floor_cells(lv)
    if len(cells) < 2:
        return False
    
    random.shuffle(cells)
    for vx, vy in cells[:5]:
        # Find a spot for barrel that can reach valve
        for bx, by in cells:
            if (bx, by) == (vx, vy):
                continue
            # Check if barrel can be pushed toward valve
            if bx == vx or by == vy:
                lv.set(vx, vy, 'V')
                lv.set(bx, by, 'B')
                return True
    return False

def place_conveyor(lv: Level) -> bool:
    """Place a conveyor belt"""
    cells = find_floor_cells(lv)
    if not cells:
        return False
    
    x, y = random.choice(cells)
    dirs = ['>', '<', '^', 'v']
    lv.set(x, y, random.choice(dirs))
    return True

In [None]:
# BFS Solver (optimized Python version)

def solve_level(lv: Level, max_depth: int = 100) -> Optional[Dict]:
    """Solve level using BFS, return solution or None"""
    if not lv.entry or not lv.exit:
        return None
    
    w = lv.w
    
    # Find all objects
    barrels = set()
    buttons = set()
    valve_map = {}  # idx -> bit
    has_gate = False
    gate_idx = -1
    
    valve_bit = 0
    for y in range(lv.h):
        for x in range(lv.w):
            c = lv.get(x, y)
            idx = y * w + x
            if c == 'B':
                barrels.add(idx)
            elif c == 'O':
                buttons.add(idx)
            elif c == 'V':
                valve_map[idx] = 1 << valve_bit
                valve_bit += 1
            elif c == 'G':
                has_gate = True
                gate_idx = idx
    
    entry_idx = lv.entry[1] * w + lv.entry[0]
    exit_idx = lv.exit[1] * w + lv.exit[0]
    
    # State: (pos_idx, frozen_barrels, sealed_mask)
    start_state = (entry_idx, frozenset(barrels), 0)
    visited = set()
    queue = deque([(start_state, 0, [lv.entry])])  # state, moves, path
    
    while queue:
        (pos_idx, barrel_set, sealed_mask), moves, path = queue.popleft()
        
        if moves > max_depth:
            continue
        
        # Win check
        if pos_idx == exit_idx:
            dirs = []
            for i in range(len(path) - 1):
                dx = path[i+1][0] - path[i][0]
                dy = path[i+1][1] - path[i][1]
                if dx > 0: dirs.append('right')
                elif dx < 0: dirs.append('left')
                elif dy > 0: dirs.append('down')
                else: dirs.append('up')
            return {'moves': moves, 'solution': dirs, 'path': path}
        
        state_key = (pos_idx, barrel_set, sealed_mask)
        if state_key in visited:
            continue
        visited.add(state_key)
        
        # Gate open check
        gate_open = not has_gate or pos_idx in buttons
        if not gate_open:
            for b in barrel_set:
                if b in buttons:
                    gate_open = True
                    break
        
        px, py = pos_idx % w, pos_idx // w
        
        for di, (dx, dy) in enumerate(DIRS):
            result = simulate_move(lv, px, py, dx, dy, barrel_set, sealed_mask,
                                   buttons, valve_map, gate_open, gate_idx, w)
            if result is None:
                continue
            
            new_pos_idx, new_barrels, new_sealed = result
            new_state = (new_pos_idx, new_barrels, new_sealed)
            if new_state not in visited:
                nx, ny = new_pos_idx % w, new_pos_idx // w
                queue.append((new_state, moves + 1, path + [(nx, ny)]))
    
    return None

def simulate_move(lv, start_x, start_y, dx, dy, barrels, sealed_mask,
                  buttons, valve_map, gate_open, gate_idx, w):
    """Simulate a move, return (new_pos_idx, new_barrels, new_sealed) or None if death"""
    x, y = start_x, start_y
    start_idx = y * w + x
    started_on_button = start_idx in buttons
    left_button = False
    cur_barrels = barrels
    cur_sealed = sealed_mask
    
    for step in range(50):
        nx, ny = x + dx, y + dy
        n_idx = ny * w + nx
        
        # Mid-slide gate closing
        cur_idx = y * w + x
        if started_on_button and not left_button and cur_idx not in buttons:
            left_button = True
            barrel_on_button = any(b in buttons for b in cur_barrels)
            if not barrel_on_button:
                gate_open = False
        
        c = lv.get(nx, ny)
        
        # Wall
        if c == '#':
            return (cur_idx, cur_barrels, cur_sealed)
        
        # Closed gate
        if n_idx == gate_idx and not gate_open:
            return (cur_idx, cur_barrels, cur_sealed)
        
        # Barrel - try to push
        if n_idx in cur_barrels:
            push_result = push_barrel(lv, nx, ny, dx, dy, cur_barrels, cur_sealed,
                                      buttons, valve_map, gate_open, gate_idx, w)
            if push_result is None:
                return (cur_idx, cur_barrels, cur_sealed)
            return (n_idx, push_result[0], push_result[1])
        
        # Unsealed valve - death!
        valve_bit = valve_map.get(n_idx, 0)
        if valve_bit and not (cur_sealed & valve_bit):
            return None
        
        x, y = nx, ny
        pos_idx = y * w + x
        
        # Button stops player
        if pos_idx in buttons:
            return (pos_idx, cur_barrels, cur_sealed)
        
        # Exit stops player
        if x == lv.exit[0] and y == lv.exit[1]:
            return (pos_idx, cur_barrels, cur_sealed)
        
        # Conveyor changes direction
        cell = lv.get(x, y)
        if cell == '>': dx, dy = 1, 0
        elif cell == '<': dx, dy = -1, 0
        elif cell == '^': dx, dy = 0, -1
        elif cell == 'v': dx, dy = 0, 1
    
    return (y * w + x, cur_barrels, cur_sealed)

def push_barrel(lv, bx, by, dx, dy, barrels, sealed_mask,
                buttons, valve_map, gate_open, gate_idx, w):
    """Push a barrel, return (new_barrels, new_sealed) or None"""
    nx, ny = bx + dx, by + dy
    n_idx = ny * w + nx
    c = lv.get(nx, ny)
    
    if c == '#': return None
    if n_idx == gate_idx and not gate_open: return None
    if n_idx in barrels: return None
    
    # Create new barrel set
    new_barrels = set(barrels)
    b_idx = by * w + bx
    new_barrels.discard(b_idx)
    
    x, y = nx, ny
    while True:
        pos_idx = y * w + x
        
        # Button stops barrel
        if pos_idx in buttons:
            new_barrels.add(pos_idx)
            return (frozenset(new_barrels), sealed_mask)
        
        # Valve seals
        valve_bit = valve_map.get(pos_idx, 0)
        if valve_bit and not (sealed_mask & valve_bit):
            new_barrels.add(pos_idx)
            return (frozenset(new_barrels), sealed_mask | valve_bit)
        
        # Check next cell
        next_x, next_y = x + dx, y + dy
        next_idx = next_y * w + next_x
        next_c = lv.get(next_x, next_y)
        
        if next_c == '#' or next_idx in new_barrels or next_idx == gate_idx:
            new_barrels.add(pos_idx)
            return (frozenset(new_barrels), sealed_mask)
        
        x, y = next_x, next_y

In [None]:
# Quality scoring

def score_level_quality(solution: Dict, has_button: bool, has_valve: bool, has_conveyor: bool) -> int:
    """Score a level's quality based on solution characteristics"""
    if not solution:
        return 0
    
    dirs = solution['solution']
    score = len(dirs) * 10  # Base score from length
    
    # Direction changes add interest
    changes = sum(1 for i in range(1, len(dirs)) if dirs[i] != dirs[i-1])
    score += changes * 5
    
    # Backtracking is interesting
    opposite = {'up': 'down', 'down': 'up', 'left': 'right', 'right': 'left'}
    backtracks = sum(1 for i in range(1, len(dirs)) if dirs[i] == opposite.get(dirs[i-1], ''))
    score += backtracks * 15
    
    # Mechanics bonuses
    if has_button: score += 80
    if has_valve: score += 50
    if has_conveyor: score += 30
    
    return score

def is_path_blocked(lv: Level) -> bool:
    """Check if exit is unreachable without using mechanics (good = blocked)"""
    if not lv.entry or not lv.exit:
        return True
    
    visited = set()
    queue = deque([lv.entry])
    visited.add(lv.entry)
    
    while queue:
        x, y = queue.popleft()
        if (x, y) == lv.exit:
            return False  # Can reach exit without mechanics - BAD
        
        for dx, dy in DIRS:
            nx, ny = x + dx, y + dy
            if (nx, ny) in visited:
                continue
            c = lv.get(nx, ny)
            if c == '#':
                continue
            visited.add((nx, ny))
            queue.append((nx, ny))
    
    return True  # GOOD - path blocked

In [None]:
# Level generation

def generate_level(config: Dict, max_attempts: int = 500) -> Optional[Dict]:
    """Generate a single level with given config"""
    w = config.get('width', 5)
    h = config.get('height', 9)
    min_par = config.get('minPar', 8)
    require_button = config.get('requireButton', False)
    require_valve = config.get('requireValve', False)
    require_conveyor = config.get('requireConveyor', False)
    
    for attempt in range(max_attempts):
        # Create room
        room_type = random.choice(['bordered', 'zigzag', 'maze'])
        if room_type == 'bordered':
            lv = create_bordered_room(w, h)
        elif room_type == 'zigzag':
            lv = create_zigzag_room(w, h)
        else:
            lv = create_random_maze(w, h)
        
        # Place entry/exit
        if not place_entry_exit_opposite(lv):
            continue
        
        # Place mechanics
        has_button = False
        has_valve = False
        has_conveyor = False
        
        if require_button or random.random() < 0.6:
            if place_button_gate(lv):
                has_button = True
                # Need a barrel to hold the button
                place_barrel_blocking(lv)
        
        if require_valve or random.random() < 0.4:
            if place_valve_and_barrel(lv):
                has_valve = True
        
        if require_conveyor or random.random() < 0.3:
            if place_conveyor(lv):
                has_conveyor = True
        
        # Add blocking barrel if no other mechanics
        if not has_button and not has_valve:
            place_barrel_blocking(lv)
        
        # Check path is blocked
        if not is_path_blocked(lv):
            continue
        
        # Solve
        solution = solve_level(lv)
        if not solution:
            continue
        
        # Check minimum par
        if solution['moves'] < min_par:
            continue
        
        # Check requirements
        if require_button and not has_button:
            continue
        if require_valve and not has_valve:
            continue
        
        # Score quality
        quality = score_level_quality(solution, has_button, has_valve, has_conveyor)
        
        return {
            'grid': lv.to_strings(),
            'width': w,
            'height': h,
            'par': solution['moves'],
            'solution': solution['solution'],
            'quality': quality,
            'has_button': has_button,
            'has_valve': has_valve,
            'has_conveyor': has_conveyor
        }
    
    return None

In [None]:
# Configuration by difficulty

def get_config_for_difficulty(diff: int) -> Dict:
    """Get generation config for a difficulty level"""
    # Size scales with difficulty
    base_w = 5 + (diff // 3)
    base_h = 9 + (diff // 2)
    
    # Add some randomness
    w = base_w + random.randint(-1, 2)
    h = base_h + random.randint(-1, 3)
    
    # Clamp
    w = max(5, min(12, w))
    h = max(9, min(18, h))
    
    return {
        'width': w,
        'height': h,
        'minPar': 6 + diff,
        'requireButton': diff >= 3,
        'requireValve': diff >= 5,
        'requireConveyor': diff >= 7
    }

# Size categories for checkpoints
SIZE_CATEGORIES = [
    ('5x9', 5, 9),
    ('6x10', 6, 10),
    ('6x11', 6, 11),
    ('7x11', 7, 11),
    ('7x12', 7, 12),
    ('8x13', 8, 13),
    ('8x14', 8, 14),
    ('9x15', 9, 15),
    ('10x16', 10, 16),
    ('11x17', 11, 17),
    ('12x18', 12, 18),
]

In [None]:
# Worker function for multiprocessing

def generate_one_level(args):
    """Generate a single level (for multiprocessing)"""
    config, seed = args
    random.seed(seed)
    return generate_level(config, max_attempts=500)

def generate_batch(config: Dict, count: int, num_workers: int = None) -> List[Dict]:
    """Generate a batch of levels using multiprocessing"""
    if num_workers is None:
        num_workers = cpu_count()
    
    # Create args with unique seeds
    base_seed = random.randint(0, 1000000)
    args = [(config, base_seed + i) for i in range(count)]
    
    with Pool(num_workers) as pool:
        results = pool.map(generate_one_level, args)
    
    # Filter out None results
    return [r for r in results if r is not None]

In [None]:
# Test single generation
config = get_config_for_difficulty(5)
print(f"Config: {config}")

start = time.time()
level = generate_level(config)
elapsed = time.time() - start

if level:
    print(f"Generated in {elapsed:.3f}s")
    print(f"Size: {level['width']}x{level['height']}")
    print(f"Par: {level['par']} moves")
    print(f"Quality: {level['quality']}")
    print(f"Solution: {' -> '.join(level['solution'][:10])}...")
    print("\nGrid:")
    for row in level['grid']:
        print(row)
else:
    print("Failed to generate")

In [None]:
# Benchmark: single vs multiprocessing
config = get_config_for_difficulty(5)
test_count = 50

# Single-threaded
start = time.time()
single_results = [generate_level(config) for _ in range(test_count)]
single_time = time.time() - start
single_success = len([r for r in single_results if r])

print(f"Single-threaded: {test_count} attempts in {single_time:.2f}s")
print(f"  Success rate: {single_success}/{test_count}")
print(f"  Speed: {test_count/single_time:.1f} attempts/sec")

# Multiprocessing
start = time.time()
multi_results = generate_batch(config, test_count)
multi_time = time.time() - start

print(f"\nMultiprocessing ({cpu_count()} cores): {test_count} attempts in {multi_time:.2f}s")
print(f"  Success rate: {len(multi_results)}/{test_count}")
print(f"  Speed: {test_count/multi_time:.1f} attempts/sec")
print(f"  Speedup: {single_time/multi_time:.1f}x")

## Generate Levels by Size

Run this cell to generate levels. Stop the cell after any size completes to save progress.

In [None]:
# Main generation loop with size checkpoints

OUTPUT_FILE = 'pregenerated_levels.json'
LEVELS_PER_SIZE = 100  # How many levels to generate per size category
BATCH_SIZE = 200       # Generate this many attempts at once

# Load existing progress
if os.path.exists(OUTPUT_FILE):
    with open(OUTPUT_FILE, 'r') as f:
        all_levels = json.load(f)
    print(f"Loaded {sum(len(v) for v in all_levels.values())} existing levels")
else:
    all_levels = {}

try:
    for size_name, w, h in SIZE_CATEGORIES:
        if size_name in all_levels and len(all_levels[size_name]) >= LEVELS_PER_SIZE:
            print(f"\n{size_name}: Already have {len(all_levels[size_name])} levels, skipping")
            continue
        
        print(f"\n{'='*50}")
        print(f"Generating {size_name} levels...")
        print(f"{'='*50}")
        
        if size_name not in all_levels:
            all_levels[size_name] = []
        
        config = {
            'width': w,
            'height': h,
            'minPar': 8,
            'requireButton': True,
            'requireValve': h > 11,
            'requireConveyor': h > 14
        }
        
        start_time = time.time()
        attempts = 0
        
        while len(all_levels[size_name]) < LEVELS_PER_SIZE:
            batch = generate_batch(config, BATCH_SIZE)
            attempts += BATCH_SIZE
            
            # Filter by quality and add
            for level in batch:
                if level['quality'] >= 150:
                    all_levels[size_name].append(level)
            
            elapsed = time.time() - start_time
            rate = attempts / elapsed
            print(f"  {len(all_levels[size_name])}/{LEVELS_PER_SIZE} levels | {attempts} attempts | {rate:.0f}/sec")
            
            if len(all_levels[size_name]) >= LEVELS_PER_SIZE:
                break
        
        # Sort by quality
        all_levels[size_name].sort(key=lambda x: -x['quality'])
        all_levels[size_name] = all_levels[size_name][:LEVELS_PER_SIZE]
        
        # Save checkpoint
        with open(OUTPUT_FILE, 'w') as f:
            json.dump(all_levels, f)
        
        total = sum(len(v) for v in all_levels.values())
        elapsed = time.time() - start_time
        print(f"\n>>> CHECKPOINT: {size_name} complete!")
        print(f">>> Saved {total} total levels to {OUTPUT_FILE}")
        print(f">>> Time: {elapsed:.1f}s")
        print(f">>> Stop here if you want, or continue for next size...")

except KeyboardInterrupt:
    print("\n\nInterrupted! Saving progress...")
    with open(OUTPUT_FILE, 'w') as f:
        json.dump(all_levels, f)
    total = sum(len(v) for v in all_levels.values())
    print(f"Saved {total} levels to {OUTPUT_FILE}")

print("\nDone!")

In [None]:
# View generated levels summary
if os.path.exists(OUTPUT_FILE):
    with open(OUTPUT_FILE, 'r') as f:
        all_levels = json.load(f)
    
    print("Generated Levels Summary")
    print("=" * 40)
    
    total = 0
    for size_name, levels in sorted(all_levels.items()):
        if levels:
            avg_quality = sum(l['quality'] for l in levels) / len(levels)
            avg_par = sum(l['par'] for l in levels) / len(levels)
            print(f"{size_name}: {len(levels)} levels (avg quality: {avg_quality:.0f}, avg par: {avg_par:.1f})")
            total += len(levels)
    
    print(f"\nTotal: {total} levels")
else:
    print("No levels generated yet. Run the generation cell above.")

In [None]:
# Export for game use - single flat JSON

if os.path.exists(OUTPUT_FILE):
    with open(OUTPUT_FILE, 'r') as f:
        all_levels = json.load(f)
    
    # Flatten and sort by quality
    flat_levels = []
    for size_name, levels in all_levels.items():
        for level in levels:
            level['size'] = size_name
            flat_levels.append(level)
    
    flat_levels.sort(key=lambda x: -x['quality'])
    
    # Save flattened version
    with open('levels_for_game.json', 'w') as f:
        json.dump(flat_levels, f)
    
    print(f"Exported {len(flat_levels)} levels to levels_for_game.json")
else:
    print("Generate levels first!")