Skip to content

Commit

Permalink
feat: Implement views. (#1059)
Browse files Browse the repository at this point in the history
* feat: Implement views.

* Update sample applications.

* Add views to blank application template.

* feat: Add queryView api.

* feat: Implement executeQueryHandler and queryView api.

* feat: Add views description endpoint; Add API tests.

* Add client for querying views.

* Add client for query view API.

* Add query view API to processes.

* Add view openapi definitions to runtimes.

* Add view server to microservice manifest.

* Fix eslint issue.

* Fix some tests.

* 4.0.0-internal.46

* Fix some issues in application templates.

* 4.0.0-internal.47

* Rename collection in blank application templates.

* 4.0.0-internal.48

* Fix flows in blank application template.

* 4.0.0-internal.49

* Fix a type in blank/typescript.

* 4.0.0-internal.50

* Fix another type, this time in chat-simplified/typescript.

* 4.0.0-internal.51

* Add some stuff to npmignore.

* Adjust some cli tests.

* feat: Implement sandbox for views.

* 4.0.0-internal.52

* chore: Add view tests using new sandbox to templates.

* chore: Add eslint support for templates; Fix a lot of eslint issue.

* 4.0.0-internal.53

* fix: Add missing port of view server to docker compose manifest.

* feat: Add debug logging to domain and flow server.

* 4.0.0-internal.54

* chore: Add section about views http endpoint to readme.

* Add missing comma.

* Remove implementation from blank template applications.

* Fix package.json file.

* Clean up sample code.

* Reorder tsconfig.json files.

* Fix CLI tests.

Co-authored-by: Hannes Leutloff <hannes.leutloff@thenativeweb.io>
  • Loading branch information
goloroden and Hannes Leutloff committed Jul 15, 2020
1 parent accc311 commit 20df0c2
Show file tree
Hide file tree
Showing 319 changed files with 4,663 additions and 2,604 deletions.
3 changes: 0 additions & 3 deletions .eslintignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
/test/shared/applications/javascript/withSyntaxError/
/test/shared/applications/typescript/

# Application templates
/templates/

# Next.js build output
/websites/**/.next
/websites/**/out
3 changes: 2 additions & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"parserOptions": {
"project": [
"./tsconfig.json",
"./websites/**/tsconfig.json"
"./websites/**/tsconfig.json",
"./templates/**/tsconfig.json"
]
}
}
3 changes: 3 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
/.dependabot
/.github
/.idea
/.vscode
/assets
/build/test
/docker
/lib
/test
Expand Down
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ $ npx wolkenkit dev

*Please note that the local development mode processes all data in-memory only, so any data will be lost when the application is closed.*

### Sending commands and receiving domain events
### Sending commands, receiving domain events, and querying views

To send commands or receive domain events, the current version offers an HTTP and a GraphQL interface.

