Skip to content

Commit

Permalink
Add Cloudflare Worker-based EthernetProvider
Browse files Browse the repository at this point in the history
Uses durable objects to route ethernet packets between clients in a
zone. Nothing is actually persisted in storage, we use the in-memory
state of the durable object, since it only makes sense to broadcast
packets while to other clients that are currently actively connected.

There is also a debug /zone/<name>/list UI to list currently connected
clients.

For #53
  • Loading branch information
mihaip committed Jun 1, 2022
1 parent e756f57 commit 791621c
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/BasiliskII/emulator-ui.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export type EmulatorConfig = {

export interface EmulatorEthernetProvider {
init(macAddress: string): void;
close?(): void;
send(destination: string, packet: Uint8Array): void;
setDelegate(delegate: EmulatorEthernetProviderDelegate): void;
}
Expand Down
125 changes: 125 additions & 0 deletions src/CloudflareWorkerEthernetProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import type {
EmulatorEthernetProvider,
EmulatorEthernetProviderDelegate,
} from "./BasiliskII/emulator-ui";

/**
* Works in conjunction with the Clouldflare worker defined in
* workers-ethernet/src/index.mjs to broadcast Ethernet packets to all emulator
* instances in the same named zone.
*/
export class CloudflareWorkerEthernetProvider
implements EmulatorEthernetProvider
{
#zoneName: string;
#macAddress?: string;
#webSsocket: WebSocket;
#delegate?: EmulatorEthernetProviderDelegate;
#state: "opening" | "closed" | "opened" = "opening";
#bufferedMessages: string[] = [];
#reconnectTimeout?: number;

constructor(zoneName: string) {
this.#zoneName = zoneName;
this.#webSsocket = this.#connect();
}

#connect(): WebSocket {
const protocol = location.protocol === "https:" ? "wss:" : "ws:";
const origin = `${protocol}//${location.host}`;
const webSocket = new WebSocket(
`${origin}/zone/${this.#zoneName}/websocket`
);
webSocket.addEventListener("open", this.#handleOpen);
webSocket.addEventListener("close", this.#handleClose);
webSocket.addEventListener("error", this.#handleError);
webSocket.addEventListener("message", this.#handleMessage);
return webSocket;
}

#reconnect(): void {
const webSocket = this.#webSsocket;
webSocket.removeEventListener("open", this.#handleOpen);
webSocket.removeEventListener("close", this.#handleClose);
webSocket.removeEventListener("error", this.#handleError);
webSocket.removeEventListener("message", this.#handleMessage);

this.#state = "opening";
if (this.#reconnectTimeout) {
window.clearTimeout(this.#reconnectTimeout);
}
this.#reconnectTimeout = window.setTimeout(() => {
this.#webSsocket = this.#connect();
}, 1000);
}

init(macAddress: string): void {
this.#macAddress = macAddress;
this.#send({type: "init", macAddress});
}

close() {
this.#state = "closed";
this.#send({type: "close"});
this.#webSsocket.close();
}

send(destination: string, packet: Uint8Array): void {
// TODO: send packets directly as Uint8Arrays, to avoid copying overhead.
this.#send({
type: "send",
destination,
packetArray: Array.from(packet),
});
}

#send(message: any) {
message = JSON.stringify(message);
if (this.#state === "opened") {
this.#webSsocket.send(message);
} else {
this.#bufferedMessages.push(message);
}
}

setDelegate(delegate: EmulatorEthernetProviderDelegate): void {
this.#delegate = delegate;
}

#handleOpen = (event: Event): void => {
this.#state = "opened";
const bufferedMessages = this.#bufferedMessages;
this.#bufferedMessages = [];
for (const message of bufferedMessages) {
this.#webSsocket.send(message);
}
if (this.#macAddress) {
this.init(this.#macAddress);
}
};

#handleClose = (event: CloseEvent): void => {
if (this.#state === "closed") {
// Intentionally closed
return;
}
this.#reconnect();
};

#handleError = (event: Event): void => {
console.error("WebSocket error", event);
this.#reconnect();
};

#handleMessage = (event: MessageEvent): void => {
const data = JSON.parse(event.data);
const {type} = data;
switch (type) {
case "receive":
const {packetArray} = data;
const packet = new Uint8Array(packetArray);
this.#delegate?.receive(packet);
break;
}
};
}
10 changes: 8 additions & 2 deletions src/Mac.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import system753HdManifest from "./Data/System 7.5.3 HD.dsk.json";
import kanjiTalk753HdManifest from "./Data/KanjiTalk 7.5.3 HD.dsk.json";
import macos81HdManifest from "./Data/Mac OS 8.1 HD.dsk.json";
import infiniteHdManifest from "./Data/Infinite HD.dsk.json";
import type {EmulatorEthernetProvider} from "./BasiliskII/emulator-ui";
import {Emulator} from "./BasiliskII/emulator-ui";
import type {EmulatorChunkedFileSpec} from "./BasiliskII/emulator-common";
import {isDiskImageFile} from "./BasiliskII/emulator-common";
import {BroadcastChannelEthernetProvider} from "./BroadcastChannelEthernetProvider";
import {CloudflareWorkerEthernetProvider} from "./CloudflareWorkerEthernetProvider";

