Skip to content

Commit

Permalink
fix get and watch race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
yehiyam committed Feb 23, 2022
1 parent f2afa08 commit 908bb13
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
1 change: 1 addition & 0 deletions lib/core/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class Service extends EventEmitter {
watch.watcher.on('delete', async (res) => {
this._handleWatch(watchTypes.DELETE, res, options.lock);
});
watch.data = await this._client.get(path, { isPrefix: false });
return watch.data;
}

Expand Down
9 changes: 9 additions & 0 deletions lib/core/watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ class Watcher {
}

async watch(path) {
if (this._pathToWatch.has(path)) {
throw new Error(`already watching on ${path}`);
}
const watcher = await this._watch(path);
this._pathToWatch.set(path, watcher);
return { watcher };
}

async getAndWatch(path) {
if (this._pathToWatch.has(path)) {
throw new Error(`already watching on ${path}`);
}
Expand Down
13 changes: 8 additions & 5 deletions tests/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ describe('Tests', () => {
const putEvent = sinon.spy();
const changeEvent = sinon.spy();
const deleteEvent = sinon.spy();
const watch = await etcd._client.watcher.watch(pathToWatch);
const watch = await etcd._client.watcher.getAndWatch(pathToWatch);
expect(watch).to.have.property('watcher');
expect(watch).to.have.property('data');
watch.watcher.on('disconnected', () => console.log('disconnected...'));
Expand Down Expand Up @@ -1648,14 +1648,17 @@ describe('Tests', () => {
describe('watch', () => {
it('should watch job status', async () => {
const jobId = `jobid-${uuidv4()}`;
const data = { data: { status: 'completed' } };
await etcd.jobs.status.watch({ jobId });
const data1 = { data: { status: 'pending' } };
const data2 = { data: { status: 'completed' } };
await etcd.jobs.status.set({ data: data1, jobId });
const currentData = await etcd.jobs.status.watch({ jobId });
expect(currentData.data).to.deep.equal(data1);
etcd.jobs.status.on('change', (res) => {
expect(res.jobId).to.deep.equal(jobId);
expect(res.data.data).to.deep.equal(data.data);
expect(res.data.data).to.deep.equal(data2.data);
_semaphore.callDone();
});
await etcd.jobs.status.set({ data, jobId });
await etcd.jobs.status.set({ data: data2, jobId });
await _semaphore.done();
});
it('should single watch for change job status', async () => {
Expand Down

0 comments on commit 908bb13

Please sign in to comment.