Skip to content

Commit

Permalink
[Refactor] Moved storage operations to EDQueueStorageEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
thisandagain committed Sep 17, 2012
1 parent 44b9b91 commit 2275cd1
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 112 deletions.
7 changes: 2 additions & 5 deletions EDQueue/EDQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
//

#import <Foundation/Foundation.h>
#import "FMDatabase.h"
#import "FMDatabaseAdditions.h"
#import "FMDatabasePool.h"
#import "FMDatabaseQueue.h"
#import "EDQueueStorageEngine.h"

//

Expand Down Expand Up @@ -41,7 +38,7 @@ typedef enum {

@interface EDQueue : NSObject
{
@private FMDatabaseQueue *queue;
@private EDQueueStorageEngine *engine;
}

@property (weak) id<EDQueueDelegate> delegate;
Expand Down
126 changes: 19 additions & 107 deletions EDQueue/EDQueue.m
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
//

@interface EDQueue ()
@property EDQueueStorageEngine *engine;
@property (readwrite) Boolean isRunning;
@property (readwrite) Boolean isActive;
@property FMDatabaseQueue *queue;
@property NSTimer *timer;
@end

//
Expand All @@ -34,22 +33,10 @@ - (id)init
{
self = [super init];
if (self) {
// Setup
_isRunning = false;
_isActive = false;
_retryLimit = 4;

// Database path
NSArray *paths = NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask,YES);
NSString *documentsDirectory = [paths objectAtIndex:0];
NSString *path = [documentsDirectory stringByAppendingPathComponent:@"edqueue_0.5.0c.db"];

// Allocate the queue
_queue = [[FMDatabaseQueue alloc] initWithPath:path];
[self.queue inDatabase:^(FMDatabase *db) {
[db executeUpdate:@"CREATE TABLE IF NOT EXISTS queue (id INTEGER PRIMARY KEY, task TEXT NOT NULL, data TEXT NOT NULL, attempts INTEGER DEFAULT 0, stamp STRING DEFAULT (strftime('%s','now')) NOT NULL, udef_1 TEXT, udef_2 TEXT)"];
[self databaseHadError:[db hadError] fromDatabase:db];
}];
_engine = [[EDQueueStorageEngine alloc] init];
_isRunning = false;
_isActive = false;
_retryLimit = 4;
}
return self;
}
Expand All @@ -67,7 +54,7 @@ - (id)init
- (void)enqueueWithData:(id)data forTask:(NSString *)task
{
if (data == nil) data = @{};
[self createJob:[NSJSONSerialization dataWithJSONObject:data options:NSJSONWritingPrettyPrinted error:nil] forTask:task];
[self.engine createJob:data forTask:task];
[self tick];
}

Expand Down Expand Up @@ -99,95 +86,21 @@ - (void)stop
}
}

#pragma mark - Database helpers

- (Boolean)databaseHadError:(Boolean)flag fromDatabase:(FMDatabase *)db
{
if (flag) NSLog(@"Queue Database Error %d: %@", [db lastErrorCode], [db lastErrorMessage]);
return flag;
}

- (NSUInteger)jobCount
{
__block NSUInteger count = 0;

[self.queue inDatabase:^(FMDatabase *db) {
FMResultSet *rs = [db executeQuery:@"SELECT count(id) AS count FROM queue"];
[self databaseHadError:[db hadError] fromDatabase:db];

while ([rs next]) {
count = [rs intForColumn:@"count"];
}

[rs close];
}];

return count;
}

- (void)createJob:(id)data forTask:(id)task
{
[self.queue inDatabase:^(FMDatabase *db) {
[db executeUpdate:@"INSERT INTO queue (task, data) VALUES (?, ?)", task, data];
[self databaseHadError:[db hadError] fromDatabase:db];
}];
}

- (NSDictionary *)fetchJob
{
__block id job;

[self.queue inDatabase:^(FMDatabase *db) {
FMResultSet *rs = [db executeQuery:@"SELECT * FROM queue ORDER BY id ASC LIMIT 1"];
[self databaseHadError:[db hadError] fromDatabase:db];

while ([rs next]) {
job = @{
@"id": [NSNumber numberWithInt:[rs intForColumn:@"id"]],
@"task": [rs stringForColumn:@"task"],
@"data": [NSJSONSerialization JSONObjectWithData:[[rs stringForColumn:@"data"] dataUsingEncoding:NSUTF8StringEncoding] options:NSJSONReadingMutableContainers error:nil],
@"attempts": [NSNumber numberWithInt:[rs intForColumn:@"attempts"]],
@"stamp": [rs stringForColumn:@"stamp"]
};
}

[rs close];
}];

return job;
}

