forked from couchbaselabs/TouchDB-iOS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TDPuller.m
496 lines (419 loc) · 19.3 KB
/
TDPuller.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
//
// TDPuller.m
// TouchDB
//
// Created by Jens Alfke on 12/2/11.
// Copyright (c) 2011 Couchbase, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
#import "TDPuller.h"
#import "TDDatabase+Insertion.h"
#import "TDDatabase+Replication.h"
#import <TouchDB/TDRevision.h>
#import "TDChangeTracker.h"
#import "TDBatcher.h"
#import "TDMultipartDownloader.h"
#import "TDSequenceMap.h"
#import "TDInternal.h"
#import "TDMisc.h"
#import "ExceptionUtils.h"
// Upper bound on how many HTTP requests (individual GETs plus bulk POSTs) may be open at once.
#define kMaxOpenHTTPConnections 8
// Upper bound on how many revisions are requested in a single bulk (_all_docs) request.
#define kMaxRevsToGetInBulk 50u

// Encodes an array of strings as a URL-escaped JSON array (defined at the end of this file).
static NSString* joinQuotedEscaped(NSArray* strings);

// Private methods of TDPuller, plus its conformance to the change-tracker client protocol.
@interface TDPuller () <TDChangeTrackerClient>
- (void) insertDownloads: (NSArray*)downloads;
- (void) pullRemoteRevision: (TDRevision*)rev;
- (void) pullRemoteRevisions;
@end
@implementation TDPuller
// Releases everything this puller owns (manual retain/release). The change tracker is told to
// stop before its reference is released.
- (void)dealloc {
[_changeTracker stop];
[_changeTracker release];
[_revsToPull release];
[_deletedRevsToPull release];
[_bulkRevsToPull release];
[_downloadsToInsert release];
[_pendingSequences release];
[_endingSequence release];
[super dealloc];
}
// Starts pulling. This method:
// - creates the download batcher (only once),
// - asks the remote database for its current update_seq so the pull can tell when it has
//   caught up,
// - resets the pending-sequence map,
// - starts a continuous TDChangeTracker from _lastSequence.
// A one-shot (non-continuous) pull also opens an async task here; -changeTrackerStopped:
// finishes it.
- (void) beginReplicating {
    Assert(!_changeTracker);
    if (!_downloadsToInsert) {
        // Note: This is a ref cycle, because the block has a (retained) reference to 'self',
        // and _downloadsToInsert retains the block, and of course I retain _downloadsToInsert.
        _downloadsToInsert = [[TDBatcher alloc] initWithCapacity: 200 delay: 1.0
                                                       processor: ^(NSArray *downloads) {
            [self insertDownloads: downloads];
        }];
    }

    // Get the current sequence number so we know the pull has "caught up":
    [self sendAsyncRequest: @"GET" path: @"/" body: nil
              onCompletion:^(id result, NSError *error) {
        // If replication is started more than once, a value from the previous run may still
        // be held here; release it instead of leaking it.
        [_endingSequence release];
        _endingSequence = [[[result objectForKey: @"update_seq"] description] copy];
        LogTo(Sync, @"Ending sequence = %@", _endingSequence);
        [self checkIfCaughtUp: _lastSequence];
    }];

    [_pendingSequences release];
    _pendingSequences = [[TDSequenceMap alloc] init];

    LogTo(SyncVerbose, @"%@ starting ChangeTracker with since=%@", self, _lastSequence);
    // Always use continuous mode because it lets us parse and process changes one sequence at a
    // time, instead of having to wait and parse the entire list as one JSON object.
    _changeTracker = [[TDChangeTracker alloc] initWithDatabaseURL: _remote
                                                             mode: kContinuous
                                                        conflicts: YES
                                                     lastSequence: _lastSequence
                                                           client: self];
    _changeTracker.filterName = _filterName;
    _changeTracker.filterParameters = _filterParameters;
    [_changeTracker start];

    // A one-shot pull keeps an async task open until the change tracker reports it stopped.
    if (!_continuous)
        [self asyncTaskStarted];
}
// Stops a running pull. This method:
// - detaches from and stops the change tracker,
// - discards every queue of revisions still waiting to be fetched,
// - stops the superclass,
// - flushes any downloads already waiting to be inserted.
// Does nothing if the puller is not running.
- (void) stop {
    if (_running) {
        // Clear the client first so the tracker won't call back into -changeTrackerStopped:.
        _changeTracker.client = nil;
        [_changeTracker stop];
        setObj(&_changeTracker, nil);

        // Nothing more will be fetched, so drop the pending fetch queues.
        setObj(&_revsToPull, nil);
        setObj(&_deletedRevsToPull, nil);
        setObj(&_bulkRevsToPull, nil);

        [super stop];
        [_downloadsToInsert flush];
    }
}
// Final teardown once replication has stopped. It releases the download batcher (whose
// processor block references self), then defers to the superclass.
- (void) stopped {
    [_downloadsToInsert release];
    _downloadsToInsert = nil;
    [super stopped];
}
// Goes offline via the superclass. If that succeeds, the change tracker is also stopped.
// Returns YES only when the superclass actually went offline.
- (BOOL) goOffline {
    if ([super goOffline]) {
        [_changeTracker stop];
        return YES;
    }
    return NO;
}
// TDChangeTrackerClient protocol
// Returns the string _authorizer produces for a request to the change tracker's feed URL,
// or nil when no authorizer is configured.
- (NSString*) authorizationHeader {
    if (_authorizer) {
        NSURL* feedURL = _changeTracker.changesFeedURL;
        NSMutableURLRequest* probe = [NSMutableURLRequest requestWithURL: feedURL];
        return [_authorizer authorizeURLRequest: probe];
    }
    return nil;
}
// TDChangeTrackerClient protocol
// Looks up a default credential for the remote server in the shared NSURLCredentialStorage.
// Returns nil when an authorizer is configured, or when no stored credential matches.
- (NSURLCredential*) authCredential {
    if (_authorizer)
        return nil;
    // It's unclear what value to use for 'realm' since we don't already have a challenge from
    // the server. In practice, nil doesn't work (won't find existing Keychain passwords) while
    // using the hostname does.
    NSURL* url = _changeTracker.changesFeedURL;
    // The protocol is part of the protection-space key, so credentials saved for an https
    // server are only found under NSURLProtectionSpaceHTTPS. (A nil scheme yields NO here.)
    BOOL isHTTPS = [url.scheme.lowercaseString isEqualToString: @"https"];
    NSString* protocol = isHTTPS ? NSURLProtectionSpaceHTTPS : NSURLProtectionSpaceHTTP;
    // A missing port gives 0, which NSURLProtectionSpace treats as the protocol's default port.
    NSURLProtectionSpace* space = [[[NSURLProtectionSpace alloc]
                                        initWithHost: url.host
                                                port: url.port.intValue
                                            protocol: protocol
                                               realm: url.host
                                authenticationMethod: NSURLAuthenticationMethodDefault]
                                   autorelease];
    NSURLCredential* cred = [[NSURLCredentialStorage sharedCredentialStorage]
                                defaultCredentialForProtectionSpace: space];
    if (cred)
        LogTo(Sync, @"%@ Using credential %@", self, cred);
    return cred;
}
// Got a _changes feed entry from the TDChangeTracker.
// Each revision listed in the entry's "changes" array becomes a TDPulledRevision. It is tagged
// with the entry's remote sequence ID and added to the inbox, and changesTotal grows by the
// number of listed revisions. Entries with an invalid doc ID are only logged. In every case
// the entry's sequence is then checked against the remote's ending sequence.
- (void) changeTrackerReceivedChange: (NSDictionary*)change {
    NSString* remoteSeq = [[change objectForKey: @"seq"] description];
    NSString* docID = [change objectForKey: @"id"];

    if (docID && ![TDDatabase isValidDocumentID: docID]) {
        Warn(@"%@: Received invalid doc ID from _changes: %@", self, change);
    } else if (docID) {
        BOOL deleted = [[change objectForKey: @"deleted"] isEqual: (id)kCFBooleanTrue];
        NSArray* revEntries = $castIf(NSArray, [change objectForKey: @"changes"]);
        // When an entry lists more than one revision, each of them is marked conflicted.
        BOOL multipleRevs = (revEntries.count > 1);
        for (NSDictionary* entry in revEntries) {
            @autoreleasepool {
                NSString* revID = $castIf(NSString, [entry objectForKey: @"rev"]);
                if (revID) {
                    // Push each revision info to the inbox
                    TDPulledRevision* pulled = [[TDPulledRevision alloc] initWithDocID: docID
                                                                                 revID: revID
                                                                               deleted: deleted];
                    // Remember its remote sequence ID (opaque); a local numeric sequence is
                    // assigned later, when the inbox is processed.
                    pulled.remoteSequenceID = remoteSeq;
                    if (multipleRevs)
                        pulled.conflicted = true;
                    [self addToInbox: pulled];
                    [pulled release];
                }
            }
        }
        self.changesTotal += revEntries.count;
    }

    [self checkIfCaughtUp: remoteSeq];
}
// Compares the given sequence with the remote's ending sequence (fetched at startup). On a
// match the pull has caught up; a non-continuous pull then stops its change tracker.
- (void) checkIfCaughtUp: (NSString*)sequence {
    if ($equal(sequence, _endingSequence)) {
        LogTo(Sync, @"** Caught up, at sequence %@", _endingSequence);
        if (!_continuous)
            [_changeTracker stop];
    }
}
// TDChangeTrackerClient protocol: the change tracker has stopped. This method:
// - drops our reference to the tracker,
// - goes offline if the tracker's error is an offline error, or otherwise records its error
//   (unless an error is already set),
// - flushes the inbox batcher,
// - for a one-shot pull, finishes the async task opened in -beginReplicating.
- (void) changeTrackerStopped:(TDChangeTracker *)tracker {
    NSError* error = tracker.error;
    LogTo(Sync, @"%@: ChangeTracker stopped; error=%@", self, error.description);
    // This callback is presumably invoked from inside the tracker itself. Autorelease rather
    // than release, so the tracker cannot be deallocated while its own method is still
    // executing.
    [_changeTracker autorelease];
    _changeTracker = nil;
    if (TDIsOfflineError(error))
        [self goOffline];
    else if (!_error && error)
        self.error = error;
    [_batcher flush];
    if (!_continuous)
        [self asyncTasksFinished: 1];
}
// Process a bunch of remote revisions from the _changes feed at once.
// This method:
// - asks the local database which inbox revisions it lacks (findMissingRevisions: presumably
//   removes the ones it already has),
// - corrects changesTotal to match,
// - queues the remaining revisions for fetching.
// Generation-1 revisions that are neither deleted nor conflicted go to the bulk queue;
// all others are queued for individual GETs. Each queued revision gets a local sequence
// from _pendingSequences.
- (void) processInbox: (TDRevisionList*)inbox {
    // Ask the local database which of the revs are not known to it:
    LogTo(SyncVerbose, @"%@: Looking up %@", self, inbox);
    NSString* lastInboxSequence = [inbox.allRevisions.lastObject remoteSequenceID];
    // Total excluding this inbox; the inbox's (possibly reduced) count is added back below.
    NSUInteger total = _changesTotal - inbox.count;
    if (![_db findMissingRevisions: inbox]) {
        Warn(@"%@ failed to look up local revs", self);
        inbox = nil;
    }
    if (_changesTotal != total + inbox.count)
        self.changesTotal = total + inbox.count;

    if (inbox.count == 0) {
        // Nothing to do; just count all the revisions as processed.
        // Instead of adding and immediately removing the revs to _pendingSequences,
        // just do the latest one (equivalent but faster):
        LogTo(SyncVerbose, @"%@: no new remote revisions to fetch", self);
        SequenceNumber seq = [_pendingSequences addValue: lastInboxSequence];
        [_pendingSequences removeSequence: seq];
        self.lastSequence = _pendingSequences.checkpointedValue;
        return;
    }

    LogTo(SyncVerbose, @"%@ queuing remote revisions %@", self, inbox.allRevisions);

    // Dump the revs into the queues of revs to pull from the remote db:
    unsigned numBulked = 0;
    for (TDPulledRevision* rev in inbox.allRevisions) {
        if (rev.generation == 1 && !rev.deleted && !rev.conflicted) {
            // Optimistically pull 1st-gen revs in bulk:
            if (!_bulkRevsToPull)
                _bulkRevsToPull = [[NSMutableArray alloc] initWithCapacity: 100];
            [_bulkRevsToPull addObject: rev];
            ++numBulked;
        } else {
            [self queueRemoteRevision: rev];
        }
        rev.sequence = [_pendingSequences addValue: rev.remoteSequenceID];
    }
    // NSUInteger counts are cast so they match the %u conversions on 64-bit targets.
    LogTo(Sync, @"%@ queued %u remote revisions from seq=%@ (%u in bulk, %u individually)",
          self, (unsigned)inbox.count,
          [[[inbox allRevisions] objectAtIndex: 0] remoteSequenceID],
          numBulked, (unsigned)(inbox.count - numBulked));

    [self pullRemoteRevisions];
}
// Add a revision to the appropriate queue of revs to individually GET. Deleted revisions go
// to _deletedRevsToPull and all others to _revsToPull; each queue is created on first use.
- (void) queueRemoteRevision: (TDRevision*)rev {
    if (rev.deleted) {
        if (!_deletedRevsToPull)
            _deletedRevsToPull = [[NSMutableArray alloc] initWithCapacity: 100];
        [_deletedRevsToPull addObject: rev];
    } else {
        if (!_revsToPull)
            _revsToPull = [[NSMutableArray alloc] initWithCapacity: 100];
        [_revsToPull addObject: rev];
    }
}
// Start up some HTTP requests, within our limit on the maximum simultaneous number.
// Bulk fetches take priority, then individual GETs of live revisions, then deleted ones.
// Stops when the connection limit is reached or every queue is empty.
- (void) pullRemoteRevisions {
    while (_httpConnectionCount < kMaxOpenHTTPConnections) {
        NSUInteger bulkCount = MIN(_bulkRevsToPull.count, kMaxRevsToGetInBulk);
        if (bulkCount == 1) {
            // A 'bulk' request for one revision is pointless; move it to the individual queue.
            TDRevision* lone = [_bulkRevsToPull objectAtIndex: 0];
            [self queueRemoteRevision: lone];
            [_bulkRevsToPull removeObjectAtIndex: 0];
            bulkCount = 0;
        }

        if (bulkCount > 0) {
            NSRange batch = NSMakeRange(0, bulkCount);
            [self pullBulkRevisions: [_bulkRevsToPull subarrayWithRange: batch]];
            [_bulkRevsToPull removeObjectsInRange: batch];
            continue;
        }

        // Prefer to pull an existing revision over a deleted one:
        NSMutableArray* queue = (_revsToPull.count > 0) ? _revsToPull : _deletedRevsToPull;
        if (queue.count == 0)
            break;  // both queues are empty
        [self pullRemoteRevision: [queue objectAtIndex: 0]];
        [queue removeObjectAtIndex: 0];
    }
}
// Fetches the contents of a revision from the remote db, including its parent revision ID.
// On success the contents are stored into rev.properties, and the rev is queued on
// _downloadsToInsert for insertion. On failure the error is recorded and the rev is counted
// as processed. Either way, when the download completes, the HTTP connection slot is freed
// and more fetches are started.
- (void) pullRemoteRevision: (TDRevision*)rev
{
    [self asyncTaskStarted];
    ++_httpConnectionCount;

    // Request the revision history, plus the bodies of attachments added since the
    // revisions we already have locally.
    // See: http://wiki.apache.org/couchdb/HTTP_Document_API#GET
    // See: http://wiki.apache.org/couchdb/HTTP_Document_API#Getting_Attachments_With_a_Document
    NSMutableString* path = [NSMutableString stringWithFormat:
                                 @"/%@?rev=%@&revs=true&attachments=true",
                                 TDEscapeID(rev.docID), TDEscapeID(rev.revID)];
    NSArray* ancestorRevIDs = [_db getPossibleAncestorRevisionIDs: rev];
    if (ancestorRevIDs.count > 0)
        [path appendFormat: @"&atts_since=%@", joinQuotedEscaped(ancestorRevIDs)];
    LogTo(SyncVerbose, @"%@: GET .%@", self, path);

    NSURL* url = [NSURL URLWithString: [_remote.absoluteString stringByAppendingString: path]];
    TDMultipartDownloader* downloader =
        [[TDMultipartDownloader alloc] initWithURL: url
                                          database: _db
                                        authorizer: _authorizer
                                      onCompletion:
            ^(TDMultipartDownloader* download, NSError *error) {
                // OK, now we've got the response revision:
                if (!error) {
                    rev.properties = download.document;
                    // Add to batcher ... eventually it will be fed to -insertDownloads:.
                    [_downloadsToInsert queueObject: rev];
                    [self asyncTaskStarted];
                } else {
                    self.error = error;
                    self.changesProcessed++;
                }

                // This download's task and connection slot are done:
                [self asyncTasksFinished: 1];
                --_httpConnectionCount;
                // Start another task if there are still revisions waiting to be pulled:
                [self pullRemoteRevisions];
            }
        ];
    [downloader autorelease];
}
// Get a bunch of revisions in one bulk request.
// POSTs the revisions' docIDs to _all_docs?include_docs=true. Each returned document that has
// no attachments and matches one of the requested revisions is queued for insertion. Any
// revision left unmatched (all of them, if the request fails) is queued for an individual
// GET instead.
- (void) pullBulkRevisions: (NSArray*)bulkRevs {
    // http://wiki.apache.org/couchdb/HTTP_Bulk_Document_API
    NSUInteger nRevs = bulkRevs.count;
    if (nRevs == 0)
        return;
    // NSUInteger counts are cast so they match the %u conversions on 64-bit targets.
    LogTo(Sync, @"%@ bulk-fetching %u remote revisions...", self, (unsigned)nRevs);
    LogTo(SyncVerbose, @"%@ bulk-fetching remote revisions: %@", self, bulkRevs);

    [self asyncTaskStarted];
    ++_httpConnectionCount;
    NSMutableArray* remainingRevs = [[bulkRevs mutableCopy] autorelease];
    NSArray* keys = [bulkRevs my_map: ^(TDRevision* rev) { return rev.docID; }];
    [self sendAsyncRequest: @"POST"
                      path: @"/_all_docs?include_docs=true"
                      body: $dict({@"keys", keys})
              onCompletion:^(id result, NSError *error) {
                  if (error) {
                      self.error = error;
                      // NOTE(review): these revs are counted as processed here AND re-queued
                      // below for individual GETs, which count them again when they finish;
                      // confirm this is intended.
                      self.changesProcessed += bulkRevs.count;
                  } else {
                      // Process the resulting rows' documents.
                      // We only add a document if it doesn't have attachments, and if its
                      // revID matches the one we asked for.
                      NSArray* rows = $castIf(NSArray, [result objectForKey: @"rows"]);
                      LogTo(Sync, @"%@ checking %u bulk-fetched remote revisions",
                            self, (unsigned)rows.count);
                      for (NSDictionary* row in rows) {
                          NSDictionary* doc = $castIf(NSDictionary, [row objectForKey: @"doc"]);
                          if (doc && ![doc objectForKey: @"_attachments"]) {
                              TDRevision* rev = [TDRevision revisionWithProperties: doc];
                              NSUInteger pos = [remainingRevs indexOfObject: rev];
                              if (pos != NSNotFound) {
                                  rev.sequence = [[remainingRevs objectAtIndex: pos] sequence];
                                  [remainingRevs removeObjectAtIndex: pos];
                                  [_downloadsToInsert queueObject: rev];
                                  [self asyncTaskStarted];
                              }
                          }
                      }
                  }

                  // Any leftover revisions that didn't get matched will be fetched individually:
                  if (remainingRevs.count) {
                      LogTo(Sync, @"%@ bulk-fetch didn't work for %u of %u revs; getting individually",
                            self, (unsigned)remainingRevs.count, (unsigned)nRevs);
                      for (TDRevision* rev in remainingRevs)
                          [self queueRemoteRevision: rev];
                      [self pullRemoteRevisions];
                  }

                  // Note that we've finished this task:
                  [self asyncTasksFinished: 1];
                  --_httpConnectionCount;
                  // Start another task if there are still revisions waiting to be pulled:
                  [self pullRemoteRevisions];
              }
     ];
}
// This will be called when _downloadsToInsert fills up (and presumably also after its delay).
// It inserts the batch of downloaded revisions into the local database in one transaction,
// ordered by their local (fake) sequence numbers, and then checkpoints lastSequence.
// A revision that fails to insert, for any reason other than a validation rejection, keeps
// its fake sequence pending. Presumably this prevents the checkpoint from advancing past it.
- (void) insertDownloads:(NSArray *)downloads {
    // NSUInteger counts are cast so they match the %u conversions on 64-bit targets.
    LogTo(SyncVerbose, @"%@ inserting %u revisions...", self, (unsigned)downloads.count);
    CFAbsoluteTime time = CFAbsoluteTimeGetCurrent();

    [_db beginTransaction];
    BOOL success = NO;
    @try{
        downloads = [downloads sortedArrayUsingSelector: @selector(compareSequences:)];
        for (TDRevision* rev in downloads) {
            @autoreleasepool {
                SequenceNumber fakeSequence = rev.sequence;
                NSArray* history = [TDDatabase parseCouchDBRevisionHistory: rev.properties];
                if (!history && rev.generation > 1) {
                    Warn(@"%@: Missing revision history in response for %@", self, rev);
                    self.error = TDStatusToNSError(kTDStatusUpstreamError, nil);
                    continue;
                }
                LogTo(SyncVerbose, @"%@ inserting %@ %@",
                      self, rev.docID, [history my_compactDescription]);

                // Insert the revision:
                int status = [_db forceInsert: rev revisionHistory: history source: _remote];
                if (TDStatusIsError(status)) {
                    // A validation rejection is only logged and still counts as processed;
                    // any other failure is reported and leaves the sequence pending.
                    if (status == kTDStatusForbidden)
                        LogTo(Sync, @"%@: Remote rev failed validation: %@", self, rev);
                    else {
                        Warn(@"%@ failed to write %@: status=%d", self, rev, status);
                        self.error = TDStatusToNSError(status, nil);
                        continue;
                    }
                }

                // Mark this revision's fake sequence as processed:
                [_pendingSequences removeSequence: fakeSequence];
            }
        }

        LogTo(SyncVerbose, @"%@ finished inserting %u revisions",
              self, (unsigned)downloads.count);

        // Checkpoint:
        self.lastSequence = _pendingSequences.checkpointedValue;

        success = YES;
    } @catch (NSException *x) {
        MYReportException(x, @"%@: Exception inserting revisions", self);
    } @finally {
        [_db endTransaction: success];
    }

    time = CFAbsoluteTimeGetCurrent() - time;
    LogTo(Sync, @"%@ inserted %u revs in %.3f sec (%.1f/sec)",
          self, (unsigned)downloads.count, time, downloads.count/time);

    [self asyncTasksFinished: downloads.count];
    self.changesProcessed += downloads.count;
}
@end
// A TDRevision received from the remote _changes feed. It remembers the (opaque) remote
// sequence ID it appeared under, and whether it was marked conflicted.
@implementation TDPulledRevision

@synthesize remoteSequenceID = _remoteSequenceID;
@synthesize conflicted = _conflicted;

- (void) dealloc {
    // Manual retain/release: give up the sequence ID before the superclass tears down.
    [_remoteSequenceID release];
    [super dealloc];
}

@end
// Encodes an array of strings (e.g. revision IDs) as a JSON array, then URL-escapes it for
// use as a query-parameter value.
static NSString* joinQuotedEscaped(NSArray* strings) {
    // An empty array needs no JSON encoder, but its "[]" text must be escaped like every other
    // result; raw brackets are not valid in a URL query.
    NSString* json = (strings.count == 0)
                   ? @"[]"
                   : [TDJSON stringWithJSONObject: strings options: 0 error: NULL];
    return TDEscapeURLParam(json);
}