Skip to content

Commit

Permalink
perf: increase task throughput in Android using thread pool executor (#…
Browse files Browse the repository at this point in the history
…4981)

* implement thread pool executor service
* add getTransactionalExecutor method
* use transactional executor during write action
* transactional executor with identifier
* execute database event in serial
* execute firestore event in serial
* absctract task excutor
* do not re-excute rejected task while shutdown
* add documentation
* tests configuration
* disable identified executor when maximum pool size is zero
* update document
* Avoid race condition in executors
  • Loading branch information
kmsbernard committed Apr 4, 2021
1 parent eeeba2e commit 0e4e331
Show file tree
Hide file tree
Showing 13 changed files with 207 additions and 47 deletions.
20 changes: 20 additions & 0 deletions docs/index.md
Expand Up @@ -226,6 +226,26 @@ setting present in `/android/gradle.properties`:
org.gradle.jvmargs=-Xmx2048m -XX:MaxPermSize=512m -XX:+HeapDumpOnOutOfMemoryError -Dfile.encoding=UTF-8
```

### Android Performance

On Android, React Native Firebase uses [thread pool executor](https://developer.android.com/reference/java/util/concurrent/ThreadPoolExecutor) to provide improved performance and managed resources.
To increase throughput, you can tune the thread pool executor via `firebase.json` file within the root of your project:

```json
// <project-root>/firebase.json
{
"react-native": {
"android_task_executor_maximum_pool_size": 10,
"android_task_executor_keep_alive_seconds": 3,
}
}
```

| Key | Description |
| ------------ | ----------------------------------------------- |
| `android_task_executor_maximum_pool_size` | Maximum pool size of ThreadPoolExecutor. Defaults to `1`. Larger values typically improve performance when executing large numbers of asynchronous tasks, e.g. Firestore queries. Setting this value to `0` completely disables the pooled executor and all tasks execute in serial per module. |
| `android_task_executor_keep_alive_seconds` | Keep-alive time of ThreadPoolExecutor, in seconds. Defaults to `3`. Excess threads in the pool executor will be terminated if they have been idle for more than the keep-alive time. This value doesn't have any effect when the maximum pool size is lower than `2`. |

### Allow iOS Static Frameworks

If you are using Static Frameworks on iOS, you need to manually enable this for the project. To enable Static Framework
Expand Down
@@ -0,0 +1,121 @@
package io.invertase.firebase.common;

/*
* Copyright (c) 2016-present Invertase Limited & Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this library except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import io.invertase.firebase.common.ReactNativeFirebaseJSON;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.SynchronousQueue;

public class TaskExecutorService {
private static final String MAXIMUM_POOL_SIZE_KEY = "android_task_executor_maximum_pool_size";
private static final String KEEP_ALIVE_SECONDS_KEY = "android_task_executor_keep_alive_seconds";

private final String name;
private final int maximumPoolSize;
private final int keepAliveSeconds;
private static Map<String, ExecutorService> executors = new HashMap<>();

TaskExecutorService(String name) {
this.name = name;
ReactNativeFirebaseJSON json = ReactNativeFirebaseJSON.getSharedInstance();
this.maximumPoolSize = json.getIntValue(MAXIMUM_POOL_SIZE_KEY, 1);
this.keepAliveSeconds = json.getIntValue(KEEP_ALIVE_SECONDS_KEY, 3);
}

public ExecutorService getExecutor() {
boolean isTransactional = maximumPoolSize <= 1;
return getExecutor(isTransactional, "");
}

public ExecutorService getTransactionalExecutor() {
return getExecutor(true, "");
}

public ExecutorService getTransactionalExecutor(String identifier) {
String executorIdentifier = maximumPoolSize != 0 ? identifier : "";
return getExecutor(true, executorIdentifier);
}

public ExecutorService getExecutor(boolean isTransactional, String identifier) {
String executorName = getExecutorName(isTransactional, identifier);
synchronized(executors) {
ExecutorService existingExecutor = executors.get(executorName);
if (existingExecutor == null) {
ExecutorService newExecutor = getNewExecutor(isTransactional);
executors.put(executorName, newExecutor);
return newExecutor;
}
return existingExecutor;
}
}

private ExecutorService getNewExecutor(boolean isTransactional) {
if (isTransactional == true) {
return Executors.newSingleThreadExecutor();
} else {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, maximumPoolSize, keepAliveSeconds, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
threadPoolExecutor.setRejectedExecutionHandler(executeInFallback);
return threadPoolExecutor;
}
}

private RejectedExecutionHandler executeInFallback = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
return;
}
ExecutorService fallbackExecutor = getTransactionalExecutor();
fallbackExecutor.execute(r);
};
};

public String getExecutorName(boolean isTransactional, String identifier) {
if (isTransactional == true) {
return name + "TransactionalExecutor" + identifier;
}
return name + "Executor" + identifier;
}

public void shutdown() {
Set<String> existingExecutorNames = executors.keySet();
existingExecutorNames.removeIf((executorName) -> {
return executorName.startsWith(name) == false;
});
existingExecutorNames.forEach((executorName) -> {
removeExecutor(executorName);
});
}

public void removeExecutor(String executorName) {
synchronized(executors) {
ExecutorService existingExecutor = executors.get(executorName);
if (existingExecutor != null) {
existingExecutor.shutdownNow();
executors.remove(executorName);
}
}
}
}
Expand Up @@ -18,23 +18,24 @@
*/

import android.content.Context;
import io.invertase.firebase.common.TaskExecutorService;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.OverridingMethodsMustInvokeSuper;

public class UniversalFirebaseModule {
private static Map<String, ExecutorService> executors = new HashMap<>();
private final TaskExecutorService executorService;

private final Context context;
private final String serviceName;

protected UniversalFirebaseModule(Context context, String serviceName) {
this.context = context;
this.serviceName = serviceName;
this.executorService = new TaskExecutorService(getName());
}

public Context getContext() {
Expand All @@ -46,11 +47,7 @@ public Context getApplicationContext() {
}

protected ExecutorService getExecutor() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) return existingSingleThreadExecutor;
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
executors.put(getName(), newSingleThreadExecutor);
return newSingleThreadExecutor;
return executorService.getExecutor();
}

public String getName() {
Expand All @@ -59,11 +56,7 @@ public String getName() {

@OverridingMethodsMustInvokeSuper
public void onTearDown() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) {
existingSingleThreadExecutor.shutdownNow();
executors.remove(getName());
}
executorService.shutdown();
}

public Map<String, Object> getConstants() {
Expand Down
Expand Up @@ -53,6 +53,11 @@ public boolean getBooleanValue(String key, boolean defaultValue) {
return jsonObject.optBoolean(key, defaultValue);
}

public int getIntValue(String key, int defaultValue) {
if (jsonObject == null) return defaultValue;
return jsonObject.optInt(key, defaultValue);
}

public long getLongValue(String key, long defaultValue) {
if (jsonObject == null) return defaultValue;
return jsonObject.optLong(key, defaultValue);
Expand Down
Expand Up @@ -21,15 +21,16 @@
import android.content.Context;
import com.facebook.react.bridge.*;
import io.invertase.firebase.interfaces.ContextProvider;
import io.invertase.firebase.common.TaskExecutorService;

import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReactNativeFirebaseModule extends ReactContextBaseJavaModule implements ContextProvider {
private static Map<String, ExecutorService> executors = new HashMap<>();
private final TaskExecutorService executorService;

private String moduleName;

public ReactNativeFirebaseModule(
Expand All @@ -38,6 +39,7 @@ public ReactNativeFirebaseModule(
) {
super(reactContext);
this.moduleName = moduleName;
this.executorService = new TaskExecutorService(getName());
}

public static void rejectPromiseWithExceptionMap(Promise promise, Exception exception) {
Expand Down Expand Up @@ -74,20 +76,25 @@ public ReactContext getContext() {
}

public ExecutorService getExecutor() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) return existingSingleThreadExecutor;
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
executors.put(getName(), newSingleThreadExecutor);
return newSingleThreadExecutor;
return executorService.getExecutor();
}

public ExecutorService getTransactionalExecutor() {
return executorService.getTransactionalExecutor();
}

public ExecutorService getTransactionalExecutor(String identifier) {
return executorService.getTransactionalExecutor(identifier);
}

@Override
public void onCatalystInstanceDestroy() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) {
existingSingleThreadExecutor.shutdownNow();
executors.remove(getName());
}
executorService.shutdown();
}

public void removeEventListeningExecutor(String identifier) {
String executorName = executorService.getExecutorName(true, identifier);
executorService.removeExecutor(executorName);
}

public Context getApplicationContext() {
Expand Down
8 changes: 8 additions & 0 deletions packages/app/firebase-schema.json
Expand Up @@ -65,6 +65,14 @@
"perf_auto_collection_enabled": {
"description": "Disable or enable auto collection of performance monitoring data collection.\n This is useful for opt-in-first data flows, for example when dealing with GDPR compliance.\nThis can be overridden in JavaScript.",
"type": "boolean"
},
"android_task_executor_maximum_pool_size": {
"description": "Maximum pool size of ThreadPoolExecutor used by RNFirebase for Android. Defaults to `1`.\n Larger values typically improve performance when executing large numbers of asynchronous tasks, e.g. Firestore queries.",
"type": "number"
},
"android_task_executor_keep_alive_seconds": {
"description": "Keep-alive time of ThreadPoolExecutor used by RNFirebase for Android, in seconds. Defaults to `3`.\n Excess threads in the pool executor will be terminated if they have been idle for more than the keep-alive time.",
"type": "number"
}
}
}
Expand Down
Expand Up @@ -289,7 +289,8 @@ private void handleDatabaseEvent(
DataSnapshot dataSnapshot,
@Nullable String previousChildName
) {
Tasks.call(getExecutor(), () -> {
final String eventRegistrationKey = registration.getString("eventRegistrationKey");
Tasks.call(getTransactionalExecutor(eventRegistrationKey), () -> {
if (eventType.equals("value")) {
return snapshotToMap(dataSnapshot);
} else {
Expand Down Expand Up @@ -407,6 +408,7 @@ public void off(String queryKey, String eventRegistrationKey) {

if (databaseQuery != null) {
databaseQuery.removeEventListener(eventRegistrationKey);
removeEventListeningExecutor(eventRegistrationKey);

if (!databaseQuery.hasListeners()) {
queryMap.remove(queryKey);
Expand Down
Expand Up @@ -41,9 +41,9 @@ public class ReactNativeFirebaseDatabaseReferenceModule extends ReactNativeFireb
@ReactMethod
public void set(String app, String dbURL, String path, ReadableMap props, Promise promise) {
Tasks
.call(getExecutor(), () -> toHashMap(props).get("value"))
.call(getTransactionalExecutor(), () -> toHashMap(props).get("value"))
.onSuccessTask(aValue -> module.set(app, dbURL, path, aValue))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -56,9 +56,9 @@ public void set(String app, String dbURL, String path, ReadableMap props, Promis
@ReactMethod
public void update(String app, String dbURL, String path, ReadableMap props, Promise promise) {
Tasks
.call(getExecutor(), () -> toHashMap(props).get("values"))
.call(getTransactionalExecutor(), () -> toHashMap(props).get("values"))
.onSuccessTask(aMap -> module.update(app, dbURL, path, (Map<String, Object>) aMap))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -70,9 +70,9 @@ public void update(String app, String dbURL, String path, ReadableMap props, Pro
@ReactMethod
public void setWithPriority(String app, String dbURL, String path, ReadableMap props, Promise promise) {
Tasks
.call(getExecutor(), () -> toHashMap(props))
.call(getTransactionalExecutor(), () -> toHashMap(props))
.onSuccessTask(aMap -> module.setWithPriority(app, dbURL, path, aMap.get("value"), aMap.get("priority")))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -85,7 +85,7 @@ public void setWithPriority(String app, String dbURL, String path, ReadableMap p
public void remove(String app, String dbURL, String path, Promise promise) {
// continuation tasks not needed for this as no data
module.remove(app, dbURL, path)
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -98,7 +98,7 @@ public void remove(String app, String dbURL, String path, Promise promise) {
public void setPriority(String app, String dbURL, String path, ReadableMap props, Promise promise) {
// continuation tasks not needed for this as minimal data
module.setPriority(app, dbURL, path, toHashMap(props).get("priority"))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand Down
Expand Up @@ -111,6 +111,7 @@ public void collectionOffSnapshot(
if (listenerRegistration != null) {
listenerRegistration.remove();
collectionSnapshotListeners.remove(listenerId);
removeEventListeningExecutor(Integer.toString(listenerId));
}
}

Expand Down Expand Up @@ -159,7 +160,7 @@ public void collectionGet(
}

private void sendOnSnapshotEvent(String appName, int listenerId, QuerySnapshot querySnapshot, MetadataChanges metadataChanges) {
Tasks.call(getExecutor(), () -> snapshotToWritableMap("onSnapshot", querySnapshot, metadataChanges)).addOnCompleteListener(task -> {
Tasks.call(getTransactionalExecutor(Integer.toString(listenerId)), () -> snapshotToWritableMap("onSnapshot", querySnapshot, metadataChanges)).addOnCompleteListener(task -> {
if (task.isSuccessful()) {
WritableMap body = Arguments.createMap();
body.putMap("snapshot", task.getResult());
Expand Down

1 comment on commit 0e4e331

@vercel
Copy link

@vercel vercel bot commented on 0e4e331 Apr 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.