# Forum Admin Notebook (SQLite)

文本方式管理主题与帖子（无需图形界面）。建议先在开发环境操作，确认后再用于生产。

功能：
- 查看线程、查看待审核帖子
- 审核通过/拒绝
- 新建主题、回复、编辑、删除（软删）
- 关键词搜索、导出 CSV


In [1]:
import os, sqlite3, csv, datetime as dt
from pathlib import Path

# 解析数据库路径：优先 DB_PATH 环境变量，否则与后端一致
import os, sqlite3
from pathlib import Path

def detect_db_path() -> str:
    """Return the path of the SQLite database file to open.

    Resolution order:
      1. The ``DB_PATH`` environment variable, when set and non-empty.
      2. ``shopback_data.db`` (relative to the current working directory).

    Returns:
        The database path as a string; never ``None``.
    """
    # 1) The environment variable takes priority.
    env = os.getenv("DB_PATH")
    if env:
        return env
    # 2) Fallback so the declared ``str`` return type always holds; without
    #    it the function returned None and sqlite3.connect(None) raises
    #    TypeError.
    # NOTE(review): this is the file name hard-coded in a later cell of this
    # notebook -- confirm it matches the path the backend uses.
    return "shopback_data.db"


DB_PATH = detect_db_path()

In [7]:
# Explicit override: point the notebook at this database file, replacing any
# value DB_PATH was given earlier.
DB_PATH = "shopback_data.db"


