# Main

In [6]:
# Dotted path of the function to extract, in the form "<package>.<module>.<function>".
# The trailing comment keeps an alternative target handy for switching by hand.
Function_Path = "Inventory.views_pack.terminal.saudi_tsr_output_checker" # Inventory.views_pack.terminal.process_exe_data
# Project root directory that dotted paths are resolved against (machine-specific absolute path).
BASE_PATH = r"C:\Users\JonathanChackoPattas\OneDrive - Maritime Support Solutions\Desktop\MSS-Automation"

In [7]:
from __future__ import annotations
import ast
import os
from pathlib import Path
from functools import lru_cache
from typing import Optional, Dict, Tuple, List, Union, Iterable, Set, Any
# from google.adk.tools.tool_context import ToolContext

FuncNode = Union[ast.FunctionDef, ast.AsyncFunctionDef]

# -----------------------------
# project indexing
# -----------------------------

# Directory names that are pruned while walking the project tree in
# _iter_py_files, so no .py file below a directory with one of these names is indexed.
_EXCLUDE_DIRS = {
    ".git", "__pycache__", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "build", "dist", "site-packages", "venv", ".venv", "env", ".env",
    ".idea", ".vscode", "node_modules", ".tox", ".eggs",
    "venv-windows", "venv-linux",
}

def create_file_path(base_path, function_path):
    """
    Split a dotted function path into (source file path, function name).

    Example: ("proj", "pkg.mod.func") -> ("proj/pkg/mod.py", "func").

    Args:
      base_path: Project root, as a ``str`` or any ``os.PathLike``.
      function_path: Dotted path; the last component is the function name and
                     everything before it names the module.

    Returns:
      (file_path, function_name), where file_path is always a ``str``.
      If ``<module>.py`` does not exist but ``<module>/__init__.py`` does, the
      package initializer is returned instead, so that functions defined in a
      package ``__init__.py`` can be located. In every other case the
      ``<module>.py`` path is returned, whether or not it exists.
    """
    *module_parts, function_name = function_path.split(".")

    # os.fspath() normalises Path objects to str so the ".py" suffix can be
    # appended even when there are no module components to join.
    module_base = os.path.join(os.fspath(base_path), *module_parts)
    module_file = module_base + ".py"

    if module_parts and not os.path.isfile(module_file):
        package_init = os.path.join(module_base, "__init__.py")
        if os.path.isfile(package_init):
            return package_init, function_name
    return module_file, function_name

def _iter_py_files(root: Path) -> Iterable[Path]:
    """Yield every ``*.py`` file below *root*, skipping directories named in _EXCLUDE_DIRS."""
    for current_dir, subdirs, files in os.walk(root):
        # Slice-assignment mutates the list os.walk holds, so excluded
        # directories are never descended into.
        subdirs[:] = [name for name in subdirs if name not in _EXCLUDE_DIRS]
        folder = Path(current_dir)
        yield from (folder / name for name in files if name.endswith(".py"))

def _to_module_qualname(base_path: Path, file_path: Path) -> str:
    rel = file_path.relative_to(base_path)
    if rel.name == "__init__.py":
        rel = rel.parent
    else:
        rel = rel.with_suffix("")
    return ".".join(rel.parts)

def _to_function_path(base_path: Path, file_path: Path, func_name: str) -> str:
    """Return ``"<module>.<func_name>"`` for a file under *base_path* (just *func_name* if the module name is empty)."""
    module_name = _to_module_qualname(base_path, file_path)
    if not module_name:
        return func_name
    return f"{module_name}.{func_name}"

