feature: Capture run ID #8458

Closed
menzenski opened this issue Mar 21, 2024 · 7 comments · Fixed by #8459

Comments

@menzenski
Contributor

Feature scope

CLI (options, error messages, logging, etc.)

Description

We run meltano in Kubernetes using Argo Workflows.

We use the Argo Workflows workflow archive, so we have workflow execution data saved in Postgres there.

We use the Meltano Postgres system database, so we have Meltano job run data saved in Postgres also.

But we don't have a way to join these two "job execution" data sets together. That is, we don't have a way to link "this specific argo workflow executed this specific Meltano run". We would like to be able to do that.

I am not sure exactly what would make the most sense here, or what would be easiest relative to existing behavior/functionality. But some things I can think of:

  • If meltano run could accept a --run-id=abc123 CLI argument or similar, that could be persisted as part of the runs table record for that run.
  • If meltano run would expose the run ID of the current job as an environment variable (MELTANO_RUN_ID or similar), we could capture that upon completion of the job and persist it in the argo workflows archive.

Being able to join these two sets of "job run data" would be really valuable to us and I'd be happy to try to contribute to this effort.

@edgarrmondragon
Collaborator

Thanks for filing @menzenski!

> If meltano run could accept a --run-id=abc123 CLI argument or similar, that could be persisted as part of the runs table record for that run.

I can imagine this, though we'd prefer to keep the run ID as a UUID to avoid having to create an Alembic migration script, since in Postgres it uses the builtin UUID type.

Uniqueness of run_id is not enforced, but I wonder what problems could come from running two pipelines with the same run ID. Maybe they'd just use the same log file?

Let me know if those restrictions work for you and your workflow, or if you'd need support for arbitrary strings.
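
To illustrate the UUID restriction mentioned above, here is a minimal sketch that uses Python's standard `uuid` module to check whether a candidate run ID parses as a UUID. The helper name `is_valid_run_id` is hypothetical and not part of Meltano; how Meltano itself validates the value is not shown in this thread.

```python
import uuid


def is_valid_run_id(value: str) -> bool:
    """Return True if `value` parses as a UUID (hypothetical helper, not Meltano API)."""
    try:
        uuid.UUID(value)
    except ValueError:
        return False
    return True


print(is_valid_run_id("abc123"))  # an arbitrary short string does not parse
print(is_valid_run_id(str(uuid.uuid4())))  # a v4 UUID string parses
```

Note that `uuid.UUID` also accepts the 32-character hex form without hyphens, so both spellings of the same UUID pass this check.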

> If meltano run would expose the run ID of the current job as an environment variable (MELTANO_RUN_ID or similar), we could capture that upon completion of the job and persist it in the argo workflows archive.

I'm certain we could pass down a MELTANO_RUN_ID env var to the plugin's subprocess, but I don't think that would be exposed outside of it, so I'm not sure it could be retrieved.
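
The concern about visibility can be seen in a small sketch that does not involve Meltano at all: a variable set inside a child process is not visible to the parent once the child exits. `MELTANO_RUN_ID` is only the name proposed in this thread, and `child_env_leaks_to_parent` is a hypothetical helper written for this illustration.

```python
import os
import subprocess
import sys


def child_env_leaks_to_parent(name: str) -> bool:
    """Set `name` inside a child process, then check the parent's environment."""
    # Start from a clean slate so a pre-existing value cannot skew the result.
    os.environ.pop(name, None)
    child_code = f"import os; os.environ[{name!r}] = 'abc123'"
    # The child mutates only its own copy of the environment.
    subprocess.run([sys.executable, "-c", child_code], check=True)
    return name in os.environ


print(child_env_leaks_to_parent("MELTANO_RUN_ID"))
```

Environment variables flow from parent to child at process start, not the other way, which is why an outer orchestrator step could not read a value set this way.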

@menzenski
Contributor Author

> > If meltano run could accept a --run-id=abc123 CLI argument or similar, that could be persisted as part of the runs table record for that run.
>
> I can imagine this, though we'd prefer to keep the run ID as a UUID to avoid having to create an Alembic migration script, since in Postgres it uses the builtin UUID type.
>
> Uniqueness of run_id is not enforced, but I wonder what problems could come from running two pipelines with the same run ID. Maybe they'd just use the same log file?
>
> Let me know if those restrictions work for you and your workflow, or if you'd need support for arbitrary strings.

Sorry - I wasn't clear in my original message. I wasn't trying to propose that an orchestrator external to meltano should be able to set the meltano run ID. Rather, I was thinking about something like this:

  • Invoke meltano with meltano run tap-my-source target-my-destination --run-id=abc123
  • In the runs table, the record for this run has that persisted on the payload as a new "metadata": {"run-id":"abc123"} field.

Or similar - it seems that the payload column is "just a JSON-encoded dict" (per the column definition below), so in theory it could support an additional field (alongside the existing singer_state property):

    payload: Mapped[dict] = mapped_column(MutableDict.as_mutable(JSONEncodedDict))
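
As a rough sketch of the idea above, the payload could carry a `metadata` key next to `singer_state`. This only illustrates the proposed shape with a plain dict and `json`; the `metadata` field is hypothetical (it was a suggestion, not existing behavior) and the state contents are placeholder values.

```python
import json

# Placeholder payload; only "singer_state" is named in the thread as an existing key.
payload = {"singer_state": {"bookmarks": {}}}

# Proposed (hypothetical) addition: caller-supplied metadata alongside the state.
payload["metadata"] = {"run-id": "abc123"}

# A JSON-encoded dict round-trips the extra key without any schema change.
encoded = json.dumps(payload, sort_keys=True)
decoded = json.loads(encoded)
print(decoded["metadata"]["run-id"])
```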

> > If meltano run would expose the run ID of the current job as an environment variable (MELTANO_RUN_ID or similar), we could capture that upon completion of the job and persist it in the argo workflows archive.
>
> I'm certain we could pass down a MELTANO_RUN_ID env var to the plugin's subprocess, but I don't think that would be exposed outside of it, so I'm not sure it could be retrieved.

For our use case, as long as it was available as an environment variable here, when the "Block run completed." message is logged (on success or error), I think we'd be able to pull it from the environment in our workflow exit handler:

    async def _run_blocks(
        tracker: Tracker,
        parsed_blocks: list[BlockSet | PluginCommandBlock],
        dry_run: bool,
    ) -> None:
        for idx, blk in enumerate(parsed_blocks):
            blk_name = blk.__class__.__name__
            tracking_ctx = PluginsTrackingContext.from_block(blk)
            with tracker.with_contexts(tracking_ctx):
                tracker.track_block_event(blk_name, BlockEvents.initialized)
            if dry_run:
                msg = f"Dry run, but would have run block {idx + 1}/{len(parsed_blocks)}."
                if isinstance(blk, BlockSet):
                    logger.info(
                        msg,
                        block_type=blk_name,
                        comprised_of=[plugin.string_id for plugin in blk.blocks],
                    )
                elif isinstance(blk, PluginCommandBlock):
                    logger.info(
                        msg,
                        block_type=blk_name,
                        comprised_of=f"{blk.string_id}:{blk.command}",
                    )
                continue

            try:
                await blk.run()
            except RunnerError as err:
                logger.error(
                    "Block run completed.",
                    set_number=idx,
                    block_type=blk_name,
                    success=False,
                    err=err,
                    exit_codes=err.exitcodes,
                )
                with tracker.with_contexts(tracking_ctx):
                    tracker.track_block_event(blk_name, BlockEvents.failed)
                raise CliError(
                    f"Run invocation could not be completed as block failed: {err}",  # noqa: EM102
                ) from err
            except Exception as bare_err:
                # make sure we also fire block failed events for all other exceptions
                with tracker.with_contexts(tracking_ctx):
                    tracker.track_block_event(blk_name, BlockEvents.failed)
                raise bare_err

            logger.info(
                "Block run completed.",
                set_number=idx,
                block_type=blk.__class__.__name__,
                success=True,
                err=None,
            )
            with tracker.with_contexts(tracking_ctx):
                tracker.track_block_event(blk_name, BlockEvents.completed)

@edgarrmondragon
Collaborator

>   • Invoke meltano with meltano run tap-my-source target-my-destination --run-id=abc123
>   • In the runs table, the record for this run has that persisted on the payload as a new "metadata": {"run-id":"abc123"} field.

@menzenski Would this have a different value to the run_id column in the runs table? If so, I can imagine it could lead to some confusion.

FWIW if you wanna check out the approach, I was able to experiment with a --run-id=... option in #8459 and I'm able to see the value correctly set in the runs table:

[Screenshot: the runs table showing the run ID value set via --run-id]

edgarrmondragon added a commit that referenced this issue Mar 25, 2024
edgarrmondragon added a commit that referenced this issue Apr 3, 2024
@menzenski
Contributor Author

@edgarrmondragon sorry for my delayed response here, I was out of office and missed your update - the draft PR https://github.com/meltano/meltano/pull/8459/files looks awesome, that'd totally work for our use case. (I confirmed that Argo Workflows is using v4 UUID strings).

@edgarrmondragon
Collaborator

edgarrmondragon commented Apr 17, 2024

> @edgarrmondragon sorry for my delayed response here, I was out of office and missed your update - the draft PR https://github.com/meltano/meltano/pull/8459/files looks awesome, that'd totally work for our use case. (I confirmed that Argo Workflows is using v4 UUID strings).

Thanks for confirming @menzenski. I'm already in the process of beta testing Meltano 3.4.0 but I could probably slip #8459 in if the team accepts it.

github-merge-queue bot pushed a commit that referenced this issue Apr 18, 2024
…tom run UUIDs (#8459)

* feat(CLI): `meltano run` now has a `--run-id` option to allow for custom run UUIDs

Related:

* Closes #8458

* Test UUID without hyphens
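
For illustration, a wrapper script might build the invocation along these lines. This is a sketch only: the `build_meltano_command` helper and the `WORKFLOW_UID` environment variable are hypothetical names, the plugin names are the placeholders used earlier in this thread, and the option placement simply mirrors the example given earlier.

```python
import os
import uuid


def build_meltano_command(workflow_uid: str, blocks: list[str]) -> list[str]:
    """Build argv for `meltano run`, reusing an orchestrator UUID as the run ID."""
    # Normalise to the canonical hyphenated form; raises ValueError if not a UUID.
    run_id = str(uuid.UUID(workflow_uid))
    return ["meltano", "run", *blocks, f"--run-id={run_id}"]


if __name__ == "__main__":
    # WORKFLOW_UID is an assumed variable name that the orchestrator would populate;
    # a random UUID is used as a fallback so the sketch runs anywhere.
    uid = os.environ.get("WORKFLOW_UID", str(uuid.uuid4()))
    print(build_meltano_command(uid, ["tap-my-source", "target-my-destination"]))
```

The resulting list could be passed to `subprocess.run`; validating the UUID before launching keeps a malformed ID from reaching the CLI at all.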
@menzenski
Contributor Author

@edgarrmondragon I put Meltano 3.4.0 into production today - we're using this new --run-id flag to set the Meltano run ID to the workflow ID of the Argo Workflows workflow that runs Meltano.

It works great! Huge quality-of-life improvement for us. Thanks so much for implementing this!
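
Once both systems share the same ID, the join described at the top of this issue becomes a plain equality join. The sketch below uses an in-memory SQLite database with simplified stand-in tables; the table names, columns, and rows are all hypothetical placeholders, not the real Meltano or Argo Workflows schemas.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Simplified stand-ins for the two "job execution" data sets (hypothetical schemas).
conn.executescript(
    """
    CREATE TABLE meltano_runs (run_id TEXT, job_name TEXT, state TEXT);
    CREATE TABLE argo_workflows (uid TEXT, name TEXT, phase TEXT);
    INSERT INTO meltano_runs VALUES
        ('123e4567-e89b-12d3-a456-426614174000', 'example-job', 'SUCCESS');
    INSERT INTO argo_workflows VALUES
        ('123e4567-e89b-12d3-a456-426614174000', 'example-workflow', 'Succeeded');
    """
)

# Link each workflow to the Meltano run that it executed via the shared ID.
rows = conn.execute(
    """
    SELECT w.name, w.phase, r.job_name, r.state
    FROM argo_workflows AS w
    JOIN meltano_runs AS r ON r.run_id = w.uid
    """
).fetchall()
print(rows)
```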

@edgarrmondragon
Collaborator

> @edgarrmondragon I put Meltano 3.4.0 into production today - we're using this new --run-id flag to set the Meltano run ID to the workflow ID of the Argo Workflows workflow that runs Meltano.
>
> It works great! Huge quality-of-life improvement for us. Thanks so much for implementing this!

I'm glad that it's helpful!
