# DSPM Policy-as-Code Gating + Risk Scoring

This notebook focuses on **policy gating** (block/allow) based on deterministic risk scoring.

It demonstrates:
- loading a policy
- scoring synthetic findings
- enforcing thresholds
- emitting a gate decision artifact

## GDrive Setup

In [1]:
import os, pathlib, subprocess
from  google.colab import drive
if not os.path.exists("/content/drive"):
    drive.mount('/content/drive')
else:
  print("Drive already mounted")

search_item = '01_Quickstart_Evidence_Pipeline.ipynb'

l_result_labels = subprocess.run(["find","/content/drive/MyDrive","-type","f","-iname",f"*{search_item}*"],capture_output=True, text=True)
print(f"Labels_proj.csv path: {l_result_labels.stdout.splitlines()}")



Drive already mounted
Labels_proj.csv path: ['/content/drive/MyDrive/DSPM-DevSecOps-EphemeralCompute-EvidencePipeline/notebooks/01_Quickstart_Evidence_Pipeline.ipynb']


In [2]:
%cd /content/drive/MyDrive/DSPM-DevSecOps-EphemeralCompute-EvidencePipeline

!pip -q install -r requirements.txt
!pip -q install -e .

/content/drive/MyDrive/DSPM-DevSecOps-EphemeralCompute-EvidencePipeline
  Installing build dependencies ... [?25l[?25hdone
  Checking if build backend supports build_editable ... [?25l[?25hdone
  Getting requirements to build editable ... [?25l[?25hdone
  Preparing editable metadata (pyproject.toml) ... [?25l[?25hdone
  Building editable for dspm-devsecops (pyproject.toml) ... [?25l[?25hdone


## 1) Load policy + sample findings

In [3]:
import sys, subprocess, os

# Define the project root based on the notebook's location
# The notebook is in /content/drive/MyDrive/DSPM-DevSecOps-EphemeralCompute-EvidencePipeline/notebooks/
# So, the project root is its parent directory.
project_root = '/content/drive/MyDrive/DSPM-DevSecOps-EphemeralCompute-EvidencePipeline'

# Check if dspm_devsecops is already installed, and install if not
try:
    import dspm_devsecops
    print("dspm_devsecops already installed.")
except ImportError:
    print("dspm_devsecops not found, attempting installation...")
    # Install requirements.txt from the project root
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "-r", os.path.join(project_root, "requirements.txt")])
    # Install the package itself in editable mode from the project root
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "-e", project_root])
    print("dspm_devsecops installed.")

# Removed: from dspm_devsecops.risk.policy import Policy (as Policy class is not found)
from dspm_devsecops.risk.scoring import score_findings

# Removed: policy = Policy.default() (as Policy class is not found)
sample_findings = [
    {"asset":"lambda:orders-handler", "category":"public_exposure", "severity":"high", "evidence":"internet-facing route"},
    {"asset":"s3:pii-bucket", "category":"pii", "severity":"critical", "evidence":"synthetic PII classifier hit"},
    {"asset":"iam:role-ci", "category":"overprivilege", "severity":"medium", "evidence":"wildcard actions"},
]
scores = score_findings(sample_findings, []) # Pass an empty list for serverless_findings
scores

dspm_devsecops already installed.


RiskScore(total=3, breakdown={'terraform': 3, 'serverless': 0}, normalized_0_100=5)

In [4]:
#import os

#project_root = '/content/drive/MyDrive/DSPM-DevSecOps-EphemeralCompute-EvidencePipeline'
#policy_file_path = os.path.join(project_root, 'src', 'dspm_devsecops', 'risk', 'policy.py')

#if os.path.exists(policy_file_path):
#    with open(policy_file_path, 'r') as f:
#        print(f.read())
#else:
#    print(f"Policy file not found at: {policy_file_path}")

In [5]:
from pathlib import Path
import yaml

from dspm_devsecops.iac.terraform_scan import scan_terraform_dir
from dspm_devsecops.iac.serverless_scan import scan_serverless_yaml
from dspm_devsecops.risk.scoring import score_findings
from dspm_devsecops.policy_dsl.evaluator import evaluate_policies, gate

repo_root = Path("/content/drive/MyDrive/DSPM-DevSecOps-EphemeralCompute-EvidencePipeline")

# 1) Generate findings from the included examples
tf_findings = scan_terraform_dir(repo_root / "examples" / "iac" / "terraform")
sls_findings = scan_serverless_yaml(repo_root / "examples" / "iac" / "serverless" / "serverless.yml")

# 2) Deterministic risk score
risk = score_findings(tf_findings, sls_findings)
print(risk)

# 3) Load and evaluate policy DSL
policies = yaml.safe_load((repo_root / "policies" / "policies.yml").read_text())
ctx = {
    "risk_0_100": risk.normalized_0_100,
    # optional fields the DSL can use, if present in your policy rules:
    # "classification": "...", "exposure": "...", "cross_tenant": "...",
    # "provider": "...", "canonical_type": "..."
}
decisions = evaluate_policies(policies['policies'], ctx)
gate_result = gate(decisions)

gate_result

RiskScore(total=36, breakdown={'terraform': 13, 'serverless': 23}, normalized_0_100=60)


{'status': 'PASS', 'matched': [], 'total_rules': 4, 'matched_rules': 0}

## 2) Gate decision

In [6]:
gate_result

{'status': 'PASS', 'matched': [], 'total_rules': 4, 'matched_rules': 0}

## 3) Emit a gate decision artifact (for CI/CD attachment)

In [7]:
import json, pathlib, datetime
out = pathlib.Path("_out_policy")
out.mkdir(exist_ok=True)

artifact = {
    "generated_at": datetime.datetime.now(datetime.UTC).isoformat(),
    "policy_definitions": policies,
    "risk_score": {
        "total": risk.total,
        "breakdown": risk.breakdown,
        "normalized_0_100": risk.normalized_0_100
    },
    "decision": gate_result,
}
(out/"gate_decision.json").write_text(json.dumps(artifact, indent=2))
print("Wrote:", (out/"gate_decision.json").as_posix())

Wrote: _out_policy/gate_decision.json


## 4) Why this matters

Policy-as-code gating provides **auditable, deterministic** release control tied to data-risk posture.