Skip to content

Commit

Permalink
Merge 1f74026 into f1b68a3
Browse files Browse the repository at this point in the history
  • Loading branch information
nassiharel committed May 3, 2020
2 parents f1b68a3 + 1f74026 commit b6f4875
Show file tree
Hide file tree
Showing 13 changed files with 1,359 additions and 1,414 deletions.
89 changes: 80 additions & 9 deletions lib/storage-manager.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
const { Encoding } = require('@hkube/encoding');
const prettyBytes = require('pretty-bytes');
const unitsConverter = require('@hkube/units-converter');
const { STORAGE_PREFIX } = require('../consts/storage-prefix');
const StorageBase = require('./storage/storage-base');
const Hkube = require('./storage/hkube');
Expand All @@ -20,19 +23,79 @@ class StorageManager {
this.moduleName = storage.moduleName;
this._adapterPackage = require(storage.moduleName); // eslint-disable-line
this._adapter = new this._adapterPackage();
this._storageResultsThreshold = config.storageResultsThreshold && unitsConverter.getMemoryInBytes(config.storageResultsThreshold);
this.prefixesTypes = Object.values(STORAGE_PREFIX).map(x => `${config.clusterName}-${x}`);
await this._adapter.init(storage.connection, this.prefixesTypes, bootstrap);
this._wasInit = true;
this.storage = new StorageBase(this._adapter, log, config);
this.hkube = new Hkube(this._adapter, log, config);
this.hkubeResults = new HkubeResults(this._adapter, log, config);
this.hkubeStore = new HkubeStore(this._adapter, log, config);
this.hkubeMetadata = new HkubeMetadata(this._adapter, log, config);
this.hkubeExecutions = new HkubeExecutions(this._adapter, log, config);
this.hkubeIndex = new HkubeIndex(this._adapter, log, config);
this.hkubeBuilds = new HkubeBuilds(this._adapter, log, config);
this.hkubeAlgoMetrics = new HkubeAlgoMetrics(this._adapter, log, config);
this.encoding = new Encoding({ type: storage.encoding });
this.storage = new StorageBase(this._adapter, log, this.encoding);
this.hkube = new Hkube(this._adapter, log, config, this.encoding);
this.hkubeResults = new HkubeResults(this._adapter, log, config, this.encoding);
this.hkubeStore = new HkubeStore(this._adapter, log, config, this.encoding);
this.hkubeMetadata = new HkubeMetadata(this._adapter, log, config, this.encoding);
this.hkubeExecutions = new HkubeExecutions(this._adapter, log, config, this.encoding);
this.hkubeIndex = new HkubeIndex(this._adapter, log, config, this.encoding);
this.hkubeBuilds = new HkubeBuilds(this._adapter, log, config, this.encoding);
this.hkubeAlgoMetrics = new HkubeAlgoMetrics(this._adapter, log, config, this.encoding);
return { message: `initialized ${config.defaultStorage} storage client with ${storage.encoding} encoding` };
}
return { message: 'storage client not initialized' };
}

checkDataSize(size) {
const result = {};
if (size >= this._storageResultsThreshold) {
result.error = `data too large (${prettyBytes(size)}), use the stream api`;
}
return result;
}

async getCustomFormat(options) {
const { path } = options;
const metadata = await this.getMetadata({ path });
const totalLength = metadata.size;
const { footerLength } = this.encoding;
let stream;
let data;

if (totalLength <= footerLength) {
stream = await this.getStream({ path });
}
else {
const { isCustomFormat, isDataTypeEncoded } = await this._readFormat({ path });
if (isCustomFormat) { // check if hkube encoding is here
if (isDataTypeEncoded) { // check if this data is encoded
const result = this.checkDataSize(totalLength); // currently we are not supporting huge decoding
if (result.error) {
stream = await this.getStream({ path });
}
else {
data = await this.get({ ...options, encodeOptions: { customEncode: true } });
}
}
else {
stream = await this.getStream({ path, start: 0, end: totalLength - footerLength - 1 }); // this data is not encoded, we should stream it (without footer)
}
}
else {
stream = await this.getStream({ path });
}
}
return { data, stream, size: totalLength };
}

async _readFormat(options) {
const { path } = options;
const { footerLength, magicNumber } = this.encoding;
const footer = await this.seek({ path, end: -footerLength }); // read last bytes to get footer
const magicNum = footer.slice(footer.length - magicNumber.length, footer.length).toString();
const isCustomFormat = magicNum === magicNumber;
let isDataTypeEncoded = false;

if (isCustomFormat) {
isDataTypeEncoded = this.encoding.isDataTypeEncoded(footer[2]);
}
return { isCustomFormat, isDataTypeEncoded };
}

async put(options, tracer) {
Expand All @@ -43,6 +106,14 @@ class StorageManager {
return this.storage.get(options, tracer);
}

async getMetadata(options, tracer) {
return this.storage.getMetadata(options, tracer);
}

async seek(options, tracer) {
return this.storage.seek(options, tracer);
}

async list(options) {
return this.storage.list(options);
}
Expand Down
4 changes: 2 additions & 2 deletions lib/storage/hkube-algo-metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class HkubeAlgoMetrics extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

Expand Down
20 changes: 12 additions & 8 deletions lib/storage/hkube-builds.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,52 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class HkubeBuilds extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ buildId, data }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId),
path: this.createPath({ buildId }),
data
}, tracer);
}

