The cell below defines functions for accessing the data in a monolithic data source.

The functions defined are for:
Cloning the GitHub repository.
Traversing all the files to make the later sorting step easier.
Producing a JSON file.

In [None]:
%%writefile monolith_inspector.py
#!/usr/bin/env python3
"""
Monolith Inspector for Java (jPetStore-ready)

What it does
------------
- Clones a repo (optional) OR scans a local path
- Emits:
  out/
    file_structure.md
    file_structure.json
    knowledge_graph.graphml
    knowledge_graph.json
    coupling_metrics.csv   # package-level afferent/efferent coupling

Graph nodes
-----------
- package: com.example.foo
- class:   com.example.foo.Bar
- method:  com.example.foo.Bar#baz(argTypes)

Graph edges (MultiDiGraph, directed)
------------------------------------
- package -> class                  (contains)
- class   -> class                  (extends / implements)
- class   -> class/package          (import depends-on)
- class   -> method                 (defines)
- class   -> class/method?          (calls – best-effort via javalang)

Notes
-----
- Java resolution of method calls is heuristic (no type solver). Still great for macro structure.
- Safe on large repos; skips generated/build dirs.
"""

import argparse
import json
import os
import re
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
from typing import List
import javalang
import networkx as nx

# ----------------------------
# Helpers
# ----------------------------

# Entry names that are skipped when walking or listing the repository tree.
SKIP_DIRS = {
    ".git",
    ".idea",
    ".vscode",
    "target",
    "build",
    "out",
    ".gradle",
    ".mvn",
    "node_modules",
}

# Lower-cased file suffixes that identify Java source files.
JAVA_EXT = {".java"}

