diff --git a/README.md b/README.md index 2b04012..27ba2c5 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +# DedupFS: A deduplicating FUSE file system written in Python + The Python script `dedupfs.py` implements a file system in user-space using FUSE. It's called DedupFS because the file system's primary feature is deduplication, which enables it to store virtually unlimited copies of files @@ -7,17 +9,11 @@ In addition to deduplication the file system also supports transparent compression using any of the compression methods lzo, zlib and bz2. These two properties make the file system ideal for backups: The author -currently stores 230 GB worth of backups in two databases of 2,5 GB each. +currently stores 250 GB worth of backups using only 8 GB of disk space. The design of DedupFS was inspired by Venti and ZFS. -**Warning:** *The latest commits have introduced a hard to track bug that's -probably related to string interning. After spending two days tracking down the -bug I've suspended my efforts until I can find more time :-(. Obviously I don't -suggest using the file system until I've fixed the bug!* - - USAGE -======= +## Usage To use this script on Ubuntu (where it was developed) try the following: @@ -31,8 +27,7 @@ To use this script on Ubuntu (where it was developed) try the following: # - ~/.dedupfs-metastore.sqlite3 contains the tree and meta data # - ~/.dedupfs-datastore.db contains the (compressed) data blocks - STATUS -======== +## Status Development on DedupFS began as a proof-of-concept to find out how much disk space the author could free by employing deduplication to store his daily @@ -45,25 +40,21 @@ prove the correctness of the code (the tests are being worked on). The file system initially stored everything in a single SQLite database, but it turned out that after the database grew beyond 8 GB the write speed would drop -from 8-12 MB/s to 2-3 MB/s. Therefor the file system now uses a second database -to store the data blocks. Berkeley DB is used for this database because it's -meant to be used as a key/value store and doesn't require escaping binary data. +from 8-12 MB/s to 2-3 MB/s. Therefor the file system now stores its data blocks +in a separate database, which is a persistent key/value store managed by dbm. - DEPENDENCIES -============== +## Dependencies This script requires the Python FUSE binding in addition to several Python -standard libraries like `dbm`, `sqlite3`, `hashlib` and `cStringIO`. +standard libraries like `anydbm`, `sqlite3`, `hashlib` and `cStringIO`. - CONTACT -========= +## Contact If you have questions, bug reports, suggestions, etc. the author can be contacted at . The latest version of DedupFS is available at and . - LICENSE -========= +## License -DedupFS is licensed under the MIT license. -Copyright 2010 Peter Odding . +This software is licensed under the MIT license. +© 2010 Peter Odding <>. diff --git a/TODO b/TODO index 01c5756..baeba37 100644 --- a/TODO +++ b/TODO @@ -9,17 +9,17 @@ Here are some things on my to-do list, in no particular order: else: flush user & meta data (file contents & attributes) - * Implement rename() independently of [un]link() to improve performance? + * Implement rename() independently of link()/unlink() to improve performance? - * Implement --verify-reads option that recalculates hashes when reading to + * Implement `--verify-reads` option that recalculates hashes when reading to check for data block corruption? 
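As an aside to the deduplication scheme described in the README hunk above: the core idea is to hash each fixed-size block, compress and store every unique block exactly once in a key/value store, and keep a per-file index of block digests. The sketch below only illustrates that idea; it is not code from `dedupfs.py`, and the block size, SHA-1, zlib and the dict/list stand-ins are assumptions (DedupFS makes the block size, hash function and compression method configurable).

```python
# Illustrative sketch of block-level deduplication (not the dedupfs.py code).
# Assumptions: SHA-1 digests, zlib compression, a dict standing in for the
# dbm-style data store and a list standing in for the SQLite "index" table.
import hashlib
import zlib

BLOCK_SIZE = 128 * 1024  # dedupfs uses a configurable --block-size

def store_blocks(data, block_store, file_index):
    """Split data into blocks, storing each unique block only once."""
    for offset in range(0, len(data), BLOCK_SIZE):
        block = data[offset:offset + BLOCK_SIZE]
        digest = hashlib.sha1(block).digest()
        if digest not in block_store:
            # Unseen content: compress and store it exactly once.
            block_store[digest] = zlib.compress(block)
        file_index.append(digest)  # remembers which blocks make up the file

# Two identical "files" share every stored block.
store, index_a, index_b = {}, [], []
store_blocks(b'backup data ' * 100000, store, index_a)
store_blocks(b'backup data ' * 100000, store, index_b)
assert len(index_a) == len(index_b) and len(store) < len(index_a) + len(index_b)
```

The real write path in `__write_blocks()` later in this patch follows the same pattern against the `hashes` and `index` tables and the dbm data store.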
- * report_disk_usage() has become way too expensive for regular status + * `report_disk_usage()` has become way too expensive for regular status reports because it takes more than a minute on a 7.0 GB database. The only way it might work was if the statistics are only retrieved from the database once and from then on kept up to date inside Python, but that seems like an - awful lot of work. For now I've just removed the call to report_disk_usage() - from print_stats() and added a --print-stats command-line option that just + awful lot of work. For now I've removed the call to `report_disk_usage()` + from `print_stats()` and added a `--print-stats` command-line option that reports the disk usage and then exits. * Tag databases with a version number and implement automatic upgrades because @@ -28,3 +28,9 @@ Here are some things on my to-do list, in no particular order: * Change the project name because `DedupFS` is already used by at least two other projects? One is a distributed file system which shouldn't cause too much confusion, but the other is a deduplicating file system as well :-\ + + * Support directory hard links without upsetting FUSE and add a command-line + option that instructs `dedupfs.py` to search for identical subdirectories + and replace them with directory hard links. + + * Support files that don't fit in RAM (virtual machine disk images…) diff --git a/dedupfs.py b/dedupfs.py index 08dff9d..3ecbfba 100755 --- a/dedupfs.py +++ b/dedupfs.py @@ -12,7 +12,7 @@ compression using any of the compression methods lzo, zlib and bz2. These two properties make the file system ideal for backups: The author -currently stores about 250GB worth of backups in an 8GB SQLite database. +currently stores 250 GB worth of backups using only 8 GB of disk space. DedupFS is licensed under the MIT license. @@ -24,7 +24,6 @@ # Standard libraries. import collections import cStringIO -import dbm import errno import hashlib import logging @@ -63,13 +62,13 @@ def main(): # {{{1 dfs_opts = dfs.cmdline[0] if dfs_opts.print_stats: - dfs.metastore_file = os.path.expanduser(dfs_opts.metastore) - dfs.init_logging(dfs_opts) - dfs.setup_database_connections() + dfs.read_only = True + dfs.fsinit(silent=True) dfs.report_disk_usage() + dfs.fsdestroy(silent=True) # If the user didn't pass -h or --help and also didn't supply a mount point - # as a positional argument, print th short usage message and exit (I don't + # as a positional argument, print the short usage message and exit (I don't # agree with the Python FUSE binding's default behavior, which is something # nonsensical like using the working directory as a mount point). 
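As background for the `--print-stats` code path above and the `report_disk_usage()` TODO note earlier: the report compares the apparent size summed from the `inodes` table against the real footprint, which is SQLite's `page_size * page_count` plus the size of the dbm data store. The standalone sketch below mirrors the queries used by `report_disk_usage()` later in this patch, but the wrapper function, default file names and return value are illustrative assumptions rather than the script's actual interface.

```python
# Standalone sketch of the disk-usage report behind --print-stats; the queries
# mirror report_disk_usage() in this patch, the rest is illustrative.
import os
import sqlite3

def report_disk_usage(metastore='~/.dedupfs-metastore.sqlite3',
                      datastore='~/.dedupfs-datastore.db'):
    conn = sqlite3.connect(os.path.expanduser(metastore))
    # On-disk size of the SQLite metadata store ...
    disk_usage = (conn.execute('PRAGMA page_size').fetchone()[0] *
                  conn.execute('PRAGMA page_count').fetchone()[0])
    # ... plus the dbm key/value store holding the compressed blocks.
    disk_usage += os.stat(os.path.expanduser(datastore)).st_size
    # Apparent size: what the files would occupy without deduplication.
    apparent_size = conn.execute(
        'SELECT SUM(inodes.size) FROM tree, inodes'
        ' WHERE tree.inode = inodes.inode').fetchone()[0] or 0
    conn.close()
    return apparent_size, disk_usage
```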
elif dfs.fuse_args.mount_expected() and not fuse_opts.mountpoint: @@ -107,6 +106,7 @@ def __init__(self, *args, **kw): # {{{2 self.cached_nodes = {} self.calls_log_filter = [] self.datastore_file = '~/.dedupfs-datastore.db' + self.fs_mounted_at = time.time() self.gc_enabled = True self.gc_hook_last_run = time.time() self.gc_interval = 60 @@ -116,8 +116,14 @@ def __init__(self, *args, **kw): # {{{2 self.opcount = 0 self.read_only = False self.root_mode = stat.S_IFDIR | 0755 + self.time_spent_caching_nodes = 0 + self.time_spent_hashing = 0 + self.time_spent_interning = 0 + self.time_spent_querying_tree = 0 self.time_spent_reading = 0 + self.time_spent_traversing_tree = 0 self.time_spent_writing = 0 + self.time_spent_writing_blocks = 0 self.__NODE_KEY_VALUE = 0 self.__NODE_KEY_LAST_USED = 1 @@ -127,7 +133,7 @@ def __init__(self, *args, **kw): # {{{2 self.logger.addHandler(logging.StreamHandler(sys.stderr)) # Register some custom command-line options with the option parser. - option_stored_in_db = " (this option is only useful when creating a new database, because its value is stored in the database and can't be changed after creating the database)" + option_stored_in_db = " (this option is only useful when creating a new database, because your choice is stored in the database and can't be changed after that)" self.parser.set_conflict_handler('resolve') # enable overriding the --help message. self.parser.add_option('-h', '--help', action='help', help="show this help message followed by the command-line options defined by the Python FUSE binding and exit") self.parser.add_option('-v', '--verbose', action='count', dest='verbosity', default=0, help="increase verbosity") @@ -229,38 +235,41 @@ def create(self, path, flags, mode): # {{{3 self.__rollback_changes() return self.__except_to_status('create', e, errno.EIO) - def fsdestroy(self): # {{{3 + def fsdestroy(self, silent=False): # {{{3 try: self.__log_call('fsdestroy', 'fsdestroy()') - self.logger.info("Committing outstanding changes to `%s'.", self.metastore_file) self.__collect_garbage() - self.__print_stats() + if not silent: + self.__print_stats() if not self.read_only: + self.logger.info("Committing outstanding changes to `%s'.", self.metastore_file) + self.__dbmcall('sync') self.conn.commit() self.conn.close() - self.blocks.close() + self.__dbmcall('close') return 0 except Exception, e: return self.__except_to_status('fsdestroy', e, errno.EIO) - def fsinit(self): # {{{3 + def fsinit(self, silent=False): # {{{3 try: # Process the custom command-line options defined in __init__(). options = self.cmdline[0] self.block_size = options.block_size self.compression_method = options.compression_method - self.datastore_file = self.__check_data_file(options.datastore) + self.datastore_file = self.__check_data_file(options.datastore, silent) self.gc_enabled = options.gc_enabled self.hash_function = options.hash_function - self.metastore_file = self.__check_data_file(options.metastore) + self.metastore_file = self.__check_data_file(options.metastore, silent) self.synchronous = options.synchronous self.use_transactions = options.use_transactions self.verify_writes = options.verify_writes # Initialize the logging and database subsystems. 
- self.init_logging(options) + self.__init_logging(options) self.__log_call('fsinit', 'fsinit()') - self.setup_database_connections() - self.init_metastore() + self.__setup_database_connections(silent) + if not self.read_only: + self.__init_metastore() self.__get_opts_from_db(options) # Make sure the hash function is (still) valid (since the database was created). if not hasattr(hashlib, self.hash_function): @@ -271,20 +280,20 @@ def fsinit(self): # {{{3 # Disable synchronous operation. This is supposed to make SQLite perform # MUCH better but it has to be enabled wit --nosync because you might # lose data when the file system isn't cleanly unmounted... - if not self.synchronous: + if not self.synchronous and not self.read_only: self.logger.warning("Warning: Disabling synchronous operation, you might lose data..") self.conn.execute('PRAGMA synchronous = OFF') # Select the compression method (if any) after potentially reading the # configured block size that was used to create the database (see the # set_block_size() call). - self.__select_compress_method(options) + self.__select_compress_method(options, silent) return 0 except Exception, e: self.__except_to_status('fsinit', e, errno.EIO) # Bug fix: Break the mount point when initialization failed with an # exception, because self.conn might not be valid, which results in # an internal error message for every FUSE API call... - sys.exit(1) + os._exit(1) def getattr(self, path): # {{{3 try: @@ -328,7 +337,7 @@ def link(self, target_path, link_path, nested=False): # {{{3 self.conn.execute('INSERT INTO tree (parent_id, name, inode) VALUES (?, ?, ?)', (link_parent_id, string_id, target_ino)) node_id = self.__fetchval('SELECT last_insert_rowid()') self.conn.execute('UPDATE inodes SET nlinks = nlinks + 1 WHERE inode = ?', (target_ino,)) - if self.__fetchval('SELECT mode FROM inodes where inode = ?', target_ino) & stat.S_IFDIR: + if self.__fetchval('SELECT mode FROM inodes WHERE inode = ?', target_ino) & stat.S_IFDIR: self.conn.execute('UPDATE inodes SET nlinks = nlinks + 1 WHERE inode = ?', (link_parent_ino,)) self.__cache_set(link_path, (node_id, target_ino)) self.__commit_changes(nested) @@ -412,7 +421,7 @@ def readlink(self, path): # {{{3 self.__log_call('readlink', 'readlink(%r)', path) inode = self.__path2keys(path)[1] query = 'SELECT target FROM links WHERE inode = ?' - return self.__fetchval(query, inode) + return str(self.__fetchval(query, inode)) except Exception, e: return self.__except_to_status('readlink', e, errno.ENOENT) @@ -507,8 +516,8 @@ def symlink(self, target_path, link_path): # {{{3 if self.read_only: return -errno.EROFS # Create an inode to hold the symbolic link. inode, parent_ino = self.__insert(link_path, self.link_mode, len(target_path)) - # Save the symbolic link's target and size. - self.conn.execute('INSERT INTO links (inode, target) VALUES (?, ?)', (inode, target_path)) + # Save the symbolic link's target. + self.conn.execute('INSERT INTO links (inode, target) VALUES (?, ?)', (inode, sqlite3.Binary(target_path))) self.__commit_changes() self.__gc_hook() return 0 @@ -583,7 +592,7 @@ def write(self, path, data, offset): # {{{3 # Miscellaneous methods: # {{{2 - def init_logging(self, options): # {{{3 + def __init_logging(self, options): # {{{3 # Configure logging of messages to a file. 
if options.log_file: handler = logging.StreamHandler(open(options.log_file, 'w')) @@ -597,7 +606,7 @@ def init_logging(self, options): # {{{3 else: self.logger.setLevel(logging.NOTSET) - def init_metastore(self): # {{{3 + def __init_metastore(self): # {{{3 # Bug fix: At this point fuse.FuseGetContext() returns uid = 0 and gid = 0 # which differs from the info returned in later calls. The simple fix is to # use Python's os.getuid() and os.getgid() library functions instead of @@ -607,11 +616,11 @@ def init_metastore(self): # {{{3 self.conn.executescript(""" -- Create the required tables? - CREATE TABLE IF NOT EXISTS tree (id INTEGER PRIMARY KEY, parent_id INTEGER, name INTEGER NOT NULL, inode INTEGER NOT NULL); - CREATE TABLE IF NOT EXISTS strings (id INTEGER PRIMARY KEY, value BLOB NOT NULL); + CREATE TABLE IF NOT EXISTS tree (id INTEGER PRIMARY KEY, parent_id INTEGER, name INTEGER NOT NULL, inode INTEGER NOT NULL, UNIQUE (parent_id, name)); + CREATE TABLE IF NOT EXISTS strings (id INTEGER PRIMARY KEY, value BLOB NOT NULL UNIQUE); CREATE TABLE IF NOT EXISTS inodes (inode INTEGER PRIMARY KEY, nlinks INTEGER NOT NULL, mode INTEGER NOT NULL, uid INTEGER, gid INTEGER, rdev INTEGER, size INTEGER, atime INTEGER, mtime INTEGER, ctime INTEGER); - CREATE TABLE IF NOT EXISTS links (inode INTEGER, target TEXT NOT NULL, PRIMARY KEY(inode, target)); - CREATE TABLE IF NOT EXISTS hashes (id INTEGER PRIMARY KEY, hash CHAR(40) NOT NULL UNIQUE); + CREATE TABLE IF NOT EXISTS links (inode INTEGER UNIQUE, target BLOB NOT NULL); + CREATE TABLE IF NOT EXISTS hashes (id INTEGER PRIMARY KEY, hash BLOB NOT NULL UNIQUE); CREATE TABLE IF NOT EXISTS "index" (inode INTEGER, hash_id INTEGER, block_nr INTEGER, PRIMARY KEY (inode, hash_id, block_nr)); CREATE TABLE IF NOT EXISTS options (name TEXT PRIMARY KEY, value TEXT NOT NULL); @@ -629,13 +638,16 @@ def init_metastore(self): # {{{3 """ % (self.root_mode, uid, gid, t, t, t, self.synchronous and 1 or 0, self.block_size, self.compression_method, self.hash_function)) - def setup_database_connections(self): # {{{3 - self.logger.info("Using data files %r and %r.", self.metastore_file, self.datastore_file) - # Open the Berkeley database file. - pathname = self.datastore_file - # Strip the .db suffix so the dbm module can add it back :-) - if pathname.endswith('.db'): pathname = pathname[0:-3] - self.blocks = dbm.open(pathname, 'c') + def __setup_database_connections(self, silent): # {{{3 + if not silent: + self.logger.info("Using data files %r and %r.", self.metastore_file, self.datastore_file) + # Open the key/value store containing the data blocks. + if not os.path.exists(self.metastore_file): + self.blocks = self.__open_datastore(True) + else: + from whichdb import whichdb + created_by_gdbm = whichdb(self.metastore_file) == 'gdbm' + self.blocks = self.__open_datastore(created_by_gdbm) # Open an SQLite database connection with manual transaction management. self.conn = sqlite3.connect(self.metastore_file, isolation_level=None) # Use the built-in row factory to enable named attributes. @@ -646,18 +658,42 @@ def setup_database_connections(self): # {{{3 # having concurrent reading/writing of the file system database. self.conn.execute('PRAGMA locking_mode = EXCLUSIVE') - def __check_data_file(self, pathname): # {{{3 + def __open_datastore(self, use_gdbm): + # gdbm is preferred over other dbm implementations because it supports fast + # vs. synchronous modes, however any other dedicated key/value store should + # work just fine (albeit not as fast). 
Note though that existing key/value + # stores are always accessed through the library that created them. + mode = self.read_only and 'r' or 'c' + if use_gdbm: + try: + import gdbm + mode += self.synchronous and 's' or 'f' + return gdbm.open(self.datastore_file, mode) + except ImportError: + pass + import anydbm + return anydbm.open(self.datastore_file, mode) + + def __dbmcall(self, fun): # {{{3 + # I simply cannot find any freakin' documentation on the type of objects + # returned by anydbm and gdbm, so cannot verify that any single method will + # always be there, although most seem to... + if hasattr(self.blocks, fun): + getattr(self.blocks, fun)() + + def __check_data_file(self, pathname, silent): # {{{3 pathname = os.path.expanduser(pathname) if os.access(pathname, os.F_OK): - # Bug fix: If the datafile already exists make sure its readable, - # because otherwise the file system would be completely unusable. + # If the datafile already exists make sure it's readable, + # otherwise the file system would be completely unusable. if not os.access(pathname, os.R_OK): self.logger.critical("Error: Datafile %r exists but isn't readable!", pathname) - sys.exit(1) - # Bug fix: Check whether the datafile is writable (e.g. when the datafile - # has been created by root but is currently accessed by another user). + os._exit(1) + # Check and respect whether the datafile is writable (e.g. when it was + # created by root but is currently being accessed by another user). if not os.access(pathname, os.W_OK): - self.logger.warning("Warning: Database file %r exists but isn't writable!", pathname) + if not silent: + self.logger.warning("File %r exists but isn't writable! Switching to read-only mode.", pathname) self.read_only = True return pathname @@ -687,14 +723,15 @@ def __get_opts_from_db(self, options): # {{{3 self.logger.warning("Ignoring --hash=%s argument, using previously chosen hash function %r instead", self.hash_function, value) self.hash_function = value - def __select_compress_method(self, options): # {{{3 + def __select_compress_method(self, options, silent): # {{{3 valid_formats = self.compressors.keys() selected_format = self.compression_method.lower() if selected_format not in valid_formats: self.logger.warning("Invalid compression format `%s' selected!", selected_format) selected_format = 'none' if selected_format != 'none': - self.logger.info("Using the %s compression method.", selected_format) + if not silent: + self.logger.debug("Using the %s compression method.", selected_format) # My custom LZO binding defines set_block_size() which enables # optimizations like pre-allocating a buffer that can be reused for # every call to compress() and decompress(). @@ -705,6 +742,7 @@ def __select_compress_method(self, options): # {{{3 self.compress, self.decompress = self.compressors[selected_format] def __write_blocks(self, inode, buf, apparent_size): # {{{3 + start_time = time.time() # Delete existing index entries for file. self.conn.execute('DELETE FROM "index" WHERE inode = ?', (inode,)) # Store any changed blocks and rebuild the file index. @@ -713,8 +751,8 @@ def __write_blocks(self, inode, buf, apparent_size): # {{{3 buf.seek(self.block_size * block_nr, os.SEEK_SET) new_block = buf.read(self.block_size) digest = self.__hash(new_block) - select_query = 'SELECT id FROM hashes WHERE hash = ?' 
- row = self.conn.execute(select_query, (digest,)).fetchone() + encoded_digest = sqlite3.Binary(digest) + row = self.conn.execute('SELECT id FROM hashes WHERE hash = ?', (encoded_digest,)).fetchone() if row: hash_id = row[0] existing_block = self.decompress(self.blocks[digest]) @@ -733,17 +771,18 @@ def __write_blocks(self, inode, buf, apparent_size): # {{{3 "Saved existing and conflicting data blocks to %r.", block_nr, inode, len(existing_block), digest, len(new_block), digest, dumpfile_collision) - sys.exit(1) + os._exit(1) self.conn.execute('INSERT INTO "index" (inode, hash_id, block_nr) VALUES (?, ?, ?)', (inode, hash_id, block_nr)) else: self.blocks[digest] = self.compress(new_block) - self.conn.execute('INSERT INTO hashes (hash) VALUES (?)', (digest,)) + self.conn.execute('INSERT INTO hashes (id, hash) VALUES (NULL, ?)', (encoded_digest,)) self.conn.execute('INSERT INTO "index" (inode, hash_id, block_nr) VALUES (?, last_insert_rowid(), ?)', (inode, block_nr)) # Check that the data was properly stored in the database? self.__verify_write(new_block, digest, block_nr, inode) block_nr += 1 # Update file size and last modified time. self.conn.execute('UPDATE inodes SET size = ?, mtime = ? WHERE inode = ?', (apparent_size, self.__newctime(), inode)) + self.time_spent_writing_blocks += time.time() - start_time def __insert(self, path, mode, size, rdev=0): # {{{3 parent, name = os.path.split(path) @@ -753,8 +792,6 @@ def __insert(self, path, mode, size, rdev=0): # {{{3 uid, gid = self.__getctx() self.conn.execute('INSERT INTO inodes (nlinks, mode, uid, gid, rdev, size, atime, mtime, ctime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)', (nlinks, mode, uid, gid, rdev, size, t, t, t)) inode = self.__fetchval('SELECT last_insert_rowid()') - # TODO Optional support for path segment interning? (my current database - # contains 514.90 MB worth of strings while only 5.06 MB is unique...) string_id = self.__intern(name) self.conn.execute('INSERT INTO tree (parent_id, name, inode) VALUES (?, ?, ?)', (parent_id, string_id, inode)) node_id = self.__fetchval('SELECT last_insert_rowid()') @@ -762,11 +799,13 @@ def __insert(self, path, mode, size, rdev=0): # {{{3 return inode, parent_ino def __intern(self, string): # {{{3 + start_time = time.time() args = (sqlite3.Binary(string),) result = self.conn.execute('SELECT id FROM strings WHERE value = ?', args).fetchone() if not result: self.conn.execute('INSERT INTO strings (id, value) VALUES (NULL, ?)', args) result = self.conn.execute('SELECT last_insert_rowid()').fetchone() + self.time_spent_interning += time.time() - start_time return int(result[0]) def __remove(self, path, check_empty=False): # {{{3 @@ -799,7 +838,7 @@ def __verify_write(self, block, digest, block_nr, inode): # {{{3 "Failed to verify data with block number %i of inode %i!\n" + \ "Saved original and corrupted data blocks to %i.", block_nr, inode, dumpfile_corruption) - sys.exit(1) + os._exit(1) def __access(self, inode, flags): # {{{3 # Check if the flags include writing while the database is read-only. @@ -821,47 +860,33 @@ def __access(self, inode, flags): # {{{3 and (not (flags & os.X_OK) or ((o and (m & 0100)) or (g and (m & 0010)) or (w and (m & 0001)))) def __path2keys(self, path): # {{{3 - # Start the traversal at the file system's known root node. node_id, inode = 1, 1 - # Handle requests for the keys of the file system's root. if path == '/': return node_id, inode - # Split the pathname into a list of non-empty path segments. 
- uncached_segments = self.__split_segments(path) - # Check whether the full pathname has already been cached or find - # the largest prefix that's already been cached and is still valid. - cached_segments = [] - time_now = time.time() + start_time = time.time() node = self.cached_nodes parent_id = node_id - while uncached_segments != []: - segment = uncached_segments.pop(0) - cached_segments.append(segment) + for segment in self.__split_segments(path): if segment in node: - # This node has already been cached: Update its last used time and - # continue with resolving the next node. node = node[segment] - node[self.__NODE_KEY_LAST_USED] = time_now + node[self.__NODE_KEY_LAST_USED] = start_time node_id, inode = node[self.__NODE_KEY_VALUE] - parent_id = node_id - if uncached_segments == []: - self.__cache_check_gc() - return node_id, inode else: - # This node hasn't been cached yet, fetch it from the database. - # TODO Would the file system perform better when whole directories are fetched here at once?! - query = 'SELECT t.id, t.inode FROM tree t, strings s WHERE t.parent_id = ? AND s.value = ?' - result = self.conn.execute(query, (parent_id, segment)).fetchone() + query_start_time = time.time() + query = 'SELECT t.id, t.inode FROM tree t, strings s WHERE t.parent_id = ? AND t.name = s.id AND s.value = ? LIMIT 1' + result = self.conn.execute(query, (parent_id, sqlite3.Binary(segment))).fetchone() + self.time_spent_querying_tree += time.time() - query_start_time if result == None: self.__cache_check_gc() + self.time_spent_traversing_tree += time.time() - start_time raise OSError, (errno.ENOENT, os.strerror(errno.ENOENT), path) - else: - node_id, inode = result - new_node = { self.__NODE_KEY_VALUE: (node_id, inode), self.__NODE_KEY_LAST_USED: time_now } - node[segment] = new_node - node = new_node - parent_id = node_id + node_id, inode = result + new_node = { self.__NODE_KEY_VALUE: (node_id, inode), self.__NODE_KEY_LAST_USED: start_time } + node[segment] = new_node + node = new_node + parent_id = node_id self.__cache_check_gc() + self.time_spent_traversing_tree += time.time() - start_time return node_id, inode def __cache_set(self, key, value): # {{{3 @@ -872,6 +897,7 @@ def __cache_set(self, key, value): # {{{3 for segment in segments: # Check that the keys of the sub path have been cached. if segment not in node: + self.time_spent_caching_nodes += time.time() - time_now return False # Resolve the next path segment. 
node = node[segment] @@ -890,6 +916,7 @@ def __cache_set(self, key, value): # {{{3 node[self.__NODE_KEY_VALUE] = value node[self.__NODE_KEY_LAST_USED] = time_now self.__cache_check_gc() + self.time_spent_caching_nodes += time.time() - time_now return True def __cache_check_gc(self): # {{{3 @@ -897,12 +924,11 @@ def __cache_check_gc(self): # {{{3 if self.cache_requests >= 2500: time_now = time.time() if time_now - self.cache_gc_last_run >= self.cache_timeout: - self.__cache_do_gc(self.cached_nodes) + self.__cache_do_gc(self.cached_nodes, time_now) self.cache_gc_last_run = time_now self.cache_requests = 0 - def __cache_do_gc(self, node): # {{{3 - time_now = time.time() + def __cache_do_gc(self, node, time_now): # {{{3 for key in node.keys(): child = node[key] if isinstance(child, dict): @@ -910,14 +936,10 @@ def __cache_do_gc(self, node): # {{{3 if last_used > self.cache_timeout: del node[key] else: - self.__cache_do_gc(child) + self.__cache_do_gc(child, time_now) def __split_segments(self, key): # {{{3 - if isinstance(key, str): - key = key.split('/') - if '' in key: - key = filter(None, key) - return key + return filter(None, key.split('/')) def __newctime(self): # {{{3 return time.time() @@ -927,20 +949,45 @@ def __getctx(self): # {{{3 return (c['uid'], c['gid']) def __hash(self, data): # {{{3 - m = self.hash_function_impl() - m.update(data) - return m.hexdigest() + start_time = time.time() + context = self.hash_function_impl() + context.update(data) + digest = context.digest() + self.time_spent_hashing += time.time() - start_time + return digest def __print_stats(self): # {{{3 + self.logger.info('-' * 79) self.__report_memory_usage() self.__report_throughput() - self.logger.info('-' * 79) + self.__report_timings() + + def __report_timings(self): # {{{3 + if self.logger.isEnabledFor(logging.DEBUG): + timings = [(self.time_spent_traversing_tree, 'Traversing the tree'), + (self.time_spent_caching_nodes, 'Caching tree nodes'), + (self.time_spent_interning, 'Interning path components'), + (self.time_spent_writing_blocks, 'Writing data blocks'), + (self.time_spent_hashing, 'Hashing data blocks'), + (self.time_spent_querying_tree, 'Querying the tree')] + maxdescwidth = max([len(l) for t, l in timings]) + 3 + timings.sort(reverse=True) + uptime = time.time() - self.fs_mounted_at + printed_heading = False + for timespan, description in timings: + percentage = timespan / (uptime / 100) + if percentage >= 1: + if not printed_heading: + self.logger.debug("Cumulative timings of slowest operations:") + printed_heading = True + self.logger.debug(" - %-*s%s (%i%%)" % (maxdescwidth, description + ':', format_timespan(timespan), percentage)) def report_disk_usage(self): # {{{3 disk_usage = self.__fetchval('PRAGMA page_size') * self.__fetchval('PRAGMA page_count') + disk_usage += os.stat(self.datastore_file).st_size apparent_size = self.__fetchval('SELECT SUM(inodes.size) FROM tree, inodes WHERE tree.inode = inodes.inode') - self.logger.info("The total apparent size is %s while the database takes up %s (%i%%).", - format_size(apparent_size), format_size(disk_usage), disk_usage / (apparent_size / 100)) + self.logger.info("The total apparent size is %s while the databases take up %s (that's %.2f%%).", + format_size(apparent_size), format_size(disk_usage), float(disk_usage) / (apparent_size / 100)) def __report_memory_usage(self): # {{{3 memory_usage = get_memory_usage() @@ -999,7 +1046,7 @@ def __gc_hook(self, nested=False): # {{{3 self.opcount += 1 if self.opcount % 500 == 0: # Every minute the other 
statistics are reported and garbage - # collection is performed when garbage collection is not disabled. + # collection is performed when it isn't disabled. if time.time() - self.gc_hook_last_run >= self.gc_interval: self.__collect_garbage() self.__print_stats() @@ -1009,33 +1056,51 @@ def __collect_garbage(self): # {{{3 if self.gc_enabled and not self.read_only: start_time = time.time() self.logger.info("Performing garbage collection (this might take a while) ..") + self.should_vacuum = False for method in self.__collect_strings, self.__collect_inodes, \ - self.__collect_indices, self.__collect_blocks: + self.__collect_indices, self.__collect_blocks, self.__vacuum_metastore: sub_start_time = time.time() msg = method() - elapsed_time = time.time() - sub_start_time - if elapsed_time > 1: + if msg: + elapsed_time = time.time() - sub_start_time self.logger.info(msg, format_timespan(elapsed_time)) elapsed_time = time.time() - start_time self.logger.info("Finished garbage collection in %s.", format_timespan(elapsed_time)) def __collect_strings(self): # {{{4 - self.conn.execute('DELETE FROM strings WHERE id NOT IN (SELECT name FROM tree)') - return "Cleaned up unused path segments in %s." + count = self.conn.execute('DELETE FROM strings WHERE id NOT IN (SELECT name FROM tree)').rowcount + if count > 0: + self.should_vacuum = True + return "Cleaned up %i unused path segment%s in %%s." % (count, count != 1 and 's' or '') def __collect_inodes(self): # {{{4 - self.conn.execute('DELETE FROM inodes WHERE nlinks = 0') - return "Cleaned up unused inodes in %s." + count = self.conn.execute('DELETE FROM inodes WHERE nlinks = 0').rowcount + if count > 0: + self.should_vacuum = True + return "Cleaned up %i unused inode%s in %%s." % (count, count != 1 and 's' or '') def __collect_indices(self): # {{{4 - self.conn.execute('DELETE FROM "index" WHERE inode NOT IN (SELECT inode FROM inodes)') - return "Cleaned up unused indices in %s." + count = self.conn.execute('DELETE FROM "index" WHERE inode NOT IN (SELECT inode FROM inodes)').rowcount + if count > 0: + self.should_vacuum = True + return "Cleaned up %i unused index entr%s in %%s." % (count, count != 1 and 'ies' or 'y') def __collect_blocks(self): # {{{4 + should_reorganize = False for row in self.conn.execute('SELECT hash FROM hashes WHERE id NOT IN (SELECT hash_id FROM "index")'): - del self.blocks[row[0]] - self.conn.execute('DELETE FROM hashes WHERE id NOT IN (SELECT hash_id FROM "index")') - return "Cleaned up unused data blocks in %s." + del self.blocks[str(row[0])] + should_reorganize = True + if should_reorganize: + self.__dbmcall('reorganize') + count = self.conn.execute('DELETE FROM hashes WHERE id NOT IN (SELECT hash_id FROM "index")').rowcount + if count > 0: + self.should_vacuum = True + return "Cleaned up %i unused data block%s in %%s." % (count, count != 1 and 's' or '') + + def __vacuum_metastore(self): # {{{4 + if self.should_vacuum: + self.conn.execute('VACUUM') + return "Vacuumed SQLite metadata store in %s." def __commit_changes(self, nested=False): # {{{3 if self.use_transactions and not nested: @@ -1056,11 +1121,9 @@ def __get_file_buffer(self, path): # {{{3 WHERE i.inode = ? AND h.id = i.hash_id ORDER BY i.block_nr ASC """ for row in self.conn.execute(query, (inode,)).fetchall(): - digest = row[0] # TODO Make the file system more robust against failure by doing # something sensible when self.blocks.has_key(digest) is false. 
- block = self.blocks[digest] - buf.write(self.decompress(block)) + buf.write(self.decompress(self.blocks[str(row[0])])) self.buffers[path] = buf return buf @@ -1137,7 +1200,7 @@ def write(self, *args): sys.stderr.write("\n Profiling statistics:\n\n") s = pstats.Stats(profile) s.sort_stats('time') - s.print_stats(0.10) + s.print_stats(0.1) os.unlink(profile) else: main()
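To complete the picture from the deduplication sketch near the top of this patch: reading a file back is the mirror image of the write path. `__get_file_buffer()` walks the `index` and `hashes` tables in block order and decompresses each block fetched from the key/value store; a minimal standalone equivalent (same assumed dict/list stand-ins and zlib compression as before) looks like this:

```python
# Companion to the deduplication sketch above: reassemble file contents from
# an ordered list of block digests, the way __get_file_buffer() walks the
# "index" table (ORDER BY block_nr) and decompresses blocks from the store.
import zlib

def read_back(file_index, block_store):
    """Return the original bytes for a file's ordered block digests."""
    return b''.join(zlib.decompress(block_store[digest])
                    for digest in file_index)
```

Handling a digest that is missing from the block store (the TODO comment kept in `__get_file_buffer()`) would naturally slot in here, for example by substituting a zero-filled block instead of raising `KeyError`.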