Merge pull request #947 from orchest/improv/orchest-api-analytics
Give orchest api access to the analytics module
fruttasecca committed May 19, 2022
2 parents 8a396e0 + e08f0a7 commit b535ce5
Showing 58 changed files with 2,075 additions and 250 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -38,6 +38,6 @@ repos:
     hooks:
       - id: flake8
   - repo: https://github.com/dnephin/pre-commit-golang
-    rev: master
+    rev: v0.5.0
     hooks:
       - id: go-fmt
372 changes: 313 additions & 59 deletions lib/python/orchest-internals/_orchest/internals/analytics.py

Large diffs are not rendered by default.

31 changes: 30 additions & 1 deletion lib/python/orchest-internals/_orchest/internals/utils.py
@@ -4,7 +4,7 @@
 import os
 import re
 import subprocess
-from typing import Any, Dict, List, Tuple
+from typing import Any, Dict, Iterable, List, Optional, Tuple
 
 import requests
 from werkzeug.serving import is_running_from_reloader as _irfr
@@ -301,3 +301,32 @@ def is_version_lt(expected_older: str, expected_newer: str) -> bool:
         elif int(o) < int(n):
             return True
     return False
+
+
+def get_directory_size(path: str, skip_dirs: Optional[Iterable] = None):
+    """Gets the directory size in bytes.
+
+    Args:
+        path: Path of the directory.
+        skip_dirs: Directories to skip when calculating the size.
+
+    Returns:
+        Size of the directory in bytes.
+    """
+    if skip_dirs is None:
+        skip_dirs = []
+
+    size = 0
+    for root, dirs, files in os.walk(path):
+        for file_name in files:
+            file_path = os.path.join(root, file_name)
+            if os.path.islink(file_path):
+                continue
+            size += os.path.getsize(file_path)
+
+        for skip_dir in skip_dirs:
+            if skip_dir in dirs:
+                dirs.remove(skip_dir)
+
+    return size
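
A quick usage sketch (the path and skip list are hypothetical): note that skip_dirs matches directory basenames at any depth, because the function prunes os.walk's dirs list in place, and that symlinked files are skipped rather than followed.

from _orchest.internals import utils as _utils

# Size of a project directory, ignoring VCS metadata and virtualenvs.
size_bytes = _utils.get_directory_size(
    "/userdir/projects/my-project", skip_dirs=[".git", ".venv"]
)
print(f"{size_bytes / 1024 ** 2:.1f} MiB")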
2 changes: 1 addition & 1 deletion scripts/migration_manager.sh
@@ -46,7 +46,7 @@ else
 fi
 
 # Get the pod to which command & cp will be issued.
-pod_name=$(kubectl get pods -n orchest -l app.kubernetes.io/name=${SERVICE} \
+pod_name=$(kubectl get pods -n orchest -l ${SERVICE}=${SERVICE} \
     --field-selector=status.phase=Running --no-headers \
     --output=jsonpath={.items..metadata.name})
 
5 changes: 5 additions & 0 deletions services/orchest-api/app/app/__init__.py
@@ -31,6 +31,7 @@
 from app.apis.namespace_runs import AbortPipelineRun
 from app.apis.namespace_sessions import StopInteractiveSession
 from app.connections import db
+from app.core.notifications import analytics as api_analytics
 from app.core.scheduler import add_recurring_jobs_to_scheduler
 from app.models import (
     EnvironmentImageBuild,
@@ -105,6 +106,10 @@ def create_app(
         settings.save()
     app.config.update(settings.as_dict())
 
+    # Keep analytics subscribed to all events of interest.
+    with app.app_context():
+        api_analytics.upsert_analytics_subscriptions()
+
     # Create a background scheduler (in a daemon thread) for every
     # gunicorn worker. The individual schedulers do not cause duplicate
     # execution because all jobs of all the schedulers read state
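
The analytics.py diff itself is not rendered above, but the call added here is what keeps the analytics subscriber's event subscriptions current on every app start. An idempotent upsert of that kind might look roughly like the sketch below; the Subscription model and the EVENTS_OF_INTEREST list are assumptions about the module, not code from this PR (AnalyticsSubscriber does appear later in this diff).

# Hypothetical sketch, not the actual app/core/notifications/analytics.py.
from app import models
from app.connections import db


def upsert_analytics_subscriptions() -> None:
    # Create the singleton analytics subscriber if it doesn't exist yet.
    subscriber = models.AnalyticsSubscriber.query.first()
    if subscriber is None:
        subscriber = models.AnalyticsSubscriber()
        db.session.add(subscriber)

    # Subscribe it to any event type it isn't subscribed to yet, so
    # repeated calls (one per app start) are harmless.
    existing = {s.event_type for s in subscriber.subscriptions}
    for event_type in EVENTS_OF_INTEREST:  # assumed module-level list
        if event_type not in existing:
            subscriber.subscriptions.append(
                models.Subscription(event_type=event_type)  # assumed model
            )
    db.session.commit()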
4 changes: 3 additions & 1 deletion services/orchest-api/app/app/apis/namespace_environments.py
@@ -12,7 +12,7 @@
 from app.apis.namespace_runs import AbortPipelineRun
 from app.apis.namespace_sessions import StopInteractiveSession
 from app.connections import db
-from app.core import environments
+from app.core import environments, events
 from app.utils import register_schema
 
 api = Namespace("environments", description="Managing Environments")
@@ -53,6 +53,7 @@ def post(self, project_uuid):
         try:
             env = models.Environment(**environment)
             db.session.add(env)
+            events.register_environment_created_event(project_uuid, environment["uuid"])
             db.session.commit()
         except Exception as e:
             db.session.rollback()
@@ -152,6 +153,7 @@ def _transaction(self, project_uuid: str, environment_uuid: str):
         self.collateral_kwargs["project_uuid"] = project_uuid
         self.collateral_kwargs["environment_uuid"] = environment_uuid
 
+        events.register_environment_deleted_event(project_uuid, environment_uuid)
         models.Environment.query.filter_by(
             project_uuid=project_uuid, uuid=environment_uuid
         ).delete()
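
One pattern worth noting here, which repeats in every namespace below: events are registered on the same session as the mutation they describe, before db.session.commit(), so the event row and the change persist or roll back together. Schematically:

# The transactional-event pattern used throughout this PR (schematic).
try:
    events.register_environment_deleted_event(project_uuid, environment_uuid)
    models.Environment.query.filter_by(
        project_uuid=project_uuid, uuid=environment_uuid
    ).delete()
    db.session.commit()  # event and deletion become visible atomically
except Exception:
    db.session.rollback()  # neither the event nor the deletion persists
    raise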
58 changes: 58 additions & 0 deletions services/orchest-api/app/app/apis/namespace_jobs.py
@@ -16,13 +16,15 @@
 from _orchest.internals import utils as _utils
 from _orchest.internals.two_phase_executor import TwoPhaseExecutor, TwoPhaseFunction
 from app import schema
+from app import types as app_types
 from app.apis.namespace_runs import AbortPipelineRun
 from app.celery_app import make_celery
 from app.connections import db
 from app.core import environments, events
 from app.core.pipelines import Pipeline, construct_pipeline
 from app.utils import (
     fuzzy_filter_non_interactive_pipeline_runs,
+    get_env_vars_update,
     get_proj_pip_env_variables,
     page_to_pagination_data,
     register_schema,
@@ -1074,6 +1076,7 @@ def _transaction(
         confirm_draft,
     ):
         job = models.Job.query.with_for_update().filter_by(uuid=job_uuid).one()
+        old_job = job.as_dict()
 
         if name is not None:
             job.name = name
@@ -1184,6 +1187,7 @@
 
         job.max_retained_pipeline_runs = max_retained_pipeline_runs
 
+        update_already_registered = False
         if confirm_draft:
             if job.status != "DRAFT":
                 raise ValueError("Failed update operation. The job is not a draft.")
@@ -1208,6 +1212,8 @@
                 # a next_scheduled_time.
                 if job.next_scheduled_time is None:
                     job.last_scheduled_time = datetime.now(timezone.utc)
+                    UpdateJob._register_job_updated_event(old_job, job.as_dict())
+                    update_already_registered = True
                     RunJob(self.tpe).transaction(job.uuid)
                 else:
                     job.last_scheduled_time = job.next_scheduled_time
@@ -1221,11 +1227,63 @@
             else:
                 job.last_scheduled_time = job.next_scheduled_time
                 job.status = "STARTED"
+                UpdateJob._register_job_updated_event(old_job, job.as_dict())
+                update_already_registered = True
                 events.register_job_started(job.project_uuid, job.uuid)
 
+        if not update_already_registered:
+            UpdateJob._register_job_updated_event(old_job, job.as_dict())
+
     def _collateral(self):
         pass
 
+    @staticmethod
+    def _register_job_updated_event(
+        old_job: Dict[str, Any], new_job: Dict[str, Any]
+    ) -> None:
+        """Register the job_updated_event along with the changes.
+
+        Note that we are banking on the fact that the logic before the
+        call to this function will catch invalid updates.
+        """
+        changes = []
+        changes.extend(
+            get_env_vars_update(old_job["env_variables"], new_job["env_variables"])
+        )
+        # (field name, if values should be recorded) for notifications.
+        # Don't record sensitive values.
+        to_compare = [
+            ("name", False),
+            ("schedule", True),
+            ("parameters", False),
+            ("strategy_json", False),
+            ("max_retained_pipeline_runs", True),
+            ("next_scheduled_time", True),
+            ("status", True),
+        ]
+        for field, record_values in to_compare:
+            old_value = old_job[field]
+            new_value = new_job[field]
+            if old_value == new_value:
+                continue
+
+            change = app_types.Change(
+                type=app_types.ChangeType.UPDATED,
+                changed_object=field,
+            )
+            if record_values:
+                change["old_value"] = str(old_value)
+                change["new_value"] = str(new_value)
+            changes.append(change)
+
+        if changes:
+            events.register_job_updated(
+                old_job["schedule"],
+                new_job["project_uuid"],
+                new_job["uuid"],
+                update=app_types.EntityUpdate(changes=changes),
+            )
+
 
 class DeleteJob(TwoPhaseFunction):
     """Delete a job."""
@@ -13,7 +13,7 @@
 from app import schema
 from app.celery_app import make_celery
 from app.connections import db
-from app.core import registry
+from app.core import events, registry
 from app.errors import SessionInProgressException
 from app.utils import register_schema, update_status_db
 from config import CONFIG_CLASS
@@ -53,7 +53,8 @@ def post(self):
                 jupyter_image_build = CreateJupyterEnvironmentBuild(tpe).transaction()
         except SessionInProgressException:
             return {"message": "SessionInProgressException"}, 500
-        except Exception:
+        except Exception as e:
+            current_app.logger.error(e)
             jupyter_image_build = None
 
         if jupyter_image_build is not None:
@@ -116,8 +117,17 @@ def put(self, jupyter_image_build_uuid):
                         base_image_version=CONFIG_CLASS.ORCHEST_VERSION,
                     )
                 )
+                events.register_jupyter_image_build_succeeded(jupyter_image_build_uuid)
+            else:
+                if status_update["status"] == "STARTED":
+                    events.register_jupyter_image_build_started(
+                        jupyter_image_build_uuid
+                    )
+                elif status_update["status"] == "FAILURE":
+                    events.register_jupyter_image_build_failed(jupyter_image_build_uuid)
             db.session.commit()
-        except Exception:
+        except Exception as e:
+            current_app.logger.error(e)
             db.session.rollback()
             return {"message": "Failed update operation."}, 500
 
@@ -226,6 +236,7 @@ def _transaction(self):
             "image_tag": image_tag,
         }
         db.session.add(models.JupyterImageBuild(**jupyter_image_build))
+        events.register_jupyter_image_build_created(task_id)
 
         self.collateral_kwargs["task_id"] = task_id
         self.collateral_kwargs["image_tag"] = str(image_tag)
@@ -244,6 +255,7 @@ def _revert(self):
         models.JupyterImageBuild.query.filter_by(
             uuid=self.collateral_kwargs["task_id"]
         ).update({"status": "FAILURE"})
+        events.register_jupyter_image_build_failed(self.collateral_kwargs["task_id"])
         db.session.commit()
 
 
@@ -261,6 +273,8 @@ def _transaction(self, jupyter_image_build_uuid: str):
             model=models.JupyterImageBuild,
             filter_by=filter_by,
         )
+        if abortable:
+            events.register_jupyter_image_build_cancelled(jupyter_image_build_uuid)
 
         self.collateral_kwargs["jupyter_image_build_uuid"] = (
             jupyter_image_build_uuid if abortable else None
14 changes: 11 additions & 3 deletions services/orchest-api/app/app/apis/namespace_notifications.py
@@ -30,9 +30,14 @@ class SubscriberList(Resource):
     @api.response(200, "Success", schema.subscribers)
     def get(self):
         """Gets all subscribers, doesn't include their subscriptions."""
-        subscribers = models.Subscriber.query.options(
-            noload(models.Subscriber.subscriptions)
-        ).all()
+        subscribers = (
+            models.Subscriber.query.options(
+                noload(models.Subscriber.subscriptions)
+                # Don't expose analytics subscriber.
+            )
+            .filter(models.Subscriber.type != "analytics")
+            .all()
+        )
         marshalled = []
         for subscriber in subscribers:
             if isinstance(subscriber, models.Webhook):
@@ -181,6 +186,9 @@ def get(self, event_type: str):
         for subscriber in alerted_subscribers:
             if isinstance(subscriber, models.Webhook):
                 marshalled.append(marshal(subscriber, schema.webhook))
+            # Don't expose analytics subscriber.
+            elif isinstance(subscriber, models.AnalyticsSubscriber):
+                continue
             else:
                 marshalled.append(marshal(subscriber, schema.subscriber))
 
45 changes: 45 additions & 0 deletions services/orchest-api/app/app/apis/namespace_pipelines.py
@@ -11,9 +11,12 @@
 from _orchest.internals import utils as _utils
 from _orchest.internals.two_phase_executor import TwoPhaseExecutor, TwoPhaseFunction
 from app import schema
+from app import types as app_types
+from app import utils as app_utils
 from app.apis.namespace_runs import AbortPipelineRun
 from app.apis.namespace_sessions import StopInteractiveSession
 from app.connections import db
+from app.core import events
 from app.utils import register_schema
 
 api = Namespace("pipelines", description="Managing pipelines")
@@ -42,6 +45,10 @@ def post(self):
 
         try:
             db.session.add(models.Pipeline(**pipeline))
+            events.register_pipeline_created_event(
+                pipeline["project_uuid"],
+                pipeline["uuid"],
+            )
             db.session.commit()
         except Exception as e:
             db.session.rollback()
@@ -71,11 +78,24 @@ def get(self, project_uuid, pipeline_uuid):
     @api.doc("update_pipeline")
     def put(self, project_uuid, pipeline_uuid):
         """Update a pipeline."""
+        pipeline = (
+            models.Pipeline.query.options(undefer(models.Pipeline.env_variables))
+            .filter(
+                models.Pipeline.project_uuid == project_uuid,
+                models.Pipeline.uuid == pipeline_uuid,
+            )
+            .one_or_none()
+        )
+        if pipeline is None:
+            abort(404, "Pipeline not found.")
+
         update = request.get_json()
 
         # Keep mutable job pipeline name entry up to date so that the
         # job views reflect the newest name.
         if "name" in update:
+            if len(update["name"]) > 255:
+                return {}, 400
             try:
                 models.Job.query.filter_by(
                     project_uuid=project_uuid, pipeline_uuid=pipeline_uuid
@@ -92,9 +112,30 @@ def put(self, project_uuid, pipeline_uuid):
 
         if update:
             try:
+                changes = []
+                if "env_variables" in update:
+                    changes.extend(
+                        app_utils.get_env_vars_update(
+                            pipeline.env_variables, update["env_variables"]
+                        )
+                    )
+                if "name" in update and pipeline.name != update["name"]:
+                    changes.append(
+                        app_types.Change(
+                            type=app_types.ChangeType.UPDATED, changed_object="name"
+                        )
+                    )
+
                 models.Pipeline.query.filter_by(
                     project_uuid=project_uuid, uuid=pipeline_uuid
                 ).update(update)
+
+                if changes:
+                    events.register_pipeline_updated_event(
+                        project_uuid,
+                        pipeline_uuid,
+                        app_types.EntityUpdate(changes=changes),
+                    )
                 db.session.commit()
             except Exception as e:
                 db.session.rollback()
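
get_env_vars_update itself is not rendered in this view. Judging from its call sites here and in namespace_jobs.py, it diffs two env-var dicts into Change entries while keeping the values out of the event payload, since env vars may hold secrets. A sketch of that assumed behavior:

# Assumed shape of the helper (the real app.utils.get_env_vars_update
# is not shown in this PR view).
def sketch_env_vars_update(old_vars: dict, new_vars: dict) -> list:
    changes = []
    for name in sorted(set(old_vars) | set(new_vars)):
        if old_vars.get(name) != new_vars.get(name):
            changes.append(
                app_types.Change(
                    type=app_types.ChangeType.UPDATED,
                    changed_object=f"env_variables:{name}",
                    # old/new values deliberately not recorded
                )
            )
    return changes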
@@ -155,6 +196,10 @@ def _transaction(self, project_uuid: str, pipeline_uuid: str):
             project_uuid=project_uuid, uuid=pipeline_uuid
         ).update({"env_variables": {}})
 
+        events.register_pipeline_deleted_event(
+            project_uuid=project_uuid, pipeline_uuid=pipeline_uuid
+        )
+
         # Note that we do not delete the pipeline from the db since we
         # are not deleting jobs related to the pipeline. Deleting the
         # pipeline would cascade-delete the jobs.