const SCREEN_WIDTH = 800;
const SCREEN_HEIGHT = 600;
Expand Down Expand Up @@ -45,9 +47,12 @@ export function Mac() {
prefetchChunks: [0],
...infiniteHdManifest,
};
let ethernetProvider;
let ethernetProvider: EmulatorEthernetProvider | undefined;
if (searchParams.get("ethernet")) {
ethernetProvider = new BroadcastChannelEthernetProvider();
const zoneName = searchParams.get("ethernet_zone");
ethernetProvider = zoneName
? new CloudflareWorkerEthernetProvider(zoneName)
: new BroadcastChannelEthernetProvider();
}
const emulator = new Emulator(
{
Expand Down Expand Up @@ -101,6 +106,7 @@ export function Mac() {
);
emulator.stop();
emulatorRef.current = undefined;
ethernetProvider?.close?.();
};
}, []);

Expand Down
105 changes: 105 additions & 0 deletions workers-site/ethernet-zone.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
export class EthernetZone implements DurableObject {
#clients: EthernetZoneClient[] = [];

async fetch(request: Request) {
const url = new URL(request.url);
if (url.pathname === "/websocket") {
return this.handleWebSocket(request);
} else if (url.pathname === "/list") {
return new Response(
`clients: ${JSON.stringify(
this.#clients,
(key, value) => {
if (key === "webSocket") {
const webSocket = value as WebSocket;
return {
readyState: webSocket.readyState,
url: webSocket.url,
protocol: webSocket.protocol,
};
}
return value;
},
2
)}`,
{
status: 200,
statusText: "OK",
headers: {
"Content-Type": "text/plain",
},
}
);
}
return new Response("Not found (EthernetZone)", {status: 404});
}

async handleWebSocket(request: Request) {
if (request.headers.get("Upgrade") !== "websocket") {
return new Response("Expected websocket", {status: 400});
}

const [client, server] = Object.values(new WebSocketPair());

await this.handleClient(server);

return new Response(null, {status: 101, webSocket: client});
}

async handleClient(webSocket: WebSocket) {
webSocket.accept();

const client: EthernetZoneClient = {webSocket};
this.#clients.push(client);

webSocket.addEventListener("message", async event => {
const data = JSON.parse(event.data as string);
const {type, ...payload} = data;
switch (type) {
case "init":
client.macAddress = payload.macAddress;
break;
case "close":
this.closeClient(client);
break;
case "send":
const {destination, ...sendPayload} = payload;
for (const otherClient of this.#clients) {
if (otherClient === client || otherClient.closed) {
continue;
}
if (
destination === "*" ||
destination === "AT" ||
destination === otherClient.macAddress
) {
try {
otherClient.webSocket.send(
JSON.stringify({
type: "receive",
...sendPayload,
})
);
} catch (err) {
this.closeClient(otherClient);
}
}
}
break;
default:
console.warn("Unexpected message", data);
}
});
}

closeClient(client: EthernetZoneClient) {
client.closed = true;
this.#clients.splice(this.#clients.indexOf(client), 1);
}
}

type EthernetZoneClient = {
webSocket: WebSocket;
macAddress?: string;
closed?: boolean;
};
12 changes: 12 additions & 0 deletions workers-site/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,29 @@ const manifest = JSON.parse(manifestJSON);

type Env = {
__STATIC_CONTENT: string;
ETHERNET_ZONE: DurableObjectNamespace;
};
const handler: ExportedHandler<Env> = {fetch: handleRequest};

export default handler;
export {EthernetZone} from "./ethernet-zone";

async function handleRequest(
request: Request,
env: Env,
ctx: ExecutionContext
) {
const url = new URL(request.url);
const path = url.pathname.slice(1).split("/");
if (path[0] === "zone") {
const zoneName = path[1];
const zoneId = env.ETHERNET_ZONE.idFromName(zoneName);
const zone = env.ETHERNET_ZONE.get(zoneId);
const zoneUrl = new URL(request.url);
zoneUrl.pathname = "/" + path.slice(2).join("/");
return zone.fetch(zoneUrl.toString(), request);
}

const fetchEvent = {
request,
waitUntil(promise: Promise<any>) {
Expand Down
12 changes: 12 additions & 0 deletions wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ workers_dev = false
compatibility_date = "2021-11-08"
main = "workers-site/index.ts"

[durable_objects]
bindings = [
{name = "ETHERNET_ZONE", class_name = "EthernetZone"}
]

[[migrations]]
tag = "v1"
new_classes = ["EthernetZone"]

[site]
bucket = "./build"
exclude = ["BasiliskII.wasm.map"]
Expand All @@ -15,3 +24,6 @@ routes = [
{pattern = "kanjitalk7.app/*", zone_name = "kanjitalk7.app"},
{pattern = "mac.persistent.info/*", zone_name = "persistent.info"},
]
durable_objects.bindings = [
{name = "ETHERNET_ZONE", class_name = "EthernetZone"}
]

0 comments on commit 791621c

Please sign in to comment.