From f4f50b84a105967cb97d2290d35ba5e6e00b9ab2 Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Mon, 10 Nov 2025 04:16:15 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=9D=20Add=20docstrings=20to=20`copilot?= =?UTF-8?q?/create-examkit-project`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Docstrings generation was requested by @thecoder8890. * https://github.com/thecoder8890/exam-kit/pull/2#issuecomment-3508520110 The following files were modified: * `examkit/asr/whisper_runner.py` * `examkit/cli.py` * `examkit/config.py` * `examkit/ingestion/exam_parser.py` * `examkit/ingestion/ingest.py` * `examkit/ingestion/ocr.py` * `examkit/ingestion/slides_parser.py` * `examkit/ingestion/transcript_normalizer.py` * `examkit/logging_utils.py` * `examkit/nlp/embeddings.py` * `examkit/nlp/retrieval.py` * `examkit/nlp/spacy_nlp.py` * `examkit/nlp/splitter.py` * `examkit/nlp/topic_mapping.py` * `examkit/qa/checks.py` * `examkit/render/pandoc_renderer.py` * `examkit/render/templater.py` * `examkit/render/typst_renderer.py` * `examkit/reports/coverage.py` * `examkit/reports/export.py` * `examkit/synthesis/citations.py` * `examkit/synthesis/composer.py` * `examkit/synthesis/diagrams.py` * `examkit/synthesis/ollama_client.py` * `examkit/synthesis/prompts.py` * `examkit/utils/io_utils.py` * `examkit/utils/math_utils.py` * `examkit/utils/text_utils.py` * `examkit/utils/timecode.py` * `tests/test_render.py` --- examkit/asr/whisper_runner.py | 68 ++++++++----- examkit/cli.py | 32 +++--- examkit/config.py | 22 ++--- examkit/ingestion/exam_parser.py | 52 ++++++---- examkit/ingestion/ingest.py | 54 ++++++----- examkit/ingestion/ocr.py | 41 ++++---- examkit/ingestion/slides_parser.py | 38 ++++---- examkit/ingestion/transcript_normalizer.py | 71 +++++++++----- examkit/logging_utils.py | 30 +++--- examkit/nlp/embeddings.py | 100 +++++++++---------- examkit/nlp/retrieval.py | 70 +++++++------- examkit/nlp/spacy_nlp.py | 88 +++++++++-------- examkit/nlp/splitter.py | 68 +++++++------ examkit/nlp/topic_mapping.py | 77 ++++++++------- examkit/qa/checks.py | 107 ++++++++++++--------- examkit/render/pandoc_renderer.py | 26 ++--- examkit/render/templater.py | 70 ++++++++------ examkit/render/typst_renderer.py | 47 +++++---- examkit/reports/coverage.py | 51 ++++++---- examkit/reports/export.py | 46 +++++---- examkit/synthesis/citations.py | 102 ++++++++++++-------- examkit/synthesis/composer.py | 50 ++++++---- examkit/synthesis/diagrams.py | 71 +++++++------- examkit/synthesis/ollama_client.py | 77 ++++++++------- examkit/synthesis/prompts.py | 70 ++++++++++++-- examkit/utils/io_utils.py | 73 +++++++------- examkit/utils/math_utils.py | 73 +++++++------- examkit/utils/text_utils.py | 101 ++++++++++--------- examkit/utils/timecode.py | 61 +++++++----- tests/test_render.py | 8 +- 30 files changed, 1044 insertions(+), 800 deletions(-) diff --git a/examkit/asr/whisper_runner.py b/examkit/asr/whisper_runner.py index 9c219bb..2909a5e 100644 --- a/examkit/asr/whisper_runner.py +++ b/examkit/asr/whisper_runner.py @@ -21,17 +21,27 @@ def transcribe_audio( logger: logging.Logger = None ) -> List[Dict[str, Any]]: """ - Transcribe audio file using faster-whisper. - - Args: - audio_path: Path to audio file (WAV recommended). - model_size: Whisper model size (tiny, base, small, medium, large). - language: Language code (en, es, fr, etc.). - vad: Enable Voice Activity Detection. - logger: Logger instance. 
- + Transcribe an audio file into timestamped segments using faster-whisper. + + Transcribes the given audio file with the specified Whisper model and returns a list of segment dictionaries containing start/end timestamps and cleaned text. + + Parameters: + audio_path (Path): Path to the audio file. + model_size (str): Whisper model size to load (e.g., "tiny", "base", "small", "medium", "large"). + language (str): Language code hint for transcription (e.g., "en", "es", "fr"). + vad (bool): Whether to enable voice activity detection to filter non-speech. + logger (logging.Logger | None): Optional logger for informational messages. + Returns: - List of transcription segments. + List[Dict[str, Any]]: A list of segments where each segment dictionary contains: + - "source": "asr" + - "type": "whisper" + - "start": start time in seconds + - "end": end time in seconds + - "text": transcribed text (stripped of surrounding whitespace) + + Raises: + ImportError: If faster-whisper is not available. """ if not WHISPER_AVAILABLE: raise ImportError("faster-whisper not available. Install with: pip install faster-whisper") @@ -75,16 +85,21 @@ def transcribe_with_timestamps( logger: logging.Logger = None ) -> Dict[str, Any]: """ - Transcribe audio with detailed timestamp information. - - Args: - audio_path: Path to audio file. - model_size: Whisper model size. - language: Language code. - logger: Logger instance. - + Transcribe an audio file and return timestamped segments and summary metadata. + + Parameters: + audio_path (Path): Path to the input audio file. + model_size (str): Whisper model size identifier (e.g., "small"). + language (str): ISO language code to use for transcription. + Returns: - Dictionary with transcription and metadata. + result (dict): Dictionary containing: + - audio_file (str): String path of the input audio file. + - model (str): Model size used. + - language (str): Language code used. + - segments (List[dict]): List of segment dictionaries each with keys `source`, `type`, `start`, `end`, and `text`. + - total_duration (float): End time of the last segment in seconds, or 0.0 if no segments. + - total_segments (int): Number of segments. """ segments = transcribe_audio(audio_path, model_size, language, True, logger) @@ -102,11 +117,14 @@ def transcribe_with_timestamps( def export_to_vtt(segments: List[Dict[str, Any]], output_path: Path) -> None: """ - Export transcription segments to VTT format. - - Args: - segments: List of transcription segments. - output_path: Path for output VTT file. + Write transcription segments to a WebVTT file at the given path. + + Each segment must be a mapping containing keys "start" (seconds, number), "end" (seconds, number) + and "text" (string). The function creates or overwrites the file at output_path and writes + a valid WEBVTT document where each segment is numbered and formatted as a time range with text. + Parameters: + segments (List[Dict[str, Any]]): Ordered transcription segments with "start", "end", and "text". + output_path (Path): Filesystem path to write the .vtt file; existing file will be overwritten. 
""" from examkit.utils.timecode import seconds_to_timecode @@ -120,4 +138,4 @@ def export_to_vtt(segments: List[Dict[str, Any]], output_path: Path) -> None: f.write(f"{i}\n") f.write(f"{start} --> {end}\n") - f.write(f"{text}\n\n") + f.write(f"{text}\n\n") \ No newline at end of file diff --git a/examkit/cli.py b/examkit/cli.py index b51ff00..3e7fc8a 100644 --- a/examkit/cli.py +++ b/examkit/cli.py @@ -44,10 +44,15 @@ def ingest( log_level: str = typer.Option("INFO", "--log-level", "-l", help="Logging level") ) -> None: """ - Ingest and preprocess input files (video, transcript, slides, exam). - - Validates inputs, extracts audio with ffmpeg, normalizes transcripts, - parses slides and exam papers, and saves processed data to cache. + Run the ingestion pipeline to preprocess input files and populate the cache. + + Loads the manifest, invokes the ingestion pipeline to process videos, transcripts, + slides, and exam files, and writes processed artifacts to the specified cache + directory while printing status to the console. On failure the function logs the + error and exits the process with code 1. + + Raises: + typer.Exit: Exits with code 1 when ingestion fails. """ logger = setup_logging(level=log_level, log_file=Path("logs/ingest.log")) logger.info("Starting ingestion pipeline") @@ -99,10 +104,9 @@ def build( log_level: str = typer.Option("INFO", "--log-level", "-l", help="Logging level") ) -> None: """ - Build exam-ready PDF from processed inputs. - - Runs the full pipeline: embeddings → topic mapping → RAG synthesis - with Ollama → diagrams → templating → Typst/Pandoc rendering. + Build an exam-ready PDF for a session using the provided configuration and write outputs to the specified path. + + Prints the generated PDF, citations, coverage, and notes paths to the console. Exits with code 1 on error. """ logger = setup_logging(level=log_level, log_file=Path("logs/build.log")) logger.info(f"Starting build pipeline for session: {session_id}") @@ -193,10 +197,12 @@ def cache( ) ) -> None: """ - Manage cache directory. - - Actions: - clear - Remove all cached files safely + Manage the local cache directory for the CLI. + + When `action` is "clear", delete the cache directory if it exists and recreate it; if the directory does not exist, print a warning. For any other `action`, print an error listing available actions and exit with a non-zero status. + + Parameters: + action (str): Action to perform. Supported value: "clear". """ if action == "clear": cache_dir = Path("cache") @@ -219,4 +225,4 @@ def main() -> None: if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/examkit/config.py b/examkit/config.py index 59cc4af..1b6fdaa 100644 --- a/examkit/config.py +++ b/examkit/config.py @@ -81,13 +81,13 @@ class ExamKitConfig(BaseModel): @classmethod def from_yaml(cls, path: Path) -> "ExamKitConfig": """ - Load configuration from a YAML file. - - Args: - path: Path to the YAML configuration file. - + Create an ExamKitConfig from a YAML file. + + Parameters: + path (Path): Filesystem path to a YAML configuration file. + Returns: - ExamKitConfig instance. + ExamKitConfig: Configuration instance populated from the file's contents. """ with open(path, "r") as f: data = yaml.safe_load(f) @@ -95,10 +95,10 @@ def from_yaml(cls, path: Path) -> "ExamKitConfig": def to_yaml(self, path: Path) -> None: """ - Save configuration to a YAML file. - - Args: - path: Path to save the YAML configuration file. + Write the current configuration to the given filesystem path as YAML. 
+ + Parameters: + path (Path): Filesystem path where the YAML file will be written. """ with open(path, "w") as f: - yaml.dump(self.model_dump(), f, default_flow_style=False, sort_keys=False) + yaml.dump(self.model_dump(), f, default_flow_style=False, sort_keys=False) \ No newline at end of file diff --git a/examkit/ingestion/exam_parser.py b/examkit/ingestion/exam_parser.py index 211a0a9..900a2e8 100644 --- a/examkit/ingestion/exam_parser.py +++ b/examkit/ingestion/exam_parser.py @@ -12,13 +12,15 @@ def extract_marks(text: str) -> int: """ - Extract marks from text using common patterns. - - Args: - text: Text containing marks information. - + Extract the numeric marks present in a text line using common bracketed patterns. + + Recognized patterns include forms like "[5 marks]", "(5 marks)", "[5]", and "(5)" (case-insensitive). The first matching numeric value is returned. + + Parameters: + text (str): Input text that may contain marks. + Returns: - Number of marks (0 if not found). + int: Number of marks found, or 0 if no marks are detected. """ # Common patterns: [5 marks], (5 marks), [5], (5) patterns = [ @@ -38,13 +40,24 @@ def extract_marks(text: str) -> int: def parse_exam_structure(text: str) -> List[Dict[str, Any]]: """ - Parse exam structure from text. - - Args: - text: Exam paper text content. - + Extract a structured list of questions and their parts from raw exam text. + + Parameters: + text (str): Full textual content of an exam paper (may contain multiple lines). + Returns: - List of question dictionaries. + List[Dict[str, Any]]: A list of question dictionaries. Each question dictionary includes the keys: + - `source`: origin identifier (e.g., "exam") + - `section`: section letter if detected (e.g., "A") or None + - `question_id`: string identifier (e.g., "Q1") + - `question_number`: integer question number + - `text`: concatenated text of the question + - `parts`: list of part dictionaries + - `marks`: numeric marks extracted for the question + Each part dictionary includes: + - `part_id`: identifier for the part (e.g., "a", "i") + - `text`: concatenated text of the part + - `marks`: numeric marks extracted for the part """ questions = [] lines = text.split('\n') @@ -114,14 +127,13 @@ def parse_exam_structure(text: str) -> List[Dict[str, Any]]: def parse_exam(path: Path, logger: logging.Logger) -> List[Dict[str, Any]]: """ - Parse exam paper PDF. - - Args: - path: Path to exam PDF file. - logger: Logger instance. - + Parse an exam PDF and return its extracted question structure. + + Parameters: + path (Path): Filesystem path to the exam PDF. + Returns: - List of question dictionaries. + List[Dict[str, Any]]: A list of question dictionaries. Each dictionary includes keys such as `source`, `section`, `question_id`, `question_number`, `text`, `parts` (a list of part dictionaries with `part_id`, `text`, and `marks`), and `marks`. """ logger.info(f"Parsing exam paper: {path}") @@ -138,4 +150,4 @@ def parse_exam(path: Path, logger: logging.Logger) -> List[Dict[str, Any]]: questions = parse_exam_structure(full_text) logger.info(f"Parsed {len(questions)} questions from exam paper") - return questions + return questions \ No newline at end of file diff --git a/examkit/ingestion/ingest.py b/examkit/ingestion/ingest.py index eadf794..f2b12f2 100644 --- a/examkit/ingestion/ingest.py +++ b/examkit/ingestion/ingest.py @@ -14,13 +14,16 @@ def validate_manifest(manifest: Dict[str, Any]) -> bool: """ - Validate manifest structure and file existence. 
- - Args: - manifest: Manifest dictionary. - + Validate that a manifest contains required fields and that its 'inputs' value is a dictionary. + + Parameters: + manifest (Dict[str, Any]): Manifest data expected to include at least the keys `"session_id"` and `"inputs"`. + Returns: - True if valid, raises ValueError otherwise. + bool: `True` if the manifest contains the required keys and `'inputs'` is a dictionary. + + Raises: + ValueError: If a required key is missing or if `manifest["inputs"]` is not a dictionary. """ required_keys = ["session_id", "inputs"] for key in required_keys: @@ -36,15 +39,17 @@ def validate_manifest(manifest: Dict[str, Any]) -> bool: def extract_audio_from_video(video_path: Path, output_path: Path, logger: logging.Logger) -> Path: """ - Extract audio from video file using ffmpeg. - - Args: - video_path: Path to input video file. - output_path: Path for output WAV file. - logger: Logger instance. - + Extract audio from a video file and save it as a 16 kHz mono PCM WAV. + + Parameters: + video_path (Path): Path to the input video file. + output_path (Path): Destination path for the extracted WAV file; the function will create the parent directory if needed. + Returns: - Path to extracted audio file. + Path: Path to the extracted audio file. + + Raises: + ffmpeg.Error: If FFmpeg fails during extraction. """ logger.info(f"Extracting audio from {video_path}") @@ -76,15 +81,18 @@ def ingest_pipeline( logger: logging.Logger ) -> Dict[str, Any]: """ - Run the complete ingestion pipeline. - - Args: - manifest: Manifest describing input files. - cache_dir: Directory for cached/processed files. - logger: Logger instance. - + Run the ingestion pipeline for a session and produce processed outputs in the cache directory. + + Parameters: + manifest (Dict[str, Any]): Manifest containing at least "session_id" and an "inputs" mapping of optional keys: "video", "transcript", "slides", "exam". + cache_dir (Path): Directory where processed files and the normalized manifest will be written. + logger (logging.Logger): Logger used for informational and warning messages. + Returns: - Dictionary with paths to processed files. + result (Dict[str, Any]): Dictionary with: + - "session_id" (str): The manifest's session identifier. + - "processed_files" (Dict[str, str]): Mapping of output types ("audio", "transcript", "slides", "exam") to their file paths in the cache for inputs that were present and processed. + - "normalized_manifest" (str): Path to the written normalized manifest JSON in the cache. """ from examkit.ingestion.transcript_normalizer import normalize_transcript from examkit.ingestion.slides_parser import parse_slides @@ -159,4 +167,4 @@ def ingest_pipeline( result["normalized_manifest"] = str(normalized_manifest_path) logger.info("Ingestion pipeline complete") - return result + return result \ No newline at end of file diff --git a/examkit/ingestion/ocr.py b/examkit/ingestion/ocr.py index 82af845..81d114f 100644 --- a/examkit/ingestion/ocr.py +++ b/examkit/ingestion/ocr.py @@ -15,14 +15,10 @@ def extract_text_with_ocr(image_path: Path, logger: logging.Logger) -> str: """ - Extract text from image using Tesseract OCR. - - Args: - image_path: Path to image file. - logger: Logger instance. - + Extracts text from the image at the given path using Tesseract OCR. + Returns: - Extracted text. + Extracted text from the image, or an empty string if Tesseract is unavailable or OCR fails. 
""" if not TESSERACT_AVAILABLE: logger.warning("Tesseract not available, OCR skipped") @@ -45,14 +41,13 @@ def extract_text_with_ocr(image_path: Path, logger: logging.Logger) -> str: def get_ocr_confidence(image_path: Path, logger: logging.Logger) -> float: """ - Get OCR confidence score for an image. - - Args: - image_path: Path to image file. - logger: Logger instance. - + Compute the average OCR confidence for the given image. + + Parameters: + image_path (Path): Path to the image file to analyze. + Returns: - Confidence score (0-100). + float: Average confidence score between 0 and 100. Returns 0.0 if OCR is unavailable, no valid confidences are found, or an error occurs. """ if not TESSERACT_AVAILABLE: return 0.0 @@ -74,14 +69,16 @@ def get_ocr_confidence(image_path: Path, logger: logging.Logger) -> float: def preprocess_image_for_ocr(image_path: Path, output_path: Path) -> Path: """ - Preprocess image to improve OCR accuracy. - - Args: - image_path: Path to input image. - output_path: Path for preprocessed image. - + Prepare an image for OCR by converting it to grayscale, boosting contrast, and applying sharpening. + + If TESSERACT_AVAILABLE is False, the function returns the original input path without modifying or creating a file. + + Parameters: + image_path (Path): Path to the input image file. + output_path (Path): Destination path for the preprocessed image. + Returns: - Path to preprocessed image. + Path: Path to the preprocessed image, or the original `image_path` if OCR is unavailable. """ if not TESSERACT_AVAILABLE: return image_path @@ -100,4 +97,4 @@ def preprocess_image_for_ocr(image_path: Path, output_path: Path) -> Path: # Save preprocessed image image.save(output_path) - return output_path + return output_path \ No newline at end of file diff --git a/examkit/ingestion/slides_parser.py b/examkit/ingestion/slides_parser.py index 8746ca3..044cc2a 100644 --- a/examkit/ingestion/slides_parser.py +++ b/examkit/ingestion/slides_parser.py @@ -15,15 +15,22 @@ def parse_pptx(path: Path, cache_dir: Path, logger: logging.Logger) -> List[Dict[str, Any]]: """ - Parse PowerPoint (PPTX) file. - - Args: - path: Path to PPTX file. - cache_dir: Directory to save extracted images. - logger: Logger instance. - + Extract structured slide information from a PPTX file. + + Parameters: + path (Path): Path to the source PPTX file. + cache_dir (Path): Directory used to store slide-related cache (e.g., generated image files). + logger (logging.Logger): Logger used for progress and warning messages. + Returns: - List of slide dictionaries. + List[Dict[str, Any]]: A list of slide dictionaries with the following keys: + - source (str): Fixed value "slides". + - type (str): Fixed value "pptx". + - slide_number (int): 1-based slide index. + - title (str): Slide title text if present, otherwise empty string. + - content (List[str]): Text blocks from the slide excluding the title. + - notes (str): Slide notes text if present, otherwise empty string. + - images (List[str]): Filenames (placeholders) for images detected on the slide. """ logger.info(f"Parsing PPTX: {path}") @@ -74,15 +81,12 @@ def parse_pptx(path: Path, cache_dir: Path, logger: logging.Logger) -> List[Dict def parse_pdf_slides(path: Path, cache_dir: Path, logger: logging.Logger) -> List[Dict[str, Any]]: """ - Parse PDF slides using PyMuPDF. - - Args: - path: Path to PDF file. - cache_dir: Directory to save extracted images. - logger: Logger instance. - + Parse a PDF as a sequence of slide-like dictionaries. 
+ + When a page has few embedded characters, attempts OCR on a rendered high-resolution image; uses the first non-empty line of page text as the slide title and remaining lines as content. Extracts image references for each page into the `images` list. + Returns: - List of slide dictionaries. + List of dictionaries, each with keys: `source`, `type`, `slide_number`, `title`, `content`, and `images`. """ from examkit.ingestion.ocr import extract_text_with_ocr @@ -157,4 +161,4 @@ def parse_slides(path: Path, cache_dir: Path, logger: logging.Logger) -> List[Di elif suffix == '.pdf': return parse_pdf_slides(path, cache_dir, logger) else: - raise ValueError(f"Unsupported slides format: {suffix}") + raise ValueError(f"Unsupported slides format: {suffix}") \ No newline at end of file diff --git a/examkit/ingestion/transcript_normalizer.py b/examkit/ingestion/transcript_normalizer.py index 86270fb..e4e3a7c 100644 --- a/examkit/ingestion/transcript_normalizer.py +++ b/examkit/ingestion/transcript_normalizer.py @@ -12,13 +12,20 @@ def parse_vtt(content: str) -> List[Dict[str, Any]]: """ - Parse VTT (WebVTT) transcript format. - - Args: - content: VTT file content. - + Parse WebVTT content into a list of transcript segment dictionaries. + + Each segment represents a contiguous caption with its start and end times (in seconds) and the combined text. Empty caption blocks are omitted. + + Parameters: + content (str): Raw WebVTT file content. + Returns: - List of segment dictionaries. + List[Dict[str, Any]]: A list of segments where each segment has keys: + - "source": "transcript" + - "type": "vtt" + - "start" (float): Start time in seconds. + - "end" (float): End time in seconds. + - "text" (str): Concatenated caption text. """ segments = [] lines = content.split('\n') @@ -60,13 +67,18 @@ def parse_vtt(content: str) -> List[Dict[str, Any]]: def parse_srt(content: str) -> List[Dict[str, Any]]: """ - Parse SRT (SubRip) transcript format. - - Args: - content: SRT file content. - + Parse SubRip (SRT) formatted transcript into a list of segment dictionaries. + + Parameters: + content (str): Raw SRT file contents. + Returns: - List of segment dictionaries. + List[Dict[str, Any]]: A list of segments where each segment contains: + - "source": "transcript" + - "type": "srt" + - "start": start time in seconds (float) + - "end": end time in seconds (float) + - "text": concatenated subtitle text (str) """ segments = [] blocks = content.strip().split('\n\n') @@ -105,13 +117,21 @@ def parse_srt(content: str) -> List[Dict[str, Any]]: def parse_txt(content: str) -> List[Dict[str, Any]]: """ - Parse plain text transcript (no timestamps). - - Args: - content: Plain text content. - + Parse a plain-text transcript into paragraph segments. + + Paragraphs are split on double newlines; leading/trailing whitespace is trimmed and empty paragraphs are ignored. + + Parameters: + content (str): Raw transcript text. + Returns: - List of segment dictionaries (with dummy timestamps). + List[Dict[str, Any]]: A list of segment dictionaries. Each segment has: + - "source": "transcript" + - "type": "txt" + - "start": None + - "end": None + - "text": paragraph text + - "index": zero-based paragraph order """ # Split into paragraphs paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()] @@ -132,14 +152,13 @@ def parse_txt(content: str) -> List[Dict[str, Any]]: def normalize_transcript(path: Path, logger: logging.Logger) -> List[Dict[str, Any]]: """ - Normalize transcript from various formats to standardized JSONL. 
- - Args: - path: Path to transcript file. - logger: Logger instance. - + Normalize a transcript file (VTT, SRT, or TXT) into a list of standardized segment dictionaries. + + Parameters: + path (Path): Filesystem path to the transcript file to parse. + Returns: - List of normalized transcript segments. + List[Dict[str, Any]]: A list of segment dictionaries. Segments that include `start` timestamps are sorted by start time and appear first; segments without timestamps follow. """ logger.info(f"Normalizing transcript: {path}") @@ -165,4 +184,4 @@ def normalize_transcript(path: Path, logger: logging.Logger) -> List[Dict[str, A segments_with_time.sort(key=lambda x: x['start']) logger.info(f"Normalized {len(segments)} segments from {suffix} format") - return segments_with_time + segments_without_time + return segments_with_time + segments_without_time \ No newline at end of file diff --git a/examkit/logging_utils.py b/examkit/logging_utils.py index c01cfe4..b23e89f 100644 --- a/examkit/logging_utils.py +++ b/examkit/logging_utils.py @@ -19,15 +19,15 @@ def setup_logging( rich_output: bool = True ) -> logging.Logger: """ - Configure logging for the application. - - Args: - level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). - log_file: Optional path to log file. - rich_output: Whether to use Rich formatting for console output. - + Configure and return the "examkit" logger with console and optional file handlers. + + Parameters: + level (str): Logging level name (e.g., "DEBUG", "INFO"). Invalid or unknown names default to "INFO". + log_file (Optional[Path]): If provided, a file handler is added and the file's parent directory will be created if necessary. + rich_output (bool): If True, console output is formatted with Rich; otherwise a standard stream formatter is used. + Returns: - Configured logger instance. + logging.Logger: The configured logger named "examkit". """ # Convert string level to logging constant numeric_level = getattr(logging, level.upper(), logging.INFO) @@ -75,12 +75,12 @@ def setup_logging( def get_logger(name: str) -> logging.Logger: """ - Get a logger instance for a module. - - Args: - name: Module name. - + Retrieve a namespaced logger for the given module. + + Parameters: + name (str): Module name to namespace under "examkit". + Returns: - Logger instance. + logging.Logger: Logger named "examkit.". """ - return logging.getLogger(f"examkit.{name}") + return logging.getLogger(f"examkit.{name}") \ No newline at end of file diff --git a/examkit/nlp/embeddings.py b/examkit/nlp/embeddings.py index 103c834..9c401d7 100644 --- a/examkit/nlp/embeddings.py +++ b/examkit/nlp/embeddings.py @@ -24,14 +24,16 @@ def load_embedding_model(model_name: str = "all-MiniLM-L6-v2", logger: logging.Logger = None): """ - Load sentence-transformers model. - - Args: - model_name: Model name. - logger: Logger instance. - + Load a SentenceTransformer embedding model by name. + + Parameters: + model_name (str): Identifier of the SentenceTransformer model to load (e.g., "all-MiniLM-L6-v2"). + Returns: - Loaded model. + The instantiated `SentenceTransformer` model. + + Raises: + ImportError: If the `sentence-transformers` package is not available. """ if not SENTENCE_TRANSFORMERS_AVAILABLE: raise ImportError("sentence-transformers not available") @@ -50,16 +52,12 @@ def generate_embeddings( logger: logging.Logger = None ) -> np.ndarray: """ - Generate embeddings for a list of texts. - - Args: - texts: List of text strings. - model: SentenceTransformer model. 
- batch_size: Batch size for encoding. - logger: Logger instance. - + Generate embeddings for each input text using the provided sentence-transformer model. + + Each row in the returned array corresponds to the embedding for the text at the same position in `texts`, preserving order. + Returns: - Numpy array of embeddings. + np.ndarray: Array of embeddings where row i is the embedding for texts[i]. """ if logger: logger.info(f"Generating embeddings for {len(texts)} texts") @@ -80,15 +78,17 @@ def create_faiss_index( logger: logging.Logger = None ) -> Any: """ - Create FAISS index from embeddings. - - Args: - embeddings: Numpy array of embeddings. - dim: Embedding dimension. - logger: Logger instance. - + Create a FAISS flat L2 index and add the provided embedding vectors. + + Parameters: + embeddings (np.ndarray): Array of vectors to index. + dim (int): Dimensionality of each embedding vector. + Returns: - FAISS index. + faiss_index: FAISS IndexFlatL2 instance containing the provided vectors. + + Raises: + ImportError: If `faiss` is not available. """ if not FAISS_AVAILABLE: raise ImportError("faiss not available") @@ -105,13 +105,13 @@ def create_faiss_index( def save_index(index: Any, index_path: Path, metadata: Dict[str, Any], metadata_path: Path) -> None: """ - Save FAISS index and metadata. - - Args: - index: FAISS index. - index_path: Path to save index. - metadata: Metadata dictionary. - metadata_path: Path to save metadata. + Persist a FAISS index and its associated metadata to disk. + + Parameters: + index: FAISS index instance to save. + index_path (Path): Filesystem path where the FAISS index file will be written. + metadata (Dict[str, Any]): Dictionary of metadata associated with the index (for example, mapping vector identifiers to records). + metadata_path (Path): Filesystem path where the metadata will be serialized and saved. """ # Save FAISS index faiss.write_index(index, str(index_path)) @@ -123,14 +123,14 @@ def save_index(index: Any, index_path: Path, metadata: Dict[str, Any], metadata_ def load_index(index_path: Path, metadata_path: Path) -> tuple: """ - Load FAISS index and metadata. - - Args: - index_path: Path to FAISS index. - metadata_path: Path to metadata file. - + Load a FAISS index and its associated metadata from disk. + + Parameters: + index_path (Path): Path to the FAISS index file. + metadata_path (Path): Path to the pickled metadata file. + Returns: - Tuple of (index, metadata). + tuple: (index, metadata) where `index` is a FAISS Index instance and `metadata` is the Python object restored from the metadata file. """ index = faiss.read_index(str(index_path)) @@ -148,17 +148,19 @@ def search_similar( top_k: int = 5 ) -> List[Dict[str, Any]]: """ - Search for similar texts using FAISS. - - Args: - query: Query text. - model: SentenceTransformer model. - index: FAISS index. - metadata: List of metadata dicts for each indexed text. - top_k: Number of results to return. - + Finds metadata entries most similar to a query using a FAISS index. + + Parameters: + query (str): Text query to search for. + model: SentenceTransformer instance used to encode the query into an embedding. + index: FAISS index containing the indexed embeddings. + metadata (List[Dict[str, Any]]): List of metadata dictionaries aligned by position with the indexed embeddings. + top_k (int): Number of top results to return. + Returns: - List of similar items with scores. 
+ List[Dict[str, Any]]: List of metadata dictionaries for the top matches, each augmented with: + - "distance" (float): L2 distance between the query embedding and the matched vector. + - "rank" (int): 1-based rank (1 is the closest). """ # Generate query embedding query_embedding = model.encode([query], convert_to_numpy=True) @@ -174,4 +176,4 @@ def search_similar( result["rank"] = i + 1 results.append(result) - return results + return results \ No newline at end of file diff --git a/examkit/nlp/retrieval.py b/examkit/nlp/retrieval.py index 2055a3d..ab1a69d 100644 --- a/examkit/nlp/retrieval.py +++ b/examkit/nlp/retrieval.py @@ -17,18 +17,18 @@ def retrieve_context_for_topic( logger: logging.Logger = None ) -> List[Dict[str, Any]]: """ - Retrieve relevant context chunks for a topic. - - Args: - topic: Topic dictionary. - model: Embedding model. - index: FAISS index. - chunks_metadata: Metadata for all chunks. - top_k: Number of chunks to retrieve. - logger: Logger instance. - + Retrieve context chunks relevant to a topic. + + Parameters: + topic (dict): Topic object with at least a `name` key. May include `description` (str) and `keywords` (List[str]) to enrich the query. + model: Embedding model used for similarity search. + index: Vector index used for retrieval. + chunks_metadata (List[dict]): List of candidate chunk metadata to search over. + top_k (int): Maximum number of chunks to return. + logger (logging.Logger, optional): Logger for debug messages. + Returns: - List of relevant chunks with metadata. + List[dict]: Chunks ranked by relevance to the topic. Each item is a metadata dictionary typically containing fields such as `text`, `source`, and `distance`. """ # Create query from topic query = f"{topic['name']} {topic.get('description', '')} {' '.join(topic.get('keywords', []))}" @@ -44,14 +44,14 @@ def retrieve_context_for_topic( def deduplicate_chunks(chunks: List[Dict[str, Any]], similarity_threshold: float = 0.95) -> List[Dict[str, Any]]: """ - Remove duplicate or highly similar chunks. - - Args: - chunks: List of chunks. - similarity_threshold: Threshold for considering chunks as duplicates. - + Remove duplicate chunks by exact text match, preserving the first occurrence order. + + Parameters: + chunks (List[Dict[str, Any]]): Sequence of chunk dictionaries that may contain a "text" field. + similarity_threshold (float): Currently unused; kept for API compatibility. + Returns: - Deduplicated chunks. + List[Dict[str, Any]]: Deduplicated list where later chunks with the same "text" as an earlier chunk are removed. """ if not chunks: return [] @@ -80,14 +80,14 @@ def rank_by_source_diversity( prefer_exam: bool = True ) -> List[Dict[str, Any]]: """ - Re-rank chunks to promote source diversity. - - Args: - chunks: List of chunks with source information. - prefer_exam: Whether to prioritize exam-related chunks. - + Reorder a list of chunks to increase diversity of their originating sources. + + Parameters: + chunks (List[Dict[str, Any]]): Chunks containing at least a "source" field. + prefer_exam (bool): If True, prioritize sources in the order ["exam", "slides", "transcript", "asr"]; if False, use ["slides", "transcript", "exam", "asr"]. + Returns: - Re-ranked chunks. + List[Dict[str, Any]]: The input chunks re-ranked by interleaving items from prioritized sources; chunks from sources not in the priority list are appended at the end. 
""" if not chunks: return [] @@ -128,17 +128,19 @@ def filter_by_confidence( max_distance: float = 1.0 ) -> List[Dict[str, Any]]: """ - Filter chunks by distance/confidence score. - - Args: - chunks: List of chunks with distance scores. - min_distance: Minimum distance threshold. - max_distance: Maximum distance threshold. - + Filter chunks to those whose distance score lies within the inclusive range. + + Chunks missing a "distance" field are treated as having distance 999 and will be excluded unless the range includes that value. + + Parameters: + chunks: Iterable of chunk dictionaries; each chunk's "distance" key is used for filtering. + min_distance: Minimum acceptable distance (inclusive). + max_distance: Maximum acceptable distance (inclusive). + Returns: - Filtered chunks. + Filtered list of chunks whose "distance" is between min_distance and max_distance, inclusive. """ return [ chunk for chunk in chunks if min_distance <= chunk.get("distance", 999) <= max_distance - ] + ] \ No newline at end of file diff --git a/examkit/nlp/spacy_nlp.py b/examkit/nlp/spacy_nlp.py index 71dfb06..5b1c7be 100644 --- a/examkit/nlp/spacy_nlp.py +++ b/examkit/nlp/spacy_nlp.py @@ -14,15 +14,21 @@ def extract_named_entities(text: str, nlp, logger: logging.Logger = None) -> List[Dict[str, Any]]: """ - Extract named entities from text using spaCy. - - Args: - text: Input text. - nlp: SpaCy model. - logger: Logger instance. - + Extract named entities from text and return them as dictionaries. + + If spaCy is unavailable (module-level SPACY_AVAILABLE is False), returns an empty list. + + Parameters: + text (str): Text to analyze. + nlp: A spaCy language model used to create a Doc for extraction. + logger (logging.Logger, optional): If provided, receives a debug message with the count of extracted entities. + Returns: - List of named entities with labels. + List[dict]: A list of entity dictionaries, each containing: + - "text": the entity string as found in the input, + - "label": the entity label (spaCy label string), + - "start": start character offset of the entity, + - "end": end character offset of the entity. """ if not SPACY_AVAILABLE: return [] @@ -46,15 +52,14 @@ def extract_named_entities(text: str, nlp, logger: logging.Logger = None) -> Lis def clean_and_tokenize(text: str, nlp, remove_stopwords: bool = False) -> List[str]: """ - Clean and tokenize text using spaCy. - - Args: - text: Input text. - nlp: SpaCy model. - remove_stopwords: Whether to remove stopwords. - + Clean and tokenize text into lowercase tokens, removing punctuation and whitespace. + + Parameters: + text (str): Input text to process. + remove_stopwords (bool): If True, omit spaCy stopwords from the output. + Returns: - List of tokens. + List[str]: Cleaned, tokenized, lowercase tokens. If spaCy is unavailable, returns text.split(). """ if not SPACY_AVAILABLE: return text.split() @@ -74,15 +79,16 @@ def clean_and_tokenize(text: str, nlp, remove_stopwords: bool = False) -> List[s def extract_key_phrases(text: str, nlp, top_n: int = 10) -> List[str]: """ - Extract key noun phrases from text. - - Args: - text: Input text. - nlp: SpaCy model. - top_n: Number of phrases to return. - + Extract noun phrase key phrases from the given text. + + If spaCy is unavailable, returns an empty list. The returned list contains unique noun phrases found in the text, limited to at most `top_n` items. + + Parameters: + nlp: SpaCy language model used to parse the text. + top_n (int): Maximum number of phrases to return. + Returns: - List of key phrases. 
+ List of unique noun phrases, limited to `top_n` items. """ if not SPACY_AVAILABLE: return [] @@ -100,14 +106,12 @@ def extract_key_phrases(text: str, nlp, top_n: int = 10) -> List[str]: def lemmatize_text(text: str, nlp) -> str: """ - Lemmatize text using spaCy. - - Args: - text: Input text. - nlp: SpaCy model. - + Return the input text with each token replaced by its lemma. + + If spaCy is unavailable, the original text is returned. + Returns: - Lemmatized text. + Lemmatized text with tokens' lemmas joined by single spaces. """ if not SPACY_AVAILABLE: return text @@ -119,14 +123,20 @@ def lemmatize_text(text: str, nlp) -> str: def detect_language_patterns(text: str, nlp) -> Dict[str, Any]: """ - Detect language patterns and structure. - - Args: - text: Input text. - nlp: SpaCy model. - + Analyze text to extract basic language structure and pattern metrics. + + Parameters: + text (str): Text to analyze. + nlp: spaCy language model used to parse the text. + Returns: - Dictionary with language pattern information. + patterns (Dict[str, Any]): Mapping with the following keys: + - "sentence_count": Number of sentences in the text. + - "token_count": Total number of tokens. + - "has_questions": `true` if the text contains a question mark, `false` otherwise. + - "has_imperatives": `true` if any sentence appears to start with a base-form verb, `false` otherwise. + - "noun_phrases": Number of noun phrase chunks. + - "entities": Number of named entities detected. """ if not SPACY_AVAILABLE: return {} @@ -142,4 +152,4 @@ def detect_language_patterns(text: str, nlp) -> Dict[str, Any]: "entities": len(doc.ents) } - return patterns + return patterns \ No newline at end of file diff --git a/examkit/nlp/splitter.py b/examkit/nlp/splitter.py index 63ddd8f..d3caeae 100644 --- a/examkit/nlp/splitter.py +++ b/examkit/nlp/splitter.py @@ -14,14 +14,18 @@ def load_spacy_model(model_name: str = "en_core_web_sm", logger: logging.Logger = None): """ - Load spaCy model. - - Args: - model_name: SpaCy model name. - logger: Logger instance. - + Load and return a spaCy language model by name. + + Parameters: + model_name (str): Name of the spaCy model to load (e.g., "en_core_web_sm"). + logger (logging.Logger, optional): Logger for informational and error messages. + Returns: - Loaded spaCy model. + nlp: Loaded spaCy language object. + + Raises: + ImportError: If spaCy is not installed. + OSError: If the specified model is not found. """ if not SPACY_AVAILABLE: raise ImportError("spaCy not available. Install with: pip install spacy") @@ -39,14 +43,14 @@ def load_spacy_model(model_name: str = "en_core_web_sm", logger: logging.Logger def split_into_sentences_spacy(text: str, nlp) -> List[str]: """ - Split text into sentences using spaCy. - - Args: - text: Input text. - nlp: SpaCy model. - + Split the input text into sentence strings using a spaCy pipeline. + + Parameters: + text (str): Text to segment into sentences. + nlp: A spaCy language pipeline or model (e.g., the object returned by `spacy.load(...)`) used to perform sentence segmentation. + Returns: - List of sentences. + List[str]: Sentence strings extracted from the text, each stripped of surrounding whitespace. """ doc = nlp(text) return [sent.text.strip() for sent in doc.sents] @@ -58,15 +62,17 @@ def split_into_chunks( logger: logging.Logger = None ) -> List[Dict[str, Any]]: """ - Split segments into manageable chunks for embedding. - - Args: - segments: List of text segments. - max_chunk_size: Maximum chunk size in characters. - logger: Logger instance. 
- + Breaks segments into character-limited chunks by splitting long texts at word boundaries. + + Long segments (text length > max_chunk_size) are split into smaller chunks that copy the original segment, set the chunked text under the "text" key, and mark the chunk with "is_split" = True. Segments whose text length is less than or equal to max_chunk_size are returned unchanged. + + Parameters: + segments (List[Dict[str, Any]]): List of segment dictionaries; each dictionary is expected to contain a "text" key. + max_chunk_size (int): Maximum allowed chunk size in characters. + logger (logging.Logger, optional): Optional logger used to record chunking summary. + Returns: - List of chunked segments. + List[Dict[str, Any]]: List of segment dictionaries including original and generated chunk dictionaries (generated chunks have "is_split" = True). """ chunks = [] @@ -116,14 +122,16 @@ def merge_short_segments( min_length: int = 50 ) -> List[Dict[str, Any]]: """ - Merge very short segments for better context. - - Args: - segments: List of segments. - min_length: Minimum segment length. - + Merge consecutive short text segments into larger segments to preserve context. + + Segments with a "text" length less than `min_length` are concatenated (space-separated) into a single segment. The merged segment is created by copying the first buffered segment, replacing its "text" with the concatenated text and setting "is_merged" to True. + + Parameters: + segments (List[Dict[str, Any]]): List of segment dictionaries; each should contain a "text" key. + min_length (int): Minimum number of characters for a segment to be considered "long" and not merged. + Returns: - List of merged segments. + List[Dict[str, Any]]: A list of segments where consecutive short segments have been merged. Merged segments include an "is_merged" key set to True. """ if not segments: return [] @@ -156,4 +164,4 @@ def merge_short_segments( merged_segment["is_merged"] = True merged.append(merged_segment) - return merged + return merged \ No newline at end of file diff --git a/examkit/nlp/topic_mapping.py b/examkit/nlp/topic_mapping.py index 2f5d7be..4515650 100644 --- a/examkit/nlp/topic_mapping.py +++ b/examkit/nlp/topic_mapping.py @@ -11,13 +11,16 @@ def load_topics(topics_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ - Load and normalize topics. - - Args: - topics_data: List of topic dictionaries. - + Normalize a list of topic dictionaries into a consistent structure. + + Parameters: + topics_data (List[Dict[str, Any]]): List of topic objects. Each object may include + the keys `id`, `name`, `keywords`, `weight`, and `description`. If `id` is missing, + a fallback ID is derived from `name` (lowercased, spaces replaced with underscores). + Returns: - Normalized topics list. + List[Dict[str, Any]]: A list of normalized topic dictionaries, each containing the keys + `id`, `name`, `keywords`, `weight`, and `description` with sensible defaults when absent. """ normalized = [] for topic in topics_data: @@ -40,18 +43,20 @@ def map_chunks_to_topics( logger: logging.Logger = None ) -> Dict[str, List[int]]: """ - Map text chunks to topics using embeddings. - - Args: - chunks: List of text chunks. - topics: List of topics. - chunk_embeddings: Embeddings for chunks. - topic_embeddings: Embeddings for topics. - threshold: Similarity threshold. - logger: Logger instance. - + Assigns chunks to topics based on cosine similarity between their embeddings. 
+ + Compares each chunk embedding to each topic embedding and adds a chunk's index to a topic's list when the cosine similarity is greater than or equal to the threshold. + + Parameters: + chunks: List of chunk dictionaries (used for indexing; chunk content is not inspected). + topics: List of topic dictionaries; each must include an "id" key. + chunk_embeddings: 2D array of shape (num_chunks, embedding_dim). + topic_embeddings: 2D array of shape (num_topics, embedding_dim). + threshold (float): Minimum cosine similarity required to assign a chunk to a topic. + logger (logging.Logger, optional): If provided, logs the number of chunks mapped per topic. + Returns: - Dictionary mapping topic IDs to chunk indices. + Dict[str, List[int]]: Mapping from topic ID to a list of chunk indices assigned to that topic. """ # Calculate similarity matrix similarities = cosine_similarity(chunk_embeddings, topic_embeddings) @@ -78,15 +83,21 @@ def calculate_coverage( total_chunks: int ) -> List[Dict[str, Any]]: """ - Calculate topic coverage metrics. - - Args: - topic_mapping: Mapping of topics to chunk indices. - topics: List of topics. - total_chunks: Total number of chunks. - + Compute per-topic coverage metrics from a mapping of topic IDs to chunk indices. + + Parameters: + topic_mapping (Dict[str, List[int]]): Mapping from topic ID to list of chunk indices assigned to that topic. + topics (List[Dict[str, Any]]): List of topic dictionaries; each must include `"id"` and `"name"`, and may include `"weight"`. + total_chunks (int): Total number of chunks considered; when zero or less, coverage percentages are reported as 0.0. + Returns: - List of coverage metrics per topic. + List[Dict[str, Any]]: A list of per-topic coverage dictionaries containing: + - topic_id (str): The topic's identifier. + - name (str): The topic's display name. + - chunk_count (int): Number of chunks mapped to the topic. + - coverage_percentage (float): Percentage of total_chunks mapped to the topic (0.0–100.0). + - weight (float): Topic weight (defaults to 1.0 if missing). + - weighted_coverage (float): coverage_percentage multiplied by weight. """ coverage_data = [] @@ -113,17 +124,17 @@ def identify_gaps( min_coverage: float = 10.0 ) -> List[str]: """ - Identify topics with insufficient coverage. - - Args: - coverage_data: List of coverage metrics. - min_coverage: Minimum acceptable coverage percentage. - + Identify topic names whose coverage percentage is below a minimum threshold. + + Parameters: + coverage_data (List[Dict[str, Any]]): Per-topic coverage dictionaries containing at least the keys "name" and "coverage_percentage". + min_coverage (float): Coverage percentage threshold; topics with coverage strictly less than this value are considered gaps. + Returns: - List of under-covered topic names. + List[str]: Names of topics whose coverage percentage is less than min_coverage. """ gaps = [] for item in coverage_data: if item["coverage_percentage"] < min_coverage: gaps.append(item["name"]) - return gaps + return gaps \ No newline at end of file diff --git a/examkit/qa/checks.py b/examkit/qa/checks.py index 97b5b25..51f5946 100644 --- a/examkit/qa/checks.py +++ b/examkit/qa/checks.py @@ -12,14 +12,16 @@ def check_formula_compilation(content: str, logger: logging.Logger = None) -> Dict[str, Any]: """ - Check if LaTeX formulas in content are valid. - - Args: - content: Content with LaTeX formulas. - logger: Logger instance. - + Validate LaTeX formulas found in the given content. 
+ + Extracts LaTeX formulas and reports which formulas failed validation. + Returns: - Dictionary with check results. + result (dict): Summary of the check with keys: + - total_formulas (int): Number of formulas found. + - valid_formulas (int): Number of formulas that passed validation. + - invalid_formulas (List[str]): Formulas that failed validation. + - passed (bool): `true` if no invalid formulas were found, `false` otherwise. """ formulas = extract_latex_formulas(content) invalid_formulas = [] @@ -43,14 +45,18 @@ def check_formula_compilation(content: str, logger: logging.Logger = None) -> Di def check_internal_links(content: str, logger: logging.Logger = None) -> Dict[str, Any]: """ - Check internal links in markdown content. - - Args: - content: Markdown content. - logger: Logger instance. - + Verify internal Markdown links point to existing heading anchors. + + Converts document headings to anchor names by lowercasing and replacing spaces with hyphens, then checks all markdown links of the form [text](#anchor) to identify links whose targets are not present among those anchors. + + Parameters: + content (str): Markdown content to inspect. + Returns: - Dictionary with check results. + dict: Result object with keys: + - total_links (int): Number of internal links found. + - broken_links (List[Tuple[str, str]]): List of tuples (link_text, link_target) for links whose target anchor was not found. + - passed (bool): `true` if no broken links were detected, `false` otherwise. """ # Find all markdown links link_pattern = r'\[([^\]]+)\]\(#([^)]+)\)' @@ -86,15 +92,18 @@ def check_keyword_recall( logger: logging.Logger = None ) -> Dict[str, Any]: """ - Check if required keywords are present in content. - - Args: - content: Content to check. - required_keywords: List of keywords that should be present. - logger: Logger instance. - + Determine which of the required keywords appear in the provided content. + + Parameters: + content (str): Text to search for keywords; matching is case-insensitive. + required_keywords (List[str]): Keywords to look for; each keyword is matched as a substring (case-insensitive). An empty list yields 100% coverage. + Returns: - Dictionary with check results. + Dict[str, Any]: Result dictionary with keys: + - total_keywords (int): Number of keywords checked. + - found_keywords (int): Number of keywords found in the content. + - missing_keywords (List[str]): Keywords that were not found. + - coverage_percentage (float): Percentage of keywords found (0–100). """ content_lower = content.lower() missing_keywords = [] @@ -118,14 +127,21 @@ def check_keyword_recall( def check_citation_presence(content: str, logger: logging.Logger = None) -> Dict[str, Any]: """ - Check if content has proper citations. - - Args: - content: Content to check. - logger: Logger instance. - + Detects video, slide, and exam citations in the given content. + + Parameters: + content (str): Text to scan for citations; looks for tokens like `[vid ...]`, `[slide ...]`, or `[exam ...]`. + Returns: - Dictionary with check results. + dict: { + "total_citations": int, # total number of citation tokens found + "has_citations": bool, # True if any citations were found + "citation_types": { # counts per citation type + "video": int, + "slides": int, + "exam": int + } + } """ # Find citations [vid ...], [slide ...], [exam ...] 
citation_pattern = r'\[(vid|slide|exam)[^\]]*\]' @@ -149,14 +165,16 @@ def check_citation_presence(content: str, logger: logging.Logger = None) -> Dict def check_equation_consistency(content: str, logger: logging.Logger = None) -> Dict[str, Any]: """ - Check if equations use consistent notation. - - Args: - content: Content with equations. - logger: Logger instance. - + Analyze LaTeX formulas in the provided content to detect potentially inconsistent equation notation. + + Parameters: + content (str): Text containing LaTeX formulas to inspect (inline or display math). + Returns: - Dictionary with check results. + result (dict): Summary of the consistency check with keys: + - total_symbols (int): Number of unique equation symbols found. + - warnings (list): List of human-readable warnings about symbols with potential inconsistent usage. + - passed (bool): `true` if no warnings were produced, `false` otherwise. """ from examkit.utils.math_utils import extract_equation_symbols @@ -195,15 +213,14 @@ def run_all_checks( logger: logging.Logger = None ) -> Dict[str, Any]: """ - Run all QA checks on content. - - Args: - content: Content to check. - required_keywords: Optional list of required keywords. - logger: Logger instance. - + Run a suite of QA checks on the provided content and aggregate their results. + + Parameters: + content (str): Markdown or text content to validate. + required_keywords (List[str], optional): If provided, include a keyword-recall check for these terms. + Returns: - Dictionary with all check results. + Dict[str, Any]: Aggregated results containing per-check dictionaries for `"formulas"`, `"links"`, `"citations"`, and `"equations"`. If `required_keywords` was supplied, includes a `"keywords"` entry. Contains `"overall_passed"` (bool) which is true only if every check that reports a `passed` field is true. """ if logger: logger.info("Running QA checks...") @@ -230,4 +247,4 @@ def run_all_checks( if logger: logger.info(f"QA checks complete. Overall: {'PASSED' if all_passed else 'WARNINGS'}") - return results + return results \ No newline at end of file diff --git a/examkit/render/pandoc_renderer.py b/examkit/render/pandoc_renderer.py index 83cd31a..db3cdc1 100644 --- a/examkit/render/pandoc_renderer.py +++ b/examkit/render/pandoc_renderer.py @@ -15,16 +15,16 @@ def render_markdown_to_pdf_pandoc( logger: logging.Logger = None ) -> bool: """ - Render markdown to PDF using Pandoc. - - Args: - markdown_path: Path to markdown file. - output_pdf: Path for output PDF. - options: Additional Pandoc options. - logger: Logger instance. - + Render a Markdown file to PDF using Pandoc. + + Parameters: + markdown_path (Path): Path to the input Markdown file. + output_pdf (Path): Path where the generated PDF will be written. + options (Optional[List[str]]): Additional Pandoc command-line options to append. + logger (logging.Logger, optional): Logger to receive informational and error messages. + Returns: - True if successful, False otherwise. + bool: `True` if Pandoc produced the PDF successfully, `False` otherwise. """ if logger: logger.info(f"Rendering with Pandoc: {markdown_path} -> {output_pdf}") @@ -64,10 +64,10 @@ def render_markdown_to_pdf_pandoc( def check_pandoc_installed() -> bool: """ - Check if Pandoc is installed. - + Determine whether Pandoc is available on the system PATH. + Returns: - True if installed, False otherwise. + True if running `pandoc --version` succeeds with exit code 0, False otherwise. 
""" try: result = subprocess.run( @@ -77,4 +77,4 @@ def check_pandoc_installed() -> bool: ) return result.returncode == 0 except: - return False + return False \ No newline at end of file diff --git a/examkit/render/templater.py b/examkit/render/templater.py index 7dc8e7e..da91879 100644 --- a/examkit/render/templater.py +++ b/examkit/render/templater.py @@ -14,13 +14,13 @@ def setup_jinja_environment(templates_dir: Path = None) -> Environment: """ - Set up Jinja2 environment. - - Args: - templates_dir: Directory containing templates. - + Create a Jinja2 Environment configured to load templates from a filesystem directory. + + Parameters: + templates_dir (Path | None): Path to the templates directory. If omitted, defaults to "config/templates". + Returns: - Jinja2 Environment. + Environment: A Jinja2 Environment with FileSystemLoader and `trim_blocks` and `lstrip_blocks` enabled. """ if templates_dir is None: templates_dir = Path("config/templates") @@ -40,15 +40,23 @@ def render_markdown_document( config: ExamKitConfig ) -> str: """ - Render complete markdown document from sections. - - Args: - sections: List of content sections. - session_id: Session identifier. - config: Configuration. - + Builds a complete Markdown document from structured section data for an exam preparation session. + + Parameters: + sections (List[Dict[str, Any]]): Ordered list of section dictionaries. Each section may include keys: + - "topic" (str): section title. + - "definition" (str): definition text. + - "key_formulas" (str): key formulas text. + - "derivation" (str): derivation text. + - "examples" (str): worked examples text. + - "mistakes" (str): common mistakes text. + - "revision" (str): quick revision notes. + - "citations" (str): optional sources to display with a definition. + session_id (str): Identifier to include in the document title. + config (ExamKitConfig): Configuration object (used for environment/context; not directly inspected by this function). + Returns: - Rendered markdown content. + str: The rendered Markdown document as a single string. """ # Build markdown manually (simple template) lines = [ @@ -111,15 +119,17 @@ def render_typst_document( config: ExamKitConfig ) -> str: """ - Render Typst document from markdown content. - - Args: - markdown_content: Markdown content. - session_id: Session identifier. - config: Configuration. - + Render a Typst document from rendered Markdown content. + + Builds a Typst preface (theme import and conf block with title and date) and converts common Markdown constructs (headers and simple emphasis) into Typst syntax, producing a complete Typst document. + + Parameters: + markdown_content (str): Markdown text to convert. + session_id (str): Session identifier inserted into the document title. + config (ExamKitConfig): Configuration used for rendering (controls template/formatting options). + Returns: - Rendered Typst content. + str: The complete Typst document content. """ # Convert markdown to Typst format (basic conversion) typst_lines = [ @@ -157,14 +167,14 @@ def render_typst_document( def load_template(template_name: str, templates_dir: Path = None) -> Template: """ - Load a Jinja2 template. - - Args: - template_name: Template file name. - templates_dir: Templates directory. - + Load a Jinja2 template from the templates directory. + + Parameters: + template_name (str): Name of the template file to load. + templates_dir (Path | None): Optional path to the templates directory; when omitted the configured templates directory is used. 
+ Returns: - Loaded template. + template (Template): The loaded Jinja2 Template object. """ env = setup_jinja_environment(templates_dir) return env.get_template(template_name) @@ -187,4 +197,4 @@ def render_section_template( Rendered content. """ template = load_template(template_name, templates_dir) - return template.render(**context) + return template.render(**context) \ No newline at end of file diff --git a/examkit/render/typst_renderer.py b/examkit/render/typst_renderer.py index f14c515..c805f1c 100644 --- a/examkit/render/typst_renderer.py +++ b/examkit/render/typst_renderer.py @@ -62,15 +62,17 @@ def compile_with_typst( logger: logging.Logger ) -> bool: """ - Compile using Typst. - - Args: - input_path: Input file path. - output_path: Output PDF path. - logger: Logger instance. - + Compile the given document to PDF using Typst. + + If the input is a Markdown file (suffix .md), a temporary Typst wrapper file is written and used for compilation. The function returns False if Typst is not available, if compilation fails, or if a timeout or other error occurs. + + Parameters: + input_path (Path): Path to the source document. If it has a `.md` suffix, it will be wrapped into a `.typ` file before compilation. + output_path (Path): Destination path for the generated PDF. + logger (logging.Logger): Logger used to record informational and error messages. + Returns: - True if successful, False otherwise. + bool: `True` if PDF generation completed successfully, `False` otherwise. """ if not check_typst_installed(): logger.error("Typst not installed. Install with: brew install typst") @@ -110,13 +112,13 @@ def compile_with_typst( def create_typst_wrapper_for_markdown(markdown_path: Path) -> str: """ - Create a Typst wrapper that includes markdown content. - - Args: - markdown_path: Path to markdown file. - + Generate a minimal Typst document that embeds the contents of the given Markdown file. + + Parameters: + markdown_path (Path): Path to the Markdown file to convert. + Returns: - Typst content. + typst_content (str): A Typst-formatted document string containing converted headings and simple inline formatting. """ with open(markdown_path, 'r', encoding='utf-8') as f: md_content = f.read() @@ -158,16 +160,13 @@ def compile_with_pandoc( logger: logging.Logger ) -> bool: """ - Compile using Pandoc (fallback). - - Args: - input_path: Input markdown file. - output_path: Output PDF path. - config: Configuration. - logger: Logger instance. - + Generate a PDF from the given Markdown input using Pandoc as a fallback engine. + + Parameters: + config (ExamKitConfig): Uses `config.pdf.font_size` to set the document font size (in points). + Returns: - True if successful, False otherwise. + `true` if the PDF was generated successfully, `false` otherwise. """ logger.info(f"Compiling with Pandoc: {input_path} -> {output_path}") @@ -207,4 +206,4 @@ def compile_with_pandoc( return False except Exception as e: logger.error(f"Pandoc compilation error: {e}") - return False + return False \ No newline at end of file diff --git a/examkit/reports/coverage.py b/examkit/reports/coverage.py index 136f909..aa60440 100644 --- a/examkit/reports/coverage.py +++ b/examkit/reports/coverage.py @@ -15,15 +15,21 @@ def generate_coverage_report( logger: logging.Logger = None ) -> pd.DataFrame: """ - Generate topic coverage report. - - Args: - coverage_data: List of coverage dictionaries. - output_path: Path to save CSV report. - logger: Logger instance. 
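# A minimal sketch of the two PDF back ends documented above: try Typst first and fall back
# to Pandoc when Typst is missing or compilation fails. Paths and the config location are
# placeholders; compile_with_pandoc is called with the argument order shown in its signature.
import logging
from pathlib import Path

from examkit.config import ExamKitConfig
from examkit.render.typst_renderer import compile_with_typst, compile_with_pandoc

logger = logging.getLogger("examkit")
config = ExamKitConfig.from_yaml(Path("config/config.yaml"))  # assumed location
notes_md = Path("out/session1_notes.md")
pdf_path = Path("out/session1_notes.pdf")

if not compile_with_typst(notes_md, pdf_path, logger):
    # Pandoc fallback; per the docstring it reads config.pdf.font_size.
    compile_with_pandoc(notes_md, pdf_path, config, logger)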
- + Create a topic coverage report from coverage_data and write it to output_path as a CSV. + + The function constructs a DataFrame from coverage_data, sorts it in descending order by the + "coverage_percentage" column if present, ensures the parent directory of output_path exists, + and writes the DataFrame to CSV without an index. If a logger is provided, an info message + is emitted with the saved path. + + Parameters: + coverage_data: Iterable of dictionaries describing topics; if a dictionary contains a + "coverage_percentage" key it will be used for sorting. Each item typically includes + a topic identifier (e.g., "name") and its coverage percentage. + output_path: Filesystem path where the CSV report will be written. + Returns: - DataFrame with coverage data. + pd.DataFrame: The DataFrame created (and possibly sorted) from coverage_data. """ df = pd.DataFrame(coverage_data) @@ -43,13 +49,13 @@ def generate_coverage_report( def calculate_overall_coverage(coverage_data: List[Dict[str, Any]]) -> Dict[str, float]: """ - Calculate overall coverage statistics. - - Args: - coverage_data: List of coverage dictionaries. - + Compute summary statistics (mean, median, minimum, and maximum) for topic coverage percentages. + + Parameters: + coverage_data (List[Dict[str, Any]]): Sequence of records where each record contains a `coverage_percentage` numeric value. + Returns: - Dictionary with overall statistics. + Dict[str, float]: Dictionary with keys `"mean"`, `"median"`, `"min"`, and `"max"` mapping to their respective coverage values. If `coverage_data` is empty, all values are `0.0`. """ if not coverage_data: return {"mean": 0.0, "median": 0.0, "min": 0.0, "max": 0.0} @@ -90,13 +96,16 @@ def identify_coverage_gaps( def generate_coverage_summary(coverage_data: List[Dict[str, Any]]) -> str: """ - Generate a text summary of coverage. - - Args: - coverage_data: List of coverage dictionaries. - + Produce a human-readable summary of topic coverage statistics and low-coverage topics. + + Parameters: + coverage_data (List[Dict[str, Any]]): List of topic coverage records. Each record should include a + `coverage_percentage` numeric value and a `name` string used when listing gaps. + Returns: - Text summary. + summary (str): Multi-line text containing total topics, mean, median, min/max coverage, and a + list of topics with coverage below 10% (if any). If `coverage_data` is empty, returns + "No coverage data available.". """ if not coverage_data: return "No coverage data available." @@ -121,4 +130,4 @@ def generate_coverage_summary(coverage_data: List[Dict[str, Any]]) -> str: else: lines.append("✓ All topics have adequate coverage") - return "\n".join(lines) + return "\n".join(lines) \ No newline at end of file diff --git a/examkit/reports/export.py b/examkit/reports/export.py index 2aa7b99..3b31aa3 100644 --- a/examkit/reports/export.py +++ b/examkit/reports/export.py @@ -11,14 +11,18 @@ def generate_report(session_id: str, logger: logging.Logger) -> Dict[str, Any]: """ - Generate comprehensive report for a session. - - Args: - session_id: Session identifier. - logger: Logger instance. - + Assemble a session report by collecting coverage CSV, QA notes, and citation JSON from the out/ directory. + + Parameters: + session_id (str): Identifier used to locate out/{session_id}_coverage.csv, out/{session_id}_notes.md, and out/{session_id}_citations.json. + Returns: - Dictionary with report data. + dict: Report with keys: + - session_id: the provided session identifier. 
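# A minimal sketch of the coverage helpers documented above: build a few records carrying
# "name" and "coverage_percentage", write the CSV report, and print the text summary. The
# topic names and numbers are made up for illustration.
import logging
from pathlib import Path

from examkit.reports.coverage import (
    generate_coverage_report,
    calculate_overall_coverage,
    generate_coverage_summary,
)

logger = logging.getLogger("examkit")
coverage_data = [
    {"name": "Bayes' Theorem", "coverage_percentage": 82.5, "chunk_count": 14},
    {"name": "Markov Chains", "coverage_percentage": 7.0, "chunk_count": 2},
]

df = generate_coverage_report(coverage_data, Path("out/session1_coverage.csv"), logger)
stats = calculate_overall_coverage(coverage_data)   # {"mean": ..., "median": ..., "min": ..., "max": ...}
print(generate_coverage_summary(coverage_data))     # flags topics below 10% coverage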
+ - coverage: list of coverage records (each a dict) loaded from CSV, or empty list. + - qa: summary dict with keys `formulas_checked`, `links_verified`, `citations_found`, and `warnings` when QA notes are present, otherwise empty dict. + - citations: list loaded from citations JSON, or empty list. + - coverage_path: string path to the coverage CSV when present, otherwise None. """ out_dir = Path("out") @@ -65,11 +69,17 @@ def generate_report(session_id: str, logger: logging.Logger) -> Dict[str, Any]: def export_report_text(report: Dict[str, Any], output_path: Path) -> None: """ - Export report as text file. - - Args: - report: Report dictionary. - output_path: Output path for text file. + Write a human-readable text summary of a report to the given file path. + + Parameters: + report (Dict[str, Any]): Report dictionary produced by `generate_report`. Expected keys used: + - session_id (str): Identifier included in the header. + - coverage (List[Dict]): Optional; each item should contain `name` (str), + `coverage_percentage` (float), and `chunk_count` (int). + - qa (Dict): Optional; may contain `formulas_checked`, `links_verified`, + `citations_found`, and `warnings` (all ints). + - citations (List): Optional; list of citation entries. + output_path (Path): Filesystem path where the composed text will be written. """ lines = [ f"ExamKit Report - {report['session_id']}", @@ -106,11 +116,11 @@ def export_report_text(report: Dict[str, Any], output_path: Path) -> None: def export_report_json(report: Dict[str, Any], output_path: Path) -> None: """ - Export report as JSON file. - - Args: - report: Report dictionary. - output_path: Output path for JSON file. + Write the report dictionary to a JSON file at the specified output path. + + Parameters: + report (Dict[str, Any]): The report content to serialize. + output_path (Path): Filesystem path where the JSON file will be written. """ from examkit.utils.io_utils import write_json - write_json(report, output_path) + write_json(report, output_path) \ No newline at end of file diff --git a/examkit/synthesis/citations.py b/examkit/synthesis/citations.py index 70cdce2..43dbe32 100644 --- a/examkit/synthesis/citations.py +++ b/examkit/synthesis/citations.py @@ -12,7 +12,11 @@ class CitationManager: """Manages citations for generated content.""" def __init__(self): - """Initialize citation manager.""" + """ + Initialize the CitationManager's internal state. + + Sets up an empty list for stored citations and initializes the citation counter to 0. + """ self.citations = [] self.citation_counter = 0 @@ -24,16 +28,16 @@ def add_citation( metadata: Dict[str, Any] = None ) -> str: """ - Add a citation and return citation ID. - - Args: - source_type: Type of source (video, slide, exam, etc.). - source_id: Identifier for the source. - content: Content being cited. - metadata: Additional metadata. - + Add a citation to the manager and generate a unique citation ID. + + Parameters: + source_type (str): Source kind (e.g., "video", "slide", "exam"). + source_id (str): Identifier of the source. + content (str): The cited content or excerpt. + metadata (Dict[str, Any], optional): Additional citation metadata; defaults to empty dict. + Returns: - Citation ID. + str: Generated citation ID (e.g., "cite_1"). """ self.citation_counter += 1 citation_id = f"cite_{self.citation_counter}" @@ -51,13 +55,22 @@ def add_citation( def format_citation(self, chunk: Dict[str, Any]) -> str: """ - Format a citation string from a chunk. 
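# A minimal sketch of the report export flow documented above: assemble the per-session
# artifacts from out/ and write both the text and JSON summaries. The session id and the
# output file names are placeholders.
import logging
from pathlib import Path

from examkit.reports.export import generate_report, export_report_text, export_report_json

logger = logging.getLogger("examkit")
report = generate_report("session1", logger)

export_report_text(report, Path("out/session1_report.txt"))
export_report_json(report, Path("out/session1_report.json"))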
- - Args: - chunk: Chunk dictionary with source information. - + Return a formatted citation label for a content chunk. + + Parameters: + chunk (Dict[str, Any]): Dictionary describing the source. Recognized keys: + - "source": source type (e.g., "transcript", "asr", "slides", "exam", or other). + - For "transcript"/"asr": optional "start" (seconds) to include a timecode. + - For "slides": optional "slide_number". + - For "exam": optional "question_id". + Returns: - Formatted citation string. + str: A citation string in one of the formats: + - "[vid {timecode}]" if a transcript/asr chunk includes a start time. + - "[vid]" for transcript/asr without a start time. + - "[slide {slide_number}]" for slide chunks. + - "[exam {question_id}]" for exam chunks. + - "[{source}]" for any other source type. """ source_type = chunk.get("source", "unknown") @@ -84,13 +97,15 @@ def format_citation(self, chunk: Dict[str, Any]) -> str: def format_multiple_citations(self, chunks: List[Dict[str, Any]]) -> str: """ - Format multiple citations from chunks. - - Args: - chunks: List of chunks. - + Create a single citation string from multiple chunk descriptors. + + Formats each chunk using format_citation, removes duplicate formatted citations while preserving their original order, and joins them with a single space. + + Parameters: + chunks (List[Dict[str, Any]]): List of chunk dictionaries describing sources (e.g., transcript, slides, exam). + Returns: - Formatted citation string combining all sources. + str: Space-separated string of unique formatted citations in original order. """ citations = [] for chunk in chunks: @@ -102,22 +117,23 @@ def format_multiple_citations(self, chunks: List[Dict[str, Any]]) -> str: def export_citations(self) -> List[Dict[str, Any]]: """ - Export all citations. - + Retrieve all stored citation records. + Returns: - List of citation dictionaries. + List[Dict[str, Any]]: The internal list of citation dictionaries. Each dictionary contains the keys + `id`, `type`, `source_id`, `content`, and `metadata`. This returns the actual internal list (not a copy). """ return self.citations def get_citation_by_id(self, citation_id: str) -> Dict[str, Any]: """ - Get citation by ID. - - Args: - citation_id: Citation identifier. - + Retrieve a stored citation by its identifier. + + Parameters: + citation_id (str): The citation identifier to look up. + Returns: - Citation dictionary or None. + dict: The citation dictionary if found, or `None` if no matching citation exists. """ for citation in self.citations: if citation["id"] == citation_id: @@ -126,26 +142,28 @@ def get_citation_by_id(self, citation_id: str) -> Dict[str, Any]: def get_citations_by_type(self, source_type: str) -> List[Dict[str, Any]]: """ - Get all citations of a specific type. - - Args: - source_type: Type of source. - + Return all stored citations whose "type" field matches the given source type. + + Parameters: + source_type (str): The citation type to match. + Returns: - List of citations. + List[Dict[str, Any]]: List of citation dictionaries whose `"type"` equals `source_type`. """ return [c for c in self.citations if c["type"] == source_type] def get_citation_count(self) -> int: """ - Get total number of citations. - + Get the number of stored citations. + Returns: - Citation count. + int: Number of citations currently tracked. """ return len(self.citations) def clear(self) -> None: - """Clear all citations.""" + """ + Remove all stored citations and reset the internal citation counter to zero. 
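# A minimal sketch of the CitationManager API documented above: register a citation, format
# chunk-level citation labels, and export the collected records. The chunk dictionaries
# follow the key names recognised by format_citation.
from examkit.synthesis.citations import CitationManager

mgr = CitationManager()
cite_id = mgr.add_citation("slide", "lecture3.pdf", "Definition of entropy", {"slide_number": 12})

# format_citation works on retrieval chunks rather than stored citations.
label = mgr.format_citation({"source": "transcript", "start": 83.0})   # e.g. "[vid 00:01:23]"
combined = mgr.format_multiple_citations([
    {"source": "slides", "slide_number": 12},
    {"source": "exam", "question_id": "Q4"},
])                                                                     # "[slide 12] [exam Q4]"

records = mgr.export_citations()    # the internal list, not a copy
count = mgr.get_citation_count()
mgr.clear()                         # resets both the list and the counter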
+ """ self.citations = [] - self.citation_counter = 0 + self.citation_counter = 0 \ No newline at end of file diff --git a/examkit/synthesis/composer.py b/examkit/synthesis/composer.py index 37d47be..55ad08f 100644 --- a/examkit/synthesis/composer.py +++ b/examkit/synthesis/composer.py @@ -27,15 +27,20 @@ def load_processed_data(session_id: str, cache_dir: Path, logger: logging.Logger) -> Dict[str, List]: """ - Load processed data from cache. - - Args: - session_id: Session identifier. - cache_dir: Cache directory. - logger: Logger instance. - + Load cached session data for transcripts, slides, and exam items. + + Reads JSONL files named _transcript.jsonl, _slides.jsonl, + and _exam.jsonl from the provided cache directory if they exist, + logging the number of items loaded for each present file. + + Parameters: + session_id (str): Session identifier used to locate cache files. + cache_dir (Path): Directory containing cached JSONL files. + logger (logging.Logger): Logger used to report load counts and warnings. + Returns: - Dictionary with loaded data. + dict: A dictionary with keys "transcript", "slides", and "exam", each mapped + to a list of loaded items (empty list if the corresponding cache file is absent). """ data = { "transcript": [], @@ -71,16 +76,25 @@ def build_pipeline( logger: logging.Logger ) -> Dict[str, Any]: """ - Main build pipeline for generating study materials. - - Args: - config: ExamKit configuration. - session_id: Session identifier. - output_pdf_path: Path for output PDF. - logger: Logger instance. - + Orchestrates the end-to-end generation of study materials (notes, citations, coverage, and optional PDF) for a given session. + + Parameters: + config (ExamKitConfig): Configuration for embedding, retrieval, LLM, and output behavior. + session_id (str): Identifier for the session whose processed data will be used. + output_pdf_path (Path): Target path for the generated PDF output. + logger (logging.Logger): Logger used for progress and error reporting. + Returns: - Dictionary with output paths and metadata. + result (Dict[str, Any]): Summary of produced artifacts and metadata with keys: + - "pdf_path": string path to the produced PDF file. + - "notes_path": string path to the generated Markdown notes. + - "citations_path": string path to the exported citations JSON. + - "coverage_path": string path to the exported coverage CSV. + - "topics_processed": number of topic sections that were produced. + - "total_citations": total count of citations recorded by the CitationManager. + + Raises: + ValueError: If no processed input chunks are found for the session (instructs to run ingestion first). """ logger.info(f"Starting build pipeline for session: {session_id}") @@ -270,4 +284,4 @@ def build_pipeline( "coverage_path": str(coverage_path), "topics_processed": len(sections), "total_citations": citation_mgr.get_citation_count() - } + } \ No newline at end of file diff --git a/examkit/synthesis/diagrams.py b/examkit/synthesis/diagrams.py index 66ac7b6..68e6f0a 100644 --- a/examkit/synthesis/diagrams.py +++ b/examkit/synthesis/diagrams.py @@ -22,17 +22,17 @@ def create_flowchart( format: str = "png" ) -> Optional[Path]: """ - Create a flowchart diagram. - - Args: - nodes: List of node dictionaries with 'id' and 'label'. - edges: List of tuples (from_id, to_id). - title: Diagram title. - output_path: Path to save diagram. - format: Output format (png, svg, pdf). - + Create a directed flowchart from the given nodes and edges. 
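# A minimal sketch of the composer entry points documented above: inspect the cached chunks
# for a session, then run the end-to-end build and handle the ValueError raised when
# ingestion has not been run. The cache directory, session id, and config location are
# placeholders.
import logging
from pathlib import Path

from examkit.config import ExamKitConfig
from examkit.synthesis.composer import load_processed_data, build_pipeline

logger = logging.getLogger("examkit")
config = ExamKitConfig.from_yaml(Path("config/config.yaml"))  # assumed location

data = load_processed_data("session1", Path("cache"), logger)
logger.info("transcript chunks: %d", len(data["transcript"]))

try:
    result = build_pipeline(config, "session1", Path("out/session1_notes.pdf"), logger)
    logger.info("PDF written to %s", result["pdf_path"])
except ValueError:
    logger.error("No processed chunks found; run ingestion first.")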
+ + Parameters: + nodes: List of dictionaries each with keys 'id' (node identifier) and 'label' (display text). + edges: List of (from_id, to_id) tuples specifying directed connections between node ids. + title: Diagram title used as the graph comment. + output_path: Filesystem path where the diagram will be written (extension is added based on `format`). If omitted, the diagram is not written to disk. + format: Output file format extension to use when rendering (e.g., 'png', 'svg', 'pdf'). + Returns: - Path to generated diagram or None. + Path to the generated diagram file with the chosen extension, or `None` if Graphviz is unavailable or no `output_path` was provided. """ if not GRAPHVIZ_AVAILABLE: return None @@ -65,17 +65,17 @@ def create_concept_map( format: str = "png" ) -> Optional[Path]: """ - Create a concept map diagram. - - Args: - concepts: List of concept names. - relationships: List of tuples (concept1, relation, concept2). - title: Diagram title. - output_path: Path to save diagram. - format: Output format. - + Create an undirected concept map diagram from a list of concepts and labeled relationships. + + Parameters: + concepts (List[str]): Concept names to include as nodes. + relationships (List[tuple]): Tuples of the form (concept1, relation, concept2) describing labeled edges. + title (str): Diagram title. + output_path (Optional[Path]): Filesystem path where the diagram should be written; if omitted, no file is written. + format (str): Output file format/extension to render (e.g., "png", "pdf"). + Returns: - Path to generated diagram or None. + Path or None: Path to the generated file including the chosen extension if the diagram was rendered; `None` if Graphviz is unavailable or no output_path was provided. """ if not GRAPHVIZ_AVAILABLE: return None @@ -154,15 +154,16 @@ def generate_mermaid_diagram( logger: logging.Logger = None ) -> bool: """ - Generate diagram from Mermaid code using mermaid-cli. - - Args: - mermaid_code: Mermaid diagram code. - output_path: Output path for diagram. - logger: Logger instance. - + Generate a diagram from Mermaid code using the mermaid-cli tool. + + Creates a temporary `.mmd` file adjacent to `output_path`, invokes `mmdc` to render the diagram to `output_path`, and removes the temporary file on success. If `mmdc` is not available or rendering fails, no output file is produced and the function returns `False`. + + Parameters: + mermaid_code (str): Mermaid diagram source code. + output_path (Path): Destination path for the rendered diagram file. + Returns: - True if successful, False otherwise. + bool: `True` if the diagram was generated successfully, `False` otherwise. """ # Check if mermaid-cli is available try: @@ -194,13 +195,13 @@ def generate_mermaid_diagram( def detect_diagram_opportunity(text: str) -> Optional[str]: """ - Detect if text describes a process that could be diagrammed. - - Args: - text: Input text. - + Suggests a diagram type based on keywords found in the input text. + + Parameters: + text (str): Text to analyze for diagram-related cues. + Returns: - Diagram type suggestion or None. + One of 'flowchart', 'concept_map', or 'hierarchy' if corresponding keywords are present in the text, otherwise None. 
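# A minimal sketch of the diagram helpers documented above: check whether a text passage
# suggests a diagram, then render a small flowchart. Node ids, labels, and the output
# location are illustrative; create_flowchart returns None when Graphviz is unavailable.
from pathlib import Path

from examkit.synthesis.diagrams import create_flowchart, detect_diagram_opportunity

kind = detect_diagram_opportunity("First normalise the input, then embed and rank the chunks.")
if kind == "flowchart":
    nodes = [
        {"id": "a", "label": "Normalise"},
        {"id": "b", "label": "Embed"},
        {"id": "c", "label": "Rank"},
    ]
    edges = [("a", "b"), ("b", "c")]
    diagram_path = create_flowchart(nodes, edges, title="Retrieval steps",
                                    output_path=Path("out/retrieval_flow"), format="png")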
""" text_lower = text.lower() @@ -219,4 +220,4 @@ def detect_diagram_opportunity(text: str) -> Optional[str]: if any(keyword in text_lower for keyword in hierarchy_keywords): return 'hierarchy' - return None + return None \ No newline at end of file diff --git a/examkit/synthesis/ollama_client.py b/examkit/synthesis/ollama_client.py index 6428c5f..dfb2170 100644 --- a/examkit/synthesis/ollama_client.py +++ b/examkit/synthesis/ollama_client.py @@ -12,10 +12,12 @@ def check_ollama_available() -> bool: """ - Check if Ollama is available. - + Determine whether a local Ollama HTTP service is reachable. + + Performs a short HTTP GET to the local /api/tags endpoint and treats an HTTP 200 response as available. + Returns: - True if available, False otherwise. + `True` if the local Ollama service responds with HTTP 200, `False` otherwise. """ try: response = requests.get("http://localhost:11434/api/tags", timeout=2) @@ -26,10 +28,10 @@ def check_ollama_available() -> bool: def list_models() -> list: """ - List available Ollama models. - + Retrieve names of models available from the local Ollama API. + Returns: - List of model names. + list: Model name strings. Returns an empty list if the Ollama API is unreachable or does not return a 200 response. """ try: response = requests.get("http://localhost:11434/api/tags", timeout=5) @@ -51,19 +53,20 @@ def generate_completion( logger: logging.Logger = None ) -> str: """ - Generate completion using Ollama. - - Args: - prompt: User prompt. - model: Model name. - system_prompt: System prompt. - temperature: Sampling temperature. - max_tokens: Maximum tokens to generate. - offline: Enforce offline mode. - logger: Logger instance. - + Generate a text completion from a local Ollama model. + + Parameters: + prompt (str): The user prompt to send to the model. + system_prompt (Optional[str]): Optional system-level prompt to guide generation. + temperature (float): Sampling temperature controlling randomness; higher values increase randomness. + max_tokens (int): Maximum number of tokens to generate. + offline (bool): If True, require a local Ollama server to be available before attempting generation. + Returns: - Generated text. + str: The generated text produced by the model. + + Raises: + RuntimeError: If offline is True and the local Ollama server is unavailable, or if the HTTP request to Ollama fails. """ if offline and not check_ollama_available(): raise RuntimeError("Ollama not available. Start with: ollama serve") @@ -112,17 +115,20 @@ def generate_chat_completion( logger: logging.Logger = None ) -> str: """ - Generate chat completion using Ollama. - - Args: - messages: List of message dicts with 'role' and 'content'. - model: Model name. - temperature: Sampling temperature. - max_tokens: Maximum tokens to generate. - logger: Logger instance. - + Generate a chat response from the local Ollama model for a sequence of messages. + + Parameters: + messages (list): List of message dictionaries each containing 'role' (e.g., 'user'|'assistant'|'system') and 'content' (str). + model (str): Ollama model identifier to use. + temperature (float): Sampling temperature for response generation. + max_tokens (int): Maximum number of tokens to generate. + logger (logging.Logger, optional): Logger for error/debug messages (not required). + Returns: - Generated response. + str: The generated message content (empty string if none present). + + Raises: + RuntimeError: If Ollama is not available or the HTTP request to Ollama fails. 
""" if not check_ollama_available(): raise RuntimeError("Ollama not available") @@ -154,14 +160,13 @@ def generate_chat_completion( def pull_model(model: str, logger: logging.Logger = None) -> bool: """ - Pull a model using Ollama CLI. - - Args: - model: Model name to pull. - logger: Logger instance. - + Pull the specified Ollama model via the local Ollama CLI. + + Parameters: + model (str): Name of the model to pull. + Returns: - True if successful, False otherwise. + True if the CLI reported success (exit code 0), False otherwise. """ if logger: logger.info(f"Pulling Ollama model: {model}") @@ -177,4 +182,4 @@ def pull_model(model: str, logger: logging.Logger = None) -> bool: except Exception as e: if logger: logger.error(f"Failed to pull model: {e}") - return False + return False \ No newline at end of file diff --git a/examkit/synthesis/prompts.py b/examkit/synthesis/prompts.py index 954de5b..8179d5d 100644 --- a/examkit/synthesis/prompts.py +++ b/examkit/synthesis/prompts.py @@ -120,36 +120,92 @@ def render_definition_prompt(topic_name: str, context_chunks: list) -> str: - """Render definition prompt.""" + """ + Render a definition prompt for the given topic using the module's Jinja2 template. + + Parameters: + topic_name (str): The topic name to insert into the prompt. + context_chunks (list): Sequence of context chunks used to populate the prompt; each chunk is expected to include source metadata and the chunk content. + + Returns: + str: The rendered prompt text. + """ template = Template(DEFINITION_TEMPLATE) return template.render(topic_name=topic_name, context_chunks=context_chunks) def render_derivation_prompt(topic_name: str, context_chunks: list) -> str: - """Render derivation prompt.""" + """ + Render a filled derivation prompt for a given topic using provided context chunks. + + Parameters: + topic_name (str): Name of the topic to derive. + context_chunks (list): Iterable of context chunk objects or dicts; each chunk should include source and content used to cite and support the derivation. + + Returns: + str: The rendered prompt text with a Derivation section and inline source citations. + """ template = Template(DERIVATION_TEMPLATE) return template.render(topic_name=topic_name, context_chunks=context_chunks) def render_mistakes_prompt(topic_name: str, context_chunks: list) -> str: - """Render common mistakes prompt.""" + """ + Builds a prompt that asks for common mistakes related to a topic using the provided context chunks. + + Parameters: + context_chunks (list): Sequence of context items (e.g., mappings with source and content) to be embedded into the prompt as cited sources. + + Returns: + str: The rendered prompt text. + """ template = Template(MISTAKES_TEMPLATE) return template.render(topic_name=topic_name, context_chunks=context_chunks) def render_compare_prompt(topic_a: str, topic_b: str, context_a: list, context_b: list) -> str: - """Render compare and contrast prompt.""" + """ + Builds a compare-and-contrast prompt for two topics using their context chunks. + + Parameters: + topic_a (str): Name of the first topic. + topic_b (str): Name of the second topic. + context_a (list): Context chunks (source/text entries) related to `topic_a`. + context_b (list): Context chunks (source/text entries) related to `topic_b`. + + Returns: + str: The rendered prompt text that instructs an LLM to compare similarities, differences, + appropriate usages, and provide source-cited conclusions for the two topics. 
+ """ template = Template(COMPARE_TEMPLATE) return template.render(topic_a=topic_a, topic_b=topic_b, context_a=context_a, context_b=context_b) def render_revision_prompt(topic_name: str, context_chunks: list) -> str: - """Render fast revision prompt.""" + """ + Builds a quick revision prompt for a topic using the provided context chunks. + + Parameters: + topic_name (str): The topic title to include in the prompt. + context_chunks (list): Sequence of context chunk objects (e.g., mappings with `source` and content) whose sources and content will be listed and cited in the prompt. + + Returns: + rendered_prompt (str): The rendered revision prompt text containing a concise revision summary, key facts, a worked example, and source citations. + """ template = Template(FAST_REVISION_TEMPLATE) return template.render(topic_name=topic_name, context_chunks=context_chunks) def render_example_prompt(topic_name: str, context_chunks: list) -> str: - """Render worked example prompt.""" + """ + Builds a worked-example prompt for a topic using the provided context chunks. + + Parameters: + topic_name (str): The topic title to include in the prompt. + context_chunks (list): Iterable of context items (each supplying content and source) to be incorporated and cited in the prompt. + + Returns: + rendered_prompt (str): The prompt text produced by rendering the worked-example template with the given topic and context chunks. + """ template = Template(EXAMPLE_TEMPLATE) - return template.render(topic_name=topic_name, context_chunks=context_chunks) + return template.render(topic_name=topic_name, context_chunks=context_chunks) \ No newline at end of file diff --git a/examkit/utils/io_utils.py b/examkit/utils/io_utils.py index babd22d..31573e8 100644 --- a/examkit/utils/io_utils.py +++ b/examkit/utils/io_utils.py @@ -10,13 +10,13 @@ def ensure_dir(path: Path) -> Path: """ - Ensure a directory exists, creating it if necessary. - - Args: - path: Directory path to ensure. - + Ensure the directory at the given path exists, creating parent directories as needed. + + Parameters: + path (Path): Directory path to ensure. + Returns: - The directory path. + Path: The same path that was provided. """ path.mkdir(parents=True, exist_ok=True) return path @@ -24,13 +24,10 @@ def ensure_dir(path: Path) -> Path: def read_json(path: Path) -> Dict[str, Any]: """ - Read and parse a JSON file. - - Args: - path: Path to JSON file. - + Load and return the JSON object stored at the given file path. + Returns: - Parsed JSON data as dictionary. + A dictionary representing the parsed JSON content of the file. """ with open(path, "r") as f: return json.load(f) @@ -38,12 +35,12 @@ def read_json(path: Path) -> Dict[str, Any]: def write_json(data: Dict[str, Any], path: Path, indent: int = 2) -> None: """ - Write data to a JSON file. - - Args: - data: Data to write. - path: Output path. - indent: JSON indentation level. + Write a mapping to the given file as JSON, creating the parent directory if it does not exist. + + Parameters: + data (Dict[str, Any]): Mapping to serialize to JSON. + path (Path): Destination file path; the parent directory will be created if missing. + indent (int): Number of spaces to use for JSON indentation. """ ensure_dir(path.parent) with open(path, "w") as f: @@ -52,13 +49,13 @@ def write_json(data: Dict[str, Any], path: Path, indent: int = 2) -> None: def read_jsonl(path: Path) -> List[Dict[str, Any]]: """ - Read a JSONL (JSON Lines) file. - - Args: - path: Path to JSONL file. 
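# A minimal sketch of the JSON helpers documented above: write_json creates the parent
# directory before writing, and read_json returns the parsed mapping. Paths and values are
# placeholders.
from pathlib import Path

from examkit.utils.io_utils import ensure_dir, write_json, read_json

cache_dir = ensure_dir(Path("cache/session1"))
write_json({"session": "session1", "topics": 12}, cache_dir / "meta.json", indent=2)

meta = read_json(cache_dir / "meta.json")
assert meta["topics"] == 12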
- + Load objects from a JSON Lines (JSONL) file, parsing each non-empty line as JSON. + + Parameters: + path (Path): Path to the JSONL file to read. + Returns: - List of dictionaries, one per line. + List[Dict[str, Any]]: A list of objects parsed from each non-empty line in the file. """ data = [] with open(path, "r") as f: @@ -71,11 +68,13 @@ def read_jsonl(path: Path) -> List[Dict[str, Any]]: def write_jsonl(data: List[Dict[str, Any]], path: Path) -> None: """ - Write data to a JSONL file. - - Args: - data: List of dictionaries to write. - path: Output path. + Write a list of dictionaries to a newline-delimited JSON (JSONL) file. + + Each dictionary is serialized as a single JSON object on its own line using UTF-8-compatible output (serialization uses `ensure_ascii=False`). The parent directory of `path` will be created if it does not exist. + + Parameters: + data (List[Dict[str, Any]]): List of JSON-serializable dictionaries to write, one per line. + path (Path): Destination file path for the JSONL output. """ ensure_dir(path.parent) with open(path, "w") as f: @@ -97,14 +96,14 @@ def copy_file(src: Path, dst: Path) -> None: def read_text(path: Path, encoding: str = "utf-8") -> str: """ - Read text file contents. - - Args: - path: Path to text file. - encoding: Text encoding. - + Read the entire contents of a text file. + + Parameters: + path (Path): Path to the text file. + encoding (str): File encoding to use; defaults to "utf-8". + Returns: - File contents as string. + str: The file contents. """ with open(path, "r", encoding=encoding) as f: return f.read() @@ -121,4 +120,4 @@ def write_text(content: str, path: Path, encoding: str = "utf-8") -> None: """ ensure_dir(path.parent) with open(path, "w", encoding=encoding) as f: - f.write(content) + f.write(content) \ No newline at end of file diff --git a/examkit/utils/math_utils.py b/examkit/utils/math_utils.py index 483cd70..4e5670d 100644 --- a/examkit/utils/math_utils.py +++ b/examkit/utils/math_utils.py @@ -8,13 +8,13 @@ def extract_latex_formulas(text: str) -> List[str]: """ - Extract LaTeX formulas from text. - - Args: - text: Input text containing LaTeX formulas. - + Extract LaTeX formulas from the given text, returning them without surrounding dollar delimiters. + + Parameters: + text (str): Text that may contain LaTeX inline ($...$) or display ($$...$$) formulas. + Returns: - List of extracted formulas. + List[str]: A list of formula strings found (inline and display), with the surrounding `$`/`$$` removed. """ # Match inline math: $...$ inline = re.findall(r'\$([^\$]+)\$', text) @@ -25,13 +25,15 @@ def extract_latex_formulas(text: str) -> List[str]: def validate_latex_formula(formula: str) -> bool: """ - Basic validation of LaTeX formula syntax. - - Args: - formula: LaTeX formula string. - + Validate basic structural correctness of a LaTeX formula string. + + Performs lightweight checks for balanced braces, brackets, and parentheses, and rejects common invalid patterns such as unclosed command arguments and literal double dollar signs. + + Parameters: + formula (str): LaTeX formula string to validate. + Returns: - True if formula appears valid, False otherwise. + `True` if no issues are detected, `False` otherwise. """ # Check for balanced braces if formula.count('{') != formula.count('}'): @@ -56,28 +58,28 @@ def validate_latex_formula(formula: str) -> bool: def format_number(num: float, precision: int = 2) -> str: """ - Format a number with specified precision. - - Args: - num: Number to format. - precision: Decimal precision. 
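# A minimal sketch of the JSONL and formula helpers documented above: round-trip a couple
# of chunk records through JSONL, then pull the LaTeX formulas out of their text and run the
# lightweight validation. Chunk contents are illustrative.
from pathlib import Path

from examkit.utils.io_utils import write_jsonl, read_jsonl
from examkit.utils.math_utils import extract_latex_formulas, validate_latex_formula

chunks = [
    {"source": "slides", "text": "Bayes: $P(A|B) = \\frac{P(B|A)P(A)}{P(B)}$"},
    {"source": "transcript", "text": "Variance is $E[X^2] - E[X]^2$."},
]
write_jsonl(chunks, Path("cache/session1_chunks.jsonl"))

for chunk in read_jsonl(Path("cache/session1_chunks.jsonl")):
    for formula in extract_latex_formulas(chunk["text"]):
        if not validate_latex_formula(formula):
            print(f"Suspicious formula: {formula}")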
- + Format a number to a fixed number of decimal places. + + Parameters: + num (float): Value to format. + precision (int): Number of digits after the decimal point. + Returns: - Formatted number string. + str: The formatted number as a string with exactly `precision` decimal places. """ return f"{num:.{precision}f}" def calculate_coverage_percentage(covered: int, total: int) -> float: """ - Calculate coverage percentage. - - Args: - covered: Number of covered items. - total: Total number of items. - + Compute the percentage of covered items out of a total. + + Parameters: + covered (int): Number of covered items. + total (int): Total number of items. + Returns: - Coverage percentage (0-100). + float: Coverage percentage between 0 and 100. Returns 0.0 when `total` is 0. """ if total == 0: return 0.0 @@ -104,12 +106,12 @@ def normalize_score(score: float, min_val: float, max_val: float) -> float: def extract_equation_symbols(formula: str) -> List[str]: """ Extract variable symbols from a LaTeX formula. - - Args: - formula: LaTeX formula. - + + Parameters: + formula (str): The LaTeX input string to scan for symbols. + Returns: - List of variable symbols found. + List[str]: Unique symbols found — single-letter alphabetic identifiers (a–z, A–Z) and common Greek letter names (e.g. "alpha", "beta") without the leading backslash. """ # Simple extraction of single-letter variables symbols = re.findall(r'\b([a-zA-Z])\b', formula) @@ -121,15 +123,12 @@ def extract_equation_symbols(formula: str) -> List[str]: def is_numeric(value: str) -> bool: """ Check if a string represents a numeric value. - - Args: - value: String to check. - + Returns: - True if numeric, False otherwise. + True if the string can be parsed as a float, False otherwise. """ try: float(value) return True except ValueError: - return False + return False \ No newline at end of file diff --git a/examkit/utils/text_utils.py b/examkit/utils/text_utils.py index 649402e..def7e35 100644 --- a/examkit/utils/text_utils.py +++ b/examkit/utils/text_utils.py @@ -8,13 +8,13 @@ def clean_text(text: str) -> str: """ - Clean and normalize text. - - Args: - text: Input text. - + Normalize input text by collapsing consecutive whitespace into single spaces, removing control characters in the ranges \x00-\x1f and \x7f-\x9f, and trimming leading and trailing whitespace. + + Parameters: + text (str): Input text to clean. + Returns: - Cleaned text. + str: Cleaned text with normalized whitespace and control characters removed. """ # Remove excessive whitespace text = re.sub(r'\s+', ' ', text) @@ -25,13 +25,13 @@ def clean_text(text: str) -> str: def split_into_sentences(text: str) -> List[str]: """ - Split text into sentences using basic heuristics. - - Args: - text: Input text. - + Split text into sentences using punctuation (., !, ?) followed by whitespace. + + Parameters: + text (str): Input text to segment. + Returns: - List of sentences. + List[str]: Sentence strings with surrounding whitespace removed; empty segments are omitted. """ # Simple sentence splitting (can be improved with spaCy) sentences = re.split(r'(?<=[.!?])\s+', text) @@ -40,13 +40,13 @@ def split_into_sentences(text: str) -> List[str]: def split_into_paragraphs(text: str) -> List[str]: """ - Split text into paragraphs. - - Args: - text: Input text. - + Split text into paragraphs by using double-newline boundaries and trimming each paragraph. + + Parameters: + text (str): Text to split into paragraphs. + Returns: - List of paragraphs. 
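# A minimal sketch of the text and coverage helpers documented above: clean a raw transcript
# snippet (control character and extra spaces removed), split it into sentences, and compute
# a coverage percentage. Values are illustrative.
from examkit.utils.math_utils import calculate_coverage_percentage
from examkit.utils.text_utils import clean_text, split_into_sentences

raw = "Entropy   measures uncertainty.\x07  It is maximised by the uniform distribution."
sentences = split_into_sentences(clean_text(raw))
# -> ["Entropy measures uncertainty.", "It is maximised by the uniform distribution."]

pct = calculate_coverage_percentage(covered=len(sentences), total=10)   # 20.0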
+ List[str]: Non-empty paragraphs with surrounding whitespace removed. """ paragraphs = text.split('\n\n') return [p.strip() for p in paragraphs if p.strip()] @@ -54,15 +54,15 @@ def split_into_paragraphs(text: str) -> List[str]: def truncate_text(text: str, max_length: int, suffix: str = "...") -> str: """ - Truncate text to a maximum length. - - Args: - text: Input text. - max_length: Maximum length. - suffix: Suffix to add if truncated. - + Truncate text to at most max_length characters, appending a suffix when truncation occurs. + + Parameters: + text (str): Input text to truncate. + max_length (int): Maximum allowed length of the returned string. + suffix (str): Suffix to append when truncation occurs (default "..."). + Returns: - Truncated text. + str: The original text if its length is <= max_length, otherwise a truncated string of length max_length that ends with the given suffix. """ if len(text) <= max_length: return text @@ -71,14 +71,14 @@ def truncate_text(text: str, max_length: int, suffix: str = "...") -> str: def extract_keywords(text: str, min_length: int = 3) -> List[str]: """ - Extract potential keywords from text (simple implementation). - - Args: - text: Input text. - min_length: Minimum keyword length. - + Extract unique candidate keywords from the given text. + + Parameters: + text (str): Input text to extract keywords from. + min_length (int): Minimum number of characters a token must have to be considered a keyword. + Returns: - List of keywords. + List[str]: A list of unique keyword strings (order not guaranteed). """ # Remove punctuation and split words = re.findall(r'\b\w+\b', text.lower()) @@ -90,26 +90,23 @@ def extract_keywords(text: str, min_length: int = 3) -> List[str]: def normalize_whitespace(text: str) -> str: """ - Normalize whitespace in text. - - Args: - text: Input text. - + Collapse all consecutive whitespace characters into single ASCII spaces and remove leading/trailing whitespace. + + Parameters: + text (str): Input string that may contain spaces, tabs, newlines, or other whitespace characters. + Returns: - Text with normalized whitespace. + str: String where runs of whitespace are replaced by a single space and leading/trailing whitespace is removed. """ return ' '.join(text.split()) def remove_urls(text: str) -> str: """ - Remove URLs from text. - - Args: - text: Input text. - + Remove HTTP(S) and www-prefixed URLs from the given text. + Returns: - Text with URLs removed. + Text with HTTP(S) and www-prefixed URLs removed. """ url_pattern = r'https?://\S+|www\.\S+' return re.sub(url_pattern, '', text) @@ -117,12 +114,12 @@ def remove_urls(text: str) -> str: def count_words(text: str) -> int: """ - Count words in text. - - Args: - text: Input text. - + Count the number of whitespace-separated tokens in the given text. + + Parameters: + text (str): Input string whose words are counted; splitting is performed on any whitespace. + Returns: - Word count. + word_count (int): Number of whitespace-separated tokens in text. """ - return len(text.split()) + return len(text.split()) \ No newline at end of file diff --git a/examkit/utils/timecode.py b/examkit/utils/timecode.py index bb69592..3196b8b 100644 --- a/examkit/utils/timecode.py +++ b/examkit/utils/timecode.py @@ -23,13 +23,15 @@ def seconds_to_timecode(seconds: float) -> str: def timecode_to_seconds(timecode: str) -> float: """ - Convert HH:MM:SS timecode to seconds. - - Args: - timecode: Timecode string (HH:MM:SS or MM:SS). - + Convert a timecode string into total seconds. 
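# A minimal sketch of the lightweight text utilities documented above: strip URLs, pull
# candidate keywords, and truncate a long summary for display. The example text is made up.
from examkit.utils.text_utils import remove_urls, extract_keywords, truncate_text, count_words

summary = "See https://example.com/notes for the full derivation of conditional entropy."
clean = remove_urls(summary)
keywords = extract_keywords(clean, min_length=5)   # unique tokens, order not guaranteed
preview = truncate_text(clean, max_length=40)      # ends with "..." when shortened
n_words = count_words(clean)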
+ + Accepts 'HH:MM:SS', 'MM:SS', or a single numeric string; the seconds component may include a fractional part. + + Parameters: + timecode (str): Timecode in one of the accepted formats. + Returns: - Time in seconds. + float: Total seconds represented by the timecode. """ parts = timecode.split(':') if len(parts) == 3: @@ -44,13 +46,18 @@ def timecode_to_seconds(timecode: str) -> float: def format_duration(seconds: float) -> str: """ - Format duration in a human-readable way. - - Args: - seconds: Duration in seconds. - + Format a duration (in seconds) into a concise human-readable string. + + The function discards any fractional part of the input seconds and emits: + - "Xh Ym Zs" when hours > 0 + - "Ym Zs" when hours == 0 and minutes > 0 + - "Zs" when only seconds remain + + Parameters: + seconds (float): Duration in seconds; fractional seconds are discarded. + Returns: - Formatted duration string. + str: Formatted duration string (e.g., "1h 2m 3s", "5m 30s", "45s"). """ hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) @@ -66,13 +73,15 @@ def format_duration(seconds: float) -> str: def parse_vtt_timestamp(timestamp: str) -> float: """ - Parse VTT timestamp format to seconds. - - Args: - timestamp: VTT timestamp (e.g., "00:01:23.456"). - + Parse a WebVTT timestamp into total seconds. + + Fractional seconds (milliseconds) are ignored; the timestamp may be in "HH:MM:SS.ms" or "MM:SS.ms" form. + + Parameters: + timestamp (str): VTT timestamp string, e.g. "00:01:23.456" or "01:23.456". + Returns: - Time in seconds. + float: Total seconds represented by the timestamp. """ # Remove milliseconds if present if '.' in timestamp: @@ -99,13 +108,13 @@ def create_video_citation(timecode: str, description: str = "") -> str: def extract_time_range(start: float, end: float) -> Tuple[str, str]: """ - Extract time range as formatted timecodes. - - Args: - start: Start time in seconds. - end: End time in seconds. - + Return start and end times formatted as HH:MM:SS timecodes. + + Parameters: + start (float): Start time in seconds. + end (float): End time in seconds. + Returns: - Tuple of (start_timecode, end_timecode). + tuple[str, str]: A tuple (start_timecode, end_timecode) where each element is the corresponding time formatted as "HH:MM:SS". """ - return seconds_to_timecode(start), seconds_to_timecode(end) + return seconds_to_timecode(start), seconds_to_timecode(end) \ No newline at end of file diff --git a/tests/test_render.py b/tests/test_render.py index 8569d61..1063a99 100644 --- a/tests/test_render.py +++ b/tests/test_render.py @@ -58,7 +58,11 @@ def test_typst_wrapper_creation(): def test_config_loading(): - """Test configuration loading.""" + """ + Verifies that ExamKitConfig.from_yaml correctly loads ASR, LLM, and offline settings from a YAML configuration. + + Creates a temporary YAML configuration containing `asr.model`, `llm.model`, and `offline`, loads it via `ExamKitConfig.from_yaml`, and asserts the resulting object's fields match the expected values. + """ import tempfile import yaml @@ -106,4 +110,4 @@ def test_coverage_report(): summary = generate_coverage_summary(coverage_data) assert "Total Topics: 3" in summary - assert "low coverage" in summary + assert "low coverage" in summary \ No newline at end of file
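# A minimal pytest-style sketch exercising the timecode helpers documented above, in the
# same spirit as tests/test_render.py. The expected strings follow the "HH:MM:SS" and
# "Xh Ym Zs" formats described in the docstrings; zero-padding is assumed.
from examkit.utils.timecode import (
    seconds_to_timecode,
    timecode_to_seconds,
    format_duration,
    extract_time_range,
)


def test_timecode_roundtrip():
    assert timecode_to_seconds("01:02:03") == 3723.0
    assert seconds_to_timecode(3723) == "01:02:03"
    assert extract_time_range(0, 90) == ("00:00:00", "00:01:30")


def test_format_duration():
    assert format_duration(3723) == "1h 2m 3s"
    assert format_duration(45) == "45s"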