Skip to content

Commit b3039cd

Browse files
committed
feat: Implement Batch Streaming & Progressive Rendering (#9094)
1 parent 21abc63 commit b3039cd

4 files changed

Lines changed: 55 additions & 57 deletions

File tree

src/data/Store.mjs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -416,13 +416,6 @@ class Store extends Collection {
416416
return ClassSystemUtil.beforeSetInstance(value);
417417
}
418418

419-
/**
420-
* @param {Object} data
421-
*/
422-
onProxyData(data) {
423-
this.add(data);
424-
}
425-
426419
/**
427420
* @param {Object[]|Neo.data.Model[]} value
428421
* @param {Object[]|Neo.data.Model[]} oldValue
@@ -752,10 +745,21 @@ class Store extends Collection {
752745
me.clear();
753746
}
754747

755-
me.isLoading = true;
756-
me.suspendEvents = true;
748+
me.isLoading = true;
757749

758-
const onData = me.onProxyData.bind(me);
750+
const onData = (data) => {
751+
me.add(data);
752+
753+
// Progressive Rendering:
754+
// As soon as we have data, we want the grid to render.
755+
if (me.isLoading) {
756+
me.isLoading = false;
757+
}
758+
759+
// Fire load event to trigger grid updates (row count, scrollbar)
760+
// GridBody checks data.total or store.count.
761+
me.fire('load', {items: me.items, total: me.count});
762+
};
759763

760764
me.proxy.on('data', onData);
761765

@@ -768,12 +772,11 @@ class Store extends Collection {
768772
const response = await me.proxy.read(params);
769773

770774
me.proxy.un('data', onData);
771-
me.suspendEvents = false;
772775

773776
if (response.success) {
774777
me.totalCount = response.totalCount || me.count;
775778
me.isLoaded = true;
776-
me.isLoading = false;
779+
me.isLoading = false; // Ensure it's false at the end
777780
me.fire('load', {items: me.items, total: me.totalCount});
778781
return me.items;
779782
} else {
@@ -782,8 +785,7 @@ class Store extends Collection {
782785
}
783786
} catch (e) {
784787
me.proxy.un('data', onData);
785-
me.suspendEvents = false;
786-
me.isLoading = false;
788+
me.isLoading = false;
787789
throw e;
788790
}
789791
} else {

src/data/proxy/Stream.mjs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,24 @@ class Stream extends Base {
1515
* @member {String} ntype='proxy-stream'
1616
* @protected
1717
*/
18-
ntype: 'proxy-stream'
18+
ntype: 'proxy-stream',
19+
/**
20+
* Number of records to batch before firing a 'data' event.
21+
* @member {Number} chunkSize=500
22+
*/
23+
chunkSize: 500
1924
}
2025

