Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 33 additions & 39 deletions rsconnect/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def token_endpoint(self) -> str:
if params is None:
raise RSConnectException("No Snowflake connection found.")

return "https://{}.snowflakecomputing.com/".format(params["account"])
return f"https://{params['account']}.snowflakecomputing.com/"

def fmt_payload(self):
params = get_parameters(self.snowflake_connection_name)
Expand All @@ -276,9 +276,7 @@ def fmt_payload(self):
authenticator = params.get("authenticator")
if authenticator == "SNOWFLAKE_JWT":
spcs_url = urlparse(self.url)
scope = (
"session:role:{} {}".format(params["role"], spcs_url.netloc) if params.get("role") else spcs_url.netloc
)
scope = f"session:role:{params['role']} {spcs_url.netloc}" if params.get("role") else spcs_url.netloc
jwt = generate_jwt(self.snowflake_connection_name)
grant_type = "urn:ietf:params:oauth:grant-type:jwt-bearer"

Expand All @@ -300,13 +298,13 @@ def fmt_payload(self):
"body": payload,
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer %s" % params["token"],
"Authorization": f"Bearer {params['token']}",
"X-Snowflake-Authorization-Token-Type": "OAUTH",
},
"path": "/session/v1/login-request",
}
else:
raise NotImplementedError("Unsupported authenticator for SPCS Connect: %s" % authenticator)
raise NotImplementedError(f"Unsupported authenticator for SPCS Connect: {authenticator}")

def exchange_token(self) -> str:
try:
Expand Down Expand Up @@ -451,13 +449,13 @@ def python_settings(self) -> PyInfo:
return response

def app_get(self, app_id: str) -> ContentItemV0:
response = cast(Union[ContentItemV0, HTTPResponse], self.get("applications/%s" % app_id))
response = cast(Union[ContentItemV0, HTTPResponse], self.get(f"applications/{app_id}"))
response = self._server.handle_bad_response(response)
return response

def add_environment_vars(self, content_guid: str, env_vars: list[tuple[str, str]]):
env_body = [dict(name=kv[0], value=kv[1]) for kv in env_vars]
return self.patch("v1/content/%s/environment" % content_guid, body=env_body)
return self.patch(f"v1/content/{content_guid}/environment", body=env_body)

def is_failed_response(self, response: HTTPResponse | JsonData) -> bool:
return isinstance(response, HTTPResponse) and response.status >= 500
Expand All @@ -481,15 +479,15 @@ def access_content(self, content_guid: str) -> None:
def bundle_download(self, content_guid: str, bundle_id: str) -> HTTPResponse:
response = cast(
HTTPResponse,
self.get("v1/content/%s/bundles/%s/download" % (content_guid, bundle_id), decode_response=False),
self.get(f"v1/content/{content_guid}/bundles/{bundle_id}/download", decode_response=False),
)
response = self._server.handle_bad_response(response, is_httpresponse=True)
return response

def content_lockfile(self, content_guid: str) -> HTTPResponse:
response = cast(
HTTPResponse,
self.get("v1/content/%s/lockfile" % content_guid, decode_response=False),
self.get(f"v1/content/{content_guid}/lockfile", decode_response=False),
)
response = self._server.handle_bad_response(response, is_httpresponse=True)
return response
Expand All @@ -500,7 +498,7 @@ def content_list(self, filters: Optional[Mapping[str, JsonData]] = None) -> list
return response

def content_get(self, content_guid: str) -> ContentItemV1:
response = cast(Union[ContentItemV1, HTTPResponse], self.get("v1/content/%s" % content_guid))
response = cast(Union[ContentItemV1, HTTPResponse], self.get(f"v1/content/{content_guid}"))
response = self._server.handle_bad_response(response)
return response

