Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rose.config_processors.file: improve table create #1225

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
170 changes: 84 additions & 86 deletions lib/python/rose/config_processors/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@

from fnmatch import fnmatch
from glob import glob
from hashlib import md5
import hashlib
import os
import re
from rose.checksum import get_checksum
from rose.config_processor import ConfigProcessError, ConfigProcessorBase
from rose.env import env_var_process, UnboundEnvironmentVariableError
from rose.fs_util import FileSystemUtil
from rose.job_runner import JobManager, JobProxy, JobRunner
from rose.popen import RosePopener
from rose.reporter import Event
from rose.scheme_handler import SchemeHandlersManager
import shlex
Expand All @@ -54,7 +55,8 @@ def handle_event(self, *args):
"""Invoke event handler with *args, if there is one."""
self.manager.handle_event(*args)

def process(self, conf_tree, item, orig_keys=None, orig_value=None, **kwargs):
def process(self, conf_tree, item, orig_keys=None, orig_value=None,
**kwargs):
"""Install files according to [file:*] in conf_tree.

kwargs["no_overwrite_mode"]: fail if a target file already exists.
Expand Down Expand Up @@ -91,26 +93,24 @@ def process(self, conf_tree, item, orig_keys=None, orig_value=None, **kwargs):
self.manager.fs_util.makedirs(file_install_root)
self.manager.fs_util.chdir(file_install_root)
try:
self._process(conf_tree, nodes, loc_dao,
orig_keys=None, orig_value=None, **kwargs)
self._process(conf_tree, nodes, loc_dao, **kwargs)
finally:
if cwd != os.getcwd():
self.manager.fs_util.chdir(cwd)


def _process(self, conf_tree, nodes, loc_dao,
orig_keys=None, orig_value=None, **kwargs):
def _process(self, conf_tree, nodes, loc_dao, **kwargs):
"""Helper for self.process."""
# Ensure that everything is overwritable
# Ensure that container directories exist
for key, node in sorted(nodes.items()):
try:
name = env_var_process(key[len(self.PREFIX):])
except UnboundEnvironmentVariableError as e:
raise ConfigProcessError([key], key, e)
except UnboundEnvironmentVariableError as exc:
raise ConfigProcessError([key], key, exc)
if os.path.exists(name) and kwargs.get("no_overwrite_mode"):
e = FileOverwriteError(name)
raise ConfigProcessError([key], None, e)
exc = FileOverwriteError(name)
raise ConfigProcessError([key], None, exc)
self.manager.fs_util.makedirs(self.manager.fs_util.dirname(name))

# Gets a list of sources and targets
Expand All @@ -132,8 +132,8 @@ def _process(self, conf_tree, nodes, loc_dao,
continue
try:
source_str = env_var_process(source_str)
except UnboundEnvironmentVariableError as e:
raise ConfigProcessError([key, k], source_str, e)
except UnboundEnvironmentVariableError as exc:
raise ConfigProcessError([key, k], source_str, exc)
source_names = []
for raw_source_glob in shlex.split(source_str):
source_glob = raw_source_glob
Expand Down Expand Up @@ -172,23 +172,23 @@ def _process(self, conf_tree, nodes, loc_dao,
config_schemes = [] # [(pattern, scheme), ...]
if config_schemes_str:
for line in config_schemes_str.splitlines():
p, s = line.split("=", 1)
p = p.strip()
s = s.strip()
config_schemes.append((p, s))
pattern, scheme = line.split("=", 1)
pattern = pattern.strip()
scheme = scheme.strip()
config_schemes.append((pattern, scheme))

# Where applicable, determine for each source:
# * Its real name.
# * The checksums of its paths.
# * Whether it can be considered unchanged.
for source in sources.values():
try:
for p, s in config_schemes:
if fnmatch(source.name, p):
source.scheme = s
for pattern, scheme in config_schemes:
if fnmatch(source.name, pattern):
source.scheme = scheme
break
self.loc_handlers_manager.parse(source, conf_tree)
except ValueError as e:
except ValueError as exc:
if source.is_optional:
sources.pop(source.name)
for name in source.used_by_names:
Expand Down Expand Up @@ -294,14 +294,14 @@ def _process(self, conf_tree, nodes, loc_dao,
nproc = int(nproc_str)
job_runner = JobRunner(self, nproc)
job_runner(JobManager(jobs), conf_tree, loc_dao, work_dir)
except ValueError as e:
if e.args and jobs.has_key(e.args[0]):
job = jobs[e.args[0]]
except ValueError as exc:
if exc.args and jobs.has_key(exc.args[0]):
job = jobs[exc.args[0]]
if job.context.action_key == Loc.A_SOURCE:
source = job.context
keys = [self.PREFIX + source.used_by_names[0], "source"]
raise ConfigProcessError(keys, source.name)
raise e
raise exc
finally:
rmtree(work_dir)

Expand All @@ -315,8 +315,8 @@ def _process(self, conf_tree, nodes, loc_dao,
continue
checksum = target.paths[0].checksum
if checksum_expected and checksum_expected != checksum:
e = ChecksumError(checksum_expected, checksum)
raise ConfigProcessError(keys, checksum_expected, e)
exc = ChecksumError(checksum_expected, checksum)
raise ConfigProcessError(keys, checksum_expected, exc)
event = ChecksumEvent(target.name, checksum)
self.handle_event(event)

Expand All @@ -327,7 +327,8 @@ def process_job(self, job, conf_tree, loc_dao, work_dir):
if job.context.action_key == key:
return method(job.context, conf_tree, work_dir)

def post_process_job(self, job, conf_tree, loc_dao, work_dir):
@classmethod
def post_process_job(cls, job, conf_tree, loc_dao, work_dir):
"""Post-process a successful job, helper for "process"."""
loc_dao.update(job.context)

Expand All @@ -340,9 +341,9 @@ def set_event_handler(self, event_handler):

def _source_pull(self, source, conf_tree, work_dir):
"""Pulls a source to its cache in the work directory."""
m = md5()
m.update(source.name)
source.cache = os.path.join(work_dir, m.hexdigest())
md5 = hashlib.md5()
md5.update(source.name)
source.cache = os.path.join(work_dir, md5.hexdigest())
return self.loc_handlers_manager.pull(source, conf_tree)

def _target_install(self, target, conf_tree, work_dir):
Expand All @@ -352,7 +353,7 @@ def _target_install(self, target, conf_tree, work_dir):
Calculate the checksum(s) of (paths in) target.

"""
f = None
handle = None
mod_bits = None
is_first = True
# Install target
Expand All @@ -363,18 +364,18 @@ def _target_install(self, target, conf_tree, work_dir):
raise LocTypeError(target.name, source.name, target.loc_type,
source.loc_type)
if target.loc_type == target.TYPE_BLOB:
if f is None:
if handle is None:
if not os.path.isfile(target.name):
self.manager.fs_util.delete(target.name)
f = open(target.name, "wb")
handle = open(target.name, "wb")
f_bsize = os.statvfs(source.cache).f_bsize
s = open(source.cache)
source_handle = open(source.cache)
while True:
bytes = s.read(f_bsize)
if not bytes:
bytes_ = source_handle.read(f_bsize)
if not bytes_:
break
f.write(bytes)
s.close()
handle.write(bytes_)
source_handle.close()
if mod_bits is None:
mod_bits = os.stat(source.cache).st_mode
else:
Expand All @@ -386,10 +387,10 @@ def _target_install(self, target, conf_tree, work_dir):
args.append("--delete-excluded")
args.extend(["--checksum", source.cache + "/", target.name])
cmd = self.manager.popen.get_cmd("rsync", *args)
out, err = self.manager.popen(*cmd)
self.manager.popen(*cmd)
is_first = False
if f is not None:
f.close()
if handle is not None:
handle.close()
if mod_bits:
os.chmod(target.name, mod_bits)