async get({ buildId }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId)
path: this.createPath({ buildId })
}, tracer);
}

async list({ buildId }) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId)
path: this.createPath({ buildId })
});
}

async delete({ buildId }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId)
path: this.createPath({ buildId })
});
}

async getStream({ buildId }) {
return super.getStream({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId)
path: this.createPath({ buildId })
});
}

async putStream({ buildId, data }) {
return super.putStream({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId),
path: this.createPath({ buildId }),
data
});
}

createPath({ buildId }) {
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_BUILDS, buildId);
}
}

module.exports = HkubeBuilds;
16 changes: 10 additions & 6 deletions lib/storage/hkube-execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,39 @@ const { STORAGE_PREFIX } = require('../../consts/storage-prefix');
const DESCRIPTOR_JSON = 'descriptor.json';

class StorageHkubeExecution extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ jobId, data }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_EXECUTION, jobId, DESCRIPTOR_JSON),
path: this.createPath({ jobId, name: DESCRIPTOR_JSON }),
data
}, tracer);
}

async get({ jobId }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_EXECUTION, jobId, DESCRIPTOR_JSON)
path: this.createPath({ jobId, name: DESCRIPTOR_JSON })
}, tracer);
}

async list({ jobId }) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_EXECUTION, jobId)
path: this.createPath({ jobId })
});
}

async delete({ jobId }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_EXECUTION, jobId)
path: this.createPath({ jobId })
});
}

createPath({ jobId, name = '' }) {
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_EXECUTION, jobId, name);
}
}

module.exports = StorageHkubeExecution;
17 changes: 11 additions & 6 deletions lib/storage/hkube-index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,28 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class StorageHkubeIndex extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ jobId }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX, moment().format(super.DateFormat), jobId),
path: this.createPath({ jobId }),
data: [],
}, tracer);
}

async get({ date, jobId }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX, moment(date).format(super.DateFormat), jobId)
path: this.createPath({ date, jobId })
}, tracer);
}

async list({ date }) {
if (date) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX, moment(date).format(super.DateFormat))
path: this.createPath({ date })
});
}
return super.list({ path: this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX });
Expand All @@ -39,9 +39,14 @@ class StorageHkubeIndex extends StorageBase {

async delete({ date, jobId = '' }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX, moment(date).format(super.DateFormat), jobId)
path: this.createPath({ date, jobId })
});
}

createPath({ date, jobId = '' }) {
const dateFormat = moment(date).format(super.DateFormat);
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_INDEX, dateFormat, jobId);
}
}

module.exports = StorageHkubeIndex;
8 changes: 6 additions & 2 deletions lib/storage/hkube-metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class StorageHkubeMetadata extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}
async put({ jobId, taskId, data }, tracer) { // eslint-disable-line
Expand All @@ -31,6 +31,10 @@ class StorageHkubeMetadata extends StorageBase {
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_METADATA, jobId, taskId)
});
}

createPath({ jobId, name = '' }) {
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId, name);
}
}

module.exports = StorageHkubeMetadata;
16 changes: 10 additions & 6 deletions lib/storage/hkube-results.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,39 @@ const { STORAGE_PREFIX } = require('../../consts/storage-prefix');
const RESULT_JSON = 'result.json';

class StorageHkubeResults extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ jobId, data }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId, RESULT_JSON),
path: this.createPath({ jobId, name: RESULT_JSON }),
data
}, tracer);
}

async get({ jobId }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId, RESULT_JSON)
path: this.createPath({ jobId, name: RESULT_JSON })
}, tracer);
}

async list({ jobId }) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId)
path: this.createPath({ jobId })
});
}

async delete({ jobId }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId)
path: this.createPath({ jobId })
});
}

createPath({ jobId, name = '' }) {
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_RESULTS, jobId, name);
}
}

module.exports = StorageHkubeResults;
17 changes: 11 additions & 6 deletions lib/storage/hkube-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,40 @@ const StorageBase = require('./storage-base');
const { STORAGE_PREFIX } = require('../../consts/storage-prefix');

class StorageHkubeStore extends StorageBase {
constructor(adapter, log, config) {
super(adapter, log);
constructor(adapter, log, config, encoding) {
super(adapter, log, encoding);
this.clusterName = config.clusterName;
}

async put({ type, name, data }, tracer) {
return super.put({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_STORE, type, `${name}.json`),
path: this.createPath({ type, name }),
data,
}, tracer);
}

async get({ type, name }, tracer) {
return super.get({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_STORE, type, `${name}.json`)
path: this.createPath({ type, name })
}, tracer);
}

async list({ type }) {
return super.list({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_STORE, type)
path: this.createPath({ type })
});
}

async delete({ type, name }) {
return super.delete({
path: path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_STORE, type, `${name}.json`)
path: this.createPath({ type, name })
});
}

createPath({ type, name }) {
const newName = (name && `${name}.json`) || '';
return path.join(this.clusterName + '-' + STORAGE_PREFIX.HKUBE_STORE, type, newName);
}
}

module.exports = StorageHkubeStore;
Loading

0 comments on commit b6f4875

Please sign in to comment.