diff --git a/packages/functions/__tests__/functions.test.ts b/packages/functions/__tests__/functions.test.ts index 708204b1f7..2f5d1db5c7 100644 --- a/packages/functions/__tests__/functions.test.ts +++ b/packages/functions/__tests__/functions.test.ts @@ -6,6 +6,8 @@ import functions, { connectFunctionsEmulator, httpsCallable, httpsCallableFromUrl, + httpsCallableStream, + httpsCallableFromUrlStream, HttpsErrorCode, } from '../lib'; @@ -94,6 +96,77 @@ describe('Cloud Functions', function () { it('`HttpsErrorCode` function is properly exposed to end user', function () { expect(HttpsErrorCode).toBeDefined(); }); + + it('`httpsCallableStream` function is properly exposed to end user', function () { + expect(httpsCallableStream).toBeDefined(); + }); + + it('`httpsCallableFromUrlStream` function is properly exposed to end user', function () { + expect(httpsCallableFromUrlStream).toBeDefined(); + }); + + describe('streaming', function () { + it('httpsCallable returns object with stream method', function () { + const app = getApp(); + const functionsInstance = getFunctions(app); + const callable = httpsCallable(functionsInstance, 'test'); + + expect(callable).toBeDefined(); + expect(typeof callable).toBe('function'); + expect(callable.stream).toBeDefined(); + expect(typeof callable.stream).toBe('function'); + }); + + it('httpsCallableFromUrl returns object with stream method', function () { + const app = getApp(); + const functionsInstance = getFunctions(app); + const callable = httpsCallableFromUrl(functionsInstance, 'https://example.com/test'); + + expect(callable).toBeDefined(); + expect(typeof callable).toBe('function'); + expect(callable.stream).toBeDefined(); + expect(typeof callable.stream).toBe('function'); + }); + + it('httpsCallableStream returns a function', function () { + const app = getApp(); + const functionsInstance = getFunctions(app); + const streamStarter = httpsCallableStream(functionsInstance, 'test'); + + expect(streamStarter).toBeDefined(); + expect(typeof streamStarter).toBe('function'); + }); + + it('httpsCallableFromUrlStream returns a function', function () { + const app = getApp(); + const functionsInstance = getFunctions(app); + const streamStarter = httpsCallableFromUrlStream( + functionsInstance, + 'https://example.com/test', + ); + + expect(streamStarter).toBeDefined(); + expect(typeof streamStarter).toBe('function'); + }); + + it('namespace API httpsCallable returns object with stream method', function () { + const callable = functions().httpsCallable('test'); + + expect(callable).toBeDefined(); + expect(typeof callable).toBe('function'); + expect(callable.stream).toBeDefined(); + expect(typeof callable.stream).toBe('function'); + }); + + it('namespace API httpsCallableFromUrl returns object with stream method', function () { + const callable = functions().httpsCallableFromUrl('https://example.com/test'); + + expect(callable).toBeDefined(); + expect(typeof callable).toBe('function'); + expect(callable.stream).toBeDefined(); + expect(typeof callable.stream).toBe('function'); + }); + }); }); describe('test `console.warn` is called for RNFB v8 API & not called for v9 API', function () { @@ -123,7 +196,7 @@ describe('Cloud Functions', function () { describe('Cloud Functions', function () { it('useFunctionsEmulator()', function () { const app = getApp(); - const functions = app.functions(); + const functions = getFunctions(app); functionsRefV9Deprecation( () => connectFunctionsEmulator(functions, 'localhost', 8080), () => functions.useEmulator('localhost', 8080), @@ -133,7 +206,7 @@ describe('Cloud Functions', function () { it('httpsCallable()', function () { const app = getApp(); - const functions = app.functions(); + const functions = getFunctions(app); functionsRefV9Deprecation( () => httpsCallable(functions, 'example'), () => functions.httpsCallable('example'), @@ -143,13 +216,33 @@ describe('Cloud Functions', function () { it('httpsCallableFromUrl()', function () { const app = getApp(); - const functions = app.functions(); + const functions = getFunctions(app); functionsRefV9Deprecation( () => httpsCallableFromUrl(functions, 'https://example.com/example'), () => functions.httpsCallableFromUrl('https://example.com/example'), 'httpsCallableFromUrl', ); }); + + it('httpsCallableStream()', function () { + const app = getApp(); + const functions = getFunctions(app); + const callable = httpsCallable(functions, 'example'); + + // The stream method should be available on the callable + expect(callable.stream).toBeDefined(); + expect(typeof callable.stream).toBe('function'); + }); + + it('httpsCallableFromUrlStream()', function () { + const app = getApp(); + const functions = getFunctions(app); + const callable = httpsCallableFromUrl(functions, 'https://example.com/example'); + + // The stream method should be available on the callable + expect(callable.stream).toBeDefined(); + expect(typeof callable.stream).toBe('function'); + }); }); }); }); diff --git a/packages/functions/android/src/main/java/io/invertase/firebase/functions/ReactNativeFirebaseFunctionsEvent.java b/packages/functions/android/src/main/java/io/invertase/firebase/functions/ReactNativeFirebaseFunctionsEvent.java new file mode 100644 index 0000000000..e40335af41 --- /dev/null +++ b/packages/functions/android/src/main/java/io/invertase/firebase/functions/ReactNativeFirebaseFunctionsEvent.java @@ -0,0 +1,61 @@ +package io.invertase.firebase.functions; + +/* + * Copyright (c) 2016-present Invertase Limited & Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this library except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +import com.facebook.react.bridge.Arguments; +import com.facebook.react.bridge.WritableMap; +import io.invertase.firebase.interfaces.NativeEvent; + +public class ReactNativeFirebaseFunctionsEvent implements NativeEvent { + static final String FUNCTIONS_STREAMING_EVENT = "functions_streaming_event"; + private static final String KEY_ID = "listenerId"; + private static final String KEY_BODY = "body"; + private static final String KEY_APP_NAME = "appName"; + private static final String KEY_EVENT_NAME = "eventName"; + private String eventName; + private WritableMap eventBody; + private String appName; + private int listenerId; + + ReactNativeFirebaseFunctionsEvent( + String eventName, WritableMap eventBody, String appName, int listenerId) { + this.eventName = eventName; + this.eventBody = eventBody; + this.appName = appName; + this.listenerId = listenerId; + } + + @Override + public String getEventName() { + return eventName; + } + + @Override + public WritableMap getEventBody() { + WritableMap event = Arguments.createMap(); + event.putInt(KEY_ID, listenerId); + event.putMap(KEY_BODY, eventBody); + event.putString(KEY_APP_NAME, appName); + event.putString(KEY_EVENT_NAME, eventName); + return event; + } + + @Override + public String getFirebaseAppName() { + return appName; + } +} diff --git a/packages/functions/android/src/main/java/io/invertase/firebase/functions/UniversalFirebaseFunctionsModule.java b/packages/functions/android/src/main/java/io/invertase/firebase/functions/UniversalFirebaseFunctionsModule.java index af692704b8..2cc88cab8f 100644 --- a/packages/functions/android/src/main/java/io/invertase/firebase/functions/UniversalFirebaseFunctionsModule.java +++ b/packages/functions/android/src/main/java/io/invertase/firebase/functions/UniversalFirebaseFunctionsModule.java @@ -18,15 +18,27 @@ */ import android.content.Context; +import android.util.SparseArray; +import com.facebook.react.bridge.Arguments; import com.facebook.react.bridge.ReadableMap; +import com.facebook.react.bridge.WritableMap; import com.google.android.gms.tasks.Task; import com.google.android.gms.tasks.Tasks; import com.google.firebase.FirebaseApp; import com.google.firebase.functions.FirebaseFunctions; import com.google.firebase.functions.HttpsCallableReference; +import io.invertase.firebase.common.ReactNativeFirebaseEventEmitter; import io.invertase.firebase.common.UniversalFirebaseModule; import java.net.URL; import java.util.concurrent.TimeUnit; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import okio.BufferedSource; @SuppressWarnings("WeakerAccess") public class UniversalFirebaseFunctionsModule extends UniversalFirebaseModule { @@ -34,6 +46,7 @@ public class UniversalFirebaseFunctionsModule extends UniversalFirebaseModule { public static final String CODE_KEY = "code"; public static final String MSG_KEY = "message"; public static final String DETAILS_KEY = "details"; + private static SparseArray functionsStreamingListeners = new SparseArray<>(); UniversalFirebaseFunctionsModule(Context context, String serviceName) { super(context, serviceName); @@ -95,4 +108,144 @@ Task httpsCallableFromUrl( return Tasks.await(httpReference.call(data)).getData(); }); } + + // -------------------- Streaming Support (Android only) -------------------- + // Streaming targets HTTP (onRequest) endpoints that emit SSE/NDJSON. + + void httpsCallableStream( + String appName, + String region, + String host, + Integer port, + String name, + Object data, + ReadableMap options, + Integer listenerId) { + FirebaseApp firebaseApp = FirebaseApp.getInstance(appName); + String projectId = firebaseApp.getOptions().getProjectId(); + String url; + if (host != null) { + url = "http://" + host + ":" + port + "/" + projectId + "/" + region + "/" + name; + } else { + url = "https://" + region + "-" + projectId + ".cloudfunctions.net/" + name; + } + startHttpStream(appName, host, port, url, listenerId); + } + + void httpsCallableStreamFromUrl( + String appName, + String region, + String host, + Integer port, + String url, + Object data, + ReadableMap options, + Integer listenerId) { + startHttpStream(appName, host, port, url, listenerId); + } + + public static void cancelHttpsCallableStream(Integer listenerId) { + synchronized (functionsStreamingListeners) { + Call call = functionsStreamingListeners.get(listenerId); + if (call != null) { + try { + call.cancel(); + } catch (Exception ignore) { + } + functionsStreamingListeners.remove(listenerId); + } + } + } + + private void startHttpStream( + String appName, String host, Integer port, String url, Integer listenerId) { + getExecutor() + .execute( + () -> { + OkHttpClient client = + new OkHttpClient.Builder().retryOnConnectionFailure(true).build(); + HttpUrl parsed = HttpUrl.parse(url); + if (parsed == null) { + emitError(appName, listenerId, "invalid_url"); + return; + } + HttpUrl.Builder builder = parsed.newBuilder(); + if (host != null && port != null) { + builder.scheme("http").host(host).port(port); + } + HttpUrl finalUrl = builder.build(); + Request request = + new Request.Builder() + .url(finalUrl) + .addHeader("Accept", "text/event-stream, application/x-ndjson, */*") + .build(); + Call call = client.newCall(request); + synchronized (functionsStreamingListeners) { + functionsStreamingListeners.put(listenerId, call); + } + call.enqueue( + new Callback() { + @Override + public void onFailure(Call call, java.io.IOException e) { + emitError(appName, listenerId, e.getMessage()); + synchronized (functionsStreamingListeners) { + functionsStreamingListeners.remove(listenerId); + } + } + + @Override + public void onResponse(Call call, Response response) { + try (ResponseBody body = response.body()) { + if (!response.isSuccessful()) { + emitError( + appName, + listenerId, + "http_error_" + response.code() + "_" + response.message()); + return; + } + if (body == null) { + emitError(appName, listenerId, "empty_response_body"); + return; + } + BufferedSource source = body.source(); + while (!source.exhausted()) { + String line = source.readUtf8Line(); + if (line == null) { + break; + } + String payload = line.startsWith("data: ") ? line.substring(6) : line; + WritableMap map = Arguments.createMap(); + map.putString("text", payload); + emitEvent(appName, listenerId, map); + } + WritableMap done = Arguments.createMap(); + done.putBoolean("done", true); + emitEvent(appName, listenerId, done); + } catch (Exception e) { + emitError(appName, listenerId, e.getMessage()); + } finally { + synchronized (functionsStreamingListeners) { + functionsStreamingListeners.remove(listenerId); + } + } + } + }); + }); + } + + private void emitEvent(String appName, int listenerId, WritableMap body) { + ReactNativeFirebaseEventEmitter.getSharedInstance() + .sendEvent( + new ReactNativeFirebaseFunctionsEvent( + ReactNativeFirebaseFunctionsEvent.FUNCTIONS_STREAMING_EVENT, + body, + appName, + listenerId)); + } + + private void emitError(String appName, int listenerId, String message) { + WritableMap body = Arguments.createMap(); + body.putString("error", message != null ? message : "unknown_error"); + emitEvent(appName, listenerId, body); + } } diff --git a/packages/functions/android/src/reactnative/java/io/invertase/firebase/functions/ReactNativeFirebaseFunctionsModule.java b/packages/functions/android/src/reactnative/java/io/invertase/firebase/functions/ReactNativeFirebaseFunctionsModule.java index 66246daaa7..a393f23b4c 100644 --- a/packages/functions/android/src/reactnative/java/io/invertase/firebase/functions/ReactNativeFirebaseFunctionsModule.java +++ b/packages/functions/android/src/reactnative/java/io/invertase/firebase/functions/ReactNativeFirebaseFunctionsModule.java @@ -43,6 +43,11 @@ public class ReactNativeFirebaseFunctionsModule extends ReactNativeFirebaseModul this.module = new UniversalFirebaseFunctionsModule(reactContext, SERVICE_NAME); } + @Override + public String getName() { + return "RNFBFunctionsModule"; + } + @ReactMethod public void httpsCallable( String appName, @@ -144,4 +149,45 @@ public void httpsCallableFromUrl( promise.reject(code, message, exception, userInfo); }); } + + // -------------------- Streaming bridge (Android only) -------------------- + @ReactMethod + public void addFunctionsStreaming(String appName, String region, Integer listenerId) { + // Optional hook: no-op, streaming is started explicitly by httpsCallableStream* + // Note: appName and region are auto-prepended by the native module wrapper + } + + @ReactMethod + public void removeFunctionsStreaming(String appName, String region, Integer listenerId) { + // Note: appName and region are auto-prepended by the native module wrapper + UniversalFirebaseFunctionsModule.cancelHttpsCallableStream(listenerId); + } + + @ReactMethod + public void httpsCallableStream( + String appName, + String region, + String host, + Integer port, + String name, + ReadableMap wrapper, + ReadableMap options, + Integer listenerId) { + module.httpsCallableStream( + appName, region, host, port, name, wrapper.toHashMap().get(DATA_KEY), options, listenerId); + } + + @ReactMethod + public void httpsCallableStreamFromUrl( + String appName, + String region, + String host, + Integer port, + String url, + ReadableMap wrapper, + ReadableMap options, + Integer listenerId) { + module.httpsCallableStreamFromUrl( + appName, region, host, port, url, wrapper.toHashMap().get(DATA_KEY), options, listenerId); + } } diff --git a/packages/functions/e2e/functions.e2e.js b/packages/functions/e2e/functions.e2e.js index f2f81a1d1e..9950dcb9ae 100644 --- a/packages/functions/e2e/functions.e2e.js +++ b/packages/functions/e2e/functions.e2e.js @@ -532,6 +532,62 @@ describe('functions() modular', function () { }); }); + describe('streaming', function () { + function collectStream(callable, data, opts = {}) { + const events = []; + const stop = callable.stream(data, evt => events.push(evt), opts); + const done = new Promise((resolve, reject) => { + const started = Date.now(); + const check = () => { + const last = events[events.length - 1]; + if (last && last.error) return reject(new Error(last.error)); + if (last && last.done) return resolve(events); + if (Date.now() - started > 15000) return reject(new Error('stream timeout')); + setTimeout(check, 50); + }; + check(); + }); + return { events, done, stop }; + } + + it('httpsCallable(functions, name).stream() emits chunks and ends with done', async function () { + const { getApp } = modular; + const { getFunctions, httpsCallable, connectFunctionsEmulator } = functionsModular; + const region = 'us-central1'; + const fnName = 'helloWorldV2'; + const fns = getFunctions(getApp(), region); + connectFunctionsEmulator(fns, 'localhost', 5001); + const callable = httpsCallable(fns, fnName); + + const { done } = collectStream(callable); + const all = await done; + all.length.should.be.greaterThan(0); + const firstChunk = all.find(e => e && e.text && !e.done && !e.error); + should.exist(firstChunk); + firstChunk.text.should.containEql('Hello from Firebase!'); + all[all.length - 1].done.should.eql(true); + }); + + it('httpsCallableFromUrl(functions, url).stream() emits chunks and ends with done', async function () { + const { getApp } = modular; + const { getFunctions, httpsCallableFromUrl } = functionsModular; + let hostname = 'localhost'; + if (Platform.android) { + hostname = '10.0.2.2'; + } + const fns = getFunctions(getApp()); + const url = `http://${hostname}:5001/react-native-firebase-testing/us-central1/helloWorldV2`; + const callableFromUrl = httpsCallableFromUrl(fns, url); + const { done } = collectStream(callableFromUrl); + const all = await done; + all.length.should.be.greaterThan(0); + const firstChunk = all.find(e => e && e.text && !e.done && !e.error); + should.exist(firstChunk); + firstChunk.text.should.containEql('Hello from Firebase!'); + all[all.length - 1].done.should.eql(true); + }); + }); + describe('httpsCallable(fnName)(args)', function () { it('accepts primitive args: undefined', async function () { const { getApp } = modular; @@ -773,6 +829,66 @@ describe('functions() modular', function () { return Promise.resolve(); } }); + + describe('functions() streaming', function () { + // Helper to collect events until done or error + function collectStream(callable, data, opts = {}) { + const events = []; + const stop = callable.stream(data, evt => events.push(evt), opts); + const done = new Promise((resolve, reject) => { + const started = Date.now(); + const check = () => { + const last = events[events.length - 1]; + if (last && last.error) return reject(new Error(last.error)); + if (last && last.done) return resolve(events); + if (Date.now() - started > 15000) return reject(new Error('stream timeout')); + setTimeout(check, 50); + }; + check(); + }); + return { events, done, stop }; + } + + it('httpsCallable(name).stream() emits chunks and ends with done', async function () { + const { getApp } = modular; + const { getFunctions, httpsCallable, connectFunctionsEmulator } = functionsModular; + + const region = 'us-central1'; + const fnName = 'helloWorldV2'; + const fns = getFunctions(getApp(), region); + connectFunctionsEmulator(fns, 'localhost', 5001); + const callable = httpsCallable(fns, fnName); + + const { done } = collectStream(callable); + const all = await done; + all.length.should.be.greaterThan(0); + const firstChunk = all.find(e => e && e.text && !e.done && !e.error); + should.exist(firstChunk); + firstChunk.text.should.containEql('Hello from Firebase!'); + all[all.length - 1].done.should.eql(true); + }); + + it('httpsCallableFromUrl(url).stream() emits chunks and ends with done', async function () { + const { getApp } = modular; + const { getFunctions, httpsCallableFromUrl } = functionsModular; + + let hostname = 'localhost'; + if (Platform.android) { + hostname = '10.0.2.2'; + } + const url = `http://${hostname}:5001/react-native-firebase-testing/us-central1/helloWorldV2`; + const fns = getFunctions(getApp()); + const callableFromUrl = httpsCallableFromUrl(fns, url); + + const { done } = collectStream(callableFromUrl); + const all = await done; + all.length.should.be.greaterThan(0); + const firstChunk = all.find(e => e && e.text && !e.done && !e.error); + should.exist(firstChunk); + firstChunk.text.should.containEql('Hello from Firebase!'); + all[all.length - 1].done.should.eql(true); + }); + }); }); }); }); diff --git a/packages/functions/ios/RNFBFunctions/RNFBFunctionsModule.m b/packages/functions/ios/RNFBFunctions/RNFBFunctionsModule.m index 1f4be2b3dc..7147c1d3aa 100644 --- a/packages/functions/ios/RNFBFunctions/RNFBFunctionsModule.m +++ b/packages/functions/ios/RNFBFunctions/RNFBFunctionsModule.m @@ -17,6 +17,7 @@ #import #import +#import #import "RNFBApp/RNFBSharedUtils.h" #import "RNFBFunctionsModule.h" @@ -26,6 +27,8 @@ @implementation RNFBFunctionsModule #pragma mark Module Setup RCT_EXPORT_MODULE(); +static __strong NSMutableDictionary *httpsCallableStreamListeners; +static NSString *const RNFB_FUNCTIONS_STREAMING_EVENT = @"functions_streaming_event"; #pragma mark - #pragma mark Firebase Functions Methods @@ -90,6 +93,269 @@ @implementation RNFBFunctionsModule }]; } +/** + * Start a streaming HTTP request to an onRequest endpoint using a function name. + * Emits 'functions_streaming_event' events with { listenerId, body, appName, eventName }. + * Signature mirrors Android/JS: + * (appName, regionOrDomain, host, port, name, wrapper, options, listenerId) + */ +RCT_EXPORT_METHOD(httpsCallableStream + : (FIRApp *)firebaseApp customUrlOrRegion + : (NSString *)customUrlOrRegion host + : (NSString *)host port + : (NSNumber *_Nonnull)port name + : (NSString *)name wrapper + : (__unused NSDictionary *)wrapper options + : (__unused NSDictionary *)options listenerId + : (NSNumber *_Nonnull)listenerId) { + if (!httpsCallableStreamListeners) { + httpsCallableStreamListeners = [NSMutableDictionary dictionary]; + } + // Build target URL similar to Android: + // - Emulator: http://host:port/{projectId}/{region}/{name} + // - Prod: https://{region}-{projectId}.cloudfunctions.net/{name} + NSString *projectId = firebaseApp.options.projectID ?: @""; + NSString *urlString; + if (host != nil && port != nil) { + urlString = [NSString + stringWithFormat:@"http://%@:%@/%@/%@/%@", host, port, projectId, customUrlOrRegion, name]; + } else { + urlString = [NSString stringWithFormat:@"https://%@-%@.cloudfunctions.net/%@", + customUrlOrRegion, projectId, name]; + } + NSURLComponents *components = [NSURLComponents componentsWithString:urlString]; + if (components == nil) { + NSMutableDictionary *body = [@{@"error" : @"invalid_url"} mutableCopy]; + [RNFBSharedUtils sendJSEventForApp:firebaseApp + name:RNFB_FUNCTIONS_STREAMING_EVENT + body:@{ + @"listenerId" : listenerId, + @"eventName" : RNFB_FUNCTIONS_STREAMING_EVENT, + @"body" : body + }]; + return; + } + // Override to emulator if provided (ensures scheme/host/port are correct) + if (host != nil && port != nil) { + components.scheme = @"http"; + components.host = host; + components.port = port; + } + NSMutableURLRequest *request = [NSMutableURLRequest requestWithURL:[components URL]]; + [request setHTTPMethod:@"GET"]; + [request setValue:@"text/event-stream, application/x-ndjson, */*" forHTTPHeaderField:@"Accept"]; + + NSURLSessionConfiguration *config = [NSURLSessionConfiguration defaultSessionConfiguration]; + config.requestCachePolicy = NSURLRequestReloadIgnoringLocalCacheData; + config.URLCache = nil; + NSURLSession *session = [NSURLSession sessionWithConfiguration:config]; + + NSURLSessionDataTask *task = [session + dataTaskWithRequest:request + completionHandler:^(NSData *data, NSURLResponse *response, NSError *error) { + if (error) { + NSMutableDictionary *body = + [@{@"error" : error.localizedDescription ?: @"error"} mutableCopy]; + [RNFBSharedUtils sendJSEventForApp:firebaseApp + name:RNFB_FUNCTIONS_STREAMING_EVENT + body:@{ + @"listenerId" : listenerId, + @"eventName" : RNFB_FUNCTIONS_STREAMING_EVENT, + @"body" : body + }]; + } else if ([response isKindOfClass:[NSHTTPURLResponse class]] && + [(NSHTTPURLResponse *)response statusCode] >= 400) { + NSHTTPURLResponse *http = (NSHTTPURLResponse *)response; + NSString *msg = [NSString + stringWithFormat:@"http_error_%ld_%@", (long)http.statusCode, + [NSHTTPURLResponse localizedStringForStatusCode:http.statusCode]]; + NSMutableDictionary *body = [@{@"error" : msg} mutableCopy]; + [RNFBSharedUtils sendJSEventForApp:firebaseApp + name:RNFB_FUNCTIONS_STREAMING_EVENT + body:@{ + @"listenerId" : listenerId, + @"eventName" : RNFB_FUNCTIONS_STREAMING_EVENT, + @"body" : body + }]; + } else if (data.length > 0) { + NSString *payload = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding]; + // Split into lines (handles SSE or NDJSON) + [payload enumerateLinesUsingBlock:^(NSString *line, BOOL *stop) { + if (line.length == 0) return; + NSString *trimmed = [line hasPrefix:@"data: "] ? [line substringFromIndex:6] : line; + NSMutableDictionary *body = [@{@"text" : trimmed} mutableCopy]; + [RNFBSharedUtils sendJSEventForApp:firebaseApp + name:RNFB_FUNCTIONS_STREAMING_EVENT + body:@{ + @"listenerId" : listenerId, + @"eventName" : RNFB_FUNCTIONS_STREAMING_EVENT, + @"body" : body + }]; + }]; + } + // Always emit done at end + [RNFBSharedUtils sendJSEventForApp:firebaseApp + name:RNFB_FUNCTIONS_STREAMING_EVENT + body:@{ + @"listenerId" : listenerId, + @"eventName" : RNFB_FUNCTIONS_STREAMING_EVENT, + @"body" : @{@"done" : @YES} + }]; + @synchronized(httpsCallableStreamListeners) { + [httpsCallableStreamListeners removeObjectForKey:listenerId]; + } + }]; + + @synchronized(httpsCallableStreamListeners) { + httpsCallableStreamListeners[listenerId] = task; + } + [task resume]; +} + +/** + * Start a streaming HTTP request to an onRequest endpoint using a URL. + * Emits 'functions_streaming_event' events with { listenerId, body, appName, eventName }. + * Signature mirrors Android/JS: + * (appName, regionOrDomain, host, port, url, wrapper, options, listenerId) + */ +RCT_EXPORT_METHOD(httpsCallableStreamFromUrl + : (FIRApp *)firebaseApp customUrlOrRegion + : (NSString *)customUrlOrRegion host + : (NSString *)host port + : (NSNumber *_Nonnull)port url + : (NSString *)url wrapper + : (__unused NSDictionary *)wrapper options + : (__unused NSDictionary *)options listenerId + : (NSNumber *_Nonnull)listenerId) { + if (!httpsCallableStreamListeners) { + httpsCallableStreamListeners = [NSMutableDictionary dictionary]; + } + + // Use the provided URL directly + NSURLComponents *components = [NSURLComponents componentsWithString:url]; + if (components == nil) { + NSMutableDictionary *body = [@{@"error" : @"invalid_url"} mutableCopy]; + [RNFBSharedUtils sendJSEventForApp:firebaseApp + name:RNFB_FUNCTIONS_STREAMING_EVENT + body:@{ + @"listenerId" : listenerId, + @"eventName" : RNFB_FUNCTIONS_STREAMING_EVENT, + @"body" : body + }]; + return; + } + + // Override to emulator if provided + if (host != nil && port != nil) { + components.scheme = @"http"; + components.host = host; + components.port = port; + } + + NSMutableURLRequest *request = [NSMutableURLRequest requestWithURL:[components URL]]; + [request setHTTPMethod:@"GET"]; + [request setValue:@"text/event-stream, application/x-ndjson, */*" forHTTPHeaderField:@"Accept"]; + + NSURLSessionConfiguration *config = [NSURLSessionConfiguration defaultSessionConfiguration]; + config.requestCachePolicy = NSURLRequestReloadIgnoringLocalCacheData; + config.URLCache = nil; + NSURLSession *session = [NSURLSession sessionWithConfiguration:config]; + + NSURLSessionDataTask *task = [session + dataTaskWithRequest:request + completionHandler:^(NSData *data, NSURLResponse *response, NSError *error) { + if (error) { + NSMutableDictionary *body = + [@{@"error" : error.localizedDescription ?: @"error"} mutableCopy]; + [RNFBSharedUtils sendJSEventForApp:firebaseApp + name:RNFB_FUNCTIONS_STREAMING_EVENT + body:@{ + @"listenerId" : listenerId, + @"eventName" : RNFB_FUNCTIONS_STREAMING_EVENT, + @"body" : body + }]; + } else if ([response isKindOfClass:[NSHTTPURLResponse class]] && + [(NSHTTPURLResponse *)response statusCode] >= 400) { + NSHTTPURLResponse *http = (NSHTTPURLResponse *)response; + NSString *msg = [NSString + stringWithFormat:@"http_error_%ld_%@", (long)http.statusCode, + [NSHTTPURLResponse localizedStringForStatusCode:http.statusCode]]; + NSMutableDictionary *body = [@{@"error" : msg} mutableCopy]; + [RNFBSharedUtils sendJSEventForApp:firebaseApp + name:RNFB_FUNCTIONS_STREAMING_EVENT + body:@{ + @"listenerId" : listenerId, + @"eventName" : RNFB_FUNCTIONS_STREAMING_EVENT, + @"body" : body + }]; + } else if (data.length > 0) { + NSString *payload = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding]; + // Split into lines (handles SSE or NDJSON) + [payload enumerateLinesUsingBlock:^(NSString *line, BOOL *stop) { + if (line.length == 0) return; + NSString *trimmed = [line hasPrefix:@"data: "] ? [line substringFromIndex:6] : line; + NSMutableDictionary *body = [@{@"text" : trimmed} mutableCopy]; + [RNFBSharedUtils sendJSEventForApp:firebaseApp + name:RNFB_FUNCTIONS_STREAMING_EVENT + body:@{ + @"listenerId" : listenerId, + @"eventName" : RNFB_FUNCTIONS_STREAMING_EVENT, + @"body" : body + }]; + }]; + } + // Always emit done at end + [RNFBSharedUtils sendJSEventForApp:firebaseApp + name:RNFB_FUNCTIONS_STREAMING_EVENT + body:@{ + @"listenerId" : listenerId, + @"eventName" : RNFB_FUNCTIONS_STREAMING_EVENT, + @"body" : @{@"done" : @YES} + }]; + @synchronized(httpsCallableStreamListeners) { + [httpsCallableStreamListeners removeObjectForKey:listenerId]; + } + }]; + + @synchronized(httpsCallableStreamListeners) { + httpsCallableStreamListeners[listenerId] = task; + } + [task resume]; +} + +/** + * Optional add hook; kept for API symmetry. + * Note: firebaseApp and customUrlOrRegion are auto-prepended by the native module wrapper. + */ +RCT_EXPORT_METHOD(addFunctionsStreaming + : (FIRApp *)firebaseApp customUrlOrRegion + : (NSString *)customUrlOrRegion listenerId + : (NSNumber *_Nonnull)listenerId) { + if (!httpsCallableStreamListeners) { + httpsCallableStreamListeners = [NSMutableDictionary dictionary]; + } +} + +/** + * Cancel and remove an active stream. + * Note: firebaseApp and customUrlOrRegion are auto-prepended by the native module wrapper. + */ +RCT_EXPORT_METHOD(removeFunctionsStreaming + : (FIRApp *)firebaseApp customUrlOrRegion + : (NSString *)customUrlOrRegion listenerId + : (NSNumber *_Nonnull)listenerId) { + if (!httpsCallableStreamListeners) { + return; + } + @synchronized(httpsCallableStreamListeners) { + NSURLSessionDataTask *task = httpsCallableStreamListeners[listenerId]; + if (task != nil) { + [task cancel]; + } + [httpsCallableStreamListeners removeObjectForKey:listenerId]; + } +} + RCT_EXPORT_METHOD(httpsCallableFromUrl : (FIRApp *)firebaseApp customUrlOrRegion : (NSString *)customUrlOrRegion host diff --git a/packages/functions/lib/index.d.ts b/packages/functions/lib/index.d.ts index cfa7323fba..caab7700a1 100644 --- a/packages/functions/lib/index.d.ts +++ b/packages/functions/lib/index.d.ts @@ -121,6 +121,28 @@ export namespace FirebaseFunctionsTypes { readonly data: ResponseData; } + /** + * An event emitted during streaming from a callable function. + */ + export interface HttpsCallableStreamEvent { + /** + * Text chunk received from the stream + */ + text?: string; + /** + * Error message if the stream encountered an error + */ + error?: string; + /** + * Whether the stream has completed + */ + done?: boolean; + /** + * Any additional data in the event + */ + [key: string]: any; + } + /** * An HttpsCallable is a reference to a "callable" http trigger in * Google Cloud Functions. @@ -139,9 +161,41 @@ export namespace FirebaseFunctionsTypes { * console.error(e); * } * ``` + * + * #### Streaming Example + * + * ```js + * const reference = firebase.functions().httpsCallable('streamingFunction'); + * const unsubscribe = reference.stream({ input: 'data' }, (event) => { + * if (event.error) { + * console.error('Stream error:', event.error); + * } else if (event.done) { + * console.log('Stream completed'); + * } else if (event.text) { + * console.log('Stream chunk:', event.text); + * } + * }); + * + * // Later, to stop the stream: + * unsubscribe(); + * ``` */ export interface HttpsCallable { (data?: RequestData | null): Promise>; + + /** + * Start a streaming request to the callable function. + * + * @param data The data to send to the function + * @param onEvent Callback function that receives streaming events + * @param options Optional HttpsCallableOptions for the streaming request + * @returns A function that when called, stops the stream + */ + stream( + data?: RequestData | null, + onEvent?: (event: HttpsCallableStreamEvent) => void, + options?: HttpsCallableOptions, + ): () => void; } /** diff --git a/packages/functions/lib/index.js b/packages/functions/lib/index.js index cc038403c0..f76902450a 100644 --- a/packages/functions/lib/index.js +++ b/packages/functions/lib/index.js @@ -73,15 +73,25 @@ const statics = { HttpsErrorCode, }; +const nativeEvents = ['functions_streaming_event']; + class FirebaseFunctionsModule extends FirebaseModule { constructor(...args) { super(...args); this._customUrlOrRegion = this._customUrlOrRegion || 'us-central1'; this._useFunctionsEmulatorHost = null; this._useFunctionsEmulatorPort = -1; + this._id_functions_streaming_event = 0; + + this.emitter.addListener(this.eventNameForApp('functions_streaming_event'), event => { + this.emitter.emit( + this.eventNameForApp(`functions_streaming_event:${event.listenerId}`), + event, + ); + }); } - httpsCallable(name, options = {}) { + httpsCallable(name, options = {}, _deprecationArg) { if (options.timeout) { if (isNumber(options.timeout)) { options.timeout = options.timeout / 1000; @@ -90,7 +100,8 @@ class FirebaseFunctionsModule extends FirebaseModule { } } - return data => { + // Create the main callable function + const callableFunction = data => { const nativePromise = this.native.httpsCallable( this._useFunctionsEmulatorHost, this._useFunctionsEmulatorPort, @@ -112,9 +123,53 @@ class FirebaseFunctionsModule extends FirebaseModule { ); }); }; + + // Add a streaming helper (callback-based) + // Usage: const stop = functions().httpsCallable('fn').stream(data, (evt) => {...}, options) + callableFunction.stream = (data, onEvent, options = {}) => { + if (options.timeout) { + if (isNumber(options.timeout)) { + options.timeout = options.timeout / 1000; + } else { + throw new Error('HttpsCallableOptions.timeout expected a Number in milliseconds'); + } + } + const listenerId = this._id_functions_streaming_event++; + const eventName = this.eventNameForApp(`functions_streaming_event:${listenerId}`); + const subscription = this.emitter.addListener(eventName, event => { + const body = event.body; + if (onEvent) { + onEvent(body); + } + if (body && (body.done || body.error)) { + subscription.remove(); + if (this.native.removeFunctionsStreaming) { + this.native.removeFunctionsStreaming(listenerId); + } + } + }); + // Start native streaming on both platforms. + // Note: appName and customUrlOrRegion are automatically prepended by the native module wrapper + this.native.httpsCallableStream( + this._useFunctionsEmulatorHost || null, + this._useFunctionsEmulatorPort || -1, + name, + { data }, + options, + listenerId, + ); + return () => { + subscription.remove(); + if (this.native.removeFunctionsStreaming) { + this.native.removeFunctionsStreaming(listenerId); + } + }; + }; + + return callableFunction; } - httpsCallableFromUrl(url, options = {}) { + httpsCallableFromUrl(url, options = {}, _deprecationArg) { if (options.timeout) { if (isNumber(options.timeout)) { options.timeout = options.timeout / 1000; @@ -123,7 +178,8 @@ class FirebaseFunctionsModule extends FirebaseModule { } } - return data => { + const callableFunction = data => { + // Note: appName and customUrlOrRegion are automatically prepended by the native module wrapper const nativePromise = this.native.httpsCallableFromUrl( this._useFunctionsEmulatorHost, this._useFunctionsEmulatorPort, @@ -145,6 +201,48 @@ class FirebaseFunctionsModule extends FirebaseModule { ); }); }; + + // Add streaming support for URL-based callable + callableFunction.stream = (data, onEvent, streamOptions = {}) => { + if (streamOptions.timeout) { + if (isNumber(streamOptions.timeout)) { + streamOptions.timeout = streamOptions.timeout / 1000; + } else { + throw new Error('HttpsCallableOptions.timeout expected a Number in milliseconds'); + } + } + const listenerId = this._id_functions_streaming_event++; + const eventName = this.eventNameForApp(`functions_streaming_event:${listenerId}`); + const subscription = this.emitter.addListener(eventName, event => { + const body = event.body; + if (onEvent) { + onEvent(body); + } + if (body && (body.done || body.error)) { + subscription.remove(); + if (this.native.removeFunctionsStreaming) { + this.native.removeFunctionsStreaming(listenerId); + } + } + }); + // Note: appName and customUrlOrRegion are automatically prepended by the native module wrapper + this.native.httpsCallableStreamFromUrl( + this._useFunctionsEmulatorHost || null, + this._useFunctionsEmulatorPort || -1, + url, + { data }, + streamOptions, + listenerId, + ); + return () => { + subscription.remove(); + if (this.native.removeFunctionsStreaming) { + this.native.removeFunctionsStreaming(listenerId); + } + }; + }; + + return callableFunction; } useFunctionsEmulator(origin) { @@ -196,7 +294,7 @@ export default createModuleNamespace({ version, namespace, nativeModuleName, - nativeEvents: false, + nativeEvents, hasMultiAppSupport: true, hasCustomUrlOrRegionSupport: true, ModuleClass: FirebaseFunctionsModule, diff --git a/packages/functions/lib/modular/index.d.ts b/packages/functions/lib/modular/index.d.ts index cee948f95f..349d20ada0 100644 --- a/packages/functions/lib/modular/index.d.ts +++ b/packages/functions/lib/modular/index.d.ts @@ -5,6 +5,7 @@ import FirebaseApp = ReactNativeFirebase.FirebaseApp; import Functions = FirebaseFunctionsTypes.Module; import HttpsCallable = FirebaseFunctionsTypes.HttpsCallable; import HttpsCallableOptions = FirebaseFunctionsTypes.HttpsCallableOptions; +import HttpsCallableStreamEvent = FirebaseFunctionsTypes.HttpsCallableStreamEvent; import HttpsErrorCodeType = FirebaseFunctionsTypes.HttpsErrorCode; export const HttpsErrorCode: HttpsErrorCodeType; @@ -55,3 +56,85 @@ export declare function httpsCallableFromUrl; + +/** + * Convenience helper to start a streaming callable by name from modular API. + * Returns a function that when called with data and an event callback, starts the stream and returns an unsubscribe function. + * + * #### Example + * + * ```js + * import { getFunctions, httpsCallableStream } from '@react-native-firebase/functions/lib/modular'; + * + * const functions = getFunctions(); + * const startStream = httpsCallableStream(functions, 'myStreamingFunction'); + * + * const unsubscribe = startStream({ input: 'data' }, (event) => { + * if (event.error) { + * console.error('Error:', event.error); + * } else if (event.done) { + * console.log('Stream complete'); + * } else if (event.text) { + * console.log('Received:', event.text); + * } + * }); + * + * // Stop the stream + * unsubscribe(); + * ``` + * + * @param {Functions} functionsInstance A functions instance. + * @param {string} name The name of the trigger. + * @param {HttpsCallableOptions | undefined} options Options for execution. + * @returns A function that starts the stream and returns an unsubscribe function + */ +export declare function httpsCallableStream( + functionsInstance: Functions, + name: string, + options?: HttpsCallableOptions, +): ( + data?: RequestData | null, + onEvent?: (event: HttpsCallableStreamEvent) => void, + streamOptions?: HttpsCallableOptions, +) => () => void; + +/** + * Convenience helper to start a streaming callable by URL from modular API. + * Returns a function that when called with data and an event callback, starts the stream and returns an unsubscribe function. + * + * #### Example + * + * ```js + * import { getFunctions, httpsCallableFromUrlStream } from '@react-native-firebase/functions/lib/modular'; + * + * const functions = getFunctions(); + * const startStream = httpsCallableFromUrlStream(functions, 'https://mydomain.com/myFunction'); + * + * const unsubscribe = startStream({ input: 'data' }, (event) => { + * if (event.error) { + * console.error('Error:', event.error); + * } else if (event.done) { + * console.log('Stream complete'); + * } else if (event.text) { + * console.log('Received:', event.text); + * } + * }); + * + * // Stop the stream + * unsubscribe(); + * ``` + * + * @param {Functions} functionsInstance A functions instance. + * @param {string} url The URL of the trigger. + * @param {HttpsCallableOptions | undefined} options Options for execution. + * @returns A function that starts the stream and returns an unsubscribe function + */ +export declare function httpsCallableFromUrlStream( + functionsInstance: Functions, + url: string, + options?: HttpsCallableOptions, +): ( + data?: RequestData | null, + onEvent?: (event: HttpsCallableStreamEvent) => void, + streamOptions?: HttpsCallableOptions, +) => () => void; diff --git a/packages/functions/lib/modular/index.js b/packages/functions/lib/modular/index.js index 96a076a28b..5ab76b2c71 100644 --- a/packages/functions/lib/modular/index.js +++ b/packages/functions/lib/modular/index.js @@ -83,3 +83,27 @@ export function httpsCallableFromUrl(functionsInstance, url, options) { } export { HttpsErrorCode } from '../index'; + +/** + * Convenience helper to start a streaming callable by name from modular API. + * @param {Functions} functionsInstance A functions instance. + * @param {string} name The name of the trigger. + * @param {HttpsCallableOptions | undefined} options Options for execution. + * @returns {(data: any, onEvent: (evt: any) => void, streamOptions?: HttpsCallableOptions) => () => void} + */ +export function httpsCallableStream(functionsInstance, name, options) { + const callable = httpsCallable(functionsInstance, name, options); + return (data, onEvent, streamOptions) => callable.stream(data, onEvent, streamOptions); +} + +/** + * Convenience helper to start a streaming callable by URL from modular API. + * @param {Functions} functionsInstance A functions instance. + * @param {string} url The URL of the trigger. + * @param {HttpsCallableOptions | undefined} options Options for execution. + * @returns {(data: any, onEvent: (evt: any) => void, streamOptions?: HttpsCallableOptions) => () => void} + */ +export function httpsCallableFromUrlStream(functionsInstance, url, options) { + const callable = httpsCallableFromUrl(functionsInstance, url, options); + return (data, onEvent, streamOptions) => callable.stream(data, onEvent, streamOptions); +} diff --git a/packages/functions/lib/web/RNFBFunctionsModule.js b/packages/functions/lib/web/RNFBFunctionsModule.js index cfb29c2281..70c94a31fd 100644 --- a/packages/functions/lib/web/RNFBFunctionsModule.js +++ b/packages/functions/lib/web/RNFBFunctionsModule.js @@ -5,6 +5,19 @@ import { httpsCallableFromURL, connectFunctionsEmulator, } from '@react-native-firebase/app/lib/internal/web/firebaseFunctions'; +import RNFBAppModule from '@react-native-firebase/app/lib/internal/web/RNFBAppModule'; + +const FUNCTIONS_STREAMING_EVENT = 'functions_streaming_event'; +const STREAM_CONTROLLERS = {}; + +function emitStreamingEvent(appName, listenerId, body) { + RNFBAppModule.eventsPing(FUNCTIONS_STREAMING_EVENT, { + listenerId, + body, + appName, + eventName: FUNCTIONS_STREAMING_EVENT, + }); +} /** * This is a 'NativeModule' for the web platform. @@ -123,4 +136,138 @@ export default { return Promise.reject(nativeError); } }, + + /** + * Start a streaming HTTP request to an onRequest endpoint using a function name. + * Mirrors the native streaming implementation, but uses fetch on web. + * + * Signature: + * (appName, regionOrCustomDomain, host, port, name, listenerId) + */ + async httpsCallableStream(appName, regionOrCustomDomain, host, port, name, listenerId) { + const fetchImpl = typeof fetch === 'function' ? fetch : null; + if (!fetchImpl) { + emitStreamingEvent(appName, listenerId, { error: 'fetch_not_available' }); + emitStreamingEvent(appName, listenerId, { done: true }); + return; + } + + const supportsAbort = typeof AbortController === 'function'; + const controller = supportsAbort ? new AbortController() : null; + if (controller) { + STREAM_CONTROLLERS[listenerId] = controller; + } + + try { + const app = getApp(appName); + const appOptions = app.options || {}; + const projectId = appOptions.projectId || appOptions.projectID || ''; + + let targetUrl; + const region = regionOrCustomDomain || 'us-central1'; + + if (host && port != null && port !== -1) { + // Emulator: http://host:port/{projectId}/{region}/{name} + targetUrl = `http://${host}:${port}/${projectId}/${region}/${name}`; + } else if (regionOrCustomDomain && regionOrCustomDomain.startsWith('http')) { + // Custom domain: https://example.com/{name} + const base = regionOrCustomDomain.replace(/\/+$/, ''); + targetUrl = `${base}/${name}`; + } else { + // Prod: https://{region}-{projectId}.cloudfunctions.net/{name} + targetUrl = `https://${region}-${projectId}.cloudfunctions.net/${name}`; + } + + const response = await fetchImpl(targetUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream, application/x-ndjson, */*', + }, + signal: controller ? controller.signal : undefined, + }); + + if (!response.ok) { + const msg = `http_error_${response.status}_${response.statusText || 'error'}`; + emitStreamingEvent(appName, listenerId, { error: msg }); + } else { + const payload = await response.text(); + const lines = payload.split(/\r?\n/); + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + if (!line || !line.trim()) continue; + const trimmed = line.startsWith('data: ') ? line.slice(6) : line; + emitStreamingEvent(appName, listenerId, { text: trimmed }); + } + } + } catch (error) { + if (!(supportsAbort && error && error.name === 'AbortError')) { + emitStreamingEvent(appName, listenerId, { + error: error && error.message ? error.message : String(error), + }); + } + } finally { + emitStreamingEvent(appName, listenerId, { done: true }); + if (controller && STREAM_CONTROLLERS[listenerId] === controller) { + delete STREAM_CONTROLLERS[listenerId]; + } + } + }, + + /** + * Start a streaming HTTP request to an onRequest endpoint using a URL. + * + * Signature: + * (appName, url, listenerId) + */ + async httpsCallableStreamFromUrl(appName, url, listenerId) { + const fetchImpl = typeof fetch === 'function' ? fetch : null; + if (!fetchImpl) { + emitStreamingEvent(appName, listenerId, { error: 'fetch_not_available' }); + emitStreamingEvent(appName, listenerId, { done: true }); + return; + } + + const supportsAbort = typeof AbortController === 'function'; + const controller = supportsAbort ? new AbortController() : null; + if (controller) { + STREAM_CONTROLLERS[listenerId] = controller; + } + + try { + // For web we use the provided URL directly. If host/port are provided they + // have already been baked into the URL by caller (e.g. emulator). + const response = await fetchImpl(url, { + method: 'GET', + headers: { + Accept: 'text/event-stream, application/x-ndjson, */*', + }, + signal: controller ? controller.signal : undefined, + }); + + if (!response.ok) { + const msg = `http_error_${response.status}_${response.statusText || 'error'}`; + emitStreamingEvent(appName, listenerId, { error: msg }); + } else { + const payload = await response.text(); + const lines = payload.split(/\r?\n/); + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + if (!line || !line.trim()) continue; + const trimmed = line.startsWith('data: ') ? line.slice(6) : line; + emitStreamingEvent(appName, listenerId, { text: trimmed }); + } + } + } catch (error) { + if (!(supportsAbort && error && error.name === 'AbortError')) { + emitStreamingEvent(appName, listenerId, { + error: error && error.message ? error.message : String(error), + }); + } + } finally { + emitStreamingEvent(appName, listenerId, { done: true }); + if (controller && STREAM_CONTROLLERS[listenerId] === controller) { + delete STREAM_CONTROLLERS[listenerId]; + } + } + }, };