Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 96 additions & 3 deletions packages/functions/__tests__/functions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import functions, {
connectFunctionsEmulator,
httpsCallable,
httpsCallableFromUrl,
httpsCallableStream,
httpsCallableFromUrlStream,
HttpsErrorCode,
} from '../lib';

Expand Down Expand Up @@ -94,6 +96,77 @@ describe('Cloud Functions', function () {
it('`HttpsErrorCode` function is properly exposed to end user', function () {
expect(HttpsErrorCode).toBeDefined();
});

it('`httpsCallableStream` function is properly exposed to end user', function () {
expect(httpsCallableStream).toBeDefined();
});

it('`httpsCallableFromUrlStream` function is properly exposed to end user', function () {
expect(httpsCallableFromUrlStream).toBeDefined();
});

describe('streaming', function () {
it('httpsCallable returns object with stream method', function () {
const app = getApp();
const functionsInstance = getFunctions(app);
const callable = httpsCallable(functionsInstance, 'test');

expect(callable).toBeDefined();
expect(typeof callable).toBe('function');
expect(callable.stream).toBeDefined();
expect(typeof callable.stream).toBe('function');
});

it('httpsCallableFromUrl returns object with stream method', function () {
const app = getApp();
const functionsInstance = getFunctions(app);
const callable = httpsCallableFromUrl(functionsInstance, 'https://example.com/test');

expect(callable).toBeDefined();
expect(typeof callable).toBe('function');
expect(callable.stream).toBeDefined();
expect(typeof callable.stream).toBe('function');
});

it('httpsCallableStream returns a function', function () {
const app = getApp();
const functionsInstance = getFunctions(app);
const streamStarter = httpsCallableStream(functionsInstance, 'test');

expect(streamStarter).toBeDefined();
expect(typeof streamStarter).toBe('function');
});

it('httpsCallableFromUrlStream returns a function', function () {
const app = getApp();
const functionsInstance = getFunctions(app);
const streamStarter = httpsCallableFromUrlStream(
functionsInstance,
'https://example.com/test',
);

expect(streamStarter).toBeDefined();
expect(typeof streamStarter).toBe('function');
});

it('namespace API httpsCallable returns object with stream method', function () {
const callable = functions().httpsCallable('test');

expect(callable).toBeDefined();
expect(typeof callable).toBe('function');
expect(callable.stream).toBeDefined();
expect(typeof callable.stream).toBe('function');
});

it('namespace API httpsCallableFromUrl returns object with stream method', function () {
const callable = functions().httpsCallableFromUrl('https://example.com/test');

expect(callable).toBeDefined();
expect(typeof callable).toBe('function');
expect(callable.stream).toBeDefined();
expect(typeof callable.stream).toBe('function');
});
});
});

