Skip to content

Commit

Permalink
feat: add schedule settable from pull flows
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Jul 27, 2022
1 parent e85c60f commit caecbfd
Show file tree
Hide file tree
Showing 19 changed files with 342 additions and 160 deletions.
3 changes: 3 additions & 0 deletions backend/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3793,6 +3793,7 @@ components:
- offset_
- extra_perms
- is_flow
- enabled

NewSchedule:
type: object
Expand All @@ -3809,6 +3810,8 @@ components:
type: boolean
args:
$ref: "#/components/schemas/ScriptArgs"
enabled:
type: boolean
required:
- path
- schedule
Expand Down
13 changes: 10 additions & 3 deletions backend/src/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub struct NewSchedule {
pub script_path: String,
pub is_flow: bool,
pub args: Option<serde_json::Value>,
pub enabled: Option<bool>,
}

pub async fn push_scheduled_job<'c>(
Expand Down Expand Up @@ -133,15 +134,16 @@ async fn create_schedule(
check_flow_conflict(&mut tx, &w_id, &ns.path, ns.is_flow, &ns.script_path).await?;

let schedule = sqlx::query_as!(Schedule,
"INSERT INTO schedule (workspace_id, path, schedule, offset_, edited_by, script_path, is_flow, args) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING *",
"INSERT INTO schedule (workspace_id, path, schedule, offset_, edited_by, script_path, is_flow, args, enabled) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *",
w_id,
ns.path,
ns.schedule,
ns.offset,
&authed.username,
ns.script_path,
ns.is_flow,
ns.args
ns.args,
ns.enabled
)
.fetch_one(&mut tx)
.await?;
Expand All @@ -165,8 +167,13 @@ async fn create_schedule(
)
.await?;

let tx = push_scheduled_job(tx, schedule).await?;
let tx = if ns.enabled.unwrap_or(true) {
push_scheduled_job(tx, schedule).await?
} else {
tx
};
tx.commit().await?;

Ok(ns.path.to_string())
}

