In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

# Core

## MarkdownProcessor

In [None]:
#| export
from fastcore.basics import *
from fastcore.foundation import *
from fastcore.test import *
from fastcore.script import call_parse, Param

from pathlib import Path
import re, time
import threading

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class MarkdownProcessor:
    """Extracts code from markdown files and writes it out as Python files.

    Two kinds of markdown code are recognised: fenced blocks delimited by triple
    backticks (with an optional info string such as ``python`` after the opening
    fence) and inline spans delimited by single backticks. Each processed markdown
    file `<stem>.md` produces `<output_dir>/<stem>.py`.
    """
    # First alternative: a fenced block. The info string is restricted to the rest of
    # the opening fence line (`[^`\n]*`); it must not be allowed to span newlines,
    # otherwise leading code lines made only of word characters and whitespace
    # (e.g. `import os`) are consumed as part of the info string and silently dropped.
    # Second alternative: an inline single-backtick span.
    _code_pattern = re.compile(r'`{3}[^`\n]*\n(.*?)`{3}|`([^`]+)`', re.DOTALL)

    def __init__(self, output_dir='py'):
        "Store `output_dir` (str or Path) as the directory generated `.py` files go to"
        self.output_dir = Path(output_dir)
    
    def should_process(self, path):
        "Return True for `.md` paths other than a README (case-insensitive); `path` must be a `Path`"
        return path.suffix == '.md' and path.name.lower() != 'readme.md'
    
    def extract_code(self, md_content):
        """Return all code found in `md_content`, in document order, joined by newlines.

        Fenced blocks contribute their body verbatim (including the newline before the
        closing fence); inline spans contribute their text without the backticks.
        Returns an empty string when no code is found.
        """
        code_blocks = []
        for match in self._code_pattern.finditer(md_content):
            # group(1) is a fenced body, group(2) an inline span; empty fences are skipped
            code = match.group(1) or match.group(2)
            if code: code_blocks.append(code)
        return '\n'.join(code_blocks)
    
    def process_file(self, md_path):
        """Extract the code of one markdown file and save it as `<output_dir>/<stem>.py`.

        Returns the path of the written file, or None when the file is skipped
        (see `should_process`) or contains no code.
        """
        md_path = Path(md_path)
        if not self.should_process(md_path): return None
        
        # Read and write as UTF-8 explicitly instead of relying on the platform default
        # encoding; UTF-8 is also the default source encoding of Python files.
        content = md_path.read_text(encoding='utf-8')
        code = self.extract_code(content)
        if not code: return None  # only create an output file if any code was extracted

        # parents=True so that a nested output directory such as 'build/py' works
        self.output_dir.mkdir(parents=True, exist_ok=True)
        py_path = self.output_dir/f"{md_path.stem}.py"
        py_path.write_text(code, encoding='utf-8')
        return py_path

    def process_directory(self, input_dir):
        """Process every markdown file below `input_dir`, subdirectories included.

        Returns the list of written `.py` paths (files without code are omitted).
        Raises ValueError when `input_dir` does not exist.

        NOTE: output names are derived from the file stem only, so markdown files with
        the same stem in different subdirectories write to the same output file.
        """
        input_path = Path(input_dir)
        if not input_path.exists():
            raise ValueError(f"Directory {input_dir} does not exist")
        
        processed_files_containing_code = []
        # sorted() makes the processing order (and the returned list) deterministic
        for md_file in sorted(input_path.glob('**/*.md')):
            if result := self.process_file(md_file):
                processed_files_containing_code.append(result)
    
        return processed_files_containing_code

In [None]:
# Test single backtick: an inline span is returned without its delimiters
mp = MarkdownProcessor()
test_str = "Some text `print('hello')`"
inline_code = mp.extract_code(test_str)
test_eq(inline_code, "print('hello')")

In [None]:
# Test triple backtick: a fenced block surrounded by ordinary prose
test_str = "\n".join([
    "Some text",
    "```",
    "def hello():",
    "    print('world')",
    "```",
    "more text",
])
print(test_str)

Some text
```
def hello():
    print('world')
```
more text


In [None]:
# The fenced body is expected verbatim, including the newline before the closing fence
expected = "".join(line + "\n" for line in ("def hello():", "    print('world')"))
print(expected)

