## Garak KFP Demo

In [1]:
from dotenv import load_dotenv

"""
Set the values for the following variables in your `.env` file

#### AWS Connection
AWS_ACCESS_KEY_ID=<>
AWS_SECRET_ACCESS_KEY=<>
AWS_S3_ENDPOINT=<>
AWS_DEFAULT_REGION=<>

#### Kubeflow Configuration
KUBEFLOW_PIPELINES_ENDPOINT=<>
KUBEFLOW_NAMESPACE=<>
KUBEFLOW_BASE_IMAGE=<>
KUBEFLOW_S3_CREDENTIALS_SECRET_NAME=<>
KUBEFLOW_RESULTS_S3_PREFIX=<>
"""

# Load the `.env` file described in the note above before anything else runs.
# NOTE(review): presumably PipelineRunner picks these settings up from the
# environment — confirm against garak_pipeline.
load_dotenv()

True

In [2]:
from garak_pipeline import PipelineRunner, EvalConfig, BenchmarkConfig, ModelConfig
from rich.pretty import pprint

In [3]:
# Single runner instance reused by the cells below to list and register
# benchmarks, submit scans, wait for completion, and fetch results and reports.
runner = PipelineRunner()



In [None]:
# Connection details for the model under test.
# The values below are placeholders: replace them before running, and prefer
# loading a real API key from the environment over committing it in the notebook.
model_config = ModelConfig(
    model_endpoint="https://model-endpoint/v1",
    model_name="model-name",
    api_key="api-key", # optional, if model requires an API key
)

### Run a predefined benchmark

In [5]:
# Benchmarks the runner knows about, keyed by the id that EvalConfig accepts
runner.list_benchmarks() # predefined benchmarks

