Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Merge PR #8735

Open
wants to merge 37 commits into
base: feat/pipelines
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
acbf24f
add pipeline to Listen RPC
wu-hui Oct 15, 2024
93fdb23
Prepre for serializaion/deserialization
wu-hui Oct 17, 2024
8ac835e
fix rebase error
wu-hui Oct 17, 2024
422723a
quick hack to integrate with watch.
wu-hui Oct 21, 2024
ee64690
add api/pipelinesource and setup basic listen test
wu-hui Oct 31, 2024
1738c15
watch integration works.
wu-hui Nov 6, 2024
946a266
add pipeline canonify and eq
wu-hui Oct 22, 2024
6fc2050
runPipeline initial
wu-hui Oct 25, 2024
377e82f
initial offline function evaluation
wu-hui Oct 29, 2024
2b244a2
add basic tests for pipeline eval
wu-hui Oct 29, 2024
23b5135
runPipeline initial
wu-hui Oct 25, 2024
82b8303
Setting up QueryOrPipeline to replace Query
wu-hui Oct 30, 2024
7a3e789
type1 compiles
wu-hui Nov 12, 2024
4d7d917
introduce new variant for query_engine.test.ts
wu-hui Nov 13, 2024
c274177
Fix core/expression rebase error
wu-hui Nov 13, 2024
6e4a7e3
Add basic tests
wu-hui Nov 14, 2024
e6f860e
remove api/pipeline and use the lite one
wu-hui Nov 14, 2024
667c398
query_engine.test.ts pass with pipelines
wu-hui Nov 19, 2024
48a6324
local store tests PASS
wu-hui Nov 25, 2024
6ab2ba5
memory spec tests pass sans limitToLast
wu-hui Nov 28, 2024
58124c4
most spec tests PASS!
wu-hui Dec 2, 2024
0f63a54
limit to last, cursors and multitab for documents and database stages
wu-hui Dec 4, 2024
5945776
Merge remote-tracking branch 'origin/feat/pipelines' into wuandy/offp…
wu-hui Dec 6, 2024
5ad944e
fix merge errors
wu-hui Dec 10, 2024
da4dee3
Add expressions tests
wu-hui Dec 16, 2024
656e848
Ported all tests, plus some bug fixes
wu-hui Dec 19, 2024
90ce598
Merge remote-tracking branch 'origin/feat/pipelines' into wuandy/offp…
wu-hui Dec 19, 2024
34b3e71
fix merge errors
wu-hui Dec 19, 2024
c5678fe
fixed expressions tests
wu-hui Dec 20, 2024
f2a0585
Fixed all incompatibilities
wu-hui Dec 23, 2024
bc9ac99
Add gitignore entries
wu-hui Dec 28, 2024
0212394
Merge with feat/pipelines
wu-hui Dec 28, 2024
5e6cd52
Proto update sync and add query spec tests back
wu-hui Dec 28, 2024
2b4a5ad
merge with feat/pipeline
wu-hui Jan 28, 2025
ffc17e4
merge with feat/pipeline
wu-hui Jan 28, 2025
d2b18f9
Fix circular dependency caused by import from api_pipelines. Also sim…
MarkDuckworth Jan 29, 2025
d49ff96
Merge branch 'push-vwqlstsxuovu' of github.com:firebase/firebase-js-s…
MarkDuckworth Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add api/pipelinesource and setup basic listen test
  • Loading branch information
wu-hui committed Nov 1, 2024
commit ee64690aacb279aeaca83c0d2f71fa190d438a3f

This file was deleted.

2 changes: 1 addition & 1 deletion packages/firestore/src/api/database.ts
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ import {
connectFirestoreEmulator,
Firestore as LiteFirestore
} from '../lite-api/database';
import { PipelineSource } from '../lite-api/pipeline-source';
import { PipelineSource } from './pipeline_source';
import { DocumentReference, Query } from '../lite-api/reference';
import { newUserDataReader } from '../lite-api/user_data_reader';
import {
49 changes: 36 additions & 13 deletions packages/firestore/src/api/pipeline.ts
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ import {
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
import { PipelineResult } from '../lite-api/pipeline-result';
import { DocumentData, DocumentReference } from '../lite-api/reference';
import { AddFields, Stage } from '../lite-api/stage';
import {AddFields, Sort, Stage, Where} from '../lite-api/stage';
import { UserDataReader } from '../lite-api/user_data_reader';
import { AbstractUserDataWriter } from '../lite-api/user_data_writer';
import { DocumentKey } from '../model/document_key';
@@ -15,6 +15,8 @@ import { DocumentSnapshot, PipelineSnapshot } from './snapshot';
import { FirestoreError } from '../util/error';
import { Unsubscribe } from './reference_impl';
import { cast } from '../util/input_validation';
import {Field, FilterCondition} from '../api';
import {Expr} from '../lite-api/expressions';

export class Pipeline<
AppModelType = DocumentData
@@ -49,6 +51,20 @@ export class Pipeline<
);
}

where(condition: FilterCondition & Expr): Pipeline<AppModelType> {
const copy = this.stages.map(s => s);
super.readUserData('where', condition);
copy.push(new Where(condition));
return new Pipeline(
this.db,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
copy,
this.converter
);
}

