Skip to content

Commit

Permalink
feat(util): add mutex module
Browse files Browse the repository at this point in the history
  • Loading branch information
chemzqm committed Mar 31, 2020
1 parent 6665cb3 commit 26381f0
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 15 deletions.
42 changes: 42 additions & 0 deletions src/__tests__/modules/util.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { score, positions } from '../../util/fzy'
import { getHiglights } from '../../util/highlight'
import { score as matchScore } from '../../util/match'
import { mixin } from '../../util/object'
import { Mutex } from '../../util/mutex'
import { indexOf, resolveVariables } from '../../util/string'
import helper from '../helper'
import { ansiparse } from '../../util/ansiparse'
Expand Down Expand Up @@ -212,3 +213,44 @@ describe('ansiparse', () => {
})
})
})

describe('Mutex', () => {
test('mutex run in serial', async () => {
let lastTs: number
let fn = () => {
return new Promise(resolve => {
if (lastTs) {
let dt = Date.now() - lastTs
expect(dt).toBeGreaterThanOrEqual(300)
}
lastTs = Date.now()
setTimeout(() => {
resolve()
}, 300)
})
}
let mutex = new Mutex()
await Promise.all([
mutex.use(fn),
mutex.use(fn),
mutex.use(fn)
])
})

test('mutex run after job finish', async () => {
let count = 0
let fn = () => {
return new Promise(resolve => {
count = count + 1
setTimeout(() => {
resolve()
}, 100)
})
}
let mutex = new Mutex()
await mutex.use(fn)
await helper.wait(10)
await mutex.use(fn)
expect(count).toBe(2)
})
})
49 changes: 49 additions & 0 deletions src/util/mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
export class Mutex {
private tasks: (() => void)[] = []
private count = 1

private sched(): void {
if (this.count > 0 && this.tasks.length > 0) {
this.count--
let next = this.tasks.shift()
next()
}
}

public get busy(): boolean {
return this.count == 0
}

// tslint:disable-next-line: typedef
public acquire() {
return new Promise<() => void>(res => {
let task = () => {
let released = false
res(() => {
if (!released) {
released = true
this.count++
this.sched()
}
})
}
this.tasks.push(task)
process.nextTick(this.sched.bind(this))
})
}

public use<T>(f: () => Promise<T>): Promise<T> {
return this.acquire()
.then(release => {
return f()
.then(res => {
release()
return res
})
.catch(err => {
release()
throw err
})
})
}
}
42 changes: 27 additions & 15 deletions src/workspace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { disposeAll, getKeymapModifier, isDocumentEdit, mkdirp, runCommand, wait
import { score } from './util/match'
import { getChangedFromEdits } from './util/position'
import { byteIndex, byteLength } from './util/string'
import { Mutex } from './util/mutex'
import Watchman from './watchman'
import rimraf from 'rimraf'
import uuid = require('uuid/v1')
Expand All @@ -45,6 +46,7 @@ export class Workspace implements IWorkspace {
public readonly version: string
public readonly keymaps: Map<string, [Function, boolean]> = new Map()
public bufnr: number
private mutex = new Mutex()
private resolver: Resolver = new Resolver()
private rootPatterns: Map<string, string[]> = new Map()
private _workspaceFolders: WorkspaceFolder[] = []
Expand All @@ -55,7 +57,6 @@ export class Workspace implements IWorkspace {
private _env: Env
private _root: string
private _cwd = process.cwd()
private _blocking = false
private _initialized = false
private _attached = false
private buffers: Map<number, Document> = new Map()
Expand Down Expand Up @@ -319,8 +320,7 @@ export class Workspace implements IWorkspace {
if (!res) return
fs.mkdirSync(dir)
}
let uri = URI.file(path.join(dir, 'coc-settings.json')).toString()
await this.jumpTo(uri)
await this.jumpTo(URI.file(path.join(dir, 'coc-settings.json')).toString())
}

public get textDocuments(): TextDocument[] {
Expand Down Expand Up @@ -724,7 +724,7 @@ export class Workspace implements IWorkspace {
* Show message in vim.
*/
public showMessage(msg: string, identify: MsgTypes = 'more'): void {
if (this._blocking || !this.nvim) return
if (this.mutex.busy || !this.nvim) return
let { messageLevel } = this
let method = process.env.VIM_NODE_RPC == '1' ? 'callTimer' : 'call'
let hl = 'Error'
Expand Down Expand Up @@ -1060,22 +1060,34 @@ export class Workspace implements IWorkspace {
* Show quickpick
*/
public async showQuickpick(items: string[], placeholder = 'Choose by number'): Promise<number> {
let title = placeholder + ':'
items = items.map((s, idx) => `${idx + 1}. ${s}`)
let res = await this.nvim.callAsync('coc#util#quickpick', [title, items])
let n = parseInt(res, 10)
if (isNaN(n) || n <= 0 || n > items.length) return -1
return n - 1
let release = await this.mutex.acquire()
try {
let title = placeholder + ':'
items = items.map((s, idx) => `${idx + 1}. ${s}`)
let res = await this.nvim.callAsync('coc#util#quickpick', [title, items])
release()
let n = parseInt(res, 10)
if (isNaN(n) || n <= 0 || n > items.length) return -1
return n - 1
} catch (e) {
release()
return -1
}
}

/**
* Prompt for confirm action.
*/
public async showPrompt(title: string): Promise<boolean> {
this._blocking = true
let res = await this.nvim.callAsync('coc#util#with_callback', ['coc#util#prompt_confirm', [title]])
this._blocking = false
return res == 1
let release = await this.mutex.acquire()
try {
let res = await this.nvim.callAsync('coc#util#with_callback', ['coc#util#prompt_confirm', [title]])
release()
return res == 1
} catch (e) {
release()
return false
}
}

public async callAsync<T>(method: string, args: any[]): Promise<T> {
Expand Down Expand Up @@ -1495,7 +1507,7 @@ augroup end`
}

private async checkBuffer(bufnr: number): Promise<void> {
if (this._disposed) return
if (this._disposed || !bufnr) return
let doc = this.getDocument(bufnr)
if (!doc && !this.creatingSources.has(bufnr)) await this.onBufCreate(bufnr)
}
Expand Down

0 comments on commit 26381f0

Please sign in to comment.