-
Notifications
You must be signed in to change notification settings - Fork 574
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for different codal message bus listener flags #10054
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,14 @@ | |
namespace pxsim { | ||
const MIN_MESSAGE_WAIT_MS = 200; | ||
let tracePauseMs = 0; | ||
|
||
enum MessageListenerFlags { | ||
MESSAGE_BUS_LISTENER_REENTRANT = 8, | ||
MESSAGE_BUS_LISTENER_QUEUE_IF_BUSY = 16, | ||
MESSAGE_BUS_LISTENER_DROP_IF_BUSY = 32, | ||
MESSAGE_BUS_LISTENER_IMMEDIATE = 192 | ||
} | ||
|
||
export namespace U { | ||
// Keep these helpers unified with pxtlib/browserutils.ts | ||
export function containsClass(el: SVGElement | HTMLElement, classes: string) { | ||
|
@@ -563,13 +571,43 @@ namespace pxsim { | |
|
||
export type EventIDType = number | string; | ||
|
||
class EventHandler { | ||
private busy = 0; | ||
constructor(public handler: RefAction, public flags: number) {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bit pedantic, but wondering if the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'm matching the codal naming here, but actually numbers can hold up to 32 flags (one for each bit) |
||
|
||
async runAsync(eventValue: EventIDType, runtime: Runtime, valueToArgs?: EventValueToActionArgs) { | ||
// The default behavior can technically be configured in codal, but we always set it to queue if busy | ||
const flags = this.flags || MessageListenerFlags.MESSAGE_BUS_LISTENER_QUEUE_IF_BUSY; | ||
|
||
if (flags === MessageListenerFlags.MESSAGE_BUS_LISTENER_IMMEDIATE) { | ||
U.userError("MESSAGE_BUS_LISTENER_IMMEDIATE is not supported!"); | ||
return; | ||
} | ||
|
||
if (flags === MessageListenerFlags.MESSAGE_BUS_LISTENER_QUEUE_IF_BUSY) { | ||
return this.runFiberAsync(eventValue, runtime, valueToArgs); | ||
} | ||
else if (flags === MessageListenerFlags.MESSAGE_BUS_LISTENER_DROP_IF_BUSY && this.busy) { | ||
return; | ||
} | ||
|
||
void this.runFiberAsync(eventValue, runtime, valueToArgs); | ||
} | ||
|
||
private async runFiberAsync(eventValue: EventIDType, runtime: Runtime, valueToArgs?: EventValueToActionArgs) { | ||
this.busy ++; | ||
await runtime.runFiberAsync(this.handler, ...(valueToArgs ? valueToArgs(eventValue) : [eventValue])); | ||
this.busy --; | ||
} | ||
} | ||
|
||
export class EventQueue { | ||
max: number = 5; | ||
events: EventIDType[] = []; | ||
private awaiters: ((v?: any) => void)[] = []; | ||
private lock: boolean; | ||
private _handlers: RefAction[] = []; | ||
private _addRemoveLog: { act: RefAction, log: LogType }[] = []; | ||
private _handlers: EventHandler[] = []; | ||
private _addRemoveLog: { act: RefAction, log: LogType, flags: number }[] = []; | ||
|
||
constructor(public runtime: Runtime, private valueToArgs?: EventValueToActionArgs) { } | ||
|
||
|
@@ -596,66 +634,75 @@ namespace pxsim { | |
return Promise.resolve() | ||
} | ||
|
||
private poke(): Promise<void> { | ||
private async poke(): Promise<void> { | ||
this.lock = true; | ||
let events = this.events; | ||
// all events will be processed by concurrent promisified code below, so start afresh | ||
this.events = [] | ||
|
||
// in order semantics for events and handlers | ||
return U.promiseMapAllSeries(events, (value) => { | ||
return U.promiseMapAllSeries(this.handlers, (handler) => { | ||
return this.runtime.runFiberAsync(handler, ...(this.valueToArgs ? this.valueToArgs(value) : [value])) | ||
}) | ||
}).then(() => { | ||
// if some events arrived while processing above then keep processing | ||
if (this.events.length > 0) { | ||
return this.poke() | ||
} else { | ||
this.lock = false | ||
// process the log (synchronous) | ||
this._addRemoveLog.forEach(l => { | ||
if (l.log === LogType.BackAdd) { this.addHandler(l.act) } | ||
else if (l.log === LogType.BackRemove) { this.removeHandler(l.act) } | ||
else this.setHandler(l.act) | ||
}); | ||
this._addRemoveLog = []; | ||
return Promise.resolve() | ||
for (const value of events) { | ||
for (const handler of this.handlers) { | ||
await handler.runAsync(value, this.runtime, this.valueToArgs); | ||
} | ||
}) | ||
} | ||
|
||
// if some events arrived while processing above then keep processing | ||
if (this.events.length > 0) { | ||
return this.poke() | ||
} | ||
else { | ||
this.lock = false | ||
// process the log (synchronous) | ||
for (const logger of this._addRemoveLog) { | ||
if (logger.log === LogType.BackAdd) { | ||
this.addHandler(logger.act, logger.flags) | ||
} | ||
else if (logger.log === LogType.BackRemove) { | ||
this.removeHandler(logger.act) | ||
} | ||
else { | ||
this.setHandler(logger.act, logger.flags) | ||
} | ||
} | ||
this._addRemoveLog = []; | ||
} | ||
} | ||
|
||
get handlers() { | ||
return this._handlers; | ||
} | ||
|
||
setHandler(a: RefAction) { | ||
setHandler(a: RefAction, flags = 0) { | ||
if (!this.lock) { | ||
this._handlers = [a]; | ||
} else { | ||
this._addRemoveLog.push({ act: a, log: LogType.UserSet }); | ||
this._handlers = [new EventHandler(a, flags)]; | ||
} | ||
else { | ||
this._addRemoveLog.push({ act: a, log: LogType.UserSet, flags}); | ||
} | ||
} | ||
|
||
addHandler(a: RefAction) { | ||
addHandler(a: RefAction, flags = 0) { | ||
if (!this.lock) { | ||
let index = this._handlers.indexOf(a) | ||
// only add if new, just like CODAL | ||
if (index == -1) { | ||
this._handlers.push(a); | ||
if (!this._handlers.some(h => h.handler === a)) { | ||
this._handlers.push(new EventHandler(a, flags)); | ||
} | ||
} else { | ||
this._addRemoveLog.push({ act: a, log: LogType.BackAdd }); | ||
} | ||
else { | ||
this._addRemoveLog.push({ act: a, log: LogType.BackAdd, flags }); | ||
} | ||
} | ||
|
||
removeHandler(a: RefAction) { | ||
if (!this.lock) { | ||
let index = this._handlers.indexOf(a) | ||
let index = this._handlers.findIndex(h => h.handler === a) | ||
if (index != -1) { | ||
this._handlers.splice(index, 1) | ||
} | ||
} else { | ||
this._addRemoveLog.push({ act: a, log: LogType.BackRemove }); | ||
} | ||
else { | ||
this._addRemoveLog.push({ act: a, log: LogType.BackRemove, flags: 0 }); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wanted to put this here for reference: https://github.com/lancaster-university/codal-core/blob/509086cc8590465041b15493ab52b56e7071c110/inc/core/CodalListener.h#L33-L41
Also, just wanted to know why we're not including all of the defined flags here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
those other ones are internal only AFAIK