Skip to content

Commit

Permalink
Merge b88fe81 into f1b68a3
Browse files Browse the repository at this point in the history
  • Loading branch information
nassiharel committed Apr 5, 2020
2 parents f1b68a3 + b88fe81 commit 8b27194
Show file tree
Hide file tree
Showing 13 changed files with 543 additions and 468 deletions.
20 changes: 11 additions & 9 deletions lib/storage-manager.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const { Encoding } = require('@hkube/encoding');
const { STORAGE_PREFIX } = require('../consts/storage-prefix');
const StorageBase = require('./storage/storage-base');
const Hkube = require('./storage/hkube');
Expand All @@ -23,15 +24,16 @@ class StorageManager {
this.prefixesTypes = Object.values(STORAGE_PREFIX).map(x => `${config.clusterName}-${x}`);
await this._adapter.init(storage.connection, this.prefixesTypes, bootstrap);
this._wasInit = true;
this.storage = new StorageBase(this._adapter, log, config);
this.hkube = new Hkube(this._adapter, log, config);
this.hkubeResults = new HkubeResults(this._adapter, log, config);
this.hkubeStore = new HkubeStore(this._adapter, log, config);
this.hkubeMetadata = new HkubeMetadata(this._adapter, log, config);
this.hkubeExecutions = new HkubeExecutions(this._adapter, log, config);
this.hkubeIndex = new HkubeIndex(this._adapter, log, config);
this.hkubeBuilds = new HkubeBuilds(this._adapter, log, config);
this.hkubeAlgoMetrics = new HkubeAlgoMetrics(this._adapter, log, config);
const encoding = new Encoding({ type: storage.encoding });
this.storage = new StorageBase(this._adapter, log, encoding);
this.hkube = new Hkube(this._adapter, log, config, encoding);
this.hkubeResults = new HkubeResults(this._adapter, log, config, encoding);
this.hkubeStore = new HkubeStore(this._adapter, log, config, encoding);
this.hkubeMetadata = new HkubeMetadata(this._adapter, log, config, encoding);
this.hkubeExecutions = new HkubeExecutions(this._adapter, log, config, encoding);
this.hkubeIndex = new HkubeIndex(this._adapter, log, config, encoding);
this.hkubeBuilds = new HkubeBuilds(this._adapter, log, config, encoding);
this.hkubeAlgoMetrics = new HkubeAlgoMetrics(this._adapter, log, config, encoding);
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/storage/hkube-algo-metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class HkubeAlgoMetrics extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

Expand Down
20 changes: 12 additions & 8 deletions lib/storage/hkube-builds.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,52 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class HkubeBuilds extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ buildId, data }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId),
path: this.createPath({ buildId }),
data
}, tracer);
}

async get({ buildId }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId)
path: this.createPath({ buildId })
}, tracer);
}

async list({ buildId }) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId)
path: this.createPath({ buildId })
});
}

async delete({ buildId }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId)
path: this.createPath({ buildId })
});
}

async getStream({ buildId }) {
return super.getStream({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId)
path: this.createPath({ buildId })
});
}

async putStream({ buildId, data }) {
return super.putStream({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId),
path: this.createPath({ buildId }),
data
});
}

createPath({ buildId }) {
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId);
}
}

module.exports = HkubeBuilds;
16 changes: 10 additions & 6 deletions lib/storage/hkube-execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,39 @@ const { STORAGE_PREFIX } = require('../../consts/storage-prefix');
const DESCRIPTOR_JSON = 'descriptor.json';

class StorageHkubeExecution extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ jobId, data }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_EXECUTION, jobId, DESCRIPTOR_JSON),
path: this.createPath({ jobId, name: DESCRIPTOR_JSON }),
data
}, tracer);
}

async get({ jobId }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_EXECUTION, jobId, DESCRIPTOR_JSON)
path: this.createPath({ jobId, name: DESCRIPTOR_JSON })
}, tracer);
}

async list({ jobId }) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_EXECUTION, jobId)
path: this.createPath({ jobId })
});
}

async delete({ jobId }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_EXECUTION, jobId)
path: this.createPath({ jobId })
});
}

createPath({ jobId, name = '' }) {
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_EXECUTION, jobId, name);
}
}

module.exports = StorageHkubeExecution;
17 changes: 11 additions & 6 deletions lib/storage/hkube-index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,28 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class StorageHkubeIndex extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ jobId }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX, moment().format(super.DateFormat), jobId),
path: this.createPath({ jobId }),
data: [],
}, tracer);
}

