diff --git a/api/Dockerfile b/api/Dockerfile index 3b985f8..5a51966 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -9,4 +9,4 @@ RUN go build ./cmd/api_server/api_server.go FROM debian:buster AS runner RUN mkdir /app COPY --from=builder /workspace/api_server /app/api_server -ENTRYPOINT ["/app/api_server"] +ENTRYPOINT ["/app/api_server", "--nats=nats://address-chat-nats.internal:4222"] diff --git a/api/cmd/api_server/api_server.go b/api/cmd/api_server/api_server.go index 7a200e5..10df94e 100644 --- a/api/cmd/api_server/api_server.go +++ b/api/cmd/api_server/api_server.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "flag" "fmt" "log" "net/http" @@ -62,10 +63,7 @@ func wsDriver(nc *nats.Conn, conn *websocket.Conn) { if err != nil { log.Fatalf("could not marshall msg: %s", err) } - if err := nc.Publish(fmt.Sprintf("MESSAGES.%s", msg.From), data); err != nil { - log.Fatalf("could not publish to nats: %s", err) - } - for _, addr := range msg.To { + for addr := range msg.Participants() { if err := nc.Publish(fmt.Sprintf("MESSAGES.%s", addr), data); err != nil { log.Fatalf("could not publish to nats: %s", err) } @@ -99,6 +97,7 @@ func wsDriver(nc *nats.Conn, conn *websocket.Conn) { if err != nil { log.Fatalf("could not subscribe to MESSAGES: %s", err) } + log.Println("subscribed to:", subj) for { message, err := sub.NextMsg(1 * time.Second) if err == nats.ErrTimeout { @@ -107,6 +106,11 @@ func wsDriver(nc *nats.Conn, conn *websocket.Conn) { if err != nil { log.Fatalf("unexpected error reading from nats: %s", err) } + meta, err := message.Metadata() + if err != nil { + log.Fatalf("unexpected metadata error: %s", err) + } + log.Println("received nats message:", meta) var msg protocol.Message if err := json.Unmarshal(message.Data, &msg); err != nil { log.Fatalf("could not decode message: %s", err) @@ -115,6 +119,9 @@ func wsDriver(nc *nats.Conn, conn *websocket.Conn) { log.Println("write to websocket:", err) return } + if err := message.AckSync(); err != nil { + log.Println("ack:", err) + } } }() } @@ -163,8 +170,8 @@ func healthCheckHandler(w http.ResponseWriter, r *http.Request) { } func main() { - nc, err := nats.Connect("nats://address-chat-nats.internal:4222") - // nc, err := nats.Connect(nats.DefaultURL) + natsUrl := flag.String("nats", nats.DefaultURL, "the url for the NATS cluster") + nc, err := nats.Connect(*natsUrl) if err != nil { log.Fatalf("could not connect to nats: %s", err) } diff --git a/api/protocol/protocol.go b/api/protocol/protocol.go index fe33893..51ed541 100644 --- a/api/protocol/protocol.go +++ b/api/protocol/protocol.go @@ -22,3 +22,12 @@ type Message struct { To []string `json:"to"` Content string `json:"content"` } + +func (m Message) Participants() map[string]bool { + ps := make(map[string]bool) + ps[m.From] = true + for _, addr := range m.To { + ps[addr] = true + } + return ps +} diff --git a/frontend/src/SignedIn.svelte b/frontend/src/SignedIn.svelte index 0653948..039eeed 100644 --- a/frontend/src/SignedIn.svelte +++ b/frontend/src/SignedIn.svelte @@ -6,6 +6,11 @@ readonly address: string; readonly name?: string; } + interface Message { + readonly from: string; + readonly to: readonly string[]; + readonly content: string; + } import { ethers } from "ethers"; import Mailbox from "./Mailbox.svelte"; @@ -18,9 +23,10 @@ author = { ...author, name }; }); - const ws = new WebSocket("wss://address-chat-api.fly.dev/ws"); - // const ws = new WebSocket("ws://localhost:8080/ws"); + // const ws = new WebSocket("wss://address-chat-api.fly.dev/ws"); + const ws = new WebSocket("ws://localhost:8080/ws"); let authenticatedUntil: number | null = null; + let messages: readonly Message[] = []; ws.onopen = (evt) => { console.log("[OPEN]", evt); ws.send(token); @@ -31,6 +37,8 @@ const msg = JSON.parse(evt.data); if (typeof msg.authenticatedUntil === "number") { authenticatedUntil = msg.authenticatedUntil || null; + } else { + messages = [...messages, msg]; } } catch (e) { console.error(e); @@ -80,6 +88,11 @@ } +
+ {#each messages as m} +

{m.from} -> {m.to}: {m.content}

+ {/each} +