Skip to content

Commit

Permalink
Add ability to do unused reads from CDSv2 to test server load.
Browse files Browse the repository at this point in the history
  • Loading branch information
greyson-signal authored and cody-signal committed Aug 18, 2022
1 parent 84717b9 commit 15e52a8
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 46 deletions.
2 changes: 1 addition & 1 deletion app/build.gradle
Expand Up @@ -201,7 +201,7 @@ android {
buildConfigField "String[]", "SIGNAL_CONTENT_PROXY_IPS", content_proxy_ips
buildConfigField "String", "SIGNAL_AGENT", "\"OWA\""
buildConfigField "String", "CDS_MRENCLAVE", "\"74778bb0f93ae1f78c26e67152bab0bbeb693cd56d1bb9b4e9244157acc58081\""
buildConfigField "String", "CDSI_MRENCLAVE", "\"7b75dd6e862decef9b37132d54be082441917a7790e82fe44f9cf653de03a75f\""
buildConfigField "String", "CDSI_MRENCLAVE", "\"ef4787a56a154ac6d009138cac17155acd23cfe4329281252365dd7c252e7fbf\""
buildConfigField "org.thoughtcrime.securesms.KbsEnclave", "KBS_ENCLAVE", "new org.thoughtcrime.securesms.KbsEnclave(\"0cedba03535b41b67729ce9924185f831d7767928a1d1689acb689bc079c375f\", " +
"\"187d2739d22be65e74b65f0055e74d31310e4267e5fac2b1246cc8beba81af39\", " +
"\"ee19f1965b1eefa3dc4204eb70c04f397755f771b8c1909d080c04dad2a6a9ba\")"
Expand Down
Expand Up @@ -13,6 +13,7 @@ import org.signal.contacts.SystemContactsRepository.ContactIterator
import org.signal.contacts.SystemContactsRepository.ContactPhoneDetails
import org.signal.core.util.Stopwatch
import org.signal.core.util.StringUtil
import org.signal.core.util.concurrent.SignalExecutors
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.BuildConfig
import org.thoughtcrime.securesms.R
Expand All @@ -36,7 +37,11 @@ import org.thoughtcrime.securesms.util.Util
import org.whispersystems.signalservice.api.push.SignalServiceAddress
import org.whispersystems.signalservice.api.util.UuidUtil
import java.io.IOException
import java.lang.Exception
import java.util.Calendar
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future

/**
* Methods for discovering which users are registered and marking them as such in the database.
Expand Down Expand Up @@ -76,6 +81,8 @@ object ContactDiscovery {
refresh = {
if (FeatureFlags.phoneNumberPrivacy()) {
ContactDiscoveryRefreshV2.refreshAll(context)
} else if (FeatureFlags.cdsV2LoadTesting()) {
loadTestRefreshAll(context)
} else {
ContactDiscoveryRefreshV1.refreshAll(context)
}
Expand All @@ -97,6 +104,8 @@ object ContactDiscovery {
refresh = {
if (FeatureFlags.phoneNumberPrivacy()) {
ContactDiscoveryRefreshV2.refresh(context, recipients)
} else if (FeatureFlags.cdsV2LoadTesting()) {
loadTestRefresh(context, recipients)
} else {
ContactDiscoveryRefreshV1.refresh(context, recipients)
}
Expand All @@ -116,6 +125,8 @@ object ContactDiscovery {
refresh = {
if (FeatureFlags.phoneNumberPrivacy()) {
ContactDiscoveryRefreshV2.refresh(context, listOf(recipient))
} else if (FeatureFlags.cdsV2LoadTesting()) {
loadTestRefresh(context, listOf(recipient))
} else {
ContactDiscoveryRefreshV1.refresh(context, listOf(recipient))
}
Expand Down Expand Up @@ -367,6 +378,38 @@ object ContactDiscovery {
ApplicationDependencies.getProtocolStore().pni().containsSession(protocolAddress)
}

private fun loadTestRefreshAll(context: Context): RefreshResult {
return loadTestOperation(
{ ContactDiscoveryRefreshV1.refreshAll(context) },
{ ContactDiscoveryRefreshV2.refreshAll(context, ignoreResults = true) }
)
}

private fun loadTestRefresh(context: Context, recipients: List<Recipient>): RefreshResult {
return loadTestOperation(
{ ContactDiscoveryRefreshV1.refresh(context, recipients) },
{ ContactDiscoveryRefreshV2.refresh(context, recipients, ignoreResults = true) }
)
}

private fun loadTestOperation(operationV1: Callable<RefreshResult>, operationV2: Callable<RefreshResult>): RefreshResult {
val v1Future: Future<RefreshResult> = SignalExecutors.UNBOUNDED.submit(operationV1)
val v2Future: Future<RefreshResult> = SignalExecutors.UNBOUNDED.submit(operationV2)

try {
v2Future.get()
} catch (e: Exception) {
Log.w(TAG, "Failed to complete the V2 fetch!", e)
}

try {
return v1Future.get()
} catch (e: ExecutionException) {
Log.w(TAG, "Hit exception during V1 fetch!", e)
throw e.cause!!
}
}

class RefreshResult(
val registeredIds: Set<RecipientId>,
val rewrites: Map<String, String>
Expand Down
Expand Up @@ -62,7 +62,8 @@
*/
class ContactDiscoveryRefreshV1 {

private static final String TAG = Log.tag(ContactDiscoveryRefreshV1.class);
// Using Log.tag will cut off the version number
private static final String TAG = "CdsRefreshV1";

private static final int MAX_NUMBERS = 20_500;

Expand Down
Expand Up @@ -24,7 +24,8 @@ import java.util.Optional
*/
object ContactDiscoveryRefreshV2 {

private val TAG = Log.tag(ContactDiscoveryRefreshV2::class.java)
// Using Log.tag will cut off the version number
private const val TAG = "CdsRefreshV2"

/**
* The maximum number items we will allow in a 'one-off' request.
Expand All @@ -38,7 +39,7 @@ object ContactDiscoveryRefreshV2 {
@WorkerThread
@Synchronized
@JvmStatic
fun refreshAll(context: Context): ContactDiscovery.RefreshResult {
fun refreshAll(context: Context, ignoreResults: Boolean = false): ContactDiscovery.RefreshResult {
val stopwatch = Stopwatch("refresh-all")

val previousE164s: Set<String> = if (SignalStore.misc().cdsToken != null) {
Expand All @@ -59,28 +60,45 @@ object ContactDiscoveryRefreshV2 {

val newE164s: Set<String> = newRecipientE164s + newSystemE164s

val tokenToUse: ByteArray? = if (previousE164s.isNotEmpty()) {
SignalStore.misc().cdsToken
} else {
if (SignalStore.misc().cdsToken != null) {
Log.w(TAG, "We have a token, but our previousE164 list is empty! We cannot provide a token.")
}
null
}

val response: CdsiV2Service.Response = makeRequest(
previousE164s = previousE164s,
newE164s = newE164s,
serviceIds = SignalDatabase.recipients.getAllServiceIdProfileKeyPairs(),
token = SignalStore.misc().cdsToken,
saveToken = true
token = tokenToUse,
saveToken = true,
tag = "refresh-all"
)
stopwatch.split("network")

SignalDatabase.cds.updateAfterCdsQuery(newE164s, recipientE164s + systemE164s)
stopwatch.split("cds-db")

val registeredIds: Set<RecipientId> = SignalDatabase.recipients.bulkProcessCdsV2Result(
response.results
.mapValues { entry -> RecipientDatabase.CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
)
stopwatch.split("recipient-db")
var registeredIds: Set<RecipientId> = emptySet()

if (ignoreResults) {
Log.w(TAG, "[refresh-all] Ignoring CDSv2 results.")
} else {
registeredIds = SignalDatabase.recipients.bulkProcessCdsV2Result(
response.results
.mapValues { entry -> RecipientDatabase.CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
)
stopwatch.split("recipient-db")

SignalDatabase.recipients.bulkUpdatedRegisteredStatus(registeredIds.associateWith { null }, emptyList())
stopwatch.split("update-registered")
SignalDatabase.recipients.bulkUpdatedRegisteredStatus(registeredIds.associateWith { null }, emptyList())
stopwatch.split("update-registered")
}

stopwatch.stop(TAG)
Log.d(TAG, "[refresh-all] Used ${response.quotaUsedDebugOnly} units of our quota.")

return ContactDiscovery.RefreshResult(registeredIds, emptyMap())
}
Expand All @@ -89,7 +107,7 @@ object ContactDiscoveryRefreshV2 {
@WorkerThread
@Synchronized
@JvmStatic
fun refresh(context: Context, inputRecipients: List<Recipient>): ContactDiscovery.RefreshResult {
fun refresh(context: Context, inputRecipients: List<Recipient>, ignoreResults: Boolean = false): ContactDiscovery.RefreshResult {
val stopwatch = Stopwatch("refresh-some")

val recipients = inputRecipients.map { it.resolve() }
Expand All @@ -100,7 +118,7 @@ object ContactDiscoveryRefreshV2 {

if (inputE164s.size > MAXIMUM_ONE_OFF_REQUEST_SIZE) {
Log.i(TAG, "List of specific recipients to refresh is too large! (Size: ${recipients.size}). Doing a full refresh instead.")
val fullResult: ContactDiscovery.RefreshResult = refreshAll(context)
val fullResult: ContactDiscovery.RefreshResult = refreshAll(context, ignoreResults)

return ContactDiscovery.RefreshResult(
registeredIds = fullResult.registeredIds.intersect(inputIds),
Expand All @@ -120,35 +138,44 @@ object ContactDiscoveryRefreshV2 {
newE164s = inputE164s,
serviceIds = SignalDatabase.recipients.getAllServiceIdProfileKeyPairs(),
token = null,
saveToken = false
saveToken = false,
tag = "refresh-some"
)
stopwatch.split("network")

val registeredIds: Set<RecipientId> = SignalDatabase.recipients.bulkProcessCdsV2Result(
response.results
.mapValues { entry -> RecipientDatabase.CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
)
stopwatch.split("recipient-db")
var registeredIds: Set<RecipientId> = emptySet()

SignalDatabase.recipients.bulkUpdatedRegisteredStatus(registeredIds.associateWith { null }, emptyList())
stopwatch.split("update-registered")
if (ignoreResults) {
Log.w(TAG, "[refresh-some] Ignoring CDSv2 results.")
} else {
registeredIds = SignalDatabase.recipients.bulkProcessCdsV2Result(
response.results
.mapValues { entry -> RecipientDatabase.CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
)
stopwatch.split("recipient-db")

SignalDatabase.recipients.bulkUpdatedRegisteredStatus(registeredIds.associateWith { null }, emptyList())
stopwatch.split("update-registered")
}

Log.d(TAG, "[refresh-some] Used ${response.quotaUsedDebugOnly} units of our quota.")
stopwatch.stop(TAG)

return ContactDiscovery.RefreshResult(registeredIds, emptyMap())
}

@Throws(IOException::class)
private fun makeRequest(previousE164s: Set<String>, newE164s: Set<String>, serviceIds: Map<ServiceId, ProfileKey>, token: ByteArray?, saveToken: Boolean): CdsiV2Service.Response {
private fun makeRequest(previousE164s: Set<String>, newE164s: Set<String>, serviceIds: Map<ServiceId, ProfileKey>, token: ByteArray?, saveToken: Boolean, tag: String): CdsiV2Service.Response {
return ApplicationDependencies.getSignalServiceAccountManager().getRegisteredUsersWithCdsi(
previousE164s,
newE164s,
serviceIds,
Optional.ofNullable(token),
BuildConfig.CDSI_MRENCLAVE
) { token ->
) { tokenToSave ->
if (saveToken) {
SignalStore.misc().cdsToken = token
SignalStore.misc().cdsToken = tokenToSave
Log.d(TAG, "[$tag] Token saved!")
}
}
}
Expand Down
Expand Up @@ -101,6 +101,7 @@ public final class FeatureFlags {
private static final String TELECOM_MODEL_BLOCKLIST = "android.calling.telecomModelBlockList";
private static final String CAMERAX_MODEL_BLOCKLIST = "android.cameraXModelBlockList";
private static final String RECIPIENT_MERGE_V2 = "android.recipientMergeV2";
private static final String CDS_V2_LOAD_TEST = "android.csdV2LoadTest";

/**
* We will only store remote values for flags in this set. If you want a flag to be controllable
Expand Down Expand Up @@ -154,7 +155,8 @@ public final class FeatureFlags {
TELECOM_MANUFACTURER_ALLOWLIST,
TELECOM_MODEL_BLOCKLIST,
CAMERAX_MODEL_BLOCKLIST,
RECIPIENT_MERGE_V2
RECIPIENT_MERGE_V2,
CDS_V2_LOAD_TEST
);

@VisibleForTesting
Expand Down Expand Up @@ -217,7 +219,8 @@ public final class FeatureFlags {
TELECOM_MANUFACTURER_ALLOWLIST,
TELECOM_MODEL_BLOCKLIST,
CAMERAX_MODEL_BLOCKLIST,
RECIPIENT_MERGE_V2
RECIPIENT_MERGE_V2,
CDS_V2_LOAD_TEST
);

/**
Expand Down Expand Up @@ -545,6 +548,13 @@ public static boolean recipientMergeV2() {
return getBoolean(RECIPIENT_MERGE_V2, false);
}

/**
* Whether or not we should also query CDSv2 as a form of load test.
*/
public static boolean cdsV2LoadTesting() {
return getBoolean(CDS_V2_LOAD_TEST, false);
}

/** Only for rendering debug info. */
public static synchronized @NonNull Map<String, Object> getMemoryValues() {
return new TreeMap<>(REMOTE_VALUES);
Expand Down
Expand Up @@ -52,13 +52,15 @@ public Single<ServiceResponse<Response>> getRegisteredUsers(String username, Str
.map(CdsiV2Service::parseEntries)
.collect(Collectors.toList())
.flatMap(pages -> {
Map<String, ResponseItem> all = new HashMap<>();
Map<String, ResponseItem> all = new HashMap<>();
int quotaUsed = 0;

for (Response page : pages) {
all.putAll(page.getResults());
quotaUsed += page.getQuotaUsedDebugOnly();
}

return Single.just(new Response(all));
return Single.just(new Response(all, quotaUsed));
})
.map(result -> ServiceResponse.forResult(result, 200, null))
.onErrorReturn(error -> {
Expand Down Expand Up @@ -87,7 +89,7 @@ private static Response parseEntries(ClientResponse clientResponse) {
}
}

return new Response(results);
return new Response(results, clientResponse.getDebugPermitsUsed());
}

private static ClientRequest buildClientRequest(Request request) {
Expand Down Expand Up @@ -146,13 +148,13 @@ private static List<Long> parseAndSortE164Strings(Collection<String> e164s) {
}

public static final class Request {
private final Set<String> previousE164s;
private final Set<String> newE164s;
private final Set<String> removedE164s;
final Set<String> previousE164s;
final Set<String> newE164s;
final Set<String> removedE164s;

private final Map<ServiceId, ProfileKey> serviceIds;
final Map<ServiceId, ProfileKey> serviceIds;

private final byte[] token;
final byte[] token;

public Request(Set<String> previousE164s, Set<String> newE164s, Map<ServiceId, ProfileKey> serviceIds, Optional<byte[]> token) {
if (previousE164s.size() > 0 && !token.isPresent()) {
Expand All @@ -166,25 +168,31 @@ public Request(Set<String> previousE164s, Set<String> newE164s, Map<ServiceId, P
this.token = token.orElse(null);
}

public int totalE164s() {
return previousE164s.size() + newE164s.size() - removedE164s.size();
}

public int serviceIdSize() {
return previousE164s.size() + newE164s.size() + removedE164s.size() + serviceIds.size();
}
}

public static final class Response {
private final Map<String, ResponseItem> results;
private final int quotaUsed;

public Response(Map<String, ResponseItem> results) {
this.results = results;
public Response(Map<String, ResponseItem> results, int quoteUsed) {
this.results = results;
this.quotaUsed = quoteUsed;
}

public Map<String, ResponseItem> getResults() {
return results;
}

/**
* Tells you how much quota you used in the request. This should only be used for debugging/logging purposed, and should never be relied upon for making
* actual decisions.
*/
public int getQuotaUsedDebugOnly() {
return quotaUsed;
}
}

public static final class ResponseItem {
Expand Down

0 comments on commit 15e52a8

Please sign in to comment.