
Fixes bug 789639: Implement new Socorro filesystem classes.

This implements four classes compatible with the Socorro crash storage
API.

 * `FSRadixTreeStorage`
    This stores crashes under `YYYYMMDD/"name"/radix.../crash_id`. It
provides processed crash storage but does not support finding new
crashes.

 * `FSDatedRadixTreeStorage`
    This composes `FSRadixTreeStorage`, adding date-based references to
crashes under `YYYYMMDD/"date"/HH/MM_SS/crash_id` that link to the
actual directory stored in the `FSRadixTreeStorage`. It also creates a
reverse symlink inside the `FSRadixTreeStorage`, called `date_root`,
that links back to the `MM_SS` folder. This does not provide processed
crash storage but supports finding new crashes.

 * `PrimaryDeferredStorage`
   This composes two storages that support the crash storage API and
routes each incoming crash to one of them based on a
`deferral_criteria` predicate. This provides processed crash storage
and finding new crashes.

 * `PrimaryDeferredProcessedStorage`
   This composes three storages: raw crashes are routed as above, but
all processed crashes are stored in a single, third crash storage.
1 parent f644856 commit b5e4cbce1574f385f194c302919ace5532b1c321 @rfw rfw committed Feb 6, 2013
@@ -19,6 +19,7 @@ Development Discussions
dumpingdumptables
jsondumpstorage
processeddumpstorage
+ fs
database
package
commonconfig
@@ -0,0 +1,66 @@
+New Filesystem
+==============
+
+The new filesystem module (``socorro.external.fs``) is a rewrite of
+``socorro.external.filesystem`` to use the new, more consistent crash storage
+API. It consists of two crash storage classes: ``FSRadixTreeStorage`` and
+``FSDatedRadixTreeStorage``.
+
+``FSRadixTreeStorage``
+----------------------
+
+.. image:: fs-fsradixtreestorage.png
+
+This storage class employs a radix scheme, taking the hex digits in pairs from
+the start of the crash ID. For example, a crash ID that looks like
+``38a4f01e...20090514`` would be stored in a directory structure like this::
+
+ .../20090514/name/38/a4/f0/1e/38a4f01e...20090514
+
+The nesting depth is specified by the seventh hexadecimal digit from the
+right of the crash ID, i.e. the first 0 in 20090514 in the example. If that
+digit is 0, the default nesting depth of 4 is used.
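The path construction above can be sketched in Python. This is a hypothetical stand-alone helper, not the module's actual function; the full crash ID used below is made up to match the shape of the example:

```python
import os


def radix_path(fs_root, crash_id, name_branch="name"):
    """Sketch of the radix scheme: hex digits are taken in pairs from
    the start of the crash ID, and the nesting depth is read from the
    seventh hex digit from the right (0 means the default depth of 4).
    The collector appends the date, YYYYMMDD, to the end of the ID."""
    depth = int(crash_id[-7], 16) or 4
    date = crash_id[-8:]
    radix = [crash_id[i * 2:i * 2 + 2] for i in range(depth)]
    return os.sep.join([fs_root, date, name_branch] + radix + [crash_id])
```

For a crash ID shaped like the example, this reproduces the directory structure shown above.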
+
+The leaf directory contains the raw crash information, exported as JSON, and
+the various associated dump files -- or, if being used as processed storage,
+contains the processed JSON file.
+
+``FSDatedRadixTreeStorage``
+---------------------------
+
+.. image:: fs-fsdatedradixtreestorage.png
+
+This storage class extends ``FSRadixTreeStorage`` to include a date branch.
+The date branch implements an indexing scheme so that the rough order in
+which the crashes arrived is known. The directory structure consists of the
+hour, the minute, and a slice of the seconds in which the crash was received
+for processing -- for instance, if a crash was received at 3:30:12pm, the directory
+structure would look something like::
+
+ .../20090514/date/15/30_03/38a4f01e...20090514
+
+(the 03 in 30_03 corresponds to a slice size of 4 seconds: 12 // 4 = 3)
+
+In the example, the date 20090514 corresponds to the date assigned by the
+collector from the crash's ID, rather than the date the crash was received by
+the processor.
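The slot arithmetic can be sketched as follows, assuming the 4-second slice used in the example (the helper name is hypothetical):

```python
def date_branch(hour, minute, second, slice_size=4):
    """Sketch of the dated-branch naming: seconds are divided into
    fixed-size slices, so a crash received at 15:30:12 lands in slot
    12 // 4 == 3, i.e. the directory 15/30_03."""
    return "%02d/%02d_%02d" % (hour, minute, second // slice_size)
```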
+
+The crash ID in the dated folder is a symbolic link to the same folder in the
+radix tree, e.g. the directory given in the example would be linked to
+``.../20090514/name/38/a4/f0/1e/38a4f01e...20090514``. A corresponding link,
+named ``date_root``, is created in the folder which is linked to
+``.../20090514/date/15/30_03``. This is so that jumps can be made between the
+two directory hierarchies -- crash data can be obtained by visiting the dated
+hierarchy, and a crash's location in the dated hierarchy can be found by
+looking the crash up by its ID.
+
+This dated directory structure enables efficient traversal of the folder
+hierarchy for new crashes -- first, all the date directories in the root are
+traversed to find all the symbolic links to the radix directories. When one is
+found, it is unlinked from the filesystem and the ID yielded to the interested
+caller. This proceeds until all the directories have been exhausted, by which
+time every crash should have been visited.
+
+In order to prevent race conditions, the process will compute the current slot
+and decline to enter any slots with a number greater than the current slot --
+this is because a process may already be currently writing to it.
@@ -546,10 +546,7 @@ def list_jobs(self, stream=None):
if info.get('last_error'):
print >>stream, 'Error!!'.ljust(PAD),
print >>stream, '(%s times)' % info['error_count']
- print >>stream, 'Traceback (most recent call last):'
- print >>stream, info['last_error']['traceback'],
- print >>stream, info['last_error']['type'].__name__ + ':',
- print >>stream, info['last_error']['value']
+ print >>stream, info['last_error']['traceback']
print >>stream, ''
def reset_job(self, description):
@@ -0,0 +1,60 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from configman import Namespace
+from configman.converters import classes_in_namespaces_converter
+from configman.dotdict import DotDict
+
+from socorro.cron.base import BaseCronApp
+
+from socorro.lib.datetimeutil import utc_now
+
+import os
+import shutil
+
+
+class RadixCleanupCronApp(BaseCronApp):
+ app_name = 'cleanup_radix'
+ app_description = 'Cleans up dead radix directories'
+
+ required_config = Namespace()
+ required_config.add_option(
+ 'dated_storage_classes',
+ doc='a comma delimited list of storage classes',
+ default='',
+ from_string_converter=classes_in_namespaces_converter(
+ template_for_namespace='storage%d',
+ name_of_class_option='crashstorage_class',
+ instantiate_classes=False, # we instantiate manually for thread
+ # safety
+ )
+ )
+
+ def __init__(self, config, *args, **kwargs):
+ super(RadixCleanupCronApp, self).__init__(config, *args, **kwargs)
+ self.storage_namespaces = \
+ config.dated_storage_classes.subordinate_namespace_names
+ self.stores = DotDict()
+ for a_namespace in self.storage_namespaces:
+ self.stores[a_namespace] = \
+ config[a_namespace].crashstorage_class(config[a_namespace])
+
+ def run(self):
+ today = utc_now().strftime("%Y%m%d")
+
+ for storage in self.stores.values():
+ for date in os.listdir(storage.config.fs_root):
+ if date >= today:
+ continue # don't process today's crashes or any crashes
+ # from the future
+
+ if os.listdir(os.sep.join([storage.config.fs_root, date,
+ storage.config.date_branch_base])):
+ self.config.logger.error("Could not delete crashes for "
+ "date %s: branch isn't empty",
+ date)
+ continue # if the date branch isn't empty, then it's not
+ # safe to nuke
+
+ shutil.rmtree(os.sep.join([storage.config.fs_root, date]))
@@ -174,7 +174,7 @@ def new_crashes(self):
of "new" in an implementation specific way. To be useful, derived
class ought to override this method.
"""
- raise StopIteration
+ return []
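This change makes sense because the base-class method contains no `yield` and is therefore an ordinary function, not a generator: a bare `raise StopIteration` escapes to the caller as a real exception instead of quietly ending a loop, whereas returning an empty list gives callers a safe, empty iterable. A minimal illustration:

```python
def old_new_crashes():
    # the old default: the exception is raised while the function is
    # being called, before iteration even begins, so it propagates
    raise StopIteration


def new_new_crashes():
    # the new default: a safe, empty iterable
    return []


try:
    for _ in old_new_crashes():
        pass
    escaped = False
except StopIteration:
    escaped = True  # the exception was not swallowed by the for loop
```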
#==============================================================================
@@ -533,7 +533,6 @@ def get_raw_dumps_as_files(self, crash_id):
except CrashIDNotFound:
return self.fallback_store.get_raw_dumps_as_files(crash_id)
-
#--------------------------------------------------------------------------
def get_processed(self, crash_id):
"""the default implementation of fetching a processed_crash
@@ -566,3 +565,182 @@ def new_crashes(self):
for a_crash in self.primary_store.new_crashes():
yield a_crash
+
+#==============================================================================
+class PrimaryDeferredStorage(CrashStorageBase):
+ """
+ PrimaryDeferredStorage reads information from a raw crash and, based on a
+ predicate function, selects either the primary or deferred storage to store
+ a crash in.
+ """
+ required_config = Namespace()
+ required_config.primary = Namespace()
+ required_config.primary.add_option(
+ 'storage_class',
+ doc='storage class for primary storage',
+ default='',
+ from_string_converter=class_converter
+ )
+ required_config.deferred = Namespace()
+ required_config.deferred.add_option(
+ 'storage_class',
+ doc='storage class for deferred storage',
+ default='',
+ from_string_converter=class_converter
+ )
+ required_config.add_option(
+ 'deferral_criteria',
+ doc='criteria for deferring a crash',
+ default='lambda crash: crash.get("legacy_processing")',
+ from_string_converter=eval
+ )
+
+ #--------------------------------------------------------------------------
+ def __init__(self, config, quit_check_callback=None):
+ """instantiate the primary and deferred storage systems"""
+ super(PrimaryDeferredStorage, self).__init__(config, quit_check_callback)
+ self.primary_store = config.primary.storage_class(config.primary)
+ self.deferred_store = config.deferred.storage_class(config.deferred)
+ self.logger = self.config.logger
+
+ #--------------------------------------------------------------------------
+ def close(self):
+ """close both storage systems. The second will still be closed even
+ if the first raises an exception. """
+ poly_exception = PolyStorageError()
+ for a_store in (self.primary_store, self.deferred_store):
+ try:
+ a_store.close()
+ except NotImplementedError:
+ pass
+ except Exception:
+ poly_exception.gather_current_exception()
+ if len(poly_exception.exceptions) > 1:
+ raise poly_exception
+
+ #--------------------------------------------------------------------------
+ def save_raw_crash(self, raw_crash, dumps, crash_id):
+ """save crash data into either the primary or deferred storage,
+ depending on the deferral criteria"""
+ if not self.config.deferral_criteria(raw_crash):
+ self.primary_store.save_raw_crash(raw_crash, dumps, crash_id)
+ else:
+ self.deferred_store.save_raw_crash(raw_crash, dumps, crash_id)
+
+ #--------------------------------------------------------------------------
+ def save_processed(self, processed_crash):
+ """save processed crash data into either the primary or deferred
+ storage, depending on the deferral criteria"""
+ if not self.config.deferral_criteria(processed_crash):
+ self.primary_store.save_processed(processed_crash)
+ else:
+ self.deferred_store.save_processed(processed_crash)
+
+ #--------------------------------------------------------------------------
+ def get_raw_crash(self, crash_id):
+ """get a raw crash 1st from primary and if not found then try the
+ deferred.
+
+ parameters:
+ crash_id - the id of a raw crash to fetch"""
+ try:
+ return self.primary_store.get_raw_crash(crash_id)
+ except CrashIDNotFound:
+ return self.deferred_store.get_raw_crash(crash_id)
+
+ #--------------------------------------------------------------------------
+ def get_raw_dump(self, crash_id, name=None):
+ """get a named crash dump 1st from primary and if not found then try
+ the deferred.
+
+ parameters:
+ crash_id - the id of a dump to fetch
+ name - name of the crash to fetch, or omit to fetch default crash"""
+ try:
+ return self.primary_store.get_raw_dump(crash_id, name)
+ except CrashIDNotFound:
+ return self.deferred_store.get_raw_dump(crash_id, name)
+
+ #--------------------------------------------------------------------------
+ def get_raw_dumps(self, crash_id):
+ """get all crash dumps 1st from primary and if not found then try
+ the deferred.
+
+ parameters:
+ crash_id - the id of a dump to fetch"""
+ try:
+ return self.primary_store.get_raw_dumps(crash_id)
+ except CrashIDNotFound:
+ return self.deferred_store.get_raw_dumps(crash_id)
+
+ #--------------------------------------------------------------------------
+ def get_raw_dumps_as_files(self, crash_id):
+ """get all crash dump pathnames 1st from primary and if not found then
+ try the deferred.
+
+ parameters:
+ crash_id - the id of a dump to fetch"""
+ try:
+ return self.primary_store.get_raw_dumps_as_files(crash_id)
+ except CrashIDNotFound:
+ return self.deferred_store.get_raw_dumps_as_files(crash_id)
+
+ #--------------------------------------------------------------------------
+ def get_processed(self, crash_id):
+ """the default implementation of fetching a processed_crash
+
+ parameters:
+ crash_id - the id of a processed_crash to fetch"""
+ try:
+ return self.primary_store.get_processed(crash_id)
+ except CrashIDNotFound:
+ return self.deferred_store.get_processed(crash_id)
+
+ #--------------------------------------------------------------------------
+ def remove(self, crash_id):
+ """delete a crash from storage
+
+ parameters:
+ crash_id - the id of a crash to fetch"""
+ try:
+ self.primary_store.remove(crash_id)
+ except CrashIDNotFound:
+ self.deferred_store.remove(crash_id)
+
+ #--------------------------------------------------------------------------
+ def new_crashes(self):
+ """return an iterator that yields a list of crash_ids of raw crashes
+ that were added to the file system since the last time this iterator
+ was requested."""
+ return self.primary_store.new_crashes()
+
+
+#==============================================================================
+class PrimaryDeferredProcessedStorage(PrimaryDeferredStorage):
+ """
+ PrimaryDeferredProcessedStorage aggregates three methods of storage: it
+ uses a deferral criteria predicate to decide where to store a raw crash,
+ like PrimaryDeferredStorage -- but it stores all processed crashes in a
+ third, separate storage.
+ """
+ required_config = Namespace()
+ required_config.processed = Namespace()
+ required_config.processed.add_option(
+ 'storage_class',
+ doc='storage class for processed storage',
+ default='',
+ from_string_converter=class_converter
+ )
+
+ #--------------------------------------------------------------------------
+ def __init__(self, config, quit_check_callback=None):
+ super(PrimaryDeferredProcessedStorage, self).__init__(config, quit_check_callback)
+ self.processed_store = config.processed.storage_class(config.processed)
+
+ #--------------------------------------------------------------------------
+ def save_processed(self, processed_crash):
+ self.processed_store.save_processed(processed_crash)
+
+ #--------------------------------------------------------------------------
+ def get_processed(self, crash_id):
+ return self.processed_store.get_processed(crash_id)
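The routing behaviour of these classes can be illustrated with stub stores standing in for real crash storage (everything below is hypothetical scaffolding, not Socorro code):

```python
class StubStore(object):
    """A trivial stand-in that records the crash IDs it was given."""
    def __init__(self):
        self.saved = []

    def save_raw_crash(self, raw_crash, dumps, crash_id):
        self.saved.append(crash_id)


def route_raw_crash(raw_crash, primary, deferred, deferral_criteria):
    """Mirrors the dispatch in PrimaryDeferredStorage.save_raw_crash."""
    store = deferred if deferral_criteria(raw_crash) else primary
    store.save_raw_crash(raw_crash, {}, raw_crash["uuid"])


primary, deferred = StubStore(), StubStore()
criteria = lambda crash: crash.get("legacy_processing")
route_raw_crash({"uuid": "aaa", "legacy_processing": True},
                primary, deferred, criteria)
route_raw_crash({"uuid": "bbb"}, primary, deferred, criteria)
```

A crash matching the criteria lands in the deferred store; everything else goes to the primary.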