- (void)removeJob:(NSNumber *)jid
{
[self.queue inDatabase:^(FMDatabase *db) {
[db executeUpdate:@"DELETE FROM queue WHERE id = ?", jid];
[self databaseHadError:[db hadError] fromDatabase:db];
}];
}

- (void)incrementAttemptForJob:(NSNumber *)jid
{
[self.queue inDatabase:^(FMDatabase *db) {
[db executeUpdate:@"UPDATE queue SET attempts = attempts + 1 WHERE id = ?", jid];
[self databaseHadError:[db hadError] fromDatabase:db];
}];
}

#pragma mark - Private methods

/**
* Checks the queue for available jobs, sends them to the processor delegate, and handles the response.
* Checks the queue for available jobs, sends them to the processor delegate, and then handles the response.
*
* @return {void}
*/
- (void)tick
{
dispatch_queue_t gcd = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0);
dispatch_async(gcd, ^{
if (self.isRunning && !self.isActive && [self jobCount] > 0) {
if (self.isRunning && !self.isActive && [self.engine fetchJobCount] > 0) {
// Start job
self.isActive = true;
id job = [self fetchJob];
id job = [self.engine fetchJob];

// Pass job to delegate
EDQueueResult result = [self.delegate queue:self processJob:job];
Expand All @@ -196,33 +109,33 @@ - (void)tick
switch (result) {
case EDQueueResultSuccess:
[self performSelectorOnMainThread:@selector(postNotification:) withObject:[NSDictionary dictionaryWithObjectsAndKeys:@"EDQueueJobDidSucceed", @"name", job, @"data", nil] waitUntilDone:false];
[self removeJob:[job objectForKey:@"id"]];
[self.engine removeJob:[job objectForKey:@"id"]];
break;
case EDQueueResultFail:
[self performSelectorOnMainThread:@selector(postNotification:) withObject:[NSDictionary dictionaryWithObjectsAndKeys:@"EDQueueJobDidFail", @"name", job, @"data", nil] waitUntilDone:true];
NSUInteger currentAttempt = [[job objectForKey:@"attempts"] intValue] + 1;
if (currentAttempt < self.retryLimit) {
[self incrementAttemptForJob:[job objectForKey:@"id"]];
[self.engine incrementAttemptForJob:[job objectForKey:@"id"]];
} else {
[self removeJob:[job objectForKey:@"id"]];
[self.engine removeJob:[job objectForKey:@"id"]];
}
break;
case EDQueueResultCritical:
[self performSelectorOnMainThread:@selector(postNotification:) withObject:[NSDictionary dictionaryWithObjectsAndKeys:@"EDQueueJobDidFail", @"name", job, @"data", nil] waitUntilDone:false];
[self errorWithMessage:@"Critical error. Job canceled."];
[self removeJob:[job objectForKey:@"id"]];
[self.engine removeJob:[job objectForKey:@"id"]];
break;
}

// Check drain
if ([self jobCount] == 0) {
// Clean-up
self.isActive = false;

// Drain
if ([self.engine fetchJobCount] == 0) {
[self performSelectorOnMainThread:@selector(postNotification:) withObject:[NSDictionary dictionaryWithObjectsAndKeys:@"EDQueueDidDrain", @"name", nil, @"data", nil] waitUntilDone:false];
} else {
[self performSelectorOnMainThread:@selector(tick) withObject:nil waitUntilDone:false];
}

// Clean-up
self.isActive = false;
}
});
}
Expand Down Expand Up @@ -258,8 +171,7 @@ - (void)errorWithMessage:(NSString *)message
- (void)dealloc
{
self.delegate = nil;

_queue = nil;
_engine = nil;
}

@end
26 changes: 26 additions & 0 deletions EDQueue/EDQueueStorageEngine.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// EDQueueStorage.h
// queue
//
// Created by Andrew Sliwinski on 9/17/12.
// Copyright (c) 2012 DIY, Co. All rights reserved.
//

#import <Foundation/Foundation.h>

#import "FMDatabase.h"
#import "FMDatabaseAdditions.h"
#import "FMDatabasePool.h"
#import "FMDatabaseQueue.h"

@interface EDQueueStorageEngine : NSObject

@property FMDatabaseQueue *queue;

- (void)createJob:(id)data forTask:(id)task;
- (void)incrementAttemptForJob:(NSNumber *)jid;
- (void)removeJob:(NSNumber *)jid;
- (NSUInteger)fetchJobCount;
- (NSDictionary *)fetchJob;

@end
Loading

0 comments on commit 2275cd1

Please sign in to comment.