diff --git a/admin/flag.go b/admin/flag.go index 55e6cbac..40e67911 100644 --- a/admin/flag.go +++ b/admin/flag.go @@ -7,15 +7,15 @@ import ( "mu/internal/app" "mu/internal/auth" - "mu/internal/moderation" + "mu/internal/flag" ) // Re-export types from moderation subsystem for backward compatibility // in admin dashboard handlers. -type PostContent = moderation.PostContent -type FlaggedItem = moderation.FlaggedItem -type ContentDeleter = moderation.ContentDeleter -type LLMAnalyzer = moderation.LLMAnalyzer +type PostContent = flag.PostContent +type FlaggedItem = flag.FlaggedItem +type ContentDeleter = flag.ContentDeleter +type LLMAnalyzer = flag.LLMAnalyzer // Import blog to get new account blog posts - will be set by blog package to avoid circular import var GetNewAccountBlog func() []PostContent @@ -26,15 +26,15 @@ var RefreshBlogCache func() // Delegated functions — building blocks should import internal/moderation directly. // These exist only so admin's own handlers can call them. var ( - RegisterDeleter = moderation.RegisterDeleter - SetAnalyzer = moderation.SetAnalyzer - CheckContent = moderation.CheckContent - IsHidden = moderation.IsHidden - AdminFlag = moderation.AdminFlag + RegisterDeleter = flag.RegisterDeleter + SetAnalyzer = flag.SetAnalyzer + CheckContent = flag.CheckContent + IsHidden = flag.IsHidden + AdminFlag = flag.AdminFlag ) func Load() { - moderation.Load() + flag.Load() } // ============================================ @@ -79,7 +79,7 @@ func FlagHandler(w http.ResponseWriter, r *http.Request) { } // Add flag - count, alreadyFlagged, err := moderation.Add(contentType, contentID, flagger) + count, alreadyFlagged, err := flag.Add(contentType, contentID, flagger) if err != nil { http.Error(w, "Failed to flag content", http.StatusInternalServerError) return @@ -93,7 +93,7 @@ func FlagHandler(w http.ResponseWriter, r *http.Request) { // Refresh cache if content was hidden if count >= 3 { - if deleter, ok := moderation.GetDeleter(contentType); ok { + if deleter, ok := flag.GetDeleter(contentType); ok { deleter.RefreshCache() } } @@ -118,7 +118,7 @@ func ModerateHandler(w http.ResponseWriter, r *http.Request) { _ = acc // acc.Admin is always true here - flaggedItems := moderation.GetAll() + flaggedItems := flag.GetAll() var itemsList []string for _, item := range flaggedItems { @@ -128,7 +128,7 @@ func ModerateHandler(w http.ResponseWriter, r *http.Request) { var createdAt string // Get content from the appropriate handler - if deleter, ok := moderation.GetDeleter(item.ContentType); ok { + if deleter, ok := flag.GetDeleter(item.ContentType); ok { content := deleter.Get(item.ContentID) switch item.ContentType { case "post": @@ -319,11 +319,11 @@ func handleModeration(w http.ResponseWriter, r *http.Request) { switch action { case "approve": - moderation.Approve(contentType, contentID) + flag.Approve(contentType, contentID) http.Redirect(w, r, "/admin/moderate", http.StatusSeeOther) case "delete": - moderation.Delete(contentType, contentID) + flag.Delete(contentType, contentID) http.Redirect(w, r, "/admin/moderate", http.StatusSeeOther) case "approve_account": diff --git a/blog/blog.go b/blog/blog.go index 254fd645..1f62eec2 100644 --- a/blog/blog.go +++ b/blog/blog.go @@ -14,7 +14,8 @@ import ( "mu/internal/app" "mu/internal/auth" "mu/internal/data" - "mu/internal/moderation" + "mu/internal/event" + "mu/internal/flag" "mu/wallet" ) @@ -140,12 +141,12 @@ func Load() { } // Subscribe to tag generation responses - tagSub := data.Subscribe(data.EventTagGenerated) + tagSub := event.Subscribe(event.EventTagGenerated) go func() { - for event := range tagSub.Chan { - postID, okID := event.Data["post_id"].(string) - tag, okTag := event.Data["tag"].(string) - eventType, okType := event.Data["type"].(string) + for evt := range tagSub.Chan { + postID, okID := evt.Data["post_id"].(string) + tag, okTag := evt.Data["tag"].(string) + eventType, okType := evt.Data["type"].(string) if okID && okTag && okType && eventType == "post" { app.Log("blog", "Received generated tag for post: %s", postID) @@ -255,10 +256,10 @@ func Load() { }() // Register with moderation subsystem - moderation.RegisterDeleter("post", &postDeleter{}) + flag.RegisterDeleter("post", &postDeleter{}) } -// postDeleter implements moderation.ContentDeleter interface +// postDeleter implements flag.ContentDeleter interface type postDeleter struct{} func (d *postDeleter) Delete(id string) error { @@ -270,7 +271,7 @@ func (d *postDeleter) Get(id string) interface{} { if post == nil { return nil } - return moderation.PostContent{ + return flag.PostContent{ Title: post.Title, Content: post.Content, Author: post.Author, @@ -279,20 +280,20 @@ func (d *postDeleter) Get(id string) interface{} { } // GetNewAccountBlogPosts returns blog posts from new accounts for the moderation page. -func GetNewAccountBlogPosts() []moderation.PostContent { +func GetNewAccountBlogPosts() []flag.PostContent { mutex.RLock() defer mutex.RUnlock() - var result []moderation.PostContent + var result []flag.PostContent for _, post := range posts { // Skip flagged/hidden posts - if moderation.IsHidden("post", post.ID) { + if flag.IsHidden("post", post.ID) { continue } // Only include posts from new accounts if post.AuthorID != "" && auth.IsNewAccount(post.AuthorID) { - result = append(result, moderation.PostContent{ + result = append(result, flag.PostContent{ ID: post.ID, Title: post.Title, Content: post.Content, @@ -351,7 +352,7 @@ func updateCache() { updateCacheUnlocked() // Publish event to refresh home page cache - data.Publish(data.Event{ + event.Publish(event.Event{ Type: "blog_updated", Data: map[string]interface{}{}, }) @@ -365,7 +366,7 @@ func updateCacheUnlocked() { for i := 0; i < len(posts) && count < 1; i++ { post := posts[i] // Skip flagged posts - if moderation.IsHidden("post", post.ID) { + if flag.IsHidden("post", post.ID) { continue } // Skip private posts (home page shows only public posts) @@ -460,7 +461,7 @@ func updateCacheUnlocked() { var fullList []string for _, post := range posts { // Skip flagged posts - if moderation.IsHidden("post", post.ID) { + if flag.IsHidden("post", post.ID) { continue } @@ -582,7 +583,7 @@ func previewUncached() string { for i := 0; i < len(posts) && count < 1; i++ { post := posts[i] // Skip flagged posts - if moderation.IsHidden("post", post.ID) { + if flag.IsHidden("post", post.ID) { continue } // Skip posts from new accounts (< 24 hours old) @@ -725,7 +726,7 @@ func handleGetBlog(w http.ResponseWriter, r *http.Request) { // Filter out flagged posts and private posts (unless admin) var visiblePosts []*Post for _, post := range posts { - if !moderation.IsHidden("post", post.ID) { + if !flag.IsHidden("post", post.ID) { // Skip private posts for non-admins if post.Private && !isAdmin { continue @@ -960,8 +961,8 @@ func autoTagPost(postID, title, content string) { app.Log("blog", "Requesting tag generation for post: %s", postID) // Publish tag generation request - data.Publish(data.Event{ - Type: data.EventGenerateTag, + event.Publish(event.Event{ + Type: event.EventGenerateTag, Data: map[string]interface{}{ "post_id": postID, "title": title, @@ -1232,7 +1233,7 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { wallet.ConsumeQuota(acc.ID, wallet.OpBlogCreate) // Run async LLM-based content moderation - go moderation.CheckContent("post", postID, title, content) + go flag.CheckContent("post", postID, title, content) if app.SendsJSON(r) { app.RespondJSON(w, map[string]interface{}{ @@ -1701,7 +1702,7 @@ func handlePost(w http.ResponseWriter, r *http.Request) { } // Run async LLM-based content moderation (non-blocking) - go moderation.CheckContent("post", postID, title, content) + go flag.CheckContent("post", postID, title, content) // Redirect back to posts page http.Redirect(w, r, "/blog", http.StatusSeeOther) diff --git a/chat/chat.go b/chat/chat.go index 352e60d7..4295992c 100644 --- a/chat/chat.go +++ b/chat/chat.go @@ -16,7 +16,8 @@ import ( "mu/internal/app" "mu/internal/auth" "mu/internal/data" - "mu/internal/moderation" + "mu/internal/event" + "mu/internal/flag" "mu/wallet" ) @@ -471,7 +472,7 @@ func getOrCreateRoom(id string) *Room { // Subscribe to index complete events via channel go func() { - sub := data.Subscribe(data.EventIndexComplete) + sub := event.Subscribe(event.EventIndexComplete) defer sub.Close() // Wait for either index event or timeout @@ -834,8 +835,8 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request, room *Room) { room.mutex.Unlock() app.Log("chat", "Publishing refresh event for: %s", room.URL) - data.Publish(data.Event{ - Type: data.EventRefreshHNComments, + event.Publish(event.Event{ + Type: event.EventRefreshHNComments, Data: map[string]interface{}{ "url": room.URL, }, @@ -1017,7 +1018,7 @@ func Load() { head = app.Head("chat", topics) // Register LLM analyzer for content moderation - moderation.SetAnalyzer(&llmAnalyzer{}) + flag.SetAnalyzer(&llmAnalyzer{}) // Load existing summaries from disk if b, err := data.LoadFile("chat_summaries.json"); err == nil { @@ -1029,12 +1030,12 @@ func Load() { } // Subscribe to summary generation requests - summaryRequestSub := data.Subscribe(data.EventGenerateSummary) + summaryRequestSub := event.Subscribe(event.EventGenerateSummary) go func() { - for event := range summaryRequestSub.Chan { - uri, okUri := event.Data["uri"].(string) - content, okContent := event.Data["content"].(string) - eventType, okType := event.Data["type"].(string) + for evt := range summaryRequestSub.Chan { + uri, okUri := evt.Data["uri"].(string) + content, okContent := evt.Data["content"].(string) + eventType, okType := evt.Data["type"].(string) if okUri && okContent && okType { app.Log("chat", "Received summary generation request for %s (%s)", uri, eventType) @@ -1053,8 +1054,8 @@ func Load() { } // Publish the generated summary - data.Publish(data.Event{ - Type: data.EventSummaryGenerated, + event.Publish(event.Event{ + Type: event.EventSummaryGenerated, Data: map[string]interface{}{ "uri": uri, "summary": summary, @@ -1068,12 +1069,12 @@ func Load() { }() // Subscribe to tag generation requests - tagRequestSub := data.Subscribe(data.EventGenerateTag) + tagRequestSub := event.Subscribe(event.EventGenerateTag) go func() { - for event := range tagRequestSub.Chan { - title, _ := event.Data["title"].(string) - content, okContent := event.Data["content"].(string) - eventType, okType := event.Data["type"].(string) + for evt := range tagRequestSub.Chan { + title, _ := evt.Data["title"].(string) + content, okContent := evt.Data["content"].(string) + eventType, okType := evt.Data["type"].(string) if !okContent || !okType { continue @@ -1081,7 +1082,7 @@ func Load() { // Handle blog post tagging (predefined categories) if eventType == "post" { - postID, ok := event.Data["post_id"].(string) + postID, ok := evt.Data["post_id"].(string) if !ok { continue } @@ -1126,8 +1127,8 @@ func Load() { continue } - data.Publish(data.Event{ - Type: data.EventTagGenerated, + event.Publish(event.Event{ + Type: event.EventTagGenerated, Data: map[string]interface{}{ "post_id": postID, "tag": tag, @@ -1139,11 +1140,11 @@ func Load() { // Handle note tagging (free-form single tag) if eventType == "note" { - noteID, ok := event.Data["note_id"].(string) + noteID, ok := evt.Data["note_id"].(string) if !ok { continue } - userID, ok := event.Data["user_id"].(string) + userID, ok := evt.Data["user_id"].(string) if !ok { continue } @@ -1170,8 +1171,8 @@ func Load() { tag = tag[:20] } - data.Publish(data.Event{ - Type: data.EventTagGenerated, + event.Publish(event.Event{ + Type: event.EventTagGenerated, Data: map[string]interface{}{ "note_id": noteID, "user_id": userID, @@ -1577,7 +1578,7 @@ func handlePostChat(w http.ResponseWriter, r *http.Request) { w.Write([]byte(renderHTML)) } -// llmAnalyzer implements the moderation.LLMAnalyzer interface +// llmAnalyzer implements the flag.LLMAnalyzer interface type llmAnalyzer struct{} func (a *llmAnalyzer) Analyze(promptText, question string) (string, error) { diff --git a/home/home.go b/home/home.go index 45397ac9..e0ed35f6 100644 --- a/home/home.go +++ b/home/home.go @@ -14,7 +14,7 @@ import ( "mu/agent" "mu/internal/app" "mu/blog" - "mu/internal/data" + "mu/internal/event" "mu/news" "mu/markets" "mu/reminder" @@ -748,7 +748,7 @@ func Load() { // Subscribe to blog update events go func() { - sub := data.Subscribe("blog_updated") + sub := event.Subscribe("blog_updated") for range sub.Chan { ForceRefresh() } diff --git a/internal/data/data.go b/internal/data/data.go index 086f4430..13324183 100644 --- a/internal/data/data.go +++ b/internal/data/data.go @@ -9,29 +9,10 @@ import ( "strings" "sync" "time" -) - -// ============================================ -// EVENT SYSTEM -// ============================================ -// Event types -const ( - EventRefreshHNComments = "refresh_hn_comments" - EventIndexComplete = "index_complete" - EventNewArticleMetadata = "new_article_metadata" - EventGenerateSummary = "generate_summary" - EventSummaryGenerated = "summary_generated" - EventGenerateTag = "generate_tag" - EventTagGenerated = "tag_generated" + "mu/internal/event" ) -// Event represents a data event -type Event struct { - Type string - Data map[string]interface{} -} - // SearchOptions configures search behavior type SearchOptions struct { Type string @@ -55,75 +36,6 @@ func WithKeywordOnly() SearchOption { } } -// EventSubscription represents an active subscription -type EventSubscription struct { - Chan chan Event - eventType string - id string -} - -var ( - eventMutex sync.RWMutex - eventSubscribers = make(map[string]map[string]chan Event) // eventType -> subscriberID -> channel - subscriberIDSeq int -) - -// Subscribe creates a channel-based subscription for a specific event type -func Subscribe(eventType string) *EventSubscription { - eventMutex.Lock() - defer eventMutex.Unlock() - - // Generate unique subscriber ID - subscriberIDSeq++ - id := fmt.Sprintf("sub_%d", subscriberIDSeq) - - // Create buffered channel to prevent blocking - ch := make(chan Event, 10) - - // Initialize map if needed - if eventSubscribers[eventType] == nil { - eventSubscribers[eventType] = make(map[string]chan Event) - } - - eventSubscribers[eventType][id] = ch - - return &EventSubscription{ - Chan: ch, - eventType: eventType, - id: id, - } -} - -// Close closes the channel and removes the subscription -func (s *EventSubscription) Close() { - eventMutex.Lock() - defer eventMutex.Unlock() - - if subscribers, ok := eventSubscribers[s.eventType]; ok { - if ch, ok := subscribers[s.id]; ok { - close(ch) - delete(subscribers, s.id) - } - } -} - -// Publish sends an event to all subscribers -func Publish(event Event) { - eventMutex.RLock() - subscribers := eventSubscribers[event.Type] - eventMutex.RUnlock() - - // Send to channel subscribers (non-blocking) - for _, ch := range subscribers { - select { - case ch <- event: - // Sent successfully - default: - // Channel full, skip (subscriber should have buffer or be reading) - } - } -} - // SaveFile saves data to disk func SaveFile(key, val string) error { dir := os.ExpandEnv("$HOME/.mu") @@ -304,8 +216,8 @@ func processIndexWork(work IndexWork) { indexMutex.Unlock() // Publish event that indexing is complete - Publish(Event{ - Type: EventIndexComplete, + event.Publish(event.Event{ + Type: event.EventIndexComplete, Data: map[string]interface{}{ "id": work.ID, "type": work.Type, diff --git a/internal/data/sqlite.go b/internal/data/sqlite.go index 74515e66..3f04b305 100644 --- a/internal/data/sqlite.go +++ b/internal/data/sqlite.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "mu/internal/event" + _ "modernc.org/sqlite" ) @@ -115,8 +117,8 @@ func IndexSQLite(id, entryType, title, content string, metadata map[string]inter // Insert into FTS index db.Exec(`INSERT INTO index_fts(rowid, title, content) SELECT rowid, title, content FROM index_entries WHERE id = ?`, id) // Publish event - Publish(Event{ - Type: EventIndexComplete, + event.Publish(event.Event{ + Type: event.EventIndexComplete, Data: map[string]interface{}{ "id": id, "type": entryType, diff --git a/internal/event/event.go b/internal/event/event.go new file mode 100644 index 00000000..c5694fce --- /dev/null +++ b/internal/event/event.go @@ -0,0 +1,94 @@ +// Package event provides a pub/sub event system for decoupling +// background operations across packages. +package event + +import ( + "fmt" + "sync" +) + +// Event types +const ( + EventRefreshHNComments = "refresh_hn_comments" + EventIndexComplete = "index_complete" + EventNewArticleMetadata = "new_article_metadata" + EventGenerateSummary = "generate_summary" + EventSummaryGenerated = "summary_generated" + EventGenerateTag = "generate_tag" + EventTagGenerated = "tag_generated" +) + +// Event represents a data event +type Event struct { + Type string + Data map[string]interface{} +} + +// Subscription represents an active subscription +type Subscription struct { + Chan chan Event + eventType string + id string +} + +var ( + mu sync.RWMutex + subscribers = make(map[string]map[string]chan Event) // eventType -> subscriberID -> channel + subscriberIDSeq int +) + +// Subscribe creates a channel-based subscription for a specific event type +func Subscribe(eventType string) *Subscription { + mu.Lock() + defer mu.Unlock() + + // Generate unique subscriber ID + subscriberIDSeq++ + id := fmt.Sprintf("sub_%d", subscriberIDSeq) + + // Create buffered channel to prevent blocking + ch := make(chan Event, 10) + + // Initialize map if needed + if subscribers[eventType] == nil { + subscribers[eventType] = make(map[string]chan Event) + } + + subscribers[eventType][id] = ch + + return &Subscription{ + Chan: ch, + eventType: eventType, + id: id, + } +} + +// Close closes the channel and removes the subscription +func (s *Subscription) Close() { + mu.Lock() + defer mu.Unlock() + + if subs, ok := subscribers[s.eventType]; ok { + if ch, ok := subs[s.id]; ok { + close(ch) + delete(subs, s.id) + } + } +} + +// Publish sends an event to all subscribers +func Publish(e Event) { + mu.RLock() + subs := subscribers[e.Type] + mu.RUnlock() + + // Send to channel subscribers (non-blocking) + for _, ch := range subs { + select { + case ch <- e: + // Sent successfully + default: + // Channel full, skip (subscriber should have buffer or be reading) + } + } +} diff --git a/internal/moderation/moderation.go b/internal/flag/flag.go similarity index 98% rename from internal/moderation/moderation.go rename to internal/flag/flag.go index 4d793496..0a08479b 100644 --- a/internal/moderation/moderation.go +++ b/internal/flag/flag.go @@ -1,7 +1,7 @@ -// Package moderation provides content moderation primitives (flagging, hiding, +// Package flag provides content moderation primitives (flagging, hiding, // auto-moderation). It lives in internal/ because it is infrastructure used by // multiple building blocks, not a feature itself. -package moderation +package flag import ( "encoding/json" diff --git a/news/news.go b/news/news.go index 5b44d73a..56cf1c3f 100644 --- a/news/news.go +++ b/news/news.go @@ -25,6 +25,7 @@ import ( "mu/internal/app" "mu/internal/auth" "mu/internal/data" + "mu/internal/event" "mu/wallet" ) @@ -840,8 +841,8 @@ func requestArticleSummary(uri string, md *Metadata) { app.Log("news", "Requesting summary generation for %s (attempt %d)", uri, md.SummaryAttempts) // Publish summary generation request - data.Publish(data.Event{ - Type: data.EventGenerateSummary, + event.Publish(event.Event{ + Type: event.EventGenerateSummary, Data: map[string]interface{}{ "uri": uri, "content": contentToSummarize, @@ -1323,10 +1324,10 @@ func Load() { // Loaded // Subscribe to refresh events - sub := data.Subscribe(data.EventRefreshHNComments) + sub := event.Subscribe(event.EventRefreshHNComments) go func() { - for event := range sub.Chan { - if url, ok := event.Data["url"].(string); ok { + for evt := range sub.Chan { + if url, ok := evt.Data["url"].(string); ok { app.Log("news", "Received refresh request for: %s", url) RefreshHNMetadata(url) } @@ -1334,12 +1335,12 @@ func Load() { }() // Subscribe to summary generation responses - summarySub := data.Subscribe(data.EventSummaryGenerated) + summarySub := event.Subscribe(event.EventSummaryGenerated) go func() { - for event := range summarySub.Chan { - uri, okUri := event.Data["uri"].(string) - summary, okSummary := event.Data["summary"].(string) - eventType, okType := event.Data["type"].(string) + for evt := range summarySub.Chan { + uri, okUri := evt.Data["uri"].(string) + summary, okSummary := evt.Data["summary"].(string) + eventType, okType := evt.Data["type"].(string) if okUri && okSummary && okType && eventType == "news" { app.Log("news", "Received generated summary for: %s", uri) diff --git a/social/social.go b/social/social.go index f7272287..5109c85e 100644 --- a/social/social.go +++ b/social/social.go @@ -14,7 +14,7 @@ import ( "mu/internal/app" "mu/internal/auth" "mu/internal/data" - "mu/internal/moderation" + "mu/internal/flag" "mu/wallet" ) @@ -107,7 +107,7 @@ func Load() { }() // Register admin deleter - moderation.RegisterDeleter("thread", &threadDeleter{}) + flag.RegisterDeleter("thread", &threadDeleter{}) } func sortThreads() { @@ -146,7 +146,7 @@ func updateCache() { var ids []string for _, t := range threads { - if moderation.IsHidden("thread", t.ID) { + if flag.IsHidden("thread", t.ID) { continue } ids = append(ids, t.ID) @@ -261,7 +261,7 @@ func handleList(w http.ResponseWriter, r *http.Request) { mutex.RLock() var visible []*Thread for _, t := range threads { - if moderation.IsHidden("thread", t.ID) { + if flag.IsHidden("thread", t.ID) { continue } if topic != "" && topic != "all" && t.Topic != topic { @@ -551,7 +551,7 @@ func DismissHandler(w http.ResponseWriter, r *http.Request) { // Add to blocklist and flag DismissThread(threadID) - moderation.AdminFlag("thread", threadID, "system") + flag.AdminFlag("thread", threadID, "system") app.Log("social", "Admin dismissed thread %s", threadID) http.Redirect(w, r, "/social", http.StatusSeeOther) @@ -567,7 +567,7 @@ func handleThread(w http.ResponseWriter, r *http.Request, id string) { return } - if moderation.IsHidden("thread", t.ID) { + if flag.IsHidden("thread", t.ID) { http.NotFound(w, r) return } @@ -778,7 +778,7 @@ func handleCreate(w http.ResponseWriter, r *http.Request) { updateCache() // Content moderation - go moderation.CheckContent("thread", thread.ID, thread.Title, thread.Content) + go flag.CheckContent("thread", thread.ID, thread.Title, thread.Content) // Fact-check in background go factCheckThread(thread.ID) @@ -886,7 +886,7 @@ func handleReply(w http.ResponseWriter, r *http.Request, threadID string) { updateCache() // Content moderation - go moderation.CheckContent("thread", reply.ID, "", reply.Content) + go flag.CheckContent("thread", reply.ID, "", reply.Content) // Fact-check in background go factCheckReply(threadID, reply.ID) @@ -960,7 +960,7 @@ func handleDelete(w http.ResponseWriter, r *http.Request, threadID string) { } } -// threadDeleter implements moderation.ContentDeleter for threads +// threadDeleter implements flag.ContentDeleter for threads type threadDeleter struct{} func (d *threadDeleter) Delete(id string) error {