2126
/**
2227
* @param {Object} operation
2328
* @returns {Promise}
2429
*/
2530
async read(operation) {
26-
let me = this,
27-
url = me.url || operation.url,
28-
count = 0;
31+
let me = this,
32+
chunk = [],
33+
chunkSize = me.chunkSize,
34+
count = 0,
35+
url = me.url || operation.url;
2936

3037
if (!url) {
3138
throw new Error('No URL specified');
@@ -53,9 +60,13 @@ class Stream extends Base {
5360
if (done) {
5461
// Process any remaining buffer
5562
if (buffer.trim()) {
56-
me.processLine(buffer);
63+
me.processLine(buffer, chunk);
5764
count++;
5865
}
66+
// Flush remaining chunk
67+
if (chunk.length > 0) {
68+
me.fire('data', chunk);
69+
}
5970
break;
6071
}
6172

@@ -66,8 +77,13 @@ class Stream extends Base {
6677

6778
for (const line of lines) {
6879
if (line.trim()) {
69-
me.processLine(line);
80+
me.processLine(line, chunk);
7081
count++;
82+
83+
if (chunk.length >= chunkSize) {
84+
me.fire('data', chunk);
85+
chunk = [];
86+
}
7187
}
7288
}
7389
}
@@ -77,11 +93,11 @@ class Stream extends Base {
7793

7894
/**
7995
* @param {String} line
96+
* @param {Array} chunk
8097
*/
81-
processLine(line) {
98+
processLine(line, chunk) {
8299
try {
83-
const data = JSON.parse(line);
84-
this.fire('data', data);
100+
chunk.push(JSON.parse(line));
85101
} catch (e) {
86102
console.warn('JSON parse error in Stream proxy:', e, line);
87103
}

test/playwright/unit/data/StoreProxy.spec.mjs

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ class MockProxy extends ProxyBase {
2222
}
2323

2424
async read(operation) {
25-
this.fire('data', {id: '1', name: 'Mock 1'});
26-
this.fire('data', {id: '2', name: 'Mock 2'});
25+
this.fire('data', [{id: '1', name: 'Mock 1'}, {id: '2', name: 'Mock 2'}]);
2726
return {success: true, count: 2};
2827
}
2928
}
@@ -48,7 +47,7 @@ test.describe.serial('Neo.data.Store Proxy Integration', () => {
4847
expect(store.proxy instanceof MockProxy).toBe(true);
4948
});
5049

51-
test('Store load() should use proxy', async () => {
50+
test('Store load() should use proxy and progressive loading', async () => {
5251
const store = Neo.create(Store, {
5352
keyProperty: 'id',
5453
model: {
@@ -60,40 +59,20 @@ test.describe.serial('Neo.data.Store Proxy Integration', () => {
6059
}
6160
});
6261

63-
let loadFired = false;
64-
store.on('load', () => loadFired = true);
62+
let loadFiredCount = 0;
63+
store.on('load', () => loadFiredCount++);
6564

6665
await store.load();
6766

68-
expect(loadFired).toBe(true);
67+
// Should fire load at least once during stream (progressive) and once at end?
68+
// MockProxy fires data once (2 items).
69+
// Store:
70+
// 1. onData -> add -> isLoading=false -> fire('load') (Count: 1)
71+
// 2. await proxy.read -> success -> fire('load') (Count: 2)
72+
73+
expect(loadFiredCount).toBeGreaterThanOrEqual(1);
6974
expect(store.count).toBe(2);
7075
expect(store.get('1').name).toBe('Mock 1');
7176
expect(store.get('2').name).toBe('Mock 2');
7277
});
73-
74-
test('Store should suspend events during stream', async () => {
75-
const store = Neo.create(Store, {
76-
keyProperty: 'id',
77-
proxy: {
78-
module: MockProxy
79-
}
80-
});
81-
82-
let mutateCount = 0;
83-
store.on('mutate', () => mutateCount++);
84-
85-
await store.load();
86-
87-
// mutate should only fire once (from add) or be suppressed if we used suspendEvents?
88-
// logic: suspendEvents = true.
89-
// proxy fires data -> onProxyData -> store.add(data) -> super.add -> splice -> fire 'mutate'
90-
// if suspendEvents is true, Observable.fire checks !me.suspendEvents.
91-
92-
// So mutate should NOT fire during streaming.
93-
94-
// Wait, does store fire 'load' at the end? Yes.
95-
// Does 'mutate' fire after suspendEvents = false? No, we don't queue events.
96-
97-
expect(mutateCount).toBe(0);
98-
});
9978
});

test/playwright/unit/data/proxy/Stream.spec.mjs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ test.describe.serial('Neo.data.proxy.Stream', () => {
5757

5858
const items = [];
5959
proxy.on('data', (data) => {
60-
items.push(data);
60+
// data is now an array
61+
items.push(...data);
6162
});
6263

6364
const result = await proxy.read({url: 'http://test.com/data.jsonl'});
@@ -98,7 +99,7 @@ ${JSON.stringify(mockData[1])}
9899

99100
const items = [];
100101
proxy.on('data', (data) => {
101-
items.push(data);
102+
items.push(...data);
102103
});
103104

104105
const result = await proxy.read({url: 'http://test.com/data.jsonl'});

0 commit comments

Comments
 (0)