This repository has been archived by the owner on Nov 3, 2021. It is now read-only.

Bug 871897 - [email] Change mail sending process to use streaming to reduce peak memory usage #254

Closed
wants to merge 13 commits
24 changes: 20 additions & 4 deletions Makefile
@@ -28,6 +28,8 @@ help:
@echo " Run one test file (all variants), do not post results to ArbPL"
@echo "make post-one-test SOLO_FILE=test_name.js"
@echo " Run one test file (all variants), post results to ArbPL"
@echo "make post-one-test SOLO_FILE=test_name.js TEST_VARIANT=imap:fake"
@echo " Run one test file (imap:fake variant), post results to ArbPL"
@echo ""
@echo "To enable verbose log output to the console: TEST_LOG_ENABLE=true"

@@ -70,6 +72,12 @@ install-into-gaia: clean gaia-symlink $(DEP_NODE_PKGS) $(OUR_JS_DEPS)
build: $(DEP_NODE_PKGS) $(OUR_JS_DEPS)


.PHONY: download-b2g
download-b2g:
npm install mozilla-download
node_modules/.bin/mozilla-download ./b2g --product b2g
ln -s ./b2g b2g-bindir-symlink

gaia-symlink:
echo "You need to create a symlink 'gaia-symlink' pointing at the gaia dir"

@@ -125,16 +133,24 @@ endef
######################
# All tests

tests: build
.PHONY: test-deps
test-deps: node_modules

# If our package.json has been updated, run npm install
node_modules: package.json
npm install
touch node_modules

tests: build test-deps
$(call run-tests)

one-test: build
one-test: build test-deps
$(call run-one-test)

post-one-test: one-test
post-one-test: one-test test-deps
cd $(ARBPLD); ./logalchew $(CURDIR)/test-logs/$(basename $(SOLO_FILE)).logs

post-tests: tests
post-tests: tests test-deps
cd $(ARBPLD); ./logalchew $(CURDIR)/test-logs/all.logs


20 changes: 18 additions & 2 deletions data/lib/imap.js
@@ -1168,7 +1168,15 @@ ImapConnection.prototype.append = function(data, options, cb) {
cmd += ' "' + formatImapDateTime(options.date) + '"';
}
cmd += ' {';
cmd += (Buffer.isBuffer(data) ? data.length : Buffer.byteLength(data));
if (data instanceof Blob) {
cmd += data.size;
}
else if (Buffer.isBuffer(data)) {
cmd += data.length;
}
else {
cmd += Buffer.byteLength(data);
}
cmd += '}';
var self = this, step = 1;
this._send('APPEND', cmd, function(err) {
@@ -1203,7 +1211,15 @@ ImapConnection.prototype.multiappend = function(messages, cb) {
cmd += ' "' + formatImapDateTime(options.date) + '"';
}
cmd += ' {';
cmd += (Buffer.isBuffer(data) ? data.length : Buffer.byteLength(data));
if (data instanceof Blob) {
cmd += data.size;
}
else if (Buffer.isBuffer(data)) {
cmd += data.length;
}
else {
cmd += Buffer.byteLength(data);
}
cmd += '}';
}
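
Both APPEND hunks make the same change: the byte count announced in the IMAP literal now comes from Blob.size when the message data is a Blob, with the existing Buffer/string handling as the fallback. A minimal standalone sketch of that dispatch (the helper name is hypothetical, not part of the patch):

// Hypothetical helper mirroring the literal-size dispatch above. Assumes the
// data is a DOM Blob, a Node-style Buffer, or a plain JavaScript string.
function literalByteLength(data) {
  if (typeof Blob !== 'undefined' && data instanceof Blob) {
    // A Blob reports its byte size without its contents being read into memory.
    return data.size;
  }
  if (Buffer.isBuffer(data)) {
    // A Buffer is already raw bytes.
    return data.length;
  }
  // A string has to be measured as encoded UTF-8 bytes, not characters.
  return Buffer.byteLength(data, 'utf8');
}

// e.g. cmd += ' {' + literalByteLength(data) + '}';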

13 changes: 4 additions & 9 deletions data/lib/mailapi/activesync/account.js
@@ -801,20 +801,20 @@ ActiveSyncAccount.prototype = {

// we want the bcc included because that's how we tell the server the bcc
// results.
composer.withMessageBuffer({ includeBcc: true }, function(mimeBuffer) {
composer.withMessageBlob({ includeBcc: true }, function(mimeBlob) {
// ActiveSync 14.0 has a completely different API for sending email. Make
// sure we format things the right way.
if (this.conn.currentVersion.gte('14.0')) {
var cm = ASCP.ComposeMail.Tags;
var w = new $wbxml.Writer('1.3', 1, 'UTF-8');
var w = new $wbxml.Writer('1.3', 1, 'UTF-8', null, 'blob');
w.stag(cm.SendMail)
// The ClientId is defined to be for duplicate messages suppression
// and does not need to have any uniqueness constraints apart from
// not being similar to (recently sent) messages by this client.
.tag(cm.ClientId, Date.now().toString()+'@mozgaia')
.tag(cm.SaveInSentItems)
.stag(cm.Mime)
.opaque(mimeBuffer)
.opaque(mimeBlob)
.etag()
.etag();

@@ -838,12 +838,7 @@ ActiveSyncAccount.prototype = {
});
}
else { // ActiveSync 12.x and lower
var encoder = new TextEncoder('UTF-8');

// On B2G 18, XHRs expect ArrayBuffers and will barf on Uint8Arrays. In
// the future, we can remove the last |.buffer| bit below.
this.conn.postData('SendMail', 'message/rfc822',
encoder.encode(mimeBuffer).buffer,
this.conn.postData('SendMail', 'message/rfc822', mimeBlob,
function(aError, aResponse) {
if (aError) {
account._reportErrorIfNecessary(aError);
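
The ActiveSync 14.0 path now constructs the WBXML writer with an extra 'blob' argument and hands the MIME Blob to opaque(), while the pre-14.0 path posts the Blob straight through XHR instead of TextEncoder-encoding the whole message first. A rough illustration of the idea behind a blob-mode output buffer (illustrative only, not the actual $wbxml.Writer internals):

// Illustrative blob-mode output: fixed WBXML bytes and opaque Blobs are
// collected as parts and only combined into a single Blob at send time, so
// the full MIME body never has to be expanded into an in-memory array.
function BlobOutput() {
  this._parts = [];
}
BlobOutput.prototype.writeBytes = function(uint8) {
  this._parts.push(uint8);
};
BlobOutput.prototype.writeOpaque = function(blobOrBytes) {
  // Blobs are kept by reference; the platform streams them during send.
  this._parts.push(blobOrBytes);
};
BlobOutput.prototype.toBlob = function(mimeType) {
  // The Blob constructor accepts a mix of typed arrays and Blobs.
  return new Blob(this._parts, { type: mimeType });
};

// xhr.send(output.toBlob('message/rfc822')) then lets the platform stream
// disk-backed Blob segments rather than allocating one big ArrayBuffer.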
48 changes: 31 additions & 17 deletions data/lib/mailapi/activesync/folder.js
@@ -4,6 +4,7 @@ define(
'../date',
'../syncbase',
'../util',
'mailapi/db/mail_rep',
'activesync/codepages/AirSync',
'activesync/codepages/AirSyncBase',
'activesync/codepages/ItemEstimate',
@@ -18,6 +19,7 @@ define(
$date,
$sync,
$util,
mailRep,
$AirSync,
$AirSyncBase,
$ItemEstimate,
@@ -508,7 +510,8 @@ ActiveSyncFolderConn.prototype = {
break;
case as.ApplicationData:
try {
msg = folderConn._parseMessage(child, node.tag === as.Add);
msg = folderConn._parseMessage(child, node.tag === as.Add,
storage);
}
catch (ex) {
// If we get an error, just log it and skip this message.
@@ -519,13 +522,7 @@ ActiveSyncFolderConn.prototype = {
}
}

if (node.tag === as.Add) {
msg.header.id = id = storage._issueNewHeaderId();
msg.header.suid = folderConn._storage.folderId + '/' + id;
msg.header.guid = '';
}
msg.header.srvid = guid;
// XXX need to get the message's message-id header value!

var collection = node.tag === as.Add ? added : changed;
collection.push(msg);
@@ -594,19 +591,24 @@ ActiveSyncFolderConn.prototype = {
* changed one
* @return {object} An object containing the header and body for the message
*/
_parseMessage: function asfc__parseMessage(node, isAdded) {
_parseMessage: function asfc__parseMessage(node, isAdded, storage) {
var em = $Email.Tags;
var asb = $AirSyncBase.Tags;
var asbEnum = $AirSyncBase.Enums;

var header, body, flagHeader;

if (isAdded) {
var newId = storage._issueNewHeaderId();
// note: these will be passed through mailRep.make* later
header = {
id: null,
id: newId,
// This will be fixed up afterwards for control flow paranoia.
srvid: null,
suid: null,
guid: null,
suid: storage.folderId + '/' + newId,
// ActiveSync does not/cannot tell us the Message-ID header unless we
// fetch the entire MIME body
guid: '',
author: null,
to: null,
cc: null,
@@ -799,23 +801,35 @@ ActiveSyncFolderConn.prototype = {
}

if (isInline)
body.relatedParts.push(attachment);
body.relatedParts.push(mailRep.makeAttachmentPart(attachment));
else
body.attachments.push(attachment);
body.attachments.push(mailRep.makeAttachmentPart(attachment));
}
header.hasAttachments = body.attachments.length > 0;
break;
}
}

body.bodyReps = [{
body.bodyReps = [mailRep.makeBodyPart({
type: bodyType,
sizeEstimate: bodySize,
amountDownloaded: 0,
isDownloaded: false
}];
})];

return { header: header, body: body };
// If this is an add, then these are new structures so we need to normalize
// them.
if (isAdded) {
return {
header: mailRep.makeHeaderInfo(header),
body: mailRep.makeBodyInfo(body)
};
}
// It's not an add, so this is a delta, and header/body have mergeInto
// methods and we should not attempt to normalize them.
else {
return { header: header, body: body };
}
},
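
The identity fields (id, suid, guid) now get filled in before normalization because the mail_rep factories are the single place new records take on their canonical shape. A hypothetical sketch of the kind of validation and defaulting such a factory performs (the field handling here is an assumption, not the actual mail_rep implementation):

// Hypothetical mail_rep-style factory: identity fields must already exist,
// everything else is defaulted, so records built by different protocol
// backends end up structurally identical.
function makeHeaderInfoSketch(raw) {
  if (raw.id == null || raw.suid == null || raw.guid == null) {
    throw new Error('header record missing identity fields');
  }
  return {
    id: raw.id,
    srvid: raw.srvid || null,
    suid: raw.suid,
    guid: raw.guid,
    author: raw.author || null,
    date: raw.date || null,
    flags: raw.flags || [],
    hasAttachments: !!raw.hasAttachments,
    subject: raw.subject || null,
    snippet: raw.snippet || null
  };
}

// Change records from the server are deltas with mergeInto() semantics, which
// is why the non-add branch above returns them without normalization.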

/**
@@ -1072,7 +1086,7 @@ ActiveSyncFolderConn.prototype = {
};

this._storage.updateMessageHeader(header.date, header.id, false, header);
this._storage.updateMessageBody(header, bodyInfo, event);
this._storage.updateMessageBody(header, bodyInfo, {}, event);
this._storage.runAfterDeferredCalls(callback.bind(null, null, bodyInfo));
},

33 changes: 7 additions & 26 deletions data/lib/mailapi/activesync/jobs.js
@@ -1,7 +1,9 @@
define(
[
'rdcommon/log',
'mix',
'../jobmixins',
'mailapi/drafts/jobs',
'activesync/codepages/AirSync',
'activesync/codepages/Email',
'activesync/codepages/Move',
@@ -11,7 +13,9 @@ define(
],
function(
$log,
mix,
$jobmixins,
draftsJobs,
$AirSync,
$Email,
$Move,
@@ -280,6 +284,7 @@ ActiveSyncJobDriver.prototype = {
undo_move: function(op, jobDoneCallback) {
},


//////////////////////////////////////////////////////////////////////////////
// delete

@@ -423,32 +428,6 @@ ActiveSyncJobDriver.prototype = {

undo_download: $jobmixins.undo_download,

//////////////////////////////////////////////////////////////////////////////
// saveDraft

local_do_saveDraft: $jobmixins.local_do_saveDraft,

do_saveDraft: $jobmixins.do_saveDraft,

check_saveDraft: $jobmixins.check_saveDraft,

local_undo_saveDraft: $jobmixins.local_undo_saveDraft,

undo_saveDraft: $jobmixins.undo_saveDraft,

//////////////////////////////////////////////////////////////////////////////
// deleteDraft

local_do_deleteDraft: $jobmixins.local_do_deleteDraft,

do_deleteDraft: $jobmixins.do_deleteDraft,

check_deleteDraft: $jobmixins.check_deleteDraft,

local_undo_deleteDraft: $jobmixins.local_undo_deleteDraft,

undo_deleteDraft: $jobmixins.undo_deleteDraft,

//////////////////////////////////////////////////////////////////////////////
// purgeExcessMessages is a NOP for activesync

@@ -475,6 +454,8 @@ ActiveSyncJobDriver.prototype = {
//////////////////////////////////////////////////////////////////////////////
};

mix(ActiveSyncJobDriver.prototype, draftsJobs.draftsMixins);

var LOGFAB = exports.LOGFAB = $log.register($module, {
ActiveSyncJobDriver: {
type: $log.DAEMON,
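
The per-driver saveDraft/deleteDraft handlers removed above are replaced by draftsJobs.draftsMixins, applied to the prototype via mix(). A minimal sketch of what such a mixin applier amounts to (illustrative; the real 'mix' module may differ):

// Illustrative mixin applier: copy each own property of the mixin object onto
// the target prototype without clobbering anything already defined there.
function applyMixins(target, mixins) {
  Object.keys(mixins).forEach(function(key) {
    if (!target.hasOwnProperty(key)) {
      target[key] = mixins[key];
    }
  });
  return target;
}

// applyMixins(ActiveSyncJobDriver.prototype, draftsJobs.draftsMixins) then
// supplies local_do_saveDraft, do_saveDraft, check_saveDraft, the deleteDraft
// family, and so on, without each protocol backend re-listing them.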
52 changes: 52 additions & 0 deletions data/lib/mailapi/async_blob_fetcher.js
@@ -0,0 +1,52 @@
define(
[
'exports'
],
function(
exports
) {

/**
* Asynchronously fetch the contents of a Blob, returning a Uint8Array.
* Exists because there is no FileReader in Gecko workers and this totally
* works. In discussion, it sounds like :sicking wants to deprecate the
* FileReader API anyways.
*
* Our consumer in this case is our specialized base64 encode that wants a
* Uint8Array since that is more compactly represented than a binary string
* would be.
*
* @param blob {Blob}
* @param callback {Function(err, Uint8Array)}
*/
function asyncFetchBlobAsUint8Array(blob, callback) {
var blobUrl = URL.createObjectURL(blob);
var xhr = new XMLHttpRequest();
xhr.open('GET', blobUrl, true);
xhr.responseType = 'arraybuffer';
xhr.onload = function() {
// blobs currently result in a status of 0 since there is no server.
if (xhr.status !== 0 && (xhr.status < 200 || xhr.status >= 300)) {
callback(xhr.status);
return;
}
callback(null, new Uint8Array(xhr.response));
};
xhr.onerror = function() {
callback('error');
};
try {
xhr.send();
}
catch(ex) {
console.error('XHR send() failure on blob');
callback('error');
}
URL.revokeObjectURL(blobUrl);
}

return {
asyncFetchBlobAsUint8Array: asyncFetchBlobAsUint8Array
};

}); // end define
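
A usage sketch for the new helper, in the same AMD style as the module above (the consuming module and function names are hypothetical):

// Hypothetical consumer of asyncFetchBlobAsUint8Array.
define(['mailapi/async_blob_fetcher'], function($blobFetcher) {
  function blobToBytes(blob, callback) {
    $blobFetcher.asyncFetchBlobAsUint8Array(blob, function(err, bytes) {
      if (err) {
        // err is either the non-2xx XHR status or the string 'error'.
        callback(err);
        return;
      }
      // bytes is a Uint8Array over the Blob's full contents, ready for
      // chunked base64 encoding or whatever the caller needs next.
      callback(null, bytes);
    });
  }
  return { blobToBytes: blobToBytes };
});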