Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 83 additions & 12 deletions bot/vikingbot/agent/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from vikingbot.bus.events import InboundMessage, OutboundEventType, OutboundMessage
from vikingbot.bus.queue import MessageBus
from vikingbot.config import load_config
from vikingbot.config.schema import Config, SessionKey
from vikingbot.config.schema import BotMode, Config, SessionKey
from vikingbot.hooks import HookContext
from vikingbot.hooks.manager import hook_manager
from vikingbot.providers.base import LLMProvider
Expand Down Expand Up @@ -200,6 +200,7 @@ async def run(self) -> None:
OutboundMessage(
session_key=msg.session_key,
content=f"Sorry, I encountered an error: {str(e)}",
metadata=msg.metadata,
)
)
except asyncio.TimeoutError:
Expand Down Expand Up @@ -425,7 +426,6 @@ async def check_long_running():
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
logger.info(f"Processing message from {msg.session_key}:{msg.sender_id}: {preview}")

# Get or create session
session_key = msg.session_key
# For CLI/direct sessions, skip heartbeat by default
skip_heartbeat = session_key.type == "cli"
Expand All @@ -439,6 +439,11 @@ async def check_long_running():
cmd = msg.content.strip().lower()
if cmd == "/new":
# Clone session for async consolidation, then immediately clear original
if not self._check_cmd_auth(msg):
return OutboundMessage(
session_key=msg.session_key, content="🐈 Sorry, you are not authorized to use this command.",
metadata=msg.metadata
)
session_clone = session.clone()
session.clear()
await self.sessions.save(session)
Expand All @@ -447,13 +452,31 @@ async def check_long_running():
return OutboundMessage(
session_key=msg.session_key, content="🐈 New session started. Memory consolidated.", metadata=msg.metadata
)
if cmd == "/remember":
if not self._check_cmd_auth(msg):
return OutboundMessage(
session_key=msg.session_key, content="🐈 Sorry, you are not authorized to use this command.",
metadata=msg.metadata
)
session_clone = session.clone()
await self._consolidate_viking_memory(session_clone)
return OutboundMessage(
session_key=msg.session_key, content="This conversation has been submitted to memory storage.", metadata=msg.metadata
)
if cmd == "/help":
return OutboundMessage(
session_key=msg.session_key,
content="🐈 vikingbot commands:\n/new — Start a new conversation\n/help — Show available commands",
content="🐈 vikingbot commands:\n/new — Start a new conversation\n/remember — Submit current session to memories and start new session\n/help — Show available commands",
metadata=msg.metadata
)

# Debug mode handling
if self.config.mode == BotMode.DEBUG:
# In debug mode, only record message to session, no processing or reply
session.add_message("user", msg.content, sender_id=msg.sender_id)
await self.sessions.save(session)
return None

# Consolidate memory before processing if session is too large
if len(session.messages) > self.memory_window:
# Clone session for async consolidation, then immediately trim original
Expand Down Expand Up @@ -567,15 +590,18 @@ async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
return

# use openviking tools to extract memory
await hook_manager.execute_hooks(
context=HookContext(
event_type="message.compact",
session_id=session.key.safe_name(),
workspace_id=self.sandbox_manager.to_workspace_id(session.key),
session_key=session.key,
),
session=session,
)
config = self.config
if config.mode == BotMode.READONLY:
if not config.channels_config or not config.channels_config.get_all_channels():
return
allow_from = [config.ov_server.admin_user_id]
for channel_config in config.channels_config.get_all_channels():
if channel_config and channel_config.type.value == session.key.type:
if hasattr(channel_config, "allow_from"):
allow_from.extend(channel_config.allow_from)
messages = [msg for msg in session.messages if msg.get("sender_id") in allow_from]
session.messages = messages
await self._consolidate_viking_memory(session)

if self.sandbox_manager:
memory_workspace = self.sandbox_manager.get_workspace_path(session.key)
Expand Down Expand Up @@ -656,13 +682,58 @@ async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
except Exception as e:
logger.exception(f"Memory consolidation failed: {e}")

async def _consolidate_viking_memory(self, session) -> None:
"""Consolidate old messages into MEMORY.md + HISTORY.md. Works on a cloned session."""
try:
if not session.messages:
logger.info(f"No messages to commit openviking for session {session.key.safe_name()} (allow_from filter applied)")
return

# use openviking tools to extract memory
await hook_manager.execute_hooks(
context=HookContext(
event_type="message.compact",
session_id=session.key.safe_name(),
workspace_id=self.sandbox_manager.to_workspace_id(session.key),
session_key=session.key,
),
session=session,
)
except Exception as e:
logger.exception(f"Memory consolidation failed: {e}")

