Give orchest api access to the analytics module #947

Merged
Changes from 8 commits
Commits
55 commits
aa45ba5
Add analytics subscriber model
fruttasecca May 5, 2022
4be7568
Make Webhook columns nullable
fruttasecca May 5, 2022
492ecdf
Add Analytics subscriber module
fruttasecca May 5, 2022
5b50051
Deliver analytics deliveries
fruttasecca May 5, 2022
7384798
Remove overlap in analytics between webserver&api
fruttasecca May 5, 2022
fb19bc4
Merge branch 'improv/orchest-api-analytics-base' into improv/orchest-…
fruttasecca May 11, 2022
8c5f67b
Update golang pre-commit rev
fruttasecca May 11, 2022
540d34a
Merge branching schema migrations
fruttasecca May 11, 2022
c89eaef
Don't create delivery for analytics if TELEMETRY_DISABLED
fruttasecca May 11, 2022
75906a4
Remove temporary pre-controller code
fruttasecca May 11, 2022
0fe1353
Don't expose analytics subscriber through API
fruttasecca May 11, 2022
2a1a929
Move get_size to the internal lib
fruttasecca May 11, 2022
1399270
Refactor job paths (snapshot, run) construction
fruttasecca May 11, 2022
0672470
Add snapshot_size to analytics job:created payload
fruttasecca May 11, 2022
777d430
Don't get size of symlinks. Fixes: #924
fruttasecca May 12, 2022
8cc53a3
Merge branch 'dev' into improv/orchest-api-analytics
fruttasecca May 12, 2022
78f6aff
Refactor analytics events names
fruttasecca May 12, 2022
a17fe5e
Make Event to anonymization mapping more explicit
fruttasecca May 12, 2022
6ad1b77
Remove unnecessary safe_join import
fruttasecca May 12, 2022
96ce675
Anonymize orchest-api analytics events
fruttasecca May 12, 2022
56bb3a5
Minor fix on how analytics looks for config entry
fruttasecca May 12, 2022
ea30794
Add jupyter:image-build:* events support
fruttasecca May 12, 2022
d04d848
Add project:interactive-session:* events support
fruttasecca May 13, 2022
2c307d5
Fix post controller build script
fruttasecca May 16, 2022
2052a61
Add project:pipeline:interactive-pipeline-run:* events support
fruttasecca May 16, 2022
1b3828d
Add project:pipeline:interactive-pipeline-run:* events support
fruttasecca May 16, 2022
c972494
Add InteractivePipelineRun.pipeline_definition field
fruttasecca May 16, 2022
a988a90
Expand interactive pipeline run event payload
fruttasecca May 16, 2022
975eff8
Fix session event derived properties structure
fruttasecca May 16, 2022
b5c93cb
Anonymize analytics interactive pipeline run event
fruttasecca May 16, 2022
84a00d3
Add project/pipeline:created/updated/deleted event
fruttasecca May 17, 2022
2f79fbf
Remove incompatible FK constraint
fruttasecca May 17, 2022
4a5a154
Add Pipeline.name field
fruttasecca May 17, 2022
591a1ce
Add CronJob, OneOffJob updated models and ev types
fruttasecca May 17, 2022
0fa2c84
Move job update event to orchest-api
fruttasecca May 17, 2022
3e39b44
Merge pull request #956 from orchest/improv/add-project-pipeline-even…
fruttasecca May 17, 2022
3201895
Track changes in job update event
fruttasecca May 18, 2022
a998140
Improve job created, updated events anonymization
fruttasecca May 18, 2022
fc7c258
Add EnvironmentEvent model and related event_types
fruttasecca May 18, 2022
fcade12
Add support for project:environment:* events
fruttasecca May 18, 2022
ffa586a
Sort code containing analytics events to remove duplicates
yannickperrenet May 18, 2022
45469e4
Merge pull request #962 from orchest/improv/add-env-events-to-orchest…
fruttasecca May 18, 2022
d686b64
Merge pull request #961 from orchest/improv/move-job-update-event-to-…
fruttasecca May 19, 2022
613d651
Use a step payload instead of step name + uuid
fruttasecca May 19, 2022
479b0df
Merge branch 'improv/add-interactive-runs-event-to-orchest-api' of gi…
fruttasecca May 19, 2022
0ea78d1
Merge pull request #953 from orchest/improv/add-interactive-runs-even…
fruttasecca May 19, 2022
bd6ba68
project:interactive-session:* to project:pipeline:interactive-session:*
fruttasecca May 19, 2022
1ba666a
project:pipeline:interactive-pipeline-run:* to project:pipeline:inter…
fruttasecca May 19, 2022
04f4852
Merge branch 'improv/orchest-api-analytics' of github.com:orchest/orc…
fruttasecca May 19, 2022
9d83e72
Set event_type max length to 150 characters
fruttasecca May 19, 2022
b770eda
Minor fixes
fruttasecca May 19, 2022
00342f7
Merge pull request #950 from orchest/improv/add-builds-events-to-orch…
fruttasecca May 19, 2022
fb60ea7
Nest 'job' payload under 'project'
fruttasecca May 19, 2022
9694d05
Minor fix
fruttasecca May 19, 2022
e08f0a7
Remove snapshot size from job duplicated
fruttasecca May 19, 2022
2 changes: 2 additions & 0 deletions lib/python/orchest-internals/_orchest/internals/analytics.py
@@ -30,6 +30,7 @@ class Event(Enum):
CONFIRM_SHOW = "confirm show"
CRONJOB_PAUSE = "cron-job pause"
CRONJOB_RESUME = "cron-job resume"
DEBUG_PING = "debug ping"
ENVIRONMENT_BUILD_CANCEL = "environment-build cancel"
ENVIRONMENT_BUILD_START = "environment-build start"
HEARTBEAT_TRIGGER = "heartbeat trigger"
@@ -147,6 +148,7 @@ def send_event(
telemetry_uuid = app.config.get("TELEMETRY_UUID")
if telemetry_uuid is None:
app.logger.error("No telemetry uuid found, won't send telemetry event.")
return False

if not _posthog_initialized:
_initialize_posthog()
5 changes: 5 additions & 0 deletions services/orchest-api/app/app/__init__.py
@@ -31,6 +31,7 @@
from app.apis.namespace_runs import AbortPipelineRun
from app.apis.namespace_sessions import StopInteractiveSession
from app.connections import db
from app.core.notifications import analytics as api_analytics
from app.core.scheduler import add_recurring_jobs_to_scheduler
from app.models import (
EnvironmentImageBuild,
@@ -105,6 +106,10 @@ def create_app(
settings.save()
app.config.update(settings.as_dict())

# Keep analytics subscribed to all events of interest.
with app.app_context():
api_analytics.upsert_analytics_subscriptions()

# Create a background scheduler (in a daemon thread) for every
# gunicorn worker. The individual schedulers do not cause duplicate
# execution because all jobs of all the schedulers read state
20 changes: 17 additions & 3 deletions services/orchest-api/app/app/core/events.py
@@ -4,9 +4,12 @@
accordingly based on any subscribers subscribed to the event type that
happened.
"""
from app import models, utils
from app import models
from app import utils
from app import utils as app_utils
from app.connections import db
from app.core import notifications
from app.core.notifications import analytics as api_analytics

_logger = utils.get_logger()

@@ -29,15 +32,26 @@ def _register_event(ev: models.Event) -> None:
ev.type, project_uuid=project_uuid, job_uuid=job_uuid
)
for sub in subscribers:
if isinstance(sub, models.AnalyticsSubscriber):
if app_utils.OrchestSettings()["TELEMETRY_DISABLED"]:
_logger.info(
"Telemetry is disabled, skipping event delivery to analytics."
)
continue
notification_payload = api_analytics.generate_payload_for_analytics(ev)
else:
notification_payload = ev.to_notification_payload()

_logger.info(
f"Scheduling delivery for event {ev.uuid}, event type: {ev.type} for "
f"deliveree {sub.uuid}."
f"deliveree {sub.uuid} of type {sub.type}."
)

delivery = models.Delivery(
event=ev.uuid,
deliveree=sub.uuid,
status="SCHEDULED",
notification_payload=ev.to_notification_payload(),
notification_payload=notification_payload,
)
db.session.add(delivery)

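The branching added to `_register_event` routes each delivery by subscriber type: analytics subscribers are skipped entirely when telemetry is disabled and otherwise get a payload generated for analytics, while every other subscriber keeps the plain notification payload. A minimal sketch of that routing, using simplified stand-in classes and callables rather than the actual orchest-api models:

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Subscriber:
    uuid: str


@dataclass
class AnalyticsSubscriber(Subscriber):
    pass


def payload_for_subscriber(
    sub: Subscriber,
    event_payload: dict,
    telemetry_disabled: bool,
    anonymize: Callable[[dict], dict],
) -> Optional[dict]:
    """Return the payload a delivery should carry, or None to skip it."""
    if isinstance(sub, AnalyticsSubscriber):
        # Analytics deliveries are not created at all when telemetry
        # is disabled, and are anonymized otherwise.
        if telemetry_disabled:
            return None
        return anonymize(event_payload)
    # Webhooks and other subscribers get the full payload.
    return event_payload
```

The key design point is that the skip happens before a `Delivery` row is created, so disabling telemetry leaves no queued analytics work behind.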
33 changes: 27 additions & 6 deletions services/orchest-api/app/app/core/notifications/__init__.py
@@ -14,7 +14,7 @@
from app import models
from app import utils as app_utils
from app.connections import db
from app.core.notifications import webhooks
from app.core.notifications import analytics, webhooks

logger = app_utils.get_logger()

@@ -105,16 +105,17 @@ def get_subscribers_subscribed_to_event(
def process_notifications_deliveries_task() -> None:
logger.info("Processing notifications deliveries")

delivery_filter = [
models.Delivery.status.in_(["SCHEDULED", "RESCHEDULED"]),
models.Delivery.scheduled_at <= datetime.datetime.now(datetime.timezone.utc),
]

# Note that we don't select for update here; locking happens on
# a per-delivery basis.
webhook_deliveries = (
(db.session.query(models.Delivery.uuid))
.join(models.Webhook, models.Webhook.uuid == models.Delivery.deliveree)
.filter(
models.Delivery.status.in_(["SCHEDULED", "RESCHEDULED"]),
models.Delivery.scheduled_at
<= datetime.datetime.now(datetime.timezone.utc),
)
.filter(*delivery_filter)
.order_by(models.Delivery.scheduled_at)
.all()
)
@@ -127,3 +128,23 @@ def process_notifications_deliveries_task() -> None:
# Don't let failures affect other deliveries.
except Exception as e:
logger.error(e)

analytics_deliveries = (
(db.session.query(models.Delivery.uuid))
.join(
models.AnalyticsSubscriber,
models.AnalyticsSubscriber.uuid == models.Delivery.deliveree,
)
.filter(*delivery_filter)
.order_by(models.Delivery.scheduled_at)
.all()
)

logger.info(f"Found {len(analytics_deliveries)} analytics deliveries to deliver.")

for delivery in analytics_deliveries:
try:
analytics.deliver(delivery.uuid)
# Don't let failures affect other deliveries.
except Exception as e:
logger.error(e)
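Both delivery loops above follow the same failure-isolation pattern: every delivery is attempted inside its own `try`/`except` so one failure cannot prevent the remaining deliveries from being processed. A stand-alone sketch of that pattern (function names here are illustrative, not the orchest-api API):

```python
import logging

logger = logging.getLogger(__name__)


def process_deliveries(delivery_uuids, deliver_one):
    """Attempt every delivery; a failure in one delivery must not
    prevent the remaining deliveries from being attempted."""
    failed = []
    for uuid in delivery_uuids:
        try:
            deliver_one(uuid)
        except Exception as e:
            # Log and continue with the next delivery.
            logger.error(e)
            failed.append(uuid)
    return failed
```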
204 changes: 204 additions & 0 deletions services/orchest-api/app/app/core/notifications/analytics.py
@@ -0,0 +1,204 @@
"""Implements the logic that sends events to analytics.

This is a special case when it comes to notifications modules.
Essentially, there can only be one subscriber of the type
AnalyticsSubscriber, and the actual data like URL and secrets are stored
in the code base since currently the analytics module is shared across
multiple services.

"""
from flask import current_app

from _orchest.internals import analytics
from app import models
from app import utils as app_utils
from app.connections import db
from app.core.notifications import utils as notification_utils

logger = app_utils.get_logger()


def upsert_analytics_subscriptions() -> None:
"""Makes sure the analytics backend is subscribed to all events.

Commits to the db.
"""

# Use a hardcoded uuid to make sure only 1 such subscriber exists,
# to avoid race conditions.
uuid = "c9075806-be93-4b87-9d77-9a376d347b3a"
subscriber = models.AnalyticsSubscriber.query.with_for_update().first()
if subscriber is None:
subscriber = models.AnalyticsSubscriber(uuid=uuid)
db.session.add(subscriber)
db.session.flush()

existing_subs = {
sub.event_type
for sub in models.Subscription.query.filter(
models.Subscription.subscriber_uuid == uuid
).all()
}

event_types = [event_type.name for event_type in models.EventType.query.all()]
new_subscriptions = []
for event_type in event_types:
if event_type not in existing_subs:
new_subscriptions.append({"event_type": event_type})
subs = notification_utils.subscription_specs_to_subscriptions(
uuid, new_subscriptions
)
db.session.bulk_save_objects(subs)
db.session.commit()


def send_test_ping_delivery() -> bool:
"""Used internally for testing; sends a ping event.

Returns:
True if the delivery was made, False otherwise.
"""
# TODO: this is needed because a dependency of this PR actually
# lives in the controller branch, remove this.
current_app.config["TELEMETRY_DISABLED"] = False
current_app.config["TELEMETRY_UUID"] = "e2e2abb5-3c54-4209-a245-e9fa37810824"
return analytics.send_event(current_app, analytics.Event.DEBUG_PING, {})


# Events mapped to DEBUG_PING are not currently defined in
# analytics; they will be added in a future commit.
_event_type_to_analytics_event_enum = {
"project:one-off-job:created": analytics.Event.JOB_CREATE,
"project:one-off-job:started": analytics.Event.DEBUG_PING,
"project:one-off-job:deleted": analytics.Event.JOB_DELETE,
"project:one-off-job:cancelled": analytics.Event.JOB_CANCEL,
"project:one-off-job:failed": analytics.Event.DEBUG_PING,
"project:one-off-job:succeeded": analytics.Event.DEBUG_PING,
"project:one-off-job:pipeline-run:created": analytics.Event.DEBUG_PING,
"project:one-off-job:pipeline-run:started": analytics.Event.DEBUG_PING,
"project:one-off-job:pipeline-run:cancelled": analytics.Event.JOB_PIPELINE_RUN_CANCEL, # noqa
"project:one-off-job:pipeline-run:failed": analytics.Event.DEBUG_PING,
"project:one-off-job:pipeline-run:deleted": analytics.Event.JOB_PIPELINE_RUN_DELETE,
"project:one-off-job:pipeline-run:succeeded": analytics.Event.DEBUG_PING,
"project:cron-job:created": analytics.Event.JOB_CREATE,
"project:cron-job:started": analytics.Event.DEBUG_PING,
"project:cron-job:deleted": analytics.Event.JOB_DELETE,
"project:cron-job:cancelled": analytics.Event.JOB_CANCEL,
"project:cron-job:failed": analytics.Event.DEBUG_PING,
"project:cron-job:paused": analytics.Event.CRONJOB_PAUSE,
"project:cron-job:unpaused": analytics.Event.CRONJOB_RESUME,
"project:cron-job:run:started": analytics.Event.DEBUG_PING,
"project:cron-job:run:succeeded": analytics.Event.DEBUG_PING,
"project:cron-job:run:failed": analytics.Event.DEBUG_PING,
"project:cron-job:run:pipeline-run:created": analytics.Event.DEBUG_PING,
"project:cron-job:run:pipeline-run:started": analytics.Event.DEBUG_PING,
"project:cron-job:run:pipeline-run:cancelled": analytics.Event.JOB_PIPELINE_RUN_CANCEL, # noqa
"project:cron-job:run:pipeline-run:failed": analytics.Event.DEBUG_PING,
"project:cron-job:run:pipeline-run:deleted": analytics.Event.JOB_PIPELINE_RUN_DELETE, # noqa
"project:cron-job:run:pipeline-run:succeeded": analytics.Event.DEBUG_PING,
}


def generate_payload_for_analytics(event: models.Event) -> dict:
"""Creates an analytics module compatible payload.

Acts as a compatibility layer between orchest-api events and what
the shared analytics module expects, and also provides some "old"
fields to the analytics back-end to avoid breaking changes.
"""

analytics_payload = event.to_notification_payload()
event_type = analytics_payload["type"]

if event_type.startswith("project:cron-job:") or event_type.startswith(
"project:one-off-job:"
):
analytics_payload["job_uuid"] = analytics_payload["job"]["uuid"]

if event_type.startswith("project:cron-job:run:pipeline-run:"):
analytics_payload["run_uuid"] = analytics_payload["job"]["run"]["pipeline_run"][
"uuid"
]
elif event_type.startswith("project:one-off-job:pipeline-run:"):
analytics_payload["run_uuid"] = analytics_payload["job"]["pipeline_run"]["uuid"]

if event_type in ["project:cron-job:created", "project:one-off-job:created"]:
analytics_payload["snapshot_size"] = 10
job: models.Job = models.Job.query.filter(
models.Job.project_uuid == analytics_payload["project"]["uuid"],
models.Job.uuid == analytics_payload["job"]["uuid"],
).one()
analytics_payload["job_definition"] = {
"parameters": job.parameters,
"project_uuid": job.project_uuid,
"pipeline_uuid": job.pipeline_uuid,
"draft": True,
"uuid": job.uuid,
"pipeline_run_spec": {"run_type": "full", "uuids": []},
}

return analytics_payload


def deliver(delivery_uuid: str) -> None:
"""Delivers an analytics delivery. Will commit to the database.

If the delivery fails it's rescheduled in the future with a capped
exponential backoff. This function will commit to the database to
ensure that each delivery gets its own transaction.

Args:
delivery_uuid: Delivery to be delivered, the associated
deliveree must be an AnalyticsSubscriber.

Raises:
ValueError: If the associated deliveree isn't an
AnalyticsSubscriber.

"""
logger.info(f"Delivering {delivery_uuid}.")

delivery = (
models.Delivery.query.with_for_update(skip_locked=True)
.filter(
models.Delivery.uuid == delivery_uuid,
models.Delivery.status.in_(["SCHEDULED", "RESCHEDULED"]),
)
.first()
)
if delivery is None:
logger.info(f"No need to deliver {delivery_uuid}.")
return

deliveree = models.AnalyticsSubscriber.query.filter(
models.AnalyticsSubscriber.uuid == delivery.deliveree
).first()
if deliveree is None:
raise ValueError("Deliveree of delivery isn't of type AnalyticsSubscriber.")

try:
# TODO: this is needed because a dependency of this PR actually
# lives in the controller branch, remove this.
current_app.config["TELEMETRY_DISABLED"] = False
current_app.config["TELEMETRY_UUID"] = "e2e2abb5-3c54-4209-a245-e9fa37810824"

payload = delivery.notification_payload
analytics_event_type = _event_type_to_analytics_event_enum.get(
payload["type"], analytics.Event.DEBUG_PING
)
analytics.send_event(current_app, analytics_event_type, payload)
db.session.delete(delivery)
except Exception as e:
logger.error(e)
delivery.reschedule()

# Analytics is subscribed to all events, so avoid keeping an
# unbounded number of such deliveries in case things go wrong
# with the analytics back-end or if we move away from it.
if delivery.n_delivery_attempts > 5:
db.session.delete(delivery)
logger.info(f"Deleting {delivery_uuid}, couldn't deliver it.")
else:
logger.info(f"Rescheduling {delivery_uuid} at {delivery.scheduled_at}.")

db.session.commit()
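The `reschedule()` call in `deliver` relies on the capped exponential backoff described in the docstring. A minimal sketch of such a schedule — the base delay and cap below are illustrative assumptions, not the values orchest actually uses:

```python
import datetime


def backoff_seconds(n_attempts: int, base: int = 2, cap: int = 3600) -> int:
    # The delay doubles with each failed attempt (2s, 4s, 8s, ...) and
    # is capped so retries never drift arbitrarily far into the future.
    return min(base ** n_attempts, cap)


def next_attempt_at(n_attempts: int) -> datetime.datetime:
    # Timezone-aware timestamp, matching the UTC comparisons used when
    # selecting deliveries that are due.
    return datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
        seconds=backoff_seconds(n_attempts)
    )
```

Combined with the hard cutoff after 5 attempts above, this bounds both how often and how long a failing analytics delivery is retried.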
19 changes: 14 additions & 5 deletions services/orchest-api/app/app/models.py
@@ -1512,16 +1512,16 @@ class ContentType(enum.Enum):

__tablename__ = None

url = db.Column(db.String(), nullable=False)
url = db.Column(db.String(), nullable=True)

name = db.Column(db.String(100), nullable=False)
name = db.Column(db.String(100), nullable=True)

verify_ssl = db.Column(db.Boolean(), nullable=False)
verify_ssl = db.Column(db.Boolean(), nullable=True)

# Used to calculate the HMAC digest of the payload and sign it.
secret = deferred(db.Column(db.String(), nullable=False))
secret = deferred(db.Column(db.String(), nullable=True))

content_type = db.Column(db.String(50), nullable=False)
content_type = db.Column(db.String(50), nullable=True)

def is_slack_webhook(self) -> bool:
return self.url.startswith("https://hooks.slack.com/")
@@ -1537,6 +1537,15 @@ def is_teams_webhook(self) -> bool:
}


class AnalyticsSubscriber(Subscriber):

__tablename__ = None

__mapper_args__ = {
"polymorphic_identity": "analytics",
}


class Subscription(BaseModel):
__tablename__ = "subscriptions"

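The new `AnalyticsSubscriber` model can stay empty because SQLAlchemy's single-table inheritance routes each row to a subclass via a discriminator column and the `polymorphic_identity` registered in `__mapper_args__`. A dependency-free sketch of that dispatch mechanism — stand-in names, not the SQLAlchemy API:

```python
class Subscriber:
    # Maps discriminator values (the "type" column) to mapped
    # subclasses, mirroring SQLAlchemy's polymorphic_identity registry.
    registry: dict = {}

    def __init_subclass__(cls, identity=None, **kwargs):
        super().__init_subclass__(**kwargs)
        if identity is not None:
            Subscriber.registry[identity] = cls


class Webhook(Subscriber, identity="webhook"):
    pass


class AnalyticsSubscriber(Subscriber, identity="analytics"):
    pass


def load_row(row: dict) -> Subscriber:
    # Pick the subclass from the discriminator value, then hydrate it
    # with the row's columns -- roughly what polymorphic loading does.
    cls = Subscriber.registry[row["type"]]
    obj = cls.__new__(cls)
    obj.__dict__.update(row)
    return obj
```

This is also why the `Webhook` columns had to become nullable in this PR: all subscriber subtypes share one table, and analytics rows carry no webhook data.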