Expand Down Expand Up @@ -533,47 +534,47 @@ class LocDAO(object):
"""DAO for information for incremental updates."""

FILE_NAME = ".rose-config_processors-file.db"
SCHEMA_LOCS = ("name TEXT, " +
"real_name TEXT, " +
"scheme TEXT, " +
"mode TEXT, " +
"loc_type TEXT, " +
"key TEXT, " +
"PRIMARY KEY(name)")
SCHEMA_PATHS = "name TEXT, path TEXT,checksum TEXT, UNIQUE(name, path)"
SCHEMA_DEP_NAMES = "name TEXT, dep_name TEXT, UNIQUE(name, dep_name)"

def __init__(self):
self.file_name = os.path.abspath(self.FILE_NAME)
self.conn = None

def get_conn(self):
"""Return a Connection object to the database."""
if self.conn is None:
self.conn = sqlite3.connect(self.file_name)
return self.conn

def create(self):
"""Create the database file if it does not exist."""
if not os.path.exists(self.file_name):
conn = self.get_conn()
c = conn.cursor()
c.execute("""CREATE TABLE locs(
name TEXT,
real_name TEXT,
scheme TEXT,
mode TEXT,
loc_type TEXT,
key TEXT,
PRIMARY KEY(name))""")
c.execute("""CREATE TABLE paths(
name TEXT,
path TEXT,
checksum TEXT,
UNIQUE(name, path))""")
c.execute("""CREATE TABLE dep_names(
name TEXT,
dep_name TEXT,
UNIQUE(name, dep_name))""")
conn.commit()
conn = self.get_conn()
cur = conn.execute(
"""SELECT name FROM sqlite_master
WHERE type="table"
ORDER BY name""")
names = [str(row[0]) for row in cur.fetchall()]
for name, schema in [("locs", self.SCHEMA_LOCS),
("paths", self.SCHEMA_PATHS),
("dep_names", self.SCHEMA_DEP_NAMES),]:
if name not in names:
conn.execute("CREATE TABLE " + name + "(" + schema + ")")
conn.commit()

