Merge pull request #725 from vespa-engine/tmaregge/prod-deployment-monitoring

Add monitoring to prod deployments
kkraune committed Apr 4, 2024
2 parents 04fe06d + 35166df commit 704f26b
1 changed file: vespa/deployment.py (89 additions, 11 deletions)
@@ -473,6 +473,22 @@ def deploy(self, instance: Optional[str]="default", disk_folder: Optional[str] =
print("Finished deployment.", file=self.output)
return app

def _get_latest_run_id(self, instance) -> int:
# This endpoint returns a dictionary with information about the builds for the given application.
# The actual latest build can take a few seconds to show up; once it does, we can read the latest run id.
endpoint = f"/application/v4/tenant/{self.tenant}/application/{self.application}/deployment"
res = self._request("GET", endpoint)

# The different deployment stages might be out of sync, so we need all the ids to determine the latest one
run_ids = []
for item in res["steps"]:
if "runs" in item.keys(): # "id" is only present in steps with "runs" key
run_ids.append(item["runs"][0]["id"]) # Index zero to get the latest id
if run_ids == []:
return -1 # No runs found

return max(run_ids)
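For illustration, a minimal sketch of the run-id extraction above against a made-up payload; the payload shape (a "steps" list where some steps carry a "runs" list, newest run first) is inferred from the parsing logic and is not an authoritative description of the API:

# Hypothetical /deployment response, shaped after the parsing above.
sample_response = {
    "steps": [
        {"type": "system-test", "runs": [{"id": 42}, {"id": 41}]},
        {"type": "deployment"},                      # no "runs" key, skipped
        {"type": "staging-test", "runs": [{"id": 43}]},
    ],
}

run_ids = [step["runs"][0]["id"] for step in sample_response["steps"] if "runs" in step]
latest_run_id = max(run_ids) if run_ids else -1      # -1 mirrors the "no runs found" case
print(latest_run_id)                                 # 43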

def deploy_to_prod(self, instance: Optional[str]="default", disk_folder: Optional[str] = None) -> None:
"""
Deploy the given application package as the given instance in the Vespa Cloud prod environment.
@@ -495,6 +511,50 @@ def deploy_to_prod(self, instance: Optional[str]="default", disk_folder: Optional[str] = None) -> None:
)
print(f"Follow deployment at: {deploy_url}", file=self.output)

print("Waiting for monitoring...", file=self.output)
last_run_id = self._get_latest_run_id(instance)  # May or may not already reflect the new deployment run
retry_count = 0
max_retries = 5
while retry_count < max_retries:
run_id = self._get_latest_run_id(instance)
if run_id > last_run_id:
break # New run id found, proceed
else:
retry_count += 1
if retry_count == max_retries:
# last_run_id may or may not already point at the new run. Either way the deployment proceeds as expected;
# we just cannot monitor it from here (it can still be monitored from the web console).
print(f"Could not find a new run id after {max_retries} retries. Proceeding anyway.", file=self.output)
break
else:
delay = min(2**retry_count, 8) + random.uniform(0, 1)
sleep(delay)

# We need to wait for the tests to finish before we can monitor the deployment itself
self._follow_deployment(instance, "staging-test", run_id)
self._follow_deployment(instance, "system-test", run_id)

# Like with the run id, it can take a couple of seconds for the job to show up here.
# TODO Replace with a more robust solution
sleep(10)
region = self.get_prod_region()
self._follow_deployment(instance, f"production-{region}", run_id)

token = os.environ.get(VESPA_CLOUD_SECRET_TOKEN, None)
if token is None:
endpoint_url = self.get_mtls_endpoint(instance=instance, region=region, environment="prod")
else:
endpoint_url = self.get_token_endpoint(instance=instance, region=region, environment="prod")

app = Vespa(
url=endpoint_url,
cert=self.data_cert_path or os.path.join(disk_folder, self.private_cert_file_name),
key=self.data_key_path or None,
application_package=self.application_package,
)

return app
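A minimal usage sketch of the new flow, assuming a VespaCloud client created with tenant, application and an application package whose deployment config declares a prod region; the argument names are illustrative rather than the exact released API:

from vespa.deployment import VespaCloud

# app_package: an ApplicationPackage built elsewhere, configured with a prod deployment config.
cloud = VespaCloud(
    tenant="mytenant",                    # illustrative values
    application="myapp",
    application_package=app_package,
)

# Submits the package, follows the system/staging tests and the prod job,
# then returns a Vespa object pointing at the prod endpoint.
app = cloud.deploy_to_prod()
response = app.query(yql="select * from sources * where true")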

def deploy_from_disk(self, instance: str, application_root: Path) -> Vespa:
"""
Deploy from a directory tree.
@@ -664,6 +724,10 @@ def _write_private_key_and_cert(
def get_dev_region(self) -> str:
return self._request("GET", "/zone/v1/environment/dev/default")["name"]

def get_prod_region(self):
# TODO Support multiple regions
return self.application_package.deployment_config.regions[0]
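get_prod_region simply reads the first region from the package's deployment configuration; below is a small stand-in sketch of the attribute access it relies on (the real deployment-config class in pyvespa may differ):

from types import SimpleNamespace

# Stand-in for application_package.deployment_config exposing a list of prod regions.
deployment_config = SimpleNamespace(regions=["aws-us-east-1c"])
prod_region = deployment_config.regions[0]   # what get_prod_region returns today
print(prod_region)                           # aws-us-east-1c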

def _request(
self, method: str, path: str, body: BytesIO = BytesIO(), headers={}
) -> dict:
@@ -720,13 +784,18 @@ def _request(



def get_mtls_endpoint(self, instance: Optional[str]="default", region: Optional[str]=None) -> str:
def get_mtls_endpoint(self, instance: Optional[str]="default", region: Optional[str]=None, environment: Optional[str]="dev") -> str:
if region is None:
region = self.get_dev_region()
if environment == "dev":
region = self.get_dev_region()
elif environment == "prod":
region = self.get_prod_region()
else:
raise ValueError("Invalid environment. Must be 'dev' or 'prod'")
endpoints = self._request(
"GET",
"/application/v4/tenant/{}/application/{}/instance/{}/environment/dev/region/{}".format(
self.tenant, self.application, instance, region
"/application/v4/tenant/{}/application/{}/instance/{}/environment/{}/region/{}".format(
self.tenant, self.application, instance, environment, region
),
)["endpoints"]
cluster_name = "{}_container".format(self.application_package.name)
@@ -737,13 +806,18 @@ def get_mtls_endpoint(self, instance: Optional[str]="default", region: Optional[
return endpoint['url']
raise RuntimeError("No mtls endpoints found for container cluster " + cluster_name)
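With the new environment parameter, the prod endpoint can be resolved explicitly. A short hedged example, reusing the cloud client from the earlier sketch:

# Resolve the mTLS endpoint for the prod deployment of the default instance.
prod_url = cloud.get_mtls_endpoint(instance="default", environment="prod")
# Or, if a secret token is configured, the token endpoint:
# prod_url = cloud.get_token_endpoint(instance="default", environment="prod")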

def get_token_endpoint(self, instance: Optional[str]="default", region: Optional[str]=None) -> List[dict]:
def get_token_endpoint(self, instance: Optional[str]="default", region: Optional[str]=None, environment: Optional[str]="dev") -> List[dict]:
if region is None:
region = self.get_dev_region()
if environment == "dev":
region = self.get_dev_region()
elif environment == "prod":
region = self.get_prod_region()
else:
raise ValueError("Invalid environment. Must be 'dev' or 'prod'")
endpoints = self._request(
"GET",
"/application/v4/tenant/{}/application/{}/instance/{}/environment/dev/region/{}".format(
self.tenant, self.application, instance, region
"/application/v4/tenant/{}/application/{}/instance/{}/environment/{}/region/{}".format(
self.tenant, self.application, instance, environment, region
),
)["endpoints"]
cluster_name = "{}_container".format(self.application_package.name)
@@ -775,7 +849,7 @@ def _start_prod_deployment(self, disk_folder: str) -> None:
# TODO Avoid hardcoding projectId and risk
# TODO Consider supporting optional fields
submit_options = {
"projectId": 1,
"projectId": 1,
"risk": 0,
# "repository": "",
# "branch": "",
@@ -794,7 +868,7 @@ def _start_prod_deployment(self, disk_folder: str) -> None:
}
)

# Compute content hash, etc
url = "https://" + self.connection.host + ":" + str(self.connection.port) + deploy_path
digest = hashes.Hash(hashes.SHA256(), default_backend())
digest.update(multipart_data.to_string()) # This moves the buffer position to the end
@@ -821,7 +895,6 @@ def _start_prod_deployment(self, disk_folder: str) -> None:
message = response.json()["message"]
print(message, file=self.output)


def _start_deployment(self, instance: str, job: str, disk_folder: str,
application_zip_bytes: Optional[BytesIO] = None) -> int:
deploy_path = (
@@ -950,6 +1023,11 @@ def _get_deployment_status(
status = update["status"]
if status == "success":
return "success", last
if status == "noTests":
# "noTests" is allowed, so treat it as success for now.
# TODO Support tests via Pyvespa properly
return "success", last
elif status in fail_status_message.keys():
raise RuntimeError(fail_status_message[status])
else:
