Skip to content

Commit

Permalink
feat(clients): rename internal state as state + setters for resources…
Browse files Browse the repository at this point in the history
…/variables in python
  • Loading branch information
rubenfiszel committed Nov 15, 2022
1 parent 2484f15 commit 32bca1f
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 34 deletions.
6 changes: 2 additions & 4 deletions backend/windmill-api/openapi.yaml
Expand Up @@ -4358,8 +4358,7 @@ components:
properties:
path:
type: string
value:
type: object
value: {}
description:
type: string
resource_type:
Expand All @@ -4378,8 +4377,7 @@ components:
type: string
description:
type: string
value:
type: object
value: {}

Resource:
type: object
Expand Down
45 changes: 31 additions & 14 deletions deno-client/mod.ts
Expand Up @@ -34,12 +34,13 @@ export function getWorkspace(): string {

/**
* Get a resource value by path
* @param path path of the resource
* @param path path of the resource, default to internal state path
* @param undefinedIfEmpty if the resource does not exist, return undefined instead of throwing an error
* @returns resource value
*/
export async function getResource(path: string, undefinedIfEmpty?: boolean): Promise<any> {
export async function getResource(path?: string, undefinedIfEmpty?: boolean): Promise<any> {
const workspace = getWorkspace()
path = path ?? getStatePath()
try {
const resource = await ResourceService.getResource({ workspace, path })
return await _transformLeaf(resource.value)
Expand All @@ -52,12 +53,12 @@ export async function getResource(path: string, undefinedIfEmpty?: boolean): Pro
}
}

export function getInternalStatePath(suffix?: string): string {
export function getStatePath(): string {
const env_flow_path = Deno.env.get("WM_FLOW_PATH")
const env_job_path = Deno.env.get("WM_JOB_PATH")
const permissioned_as = Deno.env.get("WM_PERMISSIONED_AS")
const flow_path = env_flow_path != undefined && env_flow_path != "" ? env_flow_path : 'NO_FLOW_PATH'
const script_path = suffix ?? (env_job_path != undefined && env_job_path != "" ? env_job_path : 'NO_JOB_PATH')
const script_path = env_job_path != undefined && env_job_path != "" ? env_job_path : 'NO_JOB_PATH'
const env_schedule_path = Deno.env.get("WM_SCHEDULE_PATH")
const schedule_path = env_schedule_path != undefined && env_schedule_path != "" ? `/${env_schedule_path}` : ''

Expand All @@ -69,11 +70,12 @@ export function getInternalStatePath(suffix?: string): string {

/**
* Set a resource value by path
* @param path path of the resource to set
* @param path path of the resource to set, default to state path
* @param value new value of the resource to set
* @param initializeToTypeIfNotExist if the resource does not exist, initialize it with this type
*/
export async function setResource(path: string, value: any, initializeToTypeIfNotExist?: string): Promise<void> {
export async function setResource(value: any, path?: string, initializeToTypeIfNotExist?: string): Promise<void> {
path = path ?? getStatePath()
const workspace = getWorkspace()
if (await ResourceService.existsResource({ workspace, path })) {
await ResourceService.updateResource({ workspace, path, requestBody: { value } })
Expand All @@ -85,20 +87,35 @@ export async function setResource(path: string, value: any, initializeToTypeIfNo
}

/**
* Set the internal state
* Set the state
* @param state state to set
* @param suffix suffix of the path of the internal state (useful to share internal state between jobs)
* @deprecated use setState instead
*/
export async function setInternalState(state: any, suffix?: string): Promise<void> {
await setResource(getInternalStatePath(suffix), state, 'state')
export async function setInternalState(state: any): Promise<void> {
await setResource(state, undefined, 'state')
}

/**
* Set the state
* @param state state to set
*/
export async function setState(state: any): Promise<void> {
await setResource(state, undefined, 'state')
}

/**
* Get the internal state
* @param suffix suffix of the path of the internal state (useful to share internal state between jobs)
* @deprecated use getState instead
*/
export async function getInternalState(): Promise<any> {
return await getResource(getStatePath(), true)
}

/**
* Get the state shared across executions
*/
export async function getInternalState(suffix?: string): Promise<any> {
return await getResource(getInternalStatePath(suffix), true)
export async function getState(): Promise<any> {
return await getResource(getStatePath(), true)
}

/**
Expand Down Expand Up @@ -161,7 +178,7 @@ export async function genNounceAndHmac(workspace: string, jobId: string, approve
Deno.env.get("WM_BASE_URL"),
);

u.searchParams.append('token', Deno.env.get("WM_TOKEN"));
u.searchParams.append('token', Deno.env.get("WM_TOKEN") ?? '');
if (approver) {
u.searchParams.append('approver', approver);
}
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/lib/components/sidebar/MenuLink.svelte
Expand Up @@ -47,7 +47,7 @@
<span
class={classNames(
'whitespace-pre text-white',
isSelected ? ' text-gray-700 font-bold' : 'text-white group-hover:text-gray-500'
isSelected ? ' text-gray-700 font-bold' : 'text-white group-hover:text-gray-900'
)}
>
{label}
Expand Down
4 changes: 2 additions & 2 deletions frontend/src/lib/components/sidebar/SidebarContent.svelte
Expand Up @@ -58,7 +58,7 @@
{#each mainMenuLinks as menuLink}
<MenuLink class="text-lg" {...menuLink} {isCollapsed} />
{/each}
<div class="h-4" />
<div class="h-8" />
{#each secondaryMenuLinks as menuLink}
<MenuLink class="text-xs" {...menuLink} {isCollapsed} />
{/each}
Expand All @@ -67,7 +67,7 @@
<div class="space-1-2">
<div class="h-4" />
{#each thirdMenuLinks as menuLink}
<MenuLink class="text-2xs" {...menuLink} {isCollapsed} />
<MenuLink class="text-xs" {...menuLink} {isCollapsed} />
{/each}
</div>
</nav>
Expand Down
150 changes: 137 additions & 13 deletions python-client/wmill/wmill/client.py
Expand Up @@ -9,7 +9,15 @@
from windmill_api.api.job import run_script_by_hash, get_job, get_completed_job

from windmill_api.api.resource import get_resource as get_resource_api
from windmill_api.api.resource import exists_resource as exists_resource_api
from windmill_api.api.resource import update_resource as update_resource_api
from windmill_api.api.resource import create_resource as create_resource_api
from windmill_api.api.variable import get_variable as get_variable_api
from windmill_api.api.variable import exists_variable as exists_variable_api
from windmill_api.api.variable import update_variable as update_variable_api
from windmill_api.api.variable import create_variable as create_variable_api
from windmill_api.models.create_variable_json_body import CreateVariableJsonBody
from windmill_api.models import *
from windmill_api.models.get_job_response_200_type import GetJobResponse200Type

from windmill_api.models.run_script_by_hash_json_body import RunScriptByHashJsonBody
Expand All @@ -27,32 +35,41 @@ class JobStatus(Enum):
RUNNING = 2
COMPLETED = 3


_client: "AuthenticatedClient | None" = None

def create_client(base_url: "str | None" = None, token: "str | None" = None) -> AuthenticatedClient:

def create_client(
base_url: "str | None" = None, token: "str | None" = None
) -> AuthenticatedClient:
env_base_url = os.environ.get("BASE_INTERNAL_URL")

if env_base_url is not None:
env_base_url = env_base_url + "/api"

base_url_: str = base_url or env_base_url or "http://localhost:8000/api"
token_ : str = token or os.environ.get("WM_TOKEN") or ""
token_: str = token or os.environ.get("WM_TOKEN") or ""
global _client
if _client is None:
_client = AuthenticatedClient(base_url=base_url_, token=token_, timeout=30)
return _client


def get_workspace() -> str:
from_env = os.environ.get("WM_WORKSPACE")
if from_env is None:
raise Exception("Workspace not passed as WM_WORKSPACE")
return from_env


def get_version() -> str:
"""
Returns the current version of the backend
"""
return backend_version.sync_detailed(client=create_client()).content.decode("us-ascii")
return backend_version.sync_detailed(client=create_client()).content.decode(
"us-ascii"
)


def run_script_async(
hash: str,
Expand All @@ -71,7 +88,10 @@ def run_script_async(
parent_job=os.environ.get("DT_JOB_ID"),
).content.decode("us-ascii")

def run_script_sync(hash: str, args: Dict[str, Any] = {}, verbose: bool = False) -> Dict[str, Any]:

def run_script_sync(
hash: str, args: Dict[str, Any] = {}, verbose: bool = False
) -> Dict[str, Any]:
"""
Run a script, wait for it to complete and return the result of the launched script
"""
Expand All @@ -87,11 +107,14 @@ def run_script_sync(hash: str, args: Dict[str, Any] = {}, verbose: bool = False)
nb_iter += 1
return get_result(job_id)


def get_job_status(job_id: str) -> JobStatus:
"""
Returns the status of a queued or completed job
"""
res = get_job.sync_detailed(client=create_client(), workspace=get_workspace(), id=job_id).parsed
res = get_job.sync_detailed(
client=create_client(), workspace=get_workspace(), id=job_id
).parsed
if not res:
raise Exception(f"Job {job_id} not found")
elif not res.type:
Expand All @@ -106,25 +129,34 @@ def get_job_status(job_id: str) -> JobStatus:
else:
return JobStatus.WAITING


def get_result(job_id: str) -> Dict[str, Any]:
"""
Returns the result of a completed job
"""
res = get_completed_job.sync_detailed(client=create_client(), workspace=get_workspace(), id=job_id).parsed
res = get_completed_job.sync_detailed(
client=create_client(), workspace=get_workspace(), id=job_id
).parsed
if not res:
raise Exception(f"Job {job_id} not found")
if not res.result:
raise Exception(f"Unexpected result not found for completed job {job_id}")
else:
return res.result.to_dict() # type: ignore

def get_resource(path: str) -> Dict[str, Any]:

def get_resource(path: str | None) -> Any:
"""
Returns the resource at a given path as a python dict.
"""
parsed = get_resource_api.sync_detailed(workspace=get_workspace(), path=path, client=create_client()).parsed
path = path or get_state_path()
parsed = get_resource_api.sync_detailed(
workspace=get_workspace(), path=path, client=create_client()
).parsed
if parsed is None:
raise Exception(f"Resource at path {path} does not exist or you do not have read permissions on it")
raise Exception(
f"Resource at path {path} does not exist or you do not have read permissions on it"
)

if isinstance(parsed.value, Unset):
return {}
Expand All @@ -134,25 +166,117 @@ def get_resource(path: str) -> Dict[str, Any]:

return res


def get_state() -> Any:
"""
Get the state
"""
get_resource(None)


def set_resource(value: Any, path: str | None, type: str = "state") -> None:
"""
Set the resource at a given path as a string, creating it if it does not exist
"""
path = path or get_state_path()
workspace = get_workspace()
client = create_client()
if not exists_resource_api.sync_detailed(
workspace=workspace, path=path, client=client
).parsed:
create_resource_api.sync_detailed(
workspace=workspace,
client=client,
json_body=CreateResourceJsonBody(path=path, value=value, type=type),
)
else:
update_resource_api.sync_detailed(
workspace=get_workspace(),
client=client,
path=path,
json_body=UpdateResourceJsonBody(value=value),
)


def set_state(value: Any) -> None:
"""
Set the state
"""
set_resource(value, None)


def get_variable(path: str) -> str:
"""
Returns the variable at a given path as a string
"""
res = get_variable_api.sync_detailed(workspace=get_workspace(), path=path, client=create_client()).parsed
res = get_variable_api.sync_detailed(
workspace=get_workspace(), path=path, client=create_client()
).parsed
if res is None:
raise Exception(f"Variable at path {path} does not exist or you do not have read permissions on it")
raise Exception(
f"Variable at path {path} does not exist or you do not have read permissions on it"
)

return res.value # type: ignore


def set_variable(path: str, value: str) -> None:
"""
Set the variable at a given path as a string, creating it if it does not exist
"""
workspace = get_workspace()
client = create_client()
if not exists_variable_api.sync_detailed(
workspace=workspace, path=path, client=client
).parsed:
create_variable_api.sync_detailed(
workspace=workspace,
client=client,
json_body=CreateVariableJsonBody(
path=path, value=value, is_secret=False, description=""
),
)
else:
update_variable_api.sync_detailed(
workspace=get_workspace(),
path=path,
client=client,
json_body=UpdateVariableJsonBody(value=value),
)


def get_state_path() -> str:
"""
Get a stable path for next execution of the script to point to the same resource
"""
permissioned_as = os.environ.get("WM_PERMISSIONED_AS")
flow_path = os.environ.get("WM_FLOW_PATH") or "NO_FLOW_PATH"
script_path = os.environ.get("WM_JOB_PATH") or "NO_JOB_PATH"
env_schedule_path = os.environ.get("WM_SCHEDULE_PATH")
schedule_path = (
""
if env_schedule_path is None or env_schedule_path == ""
else f"/{env_schedule_path}"
)

if script_path.endswith("/"):
raise Exception(
"The script path must not end with '/', give a name to your script!"
)

return f"{permissioned_as}/{flow_path}/{script_path}{schedule_path}"

return res.value # type: ignore

def _transform_leaves(d: Dict[str, Any]) -> Dict[str, Any]:
return {k: _transform_leaf(v) for k, v in d.items()}


def _transform_leaf(v: Any) -> Any:
if isinstance(v, dict):
return Client._transform_leaves(v) # type: ignore
elif isinstance(v, str):
if v.startswith(VAR_RESOURCE_PREFIX):
var_name = v[len(VAR_RESOURCE_PREFIX):]
var_name = v[len(VAR_RESOURCE_PREFIX) :]
return get_variable(var_name)
else:
return v
Expand Down

0 comments on commit 32bca1f

Please sign in to comment.