{'quick': {'name': 'Quick Scan',
  'description': 'Quick security scan for testing (~10 minutes)',
  'type': 'probes',
  'timeout': 1800,
  'is_predefined': True},
 'standard': {'name': 'Standard Scan',
  'description': 'Standard security scan covering common attack vectors (~30 minutes)',
  'type': 'probes',
  'timeout': 1800,
  'is_predefined': True},
 'owasp_llm_top10': {'name': 'OWASP LLM Top 10',
  'description': 'OWASP Top 10 for Large Language Model Applications (~12 hours)',
  'type': 'taxonomy',
  'timeout': 43200,
  'is_predefined': True},
 'avid_security': {'name': 'AVID Security',
  'description': 'AI Vulnerability Database - Security vulnerabilities (~12 hours)',
  'type': 'taxonomy',
  'timeout': 43200,
  'is_predefined': True},
 'avid_ethics': {'name': 'AVID Ethics',
  'description': 'AI Vulnerability Database - Ethical concerns (~1 hour)',
  'type': 'taxonomy',
  'timeout': 3600,
  'is_predefined': True},
 'avid_performance': {'name': 'AVID Performance',
  'description'

In [None]:
# Scan configuration: the target model plus the id of a predefined benchmark.
eval_config = EvalConfig(
    model=model_config,
    benchmark="avid_performance", # 'quick' is a shorter option (listed as ~10 minutes by list_benchmarks())
    # parallel_attempts=16, # default is 8 (uncomment to increase execution speed depending on your throughput)
)

In [7]:
# Submit the scan described by eval_config and keep the returned job handle
job = runner.run_scan(eval_config)

# Report the submission details with a single print call
print(
    "\nScan Job Submitted:\n"
    f"  Job ID: {job.job_id}\n"
    f"  Status: {job.status}\n"
    f"  Benchmark: {job.benchmark_id}\n"
    f"  Kubeflow Run ID: {job.kubeflow_run_id}"
)


Scan Job Submitted:
  Job ID: a6965bf9-c291-40d6-a666-097e586c16b2
  Status: submitted
  Benchmark: avid_performance
  Kubeflow Run ID: 1f6f540e-b4e4-48d6-a951-55d11c740ecd


In [8]:
# Wait for the submitted job, checking every 30 seconds and echoing progress.
# Note: For detailed real-time logs, check the Kubeflow UI;
# the pod logs will show Garak's progress output.
completed_job = runner.wait_for_completion(job_id=job.job_id, poll_interval=30, verbose=True)

print(f"\nFinal Status: {completed_job.status}")

Waiting for job a6965bf9-c291-40d6-a666-097e586c16b2 to complete...
Monitor at: https://ds-pipeline-dspa-model-namespace.apps.rosa.y1m4j9o2e1n6b9l.r6mx.p3.openshiftapps.com/#/runs/details/1f6f540e-b4e4-48d6-a951-55d11c740ecd
  Status: in_progress (elapsed: 0m 2s)
  Status: in_progress (elapsed: 0m 32s)
  Status: in_progress (elapsed: 1m 3s)
  Status: in_progress (elapsed: 1m 33s)
  Status: in_progress (elapsed: 2m 4s)
  Status: in_progress (elapsed: 2m 34s)
  Status: in_progress (elapsed: 3m 5s)
  Status: in_progress (elapsed: 3m 35s)
  Status: in_progress (elapsed: 4m 6s)
  Status: in_progress (elapsed: 4m 36s)
  Status: in_progress (elapsed: 5m 7s)
  Status: in_progress (elapsed: 5m 37s)
  Status: in_progress (elapsed: 6m 7s)
  Status: in_progress (elapsed: 6m 38s)
  Status: in_progress (elapsed: 7m 8s)
  Status: in_progress (elapsed: 7m 39s)
  Status: in_progress (elapsed: 8m 9s)
  Status: in_progress (elapsed: 8m 39s)
  Status: in_progress (elapsed: 9m 10s)
Job completed successful

In [9]:
# Fetch the scan result for this job. The cells below read its "scores" and
# "generations" entries and skip reporting when the result is falsy.
result = runner.job_result(job_id=job.job_id)

In [10]:
def get_aggregated_results(scan_result: dict) -> dict:
    """Combine the per-probe aggregated results into scan-wide totals.

    Args:
        scan_result: Scan result mapping. Its "scores" entry maps each probe
            name to a dict whose "aggregated_results" entry holds the
            "total_attempts" and "vulnerable_responses" counts.

    Returns:
        A dict with the summed "total_attempts" and "vulnerable_responses",
        plus "attack_success_rate": vulnerable responses as a percentage of
        total attempts, rounded to two decimals (0 when there were no
        attempts).

    A missing or empty "scores", "aggregated_results" or counter entry counts
    as zero instead of raising KeyError, matching how the reporting cells read
    the same payload with ``.get(..., {})``.
    """
    total_attempts = 0
    vulnerable_responses = 0

    for probe_scores in (scan_result.get("scores") or {}).values():
        aggregated = (probe_scores or {}).get("aggregated_results") or {}
        total_attempts += aggregated.get("total_attempts", 0)
        vulnerable_responses += aggregated.get("vulnerable_responses", 0)

    # Guard the division: a scan with no attempts reports a rate of 0.
    if total_attempts > 0:
        attack_success_rate = round(vulnerable_responses / total_attempts * 100, 2)
    else:
        attack_success_rate = 0

    return {
        "total_attempts": total_attempts,
        "vulnerable_responses": vulnerable_responses,
        "attack_success_rate": attack_success_rate,
    }

In [11]:
if not result:
    print("Job not completed yet. Check status again in a few moments.")
else:
    print("\nScan Results Summary:")
    print("=" * 20)

    # Totals combined across every probe in the scan
    for name, value in get_aggregated_results(result).items():
        print(f"{name}: {value}")


Scan Results Summary:
total_attempts: 559
vulnerable_responses: 213
attack_success_rate: 38.1


Each scan generates an HTML report that is stored in S3. You can download it:

In [12]:
# Download this job's HTML report to a file under reports/ named after the job id
local_path = runner.download_html_report(
    job_id=job.job_id,
    output_path=f"reports/scan_report_{job.job_id}.html"
)
print(f"HTML report downloaded to: {local_path}") # open this file in your browser to view the report

HTML report downloaded to: /Users/spandraj/dev/trustyai-llm-demo/garak-quickstart-demo/reports/scan_report_a6965bf9-c291-40d6-a666-097e586c16b2.html


In [13]:
# Render the downloaded HTML report inline; the trailing expression is the cell's output
from IPython.display import HTML
HTML(filename=local_path)


In [14]:
if result:
    print("\nScan Results Summary:")
    print("=" * 80)

    # Sections of the result payload (`generations` is not used in this cell;
    # the sampling cell further down reads it again on its own)
    generations = result.get('generations', [])
    scores = result.get('scores', {})

    # Results by probe: pretty-print each probe's aggregated metrics
    print("\n🔍 Results by Probe:")
    for probe_name, probe_scores in scores.items():
        print(f"\n  {probe_name}:")
        aggregated = probe_scores.get('aggregated_results', {})

        pprint(aggregated)

    print("\n" + "=" * 80)
else:
    print("Job not completed yet. Check status again in a few moments.")


Scan Results Summary:

🔍 Results by Probe:

  continuation.ContinueSlursReclaimedSlurs:



  goodside.ThreatenJSON:



  goodside.WhoIsRiley:



  lmrc.Deadnaming:



  lmrc.SexualContent:



  lmrc.Sexualisation:



  lmrc.SlurUsage:



  misleading.FalseAssertion:



  snowball.GraphConnectivity:



  suffix.GCGCached:



  tap.TAPCached:





In [33]:
# Show up to 5 random generations
import random

# Tolerate a missing result or a missing/empty "generations" entry.
generations = (result or {}).get('generations') or []
# random.sample raises ValueError when asked for more items than exist,
# so cap the sample size at the number of generations available.
random_generations = random.sample(generations, min(5, len(generations)))
pprint(random_generations)


### Register a benchmark and run the scan

In [22]:
# Define a custom benchmark that runs a single probe, then register it under
# an id that EvalConfig can refer to.
benchmark_id = "my-custom-benchmark"
benchmark_config = BenchmarkConfig(
    name="My Custom Benchmark",
    probes=["continuation.ContinueSlursReclaimedSlurs"],
    timeout=600, # presumably seconds, like the `timeout` values shown by list_benchmarks() — confirm
)
runner.register_benchmark(benchmark_id, benchmark_config)


In [23]:
# Refer to the custom benchmark by the id it was registered under
eval_config = EvalConfig(
    model=model_config,
    benchmark=benchmark_id,
)

In [24]:
# Submit the scan described by eval_config and keep the returned job handle
job = runner.run_scan(eval_config)

# Report the submission details with a single print call
print(
    "\nScan Job Submitted:\n"
    f"  Job ID: {job.job_id}\n"
    f"  Status: {job.status}\n"
    f"  Benchmark: {job.benchmark_id}\n"
    f"  Kubeflow Run ID: {job.kubeflow_run_id}"
)


Scan Job Submitted:
  Job ID: ac21baea-32d7-407f-838c-d10bb80c9f8e
  Status: submitted
  Benchmark: my-custom-benchmark
  Kubeflow Run ID: 077f81e3-e04e-4ba4-b8db-cd4a7a58b11b


In [25]:
# Wait for the submitted job, checking every 30 seconds and echoing progress.
# Note: For detailed real-time logs, check the Kubeflow UI;
# the pod logs will show Garak's progress output.
completed_job = runner.wait_for_completion(job_id=job.job_id, poll_interval=30, verbose=True)

print(f"\nFinal Status: {completed_job.status}")

Waiting for job ac21baea-32d7-407f-838c-d10bb80c9f8e to complete...
Monitor at: https://ds-pipeline-dspa-model-namespace.apps.rosa.y1m4j9o2e1n6b9l.r6mx.p3.openshiftapps.com/#/runs/details/077f81e3-e04e-4ba4-b8db-cd4a7a58b11b
  Status: in_progress (elapsed: 0m 0s)
  Status: in_progress (elapsed: 0m 30s)
  Status: in_progress (elapsed: 1m 0s)
  Status: in_progress (elapsed: 1m 31s)
  Status: in_progress (elapsed: 2m 1s)
  Status: in_progress (elapsed: 2m 31s)
  Status: in_progress (elapsed: 3m 1s)
  Status: in_progress (elapsed: 3m 31s)
  Status: in_progress (elapsed: 4m 1s)
Job completed successfully!

Final Status: completed


In [26]:
# Fetch the scan result for the custom-benchmark job. The cells below read its
# "scores" and "generations" entries and skip reporting when it is falsy.
result = runner.job_result(job_id=job.job_id)


In [27]:
if result:
    print("\nScan Results Summary:")
    print("=" * 80)

    # Sections of the result payload (`generations` is not used in this cell;
    # the sampling cell further down reads it again on its own)
    generations = result.get('generations', [])
    scores = result.get('scores', {})

    # Results by probe: pretty-print each probe's aggregated metrics
    print("\n🔍 Results by Probe:")
    for probe_name, probe_scores in scores.items():
        print(f"\n  {probe_name}:")
        aggregated = probe_scores.get('aggregated_results', {})

        pprint(aggregated)

    print("\n" + "=" * 80)
else:
    print("Job not completed yet. Check status again in a few moments.")


Scan Results Summary:

🔍 Results by Probe:

  continuation.ContinueSlursReclaimedSlurs:





In [28]:
# Show up to 5 random generations
import random

# Tolerate a missing result or a missing/empty "generations" entry.
generations = (result or {}).get('generations') or []
# random.sample raises ValueError when asked for more items than exist,
# so cap the sample size at the number of generations available.
random_generations = random.sample(generations, min(5, len(generations)))
pprint(random_generations)


### Specify a benchmark inline

In [29]:
# Pass a BenchmarkConfig directly instead of the id of a registered benchmark
eval_config = EvalConfig(
    model=model_config,
    benchmark=BenchmarkConfig(
        name="promptinject",
        probes=["promptinject.HijackHateHumans"],
        timeout=600, # presumably seconds, like the `timeout` values shown by list_benchmarks() — confirm
    ),
)

In [30]:
# Submit the scan described by eval_config and keep the returned job handle
job = runner.run_scan(eval_config)

# Report the submission details with a single print call
print(
    "\nScan Job Submitted:\n"
    f"  Job ID: {job.job_id}\n"
    f"  Status: {job.status}\n"
    f"  Benchmark: {job.benchmark_id}\n"
    f"  Kubeflow Run ID: {job.kubeflow_run_id}"
)


Scan Job Submitted:
  Job ID: 345335b6-0be0-4215-aa5b-8b310d97c031
  Status: submitted
  Benchmark: promptinject
  Kubeflow Run ID: 513bd402-503f-4602-b740-0a4d7b896bc7


In [31]:
# Wait for the submitted job, checking every 30 seconds and echoing progress.
# Note: For detailed real-time logs, check the Kubeflow UI;
# the pod logs will show Garak's progress output.
completed_job = runner.wait_for_completion(job_id=job.job_id, poll_interval=30, verbose=True)

print(f"\nFinal Status: {completed_job.status}")

Waiting for job 345335b6-0be0-4215-aa5b-8b310d97c031 to complete...
Monitor at: https://ds-pipeline-dspa-model-namespace.apps.rosa.y1m4j9o2e1n6b9l.r6mx.p3.openshiftapps.com/#/runs/details/513bd402-503f-4602-b740-0a4d7b896bc7
  Status: in_progress (elapsed: 0m 0s)
  Status: in_progress (elapsed: 0m 30s)
  Status: in_progress (elapsed: 1m 0s)
  Status: in_progress (elapsed: 1m 30s)
  Status: in_progress (elapsed: 2m 1s)
  Status: in_progress (elapsed: 2m 31s)
  Status: in_progress (elapsed: 3m 1s)
  Status: in_progress (elapsed: 3m 31s)
  Status: in_progress (elapsed: 4m 1s)
  Status: in_progress (elapsed: 4m 31s)
  Status: in_progress (elapsed: 5m 1s)
  Status: in_progress (elapsed: 5m 32s)
  Status: in_progress (elapsed: 6m 2s)
  Status: in_progress (elapsed: 6m 32s)
  Status: in_progress (elapsed: 7m 2s)
  Status: in_progress (elapsed: 7m 32s)
  Status: in_progress (elapsed: 8m 2s)
  Status: in_progress (elapsed: 8m 32s)
  Status: in_progress (elapsed: 9m 3s)
  Status: in_progress (el

In [32]:
# Fetch the scan result for the inline-benchmark job. The cells below read its
# "scores" and "generations" entries and skip reporting when it is falsy.
result = runner.job_result(job_id=job.job_id)


In [33]:
if result:
    print("\nScan Results Summary:")
    print("=" * 80)

    # Sections of the result payload (`generations` is not used in this cell;
    # the sampling cell further down reads it again on its own)
    generations = result.get('generations', [])
    scores = result.get('scores', {})

    # Results by probe: pretty-print each probe's aggregated metrics
    print("\n🔍 Results by Probe:")
    for probe_name, probe_scores in scores.items():
        print(f"\n  {probe_name}:")
        aggregated = probe_scores.get('aggregated_results', {})

        pprint(aggregated)

    print("\n" + "=" * 80)
else:
    print("Job not completed yet. Check status again in a few moments.")


Scan Results Summary:

🔍 Results by Probe:

  promptinject.HijackHateHumans:





In [34]:
# Show up to 5 random generations
import random

# Tolerate a missing result or a missing/empty "generations" entry.
generations = (result or {}).get('generations') or []
# random.sample raises ValueError when asked for more items than exist,
# so cap the sample size at the number of generations available.
random_generations = random.sample(generations, min(5, len(generations)))
pprint(random_generations)
