Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ReadableStreams): Support for ReadableStreams e.g. from(readableStream) #6163

Merged
merged 5 commits into from
Mar 25, 2021

Conversation

jayphelps
Copy link
Member

Description:
Support ReadableStream and other similar objects by checking for getReader() existence. In theory this isn't foolproof, but in practice I can't see this ever actually causing issues in practice. instanceof checks don't work with cross-Realm instances and neither ReadableStream nor ReadableStreamDefaultReader implement Symbol.asyncIterator yet (which is why this custom support is needed)

Related issue (if exists):
Closes #6006

| PromiseLike<T>
| ArrayLike<T>
| Iterable<T>
| ReadableStreamLike<T>;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to change this and the other places to have a consistent order, if there's a preference.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the order of this should match the runtime order.

import { isFunction } from './isFunction';

export interface ReadableStreamLike<T> {
getReader(): ReadableStreamDefaultReader<T>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give this a proper review later, but I don't think we can rely on - or explicitly reference using a /// comment - this type: ReadableStreamDefaultReader<T>

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah great catch. I'll stub it out too since it's easy.

Copy link
Member Author

@jayphelps jayphelps Mar 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. a7e9b56

getReader(): ReadableStreamDefaultReaderLike<T>;
}

export async function* readableStreamLikeToAsyncGenerator<T>(readableStream: ReadableStreamLike<T>): AsyncGenerator<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking... that while this is definitely the coolest way. The lame way might be more efficient, something like:

export function readableStreamLikeToAsyncGenerator(readableStream: ReadableStreamLike<T>): AsyncGenerator<T> {
  const reader = readableStream.getReader();
  
  return {
     [Symbol.asyncIterator]() { return this; },
     async next() { 
       return reader.read();
     },
     async throw(err) {
       reader.releaseLock();
       throw err;
     },
     async return() {
       reader.releaseLock();
       return { done: true };
     }
  }
}

But maybe I'm totally wrong. Just something I was thinking about. I guess if there was an error during the execution of next above, the lock wouldn't get released unless you had a try/catch in there, so maybe this isn't the best idea.

@benlesh
Copy link
Member

benlesh commented Mar 22, 2021

General approval from me. Pending @cartant's approval.

Copy link
Collaborator

@cartant cartant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@cartant
Copy link
Collaborator

cartant commented Mar 22, 2021

I had a look at adding a dtslint test for a ReadableStreamLike source to spec-dtslint/observables/from-spec.ts and found some things we might want to address in this PR.

IMO, we should export ReadableStreamLike - primarily because it's unused in the union type ObservableInput which is exported.

In the dtslint test I had to import it like this:

import { from, of, animationFrameScheduler } from 'rxjs';
import { ReadableStream } from 'web-streams-polyfill';
import { ReadableStreamLike } from '../../src/internal/util/isReadableStreamLike';

In the test, ReadableStream seemed to match the AsyncIterable type in the ObservableInput and the inference wasn't what I expected:

it('should accept a ReadableStream', () => {
  const stream = new ReadableStream<string>({
    pull(controller) {
      controller.enqueue('x');
      controller.close();
    },
  });
  // Nope, this expectation fails 'cause Observable<any> is inferred
  const o = from(stream); // $ExpectType Observable<string>
});

However, trying to assign it to an explicitly-typed ReadableStreamLike variable - to test the inference using that type - like this:

it('should accept a ReadableStream', () => {
  const stream: ReadableStreamLike<string> = new ReadableStream<string>({
    pull(controller) {
      controller.enqueue('x');
      controller.close();
    },
  });
  const o = from(stream); // $ExpectType Observable<string>
});

compilation fails with this TypeScript error:

Type 'ReadableStream<string>' is not assignable to type 'ReadableStreamLike<string>'.
  Types of property 'getReader' are incompatible.
    Type '{ ({ mode }: { mode: "byob"; }): ReadableStreamBYOBReader; (): ReadableStreamDefaultReader<string>; }' is not assignable to type '() => ReadableStreamDefaultReaderLike<string>'.ts(2322)

TypeScript is such a party pooper.

@benlesh benlesh requested a review from cartant March 23, 2021 21:41
@benlesh
Copy link
Member

benlesh commented Mar 23, 2021

@cartant @jayphelps I've added a couple of commits (it should have been one, but I rushed the first one and forgot some important bits):

  1. Export the ReadableStreamLike type with some documentation around what it is.
  2. Add a dtslint test around it.
  3. Make the ReadableStreamLike type not have compilation errors with the default ReadableStream impl's types (They're apparently more exact with how they define the iterator results for that, and it's a little annoying it doesn't pass, but whatever)

Copy link
Collaborator

@cartant cartant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noice.

@benlesh benlesh merged commit 19d6502 into ReactiveX:master Mar 25, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feat: Support ReadableStream as an ObservableInput
3 participants