Skip to content
Permalink
Browse files

Fixes device lock race condition

  • Loading branch information...
gre committed Feb 26, 2019
1 parent 62b8bb9 commit 750ec83ed0ec2a18bc91325c6a2ad8d5652fb615
Showing with 57 additions and 37 deletions.
  1. +57 −37 src/hw/deviceAccess.js
@@ -26,23 +26,16 @@ export const setErrorRemapping = (f: Error => Observable<*>) => {

const never = new Promise(() => {});

const transportFinally = (
transport: Transport<*>,
cleanups: Array<() => void>
) => <T>(observable: Observable<T>): Observable<T> =>
const transportFinally = (cleanup: () => Promise<void>) => <T>(
observable: Observable<T>
): Observable<T> =>
Observable.create(o => {
let done = false;
const finalize = () => {
if (done) return never;
done = true;
return transport
.close()
.catch(() => {})
.then(() => {
cleanups.forEach(c => c());
});
return cleanup();
};

const sub = observable.subscribe({
next: e => o.next(e),
complete: () => {
@@ -74,43 +67,70 @@ const deviceQueues = {};
export const withDevice = (deviceId: string) => <T>(
job: (t: Transport<*>) => Observable<T>
): Observable<T> =>
defer(() => {
// we get the current exec queue related to deviceId
Observable.create(o => {
let unsubscribed;
let sub;

const deviceQueue = deviceQueues[deviceId] || Promise.resolve();

const finalize = (transport, cleanups) =>
transport
.close()
.catch(() => {})
.then(() => {
cleanups.forEach(c => c());
});

// when we'll finish all the current job, we'll call finish
let finish;
// this new promise is the next exec queue
deviceQueues[deviceId] = new Promise(resolve => {
finish = resolve;
});

return from(
// for any new job, we'll now wait the exec queue to be available
deviceQueue
.then(() => open(deviceId)) // open the transport
.then(async transport => {
if (needsCleanup[identifyTransport(transport)]) {
delete needsCleanup[identifyTransport(transport)];
await transport.send(0, 0, 0, 0).catch(() => {});
}
return transport;
})
.catch(e => {
finish();
if (e instanceof BluetoothRequired) throw e;
throw new CantOpenDevice(e.message);
})
).pipe(
mergeMap(transport => {
// for any new job, we'll now wait the exec queue to be available
deviceQueue
.then(() => open(deviceId)) // open the transport
.then(async transport => {
if (unsubscribed) {
// it was unsubscribed prematurely
return finalize(transport, [finish]);
}

if (needsCleanup[identifyTransport(transport)]) {
delete needsCleanup[identifyTransport(transport)];
await transport.send(0, 0, 0, 0).catch(() => {});
}
return transport;
})
.catch(e => {
finish();
if (e instanceof BluetoothRequired) throw e;
throw new CantOpenDevice(e.message);
})
.then(transport => {
if (!transport) return;

if (unsubscribed) {
// it was unsubscribed prematurely
return finalize(transport, [finish]);
}

const cleanups = accessHooks.map(hook => hook());
return job(transport).pipe(
catchError(errorRemapping),
// close the transport and clean up everything
transportFinally(transport, [...cleanups, finish])
);
sub = job(transport)
.pipe(
catchError(errorRemapping),
// close the transport and clean up everything
transportFinally(() => finalize(transport, [...cleanups, finish]))
)
.subscribe(o);
})
);
.catch(error => o.error(error));

return () => {
unsubscribed = true;
if (sub) sub.unsubscribe();
};
});

export const genericCanRetryOnError = (err: ?Error) => {

0 comments on commit 750ec83

Please sign in to comment.
You can’t perform that action at this time.