Skip to content

Commit

Permalink
Add support for non-blocking media sends.
Browse files Browse the repository at this point in the history
  • Loading branch information
greyson-signal committed Jun 7, 2020
1 parent 13027dc commit 1234899
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 19 deletions.
Expand Up @@ -5,6 +5,7 @@
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread;

import com.annimon.stream.Collectors;
import com.annimon.stream.Stream;

import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec;
Expand All @@ -22,6 +23,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Manages the queue of jobs. This is the only class that should write to {@link JobStorage} to
Expand Down Expand Up @@ -97,7 +99,7 @@ synchronized void submitNewJobChain(@NonNull List<List<Job>> chain) {
}

@WorkerThread
synchronized void submitJobWithExistingDependencies(@NonNull Job job, @NonNull Collection<String> dependsOn) {
synchronized void submitJobWithExistingDependencies(@NonNull Job job, @NonNull Collection<String> dependsOn, @Nullable String dependsOnQueue) {
List<List<Job>> chain = Collections.singletonList(Collections.singletonList(job));

if (chainExceedsMaximumInstances(chain)) {
Expand All @@ -106,11 +108,17 @@ synchronized void submitJobWithExistingDependencies(@NonNull Job job, @NonNull C
return;
}

dependsOn = Stream.of(dependsOn)
.filter(id -> jobStorage.getJobSpec(id) != null)
.toList();
Set<String> dependsOnSet = Stream.of(dependsOn)
.filter(id -> jobStorage.getJobSpec(id) != null)
.collect(Collectors.toSet());

FullSpec fullSpec = buildFullSpec(job, dependsOn);
if (dependsOnQueue != null) {
dependsOnSet.addAll(Stream.of(jobStorage.getJobsInQueue(dependsOnQueue))
.map(JobSpec::getId)
.toList());
}

FullSpec fullSpec = buildFullSpec(job, dependsOnSet);
jobStorage.insertJobs(Collections.singletonList(fullSpec));

scheduleJobs(Collections.singletonList(job));
Expand Down
Expand Up @@ -4,6 +4,7 @@
import android.content.Intent;
import android.os.Build;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread;

import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory;
Expand Down Expand Up @@ -138,7 +139,33 @@ public void add(@NonNull Job job, @NonNull Collection<String> dependsOn) {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING);

executor.execute(() -> {
jobController.submitJobWithExistingDependencies(job, dependsOn);
jobController.submitJobWithExistingDependencies(job, dependsOn, null);
wakeUp();
});
}

/**
* Enqueues a single job that depends on a collection of job ID's, as well as any unfinished
* items in the specified queue.
*/
public void add(@NonNull Job job, @Nullable String dependsOnQueue) {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING);

executor.execute(() -> {
jobController.submitJobWithExistingDependencies(job, Collections.emptyList(), dependsOnQueue);
wakeUp();
});
}

/**
* Enqueues a single job that depends on a collection of job ID's, as well as any unfinished
* items in the specified queue.
*/
public void add(@NonNull Job job, @NonNull Collection<String> dependsOn, @Nullable String dependsOnQueue) {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING);

executor.execute(() -> {
jobController.submitJobWithExistingDependencies(job, dependsOn, dependsOnQueue);
wakeUp();
});
}
Expand Down
Expand Up @@ -23,6 +23,9 @@ public interface JobStorage {
@WorkerThread
@NonNull List<JobSpec> getPendingJobsWithNoDependenciesInCreatedOrder(long currentTime);

@WorkerThread
@NonNull List<JobSpec> getJobsInQueue(@NonNull String queue);

@WorkerThread
int getJobInstanceCount(@NonNull String factoryKey);

Expand Down
Expand Up @@ -108,6 +108,14 @@ public synchronized void insertJobs(@NonNull List<FullSpec> fullSpecs) {
}
}

@Override
public synchronized @NonNull List<JobSpec> getJobsInQueue(@NonNull String queue) {
return Stream.of(jobs)
.filter(j -> queue.equals(j.getQueueKey()))
.sorted((j1, j2) -> Long.compare(j1.getCreateTime(), j2.getCreateTime()))
.toList();
}

