# CRUD

> TEMP! Eventually have admin panel with auth to make this easier? This isn't to be exported as a .py file as this is only run in notebooks

Use `Shift + s` to stop the Jupyter forms running

> Currently it only supports **markdown from gists**, which Solveit supports exporting as

In [None]:
#| default_exp crud

In [None]:
#| hide
#| notest
#| eval: false
from dialoghelper import *

In [None]:
# fc_tool_info()

Tools available from `fastcore.tools`:

- &`rg`: Run the `rg` command with the args in `argstr` (no need to backslash escape)
- &`sed`: Run the `sed` command with the args in `argstr` (e.g for reading a section of a file)
- &`view`: View directory or file contents with optional line range and numbers
- &`create`: Creates a new file with the given content at the specified path
- &`insert`: Insert new_str at specified line number
- &`str_replace`: Replace first occurrence of old_str with new_str in file
- &`strs_replace`: Replace for each str pair in old_strs,new_strs
- &`replace_lines`: Replace lines in file using start and end line-numbers

In [None]:
from fastcore.tools import *

In [None]:
#| export
from casblog.core import Post, db, PROJECT_ROOT
import os
import httpx
import re
from datetime import datetime
from fastlite import *
import json

In [None]:
#| export
def fetch_gist_markdown(gist_url: str) -> tuple[str, str]:
    """Fetch markdown content and filename from a gist URL."""
    match = re.search(r'gist\.github\.com/[^/]+/([a-f0-9]+)', gist_url)
    if not match: raise ValueError(f"Invalid gist URL: {gist_url}")
    
    resp = httpx.get(f'https://api.github.com/gists/{match.group(1)}')
    resp.raise_for_status()
    
    md_file = next((f for f in resp.json()['files'].values() 
                    if f['filename'].endswith('.md')), None)
    if not md_file: raise ValueError("No markdown file found in gist")
    
    return md_file['content'], md_file['filename']

In [None]:
#| export
def get_db(prod: bool = False):
    """Get database connection."""
    return database(PROJECT_ROOT / "data" / ("prod.db" if prod else "dev.db"))

In [None]:
db = get_db(prod=False)
assert db.tables, "Database should have tables"

## Add Post

In [None]:
#| export
def save_post(db, post: Post) -> Post:
    """Save a Post to the given database, returning new Post with id."""
    result = db.t.post.insert(post)
    return Post(**{**post.__dict__, 'id': result['id']})

In [None]:
#| export
def parse_categories(s: str) -> str:
    """Convert comma-separated string to JSON array string."""
    if not s.strip(): return '[]'
    return json.dumps([c.strip() for c in s.split(',') if c.strip()])

## Sync from Notebook

Read content from `.ipynb` files in `/data` and update posts in the database. Useful for syncing local notebooks to dev/prod after edits.

In [None]:
#| export
def read_notebook_markdown(path, skip_first: bool = False) -> str:
    """Read a Jupyter notebook and extract content as markdown.
    
    - Markdown cells are included as-is
    - Code cells are wrapped in ```python blocks
    - Cells starting with #| directives are skipped
    
    Args:
        path: Path to .ipynb file
        skip_first: If True, skip the first cell (often metadata/links)
    """
    with open(path) as f:
        nb = json.load(f)
    
    cells = nb['cells']
    if skip_first and cells:
        cells = cells[1:]
    
    parts = []
    for cell in cells:
        source = ''.join(cell['source'])
        
        if cell['cell_type'] == 'markdown':
            parts.append(source)
        elif cell['cell_type'] == 'code':
            # Skip cells with nbdev directives
            if source.strip().startswith('#|'):
                continue
            parts.append(f"```python\n{source}\n```")
    
    return '\n\n'.join(parts)

In [None]:
# Test read_notebook_markdown
content = read_notebook_markdown(PROJECT_ROOT / "data" / "how_to_read.ipynb")
assert "Filter Framework" in content, "Should contain main content"
assert "```python" not in content or content.count("```python") >= 0, "Code blocks formatted"
print(f"Extracted {len(content)} chars, first 200:\n{content[:200]}...")

In [None]:
#| export
def update_post(db, slug: str, **kwargs) -> bool:
    """Update an existing post by slug.
    
    Args:
        db: Database connection
        slug: Post slug to find
        **kwargs: Fields to update (content, title, categories, etc.)
    
    Returns:
        True if post was found and updated, False otherwise
    """
    posts = list(db.t.post.rows_where('slug = ?', [slug], limit=1))
    if not posts:
        return False
    
    post = posts[0]
    kwargs['updated'] = datetime.now().isoformat()
    
    db.t.post.update(kwargs, post['id'])
    return True

In [None]:
# Test update_post
dev_db = get_db(prod=False)
assert update_post(dev_db, 'nonexistent-slug', content='test') == False, "Should return False for missing post"
print("update_post validation passed")

In [None]:
#| export
def sync_notebook_to_post(db, slug: str, notebook_path, skip_first: bool = False) -> bool:
    """Read notebook content and update the matching post.
    
    Args:
        db: Database connection
        slug: Post slug to update
        notebook_path: Path to .ipynb file
        skip_first: Skip first cell of notebook
    
    Returns:
        True if successful, False if post not found
    """
    content = read_notebook_markdown(notebook_path, skip_first=skip_first)
    return update_post(db, slug, content=content)

In [None]:
post_gist = "https://gist.github.com/stantonius/1124934653f070d345ec4a19c57695f5"

In [None]:
#| notest
content, filename = fetch_gist_markdown(post_gist)

try:
    post = Post(
        title=input(f"Title [{filename}]: ").strip() or filename.replace('.md', ''),
        content=content,
        slug=input("Slug (or Enter to auto-generate): ").strip() or None,
        categories=parse_categories(input("Categories (comma-separated): "))
    )
except KeyboardInterrupt:
    print("\nCancelled.")


saved = save_post(get_db(prod=True), post)

## Delete Post

In [None]:
#| notest
try:
    # Fetch all posts from prod
    prod_db = get_db(prod=True)
    posts = list(prod_db.t.post.rows)

    # Display posts
    print("Posts in prod database:\n")
    for p in posts:
        print(f"  {p['id']}: {p['title']}")

    # Get selection
    ids_str = input("\nEnter post IDs to delete (comma-separated), or Enter to cancel: ").strip()
    if not ids_str:
        print("Cancelled.")
    else:
        ids = [int(i.strip()) for i in ids_str.split(',')]
        
        # Show what will be deleted
        print(f"\n‚ö†Ô∏è  About to delete {len(ids)} post(s):")
        for pid in ids:
            p = next((p for p in posts if p['id'] == pid), None)
            if p: print(f"  - {p['title']}")
        
        # Confirm
        confirm = input("\nType 'DELETE' to confirm: ").strip()
        if confirm == 'DELETE':
            for pid in ids:
                prod_db.t.post.delete(pid)
                print(f"üóëÔ∏è  Deleted post id={pid}")
            print("\n‚úÖ Done!")
        else:
            print("Cancelled.")
except KeyboardInterrupt:
    print("\nCancelled.")

Posts in prod database:

  2: Token & Embeddings Refresh
  4: RAG Fundamentals
  5: How to Read



‚ö†Ô∏è  About to delete 1 post(s):
  - RAG Fundamentals


üóëÔ∏è  Deleted post id=4

‚úÖ Done!