async def _safe_consolidate_memory(self, session, archive_all: bool = False) -> None:
"""Safe wrapper for _consolidate_memory that ensures all exceptions are caught."""
try:
await self._consolidate_memory(session, archive_all)
except Exception as e:
logger.exception(f"Background memory consolidation task failed: {e}")

def _check_cmd_auth(self, msg: InboundMessage) -> bool:
"""Check if the session key is authorized for command execution.

Returns:
True if authorized, False otherwise.
Args:
session_key: Session key to check.
"""
if self.config.mode == BotMode.NORMAL:
return True
allow_from = []
if self.config.ov_server and self.config.ov_server.admin_user_id:
allow_from.append(self.config.ov_server.admin_user_id)
for channel in self.config.channels_config.get_all_channels():
if channel.channel_key() == msg.session_key.channel_key():
if channel.allow_from:
allow_from.extend(channel.allow_from)
break

# If channel not found or sender not in allow_from list, ignore message
if msg.sender_id not in allow_from:
logger.debug(f"Sender {msg.sender_id} not allowed in channel {msg.session_key.channel_key()}")
return False
return True

async def process_direct(
self,
content: str,
Expand Down
6 changes: 1 addition & 5 deletions bot/vikingbot/agent/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,17 @@ async def get_viking_memory_context(self, current_message: str, workspace_id: st
try:
client = await VikingClient.create(agent_id=workspace_id)
admin_user_id = load_config().ov_server.admin_user_id
start = time.time()
result = await client.search_memory(current_message, user_id=admin_user_id, limit=3)
cost = round(time.time() - start, 2)
if not result:
logger.info(f"[USER_MEMORY]: search failed. cost {cost}")
return ""
user_memory = self._parse_viking_memory(result["user_memory"])
agent_memory = self._parse_viking_memory(result["agent_memory"])
logger.info(f"[USER_MEMORY]: search success. res: {user_memory[:100]}. cost {cost}")
return (
f"### user memories:\n{user_memory}\n"
f"### agent memories:\n{agent_memory}"
)
except Exception as e:
logger.error(f"[USER_MEMORY]: search failed. {e}")
logger.error(f"[READ_USER_MEMORY]: search error. {e}")
return ""

async def get_viking_user_profile(self, workspace_id: str, user_id: str) -> str:
Expand Down
26 changes: 11 additions & 15 deletions bot/vikingbot/channels/feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import httpx
from loguru import logger

from vikingbot.config import load_config
from vikingbot.utils import get_data_path

# Optional HTML processing libraries
Expand All @@ -30,7 +31,7 @@
from vikingbot.bus.events import OutboundMessage
from vikingbot.bus.queue import MessageBus
from vikingbot.channels.base import BaseChannel
from vikingbot.config.schema import FeishuChannelConfig
from vikingbot.config.schema import FeishuChannelConfig, BotMode

try:
import lark_oapi as lark
Expand Down Expand Up @@ -747,25 +748,19 @@ async def _on_message(self, data: "P2ImMessageReceiveV1") -> None:
# 检查是否@了机器人
is_mentioned = False
mention_pattern = re.compile(r"@_user_\d+")
bot_open_id = self.config.open_id
bot_app_id = self.config.app_id
bot_name = self.config.bot_name

# 优先从message的mentions字段提取@信息(text和post类型都适用)
if hasattr(message, 'mentions') and message.mentions and bot_open_id:
if hasattr(message, 'mentions') and message.mentions and bot_name:
for mention in message.mentions:
if hasattr(mention, 'id') and hasattr(mention.id, 'open_id'):
at_id = mention.id.open_id
if at_id == bot_open_id:
if hasattr(mention, 'name'):
at_name = mention.name
if at_name == self.config.bot_name:
is_mentioned = True
break
continue
# 兼容其他可能的ID格式
at_id = getattr(mention, 'id', '') or getattr(mention, 'user_id', '')
if at_id == f"app_{bot_app_id}" or at_id == bot_app_id:
is_mentioned = True
break

# 话题群@检查逻辑
config = load_config()
should_process = True
if chat_type == "group":
chat_mode = await self._get_chat_mode(chat_id)
Expand All @@ -780,7 +775,7 @@ async def _on_message(self, data: "P2ImMessageReceiveV1") -> None:
should_process = False
else:
# 模式2:False,仅话题首条消息不需要@,后续回复需要@
if not is_topic_starter and not is_mentioned:
if not is_topic_starter and not is_mentioned and config.mode != BotMode.DEBUG:
logger.info(f"Skipping thread message: not topic starter and not mentioned")
should_process = False

Expand All @@ -789,7 +784,8 @@ async def _on_message(self, data: "P2ImMessageReceiveV1") -> None:
return

# 确认需要处理后再添加"已读"表情
await self._add_reaction(message_id, "MeMeMe")
if config and config.mode != BotMode.DEBUG:
await self._add_reaction(message_id, "MeMeMe")

# 替换所有@占位符
content = mention_pattern.sub(f"@{sender_id}", content)
Expand Down
16 changes: 14 additions & 2 deletions bot/vikingbot/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ class AgentMemoryMode(str, Enum):
PER_CHANNEL = "per-channel"


class BotMode(str, Enum):
"""Bot running mode enumeration."""
NORMAL = "normal"
READONLY = "readonly"
DEBUG = "debug"


class BaseChannelConfig(BaseModel):
"""Base channel configuration."""

Expand Down Expand Up @@ -104,7 +111,7 @@ class FeishuChannelConfig(BaseChannelConfig):

type: ChannelType = ChannelType.FEISHU
app_id: str = ""
open_id: str = ""
bot_name: str = ""
app_secret: str = ""
encrypt_key: str = ""
verification_token: str = ""
Expand Down Expand Up @@ -615,7 +622,12 @@ class Config(BaseSettings):
)
storage_workspace: Optional[str] = None # From ov.conf root level storage.workspace
use_local_memory: bool = False
read_only: bool = False
mode: BotMode = BotMode.NORMAL

@property
def read_only(self) -> bool:
"""Backward compatibility for read_only property."""
return self.mode == BotMode.READONLY

@property
def channels_config(self) -> ChannelsConfig:
Expand Down
30 changes: 1 addition & 29 deletions bot/vikingbot/hooks/builtins/openviking_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,42 +38,14 @@ async def _get_client(self, workspace_id: str) -> VikingClient:
# Use global singleton client
return await get_global_client()

def _filter_messages_by_sender(self, messages: list[dict], allow_from: list[str]) -> list[dict]:
"""筛选出 sender_id 在 allow_from 列表中的消息"""
if not allow_from:
return []
return [msg for msg in messages if msg.get("sender_id") in allow_from]

def _get_channel_allow_from(self, session_key: SessionKey):
"""根据 session_id 获取对应频道的 allow_from 配置"""
config = load_config()
if not config.read_only:
return True, []
allow_from = [config.ov_server.admin_user_id]
if not session_key or not config.channels:
return False, allow_from
# 查找对应类型的 channel config
for channel_config in config.channels_config.get_all_channels():
if channel_config and channel_config.type.value == session_key.type:
if hasattr(channel_config, "allow_from"):
allow_from.extend(channel_config.allow_from)
return False, allow_from

async def execute(self, context: HookContext, **kwargs) -> Any:
vikingbot_session: Session = kwargs.get("session", {})
session_id = context.session_key.safe_name()

try:
is_shared, allow_from = self._get_channel_allow_from(context.session_key)
filtered_messages = vikingbot_session.messages
if not is_shared:
filtered_messages = self._filter_messages_by_sender(vikingbot_session.messages, allow_from)
if not filtered_messages:
logger.info(f"No messages to commit openviking for session {session_id} (allow_from filter applied)")
return {"success": True, "message": "No messages matched allow_from filter"}

client = await self._get_client(context.workspace_id)
result = await client.commit(session_id, filtered_messages, load_config().ov_server.admin_user_id)
result = await client.commit(session_id, vikingbot_session.messages, load_config().ov_server.admin_user_id)
return result
except Exception as e:
logger.exception(f"Failed to add message to OpenViking: {e}")
Expand Down
7 changes: 4 additions & 3 deletions bot/vikingbot/openviking_mount/ov_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,11 @@ async def main_test():
# res = client.list_resources()
# res = await client.search("头有点疼", target_uri="viking://user/memories/")
# res = await client.get_viking_memory_context("123", current_message="头疼", history=[])
# res = await client.search_memory("你好", "user_1")
res = await client.search_memory("你好", "user_1")
# res = await client.list_resources("viking://resources/")
# res = await client.read_content("viking://user/memories/profile.md", level="read")
# res = await client.add_resource("https://github.com/volcengine/OpenViking", "ov代码")
res = await client.grep("viking://resources/", "viking", True)
# res = await client.grep("viking://resources/", "viking", True)
# res = await client.commit(
# session_id="99999",
# messages=[{"role": "user", "content": "你好"}],
Expand All @@ -466,7 +466,8 @@ async def account_test():
# res = await client.admin_remove_user("default", "")
# res = await client.admin_remove_user("default", "admin")
# res = await client.admin_list_accounts()
res = await client.admin_create_account("eval", "default")
# res = await client.admin_create_account("eval", "default")
res = await client.admin_register_user("default", "test_root", "root")
print(res)


Expand Down
Loading