Skip to content

Commit

Permalink
feat: add workers
Browse files Browse the repository at this point in the history
  • Loading branch information
vberlier committed Mar 19, 2021
1 parent c5ce6bd commit eacd2ae
Show file tree
Hide file tree
Showing 15 changed files with 525 additions and 42 deletions.
1 change: 1 addition & 0 deletions beet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
from .toolchain.pipeline import *
from .toolchain.project import *
from .toolchain.template import *
from .toolchain.worker import *

__version__ = "0.14.0"
2 changes: 2 additions & 0 deletions beet/toolchain/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from .pipeline import GenericPipeline, GenericPlugin, GenericPluginSpec
from .template import TemplateManager
from .worker import WorkerPoolHandle

InjectedType = TypeVar("InjectedType")

Expand Down Expand Up @@ -57,6 +58,7 @@ class Context:
output_directory: Optional[Path]
meta: JsonDict
cache: MultiCache
worker: WorkerPoolHandle
template: TemplateManager

assets: ResourcePack = field(default_factory=ResourcePack)
Expand Down
7 changes: 6 additions & 1 deletion beet/toolchain/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
from .context import Context, Plugin, PluginSpec
from .project import Project, ProjectBuilder
from .template import TemplateManager
from .worker import WorkerPool


def subproject(config: Union[ProjectConfig, JsonDict, FileSystemPath]) -> Plugin:
"""Return a plugin that runs a subproject."""

def plugin(ctx: Context):
project = Project(resolved_cache=ctx.cache)
project = Project(
resolved_cache=ctx.cache,
resolved_worker_pool=WorkerPool(resolved_handle=ctx.worker),
)

if isinstance(config, ProjectConfig):
project.resolved_config = config
Expand Down Expand Up @@ -56,6 +60,7 @@ def plugin(ctx: Context):
output_directory=None,
meta={},
cache=ctx.cache,
worker=ctx.worker,
template=TemplateManager(
templates=list(ctx.template.directories),
cache_dir=ctx.cache["template"].directory,
Expand Down
95 changes: 56 additions & 39 deletions beet/toolchain/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .pipeline import FormattedPipelineException
from .template import TemplateManager
from .utils import locate_minecraft
from .worker import WorkerPool


class ErrorMessage(FormattedPipelineException):
Expand All @@ -43,6 +44,8 @@ class Project:
resolved_cache: Optional[MultiCache] = None
cache_name: str = ".beet_cache"

resolved_worker_pool: Optional[WorkerPool] = None

@property
def config(self) -> ProjectConfig:
if self.resolved_config is not None:
Expand Down Expand Up @@ -86,6 +89,13 @@ def ignore(self) -> List[str]:
ignore.append(f"{self.output_directory.relative_to(self.directory)}/")
return ignore

@property
def worker_pool(self):
if self.resolved_worker_pool is not None:
return self.resolved_worker_pool
self.resolved_worker_pool = WorkerPool()
return self.resolved_worker_pool

def reset(self):
"""Clear the cached config and force subsequent operations to load it again."""
self.resolved_config = None
Expand All @@ -101,20 +111,21 @@ def build(self):

def watch(self, interval: float = 0.6) -> Iterator[FileChanges]:
"""Watch the project."""
for changes in DirectoryWatcher(
self.directory,
interval,
ignore_file=".gitignore",
ignore_patterns=[
f"{self.cache.path.relative_to(self.directory.resolve())}/",
"__pycache__/",
"*.tmp",
".*",
*self.ignore,
],
):
self.reset()
yield changes
with self.worker_pool.long_lived_session():
for changes in DirectoryWatcher(
self.directory,
interval,
ignore_file=".gitignore",
ignore_patterns=[
f"{self.cache.path.relative_to(self.directory.resolve())}/",
"__pycache__/",
"*.tmp",
".*",
*self.ignore,
],
):
self.reset()
yield changes

def inspect_cache(self, patterns: Sequence[str] = ()) -> Iterable[str]:
"""Return a detailed representation for each matching cache."""
Expand Down Expand Up @@ -209,34 +220,40 @@ def build(self) -> Context:
name = self.config.name or self.project.directory.stem
normalized_name = normalize_string(name)

ctx = Context(
project_name=normalized_name,
project_description=self.config.description,
project_author=self.config.author,
project_version=self.config.version,
directory=self.project.directory,
output_directory=self.project.output_directory,
meta=deepcopy(self.config.meta),
cache=self.project.cache,
template=TemplateManager(
templates=self.project.template_directories,
cache_dir=self.project.cache["template"].directory,
),
whitelist=self.config.whitelist,
)
with self.project.worker_pool.handle() as worker_pool_handle:
ctx = Context(
project_name=normalized_name,
project_description=self.config.description,
project_author=self.config.author,
project_version=self.config.version,
directory=self.project.directory,
output_directory=self.project.output_directory,
meta=deepcopy(self.config.meta),
cache=self.project.cache,
worker=worker_pool_handle,
template=TemplateManager(
templates=self.project.template_directories,
cache_dir=self.project.cache["template"].directory,
),
whitelist=self.config.whitelist,
)

with ctx.activate() as pipeline:
pipeline.require(self.bootstrap)
pipeline.run(
(
item
if isinstance(item, str)
else ProjectBuilder(
Project(resolved_config=item, resolved_cache=ctx.cache)
with ctx.activate() as pipeline:
pipeline.require(self.bootstrap)
pipeline.run(
(
item
if isinstance(item, str)
else ProjectBuilder(
Project(
resolved_config=item,
resolved_cache=ctx.cache,
resolved_worker_pool=self.project.worker_pool,
)
)
)
for item in self.config.pipeline
)
for item in self.config.pipeline
)

return ctx

Expand Down

0 comments on commit eacd2ae

Please sign in to comment.