Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions docs/content/references/cli.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,22 @@ Providers: `anthropic`, `google`, `openai`, `ollama`, `azure-openai`, `amazon-be
| `--job-id <id>` | Custom Job ID for new Job (default: auto-generated) |
| `--continue` | Continue latest Job with new Run |
| `--continue-job <id>` | Continue specific Job with new Run |
| `--resume-from <id>` | Resume from specific checkpoint (requires `--continue` or `--continue-job`) |
| `--resume-from <id>` | Resume from specific checkpoint (requires `--continue-job`) |

**Combining options:**

```bash
# Continue latest Job from its latest checkpoint
--continue

# Continue latest Job from a specific checkpoint
--continue --resume-from <checkpointId>

# Continue specific Job from its latest checkpoint
--continue-job <jobId>

# Continue specific Job from a specific checkpoint
--continue-job <jobId> --resume-from <checkpointId>
```

**Note:** `--resume-from` must be combined with `--continue` or `--continue-job`. You can only resume from the Coordinator Expert's checkpoints.
**Note:** `--resume-from` requires `--continue-job` (Job ID must be specified). You can only resume from the Coordinator Expert's checkpoints.

### Interactive

Expand Down
8 changes: 2 additions & 6 deletions docs/content/using-experts/state-management.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,9 @@ npx perstack run my-expert "Follow-up" --continue-job <jobId>

## Resuming from a checkpoint

Resume from a specific checkpoint to branch execution. Use `--resume-from` with `--continue` or `--continue-job`:
Resume from a specific checkpoint to branch execution. Use `--resume-from` with `--continue-job`:

```bash
# Resume from checkpoint in latest Job
npx perstack run my-expert "try again" --continue --resume-from <checkpointId>

