diff --git a/docs/wiki/assets/jcl_forge_demo.gif b/docs/wiki/assets/jcl_forge_demo.gif new file mode 100644 index 0000000..d0c6992 Binary files /dev/null and b/docs/wiki/assets/jcl_forge_demo.gif differ diff --git a/gitgalaxy/tools/cobol_to_cobol/README.md b/gitgalaxy/tools/cobol_to_cobol/README.md index 7ff24f2..4fa80be 100644 --- a/gitgalaxy/tools/cobol_to_cobol/README.md +++ b/gitgalaxy/tools/cobol_to_cobol/README.md @@ -43,7 +43,8 @@ This suite is built on a modular Hub-and-Spoke architecture. Every Python script * **Cloud Schema Forge (`cobol_schema_forge.py`):** Translates `PIC` clauses to strict PostgreSQL DDLs.
![Cloud Schema Forge](../../../docs/wiki/assets/cloud_schema_forge.gif) * **Zero-Trust JCL Forge (`cobol_jcl_forge.py`):** Extracts `SELECT` mappings to auto-generate strict, least-privilege JCL emulators. - +
![Zero-Trust JCL Forge](../../../docs/wiki/assets/jcl_forge_demo.gif) + #### 4. The AI Remediation Boundary * **Anomaly Task Forge (`cobol_agent_task_forge.py`):** Isolates structural anomalies into bounded JSON job tickets for LLM remediation. diff --git a/gitgalaxy/tools/cobol_to_cobol/cobol_jcl_auditor.py b/gitgalaxy/tools/cobol_to_cobol/cobol_jcl_auditor.py index 8e4736e..1382042 100644 --- a/gitgalaxy/tools/cobol_to_cobol/cobol_jcl_auditor.py +++ b/gitgalaxy/tools/cobol_to_cobol/cobol_jcl_auditor.py @@ -1,10 +1,13 @@ #!/usr/bin/env python3 # ============================================================================== -# GitGalaxy Spoke: Zero-Trust Meta Auditor (v4 - API Module) +# GitGalaxy Spoke: Zero-Trust Meta Auditor (v5 - CLI Enabled) # Purpose: Compares forged JCLs against original IBM legacy JCLs to calculate # exact code bloat reduction and over-permissioned I/O shedding. # ============================================================================== +import argparse +import json import re +import sys from pathlib import Path SYSTEM_DDS = {"STEPLIB", "SYSOUT", "SYSPRINT", "SYSUDUMP", "SYSIN", "CEEDUMP", "DFHCSD", "IGZDDOP"} @@ -39,7 +42,7 @@ def parse_jcl_intent(filepath: Path) -> dict: return metrics def audit_zero_trust_jcls(forged_dir: Path, original_dir: Path) -> dict: - """API endpoint for the Refractor Controller to fetch bloat metrics.""" + """Core logic to fetch bloat metrics.""" legacy_jcls = list(original_dir.rglob("*.[jJ][cC][lL]")) + list(original_dir.rglob("*.txt")) legacy_map = {} @@ -48,7 +51,7 @@ def audit_zero_trust_jcls(forged_dir: Path, original_dir: Path) -> dict: if forged_dir in lj.parents: continue metrics = parse_jcl_intent(lj) for pgm in metrics["exec_pgms"]: - # If multiple legacy JCLs call the same program, keep the biggest one (worst offender) + # If multiple legacy JCLs call the same program, keep the biggest one if pgm not in legacy_map or metrics.get("lines_of_code", 0) > legacy_map[pgm].get("lines_of_code", 0): legacy_map[pgm] = {"file": lj, "metrics": metrics} @@ -90,4 +93,57 @@ def audit_zero_trust_jcls(forged_dir: Path, original_dir: Path) -> dict: else: report["bloat_reduction_pct"] = 0.0 - return report \ No newline at end of file + return report + +def main(): + parser = argparse.ArgumentParser(description="GitGalaxy Zero-Trust Meta Auditor (v5)") + parser.add_argument("forged", help="Directory containing the forged GitGalaxy JCLs") + parser.add_argument("legacy", help="Directory containing the original legacy IBM JCLs") + parser.add_argument("--json", action="store_true", help="Output raw JSON instead of a formatted terminal report") + + args = parser.parse_args() + forged_path = Path(args.forged).resolve() + legacy_path = Path(args.legacy).resolve() + + if not forged_path.exists() or not legacy_path.exists(): + print("\n[!] ERROR: One or both directories do not exist.") + sys.exit(1) + + # Run the audit + report = audit_zero_trust_jcls(forged_path, legacy_path) + + # Output routing + if args.json: + print(json.dumps(report, indent=2)) + sys.exit(0) + + # CLI Terminal Vibe + print("\n==============================================================") + print(" 🛡️ GitGalaxy Spoke: Zero-Trust Meta Auditor (v5)") + print("==============================================================") + print(f" [*] Forged Dir : {forged_path.name}") + print(f" [*] Legacy Root : {legacy_path.name}") + print("--------------------------------------------------------------") + + if report["audited"] == 0: + print(" [!] No matching execution intents found between the directories.") + print(" Ensure your forged JCLs share PROGRAM-IDs with the legacy corpus.") + else: + print(" PROGRAM BREAKDOWN:") + for pgm, data in report["program_breakdown"].items(): + loc = str(data['loc_saved']).rjust(4) + io = str(data['io_blocked']).rjust(2) + print(f" [+] {pgm.ljust(10)} | LOC Saved: {loc} | I/O Blocked: {io} | Ref: {data['legacy_file']}") + + print("--------------------------------------------------------------") + print(" 📊 FINAL AUDIT METRICS:") + print(f" > Programs Audited : {report['audited']}") + print(f" > Original Legacy LOC : {report['original_loc']}") + print(f" > GitGalaxy Forged LOC : {report['forged_loc']}") + print(f" > Bloat Reduction : {report['bloat_reduction_pct']}%") + print(f" > Over-Permissioned I/O : {report['excess_dds_blocked']} Boundaries Shed") + + print("==============================================================\n") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/gitgalaxy/tools/cobol_to_cobol/cobol_jcl_forge.py b/gitgalaxy/tools/cobol_to_cobol/cobol_jcl_forge.py index 75303d7..a6a8ca3 100644 --- a/gitgalaxy/tools/cobol_to_cobol/cobol_jcl_forge.py +++ b/gitgalaxy/tools/cobol_to_cobol/cobol_jcl_forge.py @@ -1,50 +1,61 @@ #!/usr/bin/env python3 # ============================================================================== -# GitGalaxy Spoke: Zero-Trust JCL Forge (v3 - IR Context Aware) +# GitGalaxy Spoke: Zero-Trust JCL Forge (v5 - Hygienic Defaults) # ============================================================================== import argparse import sys import re from pathlib import Path +from datetime import datetime def analyze_cobol_intent(filepath: Path) -> dict: - intent = {"program_id": "UNKNOWN", "files_requested": [], "is_cics": False, "is_db2": False} + intent = { + "program_id": "UNKNOWN", + "files_requested": [], + "is_cics": False, + "is_db2": False, + "cics_calls": 0, + "sql_calls": 0 + } try: raw_content = filepath.read_text(encoding='utf-8', errors='ignore') - # 1. THE FLATTENER: Strip punch-card formatting and merge into a single AST string + # 1. THE FLATTENER: Strip punch-card formatting clean_lines = [] for line in raw_content.splitlines(): - # Skip pure comment lines (Asterisk or Slash in Column 7) if len(line) > 6 and line[6] in ('*', '/'): continue - # Keep only Column 8 and beyond (The actual code) clean_lines.append(line[7:] if len(line) > 7 else line) - # Join with spaces so multi-line statements become a single continuous sentence monolith_code = " ".join(clean_lines) - # 2. EXTRACT PROGRAM-ID (Bounded by the period) - prog_id_match = re.search(r'PROGRAM-ID\.\s+([A-Z0-9\-]+)\.', monolith_code, re.IGNORECASE) + # 2. EXTRACT PROGRAM-ID (With fallback to file name) + prog_id_match = re.search(r'PROGRAM-ID\.\s+[\'"]?([A-Z0-9\-]+)[\'"]?', monolith_code, re.IGNORECASE) if prog_id_match: intent["program_id"] = prog_id_match.group(1).strip() + else: + intent["program_id"] = filepath.stem.upper() - # 3. MULTI-LINE REGEX: Extract all File Assignments (Bounded by the period) - # This will now successfully catch SELECT and ASSIGN TO even if they are 20 lines apart! + # 3. BATCH I/O: Extract all File Assignments file_assign_pattern = re.compile(r'SELECT\s+([A-Z0-9\-]+)\s+ASSIGN\s+(?:TO\s+)?([A-Z0-9\-]+)[^.]*\.', re.IGNORECASE) - for match in file_assign_pattern.finditer(monolith_code): internal_name = match.group(1).strip() raw_dd = match.group(2).strip() - - # Strip IBM system prefixes like UT-S- or UR-S- clean_dd = re.sub(r'^(?:UT|UR)-S-', '', raw_dd) - - intent["files_requested"].append({ - "internal": internal_name, - "dd_name": clean_dd - }) + intent["files_requested"].append({"internal": internal_name, "dd_name": clean_dd}) + + # 4. TRANSACTIONAL I/O: Detect EXEC CICS blocks + cics_matches = re.findall(r'EXEC\s+CICS.*?END-EXEC\.', monolith_code, re.IGNORECASE) + if cics_matches: + intent["is_cics"] = True + intent["cics_calls"] = len(cics_matches) + + # 5. DATABASE I/O: Detect EXEC SQL blocks + sql_matches = re.findall(r'EXEC\s+SQL.*?END-EXEC\.', monolith_code, re.IGNORECASE) + if sql_matches: + intent["is_db2"] = True + intent["sql_calls"] = len(sql_matches) except Exception as e: print(f" [!] Intent Forge Error on {filepath.name}: {e}") @@ -62,12 +73,16 @@ def generate_zero_trust_jcl(intent: dict, job_name: str, account_code: str, line jcl.append(f"// TIME=10,REGION=4M") jcl.append(f"//* ==========================================================") jcl.append(f"//* AUTOGENERATED BY GITGALAXY ZERO-TRUST FORGE") - jcl.append(f"//* ==========================================================") + arch_flags = [] + if intent["is_cics"]: arch_flags.append("CICS") + if intent["is_db2"]: arch_flags.append("DB2") + if arch_flags: + jcl.append(f"//* ARCHITECTURE REQUIRES: {' + '.join(arch_flags)}") + + jcl.append(f"//* ==========================================================") jcl.append(f"//STEP01 EXEC PGM={prog_name}") - # Point to the new Superuser directory! jcl.append(f"//STEPLIB DD DSN=HERC01.LOADLIB,DISP=SHR") - jcl.append(f"//SYSOUT DD SYSOUT=*") jcl.append(f"//SYSPRINT DD SYSOUT=*") jcl.append(f"//SYSUDUMP DD SYSOUT=*") @@ -92,28 +107,79 @@ def generate_zero_trust_jcl(intent: dict, job_name: str, account_code: str, line else: jcl.append(f"// DISP={disp}") if "SHR" in disp and raw_dd not in lineage.get('inputs', set()): - # Warning moved to own line to respect punch-card Col 71 jcl.append(f"//* WARNING: NO EXPLICIT OPEN INTENT FOR {dd}") jcl.append("//") return "\n".join(jcl) def main(): - parser = argparse.ArgumentParser() - parser.add_argument("target") - parser.add_argument("--job", default="GITGJOB") - parser.add_argument("--acct", default="12345") - parser.add_argument("--out", type=str) + parser = argparse.ArgumentParser(description="GitGalaxy Zero-Trust JCL Forge (v5)") + parser.add_argument("target", help="Target directory or specific COBOL file") + parser.add_argument("--job", default="GITGJOB", help="Job name for the generated JCL") + parser.add_argument("--acct", default="12345", help="Account code for the generated JCL") + parser.add_argument("--out", type=str, help="Output directory (default: hygienic timestamped folder)") args = parser.parse_args() target_path = Path(args.target).resolve() - cobol_files = list(target_path.rglob("*.cbl")) if target_path.is_dir() else [target_path] + + if target_path.is_dir(): + extensions = ("*.cbl", "*.CBL", "*.cob", "*.COB", "*.ccp", "*.CCP") + cobol_files = [] + for ext in extensions: + cobol_files.extend(target_path.rglob(ext)) + else: + cobol_files = [target_path] - out_dir = Path(args.out).resolve() if args.out else None - if out_dir: out_dir.mkdir(parents=True, exist_ok=True) + if not cobol_files: + print(f"\n[!] ERROR: No COBOL source files found in: {target_path}") + sys.exit(1) + # --- THE HYGIENIC DEFAULT LOGIC --- + if args.out: + out_dir = Path(args.out).resolve() + else: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + if target_path.is_dir(): + # e.g., cics-genapp -> cics-genapp_forged_20260425_161559 + out_dir = target_path.parent / f"{target_path.name}_forged_{timestamp}" + else: + out_dir = target_path.parent / f"forged_jcls_{timestamp}" + + # Create the directory + out_dir.mkdir(parents=True, exist_ok=True) + + print("\n==============================================================") + print(" 🚀 GitGalaxy Spoke: Zero-Trust JCL Forge (v5)") + print("==============================================================") + print(f" [*] Target Path : {target_path}") + print(f" [*] Output Dir : {out_dir}") + print(f" [*] Files Found : {len(cobol_files)}") + print("--------------------------------------------------------------") + + success_count = 0 for file_path in cobol_files: intent = analyze_cobol_intent(file_path) jcl_output = generate_zero_trust_jcl(intent, args.job[:8].upper(), args.acct) - out_path = out_dir / f"{intent['program_id']}.jcl" if out_dir else file_path.parent / f"{intent['program_id']}.jcl" + + # Output strictly locked to the designated hygienic folder + out_path = out_dir / f"{intent['program_id']}.jcl" out_path.write_text(jcl_output, encoding='utf-8') -if __name__ == "__main__": main() \ No newline at end of file + + io_parts = [] + if len(intent['files_requested']) > 0: + io_parts.append(f"{len(intent['files_requested'])} Batch I/O") + if intent['cics_calls'] > 0: + io_parts.append(f"{intent['cics_calls']} CICS") + if intent['sql_calls'] > 0: + io_parts.append(f"{intent['sql_calls']} SQL") + + io_str = f"({', '.join(io_parts)})" if io_parts else "(No I/O)" + + print(f" [+] Forged: {file_path.name.ljust(15)} -> {out_path.name.ljust(15)} {io_str}") + success_count += 1 + + print("--------------------------------------------------------------") + print(f" [✓] Done! Successfully forged {success_count} JCL(s).") + print("==============================================================\n") + +if __name__ == "__main__": + main() \ No newline at end of file