private Optional<JobSpec> getMigrationJob() {
return Optional.fromNullable(Stream.of(jobs)
.filter(j -> Job.Parameters.MIGRATION_QUEUE_KEY.equals(j.getQueueKey()))
Expand Down
Expand Up @@ -73,9 +73,9 @@ public class PushGroupSendJob extends PushSendJob {
private long messageId;
private RecipientId filterRecipient;

public PushGroupSendJob(long messageId, @NonNull RecipientId destination, @Nullable RecipientId filterRecipient) {
public PushGroupSendJob(long messageId, @NonNull RecipientId destination, @Nullable RecipientId filterRecipient, boolean hasMedia) {
this(new Job.Parameters.Builder()
.setQueue(destination.toQueueKey())
.setQueue(destination.toQueueKey(hasMedia))
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
Expand Down Expand Up @@ -112,7 +112,7 @@ public static void enqueue(@NonNull Context context,
OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message);

jobManager.add(new PushGroupSendJob(messageId, destination, filterAddress), attachmentUploadIds);
jobManager.add(new PushGroupSendJob(messageId, destination, filterAddress, !attachmentUploadIds.isEmpty()), attachmentUploadIds, attachmentUploadIds.isEmpty() ? null : destination.toQueueKey());

} catch (NoSuchMessageException | MmsException e) {
Log.w(TAG, "Failed to enqueue message.", e);
Expand Down
Expand Up @@ -58,7 +58,7 @@ public class PushMediaSendJob extends PushSendJob {
private long messageId;

public PushMediaSendJob(long messageId, @NonNull Recipient recipient) {
this(constructParameters(recipient), messageId);
this(constructParameters(recipient, true), messageId);
}

private PushMediaSendJob(Job.Parameters parameters, long messageId) {
Expand All @@ -77,7 +77,7 @@ public static void enqueue(@NonNull Context context, @NonNull JobManager jobMana
OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message);

jobManager.add(new PushMediaSendJob(messageId, recipient), attachmentUploadIds);
jobManager.add(new PushMediaSendJob(messageId, recipient), attachmentUploadIds, recipient.getId().toQueueKey());

} catch (NoSuchMessageException | MmsException e) {
Log.w(TAG, "Failed to enqueue message.", e);
Expand Down
Expand Up @@ -71,9 +71,9 @@ protected PushSendJob(Job.Parameters parameters) {
super(parameters);
}

protected static Job.Parameters constructParameters(@NonNull Recipient recipient) {
protected static Job.Parameters constructParameters(@NonNull Recipient recipient, boolean hasMedia) {
return new Parameters.Builder()
.setQueue(recipient.getId().toQueueKey())
.setQueue(recipient.getId().toQueueKey(hasMedia))
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
Expand Down
Expand Up @@ -43,7 +43,7 @@ public class PushTextSendJob extends PushSendJob {
private long messageId;

public PushTextSendJob(long messageId, @NonNull Recipient recipient) {
this(constructParameters(recipient), messageId);
this(constructParameters(recipient, false), messageId);
}

private PushTextSendJob(@NonNull Job.Parameters parameters, long messageId) {
Expand Down
Expand Up @@ -110,7 +110,11 @@ public long toLong() {
}

public @NonNull String toQueueKey() {
return "RecipientId::" + id;
return toQueueKey(false);
}

public @NonNull String toQueueKey(boolean forMedia) {
return "RecipientId::" + id + (forMedia ? "::MEDIA" : "");
}

@Override
Expand Down
Expand Up @@ -251,9 +251,9 @@ public static void sendMediaBroadcast(@NonNull Context context, @NonNull List<Ou
if (isLocalSelfSend(context, recipient, false)) {
sendLocalMediaSelf(context, messageId);
} else if (isGroupPushSend(recipient)) {
jobManager.add(new PushGroupSendJob(messageId, recipient.getId(), null), messageDependsOnIds);
jobManager.add(new PushGroupSendJob(messageId, recipient.getId(), null, true), messageDependsOnIds, recipient.getId().toQueueKey());
} else {
jobManager.add(new PushMediaSendJob(messageId, recipient), messageDependsOnIds);
jobManager.add(new PushMediaSendJob(messageId, recipient), messageDependsOnIds, recipient.getId().toQueueKey());
}
}

Expand Down Expand Up @@ -407,8 +407,8 @@ private static void sendGroupPush(Context context, Recipient recipient, long mes
JobManager jobManager = ApplicationDependencies.getJobManager();

if (uploadJobIds.size() > 0) {
Job groupSend = new PushGroupSendJob(messageId, recipient.getId(), filterRecipientId);
jobManager.add(groupSend, uploadJobIds);
Job groupSend = new PushGroupSendJob(messageId, recipient.getId(), filterRecipientId, !uploadJobIds.isEmpty());
jobManager.add(groupSend, uploadJobIds, uploadJobIds.isEmpty() ? null : recipient.getId().toQueueKey());
} else {
PushGroupSendJob.enqueue(context, jobManager, messageId, recipient.getId(), filterRecipientId);
}
Expand Down
Expand Up @@ -464,6 +464,29 @@ public void getDependencySpecsThatDependOnJob_endOfChain() {
assertTrue(result.isEmpty());
}

@Test
public void getJobsInQueue_empty() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS));

subject.init();

List<JobSpec> result = subject.getJobsInQueue("x");

assertTrue(result.isEmpty());
}

@Test
public void getJobsInQueue_singleJob() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS));

subject.init();

List<JobSpec> result = subject.getJobsInQueue("q1");

assertEquals(1, result.size());
assertEquals("id1", result.get(0).getId());
}

private JobDatabase noopDatabase() {
JobDatabase database = mock(JobDatabase.class);

Expand Down

0 comments on commit 1234899

Please sign in to comment.