Skip to content

Commit

Permalink
Remove concurrency from extract and fix some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
rgov committed Jul 2, 2018
1 parent 9292365 commit f004f65
Showing 1 changed file with 56 additions and 48 deletions.
104 changes: 56 additions & 48 deletions starterpack/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
cases back into build.py
"""

import concurrent.futures
from distutils.dir_util import copy_tree
import os
import shutil
Expand All @@ -19,6 +18,43 @@
from . import paths


class TaskQueue:
    """A queue of named tasks released in prerequisite order.

    Each task is added with an optional list of prerequisite task names.
    A task only becomes eligible for `pop()` once every one of its
    prerequisites has been removed from the queue (normally by being
    popped first). Tasks with no outstanding prerequisites are released in
    the order in which they were added.
    """

    def __init__(self):
        # Maps task name -> list of prerequisite names still outstanding.
        self.tasks = {}

    def add(self, name, prereqs=None):
        """Queue `name`; it is released after every task in `prereqs`."""
        # Copy the sequence so remove() never mutates the caller's list.
        self.tasks[name] = list(prereqs or ())

    def pop(self):
        """Remove and return the first task with no pending prerequisites.

        Raises:
            IndexError: if the queue is empty.
            RuntimeError: if tasks remain but none can ever be released,
                either because a prerequisite was never queued or because
                the remaining tasks depend on each other in a cycle.
        """
        if not self.tasks:
            raise IndexError('pop from empty queue')
        for name, prereqs in self.tasks.items():
            if not prereqs:
                break
        else:
            # Nothing is ready. Distinguish a prerequisite that can never
            # be satisfied from a genuine cycle, for a useful diagnostic.
            queued = list(self.tasks)
            missing = [p for prereqs in self.tasks.values()
                       for p in prereqs if p not in queued]
            if missing:
                raise RuntimeError('prerequisites never queued: ' +
                                   ', '.join(repr(p) for p in missing))
            raise RuntimeError('circular dependencies detected')
        # The dict is only modified after the loop above has finished.
        self.remove(name)
        return name

    def remove(self, name):
        """Drop task `name` and mark it satisfied for all other tasks.

        Raises:
            KeyError: if `name` is not currently queued.
        """
        del self.tasks[name]
        for prereqs in self.tasks.values():
            # Filter in place so duplicated prerequisite entries are all
            # cleared, not just the first occurrence.
            prereqs[:] = [p for p in prereqs if p != name]

    def __iter__(self):
        """Allows iterating over queued tasks, which empties the queue."""
        return self

    def __next__(self):
        try:
            return self.pop()
        except IndexError:
            # An empty queue simply ends iteration; hide the IndexError.
            raise StopIteration from None


class UnixAwareZipFile(zipfile.ZipFile):
"""A ZipFile subclass aware of how to extract UNIX file permissions."""
@staticmethod
Expand Down Expand Up @@ -54,7 +90,7 @@ def _copyfile(src, dest, zipinfo=None):
if os.path.isfile(src):
shutil.copy2(src, dest)
elif os.path.isdir(src):
copy_tree(src, dest)
copy_tree(src, dest, preserve_symlinks=True)
else:
raise IOError('Unexpected file type for %s', src)
elif isinstance(src, zipfile.ZipExtFile):
Expand All @@ -81,6 +117,7 @@ def unzip_to(filename, target_dir=None, path_pairs=None):
In 'path_pairs' mode, the argument should be a sequence of paths.
The file at the first path within the zip is written at the second path.
"""
print('unzip_to', filename, target_dir, path_pairs)
assert bool(target_dir) != bool(path_pairs), 'Choose one unzip mode!'
out = target_dir or os.path.commonpath([p[1] for p in path_pairs])
print('{:28} -> {}'.format(os.path.basename(filename)[:28],
Expand Down Expand Up @@ -123,7 +160,8 @@ def nonzip_extract(filename, target_dir=None, path_pairs=None):
for root, _, files in os.walk(tmpdir) for f in files]
prefix = os.path.commonpath(files) if len(files) > 1 else ''
if target_dir:
copy_tree(os.path.join(tmpdir, prefix), target_dir)
copy_tree(os.path.join(tmpdir, prefix), target_dir,
preserve_symlinks=True)
else:
for inpath, outpath in path_pairs:
if outpath.endswith('/'):
Expand Down Expand Up @@ -188,63 +226,33 @@ def unpack_anything(filename, tmpdir):
return False


def extract_comp(comp):
    """Extracts a single component.

    `comp.extract_to` selects one of two modes:

    * Without a ':' it is a single destination of the form
      ``<paths function>/<arg>/...``; the whole archive at `comp.path` is
      unpacked into the directory returned by that `paths` function.
    * Otherwise it is one ``<path in archive>:<destination>`` pair per
      line, where each destination uses the same
      ``<paths function>/<arg>/...`` form.

    Returns whatever `unzip_to` returns.

    Raises:
        ValueError: if a pair line has no ':' separator.
    """
    print('Extracting', comp.name)
    if ':' not in comp.extract_to:
        # first part of extract_to is paths method, remainder is args
        dest, *details = comp.extract_to.split('/')
        return unzip_to(comp.path, target_dir=getattr(paths, dest)(*details))
    # else using the path_pairs option; extract pairs from string
    pairs = []
    for pair in comp.extract_to.strip().splitlines():
        # partition() splits on the first ':' only, so a destination that
        # itself contains ':' is kept intact.
        src, sep, to = pair.partition(':')
        if not sep:
            raise ValueError(
                'Malformed extract_to line for {}: {!r}'.format(
                    comp.name, pair))
        dest, *details = to.split('/')
        # Note: can add format variables here as needed
        if '{DFHACK_VER}' in src:
            src = src.format(DFHACK_VER=component.ALL['DFHack'].version)
        pairs.append((src, getattr(paths, dest)(*details)))
    return unzip_to(comp.path, path_pairs=pairs)


def extract_everything():
    """Extract every component, respecting order requirements.

    Each component in `component.ALL` is queued under its name, with its
    `install_after` value (when set) as the only prerequisite, and the
    components are then extracted one at a time in the order the queue
    releases them.

    Raises:
        RuntimeError: propagated from `TaskQueue` when the `install_after`
            settings can never be satisfied (for example, a cycle).
    """
    # Index by name locally so the lookup below does not depend on how
    # component.ALL itself is keyed.
    by_name = {comp.name: comp for comp in component.ALL.values()}
    queue = TaskQueue()
    for comp in by_name.values():
        after = [comp.install_after] if comp.install_after else []
        # Queue the *name*, not the component object: `install_after` is
        # matched against component names, and TaskQueue clears a
        # prerequisite by comparing it with the key of the finished task.
        queue.add(comp.name, prereqs=after)
    for name in queue:
        extract_comp(by_name[name])


def add_lnp_dirs():
Expand Down

0 comments on commit f004f65

Please sign in to comment.