def run(cmd, cwd=None) -> str:
    """Execute ``cmd`` and return its captured output as text.

    stderr is merged into stdout, so the returned string contains both.

    Raises:
        RuntimeError: if the process exits with a non-zero status; the
            message contains the command line and the captured output.
    """
    completed = subprocess.run(
        cmd,
        cwd=cwd,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    if completed.returncode == 0:
        return completed.stdout
    raise RuntimeError(f"Command failed: {' '.join(cmd)}\n{completed.stdout}")

def maybe_clone(repo, branch, dest) -> Path:
    """Return the checkout directory, cloning ``repo`` into it when needed.

    A ``dest`` that already exists and contains at least one entry is reused
    unchanged (nothing is fetched and the branch is not checked). Otherwise
    ``branch`` of ``repo`` is shallow-cloned (depth 1) into ``dest``.
    """
    checkout = Path(dest).expanduser().resolve()
    already_populated = checkout.exists() and any(checkout.iterdir())
    if not already_populated:
        checkout.parent.mkdir(parents=True, exist_ok=True)
        clone_cmd = ["git", "clone", "--depth", "1", "-b", branch, repo, str(checkout)]
        run(clone_cmd)
    return checkout

def walk_files(root: Path):
    """Yield the path of every Java source file found below ``root``.

    Directories named in ``SKIP_DIRS`` and directories whose name starts
    with a dot are not descended into. A file counts as Java source when
    its lower-cased suffix is in ``JAVA_EXT``.
    """
    for current_dir, subdirs, names in os.walk(root):
        # Slice-assign so os.walk sees the pruned list and skips those subtrees.
        subdirs[:] = [
            name for name in subdirs
            if not (name in SKIP_DIRS or name.startswith("."))
        ]
        base = Path(current_dir)
        for name in names:
            candidate = base / name
            if candidate.suffix.lower() in JAVA_EXT:
                yield candidate

def build_tree_markdown(root: Path) -> str:
    """Return a light-weight, ``tree``-like Markdown listing of ``root``.

    Entries named in ``SKIP_DIRS`` or starting with a dot are omitted.
    Within a directory, sub-directories come first, then files, each group
    sorted case-insensitively by name.

    Returns:
        The heading line followed by one line per visible entry, joined
        with newlines.
    """
    lines = [f"# File Structure for {root.name}\n"]

    def visible(entry: Path) -> bool:
        return not (entry.name in SKIP_DIRS or entry.name.startswith("."))

    def tree(prefix: str, path: Path):
        # Filter BEFORE enumerating: the "last entry" connector must refer to
        # the last entry that is actually printed, not to a skipped one.
        entries = sorted(
            (e for e in path.iterdir() if visible(e)),
            key=lambda x: (x.is_file(), x.name.lower()),
        )
        last_index = len(entries) - 1
        for i, e in enumerate(entries):
            is_last = i == last_index
            connector = "└─" if is_last else "├─"
            lines.append(f"{prefix}{connector} {e.name}")
            if e.is_dir():
                # Below the last entry there is no vertical rule to continue.
                tree(prefix + ("   " if is_last else "│  "), e)

    tree("", root)
    return "\n".join(lines)

def tree_to_json(root: Path):
    """Return a nested, JSON-serialisable description of ``root``.

    A directory becomes ``{"name", "type": "dir", "children": [...]}`` and
    anything else becomes ``{"name", "type": "file"}``. Children named in
    ``SKIP_DIRS`` or starting with a dot are left out; the rest are ordered
    directories first, then files, case-insensitively by name.
    """
    def describe(entry: Path):
        if not entry.is_dir():
            return {"name": entry.name, "type": "file"}

        ordered = sorted(
            entry.iterdir(),
            key=lambda item: (item.is_file(), item.name.lower()),
        )
        children = []
        for child in ordered:
            if child.name in SKIP_DIRS or child.name.startswith("."):
                continue
            children.append(describe(child))
        return {"name": entry.name, "type": "dir", "children": children}

    return describe(root)

# ----------------------------
# Java parsing (javalang)
# ----------------------------

def parse_java(path: Path):
    """Read ``path`` and parse it with javalang.

    The file is decoded as UTF-8 with undecodable bytes ignored.

    Returns:
        ``(source_text, syntax_tree)`` on success, or ``(None, None)`` when
        reading or parsing raised any exception (best-effort: unparsable
        files are simply skipped by the caller).
    """
    try:
        source_text = path.read_text(encoding="utf-8", errors="ignore")
        syntax_tree = javalang.parse.parse(source_text)
    except Exception:
        return None, None
    return source_text, syntax_tree

def locate_package(tree) -> str:
    """Return the package name declared in ``tree``, or ``""`` if there is none.

    Any exception raised while inspecting ``tree`` also yields ``""``.
    """
    try:
        declaration = tree.package
        if not declaration:
            return ""
        return declaration.name
    except Exception:
        # An unexpected tree shape is treated the same as "no package".
        return ""

# from typing import List
def short_to_fqcn(short: str, pkg: str, imports: List[str]) -> str:
    """
    Best-effort resolution of a Java type reference to a fully qualified name.

    Resolution order:
      1. A dotted name whose first segment starts lower-case and whose last
         segment starts upper-case (e.g. ``com.foo.Bar``) is taken to be
         fully qualified already and is returned unchanged.
      2. Otherwise the first segment (``Outer`` in ``Outer.Inner``, ``obj``
         in ``obj.field``) is looked up in ``imports``: an import ending in
         ``.<segment>`` is returned as-is; an import ending in
         ``.<segment>.*`` resolves to the owner named before ``.*``.
      3. Otherwise the first segment is qualified with ``pkg`` (or returned
         bare when ``pkg`` is empty).

    Args:
        short: name as written in the source (simple or dotted).
        pkg: package of the file being analysed; may be ``""``.
        imports: import paths of the file being analysed.
    """
    segments = short.split(".")
    base = segments[0]

    # Slicing with [:1] keeps the check safe for empty segments.
    looks_qualified = (
        len(segments) > 1
        and base[:1].islower()
        and segments[-1][:1].isupper()
    )
    if looks_qualified:
        return short

    for imp in imports:
        if imp.endswith(".*"):
            # ``a.b.Base.*`` cannot pin a member, so name the owner itself.
            if imp.endswith("." + base + ".*"):
                return imp[:-2]
        elif imp.endswith("." + base):
            return imp
    return f"{pkg}.{base}" if pkg else base

def method_sig(pkg: str, cls: str, m) -> str:
    """Build the graph identifier of method ``m`` declared in class ``cls``.

    The result looks like ``com.pets.Store#find(String,int)``; the package
    prefix is omitted when ``pkg`` is empty. A parameter whose type object
    has no ``name`` attribute is rendered as ``Object``.
    """
    param_types = []
    for param in m.parameters or []:
        param_types.append(getattr(param.type, "name", "Object"))
    owner = f"{pkg}.{cls}" if pkg else cls
    return f"{owner}#{m.name}({','.join(param_types)})"

# ----------------------------
# Graph construction
# ----------------------------

def analyze_repo(root: Path):
    """
    Parse every Java file below ``root`` and build the knowledge graph.

    Files that cannot be parsed are skipped. Only the top-level types of a
    compilation unit are inspected.

    Returns:
      - graph (MultiDiGraph)
      - file_index: list of java files processed (paths relative to root)
      - package_coupling: dict of pkg -> {"afferent": set, "efferent": set}
    """
    G = nx.MultiDiGraph()
    file_index = []
    package_coupling = defaultdict(lambda: {"afferent": set(), "efferent": set()})

    def couple(src_pkg: str, dst_pkg: str) -> None:
        # Record a cross-package dependency; unknown or identical packages are ignored.
        if src_pkg and dst_pkg and src_pkg != dst_pkg:
            package_coupling[src_pkg]["efferent"].add(dst_pkg)
            package_coupling[dst_pkg]["afferent"].add(src_pkg)

    def link_class(src_fqcn: str, dst_fqcn: str, kind: str, src_pkg: str) -> None:
        # Add a class -> class edge plus the package coupling it implies.
        G.add_node(f"class:{dst_fqcn}", kind="class", name=dst_fqcn)
        G.add_edge(f"class:{src_fqcn}", f"class:{dst_fqcn}", kind=kind)
        couple(src_pkg, dst_fqcn.rpartition(".")[0])

    def as_list(value):
        # Classes carry a single superclass reference, interfaces a list of them.
        if not value:
            return []
        return list(value) if isinstance(value, (list, tuple)) else [value]

    for jf in walk_files(root):
        _, tree = parse_java(jf)
        if tree is None:
            continue
        rel_file = str(jf.relative_to(root))
        file_index.append(rel_file)

        pkg = locate_package(tree)
        import_decls = [imp for imp in (tree.imports or []) if hasattr(imp, "path")]
        imports = [imp.path for imp in import_decls]
        # add package node
        if pkg:
            G.add_node(f"package:{pkg}", kind="package", name=pkg)

        # collect class/interface declarations
        for t in (t for t in tree.types if hasattr(t, "name")):
            cls_name = t.name
            fqcn = f"{pkg}.{cls_name}" if pkg else cls_name
            G.add_node(f"class:{fqcn}", kind="class", name=fqcn, file=rel_file)

            # package contains class
            if pkg:
                G.add_edge(f"package:{pkg}", f"class:{fqcn}", kind="contains")

            # extends / implements edges
            for ref in as_list(getattr(t, "extends", None)):
                base = getattr(ref, "name", None) or str(ref)
                link_class(fqcn, short_to_fqcn(base, pkg, imports), "extends", pkg)

            for ref in as_list(getattr(t, "implements", None)):
                iname = getattr(ref, "name", None) or str(ref)
                link_class(fqcn, short_to_fqcn(iname, pkg, imports), "implements", pkg)

            # import edges
            for imp in import_decls:
                path = imp.path
                is_static = getattr(imp, "static", False)
                is_wildcard = getattr(imp, "wildcard", False)

                if is_wildcard and not is_static:
                    # ``import a.b.*;`` - javalang keeps the path without ".*",
                    # so the path itself is the package being depended on.
                    G.add_node(f"package:{path}", kind="package", name=path)
                    G.add_edge(f"class:{fqcn}", f"package:{path}", kind="imports")
                    couple(pkg, path)
                    continue

                if is_static and not is_wildcard and "." in path:
                    # ``import static a.B.member;`` - drop the member to get the class.
                    path = path.rpartition(".")[0]
                link_class(fqcn, path, "imports", pkg)

            # methods + definitions
            for m in getattr(t, "methods", []):
                msig = method_sig(pkg, cls_name, m)
                G.add_node(f"method:{msig}", kind="method", name=msig, file=rel_file)
                G.add_edge(f"class:{fqcn}", f"method:{msig}", kind="defines")

                # method calls (best-effort): javalang gives MethodInvocation
                # nodes with .qualifier and .member, but no resolved types.
                try:
                    for _, call in m.filter(javalang.tree.MethodInvocation):
                        qual = call.qualifier  # may be a class or a variable name
                        if qual:
                            link_class(fqcn, short_to_fqcn(qual, pkg, imports), "calls", pkg)
                        else:
                            # unknown receiver - record the bare member name,
                            # prefixed with the package when there is one
                            token = f"{pkg}.{call.member}" if pkg else call.member
                            G.add_node(f"method:{token}", kind="method", name=token)
                            G.add_edge(f"class:{fqcn}", f"method:{token}", kind="calls")
                except Exception as exc:
                    # Stay best-effort, but leave a trace instead of hiding the failure.
                    print(f"[WARN] call scan failed for {msig}: {exc}", file=sys.stderr)

    return G, file_index, package_coupling

# ----------------------------
# Outputs
# ----------------------------

def write_outputs(root: Path, outdir: Path, G: nx.MultiDiGraph, file_index, package_coupling):
    """Write all report files for an analysed repository into ``outdir``.

    Produces ``file_structure.md`` / ``.json``, ``knowledge_graph.graphml`` /
    ``.json`` and ``coupling_metrics.csv`` (``outdir`` is created when
    missing), then prints a short summary including ``len(file_index)``.
    """
    import csv

    outdir.mkdir(parents=True, exist_ok=True)

    def dump_json(filename: str, payload) -> None:
        (outdir / filename).write_text(json.dumps(payload, indent=2), encoding="utf-8")

    # file structure
    (outdir / "file_structure.md").write_text(build_tree_markdown(root), encoding="utf-8")
    dump_json("file_structure.json", tree_to_json(root))

    # graph: GraphML plus a plain JSON dump of nodes and keyed edges
    nx.write_graphml(G, outdir / "knowledge_graph.graphml")
    node_records = [{"id": node_id, **attrs} for node_id, attrs in G.nodes(data=True)]
    edge_records = [
        {"u": u, "v": v, "key": key, **attrs}
        for u, v, key, attrs in G.edges(keys=True, data=True)
    ]
    dump_json("knowledge_graph.json", {"nodes": node_records, "edges": edge_records})

    # coupling metrics, one row per package in name order
    header = ["package", "afferent_count", "efferent_count", "afferent_set", "efferent_set"]
    with (outdir / "coupling_metrics.csv").open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        for pkg, sets in sorted(package_coupling.items(), key=lambda item: item[0]):
            incoming = sorted(sets["afferent"])
            outgoing = sorted(sets["efferent"])
            writer.writerow([pkg, len(incoming), len(outgoing), ";".join(incoming), ";".join(outgoing)])

    # tiny console summary
    print(f"[OK] Wrote outputs to: {outdir}")
    for summary_line in (
        " - file_structure.md / .json",
        " - knowledge_graph.graphml / .json",
        " - coupling_metrics.csv",
    ):
        print(summary_line)
    print(f"Java files parsed: {len(file_index)}")

# ----------------------------
# CLI
# ----------------------------

def main():
    """Command-line entry point: resolve the repository, analyse it, write reports.

    With ``--repo`` the repository is cloned into ``./_repo_checkout``
    (or that directory is reused); otherwise ``--path`` must point to an
    existing local directory.
    """
    parser = argparse.ArgumentParser(
        description="Parse Java monolith (like jPetStore) and build a knowledge graph."
    )
    parser.add_argument("--repo", default=None, help="Git repo URL (optional). If omitted, use --path.")
    parser.add_argument("--branch", default="master", help="Branch to clone")
    parser.add_argument("--path", default=None, help="Local path to repo (if you already cloned).")
    parser.add_argument("--out", default="out", help="Output directory")
    args = parser.parse_args()

    if not (args.repo or args.path):
        parser.error("Provide either --repo or --path")

    if args.repo:
        root = maybe_clone(args.repo, args.branch, "./_repo_checkout")
    else:
        root = Path(args.path).expanduser().resolve()
        if not root.exists():
            raise SystemExit(f"Path not found: {root}")

    graph, java_files, coupling = analyze_repo(root)
    write_outputs(root, Path(args.out), graph, java_files, coupling)


if __name__ == "__main__":
    main()

Overwriting monolith_inspector.py


In [None]:
%pip install javalang networkx



This cell takes the monolith's source from an open-source GitHub link and runs the functions defined previously.

In [None]:
import subprocess
import os

import sys

# Create a directory for the output
output_dir = "analysis_output"
os.makedirs(output_dir, exist_ok=True)

# Run the script with a sample repo (jPetStore).
# monolith_inspector.py was written to the current directory by the
# %%writefile cell above.
script_name = "monolith_inspector.py"

# Launch it with the interpreter that runs this notebook (sys.executable), so
# the child process sees the packages installed with %pip; a bare "python3"
# on PATH may be a different environment.
command = [sys.executable, script_name, "--repo", "https://github.com/KimJongSung/jPetStore.git", "--out", output_dir]

try:
    # check=True turns a non-zero exit status into CalledProcessError.
    process = subprocess.run(command, capture_output=True, text=True, check=True)
    print("Script executed successfully!")
    print("Stdout:\n", process.stdout)
    print("Stderr:\n", process.stderr)
except subprocess.CalledProcessError as e:
    print(f"Error executing script: {e}")
    print("Stdout:\n", e.stdout)
    print("Stderr:\n", e.stderr)

Script executed successfully!
Stdout:
 [OK] Wrote outputs to: analysis_output
 - file_structure.md / .json
 - knowledge_graph.graphml / .json
 - coupling_metrics.csv
Java files parsed: 72

Stderr:
 


Displaying a sample of the knowledge graph that was created.

In [None]:
import json
import os

output_dir = "analysis_output"
graph_json_path = os.path.join(output_dir, "knowledge_graph.json")

if not os.path.exists(graph_json_path):
    print(f"Knowledge graph JSON file not found at: {graph_json_path}")
else:
    with open(graph_json_path, 'r') as f:
        graph_data = json.load(f)

    # The full JSON can be large, so show only its top-level keys and the
    # first five entries of each section.
    print("Knowledge Graph JSON structure:")
    print(f"Keys: {graph_data.keys()}")
    for heading, section in (("\nFirst 5 Nodes:", "nodes"), ("\nFirst 5 Edges:", "edges")):
        print(heading)
        for item in graph_data.get(section, [])[:5]:
            print(item)

Knowledge Graph JSON structure:
Keys: dict_keys(['nodes', 'edges'])

First 5 Nodes:
{'id': 'package:org.springframework.samples.jpetstore.domain', 'kind': 'package', 'name': 'org.springframework.samples.jpetstore.domain'}
{'id': 'class:org.springframework.samples.jpetstore.domain.Product', 'kind': 'class', 'name': 'org.springframework.samples.jpetstore.domain.Product', 'file': 'src/main/java/org/springframework/samples/jpetstore/domain/Product.java'}
{'id': 'class:java.io.Serializable', 'kind': 'class', 'name': 'java.io.Serializable'}
{'id': 'method:org.springframework.samples.jpetstore.domain.Product#getProductId()', 'kind': 'method', 'name': 'org.springframework.samples.jpetstore.domain.Product#getProductId()', 'file': 'src/main/java/org/springframework/samples/jpetstore/domain/Product.java'}
{'id': 'method:org.springframework.samples.jpetstore.domain.Product#setProductId(String)', 'kind': 'method', 'name': 'org.springframework.samples.jpetstore.domain.Product#setProductId(String)', 

Next, we take the structural knowledge graph and convert it into a semantic context, to enable accurate queries and to establish how the code elements are connected.

The knowledge graph holds nodes (entities with text and placeholder embeddings) and edges, which contain the relationships.

We output the final enriched graph and index for nodes.

In [None]:
%%writefile graphrag_merger.py

#!/usr/bin/env python3
"""
GraphRAG Merger

Takes the structural knowledge graph (from monolith_inspector.py)
and enriches it with semantic/functional context for RAG-style queries.

Inputs
------
- knowledge_graph.json (nodes + edges from networkx export)
- file_structure.json  (hierarchical tree)
- coupling_metrics.csv (afferent/efferent coupling)

Outputs
-------
out/
  graph_context.json   # GraphRAG-compatible schema (entities + relationships)
  node_index.json      # index with text chunks + embeddings
"""

import json
import os
import csv
from pathlib import Path
from typing import Dict, Any, List
import hashlib

# ----------------------------
# Helpers
# ----------------------------

def read_json(path: Path):
    """Load and return the JSON document stored at ``path`` (read as UTF-8)."""
    return json.loads(Path(path).read_text(encoding="utf-8"))

def read_csv(path: Path):
    """Read the CSV file at ``path`` (UTF-8) into a list of row dicts.

    The first line supplies the keys; every following line becomes one
    dict mapping column name to the cell's string value.
    """
    # newline="" lets the csv module do its own line-ending handling, as its
    # documentation requires; otherwise newlines embedded in quoted fields
    # are rewritten by the text layer.
    with open(path, "r", encoding="utf-8", newline="") as f:
        return list(csv.DictReader(f))

def simple_embed(text: str) -> List[float]:
    """
    Placeholder embedding derived from a hash of ``text``.

    Takes the first 32 bytes of the SHA-256 digest of the UTF-8 encoded text
    and divides each by 255. The vector is deterministic but carries no
    semantic meaning (replace with OpenAI, HuggingFace, etc. if needed).
    """
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return [byte / 255 for byte in digest[:32]]  # 32-dim fake embedding

# ----------------------------
# Build GraphRAG schema
# ----------------------------

def build_graphrag_context(graph_data: Dict[str, Any],
                           file_struct: Dict[str, Any],
                           coupling: List[Dict[str, str]]) -> Dict[str, Any]:
    """
    Convert the exported graph and the coupling rows into a GraphRAG-style dict:
      {
        "entities": [ {id, type, name, text, embedding} ],
        "relationships": [ {source, target, type} ]
      }

    Every node becomes an entity whose text is "<kind> <name>", extended by
    " defined in <file>" when the node has a file. Every edge becomes a
    relationship typed by its kind ("related" if absent). Each entry of a
    coupling row's ``efferent_set`` adds a package -> package "couples-to"
    relationship. ``file_struct`` is accepted but not used.
    """
    def to_entity(node: Dict[str, Any]) -> Dict[str, Any]:
        identifier = node["id"]
        kind = node.get("kind", "unknown")
        label = node.get("name", identifier)
        description = f"{kind} {label}"
        if "file" in node:
            description += f" defined in {node['file']}"
        return {
            "id": identifier,
            "type": kind,
            "name": label,
            "text": description,
            "embedding": simple_embed(description),
        }

    entities = [to_entity(node) for node in graph_data["nodes"]]

    relationships = [
        {"source": edge["u"], "target": edge["v"], "type": edge.get("kind", "related")}
        for edge in graph_data["edges"]
    ]

    # Coupling rows contribute extra package-level edges.
    for row in coupling:
        source = f"package:{row['package']}"
        relationships.extend(
            {"source": source, "target": f"package:{dep}", "type": "couples-to"}
            for dep in row["efferent_set"].split(";")
            if dep
        )

    return {"entities": entities, "relationships": relationships}

# ----------------------------
# Main
# ----------------------------

def main(outdir="analysis_output"):
    """Merge the analysis files found in ``outdir`` into GraphRAG outputs.

    Reads ``knowledge_graph.json``, ``file_structure.json`` and
    ``coupling_metrics.csv`` from ``outdir`` and writes ``graph_context.json``
    and ``node_index.json`` back into the same directory.
    """
    out = Path(outdir)
    context_path = out / "graph_context.json"
    index_path = out / "node_index.json"

    graph_data = read_json(out / "knowledge_graph.json")
    file_struct = read_json(out / "file_structure.json")
    coupling = read_csv(out / "coupling_metrics.csv")

    merged = build_graphrag_context(graph_data, file_struct, coupling)
    context_path.write_text(json.dumps(merged, indent=2), encoding="utf-8")

    # Build node index for retrieval: entity id -> its text and embedding
    node_index = {}
    for entity in merged["entities"]:
        node_index[entity["id"]] = {
            "text": entity["text"],
            "embedding": entity["embedding"],
        }
    index_path.write_text(json.dumps(node_index, indent=2), encoding="utf-8")

    print(f"[OK] GraphRAG context written to {context_path}")
    print(f"[OK] Node index written to {index_path}")


if __name__ == "__main__":
    main()


Overwriting graphrag_merger.py


In [None]:
!python3 graphrag_merger.py --out out

[OK] GraphRAG context written to analysis_output/graph_context.json
[OK] Node index written to analysis_output/node_index.json


Setting up an API key for our code.

In [None]:
!pip install requests
import os

# SECURITY: never hard-code a real API key in a notebook. Any key that was
# ever saved in this cell must be treated as leaked and revoked.
# Keep a key that is already exported; otherwise prompt for it without echo.
if not os.environ.get("GEMINI_API_KEY"):
    from getpass import getpass
    os.environ["GEMINI_API_KEY"] = getpass("Gemini API key: ")





Now we take the analysis files produced by the GraphRAG merger (e.g., graph_context.json, node_index.json) as input and merge them into one text string.

Gemini API helps us propose a microservices architecture in strict JSON format.

That consists of:

Microservice names

Responsibilities

Functions/classes it handles

Dependencies

API endpoints

In [None]:
import os
import json
import requests

# Use Colab's userdata to securely access the API key
# (read with userdata.get under the name 'GEMINI_API_KEY'; requires google.colab).
from google.colab import userdata
API_KEY = userdata.get('GEMINI_API_KEY')

# Endpoint for Gemini 2.0 Flash (generateContent); generate_gemini() below
# POSTs every prompt to this URL.
BASE_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"

def generate_gemini(prompt_text, timeout=60):
    """Send ``prompt_text`` to the Gemini endpoint and return the generated text.

    Args:
        prompt_text: prompt sent as the single text part of the request.
        timeout: seconds passed to ``requests.post`` as its timeout.

    Returns:
        The ``text`` of the first part of the first candidate in the response.

    Raises:
        RuntimeError: if the HTTP status is not 200, or the response JSON
            lacks the ``candidates[0].content.parts[0].text`` structure.
    """
    headers = {
        "Content-Type": "application/json",
        "X-goog-api-key": API_KEY
    }

    # Prepare payload in the format Gemini expects
    data = {"contents": [{"parts": [{"text": prompt_text}]}]}

    # Without a timeout a stalled connection would block the notebook forever.
    response = requests.post(BASE_URL, headers=headers, json=data, timeout=timeout)
    if response.status_code != 200:
        raise RuntimeError(f"API Error {response.status_code}: {response.text}")

    resp_json = response.json()
    # Print the full response for debugging
    print("Full API response:", json.dumps(resp_json, indent=2))

    try:
        return resp_json["candidates"][0]["content"]["parts"][0]["text"]
    except (KeyError, IndexError, TypeError):
        # Any missing level of the expected nesting ends up here.
        raise RuntimeError(
            f"API response does not contain expected structure: {resp_json}"
        ) from None


def load_analysis_files(folder="analysis_output", max_chars=5000):
    """
    Concatenate the regular files found directly in ``folder`` into one string.

    Each file contributes a section "## <name>\\n<text>\\n"; text longer than
    ``max_chars`` is cut and marked with "...[TRUNCATED]...". Sub-directories
    are ignored, and a file that cannot be read is reported and skipped.
    Sections are joined with a newline.
    """
    sections = []
    for entry in os.listdir(folder):
        full_path = os.path.join(folder, entry)
        if not os.path.isfile(full_path):
            continue
        try:
            with open(full_path, "r", encoding="utf-8", errors="ignore") as handle:
                body = handle.read()
            # truncate long files
            if len(body) > max_chars:
                body = body[:max_chars] + "\n...[TRUNCATED]..."
            sections.append(f"## {entry}\n{body}\n")
        except Exception as e:
            print(f"⚠️ Skipping {entry}: {e}")
    return "\n".join(sections)

def generate_microservices_from_analysis(analysis_text, output_dir="./microservices"):
    """
    Ask Gemini to propose a microservice decomposition
    based on the contents of the analysis_output files.

    Args:
        analysis_text: concatenated analysis reports embedded in the prompt.
        output_dir: directory that receives ``microservices_plan.json``.

    Returns:
        The parsed plan, or ``None`` when the model's answer is not valid
        JSON (the raw answer is printed in that case).
    """
    prompt = f"""
    You are an expert software architect.
    Given the following static analysis reports of a monolithic codebase,
    propose a microservices architecture.

    Requirements:
    - Identify each microservice and its core responsibilities
    - List the main functions/classes that belong to it
    - Specify inter-service communication (REST, events, queues, etc.)
    - Maintain functional equivalence to the monolith
    - Avoid redundancy and enforce consistency across services
    - Return output in strict JSON format:
    {{
        "microservices": [
            {{
                "name": "...",
                "responsibilities": ["..."],
                "functions": ["..."],
                "dependencies": ["..."],
                "api_endpoints": ["..."]
            }}
        ]
    }}

    Analysis Files Content:
    {analysis_text}
    """

    # Query the Gemini model
    raw_output = generate_gemini(prompt).strip()

    # The answer may be wrapped in a Markdown fence, with or without the
    # "json" language tag; peel the fence off before parsing.
    if raw_output.startswith("```json"):
        raw_output = raw_output[len("```json"):]
    elif raw_output.startswith("```"):
        raw_output = raw_output[3:]
    if raw_output.endswith("```"):
        raw_output = raw_output[:-3]
    raw_output = raw_output.strip()

    try:
        microservices_plan = json.loads(raw_output)
    except json.JSONDecodeError:
        print("⚠️ Could not parse Gemini output as JSON. Raw output:")
        print(raw_output)
        return None

    # Save microservices architecture
    os.makedirs(output_dir, exist_ok=True)
    arch_file = f"{output_dir}/microservices_plan.json"
    with open(arch_file, "w", encoding="utf-8") as f:
        json.dump(microservices_plan, f, indent=2)

    print(f"✅ Microservices plan generated at {arch_file}")
    return microservices_plan


# Example usage: build the prompt context from analysis_output/ (each file
# capped at 3000 characters) and request a microservices plan for it.
if __name__ == "__main__":
    analysis_text = load_analysis_files("analysis_output", max_chars=3000)
    # None when the model's answer could not be parsed as JSON.
    microservices_plan = generate_microservices_from_analysis(analysis_text)

Full API response: {
  "candidates": [
    {
      "content": {
        "parts": [
          {
            "text": "```json\n{\n  \"microservices\": [\n    {\n      \"name\": \"CatalogService\",\n      \"responsibilities\": [\n        \"Manage product catalog, including categories, products, and items.\",\n        \"Provide APIs to browse and search the catalog.\"\n      ],\n      \"functions\": [\n        \"org.springframework.samples.jpetstore.domain.Product\",\n        \"org.springframework.samples.jpetstore.domain.Category\",\n        \"org.springframework.samples.jpetstore.domain.Item\",\n        \"org.springframework.samples.jpetstore.dao.ProductDao\",\n        \"org.springframework.samples.jpetstore.dao.CategoryDao\",\n        \"org.springframework.samples.jpetstore.dao.ItemDao\",\n        \"org.springframework.samples.jpetstore.dao.ibatis.SqlMapProductDao\",\n        \"org.springframework.samples.jpetstore.dao.ibatis.SqlMapCategoryDao\",\n        \"org.springframework.samples.j

This step turns the monolith analysis into actual scaffolded microservice code.

So, from a concatenated text string containing the knowledge about my monolith, I generate a plan using a prompt: “Propose a microservices architecture in JSON.”

Then I extract and validate the JSON
and save it as microservices/microservices_plan.json.
For each service in the plan, I ask Gemini to scaffold a FastAPI project and save the resulting files.

Then we save the code (save_microservice_code): it parses Gemini’s response and creates one folder per service.

It writes app.py, requirements.txt, etc. into the proper places.

In [None]:
import os
import json
import requests

# Use Colab's userdata to securely access the API key
# (read with userdata.get under the name 'GEMINI_API_KEY'; requires google.colab).
from google.colab import userdata
API_KEY = userdata.get('GEMINI_API_KEY')

# Gemini 2.0 Flash endpoint (generateContent); generate_gemini() below POSTs
# every prompt to this URL.
BASE_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"


# ---------------------------
# Gemini API wrapper
# ---------------------------
def generate_gemini(prompt_text, timeout=60):
    """Send ``prompt_text`` to the Gemini endpoint and return the generated text.

    Args:
        prompt_text: prompt sent as the single text part of the request.
        timeout: seconds passed to ``requests.post`` as its timeout.

    Returns:
        The ``text`` of the first part of the first candidate in the response.

    Raises:
        RuntimeError: if the HTTP status is not 200, or the response JSON
            lacks the ``candidates[0].content.parts[0].text`` structure.
    """
    headers = {
        "Content-Type": "application/json",
        "X-goog-api-key": API_KEY
    }

    data = {
        "contents": [
            {"parts": [{"text": prompt_text}]}
        ]
    }

    # Without a timeout a stalled connection would block the notebook forever.
    response = requests.post(BASE_URL, headers=headers, json=data, timeout=timeout)
    if response.status_code != 200:
        raise RuntimeError(f"API Error {response.status_code}: {response.text}")

    resp_json = response.json()
    try:
        return resp_json["candidates"][0]["content"]["parts"][0]["text"]
    except (KeyError, IndexError, TypeError):
        # Any missing level of the expected nesting ends up here.
        raise RuntimeError(
            f"API response does not contain expected structure: {resp_json}"
        ) from None


# ---------------------------
# Step 1: Load analysis files
# ---------------------------
def load_analysis_files(folder="analysis_output", max_chars=5000):
    """Concatenate the regular files found directly in ``folder`` into one string.

    Each file contributes a section "## <name>\\n<text>\\n"; text longer than
    ``max_chars`` is cut and marked with "...[TRUNCATED]...". Sub-directories
    are ignored, and a file that cannot be read is reported and skipped.
    Sections are joined with a newline.
    """
    sections = []
    for entry in os.listdir(folder):
        full_path = os.path.join(folder, entry)
        if not os.path.isfile(full_path):
            continue
        try:
            with open(full_path, "r", encoding="utf-8", errors="ignore") as handle:
                body = handle.read()
            if len(body) > max_chars:
                body = body[:max_chars] + "\n...[TRUNCATED]..."
            sections.append(f"## {entry}\n{body}\n")
        except Exception as e:
            print(f"⚠️ Skipping {entry}: {e}")
    return "\n".join(sections)


# ---------------------------
# Step 2: Generate plan
# ---------------------------
def generate_microservices_from_analysis(analysis_text, output_dir="./microservices"):
    """Ask Gemini for a microservices plan and save it as JSON.

    Args:
        analysis_text: concatenated analysis reports embedded in the prompt.
        output_dir: directory that receives ``microservices_plan.json``.

    Returns:
        The parsed plan, or ``None`` when the model's answer is not valid
        JSON (the raw answer is printed in that case).
    """
    prompt = f"""
    You are an expert software architect.
    Given the following static analysis reports of a monolithic codebase,
    propose a microservices architecture.

    Requirements:
    - Identify each microservice and its core responsibilities
    - List the main functions/classes that belong to it
    - Specify inter-service communication (REST, events, queues, etc.)
    - Maintain functional equivalence to the monolith
    - Avoid redundancy and enforce consistency across services
    - Return output in strict JSON format:
    {{
        "microservices": [
            {{
                "name": "...",
                "responsibilities": ["..."],
                "functions": ["..."],
                "dependencies": ["..."],
                "api_endpoints": ["..."]
            }}
        ]
    }}

    Analysis Files Content:
    {analysis_text}
    """

    raw_output = generate_gemini(prompt).strip()

    # The answer may be wrapped in a Markdown fence, with or without the
    # "json" language tag; peel the fence off before parsing.
    if raw_output.startswith("```json"):
        raw_output = raw_output[len("```json"):]
    elif raw_output.startswith("```"):
        raw_output = raw_output[3:]
    if raw_output.endswith("```"):
        raw_output = raw_output[:-3]
    raw_output = raw_output.strip()

    try:
        microservices_plan = json.loads(raw_output)
    except json.JSONDecodeError:
        print("⚠️ Could not parse Gemini output as JSON. Raw output:")
        print(raw_output)
        return None

    os.makedirs(output_dir, exist_ok=True)
    arch_file = f"{output_dir}/microservices_plan.json"
    with open(arch_file, "w", encoding="utf-8") as f:
        json.dump(microservices_plan, f, indent=2)

    print(f"✅ Microservices plan generated at {arch_file}")
    return microservices_plan


# ---------------------------
# Step 3: Generate code per service
# ---------------------------
def generate_microservice_code(service, output_dir="./microservices_code"):
    """Ask Gemini to scaffold a FastAPI codebase for one service definition.

    Args:
        service: JSON-serialisable service definition embedded in the prompt.
        output_dir: accepted but not used by this function.

    Returns:
        The model's answer with surrounding whitespace stripped.
    """
    service_json = json.dumps(service, indent=2)
    prompt = f"""
    You are an expert backend engineer.
    Based on the following service definition, generate a scaffolded codebase
    using Python FastAPI with REST endpoints.

    Service definition:
    {service_json}

    Requirements:
    - Create a main app.py with routes for each api_endpoint
    - Include placeholder functions for responsibilities
    - Add requirements.txt listing necessary libraries
    - Organize into a modular folder structure
    - Ensure the code runs with `uvicorn app:app --reload`
    - Return files with this format (IMPORTANT):
      # file: path/to/file.py
      ```python
      # code here
      ```
    """

    return generate_gemini(prompt).strip()


def save_microservice_code(service_name, code_text, base_dir="./microservices_code"):
    """Split a model answer into files and write them under the service folder.

    ``code_text`` is scanned line by line: a line starting with ``# file:``
    (case-insensitive, after stripping) opens a new file, lines starting with
    a Markdown fence are dropped, and every other line is appended to the
    file currently open. Lines before the first ``# file:`` marker are
    discarded. When no marker is present at all, the whole text (minus fence
    lines) is saved as ``app.py``.

    File names come from model output and are therefore untrusted: leading
    path separators are removed so the name is taken relative to the service
    folder, and any name that still resolves outside that folder (or to the
    folder itself) is skipped with a warning instead of being written.

    Args:
        service_name: sub-folder of ``base_dir`` that receives the files.
        code_text: raw answer in the ``# file: <path>`` + fenced-code format.
        base_dir: root directory for all generated services.
    """
    service_dir = os.path.join(base_dir, service_name)
    os.makedirs(service_dir, exist_ok=True)
    service_root = os.path.realpath(service_dir)

    current_file = None
    buffers = {}

    for line in code_text.splitlines():
        clean = line.strip()

        if clean.lower().startswith("# file:"):
            current_file = clean.split(":", 1)[-1].strip()
            buffers[current_file] = []
        elif clean.startswith("```"):
            continue
        elif current_file:
            buffers[current_file].append(line)

    if not buffers:
        buffers["app.py"] = [
            l for l in code_text.splitlines() if not l.strip().startswith("```")
        ]

    for fname, lines in buffers.items():
        # An absolute name would make os.path.join discard the service folder.
        relative = fname.lstrip("/\\")
        path = os.path.realpath(os.path.join(service_root, relative))
        if not path.startswith(service_root + os.sep):
            print(f"⚠️ Skipping unsafe file path: {fname!r}")
            continue
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write("\n".join(lines).strip() + "\n")

    print(f"✅ Saved code for {service_name} in {service_dir}")


def generate_code_from_plan(plan_file="microservices/microservices_plan.json"):
    """Generate and save scaffold code for every service listed in ``plan_file``.

    The file must contain a JSON object; its ``microservices`` list (empty
    when absent) is processed in order, and each entry needs a ``name``.
    A service for which the generator returns empty text is not saved.
    """
    with open(plan_file, "r") as handle:
        services = json.load(handle).get("microservices", [])

    for spec in services:
        service_name = spec["name"]
        print(f"🚀 Generating code for service: {service_name}")
        generated = generate_microservice_code(spec)
        if not generated:
            continue
        save_microservice_code(service_name, generated)


# ---------------------------
# Run end-to-end
# ---------------------------
if __name__ == "__main__":
    # Build the prompt context from analysis_output/ (each file capped at
    # 3000 characters) and request a microservices plan for it.
    analysis_text = load_analysis_files("analysis_output", max_chars=3000)
    microservices_plan = generate_microservices_from_analysis(analysis_text)

    # The plan is None when the model's answer could not be parsed as JSON.
    if microservices_plan:
        print("🚀 Now generating microservice code...")
        # The plan is re-read from the file written by the previous step.
        generate_code_from_plan("microservices/microservices_plan.json")


✅ Microservices plan generated at ./microservices/microservices_plan.json
🚀 Now generating microservice code...
🚀 Generating code for service: CatalogService
✅ Saved code for CatalogService in ./microservices_code/CatalogService
🚀 Generating code for service: AccountService
✅ Saved code for AccountService in ./microservices_code/AccountService
🚀 Generating code for service: OrderService
✅ Saved code for OrderService in ./microservices_code/OrderService