In [8]:
class _ClosingConnection(sqlite3.Connection):
    """Connection whose ``with`` block also closes it on exit.

    The stock ``sqlite3.Connection`` context manager only commits or rolls
    back the pending transaction and leaves the connection open.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Commit on success / roll back on error, as the base class does.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            # Release the database handle once the block is done.
            self.close()


def get_conn():
    """Open a connection to the database at ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name and
    converted with ``dict(row)``.

    Returns:
        A ``sqlite3.Connection`` (subclass). Used as
        ``with get_conn() as c:`` it commits (or rolls back when the block
        raises) and then closes the connection when the block ends.
    """
    conn = sqlite3.connect(DB_PATH, factory=_ClosingConnection)
    conn.row_factory = sqlite3.Row
    return conn

def print_rows(rows, limit=20):
    """Print up to ``limit`` rows as dicts, followed by a count line.

    Args:
        rows: Iterable of items accepted by ``dict()`` (e.g. ``sqlite3.Row``
            objects or mappings).
        limit: Maximum number of rows to print. Zero or a negative value
            prints no rows.
    """
    cnt = 0
    for r in rows:
        # Check the cap before printing so limit=0 really shows nothing
        # (checking afterwards always let the first row through).
        if cnt >= limit:
            break
        print(dict(r))
        cnt += 1
    print(f'-- {cnt} row(s) shown')

# Quick health check: report which of the two forum tables exist.
with get_conn() as c:
    cur = c.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name IN ('forum_threads','forum_posts')"
    )
    existing = [name for (name,) in cur.fetchall()]
    print('tables:', existing)


tables: ['forum_threads', 'forum_posts']


In [None]:
# List the most recently active threads.
def list_threads(limit=20, offset=0):
    """Print one page of threads, most recently active first.

    Threads are ordered by ``last_post_at`` (falling back to ``created_at``
    when it is NULL), then by ``id``, both descending.

    Args:
        limit: Maximum number of threads to fetch and print.
        offset: Number of threads to skip before the page starts.
    """
    with get_conn() as c:
        cur = c.cursor()
        cur.execute(
            "SELECT id, title, author_name, status, created_at, last_post_at FROM forum_threads"
            " ORDER BY COALESCE(last_post_at, created_at) DESC, id DESC LIMIT ? OFFSET ?",
            (limit, offset)
        )
        rows = cur.fetchall()
        # Pass the page size through: print_rows defaults to 20 rows, which
        # would silently truncate any larger page.
        print_rows(rows, limit=limit)

list_threads(10)


In [None]:
# Show the posts of one thread (summary columns only).
def list_posts(thread_id, include_pending=False, limit=50, offset=0):
    """Print a page of posts belonging to ``thread_id``, oldest id first.

    Args:
        thread_id: Id of the thread whose posts are listed.
        include_pending: When false, only posts with status 'published' are
            listed; when true, no status filter is applied.
        limit: Maximum number of posts to fetch and print.
        offset: Number of posts to skip before the page starts.
    """
    # The two variants of the query differ only in the status filter.
    status_clause = "" if include_pending else " AND status='published'"
    sql = (
        "SELECT id, author_name, status, created_at FROM forum_posts WHERE thread_id=?"
        + status_clause
        + " ORDER BY id ASC LIMIT ? OFFSET ?"
    )
    with get_conn() as c:
        found = c.cursor().execute(sql, (thread_id, limit, offset)).fetchall()
        print_rows(found, limit=limit)

# 示例：list_posts(1, include_pending=True)


In [None]:
# Moderation queue: posts waiting for review.
def list_pending(limit=100, offset=0):
    """Print posts whose status is 'pending', oldest ``created_at`` first.

    Args:
        limit: Maximum number of posts to fetch and print.
        offset: Number of posts to skip before the page starts.
    """
    query = (
        "SELECT id, thread_id, author_name, status, created_at FROM forum_posts WHERE status='pending' ORDER BY created_at ASC LIMIT ? OFFSET ?"
    )
    with get_conn() as c:
        queue = c.cursor().execute(query, (limit, offset)).fetchall()
        print_rows(queue, limit=limit)

list_pending(20)


In [None]:
# Moderation actions: approve / reject.
def _moderate_post(post_id, new_status, action, done_label, reason, moderator_name):
    """Set a post's status and record the decision in forum_moderation_actions.

    Prints 'post not found' and records nothing when no post has ``post_id``;
    otherwise prints ``done_label`` followed by the post id.
    """
    with get_conn() as c:
        cur = c.cursor()
        cur.execute(
            "UPDATE forum_posts SET status=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
            (new_status, post_id)
        )
        # rowcount == 0 means the UPDATE matched no row, i.e. unknown id.
        if cur.rowcount == 0:
            print('post not found')
            return
        cur.execute(
            "INSERT INTO forum_moderation_actions (post_id, moderator_name, action, reason) VALUES (?, ?, ?, ?)",
            (post_id, moderator_name, action, reason)
        )
        c.commit()
        print(done_label, post_id)


def approve_post(post_id, reason=None, moderator_name=None):
    """Mark a post as 'published' and log an 'approve' moderation action.

    Args:
        post_id: Id of the post to approve.
        reason: Optional free-text reason stored with the action.
        moderator_name: Optional name of the moderator stored with the action.
    """
    _moderate_post(post_id, 'published', 'approve', 'approved', reason, moderator_name)


def reject_post(post_id, reason=None, moderator_name=None):
    """Mark a post as 'rejected' and log a 'reject' moderation action.

    Args:
        post_id: Id of the post to reject.
        reason: Optional free-text reason stored with the action.
        moderator_name: Optional name of the moderator stored with the action.
    """
    _moderate_post(post_id, 'rejected', 'reject', 'rejected', reason, moderator_name)

# 示例：approve_post(123, 'ok', 'admin') / reject_post(124, 'spam', 'mod1')


In [None]:
# Create a thread together with its first post.
def create_thread(title, content_html, author_name=None, tags=None):
    """Insert a new thread and its opening post, then return the thread id.

    The thread is created with status 'normal'; the first post is stored with
    status 'published' and ``content_html`` is written unchanged to both
    ``raw_html`` and ``safe_html``.

    Args:
        title: Thread title; surrounding whitespace is stripped.
        content_html: HTML body of the first post.
        author_name: Optional author name stored on both rows.
        tags: Optional value serialised with ``json.dumps`` into ``tags_json``;
            ``None`` stores NULL.

    Returns:
        The id of the newly created thread.
    """
    if tags is None:
        serialized_tags = None
    else:
        import json
        serialized_tags = json.dumps(tags)

    insert_thread = (
        "INSERT INTO forum_threads (title, author_name, tags_json, status, last_post_at) VALUES (?, ?, ?, 'normal', CURRENT_TIMESTAMP)"
    )
    insert_first_post = (
        "INSERT INTO forum_posts (thread_id, author_name, raw_html, safe_html, status, rules_score) VALUES (?, ?, ?, ?, 'published', 0)"
    )
    with get_conn() as c:
        cur = c.cursor()
        cur.execute(insert_thread, (title.strip(), author_name, serialized_tags))
        # lastrowid must be read before the next INSERT overwrites it.
        thread_id = cur.lastrowid
        cur.execute(insert_first_post, (thread_id, author_name, content_html, content_html))
        c.commit()
        print('thread created:', thread_id)
        return thread_id

# 示例：create_thread('测试主题', '<p>内容</p>', 'admin', ['general'])


In [9]:
# Reply, edit, delete (soft delete).
def reply(thread_id, content_html, author_name=None, status='published'):
    """Add a post to an existing thread.

    Prints 'thread not found' and inserts nothing when ``thread_id`` does not
    exist. ``content_html`` is written unchanged to both ``raw_html`` and
    ``safe_html``.

    Args:
        thread_id: Id of the thread to reply to.
        content_html: HTML body of the reply.
        author_name: Optional author name.
        status: Status stored on the new post.
    """
    # NOTE(review): the thread's last_post_at is not updated here, so a reply
    # made through this function does not change the thread's position in
    # list_threads -- confirm whether that is intended.
    insert_post = (
        "INSERT INTO forum_posts (thread_id, author_name, raw_html, safe_html, status, rules_score) VALUES (?, ?, ?, ?, ?, 0)"
    )
    with get_conn() as c:
        cur = c.cursor()
        thread_row = cur.execute(
            "SELECT 1 FROM forum_threads WHERE id=?", (thread_id,)
        ).fetchone()
        if thread_row is None:
            print('thread not found')
            return
        cur.execute(insert_post, (thread_id, author_name, content_html, content_html, status))
        c.commit()
        print('replied post id:', cur.lastrowid)
def show_all_posts():
    """Print rows of forum_posts with every column.

    Output goes through ``print_rows`` with its default row cap.
    """
    with get_conn() as c:
        everything = c.cursor().execute("SELECT * FROM forum_posts").fetchall()
        print_rows(everything)

show_all_posts()
def edit_post(post_id, new_html, new_status=None):
    """Replace a post's HTML and optionally change its status.

    ``new_html`` is written unchanged to both ``raw_html`` and ``safe_html``.
    Prints 'post not found' when no post has ``post_id``.

    Args:
        post_id: Id of the post to edit.
        new_html: New HTML body.
        new_status: New status; when falsy the current status is kept.
    """
    with get_conn() as c:
        cur = c.cursor()
        if new_status:
            cur.execute(
                "UPDATE forum_posts SET raw_html=?, safe_html=?, status=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
                (new_html, new_html, new_status, post_id)
            )
        else:
            # Three placeholders need three values: new_html fills both
            # raw_html and safe_html (binding only two raised
            # sqlite3.ProgrammingError).
            cur.execute(
                "UPDATE forum_posts SET raw_html=?, safe_html=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
                (new_html, new_html, post_id)
            )
        # rowcount == 0 means the UPDATE matched no row, i.e. unknown id.
        if cur.rowcount == 0:
            print('post not found')
            return
        c.commit()
        print('edited', post_id)

def soft_delete_post(post_id):
    """Soft-delete a post by setting its status to 'deleted'.

    The row itself is kept. Prints 'post not found' when no post has
    ``post_id``.

    Args:
        post_id: Id of the post to mark as deleted.
    """
    with get_conn() as c:
        cur = c.cursor()
        cur.execute("UPDATE forum_posts SET status='deleted', updated_at=CURRENT_TIMESTAMP WHERE id=?", (post_id,))
        # Do not report success when the UPDATE matched no row.
        if cur.rowcount == 0:
            print('post not found')
            return
        c.commit()
        print('deleted', post_id)

# 示例：reply(1, '<p>hi</p>', 'admin') / edit_post(10, '<p>new</p>') / soft_delete_post(11)


{'id': 1, 'thread_id': 1, 'author_id': None, 'author_name': None, 'raw_html': '<p>hello</p>', 'safe_html': '<p>hello</p>', 'status': 'published', 'rules_score': 0, 'rules_hits_json': '[]', 'created_at': '2025-09-15 06:29:23', 'updated_at': '2025-09-15 06:29:23'}
{'id': 2, 'thread_id': 2, 'author_id': None, 'author_name': None, 'raw_html': 'aa', 'safe_html': 'aa', 'status': 'published', 'rules_score': 2, 'rules_hits_json': '[{"rule": "too_short", "score": 2, "detail": null}]', 'created_at': '2025-09-15 06:31:41', 'updated_at': '2025-09-15 06:31:41'}
{'id': 3, 'thread_id': 2, 'author_id': None, 'author_name': None, 'raw_html': 'seki\n', 'safe_html': 'seki\n', 'status': 'published', 'rules_score': 2, 'rules_hits_json': '[{"rule": "too_short", "score": 2, "detail": null}]', 'created_at': '2025-09-15 06:31:54', 'updated_at': '2025-09-15 06:31:54'}
{'id': 4, 'thread_id': 2, 'author_id': None, 'author_name': 'WTMLL', 'raw_html': '私募\n', 'safe_html': '私募\n', 'status': 'published', 'rules_score

In [10]:
# Print the contents of forum_posts again (all columns).
show_all_posts()

{'id': 1, 'thread_id': 1, 'author_id': None, 'author_name': None, 'raw_html': '<p>hello</p>', 'safe_html': '<p>hello</p>', 'status': 'published', 'rules_score': 0, 'rules_hits_json': '[]', 'created_at': '2025-09-15 06:29:23', 'updated_at': '2025-09-15 06:29:23'}
{'id': 2, 'thread_id': 2, 'author_id': None, 'author_name': None, 'raw_html': 'aa', 'safe_html': 'aa', 'status': 'published', 'rules_score': 2, 'rules_hits_json': '[{"rule": "too_short", "score": 2, "detail": null}]', 'created_at': '2025-09-15 06:31:41', 'updated_at': '2025-09-15 06:31:41'}
{'id': 3, 'thread_id': 2, 'author_id': None, 'author_name': None, 'raw_html': 'seki\n', 'safe_html': 'seki\n', 'status': 'published', 'rules_score': 2, 'rules_hits_json': '[{"rule": "too_short", "score": 2, "detail": null}]', 'created_at': '2025-09-15 06:31:54', 'updated_at': '2025-09-15 06:31:54'}
{'id': 4, 'thread_id': 2, 'author_id': None, 'author_name': 'WTMLL', 'raw_html': '私募\n', 'safe_html': '私募\n', 'status': 'published', 'rules_score

In [None]:
# Keyword search (simple LIKE over thread titles and post bodies).
def search(keyword, limit=50):
    """Print posts whose thread title or raw HTML contains ``keyword``.

    The keyword is matched literally as a substring: ``%`` and ``_`` inside it
    are not treated as LIKE wildcards. Results are ordered by post
    ``created_at``, newest first.

    Args:
        keyword: Text to look for.
        limit: Maximum number of rows to fetch and print.
    """
    # Escape LIKE metacharacters with an explicit escape character. Doubling
    # them ('%%', '__') does not escape anything in SQLite -- the doubled
    # characters are still wildcards. The backslash is escaped first so the
    # escapes added for '%' and '_' are not themselves doubled.
    escaped = (
        keyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    kw = f"%{escaped}%"
    with get_conn() as c:
        cur = c.cursor()
        cur.execute(
            "SELECT t.id AS thread_id, t.title, p.id AS post_id, p.author_name, p.status, p.created_at"
            " FROM forum_threads t JOIN forum_posts p ON p.thread_id=t.id"
            " WHERE t.title LIKE ? ESCAPE '\\' OR p.raw_html LIKE ? ESCAPE '\\'"
            " ORDER BY p.created_at DESC LIMIT ?",
            (kw, kw, limit)
        )
        print_rows(cur.fetchall(), limit=limit)

# 示例：search('测试')


In [None]:
# Export posts to CSV (simple example).
def export_posts_csv(path='forum_posts_export.csv', status=None):
    """Write post summary columns to a UTF-8 CSV file, newest first.

    Args:
        path: Destination file; it is overwritten if it exists.
        status: When truthy, only posts with this status are exported;
            otherwise all posts are exported.
    """
    columns = ['id','thread_id','author_name','status','created_at']
    # The status filter is the only part of the query that varies.
    where_clause = " WHERE status=?" if status else ""
    params = (status,) if status else ()
    sql = (
        "SELECT id, thread_id, author_name, status, created_at FROM forum_posts"
        + where_clause
        + " ORDER BY created_at DESC"
    )
    with get_conn() as c:
        rows = c.cursor().execute(sql, params).fetchall()
    with open(path, 'w', newline='', encoding='utf-8') as f:
        out = csv.writer(f)
        out.writerow(columns)
        out.writerows([record[col] for col in columns] for record in rows)
    print('exported ->', path, 'rows:', len(rows))

# 示例：export_posts_csv('posts.csv', status='pending')