def hello():
    print('world')



In [None]:
# Only the body of the fenced block should come back; the surrounding prose is dropped
fenced_code = mp.extract_code(test_str)
test_eq(fenced_code, expected)

In [None]:
# Test multiple code blocks: one fence with an info string, one without
first_block = "def hello():\n    print('world')\n"
second_block = "def goodbye():\n    print('bye')\n"
test_str = f"```python\n{first_block}```\nSome text\n```\n{second_block}```"

# Extracted blocks are joined by a single newline, in document order
expected = '\n'.join([first_block, second_block])

test_eq(mp.extract_code(test_str), expected)

In [None]:
# Any `.md` file qualifies (in any directory) except a README, whatever its casing
should_process_cases = {
    'test.md': True,
    'test.py': False,
    'test/test.md': True,
    'README.md': False,
    'readme.md': False,
    'ReadMe.md': False,
}
for candidate, wanted in should_process_cases.items():
    test_eq(mp.should_process(Path(candidate)), wanted)

Let's check that we are writing the file to disk as expected.

In [None]:
from tempfile import NamedTemporaryFile
from tempfile import TemporaryDirectory

with TemporaryDirectory() as scratch:
    scratch = Path(scratch)

    # A markdown source file, plus an output directory that does not exist yet
    source_md = scratch/'test.md'
    source_md.write_text(test_str)
    py_dir = scratch/'py'

    mp = MarkdownProcessor(output_dir=py_dir)
    mp.process_file(source_md)

    # The generated file is named after the markdown stem and holds the extracted code
    written = py_dir/'test.py'
    test_eq(written.exists(), True)
    test_eq(written.read_text(), expected)

Let's see if we can process an entire directory as expected.

In [None]:
with TemporaryDirectory() as tmpdir:
    root = Path(tmpdir)
    py_dir = root/'py'

    # One regular markdown file and one README, both containing code
    sources = {'test1.md': test_str, 'README.md': '# Readme\n' + test_str}
    for filename, text in sources.items():
        (root/filename).write_text(text)

    mp = MarkdownProcessor(output_dir=py_dir)
    processed = mp.process_directory(root)

    # Only test1.md is converted; README.md is excluded
    generated = py_dir/'test1.py'
    test_eq(len(processed), 1)
    test_eq(generated.exists(), True)
    test_eq(generated.read_text(), expected)

## MarkdownWatcher

In [None]:
#| export
class MarkdownWatcher(FileSystemEventHandler):
    """Watches for markdown file changes and triggers processing.

    `processor` is any object with a `process_file(path)` method (a `MarkdownProcessor`
    in this module). Created, modified and moved/renamed files are forwarded to it;
    the processor itself decides whether a given path is a markdown file of interest.
    """
    def __init__(self, processor):
        self.processor = processor

    def _process(self, event, path):
        "Forward `path` to the processor unless the event concerns a directory"
        # Directory events carry no file content; without this guard a directory whose
        # name ends in `.md` would make `process_file` try to read it as a file.
        if event.is_directory: return
        self.processor.process_file(path)
    
    def on_modified(self, event):
        self._process(event, event.src_path)
        
    def on_created(self, event):
        self._process(event, event.src_path)

    def on_moved(self, event):
        # Many editors save by writing a temporary file and renaming it over the target,
        # which shows up as a move event whose destination is the markdown file.
        self._process(event, event.dest_path)

Now let's test the markdown watcher, with the file created before and while the watcher is running.

In [None]:
# Setup
input_dir = Path('input_dir')
input_dir.mkdir(exist_ok=True)
output_dir = Path('output_py')

test_before = input_dir / 'test_before.md'
test_while = input_dir / 'test_while.md'
expected_outputs = [output_dir / f'{stem}.py' for stem in ('test_before', 'test_while')]

processor = MarkdownProcessor(output_dir=output_dir)
handler = MarkdownWatcher(processor)
observer = Observer()