def _gather_defs(module: ast.Module) -> Tuple[Dict[str, FuncNode], Dict[str, Dict[str, FuncNode]]]:
    """Return (top_level_funcs, class_methods[class_name][func_name])."""
    top_level_funcs: Dict[str, FuncNode] = {}
    class_methods: Dict[str, Dict[str, FuncNode]] = {}

    for node in module.body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            top_level_funcs[node.name] = node
        elif isinstance(node, ast.ClassDef):
            methods: Dict[str, FuncNode] = {}
            for b in node.body:
                if isinstance(b, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    methods[b.name] = b
            class_methods[node.name] = methods
    return top_level_funcs, class_methods

@lru_cache(maxsize=4)
def _index_project_functions(base_path_str: str):
    """
    Index every top-level function of every ``.py`` file under the project root.

    The result is memoised per root string by ``lru_cache``, so files changed
    after the first call are not re-read for that root.

    Returns:
        by_name: function name -> list of (file Path, dotted module name, node),
                 one entry per module defining a top-level function of that name.
        by_mod_func: "<module>.<function>" -> (file Path, node).
    """
    root = Path(base_path_str).resolve()
    name_index: Dict[str, List[Tuple[Path, str, FuncNode]]] = {}
    qualified_index: Dict[str, Tuple[Path, FuncNode]] = {}

    for source_file in _iter_py_files(root):
        try:
            tree = ast.parse(source_file.read_text(encoding="utf-8"))
        except Exception:
            # Best effort: files that cannot be read or parsed are left out of the index.
            continue

        functions = _gather_defs(tree)[0]
        dotted_module = _to_module_qualname(root, source_file)
        for func_name, func_node in functions.items():
            name_index.setdefault(func_name, []).append((source_file, dotted_module, func_node))
            qualified_index[f"{dotted_module}.{func_name}"] = (source_file, func_node)

    return name_index, qualified_index

# -----------------------------
# source slicing & calls
# -----------------------------

def _slice_with_decorators(src_lines: List[str], fn: FuncNode) -> Tuple[str, int, int]:
    """Return (code, start_line, end_line), 1-based line numbers inclusive."""
    start = fn.lineno
    if getattr(fn, "decorator_list", None):
        start = min(getattr(dec, "lineno", start) for dec in fn.decorator_list) or start
    end = getattr(fn, "end_lineno", None)
    if end is None:
        full_src = "".join(src_lines)
        seg = ast.get_source_segment(full_src, fn)
        if seg is None:
            raise RuntimeError("Unable to determine function end; please use Python 3.8+.")
        end = start + seg.count("\n")
        return seg, start, end
    return "\n".join(src_lines[start - 1 : end]), start, end

def _get_attr_chain(node: ast.AST) -> Tuple[Optional[str], List[str]]:
    """
    For something like pkg.sub.mod.helper, return ("pkg", ["sub", "mod", "helper"]).
    If not an attribute chain rooted at Name, return (None, []).
    """
    chain: List[str] = []
    cur = node
    root_name = None
    while isinstance(cur, ast.Attribute):
        chain.append(cur.attr)
        cur = cur.value
    if isinstance(cur, ast.Name):
        root_name = cur.id
        chain.reverse()
        return root_name, chain
    return None, []

def _collect_calls_and_locals(fn: FuncNode) -> tuple[set[str], list[tuple[str, list[str]]], set[str]]:
    """
    Returns:
      bare: names of bare calls like {'helper', 'slugify'}
      attrs: qualified calls like [('utils', ['slugify']), ('pkg', ['sub', 'do'])]
      bound_locals: names bound in the function (params, assignments, etc.)
    """
    bare: Set[str] = set()
    attrs: List[Tuple[str, List[str]]] = []
    bound_locals: Set[str] = set()

    # params
    args = fn.args
    for a in getattr(args, "posonlyargs", []): bound_locals.add(a.arg)
    for a in args.args: bound_locals.add(a.arg)
    if args.vararg: bound_locals.add(args.vararg.arg)
    for a in args.kwonlyargs: bound_locals.add(a.arg)
    if args.kwarg: bound_locals.add(args.kwarg.arg)

    def add_targets(t):
        if isinstance(t, ast.Name):
            bound_locals.add(t.id)
        elif isinstance(t, (ast.Tuple, ast.List)):
            for elt in t.elts:
                add_targets(elt)

    for n in ast.walk(fn):
        if isinstance(n, ast.Call):
            if isinstance(n.func, ast.Name):
                bare.add(n.func.id)
            else:
                root, chain = _get_attr_chain(n.func)
                if root and chain:
                    attrs.append((root, chain))
        elif isinstance(n, ast.Assign):
            for t in n.targets: add_targets(t)
        elif isinstance(n, ast.AnnAssign) and n.target:
            add_targets(n.target)
        elif isinstance(n, ast.AugAssign):
            add_targets(n.target)
        elif isinstance(n, ast.For):
            add_targets(n.target)
        elif isinstance(n, ast.With):
            for item in n.items:
                if item.optional_vars: add_targets(item.optional_vars)
        elif isinstance(n, ast.comprehension):
            add_targets(n.target)
        elif isinstance(n, ast.ExceptHandler) and n.name:
            bound_locals.add(n.name)

    return bare, attrs, bound_locals

# -----------------------------
# import resolution
# -----------------------------

def _resolve_relative_module(this_module: str, level: int, module: Optional[str]) -> Optional[str]:
    """
    Resolve relative 'from ... import ...' to absolute dotted module.
    this_module: e.g., 'Inventory.views_pack.terminal'
    level: 1 => from . import x  (parent)
    level: 2 => from ..pkg import y
    """
    pkg_parts = this_module.split(".")[:-1]  # package of the file
    if level > len(pkg_parts) + 1:
        return None
    base = pkg_parts[: len(pkg_parts) - (level - 1)]
    if module:
        base += module.split(".")
    return ".".join(p for p in base if p)

def _parse_import_maps(mod: ast.Module, this_module: str):
    """
    Returns:
      import_aliases: dict of local name -> absolute module dotted path
         e.g., {'utils': 'Inventory.utils', 'mod': 'Inventory.x.y'}
      from_names: dict of local imported symbol -> absolute module or module.symbol
         e.g., {'slugify': 'Inventory.utils.slugify', 'utils': 'Inventory.utils'}
    """
    import_aliases: Dict[str, str] = {}
    from_names: Dict[str, str] = {}

    for n in mod.body:
        if isinstance(n, ast.Import):
            for a in n.names:
                full = a.name  # 'pkg' or 'pkg.sub.mod'
                local = a.asname if a.asname else full.split(".")[0]
                import_aliases[local] = full
        elif isinstance(n, ast.ImportFrom):
            if n.level and n.level > 0:
                base_mod = _resolve_relative_module(this_module, n.level, n.module)
            else:
                base_mod = n.module
            if not base_mod:
                continue
            for a in n.names:
                local = a.asname if a.asname else a.name
                # Could be a submodule or a symbol; we store as fully qualified
                from_names[local] = f"{base_mod}.{a.name}"
    return import_aliases, from_names

# -----------------------------
# main API
# -----------------------------

def extract_function_source_ast(
    file_path: str | Path,
    func_or_qualname: str,
    include_helpers: bool = False,
    *,
    base_path: str | Path,
    detailed_functions: bool = False,
    recursive_helper: bool = False,
    aggressive_fallback: bool = False,  # set True to allow cross-project name fallback
    # tool_context: ToolContext
) -> Dict[str, Any]:
    """
    Extract a function or method source by name, optionally with its helpers.

    Args:
      file_path: Path to the file containing the target function/method.
      func_or_qualname: "foo" or "ClassName.method". A bare name that is not a
                        top-level function is looked up in the classes of the
                        file; the first class defining it wins.
      include_helpers: If True, resolve the calls made inside the target to
                       top-level project functions and report them under "helpers".
      base_path: Project root directory. Only files under this root are indexed.
      detailed_functions: If True (and include_helpers is True), each helper is
                          itself extracted and reported as a nested result dict
                          instead of a dotted path.
      recursive_helper: If True, the helpers of each extracted helper are
                        expanded to result dicts as well. The flag is not
                        forwarded further, so the level below that is reported
                        as dotted paths.
      aggressive_fallback: If True, when a call cannot be bound through the
                           file's own definitions or imports, include all
                           same-named top-level functions found across the
                           project. Not forwarded to nested extractions.

    Returns:
      {
        "code": str,         # "# Extracted from <file>:<start>-<end>" header + source
        "start_line": int,
        "end_line": int,
        "function": str,     # func_or_qualname as given
        "file": str,         # resolved path of file_path
        "helpers": List[str] | List[Dict[str, Any]]  # dicts when detailed_functions
      }

    Raises:
      ValueError: If the target is not found (the message lists what is
                  available), or if file_path is not under base_path while
                  include_helpers is True.
    """
    base = Path(base_path).resolve()
    path = Path(file_path).resolve()
    src = path.read_text(encoding="utf-8")
    src_lines = src.splitlines()

    mod = ast.parse(src)
    top_funcs, class_methods = _gather_defs(mod)

    class_name: Optional[str] = None
    func_name = func_or_qualname
    if "." in func_or_qualname:
        class_name, func_name = func_or_qualname.split(".", 1)

    target_node: Optional[FuncNode] = None
    if class_name:
        target_node = class_methods.get(class_name, {}).get(func_name)
    else:
        target_node = top_funcs.get(func_name)
        if target_node is None:
            # Not a top-level function: fall back to the first class defining it.
            for cls, methods in class_methods.items():
                if func_name in methods:
                    target_node = methods[func_name]
                    class_name = cls
                    break

    if target_node is None:
        available = sorted(list(top_funcs.keys()) + [f"{c}.{m}" for c, ms in class_methods.items() for m in ms])
        raise ValueError(f"Function '{func_or_qualname}' not found. Available: {available}")

    main_code, start, end = _slice_with_decorators(src_lines, target_node)
    code = f"# Extracted from {path.name}:{start}-{end}\n{main_code}"

    helpers: List[Any] = []
    if include_helpers:
        by_name, by_mod_func = _index_project_functions(str(base))

        this_module = _to_module_qualname(base, path)
        import_aliases, from_names = _parse_import_maps(mod, this_module)

        # collect calls + bound locals in the function
        bare_names, qual_calls, bound_locals = _collect_calls_and_locals(target_node)

        resolved_funcs: Set[str] = set()
        target_is_top_level = class_name is None

        # ---- Bare calls: helper() ----
        for name in bare_names:
            # If the name is locally bound (param/assignment/etc.), we can't safely resolve it.
            if name in bound_locals:
                continue

            # Same-file top-level function wins
            if name in top_funcs:
                # A recursive call of the target to itself is not a helper.
                if not (target_is_top_level and name == func_name):
                    resolved_funcs.add(f"{this_module}.{name}")
                continue

            # from pkg.mod import name [as alias]
            if name in from_names:
                full = from_names[name]  # e.g., 'pkg.mod.helper'
                if full in by_mod_func:
                    resolved_funcs.add(full)
                    continue

            # No project-wide name scan unless explicitly allowed
            if aggressive_fallback:
                for _fp, module_name, _node in by_name.get(name, []):
                    # skip the exact same target function identity
                    if module_name == this_module and name == func_name:
                        continue
                    resolved_funcs.add(f"{module_name}.{name}")

        # ---- Qualified calls: utils.helper(), pkg.sub.mod.helper() ----
        for root, chain in qual_calls:
            # An empty chain has no function name; a locally bound root is an
            # object, not a module.
            if not chain or root in bound_locals:
                continue

            *prefix, func = chain

            # Root can come from 'import ... as root' or from 'from X import root'
            # (in which case the map points to X.root, a possible submodule).
            base_mod = import_aliases.get(root) or from_names.get(root)
            if not base_mod:
                continue  # unknown root -> skip

            candidate = ".".join([base_mod, *prefix, func])
            if candidate in by_mod_func:
                resolved_funcs.add(candidate)
            elif aggressive_fallback:
                for _fp, module_name, _node in by_name.get(func, []):
                    resolved_funcs.add(f"{module_name}.{func}")

        helper_function_paths = sorted(resolved_funcs)
        if detailed_functions:
            for func_path in helper_function_paths:
                indexed = by_mod_func.get(func_path)
                if indexed is not None:
                    # Prefer the file recorded by the index: it is the file the
                    # function was actually found in (including __init__.py).
                    helper_file = indexed[0]
                    helper_name = func_path.rsplit(".", 1)[-1]
                else:
                    helper_file, helper_name = create_file_path(str(base), func_path)
                helpers.append(extract_function_source_ast(
                    file_path=helper_file,
                    func_or_qualname=helper_name,
                    include_helpers=detailed_functions,
                    base_path=base_path,
                    detailed_functions=recursive_helper,
                ))
        else:
            helpers = helper_function_paths

    return {
        "code": code,
        "start_line": start,
        "end_line": end,
        "function": func_or_qualname,
        "file": str(path),
        "helpers": helpers,
    }

In [8]:
from pydantic import BaseModel, Field, field_validator, model_validator  # Pydantic v2

class ParameterInputSchema(BaseModel):
    """
    Input schema for `extract_function_source`.

    `function_path` and `base_path` are required; every flag defaults to False.
    Each alias equals its field name, and `populate_by_name` lets either form
    be used when constructing the model.
    """

    # Dotted path "<package>.<module>.<function>" of the target (required).
    function_path: str = Field(..., alias="function_path")
    # Also report the helper functions called by the target.
    include_helpers: bool = Field(False, alias="include_helpers")
    # Project root directory that function_path is resolved against (required).
    base_path: str = Field(..., alias="base_path")
    # Report each helper as a nested extraction result instead of a dotted path.
    detailed_functions: bool = Field(False, alias="detailed_functions")
    # Expand the helpers of each extracted helper as well.
    recursive_helper: bool = Field(False, alias="recursive_helper")
    # Allow a project-wide same-name scan when a call cannot be bound.
    aggressive_fallback: bool = Field(False, alias="aggressive_fallback")

    # allow using field names instead of aliases and vice-versa
    model_config = dict(populate_by_name=True)

    # NOTE: the validators below are disabled; they refer to a `file_path`
    # field that this schema does not declare.
    # @field_validator("base_path", mode="before")
    # @classmethod
    # def _coerce_to_path(cls, v):
    #     return Path(v).expanduser() if not isinstance(v, Path) else v

    # @model_validator(mode="after")
    # def _validate_paths(self):
    #     # Convert to Path
    #     self.base_path = Path(self.base_path).resolve()
    #     self.file_path = Path(self.file_path).resolve()

    #     if not self.file_path.exists():
    #         raise ValueError(f"file_path does not exist: {self.file_path}")
    #     if not self.file_path.is_file():
    #         raise ValueError("file_path must be a file")

    #     # Ensure file_path is within base_path
    #     try:
    #         self.file_path.relative_to(self.base_path)
    #     except ValueError as e:
    #         raise ValueError(
    #             f"file_path must be under base_path\n  file: {self.file_path}\n  base: {self.base_path}"
    #         ) from e
    #     return self

    # Back-compat property for code that still references the misspelling
    @property
    def reursive_helper(self) -> bool:
        return self.recursive_helper

    def to_kwargs(self) -> Dict[str, Any]:
        """
        Map the schema to `extract_function_source_ast` keyword arguments.

        `function_path` is split by `create_file_path` into the source file
        path and the function name; the remaining fields are passed through
        under the parameter names the extractor expects.
        """
        path, func = create_file_path(str(self.base_path), str(self.function_path))
        return {
            "file_path": path,
            "func_or_qualname": func,
            "include_helpers": self.include_helpers,
            "base_path": str(self.base_path),
            "detailed_functions": self.detailed_functions,
            "recursive_helper": self.recursive_helper,   # keep original name expected by your function
            "aggressive_fallback": self.aggressive_fallback,
        }

def extract_function_source(
        params: ParameterInputSchema,
        # tool_context: ToolContext
    ) -> Dict[str, Any]:
    """
    Run `extract_function_source_ast` from a validated `ParameterInputSchema`.

    The schema is converted with `params.to_kwargs()` and the resulting
    keyword arguments are forwarded unchanged.

    Parameters
    ----------
    params : ParameterInputSchema
        Contains:
          - function_path (str): Dotted path of the function to extract.
          - include_helpers (bool): If True, also report the helper functions
            discovered from calls inside the target, searching across the project.
          - base_path (str): Project root directory. Only files under this root
            are considered.
          - detailed_functions (bool): If True, helpers are reported as nested
            extraction results rather than dotted paths.
          - recursive_helper (bool): If True, the helpers of each helper are
            expanded as well.
          - aggressive_fallback (bool): If True, when binding can't be proven,
            include all same-named top-level functions found across the project.

    Returns
    -------
    Dict[str, Any]
        {
          "code": str,
          "start_line": int,
          "end_line": int,
          "function": str,
          "file": str,
          "helpers": List[str] | List[Dict[str, Any]]  # dicts when detailed_functions is set
        }
    """
    extractor_kwargs = params.to_kwargs()
    return extract_function_source_ast(**extractor_kwargs)

In [9]:
# Resolve the dotted path to its source file and function name, and show both.
path, func = create_file_path(BASE_PATH, Function_Path)
print(path, "\n"+func+"()")
# Direct call kept for reference; the schema-based wrapper below is used instead.
# out = extract_function_source_ast(
#     file_path=path,
#     func_or_qualname=func,
#     include_helpers=True,
#     base_path=BASE_PATH,
#     detailed_functions=True,
#     recursive_helper=True,
# )
out = extract_function_source(
    params=ParameterInputSchema(
        function_path=Function_Path,
        include_helpers=True,
        base_path=BASE_PATH,
        detailed_functions=True,
        recursive_helper=True,
    )
)
import json
# Pretty-print the result dict, including the nested helper entries.
print(json.dumps(out, indent=2))
# for key, value in out.items():
#     print(f"\n{key}:")
#     print(value)

C:\Users\JonathanChackoPattas\OneDrive - Maritime Support Solutions\Desktop\MSS-Automation\Inventory\views_pack\terminal.py 
saudi_tsr_output_checker()
{
  "code": "# Extracted from terminal.py:455-583\n@csrf_exempt\ndef saudi_tsr_output_checker(request):\n    if request.method == 'POST':\n        try:\n            report = request.POST.get('document_type')\n            file_path = request.FILES.get('file')\n            if not file_path:\n                return JsonResponse({\"status\": \"error\", \"message\": \"No file uploaded.\", \"data\": pd.DataFrame().to_dict()}, status=400)\n            TSR = True\n            if False:\n                pass\n            ## DAMMAM DEPOTS\n            elif report == \"GLOBE-DAMMAM_Report\":\n                df = GLOBE_DAMMAM(file_path)\n                \"\"\"\n                returns ['CONTAINER_NUMBER','RCVC_DATE', 'SNTS_DATE']\n                \"\"\"\n            elif report == \"ALI-RAZA_Report\":\n                df = ALI_RAZA_DAMMAM(file_pat

# Alternative

In [10]:
import sys
# Raises SystemExit(1) — presumably a guard so a full notebook run stops
# before the "Alternative" cells below; confirm before removing.
sys.exit(1)

SystemExit: 1

In [None]:
import os
from pathlib import Path

def to_function_path(base_path: str, file_path: str, func_name: str) -> str:
    """
    Build the dotted path 'module.submodule.function' for a function in a file.

    This is the inverse of create_file_path.
    - A package __init__.py maps to the package name (no trailing '__init__').
    - file_path must be a .py file located under base_path; both paths are
      resolved before they are compared.

    Raises:
      ValueError: If the file is outside base_path, is not a .py file, or
                  yields an empty module name.
    """
    root = Path(base_path).resolve()
    target = Path(file_path).resolve()

    # Reject anything that does not live inside the project root.
    try:
        relative = target.relative_to(root)
    except ValueError:
        raise ValueError(f"{target} is outside BASE_PATH {root}")

    if relative.suffix != ".py":
        raise ValueError("file_path must be a .py file")

    segments = [*relative.with_suffix("").parts]
    # pkg/__init__.py names the package itself.
    if segments and segments[-1] == "__init__":
        segments.pop()

    dotted = ".".join(segments).strip(".")
    if not dotted:
        raise ValueError("Could not derive module name from the given path.")

    return f"{dotted}.{func_name}"

# Rebuild the dotted path from the `path` / `func` pair computed in an earlier cell.
# The trailing comment shows an example of the expected output format.
Function_Path = to_function_path(BASE_PATH, path, func)
print(Function_Path)  # Inventory.views_pack.terminal.process_exe_data

In [None]:
# ast_function_extractor.py
from __future__ import annotations
import ast
from pathlib import Path
from typing import Optional, Dict, Tuple, List, Union

FuncNode = Union[ast.FunctionDef, ast.AsyncFunctionDef]

def _gather_defs(module: ast.Module) -> Tuple[Dict[str, FuncNode], Dict[str, Dict[str, FuncNode]]]:
    """Return (top_level_funcs, class_methods[class_name][func_name])."""
    top_level_funcs: Dict[str, FuncNode] = {}
    class_methods: Dict[str, Dict[str, FuncNode]] = {}

    for node in module.body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            top_level_funcs[node.name] = node
        elif isinstance(node, ast.ClassDef):
            methods: Dict[str, FuncNode] = {}
            for b in node.body:
                if isinstance(b, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    methods[b.name] = b
            class_methods[node.name] = methods
    return top_level_funcs, class_methods


def _slice_with_decorators(src_lines: List[str], fn: FuncNode) -> Tuple[str, int, int]:
    """Return (code, start_line, end_line), 1-based line numbers inclusive."""
    start = fn.lineno
    if getattr(fn, "decorator_list", None):
        start = min(getattr(dec, "lineno", start) for dec in fn.decorator_list) or start
    end = getattr(fn, "end_lineno", None)
    if end is None:
        # Fallback for very old Python: try ast.get_source_segment
        full_src = "".join(src_lines)
        seg = ast.get_source_segment(full_src, fn)
        if seg is None:
            raise RuntimeError("Unable to determine function end; please use Python 3.8+.")
        # Best-effort end line calc
        end = start + seg.count("\n")
        return seg, start, end
    return "\n".join(src_lines[start - 1 : end]), start, end


def _called_top_level_functions(fn: FuncNode) -> List[str]:
    """Naive: collect ast.Name() calls used by this function."""
    called: set[str] = set()
    for n in ast.walk(fn):
        if isinstance(n, ast.Call) and isinstance(n.func, ast.Name):
            called.add(n.func.id)
    return sorted(called)


def extract_function_source_ast(
    file_path: str | Path,
    func_or_qualname: str,
    include_helpers: bool = False,
) -> dict:
    """
    Extract a function or method source by name.

    - func_or_qualname: "foo" or "ClassName.method". A bare name that is not a
      top-level function is searched for in the file's classes; the first
      class defining it is used.
    - include_helpers=True: for a top-level target, report the dotted paths of
      same-file top-level functions it calls directly (naive name-based
      detection, the target itself excluded). The paths are built with
      to_function_path against the module-level BASE_PATH.

    Returns: {"code": str, "start_line": int, "end_line": int,
              "function": str, "file": str, "helpers": list[str]}
    Raises ValueError when the target cannot be found.
    """
    source_file = Path(file_path)
    text = source_file.read_text(encoding="utf-8")
    lines = text.splitlines()
    functions, methods_by_class = _gather_defs(ast.parse(text))

    if "." in func_or_qualname:
        class_name, _, func_name = func_or_qualname.partition(".")
    else:
        class_name, func_name = None, func_or_qualname

    if class_name:
        target = methods_by_class.get(class_name, {}).get(func_name)
    else:
        target = functions.get(func_name)
        if target is None:
            # Ambiguous without a qualified name: the first matching class wins.
            for candidate_class, members in methods_by_class.items():
                if func_name in members:
                    target, class_name = members[func_name], candidate_class
                    break

    if target is None:
        qualified_methods = [f"{c}.{m}" for c, ms in methods_by_class.items() for m in ms]
        available = sorted([*functions, *qualified_methods])
        raise ValueError(f"Function '{func_or_qualname}' not found. Available: {available}")

    body, first_line, last_line = _slice_with_decorators(lines, target)
    code = f"# Extracted from {source_file.name}:{first_line}-{last_line}\n{body}"

    helper_paths = []
    if include_helpers and not class_name:
        for called in _called_top_level_functions(target):
            if called in functions and called != func_name:
                helper_paths.append(to_function_path(BASE_PATH, file_path, called))

    return {
        "code": code,
        "start_line": first_line,
        "end_line": last_line,
        "function": func_or_qualname,
        "file": str(source_file),
        "helpers": helper_paths,
    }

# Uses the `path` / `func` pair computed in an earlier cell; prints the whole result dict.
print(extract_function_source_ast(path, func, include_helpers=True)) # ['code']

In [None]:
# ast_function_extractor.py
from __future__ import annotations
import ast
from pathlib import Path
from typing import Optional, Dict, Tuple, List, Union

FuncNode = Union[ast.FunctionDef, ast.AsyncFunctionDef]

def _gather_defs(module: ast.Module) -> Tuple[Dict[str, FuncNode], Dict[str, Dict[str, FuncNode]]]:
    """Return (top_level_funcs, class_methods[class_name][func_name])."""
    top_level_funcs: Dict[str, FuncNode] = {}
    class_methods: Dict[str, Dict[str, FuncNode]] = {}

    for node in module.body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            top_level_funcs[node.name] = node
        elif isinstance(node, ast.ClassDef):
            methods: Dict[str, FuncNode] = {}
            for b in node.body:
                if isinstance(b, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    methods[b.name] = b
            class_methods[node.name] = methods
    return top_level_funcs, class_methods


def _slice_with_decorators(src_lines: List[str], fn: FuncNode) -> Tuple[str, int, int]:
    """Return (code, start_line, end_line), 1-based line numbers inclusive."""
    start = fn.lineno
    if getattr(fn, "decorator_list", None):
        start = min(getattr(dec, "lineno", start) for dec in fn.decorator_list) or start
    end = getattr(fn, "end_lineno", None)
    if end is None:
        # Fallback for very old Python: try ast.get_source_segment
        full_src = "".join(src_lines)
        seg = ast.get_source_segment(full_src, fn)
        if seg is None:
            raise RuntimeError("Unable to determine function end; please use Python 3.8+.")
        # Best-effort end line calc
        end = start + seg.count("\n")
        return seg, start, end
    return "\n".join(src_lines[start - 1 : end]), start, end


def _called_top_level_functions(fn: FuncNode) -> List[str]:
    """Naive: collect ast.Name() calls used by this function."""
    called: set[str] = set()
    for n in ast.walk(fn):
        if isinstance(n, ast.Call) and isinstance(n.func, ast.Name):
            called.add(n.func.id)
    return sorted(called)


def extract_function_source_ast(
    file_path: str | Path,
    func_or_qualname: str,
    include_helpers: bool = False,
) -> dict:
    """
    Extract a function or method source by name.

    - func_or_qualname: "foo" or "ClassName.method". A bare name that is not a
      top-level function is searched for in the file's classes; the first
      class defining it is used.
    - include_helpers=True: for a top-level target, append the source of every
      same-file top-level function it calls directly (naive name-based
      detection, the target itself excluded) to the returned code.

    Returns: {"code": str, "start_line": int, "end_line": int, "function": str, "file": str}
    The line numbers describe the target only, not the appended helpers.
    Raises ValueError when the target cannot be found.
    """
    source_file = Path(file_path)
    text = source_file.read_text(encoding="utf-8")
    lines = text.splitlines()
    functions, methods_by_class = _gather_defs(ast.parse(text))

    if "." in func_or_qualname:
        class_name, _, func_name = func_or_qualname.partition(".")
    else:
        class_name, func_name = None, func_or_qualname

    if class_name:
        target = methods_by_class.get(class_name, {}).get(func_name)
    else:
        target = functions.get(func_name)
        if target is None:
            # Ambiguous without a qualified name: the first matching class wins.
            for candidate_class, members in methods_by_class.items():
                if func_name in members:
                    target, class_name = members[func_name], candidate_class
                    break

    if target is None:
        qualified_methods = [f"{c}.{m}" for c, ms in methods_by_class.items() for m in ms]
        available = sorted([*functions, *qualified_methods])
        raise ValueError(f"Function '{func_or_qualname}' not found. Available: {available}")

    body, first_line, last_line = _slice_with_decorators(lines, target)
    sections = [f"# Extracted from {source_file.name}:{first_line}-{last_line}\n{body}"]

    if include_helpers and not class_name:
        for called in _called_top_level_functions(target):
            if called not in functions or called == func_name:
                continue
            helper_body, helper_first, helper_last = _slice_with_decorators(lines, functions[called])
            sections.append(
                f"\n# Helper '{called}' from {source_file.name}:{helper_first}-{helper_last}\n{helper_body}"
            )

    return {
        "code": "\n".join(sections),
        "start_line": first_line,
        "end_line": last_line,
        "function": func_or_qualname,
        "file": str(source_file),
    }


# Uses the `path` / `func` pair computed in an earlier cell.
if __name__ == "__main__":
    out = extract_function_source_ast(path, func) # , include_helpers=True
# Bare expression: presumably left as the last line so the notebook displays the result.
out

In [None]:
# libcst_function_extractor.py
from __future__ import annotations
from pathlib import Path
import libcst as cst
from libcst import FunctionDef, ClassDef
from libcst.metadata import MetadataWrapper, ParentNodeProvider, PositionProvider

def extract_function_source_libcst(file_path: str | Path, func_or_qualname: str) -> dict:
    """
    Extract the exact function/method source with original formatting preserved.

    Supports:
      - "foo" (top-level function)
      - "ClassName.method" (method defined directly in the body of ClassName)

    Args:
        file_path: Python file to read (decoded as UTF-8).
        func_or_qualname: Function name, or "ClassName.method". The string is
            split on the first "." only.

    Returns:
        {"code": str, "start_line": int, "end_line": int, "function": str, "file": str}
        start_line/end_line are the PositionProvider range of the matched node.
        If several definitions match, the first one visited is returned.

    Raises:
        ValueError: if no matching definition is found; the message lists the
            top-level functions and "Class.method" names present in the file.
    """
    path = Path(file_path)
    src = path.read_text(encoding="utf-8")
    module = cst.parse_module(src)
    wrapper = MetadataWrapper(module)
    pos = wrapper.resolve(PositionProvider)
    parent = wrapper.resolve(ParentNodeProvider)

    class_name = None
    func_name = func_or_qualname
    if "." in func_or_qualname:
        class_name, func_name = func_or_qualname.split(".", 1)

    def _owner(fn: FunctionDef) -> cst.CSTNode:
        # A def nested in a class (or any compound statement) is a child of the
        # IndentedBlock forming that statement's body, not of the ClassDef
        # itself, so step over the block to reach the real owner. Top-level
        # defs are direct children of the Module and need no extra hop.
        owner = parent[fn]
        if isinstance(owner, cst.IndentedBlock):
            owner = parent[owner]
        return owner

    matches: list[FunctionDef] = []

    class Finder(cst.CSTVisitor):
        METADATA_DEPENDENCIES = (ParentNodeProvider, PositionProvider)

        def visit_FunctionDef(self, node: FunctionDef) -> None:
            if node.name.value != func_name:
                return
            owner = _owner(node)
            if class_name:
                if isinstance(owner, ClassDef) and owner.name.value == class_name:
                    matches.append(node)
            elif isinstance(owner, cst.Module):
                # no class requested: accept top-level definitions only
                matches.append(node)

    wrapper.visit(Finder())

    if not matches:
        # Build a list of available names for a helpful error
        available: list[str] = []

        class Collector(cst.CSTVisitor):
            METADATA_DEPENDENCIES = (ParentNodeProvider,)

            def visit_FunctionDef(self, node: FunctionDef) -> None:
                owner = _owner(node)
                if isinstance(owner, cst.Module):
                    available.append(node.name.value)
                elif isinstance(owner, ClassDef):
                    available.append(f"{owner.name.value}.{node.name.value}")

        wrapper.visit(Collector())
        raise ValueError(f"Function '{func_or_qualname}' not found. Available: {sorted(available)}")

    node = matches[0]  # pick first match if multiple
    code = module.code_for_node(node)
    r = pos[node]  # CodeRange(start=(line, col), end=(line, col))

    return {
        "code": code,
        "start_line": r.start.line,
        "end_line": r.end.line,
        "function": func_or_qualname,
        "file": str(path),
    }


# Demo driver: print only the "code" entry of the libcst extractor's result.
# NOTE(review): `path` and `func` are not assigned in this cell — presumably set by an
# earlier cell; confirm before running.
if __name__ == "__main__":
    print(extract_function_source_libcst(path, func)["code"])

In [None]:
# llama_index_extractor.py
try:
    from llama_index.core import SimpleDirectoryReader
    from llama_index.core.node_parser import CodeSplitter
except ImportError:
    from llama_index import SimpleDirectoryReader
    from llama_index.node_parser import CodeSplitter

# Load a single Python file as a "document"
# NOTE(review): `path` (and `Path`) are not defined in this cell — presumably provided
# by earlier cells; confirm.
docs = SimpleDirectoryReader(input_files=[Path(path)]).load_data()

# Configure the code splitter: 60-line chunks, 10 lines of overlap, 2000-char cap.
# NOTE(review): presumably the overlap is meant to keep function/class blocks
# coherent across chunk boundaries — verify against the CodeSplitter docs.
splitter = CodeSplitter(
    language="python",
    chunk_lines=60,
    chunk_lines_overlap=10,
    max_chars=2000,
)

# Split the loaded document(s) into chunk nodes.
nodes = splitter.get_nodes_from_documents(docs)

# Report the chunk count, then a short preview of every chunk (numbered from 1).
print(f"Total chunks: {len(nodes)}")
for i, n in enumerate(nodes, 1):
    # Tolerate nodes with a missing or None `metadata` attribute.
    meta = getattr(n, "metadata", {}) or {}
    # Try both key spellings; "?" is shown when neither is present.
    # Because of the `or` chain, a falsy value such as 0 also falls through to "?".
    start = meta.get("start_line") or meta.get("start") or "?"
    end = meta.get("end_line") or meta.get("end") or "?"
    print(f"\n--- Chunk {i} [{start}-{end}] ---")
    print(n.text[:400])  # preview first 400 chars

In [None]:
# langchain_recursive_extractor.py
try:
    from langchain_text_splitters import RecursiveCharacterTextSplitter, Language
except ImportError:
    from langchain.text_splitter import RecursiveCharacterTextSplitter, Language

# Read the target file as UTF-8 text.
# NOTE(review): `path` (and `Path`) are not defined in this cell — presumably provided
# by earlier cells; confirm.
python_text = Path(path).read_text(encoding="utf-8")

# Build a recursive character splitter configured for Python:
# chunk_size=800 with chunk_overlap=100.
py = RecursiveCharacterTextSplitter.from_language(
    language=Language.PYTHON,
    chunk_size=800,
    chunk_overlap=100,
)

# Split the file text into a list of string chunks.
chunks = py.split_text(python_text)

# Report the total, then preview the first 400 characters of the first 5 chunks.
print(f"Total chunks: {len(chunks)}")
for i, ch in enumerate(chunks[:5], 1):  # preview first 5
    print(f"\n--- Chunk {i} ---")
    print(ch[:400])

In [None]:
# langchain_python_extractor.py
try:
    from langchain_text_splitters import PythonCodeTextSplitter
except ImportError:
    from langchain.text_splitter import PythonCodeTextSplitter

# Read the target file as UTF-8 text.
# NOTE(review): `path` (and `Path`) are not defined in this cell — presumably provided
# by earlier cells; confirm.
python_code = Path(path).read_text(encoding="utf-8")

# Python-specific splitter with deliberately small chunks
# (chunk_size=100, chunk_overlap=20).
# This rebinds `splitter`, so any earlier notebook-level `splitter` is replaced.
splitter = PythonCodeTextSplitter(
    chunk_size=100,
    chunk_overlap=20,
)

# Split the file text; this rebinds `chunks` as well.
chunks = splitter.split_text(python_code)

# Report the total, then print the first 10 chunks in full.
print(f"Total chunks: {len(chunks)}")
for i, ch in enumerate(chunks[:10], 1):  # preview first 10 small chunks
    print(f"\n--- Chunk {i} ---")
    print(ch)