/**
* Executes this pipeline and returns a Promise to represent the asynchronous operation.
*
@@ -106,23 +122,30 @@ export class Pipeline<
* @internal
* @private
*/
_onSnapshot(observer: {
next?: (snapshot: PipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}): Unsubscribe {
_onSnapshot(
next: (snapshot: PipelineSnapshot) => void,
error?: (error: FirestoreError) => void,
complete?: () => void
): Unsubscribe {
// this.stages.push(
// new AddFields(
// this.selectablesToMap([
// '__name__',
// '__create_time__',
// '__update_time__'
// ])
// )
// );

this.stages.push(
new AddFields(
this.selectablesToMap([
'__name__',
'__create_time__',
'__update_time__'
])
new Sort([
Field.of('__name__').ascending()
]
)
);

const client = ensureFirestoreConfigured(this.db);
firestoreClientListenPipeline(client, this, observer);
firestoreClientListenPipeline(client, this, {next, error, complete});

return () => {};
}
91 changes: 91 additions & 0 deletions packages/firestore/src/api/pipeline_source.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import { DocumentKey } from '../model/document_key';

import { Firestore } from './database';
import { Pipeline } from './pipeline';
import { DocumentReference } from './reference';
import {
CollectionGroupSource,
CollectionSource,
DatabaseSource,
DocumentsSource
} from '../lite-api/stage';
import {PipelineSource as LitePipelineSource} from '../lite-api/pipeline-source';
import { UserDataReader } from '../lite-api/user_data_reader';
import { AbstractUserDataWriter } from '../lite-api/user_data_writer';

/**
* Represents the source of a Firestore {@link Pipeline}.
* @beta
*/
export class PipelineSource extends LitePipelineSource{
/**
* @internal
* @private
* @param db
* @param userDataReader
* @param userDataWriter
* @param documentReferenceFactory
*/
constructor(
db: Firestore,
userDataReader: UserDataReader,
userDataWriter: AbstractUserDataWriter,
documentReferenceFactory: (id: DocumentKey) => DocumentReference
) {
super(db, userDataReader, userDataWriter, documentReferenceFactory);
}

collection(collectionPath: string): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[new CollectionSource(collectionPath)]
);
}

collectionGroup(collectionId: string): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[new CollectionGroupSource(collectionId)]
);
}

database(): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[new DatabaseSource()]
);
}

documents(docs: DocumentReference[]): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[DocumentsSource.of(docs)]
);
}
}
5 changes: 5 additions & 0 deletions packages/firestore/src/core/sync_engine_impl.ts
Original file line number Diff line number Diff line change
@@ -999,6 +999,11 @@ function removeAndCleanupTarget(
): void {
syncEngineImpl.sharedClientState.removeLocalQueryTarget(targetId);

// TODO(pipeline): REMOVE this hack.
if(!syncEngineImpl.queriesByTarget.has(targetId)||syncEngineImpl.queriesByTarget.get(targetId)!.length !== 0){
return;
}

debugAssert(
syncEngineImpl.queriesByTarget.has(targetId) &&
syncEngineImpl.queriesByTarget.get(targetId)!.length !== 0,
8 changes: 4 additions & 4 deletions packages/firestore/src/lite-api/pipeline-source.ts
Original file line number Diff line number Diff line change
@@ -40,10 +40,10 @@ export class PipelineSource {
* @param documentReferenceFactory
*/
constructor(
private db: Firestore,
private userDataReader: UserDataReader,
private userDataWriter: AbstractUserDataWriter,
private documentReferenceFactory: (id: DocumentKey) => DocumentReference
protected db: Firestore,
protected userDataReader: UserDataReader,
protected userDataWriter: AbstractUserDataWriter,
protected documentReferenceFactory: (id: DocumentKey) => DocumentReference
) {}

collection(collectionPath: string): Pipeline {
6 changes: 3 additions & 3 deletions packages/firestore/src/lite-api/pipeline.ts
Original file line number Diff line number Diff line change
@@ -130,7 +130,7 @@ export class Pipeline<AppModelType = DocumentData> implements ProtoSerializable<
*/
constructor(
private liteDb: Firestore,
private userDataReader: UserDataReader,
protected userDataReader: UserDataReader,
/**
* @internal
* @private
@@ -144,7 +144,7 @@ export class Pipeline<AppModelType = DocumentData> implements ProtoSerializable<
protected stages: Stage[],
// TODO(pipeline) support converter
//private converter: FirestorePipelineConverter<AppModelType> = defaultPipelineConverter()
private converter: unknown = {}
protected converter: unknown = {}
) {}

/**
@@ -265,7 +265,7 @@ export class Pipeline<AppModelType = DocumentData> implements ProtoSerializable<
* @return the expressionMap argument.
* @private
*/
private readUserData<
protected readUserData<
T extends
| Map<string, ReadableUserData>
| ReadableUserData[]
7 changes: 7 additions & 0 deletions packages/firestore/src/local/local_store_impl.ts
Original file line number Diff line number Diff line change
@@ -1050,6 +1050,13 @@ export async function localStoreReleaseTarget(
): Promise<void> {
const localStoreImpl = debugCast(localStore, LocalStoreImpl);
const targetData = localStoreImpl.targetDataByTarget.get(targetId);

// TODO(pipeline): this is a hack that only works because pipelines are the only ones returning nulls here.
// REMOVE ASAP.
if(targetData === null) {
return;
}

debugAssert(
targetData !== null,
`Tried to release nonexistent target: ${targetId}`
Loading
Oops, something went wrong.
Loading
Oops, something went wrong.