// connection.ts
import { Dictionary, isFunction } from 'lodash';
import nodeCleanup from 'node-cleanup';
import { AuthToken, Config, Driver, Session } from 'neo4j-driver/types';
import * as neo4j from 'neo4j-driver';
import { Transformer } from './transformer';
import { Query } from './query';
import { Builder } from './builder';
import { Clause } from './clause';
import { Observable } from 'rxjs';
// Module-level registry of every Connection that has been constructed, so they can all be
// closed when the process shuts down.
let connections: Connection[] = [];

// Closes all open connections when the process is about to exit. The promises returned by
// `close()` are intentionally not awaited: the cleanup hook runs synchronously.
nodeCleanup(() => {
  for (const connection of connections) {
    connection.close();
  }
  connections = [];
});
/**
 * Minimal shape of the subscriber object handed to the function passed to `new Observable(...)`.
 * Only the members this module actually uses are declared.
 */
export interface Observer<T> {
  /** Delivers the next value to the consumer. */
  next: (value: T) => void;
  /** Signals that the stream finished successfully; no further values will follow. */
  complete: () => void;
  /** Signals that the stream failed with the given reason. */
  error: (reason: any) => void;
  /** When truthy, the consumer is no longer listening and notifications should be skipped. */
  closed?: boolean;
}
/**
 * Type of the factory function used to create the underlying driver. It is the type of the
 * official `neo4j.driver` function, so any replacement must be call-compatible with it.
 */
export type DriverConstructor = typeof neo4j.driver;
/**
 * The complete set of options a Connection stores after its constructor has filled in defaults.
 */
export interface FullConnectionOptions {
// Factory the Connection constructor calls with the url, the auth token and `driverConfig`.
driverConstructor: DriverConstructor;
// Driver configuration object passed as the third argument to `driverConstructor`.
driverConfig: Config;
}
/**
 * Options accepted by the Connection constructor: the same fields as
 * {@link FullConnectionOptions}, but every one of them is optional.
 */
export type ConnectionOptions = Partial<FullConnectionOptions>;
/** Plain username/password pair, accepted as a shorthand for a basic auth token. */
export interface Credentials { username: string; password: string; }

/**
 * Type guard that tells a {@link Credentials} object apart from any other auth value.
 *
 * @param credentials Value to inspect; may be anything, including `null` or a primitive.
 * @returns `true` only when the value is object-like and has both a `username` and a `password`
 * property (own or inherited). Never throws.
 */
function isCredentials(credentials: any): credentials is Credentials {
  // The `in` operator throws a TypeError for null, undefined and primitives, so rule those out
  // before probing for the properties.
  const isObjectLike = credentials !== null
    && (typeof credentials === 'object' || typeof credentials === 'function');
  return isObjectLike && 'username' in credentials && 'password' in credentials;
}
/**
 * Thin wrapper around lodash's `isFunction` with a corrected type predicate. The lodash typing
 * does not narrow union types properly, such as the `options` parameter passed to the Connection
 * constructor, so the predicate is re-declared here.
 *
 * @param value Any value.
 * @returns Whatever lodash's `isFunction` returns for `value`.
 */
function isTrueFunction(value: any): value is Function {
  return isFunction(value);
}
// tslint:disable max-line-length
/**
 * The Connection class lets you access the Neo4j server and run queries against it. Under the hood,
 * the Connection class uses the official Neo4j Nodejs driver which manages connection pooling on a
 * [session basis]{@link https://neo4j.com/docs/api/javascript-driver/current/class/src/v1/driver.js~Driver.html#instance-method-session}.
 * It should be enough to have a single Connection instance per database per application.
 *
 * To create the connection, simply call the
 * [constructor]{@link https://jamesfer.me/cypher-query-builder/classes/connection.html#constructor}
 * and pass in the database url, username and password.
 * ```
 * const db = new Connection('bolt://localhost', {
 *   username: 'neo4j',
 *   password: 'password',
 * })
 * ```
 *
 * To use the connection, just start calling any of the clause methods such as `match`, `create` or
 * `matchNode` etc. They automatically create a {@link Query} object that you can then chain other
 * methods off of.
 * ```
 * db.matchNode('people', 'Person')
 *   .where({ 'people.age': greaterThan(18) })
 *   .return('people')
 *   .run()
 * ```
 *
 * You can also pass a query to the
 * [run]{@link https://jamesfer.me/cypher-query-builder/classes/connection.html#run} method,
 * however, this is probably much less convenient. Note that the query itself is passed in; it is
 * the connection's `run` that executes it.
 * ```
 * db.run(
 *   new Query().matchNode('people', 'Person')
 *     .where({ 'people.age': greaterThan(18) })
 *     .return('people')
 * );
 * ```
 *
 * Once you've finished with the connection you should close the connection.
 * ```
 * db.close()
 * ```
 *
 * The library will attempt to clean up all connections when the process exits, but it is better to
 * be explicit.
 */
