Skip to content

Commit

Permalink
Make plugin install methods async
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenFrankel committed Apr 6, 2024
1 parent 448e2f2 commit 93cc0b1
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 37 deletions.
6 changes: 4 additions & 2 deletions src/meltano/cli/add.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from meltano.core.plugin_install_service import PluginInstallReason
from meltano.core.project_add_service import ProjectAddService
from meltano.core.tracking.contexts import CliEvent, PluginsTrackingContext
from meltano.core.utils import run_async
from meltano.core.yaml import yaml

if t.TYPE_CHECKING:
Expand Down Expand Up @@ -114,7 +115,8 @@ def _load_yaml_from_ref(_ctx, _param, value: str | None) -> dict | None:
)
@pass_project()
@click.pass_context
def add( # noqa: WPS238
@run_async
async def add( # noqa: C901 WPS238
ctx,
project: Project,
plugin_type: str,
Expand Down Expand Up @@ -198,7 +200,7 @@ def add( # noqa: WPS238
tracker.track_command_event(CliEvent.inflight)

if not flags.get("no_install"):
success = install_plugins(
success = await install_plugins(
project,
plugins,
reason=PluginInstallReason.ADD,
Expand Down
6 changes: 4 additions & 2 deletions src/meltano/cli/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from meltano.core.plugin import PluginType
from meltano.core.schedule_service import ScheduleService
from meltano.core.tracking.contexts import CliEvent, PluginsTrackingContext
from meltano.core.utils import run_async

if t.TYPE_CHECKING:
from meltano.core.project import Project
Expand Down Expand Up @@ -59,7 +60,8 @@
)
@click.pass_context
@pass_project(migrate=True)
def install( # noqa: C901
@run_async
async def install( # noqa: C901
project: Project,
ctx: click.Context,
plugin_type: str,
Expand Down Expand Up @@ -101,7 +103,7 @@ def install( # noqa: C901
)
tracker.track_command_event(CliEvent.inflight)

success = install_plugins(
success = await install_plugins(
project,
plugins,
parallelism=parallelism,
Expand Down
10 changes: 8 additions & 2 deletions src/meltano/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,14 @@ def install_status_update(install_state):
click.secho(msg, fg="green")


def install_plugins(
async def install_plugins(
project,
plugins,
reason=PluginInstallReason.INSTALL,
parallelism=None,
clean=False,
force=False,
skip_installed=False,
) -> bool:
"""Install the provided plugins and report results to the console."""
install_service = PluginInstallService(
Expand All @@ -442,7 +443,12 @@ def install_plugins(
clean=clean,
force=force,
)
install_results = install_service.install_plugins(plugins, reason=reason)
install_results = await install_service.install_plugins(
plugins,
reason=reason,
skip_installed=skip_installed,
)

num_successful = len([status for status in install_results if status.successful])
num_skipped = len([status for status in install_results if status.skipped])
num_failed = len(install_results) - num_successful
Expand Down
44 changes: 17 additions & 27 deletions src/meltano/core/plugin_install_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class PluginInstallReason(str, Enum):

ADD = "add"
INSTALL = "install"
INVOKE = "invoke"
ELT = "elt"
RUN = "run"
UPGRADE = "upgrade"


Expand Down Expand Up @@ -215,7 +218,7 @@ def remove_duplicates(
)
return states, deduped_plugins

def install_all_plugins(
async def install_all_plugins(
self,
reason=PluginInstallReason.INSTALL,
) -> tuple[PluginInstallState]:
Expand All @@ -230,50 +233,37 @@ def install_all_plugins(
Returns:
Install state of installed plugins.
"""
return self.install_plugins(self.project.plugins.plugins(), reason=reason)
return await self.install_plugins(self.project.plugins.plugins(), reason=reason)

def install_plugins(
async def install_plugins(
self,
plugins: t.Iterable[ProjectPlugin],
reason=PluginInstallReason.INSTALL,
skip_installed=False,
) -> tuple[PluginInstallState]:
"""
Install all the provided plugins.
Blocks until all plugins are installed.
"""Install all the provided plugins.
Args:
plugins: ProjectPlugin instances to install.
reason: Plugin install reason.
skip_installed: Whether to skip plugins that are already installed.
Returns:
Install state of installed plugins.
"""
states, new_plugins = self.remove_duplicates(plugins=plugins, reason=reason)
for state in states:
self.status_cb(state)
states.extend(
asyncio.run(self.install_plugins_async(new_plugins, reason=reason)),
)
return states

async def install_plugins_async(
self,
plugins: t.Iterable[ProjectPlugin],
reason=PluginInstallReason.INSTALL,
) -> tuple[PluginInstallState]:
"""Install all the provided plugins.

Args:
plugins: ProjectPlugin instances to install.
reason: Plugin install reason.
installing = [
self.install_plugin_async(plugin, reason)
for plugin in new_plugins
if not skip_installed
or not self.project.plugin_dir(plugin, "venv", make_dirs=False).exists()
]

Returns:
Install state of installed plugins.
"""
return await asyncio.gather(
*[self.install_plugin_async(plugin, reason) for plugin in plugins],
)
states.extend(await asyncio.gather(*installing))
return states

def install_plugin(
self,
Expand Down
11 changes: 7 additions & 4 deletions src/meltano/core/upgrade_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import asyncio
import os
import subprocess
import sys
Expand Down Expand Up @@ -135,10 +136,12 @@ def update_files(self):
click.echo("Nothing to update")
return

success = install_plugins(
self.project,
file_plugins,
reason=PluginInstallReason.UPGRADE,
success = asyncio.run(
install_plugins(
self.project,
file_plugins,
reason=PluginInstallReason.UPGRADE,
)
)
if not success:
raise MeltanoError("Failed to upgrade plugin(s)") # noqa: EM101
Expand Down

0 comments on commit 93cc0b1

Please sign in to comment.