def delete(self, loc):
"""Remove settings related to loc from the database."""
conn = self.get_conn()
c = conn.cursor()
c.execute("""DELETE FROM locs WHERE name=?""", [loc.name])
c.execute("""DELETE FROM dep_names WHERE name=?""", [loc.name])
c.execute("""DELETE FROM paths WHERE name=?""", [loc.name])
conn.execute("""DELETE FROM locs WHERE name=?""", [loc.name])
conn.execute("""DELETE FROM dep_names WHERE name=?""", [loc.name])
conn.execute("""DELETE FROM paths WHERE name=?""", [loc.name])
conn.commit()

def select(self, name):
Expand All @@ -583,25 +584,23 @@ def select(self, name):

"""
conn = self.get_conn()
c = conn.cursor()

c.execute("""SELECT real_name,scheme,mode,loc_type,key FROM locs""" +
""" WHERE name=?""", [name])
row = c.fetchone()
row = conn.execute("""SELECT real_name,scheme,mode,loc_type,key""" +
""" FROM locs WHERE name=?""", [name]).fetchone()
if row is None:
return
loc = Loc(name)
loc.real_name, loc.scheme, loc.mode, loc.loc_type, loc.key = row

c.execute("""SELECT path,checksum FROM paths WHERE name=?""", [name])
for row in c:
for row in conn.execute(
"""SELECT path,checksum FROM paths WHERE name=?""",
[name]):
path, checksum = row
if loc.paths is None:
loc.paths = []
loc.add_path(path, checksum)

c.execute("""SELECT dep_name FROM dep_names WHERE name=?""", [name])
for row in c:
for row in conn.execute(
"""SELECT dep_name FROM dep_names WHERE name=?""", [name]):
dep_name, = row
if loc.dep_locs is None:
loc.dep_locs = []
Expand All @@ -612,18 +611,17 @@ def select(self, name):
def update(self, loc):
"""Insert or update settings related to loc to the database."""
conn = self.get_conn()
c = conn.cursor()
c.execute("""INSERT OR REPLACE INTO locs VALUES(?,?,?,?,?,?)""",
[loc.name, loc.real_name, loc.scheme, loc.mode, loc.loc_type,
loc.key])
conn.execute("""INSERT OR REPLACE INTO locs VALUES(?,?,?,?,?,?)""",
[loc.name, loc.real_name, loc.scheme, loc.mode,
loc.loc_type, loc.key])
if loc.paths:
for path in loc.paths:
c.execute("""INSERT OR REPLACE INTO paths VALUES(?,?,?)""",
[loc.name, path.name, path.checksum])
conn.execute("""INSERT OR REPLACE INTO paths VALUES(?,?,?)""",
[loc.name, path.name, path.checksum])
if loc.dep_locs:
for dep_loc in loc.dep_locs:
c.execute("""INSERT OR REPLACE INTO dep_names VALUES(?,?)""",
[loc.name, dep_loc.name])
conn.execute("""INSERT OR REPLACE INTO dep_names VALUES(?,?)""",
[loc.name, dep_loc.name])
conn.commit()


Expand All @@ -645,9 +643,9 @@ def __init__(self, event_handler=None, popen=None, fs_util=None):
if fs_util is None:
fs_util = FileSystemUtil(event_handler)
self.fs_util = fs_util
p = os.path.dirname(os.path.dirname(sys.modules["rose"].__file__))
path = os.path.dirname(os.path.dirname(sys.modules["rose"].__file__))
SchemeHandlersManager.__init__(
self, [p], ns="rose.loc_handlers", attrs=["parse", "pull"],
self, [path], ns="rose.loc_handlers", attrs=["parse", "pull"],
can_handle="can_pull")

def handle_event(self, *args, **kwargs):
Expand Down
42 changes: 42 additions & 0 deletions t/rose-app-run/17-file-db-bad.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/bash
#-------------------------------------------------------------------------------
# (C) British Crown Copyright 2012-4 Met Office.
#
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test "rose app-run", file installation, empty database in incremental mode.
#-------------------------------------------------------------------------------
. $(dirname $0)/test_header
#-------------------------------------------------------------------------------
tests 3
#-------------------------------------------------------------------------------
test_init <<'__CONFIG__'
[command]
default=true

[file:COPYING]
source=$ROSE_HOME/COPYING
__CONFIG__
#-------------------------------------------------------------------------------
TEST_KEY=$TEST_KEY_BASE
test_setup
touch .rose-config_processors-file.db
run_pass "$TEST_KEY" rose app-run --config=../config -q
file_test "$TEST_KEY.db" .rose-config_processors-file.db -s
file_test "$TEST_KEY.COPYING" COPYING
test_teardown
#-------------------------------------------------------------------------------
exit