@rvct/phoenix

@rvct/phoenix transforms a Phoenix Channel instance into a reactive store for channel state and outgoing messages.

Motivation

The Phoenix JavaScript client provides the channel transport primitives: join a topic, register channel.on callbacks, push messages, and handle channel failures. @rvct/phoenix keeps those primitives available and adds a reactive layer for UI concerns: the current value, state updates from incoming events, join status, transport errors, outgoing call state, and mount/unmount cleanup.

Installation

Client  Command
pnpm    pnpm add @rvct/phoenix
npm     npm install @rvct/phoenix
yarn    yarn add @rvct/phoenix

@rvct/phoenix expects the Phoenix JavaScript client to already be available in your app. Install phoenix separately only if your project does not already provide the Socket instance.

Development

Required dependencies:

  • Node.js >=24
  • pnpm 10.26.2

Recommended:

Prepare Nix environment

Linux multi-user Nix install:

sh <(curl --proto '=https' --tlsv1.2 -L https://nixos.org/nix/install) --daemon

Enable flakes:

mkdir -p ~/.config/nix
printf "experimental-features = nix-command flakes\n" >> ~/.config/nix/nix.conf

Optional direnv and nix-direnv setup through Nix:

nix profile install nixpkgs#direnv nixpkgs#nix-direnv
mkdir -p ~/.config/direnv
printf 'source $HOME/.nix-profile/share/nix-direnv/direnvrc\n' >> ~/.config/direnv/direnvrc

Add the direnv hook for your shell, then restart the shell. For bash:

printf 'eval "$(direnv hook bash)"\n' >> ~/.bashrc

For other shells, use the direnv hook docs.


  • Enter the environment with nix develop, or run direnv allow once and let direnv load it automatically.
  • The flake provides Node.js, pnpm, Git, and repository check tooling.

Manual setup:

  • Install and configure the required dependencies above manually.
  • Use pnpm when running the project commands outside the Nix shell.

Local development variables can be placed in envs/.env. Use envs/.env.example as the template.

Install dependencies and run checks:

pnpm install --frozen-lockfile
pnpm run check

Basic usage

The examples below build a demonstration store that manages chat state. The chat domain is only the running example: replace chat:lobby, messages, and send with the topic, value, events, and actions your channel owns.

import { session } from "@rvct/phoenix"
import { Socket } from "phoenix"

const socket = new Socket("/socket", {
  params: { token: window.userToken },
})

socket.connect()

const chat = session(socket, {
  topic: "chat:lobby",
})

const unsubscribe = chat.subscribe((state) => {
  console.log(state.status, state.value, state.error, state.processing)
})

The store state has six fields:

  • value - the current channel state, or null before a value is available
  • status - channel lifecycle status: loading, ready, stale, or failed. See Store lifecycle.
  • error - connection, join, transport, or close information when the session is not healthy
  • processing - per-action flags for pushes that are waiting for a reply
  • errors - per-action error replies from failed pushes, or null when clear
  • timeouts - per-action flags for pushes that timed out
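
The six fields above can be written down as a type. The sketch below is only an illustration: SessionStatus, SessionState, and describeSession are hypothetical names, not exports of @rvct/phoenix, and the label chosen for stale is a guess at how a view might present it.

```typescript
// Hypothetical shape of the state described above; the types actually
// exported by @rvct/phoenix may differ in naming and detail.
type SessionStatus = "loading" | "ready" | "stale" | "failed"

type SessionState<TValue, TAction extends string> = {
  value: TValue | null
  status: SessionStatus
  error: unknown
  processing: Record<TAction, boolean>
  errors: Record<TAction, unknown>
  timeouts: Record<TAction, boolean>
}

// Maps a state snapshot to a short label a view could render.
function describeSession<TValue, TAction extends string>(
  state: SessionState<TValue, TAction>,
): string {
  switch (state.status) {
    case "loading":
      return "Joining..."
    case "ready":
      return "Connected"
    case "stale":
      return "Data may be out of date"
    case "failed":
      return "Unavailable"
  }
}
```

Keeping the label logic in a plain function like this lets it be checked without a socket or a framework.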

Initial state

Imagine a chat view that should show something before the channel finishes joining.

Seed value with an empty list, cached messages, or server-rendered data when the UI already has useful state before channel.join() succeeds. The initial value is exposed immediately, but status still starts as loading until the channel reports a successful join.

type ChatValue = {
  messages: Array<{ id: string; body: string; insertedAt: string }>
}

const chat = session<ChatValue>(socket, {
  topic: "chat:lobby",
  value: {
    messages: [],
  },
})

Join replies

Imagine the chat server can return all messages, only the missing messages, or a normalized snapshot when the join succeeds.

By default, a successful channel.join() keeps the current store value and only marks the session as ready. Use connect.ok when the server reply should replace, normalize, or merge with the current value.

type ChatMessage = { id: string; body: string; insertedAt: string }

type ChatValue = {
  messages: ChatMessage[]
}

type ChatJoinOk = {
  messages: ChatMessage[]
}

type ChatJoinError = {
  reason: string
}

const chat = session<ChatValue>(socket, {
  topic: "chat:lobby",
  value: {
    messages: [],
  },
  connect: {
    ok(value, reply: ChatJoinOk) {
      return {
        messages: [...(value?.messages ?? []), ...reply.messages],
      }
    },
    error(reply: ChatJoinError) {
      return reply.reason
    },
  },
})
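
When the seeded value already holds cached messages, plain concatenation in connect.ok can repeat entries that the join reply also contains. A minimal sketch of an id-based merge, assuming message ids are unique and that the reply copy should win; mergeById is a hypothetical helper, not part of the library:

```typescript
type ChatMessage = { id: string; body: string; insertedAt: string }

// Merges two message lists by id. Entries from `incoming` replace entries
// with the same id in `current`; first-seen order is preserved because a
// Map keeps the position of a key when its value is overwritten.
function mergeById(current: ChatMessage[], incoming: ChatMessage[]): ChatMessage[] {
  const byId = new Map<string, ChatMessage>()
  for (const message of current.concat(incoming)) {
    byId.set(message.id, message)
  }
  return Array.from(byId.values())
}
```

Inside connect.ok this could be used as messages: mergeById(value?.messages ?? [], reply.messages).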

Incoming channel events

Imagine the chat server sends small updates after the initial join instead of sending the whole message list again.

events is the reactive equivalent of registering channel.on(event, callback) and then writing the new state yourself. Each handler receives the current value and the event payload, then returns the next value.

type ChatMessage = { id: string; body: string; insertedAt: string }

type ChatValue = {
  messages: ChatMessage[]
}

type ChatJoinOk = {
  messages: ChatMessage[]
}

type MessageDeleted = {
  id: string
}

const chat = session<ChatValue>(socket, {
  topic: "chat:lobby",
  value: {
    messages: [],
  },
  connect: {
    ok(_value, reply: ChatJoinOk) {
      return {
        messages: reply.messages,
      }
    },
  },
  events: {
    new_msg(value, message: ChatMessage) {
      return {
        messages: [...(value?.messages ?? []), message],
      }
    },
    message_updated(value, message: ChatMessage) {
      return {
        messages: (value?.messages ?? []).map((current) =>
          current.id === message.id ? message : current,
        ),
      }
    },
    message_deleted(value, payload: MessageDeleted) {
      return {
        messages: (value?.messages ?? []).filter((message) => message.id !== payload.id),
      }
    },
  },
})
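
Because each handler is a function from the current value and a payload to the next value, handlers can be extracted and exercised without a socket. The sketch below assumes the value may be null before anything is available, as the value?.messages guards above suggest; chatEvents and the sample messages are illustrative only.

```typescript
type ChatMessage = { id: string; body: string; insertedAt: string }
type ChatValue = { messages: ChatMessage[] }

// Handlers with the same (value, payload) => next value shape as above,
// kept in a plain object so they can run outside a session.
const chatEvents = {
  new_msg(value: ChatValue | null, message: ChatMessage): ChatValue {
    return { messages: [...(value?.messages ?? []), message] }
  },
  message_deleted(value: ChatValue | null, payload: { id: string }): ChatValue {
    return {
      messages: (value?.messages ?? []).filter((message) => message.id !== payload.id),
    }
  },
}

// Replay a short sequence of events against an empty starting value.
let chatValue: ChatValue | null = null
chatValue = chatEvents.new_msg(chatValue, { id: "1", body: "Hello", insertedAt: "t0" })
chatValue = chatEvents.new_msg(chatValue, { id: "2", body: "Hi", insertedAt: "t1" })
chatValue = chatEvents.message_deleted(chatValue, { id: "1" })
```

The same object could then be passed as the events option, so the tested functions are the ones the store runs.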

Sending messages

Imagine a chat input that should call the server to create a message and expose pending, failed, and timed-out UI states for that button.

Use extend to define domain-specific actions while keeping the reactive subscribe method. The extension factory receives call and cast helpers that follow the Phoenix channel.push(event, payload) model, where the event name maps to handle_in/3 on the server channel.

Function properties returned from extend become action names in processing, errors, and timeouts; payload types come from the action method parameters. Use call<OkReply, ErrorReply>(...) for request/reply operations. Action error replies are stored under errors[action] as ErrorReply | null; untyped calls use unknown | null.

type ChatValue = {
  messages: Array<{ id: string; body: string; insertedAt: string }>
}

type SendOk = Record<string, never>

type SendError = {
  reason?: string
}

const chat = session<ChatValue>(socket, {
  topic: "chat:lobby",
  value: {
    messages: [],
  },
}).extend(({ call }) => ({
  send(payload: { body: string }) {
    return call<SendOk, SendError>("new_msg", payload)
  },
}))

chat.send({ body: "Hello" })
chat.subscribe((state) => {
  const sendError = state.errors.send

  console.log(state.processing.send)
  console.log(sendError?.reason)
  console.log(state.timeouts.send)
})

Action calls are modeled as request/reply operations. The Phoenix channel should return a reply from the matching handle_in/3; a handler that returns {:noreply, socket} leaves the client without an "ok" or "error" reply, so the action bucket is resolved by the call timeout instead.

Use cast for fire-and-forget messages. It sends through Phoenix channel.push(...) without registering receive handlers and does not transition the action bucket:

const chat = session<ChatValue>(socket, {
  topic: "chat:lobby",
}).extend(({ cast }) => ({
  typing(payload: { active: boolean }) {
    cast("typing", payload)
  },
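}))

On the server, a matching handle_in/3 for new_msg replies to the call with either an :ok or an :error tuple:

def handle_in("new_msg", %{"body" => body}, socket) do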
  body = String.trim(body)

  if body == "" do
    {:reply, {:error, %{reason: "empty_message"}}, socket}
  else
    inserted_at = DateTime.utc_now() |> DateTime.to_iso8601()

    message = %{
      "id" => Integer.to_string(System.unique_integer([:positive])),
      "body" => body,
      "insertedAt" => inserted_at
    }

    broadcast!(socket, "new_msg", message)

    {:reply, {:ok, %{}}, socket}
  end
end

Action buckets

An action bucket is the group of per-action state entries under processing, errors, and timeouts. Buckets are registered for function properties returned from extend, keyed by the public method name. The Phoenix event name passed to call or cast can be different from that method name. Incoming channel events do not create buckets; they update value through events.

type ChatValue = {
  messages: Array<{ id: string; body: string; insertedAt: string }>
}

type ActionError = {
  reason?: string
}

const chat = session<ChatValue>(socket, {
  topic: "chat:lobby",
  value: {
    messages: [],
  },
}).extend(({ call }) => ({
  send(payload: { body: string }) {
    return call<unknown, ActionError>("new_msg", payload)
  },
  edit(payload: { id: string; body: string }) {
    return call<unknown, ActionError>("message_updated", payload)
  },
  remove(payload: { id: string }) {
    return call<unknown, ActionError>("message_deleted", payload)
  },
}))

chat.subscribe((state) => {
  const editError = state.errors.edit

  console.log(state.processing.send)
  console.log(editError?.reason)
  console.log(state.timeouts.remove)
})

Each bucket follows the same reply lifecycle:

  • Starting an action sets processing[action] to true and clears the previous errors[action] and timeouts[action].
  • An "ok" reply sets processing[action] to false and leaves the error and timeout entries clear.
  • An "error" reply sets processing[action] to false and stores the reply payload in errors[action], or null when the reply is nullish.
  • A "timeout" reply sets processing[action] to false, clears errors[action], and sets timeouts[action] to true.
  • Different actions are tracked independently. If the same action is called again before an older reply returns, only the latest run can update that action bucket.
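
The lifecycle rules above can be modeled as a small reducer. This is a sketch of the described behavior, not the library's implementation: Bucket, Reply, startRun, and settleRun are hypothetical names, and the run counter is one assumed way to let only the latest run update the bucket.

```typescript
type Bucket<TError> = {
  processing: boolean
  error: TError | null
  timedOut: boolean
  run: number // id of the most recently started run
}

type Reply<TError> =
  | { kind: "ok"; run: number }
  | { kind: "error"; run: number; payload?: TError | null }
  | { kind: "timeout"; run: number }

function emptyBucket<TError>(): Bucket<TError> {
  return { processing: false, error: null, timedOut: false, run: 0 }
}

// Starting an action marks it as processing and clears earlier results.
function startRun<TError>(bucket: Bucket<TError>): Bucket<TError> {
  return { processing: true, error: null, timedOut: false, run: bucket.run + 1 }
}

// Applies a reply to the bucket; replies from older runs are ignored.
function settleRun<TError>(bucket: Bucket<TError>, reply: Reply<TError>): Bucket<TError> {
  if (reply.run !== bucket.run) return bucket
  switch (reply.kind) {
    case "ok":
      return { ...bucket, processing: false, error: null, timedOut: false }
    case "error":
      // A nullish error payload is stored as null.
      return { ...bucket, processing: false, error: reply.payload ?? null, timedOut: false }
    case "timeout":
      return { ...bucket, processing: false, error: null, timedOut: true }
  }
}
```

In this model a late "error" reply for run 1 leaves the bucket untouched once run 2 has started, which mirrors the last rule in the list.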

Store lifecycle

The lifecycle example combines the chat pieces so the state transitions are visible in one place.

A session is a reactive store whose state is derived from Phoenix Channel interaction. The sequence below uses a complete chat store assembled from the examples above: initial chat state, join reply normalization, incoming new_msg events, and the send action that calls channel.push("new_msg").

type ChatMessage = { id: string; body: string; insertedAt: string }

type ChatValue = {
  messages: ChatMessage[]
}

type ChatJoinOk = {
  messages: ChatMessage[]
}

type SendOk = Record<string, never>

type SendError = {
  reason?: string
}

const chat = session<ChatValue>(socket, {
  topic: "chat:lobby",
  value: {
    messages: [],
  },
  connect: {
    ok(_value, reply: ChatJoinOk) {
      return {
        messages: reply.messages,
      }
    },
  },
  events: {
    new_msg(value, message: ChatMessage) {
      return {
        messages: [...(value?.messages ?? []), message],
      }
    },
  },
}).extend(({ call }) => ({
  send(payload: { body: string }) {
    return call<SendOk, SendError>("new_msg", payload)
  },
}))

%%{init: { "sequence": { "noteAlign": "left" } }}%%
sequenceDiagram
  participant App
  participant Store as store
  participant Socket as Phoenix Socket
  participant Chan as Phoenix Channel

  rect rgba(219, 234, 254, 0.35)
    Note over App,Chan: Create store and join channel
    App->>Store: session(socket, config).extend(...)
    Note over Store: {<br/>value: { messages: [] },<br/>status: "loading",<br/>error: null,<br/>processing: { send: false },<br/>errors: { send: null },<br/>timeouts: { send: false }<br/>}

    App->>Store: subscribe(listener)
    Store->>Socket: socket.channel(topic, {})
    Socket-->>Store: Channel instance
    Store->>Chan: channel.on(...)
    Store->>Chan: channel.join()
    alt JOIN OK
      Chan-->>Store: receive("ok", reply)
      Store-->>App: emit state
      Note over Store: {<br/>value: { messages: [...] },<br/>status: "ready",<br/>error: null,<br/>processing: { send: false },<br/>errors: { send: null },<br/>timeouts: { send: false }<br/>}
    else JOIN ERROR
      Chan-->>Store: receive("error", reply)
      Store-->>App: emit state
      Note over Store: {<br/>value: { messages: [] },<br/>status: "failed",<br/>error: { kind: "connect_error", cause: reply }<br/>}
    else JOIN TIMEOUT
      Chan-->>Store: receive("timeout")
      Store-->>App: emit state
      Note over Store: {<br/>value: { messages: [] },<br/>status: "failed",<br/>error: { kind: "connect_timeout" }<br/>}
    end
  end

  rect rgba(224, 242, 254, 0.35)
    Note over App,Chan: Receive channel event after JOIN
    Chan-->>Store: new_msg { id: "2", body: "Hi", insertedAt: "..." }
    Store-->>App: emit state
    Note over Store: {<br/>value: { messages: [{ id: "1", ... }, { id: "2", body: "Hi", insertedAt: "..." }] },<br/>status: "ready",<br/>error: null,<br/>processing: { send: false },<br/>errors: { send: null },<br/>timeouts: { send: false }<br/>}
  end

  rect rgba(220, 252, 231, 0.35)
    Note over App,Chan: Call message action and update action state
    App->>Store: send(payload)
    Note over Store: {<br/>value: { messages: [...] },<br/>status: "ready",<br/>error: null,<br/>processing: { send: true },<br/>errors: { send: null },<br/>timeouts: { send: false }<br/>}
    Store->>Chan: channel.push("new_msg", payload)
    alt CALL OK
      Chan-->>Store: receive("ok")
      Store-->>App: emit action state
      Note over Store: {<br/>value: { messages: [...] },<br/>status: "ready",<br/>error: null,<br/>processing: { send: false },<br/>errors: { send: null },<br/>timeouts: { send: false }<br/>}
    else CALL ERROR
      Chan-->>Store: receive("error", reply)
      Store-->>App: emit action state
      Note over Store: {<br/>value: { messages: [...] },<br/>status: "ready",<br/>error: null,<br/>processing: { send: false },<br/>errors: { send: { reason: "blocked" } },<br/>timeouts: { send: false }<br/>}
    else CALL TIMEOUT
      Chan-->>Store: receive("timeout")
      Store-->>App: emit action state
      Note over Store: {<br/>value: { messages: [...] },<br/>status: "ready",<br/>error: null,<br/>processing: { send: false },<br/>errors: { send: null },<br/>timeouts: { send: true }<br/>}
    end
  end

  rect rgba(254, 243, 199, 0.35)
    Note over App,Chan: Unsubscribe and leave channel
    App->>Store: unsubscribe()
    Store->>Chan: channel.off(...)
    Store->>Chan: channel.leave()
    Note over Store: last state remains readable, channel reference is cleared
  end

The arrows show when the store calls socket.channel, channel.on, channel.join, channel.push, channel.off, and channel.leave. Incoming channel events configured through events update value and emit state without using the action bucket. Phoenix receive statuses update the action bucket for call: "ok" and "error" come from channel replies, and "timeout" is emitted by the client when no matching reply arrives before the call timeout. Only the latest run for a given action can update that action bucket.

While the store is unmounted, no channel exists behind it, so an extended action that calls or casts throws until a later subscription mounts the store again.
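
Code that may run before any subscriber is attached can guard against that throw. A minimal sketch, assuming the failure surfaces as a synchronous exception; attempt and sendWhileUnmounted are hypothetical, and the stand-in action only imitates an unmounted store.

```typescript
type Attempt<T> = { ok: true; value: T } | { ok: false; error: unknown }

// Runs an action and converts a synchronous throw into a value the caller
// can branch on.
function attempt<T>(action: () => T): Attempt<T> {
  try {
    return { ok: true, value: action() }
  } catch (error) {
    return { ok: false, error }
  }
}

// Stand-in for an extended action called while no subscriber is attached.
function sendWhileUnmounted(): { queued: boolean } {
  throw new Error("store is not mounted")
}

const outcome = attempt(() => sendWhileUnmounted())
const feedback = outcome.ok ? "Message dispatched" : "Chat is not connected yet"
```

With a real session the wrapped call would be something like attempt(() => chat.send({ body: "Hello" })).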

Adapters

The adapter examples use a chat component that renders messages, shows connection state, sends a message, and displays action feedback.

The reactive layer is built on nanostores, but the public subscription surface is intentionally small. A session can be consumed anywhere that can subscribe to an external readable store.
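
Outside a framework, the same subscribe contract is enough to watch a single slice of the state. The helper below is a sketch under that assumption: watchSlice and fakeStore are hypothetical, and the in-memory store only stands in for a session so the example runs on its own.

```typescript
type Readable<TState> = {
  subscribe(listener: (state: TState) => void): () => void
}

// Subscribes to one slice of a store and skips emissions where the slice
// is unchanged (compared with Object.is). Returns the unsubscribe function.
function watchSlice<TState, TSlice>(
  store: Readable<TState>,
  select: (state: TState) => TSlice,
  listener: (slice: TSlice) => void,
): () => void {
  let hasPrevious = false
  let previous: TSlice | undefined
  return store.subscribe((state) => {
    const next = select(state)
    if (hasPrevious && Object.is(previous, next)) return
    hasPrevious = true
    previous = next
    listener(next)
  })
}

// Minimal in-memory store used only to exercise the helper.
function fakeStore<TState>(initial: TState) {
  const listeners = new Set<(state: TState) => void>()
  let current = initial
  return {
    subscribe(listener: (state: TState) => void) {
      listeners.add(listener)
      listener(current)
      return () => {
        listeners.delete(listener)
      }
    },
    set(next: TState) {
      current = next
      listeners.forEach((listener) => listener(next))
    },
  }
}
```

Keep in mind that, per the lifecycle described earlier, subscribing mounts the store and the returned function unmounts it, so a helper like this should hold its subscription for as long as the slice is needed.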

The file names below are illustrative. The important split is to keep the Phoenix socket and session construction in a small client-side module, then import that readable store from framework code.

src/lib/chat.ts

import { Socket } from "phoenix"
import { session } from "@rvct/phoenix"

const socket = new Socket("/socket", {
  params: { token: window.userToken },
})

socket.connect()

type ChatMessage = { id: string; body: string; insertedAt: string }

type ChatValue = {
  messages: ChatMessage[]
}

type ChatJoinOk = {
  messages: ChatMessage[]
}

type SendOk = Record<string, never>

type SendError = {
  reason?: string
}

export const chat = session<ChatValue>(socket, {
  topic: "chat:lobby",
  value: {
    messages: [],
  },
  connect: {
    ok(_value, reply: ChatJoinOk) {
      return {
        messages: reply.messages,
      }
    },
  },
  events: {
    new_msg(value, message: ChatMessage) {
      return {
        messages: [...(value?.messages ?? []), message],
      }
    },
  },
}).extend(({ call }) => ({
  send(payload: { body: string }) {
    return call<SendOk, SendError>("new_msg", payload)
  },
}))

Svelte

Svelte can consume the exported session directly because its store contract is based on subscribe.

src/lib/Chat.svelte

<script lang="ts">
  import { chat } from "./chat"

  let body = $state("")
  const messages = $derived($chat.value?.messages ?? [])
  const sendError = $derived($chat.errors.send)
  const canSend = $derived(
    $chat.status === "ready" && body.trim().length > 0 && !$chat.processing.send,
  )

  function sendMessage() {
    const nextBody = body.trim()

    if (nextBody.length === 0) return

    chat.send({ body: nextBody })
    body = ""
  }
</script>

<section>
  {#if $chat.status === "loading"}
    <p>Joining chat...</p>
  {:else if $chat.status === "failed"}
    <p>Chat unavailable</p>
  {/if}

  <ul>
    {#each messages as message (message.id)}
      <li>
        <p>{message.body}</p>
        <time datetime={message.insertedAt}>{message.insertedAt}</time>
      </li>
    {/each}
  </ul>

  <form
    onsubmit={(event) => {
      event.preventDefault()
      sendMessage()
    }}
  >
    {#if $chat.timeouts.send}
      <p>Message timed out</p>
    {:else if sendError?.reason}
      <p>{sendError.reason}</p>
    {/if}

    <label for="chat-message">Message</label>
    <input id="chat-message" bind:value={body} disabled={$chat.status !== "ready"} />

    <button type="submit" disabled={!canSend}>
      {$chat.processing.send ? "Sending" : "Send"}
    </button>
  </form>
</section>

React

React can bridge the same subscribe method through a small hook:

src/lib/useSession.ts

import { useEffect, useState } from "react"

type Readable<TState> = {
  subscribe(listener: (state: TState) => void): () => void
}

export function useSession<TState>(store: Readable<TState>) {
  const [state, setState] = useState<TState>()

  useEffect(() => store.subscribe(setState), [store])

  return state
}

src/components/Chat.tsx

import { type FormEvent, useState } from "react"
import { chat } from "../lib/chat"
import { useSession } from "../lib/useSession"

export function Chat() {
  const state = useSession(chat)
  const [body, setBody] = useState("")
  const messageBody = body.trim()
  const messages = state?.value?.messages ?? []
  const isReady = state?.status === "ready"
  const canSend = isReady && messageBody.length > 0 && !state?.processing.send
  const sendError = state?.errors.send

  function sendMessage(event: FormEvent<HTMLFormElement>) {
    event.preventDefault()

    if (!canSend) return

    chat.send({ body: messageBody })
    setBody("")
  }

  return (
    <section>
      {state?.status === "loading" && <p>Joining chat...</p>}
      {state?.status === "failed" && <p>Chat unavailable</p>}

      <ul>
        {messages.map((message) => (
          <li key={message.id}>
            <p>{message.body}</p>
            <time dateTime={message.insertedAt}>{message.insertedAt}</time>
          </li>
        ))}
      </ul>

      <form onSubmit={sendMessage}>
        {state?.timeouts.send ? (
          <p>Message timed out</p>
        ) : sendError?.reason ? (
          <p>{sendError.reason}</p>
        ) : null}

        <label htmlFor="chat-message">Message</label>
        <input
          id="chat-message"
          value={body}
          disabled={!isReady}
          onChange={(event) => setBody(event.target.value)}
        />

        <button type="submit" disabled={!canSend}>
          {state?.processing.send ? "Sending" : "Send"}
        </button>
      </form>
    </section>
  )
}

License

MIT