describe('test `console.warn` is called for RNFB v8 API & not called for v9 API', function () {
Expand Down Expand Up @@ -123,7 +196,7 @@ describe('Cloud Functions', function () {
describe('Cloud Functions', function () {
it('useFunctionsEmulator()', function () {
const app = getApp();
const functions = app.functions();
const functions = getFunctions(app);
functionsRefV9Deprecation(
() => connectFunctionsEmulator(functions, 'localhost', 8080),
() => functions.useEmulator('localhost', 8080),
Expand All @@ -133,7 +206,7 @@ describe('Cloud Functions', function () {

it('httpsCallable()', function () {
const app = getApp();
const functions = app.functions();
const functions = getFunctions(app);
functionsRefV9Deprecation(
() => httpsCallable(functions, 'example'),
() => functions.httpsCallable('example'),
Expand All @@ -143,13 +216,33 @@ describe('Cloud Functions', function () {

it('httpsCallableFromUrl()', function () {
const app = getApp();
const functions = app.functions();
const functions = getFunctions(app);
functionsRefV9Deprecation(
() => httpsCallableFromUrl(functions, 'https://example.com/example'),
() => functions.httpsCallableFromUrl('https://example.com/example'),
'httpsCallableFromUrl',
);
});

it('httpsCallableStream()', function () {
const app = getApp();
const functions = getFunctions(app);
const callable = httpsCallable(functions, 'example');

// The stream method should be available on the callable
expect(callable.stream).toBeDefined();
expect(typeof callable.stream).toBe('function');
});

it('httpsCallableFromUrlStream()', function () {
const app = getApp();
const functions = getFunctions(app);
const callable = httpsCallableFromUrl(functions, 'https://example.com/example');

// The stream method should be available on the callable
expect(callable.stream).toBeDefined();
expect(typeof callable.stream).toBe('function');
});
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.invertase.firebase.functions;

/*
* Copyright (c) 2016-present Invertase Limited & Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this library except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import com.facebook.react.bridge.Arguments;
import com.facebook.react.bridge.WritableMap;
import io.invertase.firebase.interfaces.NativeEvent;

public class ReactNativeFirebaseFunctionsEvent implements NativeEvent {
static final String FUNCTIONS_STREAMING_EVENT = "functions_streaming_event";
private static final String KEY_ID = "listenerId";
private static final String KEY_BODY = "body";
private static final String KEY_APP_NAME = "appName";
private static final String KEY_EVENT_NAME = "eventName";
private String eventName;
private WritableMap eventBody;
private String appName;
private int listenerId;

ReactNativeFirebaseFunctionsEvent(
String eventName, WritableMap eventBody, String appName, int listenerId) {
this.eventName = eventName;
this.eventBody = eventBody;
this.appName = appName;
this.listenerId = listenerId;
}

@Override
public String getEventName() {
return eventName;
}

@Override
public WritableMap getEventBody() {
WritableMap event = Arguments.createMap();
event.putInt(KEY_ID, listenerId);
event.putMap(KEY_BODY, eventBody);
event.putString(KEY_APP_NAME, appName);
event.putString(KEY_EVENT_NAME, eventName);
return event;
}

@Override
public String getFirebaseAppName() {
return appName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,35 @@
*/

import android.content.Context;
import android.util.SparseArray;
import com.facebook.react.bridge.Arguments;
import com.facebook.react.bridge.ReadableMap;
import com.facebook.react.bridge.WritableMap;
import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.FirebaseApp;
import com.google.firebase.functions.FirebaseFunctions;
import com.google.firebase.functions.HttpsCallableReference;
import io.invertase.firebase.common.ReactNativeFirebaseEventEmitter;
import io.invertase.firebase.common.UniversalFirebaseModule;
import java.net.URL;
import java.util.concurrent.TimeUnit;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.BufferedSource;

@SuppressWarnings("WeakerAccess")
public class UniversalFirebaseFunctionsModule extends UniversalFirebaseModule {
public static final String DATA_KEY = "data";
public static final String CODE_KEY = "code";
public static final String MSG_KEY = "message";
public static final String DETAILS_KEY = "details";
private static SparseArray<Call> functionsStreamingListeners = new SparseArray<>();

UniversalFirebaseFunctionsModule(Context context, String serviceName) {
super(context, serviceName);
Expand Down Expand Up @@ -95,4 +108,144 @@ Task<Object> httpsCallableFromUrl(
return Tasks.await(httpReference.call(data)).getData();
});
}

// -------------------- Streaming Support (Android only) --------------------
// Streaming targets HTTP (onRequest) endpoints that emit SSE/NDJSON.

void httpsCallableStream(
String appName,
String region,
String host,
Integer port,
String name,
Object data,
ReadableMap options,
Integer listenerId) {
FirebaseApp firebaseApp = FirebaseApp.getInstance(appName);
String projectId = firebaseApp.getOptions().getProjectId();
String url;
if (host != null) {
url = "http://" + host + ":" + port + "/" + projectId + "/" + region + "/" + name;
} else {
url = "https://" + region + "-" + projectId + ".cloudfunctions.net/" + name;
}
startHttpStream(appName, host, port, url, listenerId);
}

void httpsCallableStreamFromUrl(
String appName,
String region,
String host,
Integer port,
String url,
Object data,
ReadableMap options,
Integer listenerId) {
startHttpStream(appName, host, port, url, listenerId);
}

public static void cancelHttpsCallableStream(Integer listenerId) {
synchronized (functionsStreamingListeners) {
Call call = functionsStreamingListeners.get(listenerId);
if (call != null) {
try {
call.cancel();
} catch (Exception ignore) {
}
functionsStreamingListeners.remove(listenerId);
}
}
}

private void startHttpStream(
String appName, String host, Integer port, String url, Integer listenerId) {
getExecutor()
.execute(
() -> {
OkHttpClient client =
new OkHttpClient.Builder().retryOnConnectionFailure(true).build();
HttpUrl parsed = HttpUrl.parse(url);
if (parsed == null) {
emitError(appName, listenerId, "invalid_url");
return;
}
HttpUrl.Builder builder = parsed.newBuilder();
if (host != null && port != null) {
builder.scheme("http").host(host).port(port);
}
HttpUrl finalUrl = builder.build();
Request request =
new Request.Builder()
.url(finalUrl)
.addHeader("Accept", "text/event-stream, application/x-ndjson, */*")
.build();
Call call = client.newCall(request);
synchronized (functionsStreamingListeners) {
functionsStreamingListeners.put(listenerId, call);
}
call.enqueue(
new Callback() {
@Override
public void onFailure(Call call, java.io.IOException e) {
emitError(appName, listenerId, e.getMessage());
synchronized (functionsStreamingListeners) {
functionsStreamingListeners.remove(listenerId);
}
}

@Override
public void onResponse(Call call, Response response) {
try (ResponseBody body = response.body()) {
if (!response.isSuccessful()) {
emitError(
appName,
listenerId,
"http_error_" + response.code() + "_" + response.message());
return;
}
if (body == null) {
emitError(appName, listenerId, "empty_response_body");
return;
}
BufferedSource source = body.source();
while (!source.exhausted()) {
String line = source.readUtf8Line();
if (line == null) {
break;
}
String payload = line.startsWith("data: ") ? line.substring(6) : line;
WritableMap map = Arguments.createMap();
map.putString("text", payload);
emitEvent(appName, listenerId, map);
}
WritableMap done = Arguments.createMap();
done.putBoolean("done", true);
emitEvent(appName, listenerId, done);
} catch (Exception e) {
emitError(appName, listenerId, e.getMessage());
} finally {
synchronized (functionsStreamingListeners) {
functionsStreamingListeners.remove(listenerId);
}
}
}
});
});
}

private void emitEvent(String appName, int listenerId, WritableMap body) {
ReactNativeFirebaseEventEmitter.getSharedInstance()
.sendEvent(
new ReactNativeFirebaseFunctionsEvent(
ReactNativeFirebaseFunctionsEvent.FUNCTIONS_STREAMING_EVENT,
body,
appName,
listenerId));
}

private void emitError(String appName, int listenerId, String message) {
WritableMap body = Arguments.createMap();
body.putString("error", message != null ? message : "unknown_error");
emitEvent(appName, listenerId, body);
}
}
Loading
Loading