try:
    # A file that exists before the watcher starts. The watcher only reacts to events
    # raised while it is running, so pre-existing files are picked up by an explicit
    # initial pass -- the same sequence the `literati` command uses.
    test_before.write_text(test_str)
    processor.process_directory(input_dir)

    observer.schedule(handler, str(input_dir), recursive=False)
    observer.start()

    # A file that is created while the watcher is running
    test_while.write_text(test_str)

    # Wait for the watcher, but no longer than necessary: poll for up to 5 seconds
    # (a generous limit, to avoid intermittent false failures on slow machines).
    deadline = time.monotonic() + 5
    while time.monotonic() < deadline and not all(p.exists() for p in expected_outputs):
        time.sleep(0.1)

    for output_path in expected_outputs:
        test_eq(output_path.exists(), True)
finally:
    # Always stop the watcher thread and remove the scratch files, even if a check failed
    if observer.is_alive():
        observer.stop()
        observer.join()
    for leftover in [*expected_outputs, test_before, test_while]:
        leftover.unlink(missing_ok=True)
    for directory in (output_dir, input_dir):
        if directory.exists(): directory.rmdir()

## CLI commands

In [None]:
#| export

# CLI entry point: converts the markdown files already present below `path`, then keeps
# watching `path` and re-converts files as they change, until interrupted with Ctrl+C.
# Returns the (stopped) observer. The docstring is kept short because `call_parse`
# uses it as the command's help text.
@call_parse
def literati(path:Param("Directory to monitor", str)='.', 
             output_dir:Param("Output directory for Python files", str)='py'):
    "Monitor markdown files and extract code blocks to Python files"
    processor = MarkdownProcessor(output_dir=output_dir)
    
    # Process existing files first
    processor.process_directory(path)
    
    handler = MarkdownWatcher(processor)
    
    observer = Observer()
    # recursive=True keeps the watch consistent with the initial pass above, which also
    # descends into subdirectories.
    observer.schedule(handler, path, recursive=True)
    observer.start()
    
    try:
        print(f"G'day! Monitoring {path} for markdown files...")
        print(f"Python files will be saved to {output_dir}/")
        print("Optimizing the servo run... (Press Ctrl+C to stop)")

        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to stop monitoring
    finally:
        # Shut the watcher thread down on every exit path, not only on Ctrl+C
        observer.stop()
        observer.join()
    
    return observer

# One-shot CLI entry point: converts every markdown file below `input_dir` and prints
# a summary listing the Python files that were written.
@call_parse
def md_to_py(
    input_dir: Param("Input directory containing markdown files", str),
    output_dir: Param("Output directory for Python files", str)='py'
):
    "Convert markdown files in input_dir to Python files in output_dir"
    written = MarkdownProcessor(output_dir=output_dir).process_directory(input_dir)
    # Header line followed by one indented line per generated file
    report = [f"Processed {len(written)} files:"]
    report.extend(f"  {py_file}" for py_file in written)
    print('\n'.join(report))

We can easily test `md_to_py`; however, I haven't found a good way to test `literati`, so we fall back to manual testing for it.

In [None]:
import signal
import subprocess

with TemporaryDirectory() as tmpdir:
    # Setup
    tmpdir = Path(tmpdir)
    (tmpdir/'test.md').write_text(test_str)

    # Test md_to_py command.
    # No try/except wrapper: an exception raised by `md_to_py` fails the cell with its
    # full traceback, and a failed `test_eq` is reported as the assertion failure it is
    # rather than being relabelled as "md_to_py raised an exception".
    md_to_py(str(tmpdir), str(tmpdir/'py'))
    test_eq((tmpdir/'py'/'test.py').exists(), True)
    test_eq((tmpdir/'py'/'test.py').read_text(), expected)

Processed 1 files:
  /var/folders/wt/jbbs6xs16r32mfmphb089_2h0000gn/T/tmpvk0tnpe1/py/test.py


In [None]:
# Manual solution for testing `literati`: uncomment the code below, run it, and terminate it using `interrupt kernel`.
# Make sure to comment it back out before committing to the library!

# !rm -rf py
# Path('test.md').write_text(test_str)
# literati()
# test_eq(Path('py/test.py').read_text(), expected)
# !ls py
# !rm -rf py

In [None]:
#| hide
# Export the cells marked `#| export` to the library module
import nbdev
nbdev.nbdev_export()