Expand Down Expand Up @@ -546,17 +544,17 @@ def upload_bundle(
body, content_type = create_multipart_form_data(fields)
response = cast(
Union[BundleMetadata, HTTPResponse],
self.post("v1/content/%s/bundles" % content_guid, body=body, headers={"Content-Type": content_type}),
self.post(f"v1/content/{content_guid}/bundles", body=body, headers={"Content-Type": content_type}),
)
else:
response = cast(
Union[BundleMetadata, HTTPResponse], self.post("v1/content/%s/bundles" % content_guid, body=tarball)
Union[BundleMetadata, HTTPResponse], self.post(f"v1/content/{content_guid}/bundles", body=tarball)
)
response = self._server.handle_bad_response(response)
return response

def content_update(self, content_guid: str, updates: Mapping[str, str | None]) -> ContentItemV1:
response = cast(Union[ContentItemV1, HTTPResponse], self.patch("v1/content/%s" % content_guid, body=updates))
response = cast(Union[ContentItemV1, HTTPResponse], self.patch(f"v1/content/{content_guid}", body=updates))
response = self._server.handle_bad_response(response)
return response

Expand All @@ -571,7 +569,7 @@ def content_build(
body["activate"] = False
response = cast(
Union[BuildOutputDTO, HTTPResponse],
self.post("v1/content/%s/build" % content_guid, body=body),
self.post(f"v1/content/{content_guid}/build", body=body),
)
response = self._server.handle_bad_response(response)
return response
Expand All @@ -587,7 +585,7 @@ def content_deploy(
body["activate"] = False
response = cast(
Union[BuildOutputDTO, HTTPResponse],
self.post("v1/content/%s/deploy" % content_guid, body=body),
self.post(f"v1/content/{content_guid}/deploy", body=body),
)
response = self._server.handle_bad_response(response)
return response
Expand Down Expand Up @@ -615,7 +613,7 @@ def task_get(
params["first"] = first
if wait is not None:
params["wait"] = wait
response = cast(Union[TaskStatusV1, HTTPResponse], self.get("v1/tasks/%s" % task_id, query_params=params))
response = cast(Union[TaskStatusV1, HTTPResponse], self.get(f"v1/tasks/{task_id}", query_params=params))
response = self._server.handle_bad_response(response)

# compatibility with rsconnect-jupyter
Expand Down Expand Up @@ -1102,11 +1100,11 @@ def make_bundle(
def upload_posit_bundle(self, prepare_deploy_result: PrepareDeployResult, bundle_size: int, contents: bytes):
upload_url = prepare_deploy_result.presigned_url
parsed_upload_url = urlparse(upload_url)
with S3Client("{}://{}".format(parsed_upload_url.scheme, parsed_upload_url.netloc)) as s3_client:
with S3Client(f"{parsed_upload_url.scheme}://{parsed_upload_url.netloc}") as s3_client:
upload_result = cast(
HTTPResponse,
s3_client.upload(
"{}?{}".format(parsed_upload_url.path, parsed_upload_url.query),
f"{parsed_upload_url.path}?{parsed_upload_url.query}",
prepare_deploy_result.presigned_checksum,
bundle_size,
contents,
Expand Down Expand Up @@ -1156,7 +1154,7 @@ def deploy_bundle(self, activate: bool = True):
# type: ignore[arg-type] - PrepareDeployResult uses int, but format() accepts it
shinyapps_service.do_deploy(prepare_deploy_result.bundle_id, prepare_deploy_result.app_id)

print("Application successfully deployed to {}".format(prepare_deploy_result.app_url))
print(f"Application successfully deployed to {prepare_deploy_result.app_url}")
webbrowser.open_new(prepare_deploy_result.app_url)

self.deployed_info = RSConnectClientDeployResult(
Expand Down Expand Up @@ -1577,21 +1575,21 @@ def get_extra_headers(self, url: str, method: str, body: str | bytes):
signature = self._get_canonical_request_signature(canonical_request)

return {
"X-Auth-Token": "{0}".format(self._token),
"X-Auth-Signature": "{0}; version=1".format(signature),
"X-Auth-Token": self._token,
"X-Auth-Signature": f"{signature}; version=1",
"Date": canonical_request_date,
"X-Content-Checksum": canonical_request_checksum,
}

def get_application(self, application_id: str):
response = cast(Union[PositClientApp, HTTPResponse], self.get("/v1/applications/{}".format(application_id)))
response = cast(Union[PositClientApp, HTTPResponse], self.get(f"/v1/applications/{application_id}"))
response = self._server.handle_bad_response(response)
return response

def update_application_property(self, application_id: int, property: str, value: str) -> HTTPResponse:
response = cast(
HTTPResponse,
self.put("/v1/applications/{}/properties/{}".format(application_id, property), body={"value": value}),
self.put(f"/v1/applications/{application_id}/properties/{property}", body={"value": value}),
)
response = self._server.handle_bad_response(response, is_httpresponse=True)
return response
Expand All @@ -1614,11 +1612,7 @@ def get_accounts(self) -> PositClientAccountSearchResults:
def _get_applications_like_name_page(self, name: str, offset: int) -> PositClientAppSearchResults:
response = cast(
Union[PositClientAppSearchResults, HTTPResponse],
self.get(
"/v1/applications?filter=name:like:{}&offset={}&count=100&use_advanced_filters=true".format(
name, offset
)
),
self.get(f"/v1/applications?filter=name:like:{name}&offset={offset}&count=100&use_advanced_filters=true"),
)
response = self._server.handle_bad_response(response)
return response
Expand All @@ -1637,22 +1631,22 @@ def create_bundle(
return response

def set_bundle_status(self, bundle_id: str, bundle_status: str):
response = self.post("/v1/bundles/{}/status".format(bundle_id), body={"status": bundle_status})
response = self.post(f"/v1/bundles/{bundle_id}/status", body={"status": bundle_status})
response = self._server.handle_bad_response(response)
return response

def deploy_application(self, bundle_id: str, app_id: str) -> PositClientDeployTask:
response = cast(
Union[PositClientDeployTask, HTTPResponse],
self.post("/v1/applications/{}/deploy".format(app_id), body={"bundle": bundle_id, "rebuild": False}),
self.post(f"/v1/applications/{app_id}/deploy", body={"bundle": bundle_id, "rebuild": False}),
)
response = self._server.handle_bad_response(response)
return response

def get_task(self, task_id: str) -> PositClientDeployTask:
response = cast(
Union[PositClientDeployTask, HTTPResponse],
self.get("/v1/tasks/{}".format(task_id), query_params={"legacy": "true"}),
self.get(f"/v1/tasks/{task_id}", query_params={"legacy": "true"}),
)
response = self._server.handle_bad_response(response)
return response
Expand All @@ -1664,7 +1658,7 @@ def get_shinyapps_build_task(self, parent_task_id: str) -> PositClientShinyappsB
"/v1/tasks",
query_params={
"filter": [
"parent_id:eq:{}".format(parent_task_id),
f"parent_id:eq:{parent_task_id}",
"action:eq:image-build",
]
},
Expand All @@ -1674,7 +1668,7 @@ def get_shinyapps_build_task(self, parent_task_id: str) -> PositClientShinyappsB
return response

def get_task_logs(self, task_id: str) -> HTTPResponse:
response = cast(HTTPResponse, self.get("/v1/tasks/{}/logs".format(task_id)))
response = cast(HTTPResponse, self.get(f"/v1/tasks/{task_id}/logs"))
response = self._server.handle_bad_response(response, is_httpresponse=True)
return response

Expand All @@ -1685,7 +1679,7 @@ def get_current_user(self):

def wait_until_task_is_successful(self, task_id: str, timeout: int = get_task_timeout()) -> None:
print()
print("Waiting for task: {}".format(task_id))
print(f"Waiting for task: {task_id}")

start_time = time.time()
finished: bool | None = None
Expand All @@ -1703,16 +1697,16 @@ def wait_until_task_is_successful(self, task_id: str, timeout: int = get_task_ti
if finished:
break

print(" {} - {}".format(status, description))
print(f" {status} - {description}")
time.sleep(2)

if not finished:
raise RSConnectException(get_task_timeout_help_message(timeout))

if status != "success":
raise DeploymentFailedException("Application deployment failed with error: {}".format(error))
raise DeploymentFailedException(f"Application deployment failed with error: {error}")

print("Task done: {}".format(description))
print(f"Task done: {description}")

def get_applications_like_name(self, name: str) -> list[str]:
applications: list[PositClientApp] = []
Expand Down Expand Up @@ -1794,7 +1788,7 @@ def do_deploy(self, bundle_id: str, app_id: str):
build_task_result = self._posit_client.get_shinyapps_build_task(deploy_task["id"])
build_task = build_task_result["tasks"][0]
logs = self._posit_client.get_task_logs(build_task["id"])
logger.error("Build logs:\n{}".format(logs.response_body))
logger.error(f"Build logs:\n{logs.response_body}")
raise e


Expand Down
Loading
Loading