-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add dev-eventgate implementation writing events to local file or stdout
Bug: T259202
- Loading branch information
Showing
4 changed files
with
172 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
# EventGate example config file. | ||
# | ||
# Configures the service-runner as well as the EventGate application. | ||
# See: https://github.com/wikimedia/service-runner#config-loading | ||
|
||
# Number of worker processes to spawn. | ||
# Set to 0 to run everything in a single process without clustering. | ||
# Use 'ncpu' to run as many workers as there are CPU units | ||
num_workers: 0 | ||
|
||
# Log error messages and gracefully restart a worker if v8 reports that it | ||
# uses more heap (note: not RSS) than this many mb. | ||
worker_heap_limit_mb: 200 | ||
|
||
# Logger info | ||
logging: | ||
level: info | ||
|
||
services: | ||
- name: eventgate-dev | ||
# a relative path or the name of an npm package, if different from name | ||
module: ./app.js | ||
# optionally, a version constraint of the npm package | ||
# version: ^0.4.0 | ||
# per-service config | ||
conf: | ||
port: 8192 | ||
# Events can be large; increase max body size | ||
max_body_size: 4mb | ||
|
||
# more per-service config settings | ||
user_agent: eventgate-dev | ||
|
||
eventgate_factory_module: '../lib/factories/dev-eventgate' | ||
|
||
# This field in each event will be used to extract a | ||
# (possibly relative) schema uri. The default is $schema. | ||
# An array of field names will cause EventGate to search for | ||
# fields by these names in each event, using the first match. | ||
schema_uri_field: $schema | ||
|
||
# If set, these URIs will be prepended to any relative schema URI | ||
# extracted from each event's schema_field. The resulting URLs will | ||
# be searched until a schema is found. Change this | ||
# to match paths to your local schema repositories. | ||
schema_base_uris: [ | ||
https://schema.wikimedia.org/repositories/primary/jsonschema, | ||
https://schema.wikimedia.org/repositories/secondary/jsonschema | ||
] | ||
|
||
# output_path: ./output.json |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
'use strict'; | ||
|
||
const fs = require('fs'); | ||
const _ = require('lodash'); | ||
const EventGate = require('../eventgate.js').EventGate; | ||
|
||
const { | ||
once | ||
} = require('events'); | ||
|
||
const { | ||
makeValidate, | ||
} = require('./default-eventgate'); | ||
|
||
|
||
/** | ||
* This file contains various functions for configuring and creating a 'development' EventGate | ||
* instance. | ||
* | ||
* The following keys are used in the options argument by functions in this file: | ||
* | ||
* - schema_uri_field | ||
* The dotted object path to extract a schema_uri from an event. | ||
* If this is an array, the event will be searched for a field | ||
* named by each element. The first match will be used. | ||
* This allows you to support events that might have | ||
* Their schema_uris at different locations. | ||
* Default: $schema | ||
* | ||
* - schema_base_uris | ||
* Base uris in which to prepend to values extracted from event schema_uri_fields | ||
* and search for a schema. | ||
* Default: undefined | ||
* | ||
* - schema_file_extension | ||
* A file extension to append to the extracted schema_uri_field if its | ||
* URI doesn't already have one. | ||
* Default: undefined | ||
* | ||
* - output_path | ||
* If set, valid events will be written to this file. | ||
* Otherwise, valid events will just be logged to stdout. | ||
* Default: undefined. | ||
*/ | ||
|
||
|
||
const defaultOptions = { | ||
schema_uri_field: '$schema', | ||
schema_base_uris: undefined, | ||
schema_file_extension: undefined, | ||
output_path: undefined, | ||
}; | ||
|
||
/** | ||
* Creates a function that writes events to output_path. | ||
* @param {Object} options | ||
* @param {Object} options.output_path | ||
* @param {Object} logger | ||
* @return {EventGate~produce} (event, context) => Promise<event> | ||
*/ | ||
function makeProduce(options, logger) { | ||
const writeOptions = { flags: 'as' }; | ||
if (!options.output_path) { | ||
// If fd is set, createWriteStream will ignored output_path | ||
writeOptions.fd = process.stdout.fd; | ||
logger.info('Writing valid events to stdout'); | ||
} else { | ||
logger.info('Writing valid events to ' + options.output_path); | ||
} | ||
const outputStream = fs.createWriteStream(options.output_path || './output.json', writeOptions); | ||
|
||
return async (event, context = {}) => { | ||
const serializedEvent = Buffer.from(JSON.stringify(event)); | ||
if (!outputStream.write(serializedEvent)) { | ||
await once(outputStream, 'drain'); | ||
} | ||
}; | ||
} | ||
|
||
/** | ||
* Returns a Promise of an instantiated EventGate that uses EventValidator | ||
* and event schema URL lookup and Kafka to produce messages. This | ||
* instance does not do any producing of error events. | ||
* @param {Object} options | ||
* @param {string} options.schema_uri_field | ||
* Used to extract the event's schema URI. | ||
* @param {string} options.schema_base_uris | ||
* If set, this is prefixed to un-anchored schema URIs. | ||
* @param {string} options.schema_file_extension | ||
* @param {Object} options.output_path | ||
* If set, events will be written to this file, else events will be written to stdout. | ||
* @param {Object} logger | ||
* @return {Promise<EventGate>} | ||
*/ | ||
async function devEventGateFactory(options, logger) { | ||
// Set default options | ||
_.defaults(options, defaultOptions); | ||
|
||
return new EventGate({ | ||
// This EventGate instance will use the EventValidator's | ||
// validate function to validate incoming events. | ||
validate: makeValidate(options, logger), | ||
// This EventGate instance will use a kafka producer | ||
produce: makeProduce(options, logger), | ||
log: logger | ||
}); | ||
} | ||
|
||
module.exports = { | ||
factory: devEventGateFactory, | ||
defaultOptions, | ||
makeValidate, | ||
makeProduce | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters