Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added docs/wiki/assets/jcl_forge_demo.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion gitgalaxy/tools/cobol_to_cobol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ This suite is built on a modular Hub-and-Spoke architecture. Every Python script
* **Cloud Schema Forge (`cobol_schema_forge.py`):** Translates `PIC` clauses to strict PostgreSQL DDLs.
<br>![Cloud Schema Forge](../../../docs/wiki/assets/cloud_schema_forge.gif)
* **Zero-Trust JCL Forge (`cobol_jcl_forge.py`):** Extracts `SELECT` mappings to auto-generate strict, least-privilege JCL emulators.

<br>![Zero-Trust JCL Forge](../../../docs/wiki/assets/jcl_forge_demo.gif)

#### 4. The AI Remediation Boundary
* **Anomaly Task Forge (`cobol_agent_task_forge.py`):** Isolates structural anomalies into bounded JSON job tickets for LLM remediation.

Expand Down
64 changes: 60 additions & 4 deletions gitgalaxy/tools/cobol_to_cobol/cobol_jcl_auditor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#!/usr/bin/env python3
# ==============================================================================
# GitGalaxy Spoke: Zero-Trust Meta Auditor (v4 - API Module)
# GitGalaxy Spoke: Zero-Trust Meta Auditor (v5 - CLI Enabled)
# Purpose: Compares forged JCLs against original IBM legacy JCLs to calculate
# exact code bloat reduction and over-permissioned I/O shedding.
# ==============================================================================
import argparse
import json
import re
import sys
from pathlib import Path

SYSTEM_DDS = {"STEPLIB", "SYSOUT", "SYSPRINT", "SYSUDUMP", "SYSIN", "CEEDUMP", "DFHCSD", "IGZDDOP"}
Expand Down Expand Up @@ -39,7 +42,7 @@ def parse_jcl_intent(filepath: Path) -> dict:
return metrics

def audit_zero_trust_jcls(forged_dir: Path, original_dir: Path) -> dict:
"""API endpoint for the Refractor Controller to fetch bloat metrics."""
"""Core logic to fetch bloat metrics."""
legacy_jcls = list(original_dir.rglob("*.[jJ][cC][lL]")) + list(original_dir.rglob("*.txt"))
legacy_map = {}

Expand All @@ -48,7 +51,7 @@ def audit_zero_trust_jcls(forged_dir: Path, original_dir: Path) -> dict:
if forged_dir in lj.parents: continue
metrics = parse_jcl_intent(lj)
for pgm in metrics["exec_pgms"]:
# If multiple legacy JCLs call the same program, keep the biggest one (worst offender)
# If multiple legacy JCLs call the same program, keep the biggest one
if pgm not in legacy_map or metrics.get("lines_of_code", 0) > legacy_map[pgm].get("lines_of_code", 0):
legacy_map[pgm] = {"file": lj, "metrics": metrics}

Expand Down Expand Up @@ -90,4 +93,57 @@ def audit_zero_trust_jcls(forged_dir: Path, original_dir: Path) -> dict:
else:
report["bloat_reduction_pct"] = 0.0

return report
return report

def main():
parser = argparse.ArgumentParser(description="GitGalaxy Zero-Trust Meta Auditor (v5)")
parser.add_argument("forged", help="Directory containing the forged GitGalaxy JCLs")
parser.add_argument("legacy", help="Directory containing the original legacy IBM JCLs")
parser.add_argument("--json", action="store_true", help="Output raw JSON instead of a formatted terminal report")

args = parser.parse_args()
forged_path = Path(args.forged).resolve()
legacy_path = Path(args.legacy).resolve()

if not forged_path.exists() or not legacy_path.exists():
print("\n[!] ERROR: One or both directories do not exist.")
sys.exit(1)

# Run the audit
report = audit_zero_trust_jcls(forged_path, legacy_path)

# Output routing
if args.json:
print(json.dumps(report, indent=2))
sys.exit(0)

# CLI Terminal Vibe
print("\n==============================================================")
print(" 🛡️ GitGalaxy Spoke: Zero-Trust Meta Auditor (v5)")
print("==============================================================")
print(f" [*] Forged Dir : {forged_path.name}")
print(f" [*] Legacy Root : {legacy_path.name}")
print("--------------------------------------------------------------")

if report["audited"] == 0:
print(" [!] No matching execution intents found between the directories.")
print(" Ensure your forged JCLs share PROGRAM-IDs with the legacy corpus.")
else:
print(" PROGRAM BREAKDOWN:")
for pgm, data in report["program_breakdown"].items():
loc = str(data['loc_saved']).rjust(4)
io = str(data['io_blocked']).rjust(2)
print(f" [+] {pgm.ljust(10)} | LOC Saved: {loc} | I/O Blocked: {io} | Ref: {data['legacy_file']}")

print("--------------------------------------------------------------")
print(" 📊 FINAL AUDIT METRICS:")
print(f" > Programs Audited : {report['audited']}")
print(f" > Original Legacy LOC : {report['original_loc']}")
print(f" > GitGalaxy Forged LOC : {report['forged_loc']}")
print(f" > Bloat Reduction : {report['bloat_reduction_pct']}%")
print(f" > Over-Permissioned I/O : {report['excess_dds_blocked']} Boundaries Shed")

print("==============================================================\n")

if __name__ == "__main__":
main()
130 changes: 98 additions & 32 deletions gitgalaxy/tools/cobol_to_cobol/cobol_jcl_forge.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,61 @@
#!/usr/bin/env python3
# ==============================================================================
# GitGalaxy Spoke: Zero-Trust JCL Forge (v3 - IR Context Aware)
# GitGalaxy Spoke: Zero-Trust JCL Forge (v5 - Hygienic Defaults)
# ==============================================================================
import argparse
import sys
import re
from pathlib import Path
from datetime import datetime

def analyze_cobol_intent(filepath: Path) -> dict:
intent = {"program_id": "UNKNOWN", "files_requested": [], "is_cics": False, "is_db2": False}
intent = {
"program_id": "UNKNOWN",
"files_requested": [],
"is_cics": False,
"is_db2": False,
"cics_calls": 0,
"sql_calls": 0
}

try:
raw_content = filepath.read_text(encoding='utf-8', errors='ignore')

# 1. THE FLATTENER: Strip punch-card formatting and merge into a single AST string
# 1. THE FLATTENER: Strip punch-card formatting
clean_lines = []
for line in raw_content.splitlines():
# Skip pure comment lines (Asterisk or Slash in Column 7)
if len(line) > 6 and line[6] in ('*', '/'):
continue
# Keep only Column 8 and beyond (The actual code)
clean_lines.append(line[7:] if len(line) > 7 else line)

# Join with spaces so multi-line statements become a single continuous sentence
monolith_code = " ".join(clean_lines)

# 2. EXTRACT PROGRAM-ID (Bounded by the period)
prog_id_match = re.search(r'PROGRAM-ID\.\s+([A-Z0-9\-]+)\.', monolith_code, re.IGNORECASE)
# 2. EXTRACT PROGRAM-ID (With fallback to file name)
prog_id_match = re.search(r'PROGRAM-ID\.\s+[\'"]?([A-Z0-9\-]+)[\'"]?', monolith_code, re.IGNORECASE)
if prog_id_match:
intent["program_id"] = prog_id_match.group(1).strip()
else:
intent["program_id"] = filepath.stem.upper()

# 3. MULTI-LINE REGEX: Extract all File Assignments (Bounded by the period)
# This will now successfully catch SELECT and ASSIGN TO even if they are 20 lines apart!
# 3. BATCH I/O: Extract all File Assignments
file_assign_pattern = re.compile(r'SELECT\s+([A-Z0-9\-]+)\s+ASSIGN\s+(?:TO\s+)?([A-Z0-9\-]+)[^.]*\.', re.IGNORECASE)

for match in file_assign_pattern.finditer(monolith_code):
internal_name = match.group(1).strip()
raw_dd = match.group(2).strip()

# Strip IBM system prefixes like UT-S- or UR-S-
clean_dd = re.sub(r'^(?:UT|UR)-S-', '', raw_dd)

intent["files_requested"].append({
"internal": internal_name,
"dd_name": clean_dd
})
intent["files_requested"].append({"internal": internal_name, "dd_name": clean_dd})

# 4. TRANSACTIONAL I/O: Detect EXEC CICS blocks
cics_matches = re.findall(r'EXEC\s+CICS.*?END-EXEC\.', monolith_code, re.IGNORECASE)
if cics_matches:
intent["is_cics"] = True
intent["cics_calls"] = len(cics_matches)

# 5. DATABASE I/O: Detect EXEC SQL blocks
sql_matches = re.findall(r'EXEC\s+SQL.*?END-EXEC\.', monolith_code, re.IGNORECASE)
if sql_matches:
intent["is_db2"] = True
intent["sql_calls"] = len(sql_matches)

except Exception as e:
print(f" [!] Intent Forge Error on {filepath.name}: {e}")
Expand All @@ -62,12 +73,16 @@ def generate_zero_trust_jcl(intent: dict, job_name: str, account_code: str, line
jcl.append(f"// TIME=10,REGION=4M")
jcl.append(f"//* ==========================================================")
jcl.append(f"//* AUTOGENERATED BY GITGALAXY ZERO-TRUST FORGE")
jcl.append(f"//* ==========================================================")

arch_flags = []
if intent["is_cics"]: arch_flags.append("CICS")
if intent["is_db2"]: arch_flags.append("DB2")
if arch_flags:
jcl.append(f"//* ARCHITECTURE REQUIRES: {' + '.join(arch_flags)}")

jcl.append(f"//* ==========================================================")
jcl.append(f"//STEP01 EXEC PGM={prog_name}")
# Point to the new Superuser directory!
jcl.append(f"//STEPLIB DD DSN=HERC01.LOADLIB,DISP=SHR")

jcl.append(f"//SYSOUT DD SYSOUT=*")
jcl.append(f"//SYSPRINT DD SYSOUT=*")
jcl.append(f"//SYSUDUMP DD SYSOUT=*")
Expand All @@ -92,28 +107,79 @@ def generate_zero_trust_jcl(intent: dict, job_name: str, account_code: str, line
else:
jcl.append(f"// DISP={disp}")
if "SHR" in disp and raw_dd not in lineage.get('inputs', set()):
# Warning moved to own line to respect punch-card Col 71
jcl.append(f"//* WARNING: NO EXPLICIT OPEN INTENT FOR {dd}")
jcl.append("//")
return "\n".join(jcl)

def main():
parser = argparse.ArgumentParser()
parser.add_argument("target")
parser.add_argument("--job", default="GITGJOB")
parser.add_argument("--acct", default="12345")
parser.add_argument("--out", type=str)
parser = argparse.ArgumentParser(description="GitGalaxy Zero-Trust JCL Forge (v5)")
parser.add_argument("target", help="Target directory or specific COBOL file")
parser.add_argument("--job", default="GITGJOB", help="Job name for the generated JCL")
parser.add_argument("--acct", default="12345", help="Account code for the generated JCL")
parser.add_argument("--out", type=str, help="Output directory (default: hygienic timestamped folder)")
args = parser.parse_args()

target_path = Path(args.target).resolve()
cobol_files = list(target_path.rglob("*.cbl")) if target_path.is_dir() else [target_path]

if target_path.is_dir():
extensions = ("*.cbl", "*.CBL", "*.cob", "*.COB", "*.ccp", "*.CCP")
cobol_files = []
for ext in extensions:
cobol_files.extend(target_path.rglob(ext))
else:
cobol_files = [target_path]

out_dir = Path(args.out).resolve() if args.out else None
if out_dir: out_dir.mkdir(parents=True, exist_ok=True)
if not cobol_files:
print(f"\n[!] ERROR: No COBOL source files found in: {target_path}")
sys.exit(1)

# --- THE HYGIENIC DEFAULT LOGIC ---
if args.out:
out_dir = Path(args.out).resolve()
else:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if target_path.is_dir():
# e.g., cics-genapp -> cics-genapp_forged_20260425_161559
out_dir = target_path.parent / f"{target_path.name}_forged_{timestamp}"
else:
out_dir = target_path.parent / f"forged_jcls_{timestamp}"

# Create the directory
out_dir.mkdir(parents=True, exist_ok=True)

print("\n==============================================================")
print(" 🚀 GitGalaxy Spoke: Zero-Trust JCL Forge (v5)")
print("==============================================================")
print(f" [*] Target Path : {target_path}")
print(f" [*] Output Dir : {out_dir}")
print(f" [*] Files Found : {len(cobol_files)}")
print("--------------------------------------------------------------")

success_count = 0
for file_path in cobol_files:
intent = analyze_cobol_intent(file_path)
jcl_output = generate_zero_trust_jcl(intent, args.job[:8].upper(), args.acct)
out_path = out_dir / f"{intent['program_id']}.jcl" if out_dir else file_path.parent / f"{intent['program_id']}.jcl"

# Output strictly locked to the designated hygienic folder
out_path = out_dir / f"{intent['program_id']}.jcl"
out_path.write_text(jcl_output, encoding='utf-8')
if __name__ == "__main__": main()

io_parts = []
if len(intent['files_requested']) > 0:
io_parts.append(f"{len(intent['files_requested'])} Batch I/O")
if intent['cics_calls'] > 0:
io_parts.append(f"{intent['cics_calls']} CICS")
if intent['sql_calls'] > 0:
io_parts.append(f"{intent['sql_calls']} SQL")

io_str = f"({', '.join(io_parts)})" if io_parts else "(No I/O)"

print(f" [+] Forged: {file_path.name.ljust(15)} -> {out_path.name.ljust(15)} {io_str}")
success_count += 1

print("--------------------------------------------------------------")
print(f" [✓] Done! Successfully forged {success_count} JCL(s).")
print("==============================================================\n")

if __name__ == "__main__":
main()
Loading