Skip to content

Commit

Permalink
optimizing ndjson file
Browse files Browse the repository at this point in the history
  • Loading branch information
james-prysm committed Nov 17, 2023
1 parent be10e83 commit bfe2fa7
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions src/app/modules/core/utils/ndjson.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import { from, Observable, Observer, Subject } from 'rxjs';
import { from, Observable, Observer} from 'rxjs';
import { concatMap, filter, map, scan } from 'rxjs/operators';

/*
Expand All @@ -37,15 +37,13 @@ export function stream(url: string): Observable<string> {
const textStream = extractStream(xhr);
const jsonStream = collate(textStream).pipe(
concatMap((lineArray: string) => from(lineArray)),
map((x: string, _) => JSON.parse(x)),
map((x: string) => JSON.parse(x)),
);

if (options.beforeOpen) {
options.beforeOpen(xhr);
}
options.beforeOpen?.(xhr);

xhr.open(options.method ? options.method : 'GET', url);
xhr.send(options.postData ? options.postData : null);
xhr.open(options.method ?? 'GET', url);
xhr.send(options.postData ?? null);

return jsonStream;
}
Expand Down Expand Up @@ -85,16 +83,18 @@ function collate(chunkStream: Observable<string>): Observable<string> {
* flag "endWithNewline: true" which adds a trailing newline if one did not
* exist in the source.
*/
function extractStream(xhr: XMLHttpRequest): Observable<string> {
const options = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
interface ExtractStreamOptions {
endWithNewline?: boolean;
}
function extractStream(xhr: XMLHttpRequest, options: ExtractStreamOptions = {}): Observable<string> {
return new Observable((observer: Observer<string>) => {
let charactersSeen = 0;
const notified = () => {
if (xhr.readyState >= 3 && xhr.responseText.length > charactersSeen) {
if (xhr.readyState >= XMLHttpRequest.LOADING && xhr.responseText.length > charactersSeen) {
observer.next(xhr.responseText.substring(charactersSeen));
charactersSeen = xhr.responseText.length;
}
if (xhr.readyState === 4) {
if (xhr.readyState === XMLHttpRequest.DONE) {
if (options.endWithNewline && xhr.responseText[xhr.responseText.length - 1] !== '\n') {
observer.next('\n');
}
Expand All @@ -103,8 +103,6 @@ function extractStream(xhr: XMLHttpRequest): Observable<string> {
};
xhr.onreadystatechange = notified;
xhr.onprogress = notified;
xhr.onerror = event => {
observer.error(event);
};
xhr.onerror = (event: ProgressEvent) => observer.error(event);
});
}

0 comments on commit bfe2fa7

Please sign in to comment.