Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSON serialization #99

Merged
merged 11 commits into from May 10, 2022
2 changes: 1 addition & 1 deletion lingua-franca-ref.txt
@@ -1 +1 @@
774ca3c3bfcfadd490cf878a2b389b6fec7ff1cb
dd9669f7b3ee0d3f1ae906d14c29fe7a71bace78
15 changes: 7 additions & 8 deletions src/core/action.ts
Expand Up @@ -78,7 +78,6 @@ export abstract class SchedulableAction<T extends Present> implements Sched<T> {
tag = tag.getMicroStepLater();
}
}

if (this.action instanceof FederatePortAction) {
if (intendedTag === undefined) {
throw new Error("FederatedPortAction must have an intended tag from RTI.");
Expand All @@ -95,10 +94,10 @@ export abstract class SchedulableAction<T extends Present> implements Sched<T> {
Log.debug(this, () => "Using intended tag from RTI, similar to schedule_at_tag(tag) with an intended tag: " +
intendedTag);
tag = intendedTag;
} else if (this.action.origin == Origin.logical && !(this.action instanceof Startup)) {
} else {
tag = tag.getMicroStepLater();
}

}
Log.debug(this, () => "Scheduling " + this.action.origin +
" action " + this.action._getFullyQualifiedName() + " with tag: " + tag);

Expand Down Expand Up @@ -140,8 +139,8 @@ export class Shutdown extends Action<Present> {
}
}