// tslint:enable max-line-length
export class Connection extends Builder<Query> {
  /** Auth token handed to the driver; built from the credentials when those were supplied. */
  protected auth: AuthToken;
  /** Driver instance created by the configured driver constructor. */
  protected driver: Driver;
  /** Options after defaults have been applied. */
  protected options: FullConnectionOptions;
  /** True from construction until `close()` is first called. */
  protected open: boolean;
  /** Converts driver records into plain result dictionaries. */
  protected transformer = new Transformer();

  /**
   * Creates a new connection to the database.
   *
   * @param url Url of the database such as `'bolt://localhost'`
   * @param auth Auth can either be an object in the form `{ username: ..., password: ... }`, or a
   * Neo4j AuthToken object which contains the `scheme`, `principal` and `credentials` properties
   * for more advanced authentication scenarios. The AuthToken object is what is passed directly to
   * the neo4j javascript driver so check out their docs for more information on it.
   * @param options Additional configuration options. If you provide a function instead of an
   * object, it will be used as the driver constructor. While passing a driver constructor function
   * here is not deprecated, it is the legacy way of setting it and you should prefer to pass an
   * options object with the `driverConstructor` parameter.
   * @param options.driverConstructor An optional driver constructor to use for
   * this connection. Defaults to the official Neo4j driver. The constructor is
   * given the url you pass to this constructor and an auth token that is
   * generated from calling [`neo4j.auth.basic`]{@link
   * https://neo4j.com/docs/api/javascript-driver/current#usage-examples}.
   * @param options.driverConfig Neo4j options that are passed directly to the underlying driver.
   */
  constructor(
    protected url: string,
    auth: Credentials | AuthToken,
    options: DriverConstructor | ConnectionOptions = neo4j.driver,
  ) {
    super();

    // Plain credentials are converted to a basic auth token; anything else is used as is.
    this.auth = isCredentials(auth)
      ? neo4j.auth.basic(auth.username, auth.password)
      : auth;

    // `options` is either the legacy bare driver constructor or an options object in which
    // every field is optional.
    const driverConstructor = isTrueFunction(options) ? options
      : options.driverConstructor ? options.driverConstructor : neo4j.driver;
    const driverConfig = isTrueFunction(options) || !options.driverConfig
      ? {} : options.driverConfig;
    this.options = { driverConstructor, driverConfig };

    this.driver = driverConstructor(this.url, this.auth, this.options.driverConfig);
    this.open = true;

    // Register for the process-exit cleanup hook.
    connections.push(this);
  }

  /**
   * Closes this connection if it is open. Closed connections cannot be
   * reopened. Calling `close` again, even while an earlier call is still
   * pending, does nothing.
   *
   * @returns A promise that settles once the underlying driver has closed.
   */
  async close(): Promise<void> {
    if (!this.open) {
      return;
    }

    // Mark the connection closed before awaiting so that overlapping calls to close() do not
    // close the driver twice and no new sessions are opened on a driver that is shutting down.
    this.open = false;

    // Drop the reference held by the module-level registry; otherwise every connection ever
    // created would stay reachable until the process exits.
    connections = connections.filter(connection => connection !== this);

    await this.driver.close();
  }

  /**
   * Opens and returns a session. You should never need to use this directly.
   * You're probably better off with `run` instead.
   *
   * @returns A new driver session, or `null` when this connection is closed.
   */
  session(): Session | null {
    if (this.open) {
      return this.driver.session();
    }
    return null;
  }

  /**
   * Returns a new query that uses this connection. The methods such as `match`
   * or `create` are probably more useful to you as they automatically create a
   * new chainable query for you.
   *
   * @returns A new, empty query bound to this connection.
   */
  query(): Query {
    return new Query(this);
  }

  /**
   * Starts a new query on this connection with the given clause as its first clause.
   */
  protected continueChainClause(clause: Clause) {
    return this.query().addClause(clause);
  }