Expand Down Expand Up @@ -116,6 +116,18 @@ $ curl \
http://localhost:3000/domain-events/v2
```

##### Querying a view

To query a view, send a `GET` request to the views endpoint of the runtime. The response is a stream of newline separated JSON objects, using `application/x-ndjson` as its content-type. This response stream does _not_ contain heartbeats and ends as soon as the last item is streamed.

A sample call to `curl` might look like this:

```shell
$ curl \
-i \
http://localhost:3000/views/v2/messages/all
```

#### Using the GraphQL interface

wolkenkit provides a GraphQL endpoint under the following address:
Expand Down
8 changes: 6 additions & 2 deletions lib/apis/awaitItem/http/v2/Client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import axios from 'axios';
import { errors } from '../../../../common/errors';
import { FilterHeartbeatsFromJsonStreamTransform } from '../../../../common/utils/http/FilterHeartbeatsFromJsonStreamTransform';
import { FilterHeartbeatsTransform } from '../../../../common/utils/http/FilterHeartbeatsTransform';
import { flaschenpost } from 'flaschenpost';
import { HttpClient } from '../../../shared/HttpClient';
import { LockMetadata } from '../../../../stores/priorityQueueStore/LockMetadata';
import { ParseJsonTransform } from '../../../../common/utils/http/ParseJsonTransform';
import { PassThrough, pipeline } from 'stream';

const logger = flaschenpost.getLogger();
Expand Down Expand Up @@ -37,7 +38,6 @@ class Client<TItem> extends HttpClient {
});

const passThrough = new PassThrough({ objectMode: true });
const heartbeatFilter = new FilterHeartbeatsFromJsonStreamTransform();

const { item, metadata } = await new Promise((resolve, reject): void => {
let unsubscribe: () => void;
Expand All @@ -59,8 +59,12 @@ class Client<TItem> extends HttpClient {
passThrough.on('data', onData);
passThrough.on('error', onError);

const jsonParser = new ParseJsonTransform();
const heartbeatFilter = new FilterHeartbeatsTransform();

pipeline(
data,
jsonParser,
heartbeatFilter,
passThrough,
(err): void => {
Expand Down
2 changes: 1 addition & 1 deletion lib/apis/base/WolkenkitRequestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ declare global {
// eslint-disable-next-line @typescript-eslint/no-namespace
namespace Express {
export interface Response {
startStream(parameters: { heartbeatInterval: number }): void;
startStream(parameters: { heartbeatInterval: number | false }): void;
}
}
}
Expand Down
20 changes: 12 additions & 8 deletions lib/apis/base/streamNdjsonMiddleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@ const streamNdjsonMiddleware: WolkenkitRequestHandler = async function (
res.writeHead(200, { 'content-type': 'application/x-ndjson' });

res.connection.once('close', (): void => {
clearInterval(heartbeatIntervalId);
if (heartbeatInterval !== false) {
clearInterval(heartbeatIntervalId);
}
});

// Send an initial heartbeat to initialize the connection. If we do not do
// this, sometimes the connection does not become open until the first data
// is sent.
writeLine({ res, data: heartbeat });

heartbeatIntervalId = setInterval((): void => {
if (heartbeatInterval !== false) {
// Send an initial heartbeat to initialize the connection. If we do not do
// this, sometimes the connection does not become open until the first data
// is sent.
writeLine({ res, data: heartbeat });
}, heartbeatInterval);

heartbeatIntervalId = setInterval((): void => {
writeLine({ res, data: heartbeat });
}, heartbeatInterval);
}

return next();
} catch (ex) {
Expand Down
7 changes: 5 additions & 2 deletions lib/apis/observeDomainEvents/http/v2/Client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import axios from 'axios';
import { DomainEventDescription } from '../../../../common/application/DomainEventDescription';
import { errors } from '../../../../common/errors';
import { FilterHeartbeatsFromJsonStreamTransform } from '../../../../common/utils/http/FilterHeartbeatsFromJsonStreamTransform';
import { FilterHeartbeatsTransform } from '../../../../common/utils/http/FilterHeartbeatsTransform';
import { flaschenpost } from 'flaschenpost';
import { HttpClient } from '../../../shared/HttpClient';
import { ParseJsonTransform } from '../../../../common/utils/http/ParseJsonTransform';
import { PassThrough, pipeline } from 'stream';

const logger = flaschenpost.getLogger();
Expand Down Expand Up @@ -61,10 +62,12 @@ class Client extends HttpClient {
}

const passThrough = new PassThrough({ objectMode: true });
const heartbeatFilter = new FilterHeartbeatsFromJsonStreamTransform();
const jsonParser = new ParseJsonTransform();
const heartbeatFilter = new FilterHeartbeatsTransform();

return pipeline(
data,
jsonParser,
heartbeatFilter,
passThrough,
(err): void => {
Expand Down
19 changes: 14 additions & 5 deletions lib/apis/queryDomainEventStore/http/v2/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import axios from 'axios';
import { DomainEvent } from '../../../../common/elements/DomainEvent';
import { DomainEventData } from '../../../../common/elements/DomainEventData';
import { errors } from '../../../../common/errors';
import { FilterHeartbeatsFromJsonStreamTransform } from '../../../../common/utils/http/FilterHeartbeatsFromJsonStreamTransform';
import { FilterHeartbeatsTransform } from '../../../../common/utils/http/FilterHeartbeatsTransform';
import { flaschenpost } from 'flaschenpost';
import { HttpClient } from '../../../shared/HttpClient';
import { ParseJsonTransform } from '../../../../common/utils/http/ParseJsonTransform';
import { Snapshot } from '../../../../stores/domainEventStore/Snapshot';
import { State } from '../../../../common/elements/State';
import { toArray } from 'streamtoarray';
Expand Down Expand Up @@ -73,10 +74,12 @@ class Client extends HttpClient {
}

const passThrough = new PassThrough({ objectMode: true });
const heartbeatFilter = new FilterHeartbeatsFromJsonStreamTransform();
const jsonParser = new ParseJsonTransform();
const heartbeatFilter = new FilterHeartbeatsTransform();

return pipeline(
data,
jsonParser,
heartbeatFilter,
passThrough,
(err): void => {
Expand Down Expand Up @@ -127,10 +130,12 @@ class Client extends HttpClient {
}

const passThrough = new PassThrough({ objectMode: true });
const heartbeatFilter = new FilterHeartbeatsFromJsonStreamTransform();
const jsonParser = new ParseJsonTransform();
const heartbeatFilter = new FilterHeartbeatsTransform();

return pipeline(
data,
jsonParser,
heartbeatFilter,
passThrough,
(err): void => {
Expand Down Expand Up @@ -165,10 +170,12 @@ class Client extends HttpClient {
}

const passThrough = new PassThrough({ objectMode: true });
const heartbeatFilter = new FilterHeartbeatsFromJsonStreamTransform();
const jsonParser = new ParseJsonTransform();
const heartbeatFilter = new FilterHeartbeatsTransform();

return pipeline(
data,
jsonParser,
heartbeatFilter,
passThrough,
(err): void => {
Expand Down Expand Up @@ -211,10 +218,12 @@ class Client extends HttpClient {
}

const passThrough = new PassThrough({ objectMode: true });
const heartbeatFilter = new FilterHeartbeatsFromJsonStreamTransform();
const jsonParser = new ParseJsonTransform();
const heartbeatFilter = new FilterHeartbeatsTransform();

return pipeline(
data,
jsonParser,
heartbeatFilter,
passThrough,
(err): void => {
Expand Down
42 changes: 42 additions & 0 deletions lib/apis/queryView/http/getApiDefinitions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { ApiDefinition } from '../../openApi/ApiDefinition';
import { Application } from '../../../common/application/Application';
import { query } from './v2/query';

const getApiDefinitions = function ({ application, basePath }: {
application: Application;
basePath: string;
}): ApiDefinition[] {
const apiDefinitions: ApiDefinition[] = [];

const v2ApiDefinition: ApiDefinition = {
basePath: `${basePath}/v2`,
routes: {
get: [],
post: []
},
tags: [ 'Views' ]
};

for (const [ viewName, viewDefinition ] of Object.entries(application.views)) {
for (const [ queryHandlerName, queryDefinition ] of Object.entries(viewDefinition.queryHandlers)) {
v2ApiDefinition.routes.get.push({
path: `${viewName}/${queryHandlerName}`,
description: queryDefinition.getDocumentation ? queryDefinition.getDocumentation() : query.description,
request: {
query: queryDefinition.getOptionsSchema ? queryDefinition.getOptionsSchema() : query.request.query
},
response: {
statusCodes: query.response.statusCodes,
stream: queryDefinition.type === 'stream',
body: queryDefinition.getResultItemSchema ? queryDefinition.getResultItemSchema() : {}
}
});
}
}

apiDefinitions.push(v2ApiDefinition);

return apiDefinitions;
};

export { getApiDefinitions };
30 changes: 30 additions & 0 deletions lib/apis/queryView/http/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { ApiDefinition } from '../../openApi/ApiDefinition';
import { Application } from '../../../common/application/Application';
import { CorsOrigin } from 'get-cors-origin';
import { getApiDefinitions } from './getApiDefinitions';
import { getV2 } from './v2';
import { IdentityProvider } from 'limes';
import express, { Application as ExpressApplication } from 'express';

const getApi = async function<TItem> ({ application, corsOrigin, identityProviders }: {
application: Application;
corsOrigin: CorsOrigin;
identityProviders: IdentityProvider[];
}): Promise<{ api: ExpressApplication; getApiDefinitions: (basePath: string) => ApiDefinition[] }> {
const api = express();

const v2 = await getV2<TItem>({
application,
corsOrigin,
identityProviders
});

api.use('/v2', v2.api);

return {
api,
getApiDefinitions: (basePath: string): ApiDefinition[] => getApiDefinitions({ application, basePath })
};
};

export { getApi };
96 changes: 96 additions & 0 deletions lib/apis/queryView/http/v2/Client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import axios from 'axios';
import { errors } from '../../../../common/errors';
import { flaschenpost } from 'flaschenpost';
import { HttpClient } from '../../../shared/HttpClient';
import { ParseJsonTransform } from '../../../../common/utils/http/ParseJsonTransform';
import { QueryDescription } from '../../../../common/application/QueryDescription';
import streamToString from 'stream-to-string';
import { PassThrough, pipeline } from 'stream';

const logger = flaschenpost.getLogger();

class Client extends HttpClient {
public constructor ({ protocol = 'http', hostName, port, path = '/' }: {
protocol?: string;
hostName: string;
port: number;
path?: string;
}) {
super({ protocol, hostName, port, path });
}

public async getDescription (): Promise<Record<string, Record<string, QueryDescription>>> {
const { data, status } = await axios({
method: 'get',
url: `${this.url}/description`,
validateStatus (): boolean {
return true;
}
});

if (status === 200) {
return data;
}

logger.error('An unknown error occured.', { ex: data, status });

throw new errors.UnknownError();
}

public async query ({ viewName, queryName, queryOptions = {}}: {
viewName: string;
queryName: string;
queryOptions?: object;
}): Promise<PassThrough> {
const { data, status } = await axios({
method: 'get',
url: `${this.url}/${viewName}/${queryName}`,
params: queryOptions,
paramsSerializer (params): string {
return Object.entries(params).
map(([ key, value ]): string => `${key}=${JSON.stringify(value)}`).
join('&');
},
responseType: 'stream',
validateStatus (): boolean {
return true;
}
});

if (status !== 200) {
const error = JSON.parse(await streamToString(data));

switch (error.code) {
case 'EVIEWNOTFOUND': {
throw new errors.ViewNotFound(error.message);
}
case 'EQUERYHANDLERNOTFOUND': {
throw new errors.QueryHandlerNotFound(error.message);
}
case 'EQUERYOPTIONSINVALID': {
throw new errors.QueryOptionsInvalid(error.message);
}
default: {
logger.error('An unknown error occured.', { ex: error, status });

throw new errors.UnknownError();
}
}
}

const jsonParser = new ParseJsonTransform();

return pipeline(
data,
jsonParser,
(err): void => {
if (err) {
// Do not handle errors explicitly. The returned stream will just close.
logger.error('An error occured during stream piping.', { err });
}
}
);
}
}

export { Client };
Loading

0 comments on commit 20df0c2

Please sign in to comment.