Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions apps/api/cmd/clickclack/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os/signal"
"path/filepath"
"strings"
"time"

"github.com/openclaw/clickclack/apps/api/internal/config"
"github.com/openclaw/clickclack/apps/api/internal/httpapi"
Expand Down Expand Up @@ -270,6 +271,64 @@ func admin(args []string) error {
return nil
}
return json.NewEncoder(os.Stdout).Encode(map[string]any{"bot": bot, "bot_token": token, "token": token.Token})
case "events":
if len(args) < 2 || args[1] != "prune" {
return fmt.Errorf("usage: clickclack admin events prune --workspace WORKSPACE_ID [--older-than-days DAYS | --before RFC3339] [--keep-latest N]")
}
flags := flag.NewFlagSet("admin events prune", flag.ExitOnError)
data := flags.String("data", "./data", "data directory")
dbURL := flags.String("db", "", "database URL")
workspaceID := flags.String("workspace", "", "workspace id")
olderThanDays := flags.Int("older-than-days", 0, "delete events older than this many days")
before := flags.String("before", "", "delete events created before this RFC3339 timestamp")
keepLatest := flags.Int("keep-latest", 0, "always keep the latest N events in the workspace")
if err := flags.Parse(args[2:]); err != nil {
return err
}
if *workspaceID == "" {
return fmt.Errorf("--workspace is required")
}
if *olderThanDays < 0 {
return fmt.Errorf("--older-than-days must be non-negative")
}
if *keepLatest < 0 {
return fmt.Errorf("--keep-latest must be non-negative")
}
if *olderThanDays > 0 && strings.TrimSpace(*before) != "" {
return fmt.Errorf("--older-than-days and --before are mutually exclusive")
}
cutoff := strings.TrimSpace(*before)
if cutoff != "" {
parsed, err := time.Parse(time.RFC3339Nano, cutoff)
if err != nil {
return fmt.Errorf("--before must be RFC3339: %w", err)
}
cutoff = parsed.UTC().Format(time.RFC3339Nano)
}
if *olderThanDays > 0 {
cutoff = time.Now().UTC().Add(-time.Duration(*olderThanDays) * 24 * time.Hour).Format(time.RFC3339Nano)
}
if cutoff == "" && *keepLatest == 0 {
return fmt.Errorf("provide --older-than-days, --before, or --keep-latest")
}
if err := ensureDirs(*data); err != nil {
return err
}
st, err := sqlitestore.Open(resolveDB(*data, *dbURL))
if err != nil {
return err
}
defer st.Close()
ctx := context.Background()
if err := st.Migrate(ctx); err != nil {
return err
}
pruned, err := st.PruneEvents(ctx, *workspaceID, *keepLatest, cutoff)
if err != nil {
return err
}
fmt.Printf("pruned %d events\n", pruned)
return nil
case "magic-link":
if len(args) < 2 || args[1] != "create" {
return fmt.Errorf("usage: clickclack admin magic-link create --email EMAIL [--name NAME]")
Expand Down
2 changes: 1 addition & 1 deletion apps/api/internal/httpapi/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *Server) search(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusForbidden, err)
return
}
results, err := s.store.SearchMessages(r.Context(), workspaceID, act.user.ID, r.URL.Query().Get("q"), queryInt(r, "limit", 50))
results, err := s.store.SearchMessages(r.Context(), workspaceID, r.URL.Query().Get("channel_id"), act.user.ID, r.URL.Query().Get("q"), queryInt(r, "limit", 50))
writeResult(w, map[string]any{"results": results}, err)
}

Expand Down
6 changes: 3 additions & 3 deletions apps/api/internal/store/sqlite/chat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestStoreChatThreadsSearchUploadsAndEvents(t *testing.T) {
t.Fatalf("unexpected thread: %#v %#v %#v", threadRoot, replies, threadState)
}

