Skip to content

Commit

Permalink
Support file path in CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
twojtasz committed Nov 8, 2019
1 parent 98cb4cb commit c3d0711
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 10 deletions.
50 changes: 40 additions & 10 deletions modules/cli/src/connect.js
Expand Up @@ -14,9 +14,11 @@

/* eslint-env node */
/* eslint no-console: ["error", { allow: ["log"] }] */
import fs from 'fs';

import {XVIZMiddlewareStack} from './middleware';
import {WebSocketInterface} from './websocket';
import {FileSystemInterface} from './filesystem';
import {TransformLogFlow, OnlyMetadata} from './core';

const W3CWebSocket = require('websocket').w3cwebsocket;
Expand All @@ -27,21 +29,32 @@ const W3CWebSocket = require('websocket').w3cwebsocket;
* facilitate processing as needed.
*/
export function openSource(args, middlewares) {
// Non-live sessions require sending a transform log message
if (!isLive(args)) {
middlewares.push(new TransformLogFlow(null, args));
}

// If we just want metadata we shutdown afterwards
if (args.metadata) {
middlewares.push(new OnlyMetadata(null));
}

// Assemble our message processors
const socket = webSocketFromArgs(args);
const stackedMiddleware = new XVIZMiddlewareStack(middlewares);

const client = new WebSocketInterface({middleware: stackedMiddleware, socket});
let client = null;
let socket = null;
if (isWSURL(args.host)) {
// Non-live sessions require sending a transform log message
if (!isLive(args)) {
middlewares.unshift(new TransformLogFlow(null, args));
}

// Assemble our message processors
socket = webSocketFromArgs(args);
client = new WebSocketInterface({middleware: stackedMiddleware, socket});
} else if (isPath(args.host)) {
client = new FileSystemInterface({...args, middleware: stackedMiddleware});
// TODO: what to do with the return value?
client.open(args.host);
} else {
console.log(`The argument '${args.log}' is not a websocket url or an existing file path.`);
process.exit(1); // eslint-disable-line no-process-exit
}

// Some middleware needs to be able to send messages/close connections
// so provide them access to the client.
Expand All @@ -57,7 +70,9 @@ export function openSource(args, middlewares) {
process.on('SIGINT', () => {
if (sigintCount === 0) {
console.log('Closing');
socket.close();
if (socket) {
socket.close();
}
} else {
// If the user or system is really mashing Ctrl-C, then abort
console.log('Aborting');
Expand All @@ -79,7 +94,7 @@ export function webSocketFromArgs(args) {
}

/**
* Arg we operating a live session or not
* Are we operating a live session or not
*/
export function isLive(args) {
return args.log === undefined;
Expand All @@ -95,6 +110,21 @@ export function urlFromArgs(args) {
return url;
}

/**
* Test if the 'log' is a Websocket URL
*/
function isWSURL(log) {
const wsRegex = /^ws{1,2}:\/\//;
return wsRegex.test(log);
}

/**
* Test if the 'log' is a Websocket URL
*/
export function isPath(log) {
return fs.existsSync(log);
}

/**
* Create a web socket properly configured for large XVIZ data flows
*/
Expand Down
73 changes: 73 additions & 0 deletions modules/cli/src/filesystem.js
@@ -0,0 +1,73 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/* eslint no-console: ["error", { allow: ["log"] }] */
/* eslint-env node, browser */

import {XVIZProviderFactory} from '@xviz/io';
import {FileSource} from '@xviz/io/node';

/**
* Use the provider to push the data through
* the XVIZ middleware.
*/
export class FileSystemInterface {
constructor(options = {}) {
this.options = options;
this.middleware = options.middleware;
}

async open(path) {
const source = new FileSource(path);

const provider = await XVIZProviderFactory.open({
source,
options: {}
});

if (provider) {
this._processLog(provider);
}
}

async _processLog(provider) {
const options = {};
if (this.options.start || this.options.end) {
const {start, end} = this.options;
if (Number.isFinite(start)) {
options.startTime = start;
}
if (Number.isFinite(end)) {
options.endTime = end;
}
}

const metadata = provider.xvizMetadata();
this.middleware.onMetadata(metadata.message().data);

const iterator = provider.getMessageIterator(options);
while (iterator.valid()) {
const msg = await provider.xvizMessage(iterator);
if (msg) {
this.middleware.onStateUpdate(msg.message().data);
}
}

this.onClose();
}

onClose(message) {
this.middleware.onClose();
}
}

0 comments on commit c3d0711

Please sign in to comment.