export class FederatePortAction extends Action<Buffer> {
constructor(__parent__: Reactor) {
super(__parent__, Origin.logical)
export class FederatePortAction<T extends Present> extends Action<T> {
constructor(__parent__: Reactor, origin: Origin) {
super(__parent__, origin)
}
}
}
55 changes: 34 additions & 21 deletions src/core/federation.ts
Expand Up @@ -3,7 +3,7 @@ import {Socket, createConnection, SocketConnectOpts} from 'net'
import {EventEmitter} from 'events';
import {
Log, Tag, TimeValue, Origin, getCurrentPhysicalTime, Alarm,
Present, App, Action, FederatePortAction, TaggedEvent
Present, App, Action, TaggedEvent
} from './internal';

//---------------------------------------------------------------------//
Expand Down Expand Up @@ -241,15 +241,20 @@ class RTIClient extends EventEmitter {

// The mapping between a federate port ID and the federate port action
// scheduled upon reception of a message designated for that federate port.
private federatePortActionByID: Map<number, FederatePortAction> = new Map<number, FederatePortAction>();

/**
* A mapping from port IDs to FederatePortAction instances. Unfortunately, the data type of the action has to be `any`,
* meaning that the type checker cannot check whether uses of the action are type safe.
* In an alternative design, type information might be preserved. TODO(marten): Look into this.
*/
private federatePortActionByID: Map<number, Action<any>> = new Map<number, Action<any>>();

/**
* Establish the mapping between a federate port's action and its ID.
* @param federatePortID The federate port's ID.
* @param federatePort The federate port's action.
*/
public registerFederatePortAction<T extends Present>(federatePortID: number, federatePortAction: Action<Buffer>) {
Object.setPrototypeOf(federatePortAction, FederatePortAction.prototype);
public registerFederatePortAction<T extends Present>(federatePortID: number, federatePortAction: Action<T>) {
this.federatePortActionByID.set(federatePortID, federatePortAction);
}

Expand Down Expand Up @@ -420,13 +425,15 @@ class RTIClient extends EventEmitter {
* @param destPortID The port ID for the port on the destination
* federate to which this message should be sent.
*/
public sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number) {
let msg = Buffer.alloc(data.length + 9);
public sendRTIMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
const value = Buffer.from(JSON.stringify(data), "utf-8");

let msg = Buffer.alloc(value.length + 9);
msg.writeUInt8(RTIMessageTypes.MSG_TYPE_MESSAGE, 0);
msg.writeUInt16LE(destPortID, 1);
msg.writeUInt16LE(destFederateID, 3);
msg.writeUInt32LE(data.length, 5);
data.copy(msg, 9); // Copy data into the message
msg.writeUInt32LE(value.length, 5);
value.copy(msg, 9); // Copy data into the message
try {
Log.debug(this, () => {return `Sending RTI (untimed) message to `
+ `federate ID: ${destFederateID} and port ID: ${destPortID}.`});
Expand All @@ -447,15 +454,17 @@ class RTIClient extends EventEmitter {
* @param time The time of the message encoded as a 64 bit little endian
* unsigned integer in a Buffer.
*/
public sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number, time: Buffer) {
let msg = Buffer.alloc(data.length + 21);
public sendRTITimedMessage<T extends Present>(data: T, destFederateID: number, destPortID: number, time: Buffer) {
const value = Buffer.from(JSON.stringify(data), "utf-8");

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove extra blank lines.

let msg = Buffer.alloc(value.length + 21);
msg.writeUInt8(RTIMessageTypes.MSG_TYPE_TAGGED_MESSAGE, 0);
msg.writeUInt16LE(destPortID, 1);
msg.writeUInt16LE(destFederateID, 3);
msg.writeUInt32LE(data.length, 5);
msg.writeUInt32LE(value.length, 5);
time.copy(msg, 9); // Copy the current time into the message
// FIXME: Add microstep properly.
data.copy(msg, 21); // Copy data into the message
value.copy(msg, 21); // Copy data into the message
try {
Log.debug(this, () => {return `Sending RTI (timed) message to `
+ `federate ID: ${destFederateID}, port ID: ${destPortID} `
Expand Down Expand Up @@ -897,7 +906,7 @@ export class FederatedApp extends App {
* unique among all port IDs on this federate and be a number between 0 and NUMBER_OF_PORTS - 1
* @param federatePort The federate port's action for registration.
*/
public registerFederatePortAction(federatePortID: number, federatePortAction: Action<Buffer>) {
public registerFederatePortAction<T extends Present>(federatePortID: number, federatePortAction: Action<T>) {
if (federatePortAction.origin === Origin.logical) {
this.rtiSynchronized = true;
}
Expand All @@ -911,7 +920,7 @@ export class FederatedApp extends App {
* @param destFederateID The ID of the federate intended to receive the message.
* @param destPortID The ID of the federate's port intended to receive the message.
*/
public sendRTIMessage(msg: Buffer, destFederateID: number, destPortID: number ) {
public sendRTIMessage<T extends Present>(msg: T, destFederateID: number, destPortID: number ) {
Log.debug(this, () => {return `Sending RTI message to federate ID: ${destFederateID}`
+ ` port ID: ${destPortID}`});
this.rtiClient.sendRTIMessage(msg, destFederateID, destPortID);
Expand All @@ -925,7 +934,7 @@ export class FederatedApp extends App {
* @param destFederateID The ID of the Federate intended to receive the message.
* @param destPortID The ID of the FederateInPort intended to receive the message.
*/
public sendRTITimedMessage(msg: Buffer, destFederateID: number, destPortID: number ) {
public sendRTITimedMessage<T extends Present>(msg: T, destFederateID: number, destPortID: number ) {
let time = this.util.getCurrentTag().toBinary();
Log.debug(this, () => {return `Sending RTI timed message to federate ID: ${destFederateID}`
+ ` port ID: ${destPortID} and time: ${time.toString('hex')}`});
Expand Down Expand Up @@ -1015,14 +1024,16 @@ export class FederatedApp extends App {
}
});

this.rtiClient.on('message', (destPortAction: FederatePortAction, messageBuffer: Buffer) => {
this.rtiClient.on('message', <T extends Present>(destPortAction: Action<T>, messageBuffer: Buffer) => {
// Schedule this federate port's action.
// This message is untimed, so schedule it immediately.
Log.debug(this, () => {return `(Untimed) Message received from RTI.`})
destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, messageBuffer);
const value: T = JSON.parse(messageBuffer.toString());

destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, value);
});

this.rtiClient.on('timedMessage', (destPortAction: FederatePortAction, messageBuffer: Buffer,
this.rtiClient.on('timedMessage', <T extends Present>(destPortAction: Action<T>, messageBuffer: Buffer,
tag: Tag) => {
// Schedule this federate port's action.

Expand All @@ -1047,13 +1058,15 @@ export class FederatedApp extends App {
// FIXME: implement decentralized control.

Log.debug(this, () => {return `Timed Message received from RTI with tag ${tag}.`})
const value: T = JSON.parse(messageBuffer.toString());

if (destPortAction.origin == Origin.logical) {
destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, messageBuffer, tag);
destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, value, tag);

} else {
// The schedule function for physical actions implements
// Tr = max(r, R + A)
destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, messageBuffer);
destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, value);
}
});

Expand Down Expand Up @@ -1092,4 +1105,4 @@ export class FederatedApp extends App {
*/
export class RemoteFederatePort {
constructor(public federateID: number, public portID: number) {}
}
}
16 changes: 8 additions & 8 deletions src/core/reactor.ts
Expand Up @@ -1555,8 +1555,8 @@ interface UtilityFunctions {
getCurrentPhysicalTime(): TimeValue;
getElapsedLogicalTime(): TimeValue;
getElapsedPhysicalTime(): TimeValue;
sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number): void;
sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number): void;
sendRTIMessage<T extends Present>(data: T, destFederateID: number, destPortID: number): void;
sendRTITimedMessage<T extends Present>(data: T, destFederateID: number, destPortID: number): void;
}

export interface MutationSandbox extends ReactionSandbox {
Expand Down Expand Up @@ -1663,11 +1663,11 @@ export class App extends Reactor {
return getCurrentPhysicalTime().subtract(this.app._startOfExecution);
}

public sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number) {
public sendRTIMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
return this.app.sendRTIMessage(data, destFederateID, destPortID);
};

public sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number) {
public sendRTITimedMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
return this.app.sendRTITimedMessage(data, destFederateID, destPortID);
};

Expand Down Expand Up @@ -1780,22 +1780,22 @@ export class App extends Reactor {
/**
* Send an (untimed) message to the designated federate port through the RTI.
* This function throws an error if it isn't called on a FederatedApp.
* @param data A Buffer containing the body of the message.
* @param data The data that contain the body of the message.
* @param destFederateID The federate ID that is the destination of this message.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update the comment on @param data? It's not a Buffer anymore.

* @param destPortID The port ID that is the destination of this message.
*/
protected sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number) {
protected sendRTIMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
throw new Error("Cannot call sendRTIMessage from an App. sendRTIMessage may be called only from a FederatedApp");
}

/**
* Send a (timed) message to the designated federate port through the RTI.
* This function throws an error if it isn't called on a FederatedApp.
* @param data A Buffer containing the body of the message.
* @param data The data that contain the body of the message.
* @param destFederateID The federate ID that is the destination of this message.
lhstrh marked this conversation as resolved.
Show resolved Hide resolved
* @param destPortID The port ID that is the destination of this message.
*/
protected sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number) {
protected sendRTITimedMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
throw new Error("Cannot call sendRTIMessage from an App. sendRTIMessage may be called only from a FederatedApp");
}

Expand Down