results, err := st.SearchMessages(ctx, workspace.ID, owner.ID, "searchable", 10)
results, err := st.SearchMessages(ctx, workspace.ID, "", owner.ID, "searchable", 10)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -656,7 +656,7 @@ func TestStoreBranchCases(t *testing.T) {
if _, err := st.ListMessages(ctx, "chn_missing", owner.ID, store.MessagePageRequest{Limit: 10}); err == nil {
t.Fatal("expected missing channel list error")
}
if results, err := st.SearchMessages(ctx, workspace.ID, owner.ID, "missingterm", 999); err != nil || len(results) != 0 {
if results, err := st.SearchMessages(ctx, workspace.ID, "", owner.ID, "missingterm", 999); err != nil || len(results) != 0 {
t.Fatalf("expected no search results, got %#v err=%v", results, err)
}
if _, err := st.ListEventsAfter(ctx, workspace.ID, owner.ID, "", 999); err != nil {
Expand All @@ -669,7 +669,7 @@ func TestStoreBranchCases(t *testing.T) {
if _, err := st.CreateInvite(ctx, workspace.ID, outsider.ID); err == nil {
t.Fatal("expected invite membership error")
}
if _, err := st.SearchMessages(ctx, workspace.ID, outsider.ID, "root", 10); err == nil {
if _, err := st.SearchMessages(ctx, workspace.ID, "", outsider.ID, "root", 10); err == nil {
t.Fatal("expected search membership error")
}

Expand Down
95 changes: 86 additions & 9 deletions apps/api/internal/store/sqlite/dms.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"github.com/openclaw/clickclack/apps/api/internal/store"
)

const directConversationMemberHydrationBatchSize = 500

func (s *Store) ListDirectConversations(ctx context.Context, workspaceID, userID string) ([]store.DirectConversation, error) {
if err := s.requireMembership(ctx, workspaceID, userID); err != nil {
return nil, err
}
rows, err := s.db.QueryContext(ctx, `
SELECT dc.id, dc.workspace_id, dc.created_at,
COALESCE((SELECT MAX(channel_seq) FROM messages WHERE direct_conversation_id = dc.id), 0) AS last_seq,
COALESCE((SELECT MAX(channel_seq) FROM messages WHERE direct_conversation_id = dc.id AND parent_message_id IS NULL), 0) AS last_seq,
COALESCE((SELECT last_read_seq FROM direct_reads WHERE conversation_id = dc.id AND user_id = ?), 0) AS last_read_seq
FROM direct_conversations dc
JOIN direct_conversation_members dcm ON dcm.conversation_id = dc.id
Expand All @@ -43,12 +45,16 @@ func (s *Store) ListDirectConversations(ctx context.Context, workspaceID, userID
if err := rows.Close(); err != nil {
return nil, err
}
ids := make([]string, 0, len(out))
for _, dm := range out {
ids = append(ids, dm.ID)
}
membersByConversation, err := s.directConversationMembersByConversationIDs(ctx, ids)
if err != nil {
return nil, err
}
for i := range out {
members, err := s.directConversationMembers(ctx, out[i].ID)
if err != nil {
return nil, err
}
out[i].Members = members
out[i].Members = membersByConversation[out[i].ID]
}
return out, nil
}
Expand All @@ -57,7 +63,7 @@ func (s *Store) GetDirectConversation(ctx context.Context, conversationID, userI
var dm store.DirectConversation
if err := s.db.QueryRowContext(ctx, `
SELECT dc.id, dc.workspace_id, dc.created_at,
COALESCE((SELECT MAX(channel_seq) FROM messages WHERE direct_conversation_id = dc.id), 0) AS last_seq,
COALESCE((SELECT MAX(channel_seq) FROM messages WHERE direct_conversation_id = dc.id AND parent_message_id IS NULL), 0) AS last_seq,
COALESCE((SELECT last_read_seq FROM direct_reads WHERE conversation_id = dc.id AND user_id = ?), 0) AS last_read_seq
FROM direct_conversations dc
JOIN direct_conversation_members dcm ON dcm.conversation_id = dc.id
Expand Down Expand Up @@ -120,7 +126,7 @@ func (s *Store) ListDirectMessages(ctx context.Context, conversationID, userID s
return store.MessagePage{}, err
}
return s.listMessagePage(ctx, messagePageScope{
where: "m.direct_conversation_id = ?",
where: "m.direct_conversation_id = ? AND m.parent_message_id IS NULL",
args: []any{conversationID},
}, page)
}
Expand Down Expand Up @@ -184,7 +190,11 @@ func (s *Store) CreateDirectMessage(ctx context.Context, input store.CreateDirec
if _, err := tx.ExecContext(ctx, `INSERT INTO thread_state (root_message_id) VALUES (?)`, id); err != nil {
return store.Message{}, store.Event{}, err
}
event, err := insertEvent(ctx, tx, workspaceID, "", "message.created", &seq, eventPayload(map[string]string{"message_id": id, "direct_conversation_id": input.ConversationID, "author_id": input.AuthorID}, nonce))
recipients, err := directConversationMemberIDsTx(ctx, tx, input.ConversationID)
if err != nil {
return store.Message{}, store.Event{}, err
}
event, err := insertEventWithRecipients(ctx, tx, workspaceID, "", "message.created", &seq, eventPayload(map[string]string{"message_id": id, "direct_conversation_id": input.ConversationID, "author_id": input.AuthorID}, nonce), recipients)
if err != nil {
return store.Message{}, store.Event{}, err
}
Expand All @@ -205,6 +215,27 @@ func requireDirectMembershipTx(ctx context.Context, tx *sql.Tx, conversationID,
return tx.QueryRowContext(ctx, `SELECT 1 FROM direct_conversation_members WHERE conversation_id = ? AND user_id = ?`, conversationID, userID).Scan(&one)
}

func directConversationMemberIDsTx(ctx context.Context, tx *sql.Tx, conversationID string) ([]string, error) {
rows, err := tx.QueryContext(ctx, `
SELECT user_id
FROM direct_conversation_members
WHERE conversation_id = ?
ORDER BY user_id`, conversationID)
if err != nil {
return nil, err
}
defer rows.Close()
ids := []string{}
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
ids = append(ids, id)
}
return ids, rows.Err()
}

func (s *Store) directConversationMembers(ctx context.Context, conversationID string) ([]store.User, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT u.id, u.kind, u.owner_user_id, u.display_name, u.handle, u.avatar_url, u.created_at
Expand All @@ -227,6 +258,52 @@ func (s *Store) directConversationMembers(ctx context.Context, conversationID st
return members, rows.Err()
}

func (s *Store) directConversationMembersByConversationIDs(ctx context.Context, conversationIDs []string) (map[string][]store.User, error) {
out := map[string][]store.User{}
if len(conversationIDs) == 0 {
return out, nil
}
for start := 0; start < len(conversationIDs); start += directConversationMemberHydrationBatchSize {
end := min(start+directConversationMemberHydrationBatchSize, len(conversationIDs))
batch := conversationIDs[start:end]
placeholders := strings.TrimRight(strings.Repeat("?,", len(batch)), ",")
args := make([]any, 0, len(batch))
for _, id := range batch {
args = append(args, id)
}
rows, err := s.db.QueryContext(ctx, `
SELECT dcm.conversation_id, u.id, u.kind, u.owner_user_id, u.display_name, u.handle, u.avatar_url, u.created_at
FROM direct_conversation_members dcm
JOIN users u ON u.id = dcm.user_id
WHERE dcm.conversation_id IN (`+placeholders+`)
ORDER BY dcm.conversation_id, u.display_name`, args...)
if err != nil {
return nil, err
}
for rows.Next() {
var conversationID string
var owner sql.NullString
var member store.User
if err := rows.Scan(&conversationID, &member.ID, &member.Kind, &owner, &member.DisplayName, &member.Handle, &member.AvatarURL, &member.CreatedAt); err != nil {
_ = rows.Close()
return nil, err
}
if owner.Valid {
member.OwnerUserID = owner.String
}
out[conversationID] = append(out[conversationID], member)
}
if err := rows.Err(); err != nil {
_ = rows.Close()
return nil, err
}
if err := rows.Close(); err != nil {
return nil, err
}
}
return out, nil
}

func compactStrings(values []string) []string {
var out []string
for _, value := range values {
Expand Down
47 changes: 47 additions & 0 deletions apps/api/internal/store/sqlite/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package sqlite

import (
"context"
"errors"
"strings"
"time"
)

// PruneEvents deletes old durable events for a workspace while preserving the
// newest keepLatest events. At least one retention bound is required so callers
// cannot accidentally wipe the whole event log with default zero values.
func (s *Store) PruneEvents(ctx context.Context, workspaceID string, keepLatest int, before string) (int64, error) {
workspaceID = strings.TrimSpace(workspaceID)
before = strings.TrimSpace(before)
if workspaceID == "" {
return 0, errors.New("workspace is required")
}
if keepLatest < 0 {
return 0, errors.New("keep_latest must be non-negative")
}
if keepLatest == 0 && before == "" {
return 0, errors.New("keep_latest or before is required")
}
if before != "" {
parsed, err := time.Parse(time.RFC3339Nano, before)
if err != nil {
return 0, errors.New("before must be RFC3339")
}
before = parsed.UTC().Format(time.RFC3339Nano)
}
result, err := s.db.ExecContext(ctx, `
DELETE FROM events
WHERE workspace_id = ?
AND (? = '' OR julianday(created_at) < julianday(?))
AND id NOT IN (
SELECT id
FROM events
WHERE workspace_id = ?
ORDER BY cursor DESC
LIMIT ?
)`, workspaceID, before, before, workspaceID, keepLatest)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
2 changes: 1 addition & 1 deletion apps/api/internal/store/sqlite/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (s *Store) ExportJSON(ctx context.Context, writer io.Writer) error {
out := map[string]any{}
tables := []string{
"users", "user_notification_settings", "identities", "workspaces", "workspace_members", "channels",
"messages", "thread_state", "reactions", "events", "uploads",
"messages", "thread_state", "reactions", "events", "event_recipients", "uploads",
"message_attachments", "direct_conversations", "direct_conversation_members",
"invites", "auth_magic_links", "sessions", "bot_tokens",
}
Expand Down
17 changes: 16 additions & 1 deletion apps/api/internal/store/sqlite/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,15 @@ func updateThreadState(ctx context.Context, tx *sql.Tx, rootID, authorID, create
}

func insertEvent(ctx context.Context, tx *sql.Tx, workspaceID, channelID, eventType string, seq *int64, payload any) (store.Event, error) {
return insertEventWithRecipients(ctx, tx, workspaceID, channelID, eventType, seq, payload, nil)
}

func insertEventWithRecipients(ctx context.Context, tx *sql.Tx, workspaceID, channelID, eventType string, seq *int64, payload any, recipientUserIDs []string) (store.Event, error) {
payloadJSON, err := json.Marshal(payload)
if err != nil {
return store.Event{}, err
}
recipients := compactStrings(recipientUserIDs)
event := store.Event{
ID: newID("evt"),
Cursor: newID("cur"),
Expand All @@ -248,9 +253,19 @@ func insertEvent(ctx context.Context, tx *sql.Tx, workspaceID, channelID, eventT
PayloadJSON: string(payloadJSON),
Payload: payload,
}
if _, err := tx.ExecContext(ctx, `INSERT INTO events (id, cursor, workspace_id, channel_id, type, seq, payload_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, event.ID, event.Cursor, event.WorkspaceID, nullableString(event.ChannelID), event.Type, event.Seq, event.PayloadJSON, event.CreatedAt); err != nil {
isPrivate := 0
if len(recipients) > 0 {
isPrivate = 1
}
if _, err := tx.ExecContext(ctx, `INSERT INTO events (id, cursor, workspace_id, channel_id, type, seq, payload_json, created_at, is_private) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, event.ID, event.Cursor, event.WorkspaceID, nullableString(event.ChannelID), event.Type, event.Seq, event.PayloadJSON, event.CreatedAt, isPrivate); err != nil {
return store.Event{}, err
}
for _, userID := range recipients {
if _, err := tx.ExecContext(ctx, `INSERT INTO event_recipients (event_id, user_id) VALUES (?, ?)`, event.ID, userID); err != nil {
return store.Event{}, err
}
event.RecipientUserIDs = append(event.RecipientUserIDs, userID)
}
return event, nil
}

Expand Down
Loading
Loading