Skip to content

Commit

Permalink
Merge fd17461 into 95ad696
Browse files Browse the repository at this point in the history
  • Loading branch information
ibgreen committed Mar 6, 2020
2 parents 95ad696 + fd17461 commit ee33dc3
Show file tree
Hide file tree
Showing 17 changed files with 220 additions and 76 deletions.
9 changes: 9 additions & 0 deletions docs/upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## Upgrading to v2.1

**`@loaders.gl/core`**

Some iterator helper functions have been renamed; the old names are now deprecated.

| Old Name | New Name |
| -------------------------- | ------------------------ |
| `getStreamIterator` | `makeStreamIterator` |
| `concatenateAsyncIterator` | `concatenateChunksAsync` |

**`@loaders.gl/json`**

- The experimental exports `JSONParser`, `StreamingJSONParser` and `ClarinetParser` have been removed.
Expand Down
4 changes: 2 additions & 2 deletions modules/arrow/test/arrow-loader.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import test from 'tape-promise/tape';
import {validateLoader} from 'test/common/conformance';

import {ArrowLoader, ArrowWorkerLoader} from '@loaders.gl/arrow';
import {isBrowser, getStreamIterator, resolvePath} from '@loaders.gl/core';
import {isBrowser, makeStreamIterator, resolvePath} from '@loaders.gl/core';
import {setLoaderOptions, fetchFile, parse, parseInBatches} from '@loaders.gl/core';