async get({ date, jobId }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX, moment(date).format(super.DateFormat), jobId)
path: this.createPath({ date, jobId })
}, tracer);
}

async list({ date }) {
if (date) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX, moment(date).format(super.DateFormat))
path: this.createPath({ date })
});
}
return super.list({ path: this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX });
Expand All @@ -39,9 +39,14 @@ class StorageHkubeIndex extends StorageBase {

async delete({ date, jobId = '' }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX, moment(date).format(super.DateFormat), jobId)
path: this.createPath({ date, jobId })
});
}

createPath({ date, jobId = '' }) {
const dateFormat = moment(date).format(super.DateFormat);
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX, dateFormat, jobId);
}
}

module.exports = StorageHkubeIndex;
8 changes: 6 additions & 2 deletions lib/storage/hkube-metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class StorageHkubeMetadata extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}
async put({ jobId, taskId, data }, tracer) { // eslint-disable-line
Expand All @@ -31,6 +31,10 @@ class StorageHkubeMetadata extends StorageBase {
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_METADATA, jobId, taskId)
});
}

createPath({ jobId, name = '' }) {
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId, name);
}
}

module.exports = StorageHkubeMetadata;
16 changes: 10 additions & 6 deletions lib/storage/hkube-results.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,39 @@ const { STORAGE_PREFIX } = require('../../consts/storage-prefix');
const RESULT_JSON = 'result.json';

class StorageHkubeResults extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ jobId, data }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId, RESULT_JSON),
path: this.createPath({ jobId, name: RESULT_JSON }),
data
}, tracer);
}

async get({ jobId }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId, RESULT_JSON)
path: this.createPath({ jobId, name: RESULT_JSON })
}, tracer);
}

async list({ jobId }) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId)
path: this.createPath({ jobId })
});
}

async delete({ jobId }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId)
path: this.createPath({ jobId })
});
}

createPath({ jobId, name = '' }) {
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId, name);
}
}

module.exports = StorageHkubeResults;
17 changes: 11 additions & 6 deletions lib/storage/hkube-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,40 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class StorageHkubeStore extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ type, name, data }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_STORE, type, `${name}.json`),
path: this.createPath({ type, name }),
data,
}, tracer);
}

async get({ type, name }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_STORE, type, `${name}.json`)
path: this.createPath({ type, name })
}, tracer);
}

async list({ type }) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_STORE, type)
path: this.createPath({ type })
});
}

async delete({ type, name }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_STORE, type, `${name}.json`)
path: this.createPath({ type, name })
});
}

createPath({ type, name }) {
const newName = (name && `${name}.json`) || '';
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_STORE, type, newName);
}
}

module.exports = StorageHkubeStore;
16 changes: 10 additions & 6 deletions lib/storage/hkube.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,39 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class StorageHkube extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ jobId, taskId, data }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE, jobId, taskId),
path: this.createPath({ jobId, taskId }),
data
}, tracer);
}

async get({ jobId, taskId }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE, jobId, taskId)
path: this.createPath({ jobId, taskId })
}, tracer);
}

async list({ jobId }) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE, jobId)
path: this.createPath({ jobId })
});
}

async delete({ jobId, taskId = '' }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE, jobId, taskId)
path: this.createPath({ jobId, taskId })
});
}

createPath({ jobId, taskId = '' }) {
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE, jobId, taskId);
}
}

module.exports = StorageHkube;
10 changes: 7 additions & 3 deletions lib/storage/storage-base.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@

const now = require('performance-now');

const component = 'StorageManager';
class StorageBase {
get DateFormat() {
return 'YYYY-MM-DD';
}

constructor(adapter, log) {
constructor(adapter, log, encoding) {
this._adapter = adapter;
this._encoding = encoding;
this._log = log;
}

Expand All @@ -18,7 +20,8 @@ class StorageBase {
if (tracer) {
span = tracer();
}
const result = await this._adapter.put(options);
const data = this._encoding.encode(options.data);
const result = await this._adapter.put({ ...options, data });
if (span) span.finish();
const end = now();
const diff = (end - start).toFixed(3);
Expand All @@ -42,7 +45,8 @@ class StorageBase {
if (tracer) {
span = tracer(options);
}
const result = await this._adapter.get(options);
const data = await this._adapter.get(options);
const result = this._encoding.decode(data);
if (span) span.finish();
const end = now();
const diff = (end - start).toFixed(3);
Expand Down
Loading

0 comments on commit 8b27194

Please sign in to comment.