Skip to content

Commit

Permalink
Implement send support for resumable uploads behind a flag.
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-signal authored and greyson-signal committed Apr 21, 2020
1 parent 7c44286 commit 2afb939
Show file tree
Hide file tree
Showing 24 changed files with 913 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread;

import org.greenrobot.eventbus.EventBus;
import org.thoughtcrime.securesms.R;
Expand All @@ -26,12 +27,15 @@
import org.thoughtcrime.securesms.mms.PartAuthority;
import org.thoughtcrime.securesms.service.GenericForegroundService;
import org.thoughtcrime.securesms.service.NotificationController;
import org.thoughtcrime.securesms.util.FeatureFlags;
import org.thoughtcrime.securesms.util.MediaMetadataRetrieverUtil;
import org.thoughtcrime.securesms.util.MediaUtil;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer;
import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvalidException;
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -51,8 +55,8 @@ public final class AttachmentUploadJob extends BaseJob {

private static final long UPLOAD_REUSE_THRESHOLD = TimeUnit.DAYS.toMillis(3);

private static final String KEY_ROW_ID = "row_id";
private static final String KEY_UNIQUE_ID = "unique_id";
private static final String KEY_ROW_ID = "row_id";
private static final String KEY_UNIQUE_ID = "unique_id";

/**
* Foreground notification shows while uploading attachments above this.
Expand Down Expand Up @@ -89,6 +93,18 @@ private AttachmentUploadJob(@NonNull Job.Parameters parameters, @NonNull Attachm

@Override
public void onRun() throws Exception {
final ResumableUploadSpec resumableUploadSpec;
if (FeatureFlags.attachmentsV3()) {
Data inputData = requireInputData();
if (!inputData.hasString(ResumableUploadSpecJob.KEY_RESUME_SPEC)) {
throw new ResumeLocationInvalidException("V3 Attachment upload requires a ResumableUploadSpec");
}

resumableUploadSpec = ResumableUploadSpec.deserialize(inputData.getString(ResumableUploadSpecJob.KEY_RESUME_SPEC));
} else {
resumableUploadSpec = null;
}

SignalServiceMessageSender messageSender = ApplicationDependencies.getSignalServiceMessageSender();
AttachmentDatabase database = DatabaseFactory.getAttachmentDatabase(context);
DatabaseAttachment databaseAttachment = database.getAttachment(attachmentId);
Expand All @@ -108,7 +124,7 @@ public void onRun() throws Exception {
Log.i(TAG, "Uploading attachment for message " + databaseAttachment.getMmsId() + " with ID " + databaseAttachment.getAttachmentId());

try (NotificationController notification = getNotificationForAttachment(databaseAttachment)) {
SignalServiceAttachment localAttachment = getAttachmentFor(databaseAttachment, notification);
SignalServiceAttachment localAttachment = getAttachmentFor(databaseAttachment, notification, resumableUploadSpec);
SignalServiceAttachmentPointer remoteAttachment = messageSender.uploadAttachment(localAttachment.asStream());
Attachment attachment = PointerAttachment.forPointer(Optional.of(remoteAttachment), null, databaseAttachment.getFastPreflightId()).get();

Expand All @@ -133,10 +149,12 @@ public void onFailure() {

@Override
protected boolean onShouldRetry(@NonNull Exception exception) {
if (exception instanceof ResumeLocationInvalidException) return false;

return exception instanceof IOException;
}

private @NonNull SignalServiceAttachment getAttachmentFor(Attachment attachment, @Nullable NotificationController notification) throws InvalidAttachmentException {
private @NonNull SignalServiceAttachment getAttachmentFor(Attachment attachment, @Nullable NotificationController notification, @Nullable ResumableUploadSpec resumableUploadSpec) throws InvalidAttachmentException {
try {
if (attachment.getDataUri() == null || attachment.getSize() == 0) throw new IOException("Assertion failed, outgoing attachment has no data!");
InputStream is = PartAuthority.getAttachmentStream(context, attachment.getDataUri());
Expand All @@ -151,6 +169,7 @@ protected boolean onShouldRetry(@NonNull Exception exception) {
.withUploadTimestamp(System.currentTimeMillis())
.withCaption(attachment.getCaption())
.withCancelationSignal(this::isCanceled)
.withResumableUploadSpec(resumableUploadSpec)
.withListener((total, progress) -> {
EventBus.getDefault().postSticky(new PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress));
if (notification != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public static Map<String, Job.Factory> getJobFactories(@NonNull Application appl
put(RemoteConfigRefreshJob.KEY, new RemoteConfigRefreshJob.Factory());
put(RemoteDeleteSendJob.KEY, new RemoteDeleteSendJob.Factory());
put(RequestGroupInfoJob.KEY, new RequestGroupInfoJob.Factory());
put(ResumableUploadSpecJob.KEY, new ResumableUploadSpecJob.Factory());
put(StorageAccountRestoreJob.KEY, new StorageAccountRestoreJob.Factory());
put(RetrieveProfileAvatarJob.KEY, new RetrieveProfileAvatarJob.Factory());
put(RetrieveProfileJob.KEY, new RetrieveProfileJob.Factory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,11 @@ public static void enqueue(@NonNull Context context,
throw new MmsException("Inactive group!");
}

MmsDatabase database = DatabaseFactory.getMmsDatabase(context);
OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
JobManager.Chain compressAndUploadAttachment = createCompressingAndUploadAttachmentsChain(jobManager, message);
MmsDatabase database = DatabaseFactory.getMmsDatabase(context);
OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message);

compressAndUploadAttachment.then(new PushGroupSendJob(messageId, destination, filterAddress))
.enqueue();
jobManager.add(new PushGroupSendJob(messageId, destination, filterAddress), attachmentUploadIds);

} catch (NoSuchMessageException | MmsException e) {
Log.w(TAG, "Failed to enqueue message.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Set;

public class PushMediaSendJob extends PushSendJob {

Expand Down Expand Up @@ -72,12 +73,11 @@ public static void enqueue(@NonNull Context context, @NonNull JobManager jobMana
throw new AssertionError();
}

MmsDatabase database = DatabaseFactory.getMmsDatabase(context);
OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
JobManager.Chain compressAndUploadAttachment = createCompressingAndUploadAttachmentsChain(jobManager, message);
MmsDatabase database = DatabaseFactory.getMmsDatabase(context);
OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message);

compressAndUploadAttachment.then(new PushMediaSendJob(messageId, recipient))
.enqueue();
jobManager.add(new PushMediaSendJob(messageId, recipient), attachmentUploadIds);

} catch (NoSuchMessageException | MmsException e) {
Log.w(TAG, "Failed to enqueue message.", e);
Expand Down
17 changes: 12 additions & 5 deletions app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public abstract class PushSendJob extends SendJob {
Expand Down Expand Up @@ -146,7 +148,7 @@ protected SignalServiceAttachment getAttachmentFor(Attachment attachment) {
return null;
}

protected static JobManager.Chain createCompressingAndUploadAttachmentsChain(@NonNull JobManager jobManager, OutgoingMediaMessage message) {
protected static Set<String> enqueueCompressingAndUploadAttachmentsChains(@NonNull JobManager jobManager, OutgoingMediaMessage message) {
List<Attachment> attachments = new LinkedList<>();

attachments.addAll(message.getAttachments());
Expand All @@ -162,12 +164,17 @@ protected static JobManager.Chain createCompressingAndUploadAttachmentsChain(@No
.map(Contact.Avatar::getAttachment).withoutNulls()
.toList());

List<AttachmentCompressionJob> compressionJobs = Stream.of(attachments).map(a -> AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1)).toList();
return new HashSet<>(Stream.of(attachments).map(a -> {
AttachmentUploadJob attachmentUploadJob = new AttachmentUploadJob(((DatabaseAttachment) a).getAttachmentId());

List<AttachmentUploadJob> attachmentJobs = Stream.of(attachments).map(a -> new AttachmentUploadJob(((DatabaseAttachment) a).getAttachmentId())).toList();
jobManager.startChain(AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1))
.then(new ResumableUploadSpecJob())
.then(attachmentUploadJob)
.enqueue();

return jobManager.startChain(compressionJobs)
.then(attachmentJobs);
return attachmentUploadJob.getId();
})
.toList());
}

protected @NonNull List<SignalServiceAttachment> getAttachmentPointersFor(List<Attachment> attachments) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.thoughtcrime.securesms.jobs;

import androidx.annotation.NonNull;

import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.FeatureFlags;
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class ResumableUploadSpecJob extends BaseJob {

private static final String TAG = Log.tag(ResumableUploadSpecJob.class);

static final String KEY_RESUME_SPEC = "resume_spec";

public static final String KEY = "ResumableUploadSpecJob";

public ResumableUploadSpecJob() {
this(new Job.Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.build());
}

private ResumableUploadSpecJob(@NonNull Parameters parameters) {
super(parameters);
}

@Override
protected void onRun() throws Exception {
if (!FeatureFlags.attachmentsV3()) {
Log.i(TAG, "Attachments V3 is not enabled so there is nothing to do!");
}

ResumableUploadSpec resumableUploadSpec = ApplicationDependencies.getSignalServiceMessageSender()
.getResumableUploadSpec();

setOutputData(new Data.Builder()
.putString(KEY_RESUME_SPEC, resumableUploadSpec.serialize())
.build());
}

@Override
protected boolean onShouldRetry(@NonNull Exception e) {
return e instanceof IOException;
}

@Override
public @NonNull Data serialize() {
return Data.EMPTY;
}

@Override
public @NonNull String getFactoryKey() {
return KEY;
}

@Override
public void onFailure() {

}

public static class Factory implements Job.Factory<ResumableUploadSpecJob> {

@Override
public @NonNull ResumableUploadSpecJob create(@NonNull Parameters parameters, @NonNull Data data) {
return new ResumableUploadSpecJob(parameters);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.thoughtcrime.securesms.jobs.PushTextSendJob;
import org.thoughtcrime.securesms.jobs.ReactionSendJob;
import org.thoughtcrime.securesms.jobs.RemoteDeleteSendJob;
import org.thoughtcrime.securesms.jobs.ResumableUploadSpecJob;
import org.thoughtcrime.securesms.jobs.SmsSendJob;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.mms.MmsException;
Expand Down Expand Up @@ -275,15 +276,17 @@ public static void sendMediaBroadcast(@NonNull Context context, @NonNull List<Ou
AttachmentDatabase attachmentDatabase = DatabaseFactory.getAttachmentDatabase(context);
DatabaseAttachment databaseAttachment = attachmentDatabase.insertAttachmentForPreUpload(attachment);

Job compressionJob = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1);
Job uploadJob = new AttachmentUploadJob(databaseAttachment.getAttachmentId());
Job compressionJob = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1);
Job resumableUploadSpecJob = new ResumableUploadSpecJob();
Job uploadJob = new AttachmentUploadJob(databaseAttachment.getAttachmentId());

ApplicationDependencies.getJobManager()
.startChain(compressionJob)
.then(resumableUploadSpecJob)
.then(uploadJob)
.enqueue();

return new PreUploadResult(databaseAttachment.getAttachmentId(), Arrays.asList(compressionJob.getId(), uploadJob.getId()));
return new PreUploadResult(databaseAttachment.getAttachmentId(), Arrays.asList(compressionJob.getId(), resumableUploadSpecJob.getId(), uploadJob.getId()));
} catch (MmsException e) {
Log.w(TAG, "preUploadPushAttachment() - Failed to upload!", e);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException;
import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory;
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec;
import org.whispersystems.signalservice.internal.util.StaticCredentialsProvider;
import org.whispersystems.signalservice.internal.util.Util;
import org.whispersystems.util.Base64;
Expand Down Expand Up @@ -350,16 +351,18 @@ public void update(SignalServiceMessagePipe pipe, SignalServiceMessagePipe unide
}

public SignalServiceAttachmentPointer uploadAttachment(SignalServiceAttachmentStream attachment) throws IOException {
byte[] attachmentKey = Util.getSecretBytes(64);
byte[] attachmentKey = attachment.getResumableUploadSpec().transform(ResumableUploadSpec::getSecretKey).or(() -> Util.getSecretBytes(64));
byte[] attachmentIV = attachment.getResumableUploadSpec().transform(ResumableUploadSpec::getIV).or(() -> Util.getSecretBytes(16));
long paddedLength = PaddingInputStream.getPaddedSize(attachment.getLength());
InputStream dataStream = new PaddingInputStream(attachment.getInputStream(), attachment.getLength());
long ciphertextLength = AttachmentCipherOutputStream.getCiphertextLength(paddedLength);
PushAttachmentData attachmentData = new PushAttachmentData(attachment.getContentType(),
dataStream,
ciphertextLength,
new AttachmentCipherOutputStreamFactory(attachmentKey),
new AttachmentCipherOutputStreamFactory(attachmentKey, attachmentIV),
attachment.getListener(),
attachment.getCancelationSignal());
attachment.getCancelationSignal(),
attachment.getResumableUploadSpec().orNull());

if (attachmentsV3.get()) {
return uploadAttachmentV3(attachment, attachmentKey, attachmentData);
Expand Down Expand Up @@ -403,7 +406,7 @@ private SignalServiceAttachmentPointer uploadAttachmentV2(SignalServiceAttachmen
attachment.getUploadTimestamp());
}

private SignalServiceAttachmentPointer uploadAttachmentV3(SignalServiceAttachmentStream attachment, byte[] attachmentKey, PushAttachmentData attachmentData) throws IOException {
public ResumableUploadSpec getResumableUploadSpec() throws IOException {
AttachmentV3UploadAttributes v3UploadAttributes = null;
Optional<SignalServiceMessagePipe> localPipe = pipe.get();

Expand All @@ -421,9 +424,13 @@ private SignalServiceAttachmentPointer uploadAttachmentV3(SignalServiceAttachmen
v3UploadAttributes = socket.getAttachmentV3UploadAttributes();
}

byte[] digest = socket.uploadAttachment(attachmentData, v3UploadAttributes);
return new SignalServiceAttachmentPointer(v3UploadAttributes.getCdn(),
new SignalServiceAttachmentRemoteId(v3UploadAttributes.getKey()),
return socket.getResumableUploadSpec(v3UploadAttributes);
}

private SignalServiceAttachmentPointer uploadAttachmentV3(SignalServiceAttachmentStream attachment, byte[] attachmentKey, PushAttachmentData attachmentData) throws IOException {
byte[] digest = socket.uploadAttachment(attachmentData);
return new SignalServiceAttachmentPointer(attachmentData.getResumableUploadSpec().getCdnNumber(),
new SignalServiceAttachmentRemoteId(attachmentData.getResumableUploadSpec().getCdnKey()),
attachment.getContentType(),
attachmentKey,
Optional.of(Util.toIntExact(attachment.getLength())),
Expand Down

0 comments on commit 2afb939

Please sign in to comment.