// Small Arrow Sample Files
Expand Down Expand Up @@ -93,7 +93,7 @@ test('ArrowLoader#parseInBatches(Stream)', async t => {
}
const fs = require('fs');
const stream = fs.createReadStream(resolvePath(ARROW_BIOGRID_NODES));
const asyncIterator = await parseInBatches(getStreamIterator(stream), ArrowLoader);
const asyncIterator = await parseInBatches(makeStreamIterator(stream), ArrowLoader);
for await (const batch of asyncIterator) {
t.ok(batch, 'received batch');
}
Expand Down
11 changes: 8 additions & 3 deletions modules/core/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@ export {
export {toArrayBuffer} from './javascript-utils/binary-utils';

// ITERATOR UTILS
export {getStreamIterator} from './javascript-utils/stream-utils';
export {makeStreamIterator} from './iterator-utils/stream-iteration';

export {
forEach,
concatenateAsyncIterator,
lineAsyncIterator,
textDecoderAsyncIterator,
numberedLineAsyncIterator
} from './javascript-utils/async-iterator-utils';
} from './iterator-utils/async-iteration';

export {makeChunkIterator, concatenateChunksAsync} from './iterator-utils/chunk-iteration';

// CORE UTILS SHARED WITH LOADERS (RE-EXPORTED FROM LOADER-UTILS)
export {isBrowser, isWorker, self, window, global, document} from '@loaders.gl/loader-utils';
Expand All @@ -69,3 +70,7 @@ export {default as _fetchProgress} from './lib/progress/fetch-progress';

// FOR TESTING
export {_unregisterLoaders} from './lib/register-loaders';

// DEPRECATED in v2.1
// The pre-2.1 export was spelled `concatenateAsyncIterator`. The misspelled
// `contatenateAsyncIterator` alias is retained so that any code already importing it keeps working.
export {
  concatenateChunksAsync as concatenateAsyncIterator,
  concatenateChunksAsync as contatenateAsyncIterator
} from './iterator-utils/chunk-iteration';
export {makeStreamIterator as getStreamIterator} from './iterator-utils/stream-iteration';
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
/* global TextDecoder, TextEncoder */
import {concatenateArrayBuffers} from '../javascript-utils/memory-copy-utils';

// GENERAL UTILITIES

// Iterate over async iterator, without resetting iterator if end is not reached
// - forEach does not reset iterator if exiting loop prematurely
// so that iteration can continue in a second loop
// - It is recommended to use a standard for await as last loop to ensure
// iterator gets properly reset
// TODO - optimize using sync iteration if argument is an Iterable?
/**
* Iterate over async iterator, without resetting iterator if end is not reached
* - forEach intentionally does not reset iterator if exiting loop prematurely
* so that iteration can continue in a second loop
* - It is recommended to use a standard for-await as last loop to ensure
* iterator gets properly reset
*
* TODO - optimize using sync iteration if argument is an Iterable?
*
* @param iterator
* @param visitor
*/
export async function forEach(iterator, visitor) {
// eslint-disable-next-line
while (true) {
Expand All @@ -24,20 +29,6 @@ export async function forEach(iterator, visitor) {
}
}

/**
 * Drains an (async) iterator and joins everything it yielded into a single value.
 * String chunks are appended to a string; every other chunk is merged into the
 * binary result via `concatenateArrayBuffers`.
 *
 * @param asyncIterator (async) iterable yielding the chunks
 * @returns the joined string when it is non-empty, otherwise the binary result
 */
export async function concatenateAsyncIterator(asyncIterator) {
  let text = '';
  let binary = new ArrayBuffer();
  for await (const piece of asyncIterator) {
    const isText = typeof piece === 'string';
    if (isText) {
      text += piece;
      continue;
    }
    binary = concatenateArrayBuffers(binary, piece);
  }
  return text ? text : binary;
}

// ITERATOR GENERATORS

// TextDecoder iterators
Expand All @@ -64,9 +55,11 @@ export async function* textEncoderAsyncIterator(textIterator, options) {
}
}

// Input: async iterable over strings
// Returns: an async iterable over lines
// See http://2ality.com/2018/04/async-iter-nodejs.html
/**
* @param textIterator async iterable yielding strings
* @returns an async iterable over lines
* See http://2ality.com/2018/04/async-iter-nodejs.html
*/

export async function* lineAsyncIterator(textIterator) {
let previous = '';
Expand All @@ -87,11 +80,11 @@ export async function* lineAsyncIterator(textIterator) {
}

/**
* Parameter: async iterable of lines
* Result: async iterable of numbered lines
* @param lineIterator async iterable yielding lines
* @returns async iterable yielding numbered lines
*
* See http://2ality.com/2018/04/async-iter-nodejs.html
*/
// See http://2ality.com/2018/04/async-iter-nodejs.html
// eslint-disable-next-line no-shadow
export async function* numberedLineAsyncIterator(lineIterator) {
let counter = 1;
for await (const line of lineIterator) {
Expand Down
88 changes: 88 additions & 0 deletions modules/core/src/iterator-utils/chunk-iteration.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Breaking big data into iterable chunks, concatenating iterable chunks into big data objects

import {concatenateArrayBuffers} from '../javascript-utils/memory-copy-utils';

/**
 * Collects every chunk produced by an (async) iterator into one value.
 * Handles string chunks and ArrayBuffer chunks.
 *
 * Useful for letting atomic (non-streaming) parsers consume (async) iterator inputs.
 *
 * @param asyncIterator (async) iterable yielding strings or ArrayBuffers
 * @returns the concatenated string when it is non-empty, otherwise the result of
 *   concatenating the binary chunks (an empty ArrayBuffer if there were none)
 */
export async function concatenateChunksAsync(asyncIterator) {
  let text = '';
  let binary = new ArrayBuffer(0);
  for await (const chunk of asyncIterator) {
    if (typeof chunk !== 'string') {
      binary = concatenateArrayBuffers(binary, chunk);
    } else {
      text += chunk;
    }
  }
  return text === '' ? binary : text;
}

/**
 * Returns an iterator that breaks a big `ArrayBuffer` or string into chunks and yields them one-by-one.
 *
 * This function can e.g. be used to enable data sources that can only be read atomically
 * (such as `Blob` and `File` via `FileReader`) to still be parsed in batches.
 *
 * Note: this is a generator function, so nothing runs (and nothing is thrown) until the
 * returned iterator is first advanced.
 *
 * @param bigArrayBufferOrString the string or `ArrayBuffer` to split
 * @param options
 * @param options.chunkSize size of each chunk, forwarded to the type-specific helper
 * @returns iterator that yields chunks of specified size
 * @throws Error on first iteration if the input is neither a string nor an `ArrayBuffer`
 */
export function* makeChunkIterator(bigArrayBufferOrString, options = {}) {
  if (typeof bigArrayBufferOrString === 'string') {
    yield* makeStringChunkIterator(bigArrayBufferOrString, options);
  } else if (bigArrayBufferOrString instanceof ArrayBuffer) {
    yield* makeArrayBufferChunkIterator(bigArrayBufferOrString, options);
  } else {
    // Name the function and the accepted types so the failure is actionable
    throw new Error('makeChunkIterator: data must be a string or an ArrayBuffer');
  }
}

/**
 * Helper: Breaks a big ArrayBuffer into chunks and returns an iterator that yields them one-by-one
 *
 * @param arrayBuffer the `ArrayBuffer` to split
 * @param options
 * @param options.chunkSize maximum byte length of each chunk (default 256 * 1024);
 *   fractional values are rounded down
 * @returns iterator yielding `ArrayBuffer` copies; only the last one can be shorter than `chunkSize`
 * @throws Error (on first iteration) if `chunkSize` is not a number >= 1
 */
function* makeArrayBufferChunkIterator(arrayBuffer, options = {}) {
  const {chunkSize = 256 * 1024} = options;

  // A zero, negative or NaN chunk size would never advance the offset (endless loop),
  // and a fractional one would produce overlapping chunks - so normalize and validate first.
  const bytesPerChunk = Math.floor(chunkSize);
  if (!(bytesPerChunk >= 1)) {
    throw new Error(`makeChunkIterator: chunkSize must be a number >= 1, got ${chunkSize}`);
  }

  for (let byteOffset = 0; byteOffset < arrayBuffer.byteLength; byteOffset += bytesPerChunk) {
    // ArrayBuffer.prototype.slice copies the byte range and clamps the end to byteLength
    yield arrayBuffer.slice(byteOffset, byteOffset + bytesPerChunk);
  }
}

/**
 * Helper: Breaks a big string into chunks and returns an iterator that yields them one-by-one
 *
 * Chunk length is measured with `string.length` / `string.slice`, i.e. in UTF-16 code units,
 * so a surrogate pair can end up split across two chunks.
 *
 * @param string the string to split
 * @param options
 * @param options.chunkSize maximum length of each chunk (default 256 * 1024);
 *   fractional values are rounded down
 * @returns iterator yielding substrings; only the last one can be shorter than `chunkSize`
 * @throws Error (on first iteration) if `chunkSize` is not a number >= 1
 */
function* makeStringChunkIterator(string, options = {}) {
  const {chunkSize = 256 * 1024} = options;

  // A zero, negative or NaN chunk size would never advance the offset (endless loop),
  // and a fractional one would produce overlapping chunks - so normalize and validate first.
  const charsPerChunk = Math.floor(chunkSize);
  if (!(charsPerChunk >= 1)) {
    throw new Error(`makeChunkIterator: chunkSize must be a number >= 1, got ${chunkSize}`);
  }

  for (let offset = 0; offset < string.length; offset += charsPerChunk) {
    // slice clamps the end index, so the final chunk is simply the remainder
    yield string.slice(offset, offset + charsPerChunk);
  }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {isBrowser, nodeVersion} from '@loaders.gl/loader-utils';

export function getStreamIterator(stream) {
export function makeStreamIterator(stream) {
// Hacky test for node version to ensure we don't call bad polyfills
if (isBrowser || nodeVersion >= 10) {
// NODE 10+: stream is an asyncIterator
Expand Down
12 changes: 6 additions & 6 deletions modules/core/src/lib/loader-utils/get-data.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import {
isFileReadable,
isBuffer
} from '../../javascript-utils/is-type';
import {getStreamIterator} from '../../javascript-utils/stream-utils';
import {concatenateAsyncIterator} from '../../javascript-utils/async-iterator-utils';
import {makeStreamIterator} from '../../iterator-utils/stream-iteration';
import {concatenateChunksAsync} from '../../iterator-utils/chunk-iteration';
import fetchFileReadable from '../fetch/fetch-file.browser';
import {checkFetchResponseStatus, checkFetchResponseStatusSync} from './check-errors';

Expand Down Expand Up @@ -89,12 +89,12 @@ export async function getArrayBufferOrStringFromData(data, loader) {
}

if (isReadableStream(data)) {
data = getStreamIterator(data);
data = makeStreamIterator(data);
}

if (isIterable(data) || isAsyncIterable(data)) {
// Assume arrayBuffer iterator - attempt to concatenate
return concatenateAsyncIterator(data);
return concatenateChunksAsync(data);
}

throw new Error(ERR_DATA);
Expand All @@ -109,11 +109,11 @@ export function getAsyncIteratorFromData(data) {
if (isFetchResponse(data) && data.body) {
// Note Since this function is not async, we currently can't load error message, just status
checkFetchResponseStatusSync(data);
return getStreamIterator(data.body);
return makeStreamIterator(data.body);
}

if (isReadableStream(data)) {
return getStreamIterator(data);
return makeStreamIterator(data);
}

if (isAsyncIterable(data)) {
Expand Down
2 changes: 1 addition & 1 deletion modules/core/src/lib/parse-in-batches.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {mergeOptions} from './loader-utils/merge-options';
import {getAsyncIteratorFromData} from './loader-utils/get-data';
import {getLoaderContext} from './loader-utils/get-loader-context';
import {selectLoader} from './select-loader';
import {textDecoderAsyncIterator} from '../javascript-utils/async-iterator-utils';
import {textDecoderAsyncIterator} from '../iterator-utils/async-iteration';

export async function parseInBatches(data, loaders, options, url) {
// Signature: parseInBatches(data, options, url)
Expand Down
8 changes: 7 additions & 1 deletion modules/core/test/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import './javascript-utils';
import './javascript-utils/text-encoder.spec';
import './javascript-utils/binary-utils.spec';
import './javascript-utils/is-type.spec';

import './iterator-utils/chunk-iteration.spec';
import './iterator-utils/async-iteration.spec';
import './iterator-utils/stream-iteration.spec';

import './lib/fetch';
import './lib/loader-utils';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ import test from 'tape-promise/tape';

import {
forEach,
concatenateAsyncIterator,
lineAsyncIterator,
textDecoderAsyncIterator,
textEncoderAsyncIterator,
numberedLineAsyncIterator
} from '@loaders.gl/core/javascript-utils/async-iterator-utils';
} from '@loaders.gl/core/iterator-utils/async-iteration';

/* global setTimeout */
const setTimeoutPromise = timeout => new Promise(resolve => setTimeout(resolve, timeout));
Expand Down Expand Up @@ -45,20 +44,6 @@ test('async-iterator#forEach', async t => {
});
});

test('async-iterator#concatenateAsyncIterator', async t => {
const RESULT = `line 1\nline 2\nline 3\nline 4`;

const text = await concatenateAsyncIterator(asyncTexts());
t.is(text, RESULT, 'returns concatenated string');

const arraybuffer = await concatenateAsyncIterator(asyncArrayBuffers());
t.ok(arraybuffer instanceof Uint8Array, 'returns buffer');
/* global TextEncoder */
t.deepEqual(arraybuffer, new TextEncoder().encode(RESULT), 'returns concatenated arraybuffer');

t.end();
});

test('async-iterator#textDecoderAsyncIterator', async t => {
t.plan(6);

Expand Down
63 changes: 63 additions & 0 deletions modules/core/test/iterator-utils/chunk-iteration.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import test from 'tape-promise/tape';

import {
makeChunkIterator,
concatenateChunksAsync
} from '@loaders.gl/core/iterator-utils/chunk-iteration';
import {textEncoderAsyncIterator} from '@loaders.gl/core/iterator-utils/async-iteration';

/* global setTimeout */
const setTimeoutPromise = timeout => new Promise(resolve => setTimeout(resolve, timeout));

// Async generator fixture: yields three text fragments (line breaks do not align with
// fragment boundaries), waiting 10 ms before each one.
async function* asyncTexts() {
  const fragments = ['line 1\nline', ' 2\nline 3\n', 'line 4'];
  for (const fragment of fragments) {
    await setTimeoutPromise(10);
    yield fragment;
  }
}

// Fixture: the fragments from `asyncTexts()` passed through `textEncoderAsyncIterator`
function asyncArrayBuffers() {
  const textFragments = asyncTexts();
  return textEncoderAsyncIterator(textFragments);
}

// Checks that both text chunks and encoded binary chunks are joined into the full payload
test('concatenateChunksAsync', async t => {
  const RESULT = `line 1\nline 2\nline 3\nline 4`;

  // Text input: chunks are joined into one string
  const joinedText = await concatenateChunksAsync(asyncTexts());
  t.is(joinedText, RESULT, 'returns concatenated string');

  // Binary input: chunks are joined into one buffer holding the encoded text
  /* global TextEncoder */
  const expectedBytes = new TextEncoder().encode(RESULT);
  const joinedBinary = await concatenateChunksAsync(asyncArrayBuffers());
  t.ok(joinedBinary instanceof Uint8Array, 'returns buffer');
  t.deepEqual(joinedBinary, expectedBytes, 'returns concatenated arraybuffer');

  t.end();
});

// Checks that a string is split into chunks of the requested size, in order
test('makeChunkIterator#string', async t => {
  const bigString = '123456';
  const expectedChunks = ['12', '34', '56'];

  const iterator = makeChunkIterator(bigString, {chunkSize: 2});

  // Compare the complete sequence: asserting per chunk inside a loop would pass
  // vacuously if the iterator yielded too few chunks (or none at all)
  t.deepEqual([...iterator], expectedChunks, 'yields all chunks in order');

  t.end();
});

// Checks that an ArrayBuffer is split into ArrayBuffer chunks of the requested size,
// preserving byte content and order
test('makeChunkIterator#arrayBuffer', async t => {
  // Distinct byte values make it possible to verify content, not just sizes
  const bigArrayBuffer = new Uint8Array([1, 2, 3, 4, 5, 6]).buffer;

  const chunks = [...makeChunkIterator(bigArrayBuffer, {chunkSize: 2})];

  // Guard against a vacuous pass when no (or too few) chunks are yielded
  t.equal(chunks.length, 3, 'yields the expected number of chunks');

  for (const chunk of chunks) {
    t.ok(chunk instanceof ArrayBuffer);
    t.equal(chunk.byteLength, 2);
  }

  t.deepEqual(
    chunks.map(chunk => Array.from(new Uint8Array(chunk))),
    [[1, 2], [3, 4], [5, 6]],
    'chunks preserve content and order'
  );

  t.end();
});
Loading

0 comments on commit ee33dc3

Please sign in to comment.