  /**
   * Runs the provided query on this connection, regardless of which connection
   * the query was created from. Each query is run on its own session.
   *
   * Run returns a promise that resolves to an array of records. Each key of the
   * record is the name of a variable that you specified in your `RETURN`
   * clause.
   * Eg:
   * ```typescript
   * connection.match([
   *   node('steve', { name: 'Steve' }),
   *   relation('out', [ 'FriendsWith' ]),
   *   node('friends'),
   * ])
   *   .return([ 'steve', 'friends' ])
   *   .run();
   * ```
   *
   * Would result in the value:
   * ```
   * [
   *   {
   *     steve: { ... } // steve node,
   *     friends: { ... } // first friend,
   *   },
   *   {
   *     steve: { ... } // steve node,
   *     friends: { ... } // second friend,
   *   },
   *   {
   *     steve: { ... } // steve node,
   *     friends: { ... } // third friend,
   *   },
   * ]
   * ```
   *
   * Notice how the steve record is returned for each row, this is how cypher
   * works. If you use lodash you can extract all of Steve's friends from the
   * results like using `_.map(results, 'friends')`. If you don't, you can use
   * ES2015/ES6: `results.map(record => record.friends)`.
   *
   * If you use typescript you can use the type parameter to hint at the type of
   * the return value which is `Dictionary<R>[]`.
   *
   * The returned promise rejects if this connection is not open or there are no
   * clauses in the query.
   *
   * @param query The query to execute.
   * @returns A promise of the transformed records.
   */
  async run<R = any>(query: Query): Promise<Dictionary<R>[]> {
    if (!this.open) {
      throw new Error('Cannot run query: connection is not open.');
    }

    if (query.getClauses().length === 0) {
      throw new Error('Cannot run query: no clauses attached to the query.');
    }

    // Build the query before opening a session so that a failure while building cannot leave
    // an unclosed session behind.
    const queryObj = query.buildQueryObject();

    const session = this.session();
    if (!session) {
      throw new Error('Cannot run query: connection is not open.');
    }

    try {
      const { records } = await session.run(queryObj.query, queryObj.params);
      return this.transformer.transformRecords<R>(records);
    } finally {
      // The session is closed whether the query succeeded or failed.
      await session.close();
    }
  }

  /**
   * Runs the provided query on this connection, regardless of which connection
   * the query was created from. Each query is run on its own session.
   *
   * Returns an RxJS observable that emits each record as it is received from the
   * database. This is the most efficient way of working with very large
   * datasets. Each record is an object where each key is the name of a variable
   * that you specified in your return clause.
   *
   * Eg:
   * ```typescript
   * const results$ = connection.match([
   *   node('steve', { name: 'Steve' }),
   *   relation('out', [ 'FriendsWith' ]),
   *   node('friends'),
   * ])
   *   .return([ 'steve', 'friends' ])
   *   .stream();
   *
   * // Emits
   * // {
   * //   steve: { ... } // steve node,
   * //   friends: { ... } // first friend,
   * // },
   * // Then emits
   * // {
   * //   steve: { ... } // steve node,
   * //   friends: { ... } // second friend,
   * // },
   * // And so on
   * ```
   *
   * Notice how the steve record is returned for each row, this is how cypher
   * works. You can extract all of steve's friends from the query by using
   * operators:
   * ```
   * const friends$ = results$.map(row => row.friends);
   * ```
   *
   * If you use typescript you can use the type parameter to hint at the type of
   * the return value which is `Dictionary<R>`.
   *
   * The observable emits an error if this connection is not open or there are
   * no clauses in the query.
   *
   * The query is started by the function given to the observable, so nothing
   * is sent to the database until the observable is subscribed to.
   *
   * Due to the way the Neo4j javascript driver works, once the query has started
   * there is no way to stop it until it is complete. Even if you unsubscribe from
   * the observable, all the remaining rows will still be parsed by the driver but
   * then immediately discarded.
   * ```typescript
   * const results$ = connection.matchNode('records')
   *   .return('records')
   *   .limit(1000) // 1000 records will be loaded and parsed from the database
   *   .stream()
   *   .take(10) // even though you only take the first 10
   *   .subscribe(record => {});
   * ```
   * In practice this should never happen unless you're doing some strange things.
   *
   * @param query The query to execute.
   * @returns An observable of transformed records.
   */
  stream<R = any>(query: Query): Observable<Dictionary<R>> {
    return new Observable((subscriber: Observer<Dictionary<R>>): void => {
      if (!this.open) {
        subscriber.error(new Error('Cannot run query: connection is not open.'));
        return;
      }

      if (query.getClauses().length === 0) {
        subscriber.error(new Error('Cannot run query: no clauses attached to the query.'));
        return;
      }

      // Build the query before opening a session so that a failure while building cannot leave
      // an unclosed session behind.
      const queryObj = query.buildQueryObject();

      const session = this.session();
      if (!session) {
        subscriber.error(new Error('Cannot run query: connection is not open.'));
        return;
      }

      // Run the query
      const result = session.run(queryObj.query, queryObj.params);

      // Subscribe to the result and clean up the session
      // Note: Neo4j observables use a different subscribe syntax to RxJS observables
      result.subscribe({
        onNext: (record) => {
          if (!subscriber.closed) {
            subscriber.next(this.transformer.transformRecord<R>(record));
          }
        },
        onError: async (error) => {
          try {
            await session.close();
          } catch (closeError) {
            // The query error is the one worth reporting; a failure to close the session
            // afterwards must not prevent it from reaching the subscriber.
          }
          if (!subscriber.closed) {
            subscriber.error(error);
          }
        },
        onCompleted: async () => {
          try {
            await session.close();
          } catch (closeError) {
            // Nothing awaits this callback, so a rejection here would go unhandled and the
            // subscriber would never be notified. Report the failure through the stream.
            if (!subscriber.closed) {
              subscriber.error(closeError);
            }
            return;
          }
          if (!subscriber.closed) {
            subscriber.complete();
          }
        },
      });
    });
  }
}