# Resume from checkpoint in specific Job
npx perstack run my-expert "try again" --continue-job <jobId> --resume-from <checkpointId>
```

Expand All @@ -93,7 +89,7 @@ Useful for:
- Debugging

**Important:**
- `--resume-from` requires `--continue` or `--continue-job`
- `--resume-from` requires `--continue-job` (Job ID must be specified)
- You can only resume from the Coordinator Expert's checkpoints

## Interactive tool calls
Expand Down
2 changes: 1 addition & 1 deletion e2e/run.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ describe("CLI run", () => {
"checkpoint-123",
])
expect(result.exitCode).toBe(1)
expect(result.stderr).toContain("--resume-from requires --continue or --continue-job")
expect(result.stderr).toContain("--resume-from requires --continue-job")
})
})
13 changes: 5 additions & 8 deletions packages/perstack/src/lib/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { Checkpoint, PerstackConfig, ProviderConfig, ProviderName } from "@
import { getEnv } from "./get-env.js"
import { getPerstackConfig } from "./perstack-toml.js"
import { getProviderConfig } from "./provider-config.js"
import { getCheckpointById, getMostRecentCheckpoint, getMostRecentRunInJob } from "./run-manager.js"
import { getCheckpointById, getMostRecentCheckpoint } from "./run-manager.js"

const defaultProvider: ProviderName = "anthropic"
const defaultModel = "claude-sonnet-4-5"
Expand Down Expand Up @@ -33,15 +33,12 @@ export async function resolveRunContext(input: ResolveRunContextInput): Promise<
const perstackConfig = await getPerstackConfig(input.configPath)
let checkpoint: Checkpoint | undefined
if (input.resumeFrom) {
if (!input.continue && !input.continueJob) {
throw new Error("--resume-from requires --continue or --continue-job")
if (!input.continueJob) {
throw new Error("--resume-from requires --continue-job")
}
const jobId = input.continueJob ?? (await getMostRecentCheckpoint()).jobId
const run = await getMostRecentRunInJob(jobId)
checkpoint = await getCheckpointById(jobId, run.runId, input.resumeFrom)
checkpoint = await getCheckpointById(input.continueJob, input.resumeFrom)
} else if (input.continueJob) {
const run = await getMostRecentRunInJob(input.continueJob)
checkpoint = await getMostRecentCheckpoint(run.jobId, run.runId)
checkpoint = await getMostRecentCheckpoint(input.continueJob)
} else if (input.continue) {
checkpoint = await getMostRecentCheckpoint()
}
Expand Down
161 changes: 69 additions & 92 deletions packages/perstack/src/lib/run-manager.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,43 @@
import { existsSync } from "node:fs"
import { readdir, readFile } from "node:fs/promises"
import path from "node:path"
import { type Checkpoint, checkpointSchema, type RunEvent, type RunSetting } from "@perstack/core"
import { getRunDir } from "@perstack/runtime"
import {
type Checkpoint,
checkpointSchema,
type Job,
jobSchema,
type RunEvent,
type RunSetting,
} from "@perstack/core"
import { getCheckpointDir, getCheckpointPath, getRunDir } from "@perstack/runtime"

/**
 * Lists every Job persisted under `<cwd>/perstack/jobs`, newest first.
 *
 * Each immediate subdirectory of the jobs directory is expected to contain a
 * `job.json` file, which is parsed as JSON and validated with `jobSchema`.
 * Entries whose file is missing, unreadable, not valid JSON, or rejected by the
 * schema are skipped on purpose, so one corrupt Job does not hide the others.
 *
 * @returns The valid Jobs sorted by `startedAt` in descending order, or an empty
 *   array when the jobs directory does not exist or holds no valid Job.
 */
export async function getAllJobs(): Promise<Job[]> {
  const jobsDir = path.resolve(process.cwd(), "perstack", "jobs")
  // A missing `perstack` directory implies a missing `jobs` directory,
  // so a single existence check covers both cases.
  if (!existsSync(jobsDir)) {
    return []
  }
  const entries = await readdir(jobsDir, { withFileTypes: true })
  const jobDirNames = entries.filter((entry) => entry.isDirectory()).map((entry) => entry.name)
  // The job files are independent of each other, so read them concurrently
  // instead of awaiting one file at a time.
  const parsedJobs = await Promise.all(
    jobDirNames.map(async (jobDirName): Promise<Job | undefined> => {
      try {
        const jobContent = await readFile(path.resolve(jobsDir, jobDirName, "job.json"), "utf-8")
        return jobSchema.parse(JSON.parse(jobContent))
      } catch {
        // Ignore invalid jobs (deliberate best effort).
        return undefined
      }
    }),
  )
  const jobs = parsedJobs.filter((job): job is Job => job !== undefined)
  return jobs.sort((a, b) => b.startedAt - a.startedAt)
}

export async function getAllRuns(): Promise<RunSetting[]> {
const dataDir = path.resolve(process.cwd(), "perstack")
Expand Down Expand Up @@ -67,63 +102,30 @@ export async function getMostRecentRunInJob(jobId: string): Promise<RunSetting>
return runs[0]
}

export async function getCheckpoints(
jobId: string,
runId: string,
): Promise<{ timestamp: string; stepNumber: string; id: string }[]> {
const runDir = getRunDir(jobId, runId)
if (!existsSync(runDir)) {
export async function getCheckpointsByJobId(jobId: string): Promise<Checkpoint[]> {
const checkpointDir = getCheckpointDir(jobId)
if (!existsSync(checkpointDir)) {
return []
}
return await readdir(runDir).then((files) =>
files
.filter((file) => file.startsWith("checkpoint-"))
.map((file) => {
const [_, timestamp, stepNumber, id] = file.split(".")[0].split("-")
return {
timestamp,
stepNumber,
id,
}
})
.sort((a, b) => Number(a.stepNumber) - Number(b.stepNumber)),
)
}

export async function getCheckpoint(checkpointId: string): Promise<Checkpoint> {
const run = await getMostRecentRun()
const runDir = getRunDir(run.jobId, run.runId)
const checkpointPath = path.resolve(runDir, `checkpoint-${checkpointId}.json`)
const checkpoint = await readFile(checkpointPath, "utf-8")
return checkpointSchema.parse(JSON.parse(checkpoint))
}

export async function getMostRecentCheckpoint(jobId?: string, runId?: string): Promise<Checkpoint> {
let runJobId: string
let runIdForCheckpoint: string
if (jobId && runId) {
runJobId = jobId
runIdForCheckpoint = runId
} else {
const run = await getMostRecentRun()
runJobId = run.jobId
runIdForCheckpoint = run.runId
}
const runDir = getRunDir(runJobId, runIdForCheckpoint)
const checkpointFiles = await readdir(runDir, { withFileTypes: true }).then((files) =>
files.filter((file) => file.isFile() && file.name.startsWith("checkpoint-")),
)
if (checkpointFiles.length === 0) {
throw new Error(`No checkpoints found for run ${runIdForCheckpoint}`)
}
const files = await readdir(checkpointDir)
const checkpointFiles = files.filter((file) => file.endsWith(".json"))
const checkpoints = await Promise.all(
checkpointFiles.map(async (file) => {
const checkpointPath = path.resolve(runDir, file.name)
const checkpoint = await readFile(checkpointPath, "utf-8")
return checkpointSchema.parse(JSON.parse(checkpoint))
const checkpointPath = path.resolve(checkpointDir, file)
const content = await readFile(checkpointPath, "utf-8")
return checkpointSchema.parse(JSON.parse(content))
}),
)
return checkpoints.sort((a, b) => b.stepNumber - a.stepNumber)[0]
return checkpoints.sort((a, b) => a.stepNumber - b.stepNumber)
}

/**
 * Returns the last checkpoint in the list produced by `getCheckpointsByJobId`
 * for a Job.
 *
 * @param jobId - Job to inspect. When omitted, the `jobId` of the Run returned
 *   by `getMostRecentRun()` is used instead.
 * @returns The final element of the Job's checkpoint list.
 * @throws Error when the Job has no checkpoints.
 */
export async function getMostRecentCheckpoint(jobId?: string): Promise<Checkpoint> {
  // Fall back to the most recent Run's Job only when no Job ID was supplied.
  const resolvedJobId = jobId == null ? (await getMostRecentRun()).jobId : jobId
  const jobCheckpoints = await getCheckpointsByJobId(resolvedJobId)
  const total = jobCheckpoints.length
  if (total > 0) {
    return jobCheckpoints[total - 1]
  }
  throw new Error(`No checkpoints found for job ${resolvedJobId}`)
}

export async function getRecentExperts(
Expand Down Expand Up @@ -168,51 +170,26 @@ export async function getEvents(
.sort((a, b) => Number(a.stepNumber) - Number(b.stepNumber)),
)
}
export async function getCheckpointById(
jobId: string,
runId: string,
checkpointId: string,
): Promise<Checkpoint> {
const runDir = getRunDir(jobId, runId)
const files = await readdir(runDir)
const checkpointFile = files.find(
(file) => file.startsWith("checkpoint-") && file.includes(`-${checkpointId}.`),
)
if (!checkpointFile) {
throw new Error(`Checkpoint ${checkpointId} not found in run ${runId}`)
export async function getCheckpointById(jobId: string, checkpointId: string): Promise<Checkpoint> {
const checkpointPath = getCheckpointPath(jobId, checkpointId)
if (!existsSync(checkpointPath)) {
throw new Error(`Checkpoint ${checkpointId} not found in job ${jobId}`)
}
const checkpointPath = path.resolve(runDir, checkpointFile)
const checkpoint = await readFile(checkpointPath, "utf-8")
return checkpointSchema.parse(JSON.parse(checkpoint))
}
export async function getCheckpointsWithDetails(
jobId: string,
runId: string,
): Promise<
{ id: string; runId: string; stepNumber: number; timestamp: number; contextWindowUsage: number }[]
> {
const runDir = getRunDir(jobId, runId)
if (!existsSync(runDir)) {
return []
}
const files = await readdir(runDir)
const checkpointFiles = files.filter((file) => file.startsWith("checkpoint-"))
const checkpoints = await Promise.all(
checkpointFiles.map(async (file) => {
const [_, timestamp, stepNumber, id] = file.split(".")[0].split("-")
const checkpointPath = path.resolve(runDir, file)
const content = await readFile(checkpointPath, "utf-8")
const checkpoint = checkpointSchema.parse(JSON.parse(content))
return {
id,
runId,
stepNumber: Number(stepNumber),
timestamp: Number(timestamp),
contextWindowUsage: checkpoint.contextWindowUsage ?? 0,
}
}),
)
return checkpoints.sort((a, b) => b.stepNumber - a.stepNumber)
): Promise<{ id: string; runId: string; stepNumber: number; contextWindowUsage: number }[]> {
const checkpoints = await getCheckpointsByJobId(jobId)
return checkpoints
.map((cp) => ({
id: cp.id,
runId: cp.runId,
stepNumber: cp.stepNumber,
contextWindowUsage: cp.contextWindowUsage ?? 0,
}))
.sort((a, b) => b.stepNumber - a.stepNumber)
}
export async function getEventsWithDetails(
jobId: string,
Expand Down
41 changes: 18 additions & 23 deletions packages/perstack/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import {
startCommandInputSchema,
} from "@perstack/core"
import { run, runtimeVersion } from "@perstack/runtime"
import type { CheckpointHistoryItem, EventHistoryItem, RunHistoryItem } from "@perstack/tui"
import type { CheckpointHistoryItem, EventHistoryItem, JobHistoryItem } from "@perstack/tui"
import { renderStart } from "@perstack/tui"
import { Command } from "commander"
import { resolveRunContext } from "./lib/context.js"
import { parseInteractiveToolCallResult } from "./lib/interactive.js"
import {
getAllRuns,
getAllJobs,
getCheckpointById,
getCheckpointsWithDetails,
getEventContents,
Expand Down Expand Up @@ -72,15 +72,14 @@ export const startCommand = new Command()
name: key,
}))
const recentExperts = await getRecentExperts(10)
const historyRuns: RunHistoryItem[] = showHistory
? (await getAllRuns()).map((r) => ({
jobId: r.jobId,
runId: r.runId,
expertKey: r.expertKey,
model: r.model,
inputText: r.input.text ?? "",
startedAt: r.startedAt,
updatedAt: r.updatedAt,
const historyJobs: JobHistoryItem[] = showHistory
? (await getAllJobs()).map((j) => ({
jobId: j.id,
status: j.status,
expertKey: j.coordinatorExpertKey,
totalSteps: j.totalSteps,
startedAt: j.startedAt,
finishedAt: j.finishedAt,
}))
: []
const resumeState: { checkpoint: CheckpointHistoryItem | null } = { checkpoint: null }
Expand All @@ -106,7 +105,7 @@ export const startCommand = new Command()
},
configuredExperts,
recentExperts,
historyRuns,
historyJobs,
onContinue: (query: string) => {
if (resolveContinueQuery) {
resolveContinueQuery(query)
Expand All @@ -116,16 +115,16 @@ export const startCommand = new Command()
onResumeFromCheckpoint: (cp: CheckpointHistoryItem) => {
resumeState.checkpoint = cp
},
onLoadCheckpoints: async (r: RunHistoryItem): Promise<CheckpointHistoryItem[]> => {
const checkpoints = await getCheckpointsWithDetails(r.jobId, r.runId)
return checkpoints.map((cp) => ({ ...cp, jobId: r.jobId }))
onLoadCheckpoints: async (j: JobHistoryItem): Promise<CheckpointHistoryItem[]> => {
const checkpoints = await getCheckpointsWithDetails(j.jobId)
return checkpoints.map((cp) => ({ ...cp, jobId: j.jobId }))
},
onLoadEvents: async (
r: RunHistoryItem,
j: JobHistoryItem,
cp: CheckpointHistoryItem,
): Promise<EventHistoryItem[]> => {
const events = await getEventsWithDetails(r.jobId, r.runId, cp.stepNumber)
return events.map((e) => ({ ...e, jobId: r.jobId }))
const events = await getEventsWithDetails(j.jobId, cp.runId, cp.stepNumber)
return events.map((e) => ({ ...e, jobId: j.jobId }))
},
onLoadHistoricalEvents: async (cp: CheckpointHistoryItem) => {
return await getEventContents(cp.jobId, cp.runId, cp.stepNumber)
Expand All @@ -139,11 +138,7 @@ export const startCommand = new Command()
}
let currentCheckpoint =
resumeState.checkpoint !== null
? await getCheckpointById(
resumeState.checkpoint.jobId,
resumeState.checkpoint.runId,
resumeState.checkpoint.id,
)
? await getCheckpointById(resumeState.checkpoint.jobId, resumeState.checkpoint.id)
: checkpoint
if (currentCheckpoint && currentCheckpoint.expert.key !== finalExpertKey) {
console.error(
Expand Down
Loading