Skip to content

Commit

Permalink
refactor(node): convert handlePeerMessage to an actor
Browse files Browse the repository at this point in the history
  • Loading branch information
justmoon committed Mar 25, 2023
1 parent d1c456a commit 66d63b2
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 192 deletions.
@@ -1,13 +1,22 @@
import { Reactor, createTopic } from "@dassie/lib-reactive"
import {
Actor,
ActorFactory,
createActor,
createTopic,
} from "@dassie/lib-reactive"

import { handleInterledgerPacket } from "../handlers/interledger-packet"
import { handleLinkStateRequest } from "../handlers/link-state-request"
import { handleLinkStateUpdate } from "../handlers/link-state-update"
import { handlePeeringRequest } from "../handlers/peering-request"
import type { PeerMessage } from "../peer-schema"

export interface IncomingPeerMessageEvent {
message: PeerMessage
export interface IncomingPeerMessageEvent<
TType extends PeerMessageType = PeerMessageType
> {
message: PeerMessage & {
content: { value: { type: TType; value: PeerMessageContent<TType> } }
}
authenticated: boolean
asUint8Array: Uint8Array
}
Expand All @@ -22,33 +31,44 @@ export type PeerMessageContent<T extends PeerMessageType> = Extract<
export const incomingPeerMessageTopic = () =>
createTopic<IncomingPeerMessageEvent>()

type AllPeerMessageHandlerFactories = {
[K in PeerMessageType]: ActorFactory<
(parameters: IncomingPeerMessageEvent<K>) => Uint8Array
>
}

type AllPeerMessageHandlers = {
[K in PeerMessageType]: (
content: PeerMessageContent<K>,
parameters: IncomingPeerMessageEvent
) => Uint8Array
[K in PeerMessageType]: Actor<
(parameters: IncomingPeerMessageEvent<K>) => Uint8Array
>
}

export const handlePeerMessage = (reactor: Reactor) => {
const handlers: AllPeerMessageHandlers = {
peeringRequest: reactor.use(handlePeeringRequest),
linkStateUpdate: reactor.use(handleLinkStateUpdate),
interledgerPacket: reactor.use(handleInterledgerPacket),
linkStateRequest: reactor.use(handleLinkStateRequest),
}
const HANDLERS: AllPeerMessageHandlerFactories = {
peeringRequest: handlePeeringRequest,
linkStateUpdate: handleLinkStateUpdate,
interledgerPacket: handleInterledgerPacket,
linkStateRequest: handleLinkStateRequest,
}

const runHandler = <T extends PeerMessage["content"]["value"]["type"]>(
type: T,
content: PeerMessageContent<T>,
parameters: IncomingPeerMessageEvent
) => {
const handler = handlers[type]
return handler(content, parameters)
}
export const handlePeerMessage = () =>
createActor((sig) => {
for (const handler of Object.values(
HANDLERS as Record<string, ActorFactory<unknown>>
)) {
sig.run(handler, undefined, { register: true })
}

return (parameters: IncomingPeerMessageEvent): Uint8Array => {
const content = parameters.message.content.value
const handlers = Object.fromEntries(
Object.entries(HANDLERS as Record<string, ActorFactory<unknown>>).map(
([key, value]) => [key, sig.use(value)]
)
) as unknown as AllPeerMessageHandlers

return runHandler(content.type, content.value, parameters)
}
}
return async <TType extends PeerMessageType>(
parameters: IncomingPeerMessageEvent<TType>
) => {
const type: TType = parameters.message.content.value.type

return await handlers[type].ask(parameters)
}
})
@@ -1,70 +1,71 @@
import { createLogger } from "@dassie/lib-logger"
import type { Reactor } from "@dassie/lib-reactive"
import { createActor } from "@dassie/lib-reactive"

import { EMPTY_UINT8ARRAY } from "../../../common/constants/general"
import { subnetBalanceMapStore } from "../../balances/stores/subnet-balance-map"
import { configSignal } from "../../config"
import { incomingIlpPacketTopic } from "../../ilp-connector/topics/incoming-ilp-packet"
import subnetModules from "../../subnets/modules"
import type {
IncomingPeerMessageEvent,
PeerMessageContent,
} from "../actions/handle-peer-message"
import type { IncomingPeerMessageEvent } from "../actions/handle-peer-message"

const logger = createLogger("das:node:handle-interledger-packet")

export const handleInterledgerPacket = (reactor: Reactor) => {
const incomingIlpPacketTopicValue = reactor.use(incomingIlpPacketTopic)
const { ilpAllocationScheme } = reactor.use(configSignal).read()
const balanceMap = reactor.use(subnetBalanceMapStore)
export const handleInterledgerPacket = () =>
createActor((sig) => {
const incomingIlpPacketTopicValue = sig.use(incomingIlpPacketTopic)
const { ilpAllocationScheme } = sig.getKeys(configSignal, [
"ilpAllocationScheme",
])
const balanceMap = sig.use(subnetBalanceMapStore)

const handleInterledgerPacketAsync = async (
content: PeerMessageContent<"interledgerPacket">,
{ message: { sender, subnetId }, authenticated }: IncomingPeerMessageEvent
) => {
if (!authenticated) {
logger.warn("received unauthenticated interledger packet, discarding")
return
}
const handleInterledgerPacketAsync = async ({
message: {
sender,
subnetId,
content: {
value: { value: content },
},
},
authenticated,
}: IncomingPeerMessageEvent<"interledgerPacket">) => {
if (!authenticated) {
logger.warn("received unauthenticated interledger packet, discarding")
return
}

logger.debug("handle interledger packet", {
subnet: subnetId,
from: sender,
})
logger.debug("handle interledger packet", {
subnet: subnetId,
from: sender,
})

const incomingPacketEvent = incomingIlpPacketTopicValue.prepareEvent({
source: `${ilpAllocationScheme}.das.${subnetId}.${sender}`,
packet: content.signed.packet,
requestId: content.signed.requestId,
})
const incomingPacketEvent = incomingIlpPacketTopicValue.prepareEvent({
source: `${ilpAllocationScheme}.das.${subnetId}.${sender}`,
packet: content.signed.packet,
requestId: content.signed.requestId,
})

const subnetModule = subnetModules[subnetId]
const subnetModule = subnetModules[subnetId]

if (!subnetModule) {
throw new Error(`unknown subnet: ${subnetId}`)
}
if (!subnetModule) {
throw new Error(`unknown subnet: ${subnetId}`)
}

await subnetModule.processIncomingPacket({
subnetId,
balanceMap,
packet: incomingPacketEvent.packet,
})
await subnetModule.processIncomingPacket({
subnetId,
balanceMap,
packet: incomingPacketEvent.packet,
})

incomingIlpPacketTopicValue.emit(incomingPacketEvent)
}
incomingIlpPacketTopicValue.emit(incomingPacketEvent)
}

return (
content: PeerMessageContent<"interledgerPacket">,
parameters: IncomingPeerMessageEvent
) => {
handleInterledgerPacketAsync(content, parameters).catch(
(error: unknown) => {
return (parameters: IncomingPeerMessageEvent<"interledgerPacket">) => {
handleInterledgerPacketAsync(parameters).catch((error: unknown) => {
logger.error("error while handling interledger packet", {
error,
})
}
)
})

return EMPTY_UINT8ARRAY
}
}
return EMPTY_UINT8ARRAY
}
})
@@ -1,19 +1,28 @@
import type { Reactor } from "@dassie/lib-reactive"
import { createActor } from "@dassie/lib-reactive"

import { EMPTY_UINT8ARRAY } from "../../../common/constants/general"
import { configSignal } from "../../config"
import type { PeerMessageContent } from "../actions/handle-peer-message"
import type { IncomingPeerMessageEvent } from "../actions/handle-peer-message"
import { nodeTableStore } from "../stores/node-table"

export const handleLinkStateRequest = (reactor: Reactor) => {
const nodeTable = reactor.use(nodeTableStore)
const { nodeId } = reactor.use(configSignal).read()
export const handleLinkStateRequest = () =>
createActor((sig) => {
const nodeTable = sig.use(nodeTableStore)
const { nodeId } = sig.getKeys(configSignal, ["nodeId"])

return ({ subnetId }: PeerMessageContent<"linkStateRequest">) => {
const ownNodeTableEntry = nodeTable.read().get(`${subnetId}.${nodeId}`)
return ({
message: {
content: {
value: {
value: { subnetId },
},
},
},
}: IncomingPeerMessageEvent<"linkStateRequest">) => {
const ownNodeTableEntry = nodeTable.read().get(`${subnetId}.${nodeId}`)

if (!ownNodeTableEntry?.lastLinkStateUpdate) return EMPTY_UINT8ARRAY
if (!ownNodeTableEntry?.lastLinkStateUpdate) return EMPTY_UINT8ARRAY

return ownNodeTableEntry.lastLinkStateUpdate
}
}
return ownNodeTableEntry.lastLinkStateUpdate
}
})

0 comments on commit 66d63b2

Please sign in to comment.