Expand Down
2 changes: 1 addition & 1 deletion frontend/src/lib/components/ArgInput.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@
{disabled}
class="default-button-secondary items-center leading-4 py-0 my-px px-1 float-right"
on:click={() => (value = undefined)}
>Reset<Tooltip>Reset to default value</Tooltip></button
>Reset&nbsp;<Tooltip>Reset to default value</Tooltip></button
>
</div>
{/if}
Expand Down
11 changes: 9 additions & 2 deletions frontend/src/lib/components/CronInput.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
let preview: string[] = []
let cronError = ''
export let schedule: string = '0 0 12 * *'
let limit = 3
$: handleScheduleInput(schedule)
Expand Down Expand Up @@ -69,11 +70,17 @@
<CollapseLink text="preview next runs" open={true}>
{#if preview && preview.length > 0}
<div class="text-sm text-gray-700 border p-2 rounded-md">
<div class="flex flex-row justify-between">The next 10 runs will be scheduled at:</div>
<div class="flex flex-row justify-between">The next runs will be scheduled at:</div>
<ul class="list-disc mx-12">
{#each preview as p}
{#each preview.slice(0, limit) as p}
<li class="mx-2 text-gray-700 text-sm">{displayDate(p)}</li>
{/each}
<li class="text-sm mx-2">...</li>
{#if limit != 10}
<button class="underline text-gray-400" on:click={() => (limit = 10)}>Load more</button>
{:else}
<button class="underline text-gray-400" on:click={() => (limit = 3)}>Load less</button>
{/if}
</ul>
</div>
{/if}
Expand Down
4 changes: 1 addition & 3 deletions frontend/src/lib/components/Editor.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,7 @@
(lastWsAttempt.getTime() - new Date().getTime() > 60000 && nbWsAttempt < 2)
) {
if (!websocketAlive.black && !websocketAlive.deno && !websocketAlive.pyright) {
sendUserToast(
'Smart assistant got disconnected. Reconnecting to windmill language server for smart assistance'
)
console.log('reconnecting to language servers')
lastWsAttempt = new Date()
nbWsAttempt++
reloadWebsocket()
Expand Down
97 changes: 79 additions & 18 deletions frontend/src/lib/components/FlowBuilder.svelte
Original file line number Diff line number Diff line change
@@ -1,49 +1,104 @@
<script lang="ts">
import { goto } from '$app/navigation'
import { page } from '$app/stores'
import { FlowService, type Flow } from '$lib/gen'
import { FlowService, ScheduleService, ScriptService, type Flow } from '$lib/gen'
import { clearPreviewResults, hubScripts, workspaceStore } from '$lib/stores'
import { loadHubScripts, sendUserToast, setQueryWithoutLoad } from '$lib/utils'
import { formatCron, loadHubScripts, sendUserToast, setQueryWithoutLoad } from '$lib/utils'
import { onMount } from 'svelte'
import { OFFSET } from './CronInput.svelte'
import FlowEditor from './FlowEditor.svelte'
import { flowStore, type FlowMode } from './flows/flowStore'
import { flowStore, mode } from './flows/flowStore'
import { flowToMode } from './flows/utils'
import ScriptSchema from './ScriptSchema.svelte'
export let initialPath: string = ''
let pathError = ''
let mode: FlowMode
let scheduleArgs: Record<string, any>
let scheduleEnabled
let scheduleCron: string
$: step = Number($page.url.searchParams.get('step')) || 1
async function createSchedule(path: string) {
await ScheduleService.createSchedule({
workspace: $workspaceStore!,
requestBody: {
path: path,
schedule: formatCron(scheduleCron),
offset: OFFSET,
script_path: path,
is_flow: true,
args: scheduleArgs,
enabled: scheduleEnabled
}
})
}
async function saveFlow(): Promise<void> {
const newFlow = flowToMode($flowStore, mode)
const flow = flowToMode($flowStore, $mode)
if (initialPath === '') {
await FlowService.createFlow({
workspace: $workspaceStore!,
requestBody: {
path: newFlow.path,
summary: newFlow.summary,
description: newFlow.description ?? '',
value: newFlow.value,
schema: newFlow.schema
path: flow.path,
summary: flow.summary,
description: flow.description ?? '',
value: flow.value,
schema: flow.schema
}
})
if ($mode == 'pull') {
await createSchedule(flow.path)
}
} else {
await FlowService.updateFlow({
workspace: $workspaceStore!,
path: newFlow.path,
path: initialPath,
requestBody: {
path: newFlow.path,
summary: newFlow.summary,
description: newFlow.description ?? '',
value: newFlow.value,
schema: newFlow.schema
path: flow.path,
summary: flow.summary,
description: flow.description ?? '',
value: flow.value,
schema: flow.schema
}
})
const scheduleExists = await ScheduleService.existsSchedule({
workspace: $workspaceStore ?? '',
path: initialPath
})
if (scheduleExists) {
const schedule = await ScheduleService.getSchedule({
workspace: $workspaceStore ?? '',
path: initialPath
})
if (
schedule.path != flow.path ||
JSON.stringify(schedule.args) != JSON.stringify(scheduleArgs) ||
schedule.schedule != scheduleCron
) {
await ScheduleService.updateSchedule({
workspace: $workspaceStore ?? '',
path: initialPath,
requestBody: {
schedule: formatCron(scheduleCron),
script_path: flow.path,
is_flow: true,
args: scheduleArgs
}
})
}
if (scheduleEnabled != schedule.enabled) {
await ScheduleService.setScheduleEnabled({
workspace: $workspaceStore ?? '',
path: flow.path,
requestBody: { enabled: scheduleEnabled }
})
}
} else {
await createSchedule(flow.path)
}
}
sendUserToast(`Success! flow saved at ${$flowStore.path}`)
goto(`/flows/get/${$flowStore.path}`)
Expand All @@ -54,7 +109,7 @@
}
flowStore.subscribe((flow: Flow) => {
setQueryWithoutLoad($page.url, 'state', btoa(JSON.stringify(flowToMode(flow, mode))))
setQueryWithoutLoad($page.url, 'state', btoa(JSON.stringify(flowToMode(flow, $mode))))
})
onMount(() => {
Expand Down Expand Up @@ -121,7 +176,13 @@
<!-- metadata -->

{#if step === 1}
<FlowEditor bind:mode bind:pathError bind:initialPath />
<FlowEditor
bind:pathError
bind:initialPath
bind:scheduleEnabled
bind:scheduleCron
bind:scheduleArgs
/>
{:else if step === 2}
<ScriptSchema
synchronizedHeader={false}
Expand Down
Loading

0 comments on commit caecbfd

Please sign in to comment.