
Major overhaul of backup

Uses multiprocessing.Pool for the threaded implementation
Uses sets to determine which files to upload instead of linearly chained queues (sketched below)
Improved date parsing for Swift
Greatly reduced the number of authentication calls needed
Harvesting is now always threaded, regardless of the number of threads specified
upload_directory() is now a single code path into Application() for both serial and threaded runs
Added setup.py so it can be easily installed into bin/
- Will be adding it to PyPI soon as well
Updated the softlayer-object-storage requirement to >=0.4.6 (in setup.py)
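
A rough sketch of the two ideas above, the set-based backlog calculation and the SIGINT-safe worker pool, is shown below. It mirrors delta_force_one(), init_worker() and the Pool dispatch in upload_directory() from the diff, but the helper names build_backlog() and run_pool() and the toy data are illustrative only, not part of the commit.

    # Illustrative sketch (not the committed code): set-based backlog
    # calculation modelled on delta_force_one() in the diff below.
    import signal
    from itertools import repeat
    from multiprocessing import Pool

    def build_backlog(local_files, local_dirs, remote_objects):
        """Work out what to upload, create, delete or re-stat with set
        arithmetic instead of chaining items through worker queues."""
        f = set(local_files)             # files present locally
        d = set(local_dirs)              # directories present locally
        r = set(remote_objects)          # object names already in the container
        a = f | d                        # everything that exists locally

        work = list(zip(repeat('upload'), f - r))    # new local files
        work += list(zip(repeat('mkdir'), d - r))    # new local directories
        work += [('delete', remote_objects[name]) for name in r - a]          # gone locally
        work += [('stat', (name, remote_objects[name])) for name in a & r]    # compare later
        return work

    # Worker-pool dispatch: children ignore SIGINT so a Ctrl-C is handled once,
    # in the parent, which then tears the pool down (as in upload_directory()).
    def init_worker():
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    def run_pool(app, backlog, threads):
        # `app` stands in for the picklable Application callable from the diff.
        pool = Pool(processes=threads, initializer=init_worker)
        try:
            result = pool.map_async(app, backlog, 1)
            pool.close()
            result.wait()
            pool.join()
        except KeyboardInterrupt:
            pool.terminate()
            pool.join()

    if __name__ == "__main__":
        remote = {'kept.txt': {'hash': 'aa'}, 'gone.txt': {'hash': 'bb'}}
        print(build_backlog(['kept.txt', 'new.txt'], ['docs'], remote))

The point of the set arithmetic is that the whole backlog is known up front, so a single Pool.map_async() call can drive the uploads, mkdirs, stats and deletes, without the per-stage queue juggling the old serial and threaded paths needed.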
commit 034e0b444290b98b66fce0b3755ace8e4cd627de (parent: 90ffb52), authored December 12, 2012

Showing 2 changed files with 323 additions and 526 deletions.

  1. setup.py (2 changes)
  2. slbackup.py (847 changes)
setup.py (2 changes)
@@ -12,7 +12,7 @@
       license='MIT',
       include_package_data=True,
       zip_safe=False,
-      install_requires=['softlayer-object-storage>=0.4.4'],
+      install_requires=['softlayer-object-storage>=0.4.6'],
       scripts=['slbackup.py'],
       classifiers=[
         'Development Status :: 5 - Production/Stable',
slbackup.py (847 changes)
@@ -18,16 +18,19 @@
 import logging
 import logging.config
 import ConfigParser
+import signal
 from copy import copy
 from hashlib import md5
-from multiprocessing import Manager, Pool, cpu_count, TimeoutError
-from multiprocessing import Queue, JoinableQueue
+from multiprocessing import Pool, cpu_count, Process
+from multiprocessing import Manager
+from itertools import repeat

 try:
     import object_storage
 except ImportError:
     print "ERROR: You need the latest object storage bindings from github:"
     print "  https://github.com/softlayer/softlayer-object-storage-python"
+    print "  or pip install softlayer-object-storage"
     sys.exit(1)

 try:
@@ -35,19 +38,10 @@
 except ImportError:
     # well, must be windows, assume an 4Kb slab
     # regardless if long mode is supported
-    _DEFAULT_OS_BUFLEN = 4 * 1024
-else:
-    _DEFAULT_OS_BUFLEN = resource.getpagesize()
-
-DATE_FORMATS = [
-    "%a, %d %b %Y %H:%M:%S %Z",
-    "%a, %d %b %Y %H:%M:%S.%f %Z",
-    "%Y-%m-%dT%H:%M:%S",
-    "%Y-%m-%dT%H:%M:%S.%f",
-    "%Y-%m-%d %H:%M:%S",
-    "%Y-%m-%d %H:%M:%S.%f",
-    "%Y-%m-%d"
-    ]
+    def default_page():
+        return 4 * 1024
+    resource = object()
+    resource.getpagesize = default_page


 class Application(object):
@@ -57,7 +51,17 @@ class Application(object):
     _DEFAULT_THREADS = cpu_count()
     _DEFAULT_DC = 'dal05'
     _DEFAULT_USE_PRIVATE = False
-    _DEFAULT_OS_BUFLEN = 1024
+    _DEFAULT_OS_BUFLEN = resource.getpagesize()
+
+    DATE_FORMATS = [
+        "%a, %d %b %Y %H:%M:%S %Z",
+        "%a, %d %b %Y %H:%M:%S.%f %Z",
+        "%Y-%m-%dT%H:%M:%S",
+        "%Y-%m-%dT%H:%M:%S.%f",
+        "%Y-%m-%d %H:%M:%S",
+        "%Y-%m-%d %H:%M:%S.%f",
+        "%Y-%m-%d"
+    ]

     def __init__(self, options):
         if not isinstance(options, dict):
@@ -183,24 +187,262 @@ def __init__(self, options):
183 187
 
184 188
             sys.exit(0)
185 189
 
186  
-    def authenticate(self):
  190
+    def try_datetime_parse(self, datetime_str):
  191
+        """
  192
+        Tries to parse the datetime and return the UNIX epoc version of time
  193
+
  194
+        returns timestamp(int) or None
  195
+        """
  196
+        mtime = None
  197
+        if datetime_str:
  198
+            for fmt in self.DATE_FORMATS:
  199
+                try:
  200
+                    mtime_tuple = time.strptime(datetime_str, fmt)
  201
+                    mtime = time.mktime(tuple(mtime_tuple))
  202
+                except ValueError:
  203
+                    pass
  204
+                else:
  205
+                    break
  206
+        return mtime
  207
+
  208
+    def _setup_client(self):
187 209
         use_network = 'private' if self.use_private else 'public'
188 210
 
189 211
         object_storage.consts.USER_AGENT = __agent__
190  
-        client = object_storage.get_client(
  212
+        self.client = object_storage.get_client(
191 213
             self.username,
192 214
             self.apikey,
193 215
             datacenter=self.dc,
194 216
             network=use_network,
195 217
             auth_url=self.auth_url)
196 218
 
  219
+    def authenticate(self):
  220
+        self._setup_client()
  221
+
197 222
         logging.info("Logging in as %s in %s",
198 223
                 self.username, self.dc)
199  
-        client.conn.auth.authenticate()
200 224
 
201  
-        self.url = client.get_url()
202  
-        self.token = copy(client.conn.auth.auth_token)
203  
-        del client
  225
+        self.client.conn.auth.authenticate()
  226
+
  227
+        self.url = self.client.get_url()
  228
+        self.token = copy(self.client.conn.auth.auth_token)
  229
+
  230
+        self.client.set_storage_url(self.url)
  231
+
  232
+    def get_container(self, name=None):
  233
+        if name is None:
  234
+            name = self.container
  235
+
  236
+        return self.client[name]
  237
+
  238
+    def new_revision(self, _from, marker):
  239
+        l = logging.getLogger("new_revision")
  240
+        if self.retention < 1:
  241
+            l.warn("Retention disabled for %s", _from)
  242
+            return None
  243
+
  244
+        # copy the file to the -revisions container so we don't
  245
+        # pollute the deleted items list.  Not putting revisions
  246
+        # in a seperate container will lead to an ever growing
  247
+        # list slowing down the backups
  248
+
  249
+        _rev_container = "%s-revisions" % self.container
  250
+
  251
+        safe_filename = encode_filename(_from)
  252
+        fs = os.path.splitext(safe_filename)
  253
+        new_file = fs[0] + "_" + marker + fs[1]
  254
+
  255
+        container = self.get_container()
  256
+        revcontainer = self.get_container(name=_rev_container)
  257
+        revcontainer.create()
  258
+
  259
+        obj = container.storage_object(safe_filename)
  260
+        rev = revcontainer.storage_object(new_file)
  261
+
  262
+        if obj.exists():
  263
+            l.debug("Copying %s to %s", obj.name, rev.name)
  264
+            rev.create()
  265
+            obj.copy_to(rev)
  266
+            self.delete_later(rev)
  267
+
  268
+    def delete_later(self, obj):
  269
+        """ lacking this in the bindings currently, work around it.
  270
+            Deletes a file after the specified number of days
  271
+        """
  272
+        l = logging.getLogger("delete_later")
  273
+        delta = int(self.retention) * 24 * 60 * 60
  274
+        when = int(time.time()) + delta
  275
+        l.debug("Setting retention(%d) on %s", when, obj.name)
  276
+
  277
+        headers = {
  278
+            'X-Delete-At': str(when),
  279
+            'Content-Length': '0'}
  280
+        obj.make_request('POST', headers=headers)
  281
+
  282
+    def create_directory(self, item):
  283
+        l = logging.getLogger("create_directory")
  284
+
  285
+        safe_dir = encode_filename(item)
  286
+        l.info("Creating %s", safe_dir)
  287
+
  288
+        container = app.get_container()
  289
+        obj = container.storage_object(safe_dir)
  290
+        obj.content_type = 'application/directory'
  291
+        obj.create()
  292
+
  293
+        return True
  294
+
  295
+    def upload_file(self, _file, failed=False):
  296
+        l = logging.getLogger('upload_file')
  297
+        container = self.get_container()
  298
+
  299
+        target = encode_filename(_file)
  300
+
  301
+        try:
  302
+            obj = container.storage_object(target)
  303
+            l.info("Uploading file %s", obj.name)
  304
+            chunk_upload(obj, _file)
  305
+            l.debug("Finished file %s ", obj.name)
  306
+        except (OSError, IOError), e:
  307
+            # For some reason we couldn't read the file, skip it but log it
  308
+            l.exception("Failed to upload %s. %s", _file, e)
  309
+        except Exception, e:
  310
+            if failed:
  311
+                l.error("Couldn't upload %s, skiping: %s", _file, e)
  312
+            else:
  313
+                l.error("Failed to upload %s, requeueing. Error: %s", _file, e)
  314
+                # in case we got disconnected, reset the container
  315
+                self.authenticate()
  316
+                return self.upload_file(_file, failed=True)
  317
+        else:
  318
+            return True
  319
+
  320
+        return False
  321
+
  322
+    def delete_file(self, obj, failed=False):
  323
+        l = logging.getLogger("delete_file")
  324
+        l.info("Deleting %s", obj['name'])
  325
+
  326
+        try:
  327
+            # Copy the file out of the way
  328
+            self.new_revision(obj['name'], obj.get('hash', 'deleted'))
  329
+
  330
+            # then delete it as it no longer exists.
  331
+            rm = self.get_container().storage_object(obj['name'])
  332
+            rm.delete()
  333
+        except Exception, e:
  334
+            if not failed:
  335
+                l.exception("Failed to delete %s, requeueing. Error: %s",
  336
+                        obj['name'], e)
  337
+                # in case we got disconnected, reset the container
  338
+                self.authenticate()
  339
+                return self.delete_file(obj, failed=True)
  340
+            else:
  341
+                l.exception("Failed to upload %s. %s", obj['name'], e)
  342
+        else:
  343
+            return True
  344
+        return False
  345
+
  346
+    def process_file(self, job):
  347
+        """ returns if a file should be uploaded or not and
  348
+        if the file should be be marked as done"""
  349
+        l = logging.getLogger('process_file')
  350
+
  351
+        try:
  352
+            _file, obj = job
  353
+        except ValueError:
  354
+            raise ValueError("Job not a tuple")
  355
+
  356
+        def _do_timesize():
  357
+            oldsize = int(obj.get('size'))
  358
+            cursize = int(get_filesize(_file))
  359
+            curdate = int(os.path.getmtime(_file))
  360
+            oldtime = obj.get('last_modified')
  361
+
  362
+            # there are a few formats, try to figure out which one safely
  363
+            oldtime = self.try_datetime_parse(oldtime)
  364
+            if oldtime is None:
  365
+                l.warn("Failed to figure out the time format, skipping %s",
  366
+                        _file)
  367
+                return False
  368
+
  369
+            if cursize == oldsize and oldtime >= curdate:
  370
+                l.debug("No change in filesize/date: %s", _file)
  371
+                return False
  372
+
  373
+            l.debug("Revised: SIZE:%s:%s DATE:%s:%s FILE:%s",
  374
+                    oldsize, cursize, oldtime, curdate, _file)
  375
+            return True
  376
+
  377
+        def _do_checksum():
  378
+            l.debug("Checksumming %s", _file)
  379
+
  380
+            oldhash = obj['hash']
  381
+            newhash = swifthash(_file)
  382
+
  383
+            if oldhash == newhash:
  384
+                l.debug("No change in checksum: %s", _file)
  385
+                return False
  386
+
  387
+            l.debug("Revised: HASH:%s:%s FILE:%s", oldhash, newhash, _file)
  388
+            return True
  389
+
  390
+        compare = _do_timesize
  391
+        if app.checkhash:
  392
+            compare = _do_checksum
  393
+
  394
+        upload_file = False
  395
+        try:
  396
+            if compare():
  397
+                # make a new copy, retention is handled there.  Start uploading
  398
+                # and then remove it so it doesn't get deleted
  399
+                self.new_revision(_file, obj['hash'])
  400
+                upload_file = True
  401
+        except (OSError, IOError), e:
  402
+            l.error("Couldn't read file size skipping, %s: %s", _file, e)
  403
+        # Just because we can't read it doesn't mean we don't have
  404
+        # the permission, it could be a medium error in which case
  405
+        # don't delete the file, remove it from the remote object dict
  406
+        # so it doesn't get marked for deletion later on.  Even if
  407
+        # the file doesn't need backing up, remove it just the same
  408
+        return upload_file
  409
+
  410
+    def __call__(self, item):
  411
+        work, job = item
  412
+
  413
+        self._setup_client()
  414
+
  415
+        if self.url:
  416
+            self.client.set_storage_url(self.url)
  417
+
  418
+        if self.token:
  419
+            self.client.conn.auth.auth_token = self.token
  420
+
  421
+        if not self.url or not self.token:
  422
+            logging.warn("Invalid authentication")
  423
+            self.authenticate()
  424
+
  425
+        if work == 'stat':
  426
+            rt = self.process_file(job)
  427
+            if rt:
  428
+                rt = self.upload_file(job[0])
  429
+        elif work == 'delete':
  430
+            rt = self.delete_file(job)
  431
+        elif work == 'mkdir':
  432
+            rt = self.create_directory(job)
  433
+        elif work == 'upload':
  434
+            rt = self.upload_file(job)
  435
+        else:
  436
+            logging.fatal("Unknown work type: %s", work)
  437
+
  438
+        if isinstance(job, dict):
  439
+            logging.debug("%s for %s returned %s", work, job['name'], rt)
  440
+        else:
  441
+            logging.debug("%s for %s returned %s", work, job, rt)
  442
+
  443
+
  444
+def init_worker():
  445
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
204 446
 
205 447
 
206 448
 def get_filesize(_f):
@@ -224,7 +466,7 @@ def swifthash(_f):
224 466
     return m.hexdigest()
225 467
 
226 468
 
227  
-def asblocks(_f, buflen=_DEFAULT_OS_BUFLEN):
  469
+def asblocks(_f, buflen=resource.getpagesize()):
228 470
     """Generator that yields buflen bytes from an open filehandle.
229 471
     Yielded bytes might be less buflen. """
230 472
     if not isinstance(_f, file):
@@ -242,72 +484,6 @@ def asblocks(_f, buflen=_DEFAULT_OS_BUFLEN):
242 484
         raise e
243 485
 
244 486
 
245  
-def queue_iter(queue):
246  
-    while True:
247  
-        try:
248  
-            item = queue.get()
249  
-        except Queue.Empty:
250  
-            break
251  
-
252  
-        if item is None:
253  
-            break
254  
-
255  
-        yield item
256  
-
257  
-
258  
-def roundrobin_iter(**queues):
259  
-    total_queues = len(queues)
260  
-    miss = 0
261  
-    for name, q in queues.iteritems():
262  
-        try:
263  
-            item = q.get(False, 0.1)
264  
-        except (Queue.Empty, TimeoutError):
265  
-            miss += 1
266  
-            if miss > total_queues:
267  
-                logging.info("dequeue failed")
268  
-                break
269  
-            continue
270  
-        except IOError:
271  
-            continue
272  
-        else:
273  
-            miss = 0
274  
-            if item:
275  
-                yield name, item
276  
-            else:
277  
-                logging.info("Done processing %s", name)
278  
-                q.close()
279  
-
280  
-
281  
-class IterUnwrap(object):
282  
-    def __init__(self, func, *args, **kwargs):
283  
-        self.func = func
284  
-        self.args = args
285  
-        self.kwargs = kwargs
286  
-
287  
-    def __call__(self, item):
288  
-        a = (item,) + self.args
289  
-        return self.func(*a, **self.kwargs)
290  
-
291  
-
292  
-def try_datetime_parse(datetime_str):
293  
-    """
294  
-    Tries to parse the datetime and return the UNIX epoc version of the time.
295  
-
296  
-    returns timestamp(int) or None
297  
-    """
298  
-    mtime = None
299  
-    if datetime_str:
300  
-        for fmt in DATE_FORMATS:
301  
-            try:
302  
-                mtime_tuple = time.strptime(datetime_str, fmt)
303  
-                mtime = time.mktime(tuple(mtime_tuple))
304  
-            except ValueError:
305  
-                pass
306  
-            else:
307  
-                break
308  
-    return mtime
309  
-
310  
-
311 487
 def encode_filename(string):
312 488
     string = str(string)
313 489
     uc = unicode(string, 'utf-8', 'replace')
@@ -322,22 +498,7 @@ def chunk_upload(obj, filename, headers=None):
322 498
         upload.finish()
323 499
 
324 500
 
325  
-def get_container(app, name=None):
326  
-    if name is None:
327  
-        name = app.container
328  
-
329  
-    object_storage.consts.USER_AGENT = __agent__
330  
-    client = object_storage.get_client(
331  
-            app.username,
332  
-            app.apikey,
333  
-            auth_token=app.token,
334  
-            auth_url=app.auth_url)
335  
-    client.set_storage_url(app.url)
336  
-
337  
-    return client[name]
338  
-
339  
-
340  
-def catalog_directory(app, directory, files, directories):
  501
+def catalog_directory(app, files, directories):
341 502
     logging.info("Gathering local files")
342 503
     for root, dirnames, filenames in os.walk('.'):
343 504
         # Prune all excluded directories from the list
@@ -352,17 +513,17 @@ def catalog_directory(app, directory, files, directories):
352 513
                     dirnames.remove(p)
353 514
 
354 515
         for _dir in dirnames:
355  
-            directories.put(os.path.relpath(os.path.join(root, _dir)))
  516
+            directories.append(os.path.relpath(os.path.join(root, _dir)))
356 517
 
357 518
         for _file in filenames:
358  
-            files.put(os.path.relpath(os.path.join(root, _file)))
  519
+            files.append(os.path.relpath(os.path.join(root, _file)))
359 520
 
360 521
     logging.info("Done gathering local files")
361 522
 
362 523
 
363 524
 def catalog_remote(app, objects):
364 525
     logging.info("Grabbing remote objects")
365  
-    container = get_container(app)
  526
+    container = app.get_container()
366 527
     container.create()
367 528
     f = container.objects()
368 529
     while True:
@@ -381,265 +542,24 @@ def catalog_remote(app, objects):
381 542
     logging.info("Objects %d", len(objects))
382 543
 
383 544
 
384  
-def upload_directory(app):
385  
-    """ Uploads an entire local directory. """
386  
-    manager = Manager()
387  
-    directories = JoinableQueue()
388  
-    files = JoinableQueue()
389  
-    remote_objects = manager.dict()
390  
-    uploads = manager.Queue()
391  
-    deletes = manager.Queue()
392  
-    mkdirs = manager.Queue()
393  
-
394  
-    app.authenticate()
395  
-
396  
-    logging.debug("%s %s", app.token, app.url)
397  
-
398  
-    if app.threads:
399  
-        threaded_harvestor(app, files, directories, remote_objects)
400  
-    else:
401  
-        serial_harvestor(app, files, directories, remote_objects)
402  
-
403  
-    args = (app, files, directories, remote_objects, uploads, deletes, mkdirs,)
404  
-
405  
-    if app.threads:
406  
-        threaded_processor(*args)
407  
-    else:
408  
-        serial_processor(*args)
409  
-
410  
-    logging.info("Done backing up %s to %s", app.source, app.container)
411  
-
412  
-
413  
-def serial_harvestor(app, files, directories, remote_objects):
414  
-    catalog_directory(copy(app), app.source, files, directories)
415  
-    catalog_remote(copy(app), remote_objects)
416  
-    # since serial processor doesn't use any threads and uses queue_iter()
417  
-    # we have to mark the end of the queue so queue_iter exits
418  
-    files.put(None)
419  
-    directories.put(None)
420  
-
  545
+def delta_force_one(files, directories, remote_objects):
  546
+    f = set(files)
  547
+    d = set(directories)
  548
+    r = set(remote_objects.keys())
  549
+    a = set(files + directories)
  550
+    # FIXME patchup file and directory names for comparison
421 551
 
422  
-def threaded_harvestor(app, files, directories, remote_objects):
423  
-    pool = Pool(app.threads)
424  
-
425  
-    logging.info("Starting harvesters")
426  
-
427  
-    local = pool.apply_async(catalog_directory,
428  
-            (copy(app), app.source, files, directories,))
429  
-    remote = pool.apply_async(catalog_remote,
430  
-        (copy(app), remote_objects,))
431  
-
432  
-    pool.close()
433  
-
434  
-    logging.info("Waiting for harvest")
435  
-    pool.join()
436  
-
437  
-    if not local.successful():
438  
-        logging.error("Local processing encountered an error")
439  
-        try:
440  
-            local.get()
441  
-        except Exception, e:
442  
-            logging.exception(e)
443  
-            raise e
444  
-
445  
-    if not remote.successful():
446  
-        logging.error("Remote processing encountered an error")
447  
-        try:
448  
-            remote.get()
449  
-        except Exception, e:
450  
-            logging.exception(e)
451  
-            raise e
  552
+    work = zip(repeat('upload'), f - r) + \
  553
+           zip(repeat('mkdir'), d - r)
452 554
 
  555
+    # add the remote object directly to the delete queue
  556
+    for dl in (r - a):
  557
+        work.append(('delete', remote_objects[dl],))
453 558
 
454  
-def serial_queue_item(func, queue, *args):
455  
-    rt = func(*args)
456  
-    if hasattr(queue, 'task_done'):
457  
-        queue.task_done()
458  
-    return rt
  559
+    for st in (a & r):
  560
+        work.append(('stat', (st, remote_objects[st],),))
459 561
 
460  
-
461  
-def serial_processor(app, files, directories, remote_objects, uploads,
462  
-        deletes, mkdirs):
463  
-    l = logging.getLogger('serial_processor')
464  
-
465  
-    # TODO fix to use serial_queue_item
466  
-    l.info("Processing directories (%d)", directories.qsize())
467  
-    process_directories = IterUnwrap(process_directory,
468  
-            copy(app), remote_objects, mkdirs)
469  
-    map(process_directories, queue_iter(directories))
470  
-    mkdirs.put(None)
471  
-
472  
-    l.info("Creating Directories")
473  
-    create_dir = IterUnwrap(create_directory, copy(app))
474  
-    map(create_dir, queue_iter(mkdirs))
475  
-
476  
-    process_files = IterUnwrap(process_file,
477  
-            copy(app), remote_objects, uploads)
478  
-    map(process_files, queue_iter(files))
479  
-    uploads.put(None)
480  
-
481  
-    l.info("Starting uploader")
482  
-    process_uploads = IterUnwrap(upload_file, copy(app), uploads)
483  
-    map(process_uploads, queue_iter(uploads))
484  
-
485  
-    l.info("%d objects scheduled for deletion", len(remote_objects))
486  
-    for d in remote_objects.values():
487  
-        deletes.put(d)
488  
-    deletes.put(None)
489  
-
490  
-    delete_files = IterUnwrap(delete_file, copy(app), deletes)
491  
-    map(delete_files, queue_iter(deletes))
492  
-
493  
-
494  
-def threaded_done_marker(results, queue):
495  
-    if isinstance(results, Exception):
496  
-        logging.exception(results)
497  
-
498  
-    if hasattr(queue, 'task_done'):
499  
-        queue.task_done()
500  
-    else:
501  
-        queue.put(None)
502  
-
503  
-
504  
-def process_deletes(app, files, directories, deletes)):
505  
-    files.join()
506  
-    directories.join()
507  
-    for d in remote_objects.values():
508  
-        deletes.put(d)
509  
-    deletes.join()
510  
-
511  
-def threaded_processor(app, files, directories, remote_objects, uploads,
512  
-        deletes, mkdirs):
513  
-    l = logging.getLogger("threaded_processor")
514  
-    workers = Pool(app.threads)
515  
-    jobs = JoinableQueue()
516  
-    backlog = JoinableQueue(app.threads)
517  
-
518  
-    # total work feeds into backlog as items are available
519  
-    # and as the queue is still open
520  
-    # start thread to for pushing directories and file
521  
-    # start thread for uploads and mkdirs and deletes
522  
-    # join pusher
523  
-    # populate deleter
524  
-    # join writers
525  
-
526  
-    processors = {
527  
-        'directory': {
528  
-            'func': IterUnwrap(process_directory, copy(app), remote_objects,
529  
-                mkdirs),
530  
-            'source': directories,
531  
-        },
532  
-        'file': {
533  
-            'func': IterUnwrap(process_file, copy(app), remote_objects,
534  
-                uploads),
535  
-            'source': files,
536  
-        },
537  
-        'mkdir': {
538  
-            'func': IterUnwrap(create_directory, copy(app)),
539  
-            'source': mkdirs,
540  
-        },
541  
-        'upload': {
542  
-            'func': IterUnwrap(upload_file, copy(app), uploads),
543  
-            'source': uploads,
544  
-        },
545  
-    }
546  
-
547  
-    workers.apply_async(process_deletes, (app, files, directories, deletes,))
548  
-    files.join()
549  
-    directories.join()
550  
-
551  
-    logging.info("%d objects scheduled for deletion", deletes.qsize() - 1)
552  
-    delete_files = IterUnwrap(delete_file, copy(app), deletes)
553  
-    workers.map_async(delete_files, queue_iter(deletes))
554  
-
555  
-    for name, item in roundrobin_iter(
556  
-            **dict([(k, processors[k]['source']) for k in processors.keys()])):
557  
-        workers.apply_async(processors[name]['func'], (item,))
558  
-#
559  
-#    if directories.qsize() > 1:
560  
-#        l.info("Processing %d directories", directories.qsize())
561  
-#        dir_done = IterUnwrap(threaded_done_marker, mkdirs)
562  
-#        process_directories = None
563  
-#        dir_proc = workers.map_async(process_directories,
564  
-#                queue_iter(directories), app.threads, dir_done)
565  
-#
566  
-#        l.info("Creating Directories")
567  
-#        mkdir = None
568  
-#        workers.map_async(mkdir, queue_iter(mkdirs), app.threads)
569  
-#    else:
570  
-#        directories.get_nowait()
571  
-#
572  
-#    if files.qsize() > 1:
573  
-#        l.info("Processing files")
574  
-#        file_done = IterUnwrap(threaded_done_marker, uploads)
575  
-#        process_files = None
576  
-#        file_proc = workers.map_async(process_files,
577  
-#                queue_iter(files), 2, file_done)
578  
-#
579  
-#    else:
580  
-#        directories.get_nowait()
581  
-#
582  
-#    l.info("Waiting to process files")
583  
-#
584  
-#    #TODO wait for processing to finish
585  
-#    while file_proc or dir_proc or upload_proc:
586  
-#        l.info("Waiting for processing")
587  
-#        if file_proc:
588  
-#            try:
589  
-#                file_res = file_proc.wait(1)
590  
-#                file_proc.successful()
591  
-#            except (TimeoutError, AssertionError):
592  
-#                l.info("Still processing files")
593  
-#            else:
594  
-#                l.info("Done processing files")
595  
-#                file_proc = None
596  
-#                if isinstance(file_res, Exception):
597  
-#                    raise
598  
-#
599  
-#        if dir_proc:
600  
-#            try:
601  
-#                dir_res = dir_proc.wait(1)
602  
-#            except TimeoutError:
603  
-#                l.info("Still processing directories")
604  
-#            else:
605  
-#                l.info("Done processing directories")
606  
-#                dir_proc = None
607  
-#                if isinstance(dir_res, Exception):
608  
-#                    raise
609  
-#
610  
-#        if upload_proc:
611  
-#            try:
612  
-#                upload_proc.wait(1)
613  
-#                upload_proc.successful()
614  
-#            except (TimeoutError, AssertionError):
615  
-#                l.info("Still processing uploads")
616  
-#            else:
617  
-#                l.info("Done processing uploads")
618  
-#                upload_proc = None
619  
-#        elif uploads.qsize() > 0:
620  
-#            process_uploads =
621  
-#            l.info("Starting uploader")
622  
-#            upload_proc = workers.map_async(process_uploads,
623  
-#                    queue_iter(uploads), app.threads)
624  
-#            l.info("This didn't run right away")
625  
-    l.info("Exiting")
626  
-    sys.exit(0)
627  
-    # After the readers have all exited, we know that remote_objects
628  
-    # contains the remaining files that should be deleted from
629  
-    # the backups.  Dump these into a Queue for the writers to take
630  
-    # care of.
631  
-    workers.close()
632  
-
633  
-    while (uploads.qsize() + mkdirs.qsize() + deletes.qsize()) > 0:
634  
-        l.info("Actions remaining:- uploading:%d mkdir:%d deletes:%d",
635  
-            directories.qsize(),
636  
-            files.qsize(),
637  
-            deletes.qsize(),
638  
-        )
639  
-        time.sleep(1)
640  
-    l.info("Cleaning up, letting pending items finish %d",
641  
-            (uploads.qsize() + mkdirs.qsize() + deletes.qsize()))
642  
-    workers.join()
  562
+    return work
643 563
 
644 564
 
645 565
 def process_directory(directory, app, remote_objects, mkdirs):
@@ -649,180 +569,57 @@ def process_directory(directory, app, remote_objects, mkdirs):
649 569
     if safe_dir in remote_objects and \
650 570
         remote_objects[safe_dir].get('content_type', None) == \
651 571
        'application/directory':
652  
-        del remote_objects[safe_dir]
653  
-        return
654  
-
655  
-    if safe_dir in remote_objects:
656  
-        del remote_objects[safe_dir]
657  
-
658  
-    mkdirs.put(safe_dir)
659  
-
660  
-
661  
-def create_directory(safe_dir, app):
662  
-    l = logging.getLogger("create_directory")
663  
-    l.info("Creating %s", safe_dir)
664  
-
665  
-    container = get_container(app)
666  
-    obj = container.storage_object(safe_dir)
667  
-    obj.content_type = 'application/directory'
668  
-    obj.create()
669  
-
670  
-
671  
-def delete_file(obj, app, jobs):
672  
-    l = logging.getLogger("delete_file")
673  
-    l.info("Deleting %s", obj['name'])
674  
-
675  
-    try:
676  
-        # Copy the file out of the way
677  
-        new_revision(app, obj['name'], obj.get('hash', 'deleted'))
678  
-
679  
-        # then delete it as it no longer exists.
680  
-        rm = get_container(app).storage_object(obj['name'])
681  
-        rm.delete()
682  
-    except Exception, e:
683  
-        l.error("Failed to delete %s, requeueing. Error: %s", obj['name'], e)
684  
-        jobs.put(obj)
685  
-        # in case we got disconnected, reset the container
686  
-        app.authenticate()
687  
-
688  
-
689  
-def process_file(_file, app, objects, backlog):
690  
-    l = logging.getLogger('process_file')
691  
-
692  
-    safe_filename = encode_filename(_file)
693  
-
694  
-    # don't bother with checksums for new files
695  
-    if safe_filename not in objects:
696  
-        l.debug("Queued missing %s", safe_filename)
697  
-        backlog.put((_file, safe_filename,))
698  
-        return True
699  
-
700  
-    def _do_timesize():
701  
-        oldsize = int(objects[safe_filename].get('size'))
702  
-        cursize = int(get_filesize(_file))
703  
-        curdate = int(os.path.getmtime(_file))
704  
-        oldtime = objects[safe_filename].get('last_modified')
705  
-
706  
-        # there are a few formats, try to figure out which one safely
707  
-        oldtime = try_datetime_parse(oldtime)
708  
-        if oldtime is None:
709  
-            l.warn("Failed to figure out the time format, skipping %s", _file)
710  
-            return False
711  
-
712  
-        if cursize == oldsize and oldtime >= curdate:
713  
-            l.debug("No change in filesize/date: %s", _file)
714  
-            del objects[safe_filename]
715  
-            return False
716  
-
717  
-        l.debug("Revised: SIZE:%s:%s DATE:%s:%s FILE:%s",
718  
-                oldsize, cursize, oldtime, curdate, safe_filename)
719  
-        return True
720  
-
721  
-    def _do_checksum():
722  
-        l.debug("Checksumming %s", _file)
723  
-
724  
-        oldhash = objects[safe_filename]['hash']
725  
-        newhash = swifthash(_file)
726  
-
727  
-        if oldhash == newhash:
728  
-            l.debug("No change in checksum: %s", _file)
729  
-            del objects[safe_filename]
730  
-            return False
731  
-
732  
-        l.debug("Revised: HASH:%s:%s FILE:%s", oldhash, newhash, safe_filename)
733  
-        return True
734  
-
735  
-    compare = _do_timesize
736  
-    if app.checkhash:
737  
-        compare = _do_checksum
738  
-
739  
-    try:
740  
-        if compare():
741  
-            # make a new copy, retention is handled there.  Start uploading
742  
-            # and then remove it so it doesn't get deleted
743  
-            new_revision(app, _file, objects[safe_filename]['hash'])
744  
-            backlog.put((_file, safe_filename,))
745  
-            del objects[safe_filename]
746  
-
747  
-    except (OSError, IOError), e:
748  
-        l.error("Couldn't read file size skipping, %s: %s", _file, e)
749  
-        # Just because we can't read it doesn't mean we dont' have
750  
-        # the permission, it could be a medium error in which case
751  
-        # don't delete the file, remote it from the remote object
752  
-        # so it doesn't get marked for deletion
753  
-        del objects[safe_filename]
754 572
         return False
755 573
 
  574
+    return True
756 575
 
757  
-def new_revision(app, _from, marker):
758  
-    l = logging.getLogger("new_revision")
759  
-    if app.retention < 1:
760  
-        l.warn("Retention disabled for %s", _from)
761  
-        return None
762  
-
763  
-    # copy the file to the -revisions container so we don't
764  
-    # pollute the deleted items list.  Not putting revisions
765  
-    # in a seperate container will lead to an ever growing
766  
-    # list slowing down the backups
767  
-
768  
-    _rev_container = "%s-revisions" % app.container
769 576
 
770  
-    safe_filename = encode_filename(_from)
771  
-    fs = os.path.splitext(safe_filename)
772  
-    new_file = fs[0] + "_" + marker + fs[1]
773  
-
774  
-    container = get_container(app)
775  
-    revcontainer = get_container(app, name=_rev_container)
776  
-    revcontainer.create()
777  
-
778  
-    obj = container.storage_object(safe_filename)
779  
-    rev = revcontainer.storage_object(new_file)
780  
-
781  
-    if obj.exists():
782  
-        l.debug("Copying %s to %s", obj.name, rev.name)
783  
-
784  
-        rev.create()
  577
+def upload_directory(app):
  578
+    """ Uploads an entire local directory. """
  579
+    manager = Manager()
  580
+    directories = manager.list()
  581
+    files = manager.list()
  582
+    remote_objects = manager.dict()
785 583
 
786  
-        obj.copy_to(rev)
787  
-        delete_later(rev, app)
  584
+    app.authenticate()
788 585
 
  586
+    logging.debug("%s %s", app.token, app.url)
789 587
 
790  
-def delete_later(obj, app):
791  
-    """ lacking this in the bindings currently, work around it.
792  
-        Deletes a file after the specified number of days
793  
-    """
794  
-    l = logging.getLogger("delete_later")
795  
-    delta = int(app.retention) * 24 * 60 * 60
796  
-    when = int(time.time()) + delta
797  
-    l.debug("Setting retention(%d) on %s", when, obj.name)
  588
+    logging.info("Starting harvesters")
  589
+    local = Process(target=catalog_directory,
  590
+            args=(app, files, directories,))
  591
+    remote = Process(target=catalog_remote,
  592
+        args=(app, remote_objects,))
798 593
 
799  
-    headers = {
800  
-        'X-Delete-At': str(when),
801  
-        'Content-Length': '0'}
802  
-    obj.make_request('POST', headers=headers)
  594
+    remote.start()
  595
+    local.start()
803 596
 
  597
+    logging.info("Waiting for harvest")
  598
+    local.join()
  599
+    remote.join()
804 600
 
805  
-def upload_file(job, app, jobs):
806  
-    l = logging.getLogger('upload_file')
807  
-    container = get_container(app)
  601
+    backlog = delta_force_one(files, directories, remote_objects)
808 602
 
809  
-    # job is a tuple
810  
-    _file, target = job
  603
+    logging.debug("Backlog: %s", backlog)
  604
+    if app.threads:
  605
+        p = Pool(processes=app.threads, initializer=init_worker)
  606
+        # remove client property as it can't be pickled
  607
+        app.client = None
  608
+        try:
  609
+            rs = p.map_async(app, backlog, 1)
  610
+            p.close()
  611
+            rs.wait()
  612
+            if not rs.successful():
  613
+                raise rs.get()
  614
+            p.join()
  615
+        except KeyboardInterrupt:
  616
+            logging.info("Trying to stop...")
  617
+            p.terminate()
  618
+            p.join()
  619
+    else:
  620
+        map(app, backlog)
811 621
 
812  
-    try:
813  
-        obj = container.storage_object(target)
814  
-        l.info("Uploading file %s", obj.name)
815  
-        chunk_upload(obj, _file)
816  
-        l.debug("Finished file %s ", obj.name)
817  
-    except (OSError, IOError), e:
818  
-        # For some reason we couldn't read the file, skip it but log it
819  
-        l.error("Failed to upload %s. %s", _file, e)
820  
-    except Exception, e:
821  
-        l.error("Failed to upload %s, requeueing. Error: %s", _file, e)
822  
-        jobs.put((_file, target,))
823  
-        # in case we got disconnected, reset the container
824  
-        app.authenticate()
825  
-        container = get_container(app)
  622
+    logging.info("Done backing up %s to %s", app.source, app.container)
826 623
 
827 624
 
828 625
 if __name__ == "__main__":

