# GitHub Repository and SBOM Analysis Workflow - Interactive
### This notebook demonstrates a complete workflow for:

- GitHub authentication using AgentCore Identity
- Repository discovery and SBOM collection

- Security vulnerability assessment

This notebook uses these AgentCore components:

- `IdentityClient` - creates OAuth2 credential providers for GitHub authentication

- `@requires_access_token` decorator - handles the OAuth2 authentication flow with GitHub

- `USER_FEDERATION` auth flow - federates user identity through GitHub OAuth

- Cognito integration - uses Cognito user pools for identity management via `utils.py`

The notebook does NOT use:

- AgentCore agents/runtimes

- Strands agents

- Docker containers

- AgentCore's actual agent execution capabilities

It's primarily using AgentCore's identity and authentication features, not the core agent orchestration functionality.


In [None]:
## Prerequisites

- Python 3.10+
- AWS credentials configured
- Amazon Bedrock AgentCore SDK
- GitHub OAuth App configured
- Claude 3.7 Sonnet model access enabled

In [None]:
# Install required dependencies
!pip install --force-reinstall -U -r requirements.txt --quiet

# Initial Setup and Configuration

In [None]:
import sys
import os
import boto3
import asyncio
import json
import getpass
from boto3.session import Session
from bedrock_agentcore.services.identity import IdentityClient
from bedrock_agentcore.identity.auth import requires_access_token

# Setup: put the directory holding the shared `utils` module on sys.path and
# discover the AWS region this notebook will work in.
#
# NOTE(review): the first branch passes the literal string '__file__' (quoted),
# not the `__file__` variable. When `__file__` is absent from globals() the
# expression is dirname(abspath('.')), i.e. the parent of the working
# directory, so `utils_dir` below ends up two levels above the working
# directory; when `__file__` is present it ends up one level above. Confirm
# which location actually holds utils.py.
current_dir = os.path.dirname(os.path.abspath('__file__' if '__file__' in globals() else '.'))
utils_dir = os.path.join(current_dir, '..')
utils_dir = os.path.abspath(utils_dir)
# Insert at position 0 so this directory is searched before the existing entries.
sys.path.insert(0, utils_dir)

# Imported only after the sys.path change above, which is why it is not at the top.
from utils import setup_cognito_user_pool

boto_session = Session()
# NOTE(review): presumably None when no default region is configured - confirm.
# `region` is interpolated into the OAuth callback URL and passed to
# IdentityClient in later cells.
region = boto_session.region_name
print(f"Working in region: {region}")

In [None]:
# Interactive GitHub OAuth Setup

In [None]:
def setup_github_oauth_interactive():
    """Walk the user through GitHub OAuth App setup and collect its credentials.

    Prints the setup steps (including the callback URL built from the
    module-level ``region``), then prompts for the client ID with ``input``
    and for the client secret with ``getpass`` so the secret is not echoed.

    Returns:
        A ``(client_id, client_secret)`` tuple of stripped strings.

    Raises:
        ValueError: If either value is empty after stripping.
    """
    divider = "=" * 50
    intro_lines = (
        "🔧 GitHub OAuth Setup",
        divider,
        "\nTo set up GitHub authentication, you need to:",
        "1. Go to GitHub Settings > Developer settings > OAuth Apps",
        "2. Click 'New OAuth App'",
        "3. Use this callback URL:",
    )
    for line in intro_lines:
        print(line)
    print(f"   https://bedrock-agentcore.{region}.amazonaws.com/identities/oauth2/callback")
    print("\n" + divider)

    # Collect both values before validating, so the user is always asked for each.
    entered_id = input("\nEnter your GitHub Client ID: ").strip()
    entered_secret = getpass.getpass("Enter your GitHub Client Secret: ").strip()

    if not (entered_id and entered_secret):
        raise ValueError("Both Client ID and Client Secret are required")

    return entered_id, entered_secret
            

# Get GitHub credentials interactively.
# The two module-level names set here are the values sent as "clientId" and
# "clientSecret" when the credential provider is created in a later cell.
github_client_id, github_client_secret = setup_github_oauth_interactive()
print("✅ GitHub credentials configured")

In [None]:
# Setup Cognito.
# setup_cognito_user_pool is imported from the local `utils` module; its return
# value is kept in `cognito_config`, which the cells shown here do not read again.
print("Setting up Cognito user pool...")
cognito_config = setup_cognito_user_pool()
print("✅ Cognito setup completed")

In [None]:
# Create GitHub provider.
# Registers an OAuth2 credential provider named "github-interactive-provider"
# (the same name the authentication decorator refers to) using the client ID
# and secret collected earlier.
identity_client = IdentityClient(region)

try:
    github_provider = identity_client.create_oauth2_credential_provider({
        "name": "github-interactive-provider",
        "credentialProviderVendor": "GithubOauth2",
        "oauth2ProviderConfigInput": {
            "githubOauth2ProviderConfig": {
                "clientId": github_client_id,
                "clientSecret": github_client_secret
            }
        }
    })
    print("✅ GitHub credential provider created")
except Exception as e:
    # The SDK's exception type is not visible from this notebook, so the
    # "provider already exists" case is recognised from the message text.
    if "already exists" in str(e):
        print("✅ GitHub credential provider already exists")
        # Nothing on this path updates the stored provider, so say so: the
        # credentials typed above were not applied to it.
        print("ℹ️  The existing provider was left unchanged; the credentials entered above were not applied to it")
    else:
        # Bare raise keeps the original traceback intact.
        raise

## Agents Definition

In [None]:
import requests
from typing import List, Dict, Any
from collections import defaultdict
from datetime import datetime

class GitHubRepositoryAgent:
    """Small GitHub REST client for listing repositories and probing manifests.

    Every request carries the OAuth token as a Bearer credential and is
    bounded by ``REQUEST_TIMEOUT`` seconds.
    """

    # Seconds to wait for GitHub; without a timeout a stalled connection would
    # block the notebook cell indefinitely.
    REQUEST_TIMEOUT = 30
    # Largest page size requested from list endpoints; bigger limits are
    # satisfied by fetching further pages.
    MAX_PER_PAGE = 100

    def __init__(self, access_token: str):
        """Store the token and build the headers shared by every request."""
        self.access_token = access_token
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/vnd.github.v3+json"
        }

    @staticmethod
    def _summarize_repository(repo: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce one repository payload to the four fields the workflow uses."""
        return {
            "name": repo["name"],
            "full_name": repo["full_name"],
            # The key can be present with a null value, in which case a
            # ``.get`` default would never apply; ``or`` covers both cases.
            "language": repo.get("language") or "Unknown",
            "html_url": repo["html_url"],
        }

    def list_repositories(self, org: str = None, user: str = None, limit: int = 10) -> List[Dict[str, Any]]:
        """List up to ``limit`` repositories.

        Args:
            org: Organization whose repositories are listed; takes precedence
                over ``user``.
            user: User whose repositories are listed when ``org`` is falsy.
            limit: Maximum number of repositories to return. Values above
                ``MAX_PER_PAGE`` are fetched page by page; a non-positive
                value returns an empty list without contacting GitHub.

        Returns:
            A list of dicts with ``name``, ``full_name``, ``language`` and
            ``html_url``. With neither ``org`` nor ``user`` the authenticated
            user's repositories are listed.

        Raises:
            requests.HTTPError: If GitHub answers with an error status.
            requests.RequestException: On connection problems or timeout.
        """
        if org:
            url = f"https://api.github.com/orgs/{org}/repos"
        elif user:
            url = f"https://api.github.com/users/{user}/repos"
        else:
            url = "https://api.github.com/user/repos"

        if limit <= 0:
            return []

        # Keep the page size constant across requests so page numbers line up.
        per_page = min(limit, self.MAX_PER_PAGE)
        collected = []
        page = 1
        while len(collected) < limit:
            response = requests.get(
                url,
                headers=self.headers,
                params={"per_page": per_page, "page": page},
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()

            batch = response.json()
            collected.extend(batch)
            if len(batch) < per_page:
                break  # short (or empty) page: nothing further to fetch
            page += 1

        return [self._summarize_repository(repo) for repo in collected[:limit]]

    def get_repository_dependencies(self, owner: str, repo: str) -> Dict[str, Any]:
        """Probe the repository root for well-known dependency manifests.

        This is a best-effort check: a manifest that cannot be fetched or
        parsed is simply left out of the result.

        Returns:
            A dict keyed by manifest file name, each value being
            ``{"found": True, "size": <size reported by GitHub or 0>}``.
        """
        dependency_files = ["package.json", "requirements.txt", "pom.xml", "go.mod"]
        dependencies = {}

        for manifest in dependency_files:
            url = f"https://api.github.com/repos/{owner}/{repo}/contents/{manifest}"
            try:
                response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
                if response.status_code != 200:
                    continue
                payload = response.json()
            except (requests.RequestException, ValueError):
                # Network failure, timeout, or a body that is not JSON.
                continue

            if not isinstance(payload, dict):
                # Anything other than a single-file object is not a manifest.
                continue
            dependencies[manifest] = {"found": True, "size": payload.get("size", 0)}

        return dependencies

class SBOMConsolidationAgent:
    """Collects per-repository dependency-file findings into a single dict.

    The consolidated structure has three keys: ``repositories`` (one summary
    per added repository), ``components`` (one entry per manifest marked as
    found) and ``statistics`` (filled by ``generate_statistics``).
    """

    def __init__(self):
        self.consolidated_sbom = {
            "repositories": [],
            "components": {},
            "statistics": {},
        }

    def add_repository_sbom(self, repo_name: str, dependencies: Dict[str, Any]):
        """Record one repository and register each manifest flagged as found."""
        summary = {
            "name": repo_name,
            "dependency_files": list(dependencies),
            "component_count": len(dependencies),
        }
        self.consolidated_sbom["repositories"].append(summary)

        registry = self.consolidated_sbom["components"]
        for manifest_name, details in dependencies.items():
            if not details.get("found"):
                continue
            # Components are keyed "<repository>:<manifest file>".
            registry[f"{repo_name}:{manifest_name}"] = {
                "repository": repo_name,
                "file_type": manifest_name,
                "size": details.get("size", 0),
            }

    def generate_statistics(self) -> Dict[str, Any]:
        """Recompute totals and per-file-type counts, store and return them."""
        per_file_type = defaultdict(int)
        for entry in self.consolidated_sbom["components"].values():
            per_file_type[entry["file_type"]] += 1

        summary = {
            "total_repositories": len(self.consolidated_sbom["repositories"]),
            "total_components": len(self.consolidated_sbom["components"]),
            "file_types": per_file_type,
        }
        self.consolidated_sbom["statistics"] = summary
        return summary

    def get_consolidated_sbom(self) -> Dict[str, Any]:
        """Return the live consolidated dict (not a copy)."""
        return self.consolidated_sbom

class VulnerabilityAnalysisAgent:
    """Matches SBOM components against a small built-in vulnerability table."""

    def __init__(self):
        # Keyed by dependency file name: every component of that file type is
        # reported once for each entry listed here.
        self.vulnerability_db = {
            "package.json": [{"cve_id": "CVE-2023-1234", "severity": "HIGH"}],
            "requirements.txt": [{"cve_id": "CVE-2023-9999", "severity": "CRITICAL"}],
        }

    def analyze_vulnerabilities(self, consolidated_sbom: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return one finding per (component, table entry) pair.

        Components whose ``file_type`` has no entry in the table produce no
        findings.
        """
        return [
            {
                "repository": component["repository"],
                "component": key,
                "cve_id": known["cve_id"],
                "severity": known["severity"],
            }
            for key, component in consolidated_sbom["components"].items()
            for known in self.vulnerability_db.get(component["file_type"], ())
        ]

    def generate_security_report(self, vulnerabilities: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarise findings: total count, count per severity, and a timestamp."""
        tally = {}
        for finding in vulnerabilities:
            level = finding["severity"]
            tally[level] = tally.get(level, 0) + 1

        report = {
            "total_vulnerabilities": len(vulnerabilities),
            "severity_breakdown": tally,
            "report_generated_at": datetime.now().isoformat(),
        }
        return report

print("✅ All agents defined")

## Interactive Workflow

In [None]:
def _announce_auth_url(url):
    """Show the authorization URL the user has to open to authenticate."""
    print(f"\n🔗 Please visit this URL to authenticate:\n{url}\n")


@requires_access_token(
    provider_name="github-interactive-provider",
    scopes=["repo", "read:org"],
    auth_flow="USER_FEDERATION",
    on_auth_url=_announce_auth_url,
    force_authentication=True,
)
async def authenticate_github_interactive(*, access_token: str) -> str:
    """Confirm authentication and hand back the GitHub access token.

    The body only prints a confirmation and returns ``access_token``
    unchanged. Callers in this notebook pass ``access_token=""``; presumably
    the decorator replaces it with the token obtained for the
    "github-interactive-provider" provider - confirm against the SDK docs.
    """
    print("✅ GitHub authentication successful!")
    return access_token

print("GitHub authentication function ready")

In [None]:
def _parse_repo_limit(raw, default=50):
    """Turn the typed repository limit into a positive int, else ``default``."""
    try:
        value = int(raw or default)
    except ValueError:
        return default
    # Zero or negative counts are not meaningful page sizes for the listing call.
    return value if value > 0 else default


def get_analysis_options():
    """Ask which repositories to analyze and how many to fetch.

    Prompts for a menu choice (1 = personal, 2 = organization, 3 = specific
    user), then for the organization or user name when relevant, then for the
    number of repositories.

    Returns:
        A ``(org_name, user_name, repo_limit)`` tuple. At most one of the
        names is set; both are ``None`` for personal repositories, for an
        unrecognized menu choice, or when the requested name is left empty.
        ``repo_limit`` is a positive int and falls back to 50 for empty,
        non-numeric or non-positive input.
    """
    print("\n Analysis Options")
    print("=" * 30)
    print("1. Analyze your personal repositories")
    print("2. Analyze an organization's repositories")
    print("3. Analyze a specific user's repositories")

    choice = input("\nSelect option (1-3): ").strip()

    org_name = None
    user_name = None

    if choice == "2":
        org_name = input("Enter organization name: ").strip() or None
        if org_name is None:
            print("No organization entered, analyzing your personal repositories")
    elif choice == "3":
        user_name = input("Enter username: ").strip() or None
        if user_name is None:
            print("No username entered, analyzing your personal repositories")
    elif choice != "1":
        # Make the fallback visible instead of silently ignoring the input.
        print("Unrecognized option, analyzing your personal repositories")

    repo_limit = _parse_repo_limit(input("Number of repositories to fetch (default 50): "))

    return org_name, user_name, repo_limit

def _parse_repository_numbers(raw, count):
    """Convert text such as "1,3,5" into unique zero-based indices below ``count``.

    Blank items (for example from a trailing comma) and out-of-range numbers
    are skipped; a non-numeric item raises ``ValueError``.
    """
    indices = []
    for token in raw.split(','):
        token = token.strip()
        if not token:
            continue
        index = int(token) - 1
        if 0 <= index < count and index not in indices:
            indices.append(index)
    return indices


def select_repositories(repositories):
    """Allow user to select specific repositories for analysis.

    Args:
        repositories: List of dicts carrying at least ``full_name`` and
            ``language``.

    Returns:
        ``repositories`` itself when the user picks "all", or when the typed
        numbers are non-numeric or select nothing valid; otherwise a new list
        with the chosen repositories, in the order typed and without
        duplicates.
    """
    print(f"\n Found {len(repositories)} repositories")
    print("\nRepository Selection:")
    print("1. Analyze all repositories")
    print("2. Select specific repositories")

    selection = input("Choose option (1-2): ").strip()

    if selection == "1":
        return repositories

    # Show repositories with numbers
    print("\nAvailable repositories:")
    for i, repo in enumerate(repositories, 1):
        print(f"  {i}. {repo['full_name']} ({repo['language']})")

    selected_nums = input("\nEnter repository numbers (comma-separated, e.g., 1,3,5): ").strip()

    try:
        indices = _parse_repository_numbers(selected_nums, len(repositories))
    except ValueError:
        indices = []

    if not indices:
        # Covers non-numeric input as well as input that selects nothing valid.
        print("Invalid selection, using all repositories")
        return repositories

    return [repositories[i] for i in indices]

# Get user preferences.
# These three module-level names are read as globals by
# run_interactive_workflow when it lists repositories.
org_name, user_name, repo_limit = get_analysis_options()
print(f"\n✅ Configuration: org={org_name}, user={user_name}, limit={repo_limit}")

In [None]:
async def run_interactive_workflow():
    """Run the full SBOM workflow: authenticate, discover, collect, analyze.

    Reads the module-level ``org_name``, ``user_name`` and ``repo_limit`` to
    decide which repositories to list, lets the user narrow the list down,
    probes each selected repository for dependency manifests, and prints a
    summary of the consolidated result.

    Returns:
        A dict with ``repositories`` (the selected repositories),
        ``consolidated_sbom`` and ``security_report``.
    """
    print("\n Starting Interactive SBOM Analysis...")

    # Step 1: obtain a GitHub token through the decorated auth function.
    print("\nStep 1: Authenticating with GitHub...")
    token = await authenticate_github_interactive(access_token="")

    repo_client = GitHubRepositoryAgent(token)
    consolidator = SBOMConsolidationAgent()
    scanner = VulnerabilityAnalysisAgent()

    # Step 2: list candidates, then let the user pick.
    print("\nStep 2: Discovering repositories...")
    discovered = repo_client.list_repositories(org=org_name, user=user_name, limit=repo_limit)

    chosen = select_repositories(discovered)
    print(f"\n✅ Selected {len(chosen)} repositories for analysis")

    for entry in chosen:
        print(f"   {entry['full_name']} ({entry['language']})")

    # Step 3: probe every chosen repository for dependency manifests.
    print("\nStep 3: Collecting SBOM data...")
    for entry in chosen:
        full_name = entry['full_name']
        owner, name = full_name.split('/')
        manifests = repo_client.get_repository_dependencies(owner, name)
        consolidator.add_repository_sbom(full_name, manifests)

        if manifests:
            print(f"  ✅ {full_name}: {list(manifests)}")

    # Step 4: derive statistics and findings from the consolidated data.
    print("\nStep 4: Analyzing vulnerabilities...")
    sbom = consolidator.get_consolidated_sbom()
    stats = consolidator.generate_statistics()
    findings = scanner.analyze_vulnerabilities(sbom)
    report = scanner.generate_security_report(findings)

    rule = "=" * 50
    print("\n" + rule)
    print(" ANALYSIS RESULTS")
    print(rule)
    print(f"Repositories analyzed: {stats['total_repositories']}")
    print(f"Components found: {stats['total_components']}")
    print(f"Vulnerabilities detected: {report['total_vulnerabilities']}")

    breakdown = report['severity_breakdown']
    if breakdown:
        print("\nSeverity breakdown:")
        for level, occurrences in breakdown.items():
            print(f"  {level}: {occurrences}")

    return {
        "repositories": chosen,
        "consolidated_sbom": sbom,
        "security_report": report,
    }

# Run the interactive workflow.
# Uses `await` outside of any function, so this only runs where top-level
# await is allowed (such as a notebook cell). `results` is read again by the
# export and security-analysis cells.
results = await run_interactive_workflow()
print("\n Analysis complete!")

In [None]:
# Ask user if they want to export results.
# Only the exact answer "y" (after strip/lower) triggers the export.
export_choice = input("\nWould you like to export the results? (y/n): ").strip().lower()

if export_choice == 'y':
    # The prefix is used as typed; an empty answer falls back to "sbom_analysis".
    filename = input("Enter filename prefix (default: sbom_analysis): ").strip() or "sbom_analysis"
    
    # Timestamp suffix gives each export its own file name.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    json_filename = f"{filename}_{timestamp}.json"
    
    with open(json_filename, 'w') as f:
        # default=str stringifies any value json cannot serialize natively.
        json.dump(results, f, indent=2, default=str)
    
    print(f"✅ Results exported to: {json_filename}")
else:
    print("Results not exported.")

## Security Alerts Analysis Tools

In [None]:
class DependabotAlertsAgent:
    """Fetches Dependabot alerts for a repository through the GitHub REST API."""

    # Seconds to wait for GitHub; without a timeout a stalled connection would
    # block the notebook cell indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, access_token: str):
        """Store the token and build the headers shared by every request."""
        self.access_token = access_token
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/vnd.github.v3+json"
        }

    @staticmethod
    def _summarize_alert(alert: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten one alert payload into the fields used by the analysis."""
        advisory = alert["security_advisory"]
        package = alert["dependency"]["package"]
        return {
            "number": alert["number"],
            "state": alert["state"],
            "severity": advisory["severity"],
            "summary": advisory["summary"],
            "package": package["name"],
            "ecosystem": package["ecosystem"],
            "created_at": alert["created_at"]
        }

    def get_dependabot_alerts(self, owner: str, repo: str, state: str = "open") -> List[Dict[str, Any]]:
        """Return the first page (up to 100) of alerts in the given state.

        Returns:
            A list of flattened alert dicts; an empty list when GitHub
            answers 404.

        Raises:
            requests.HTTPError: For any other error status.
            requests.RequestException: On connection problems or timeout.
            KeyError: If an alert lacks one of the expected fields.
        """
        url = f"https://api.github.com/repos/{owner}/{repo}/dependabot/alerts"
        params = {"state": state, "per_page": 100}

        response = requests.get(url, headers=self.headers, params=params, timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 404:
            return []  # No access or no alerts
        response.raise_for_status()

        return [self._summarize_alert(alert) for alert in response.json()]

class CodeQLAlertsAgent:
    """Fetches code-scanning alerts for a repository through the GitHub REST API."""

    # Seconds to wait for GitHub; without a timeout a stalled connection would
    # block the notebook cell indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, access_token: str):
        """Store the token and build the headers shared by every request."""
        self.access_token = access_token
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/vnd.github.v3+json"
        }

    @staticmethod
    def _summarize_alert(alert: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten one alert payload into the fields used by the analysis."""
        rule = alert["rule"]
        return {
            "number": alert["number"],
            "state": alert["state"],
            "severity": rule["severity"],
            "description": rule["description"],
            "rule_id": rule["id"],
            "tool": alert["tool"]["name"],
            "created_at": alert["created_at"]
        }

    def get_codeql_alerts(self, owner: str, repo: str, state: str = "open") -> List[Dict[str, Any]]:
        """Return the first page (up to 100) of alerts in the given state.

        Returns:
            A list of flattened alert dicts; an empty list when GitHub
            answers 404.

        Raises:
            requests.HTTPError: For any other error status.
            requests.RequestException: On connection problems or timeout.
            KeyError: If an alert lacks one of the expected fields.
        """
        url = f"https://api.github.com/repos/{owner}/{repo}/code-scanning/alerts"
        params = {"state": state, "per_page": 100}

        response = requests.get(url, headers=self.headers, params=params, timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 404:
            return []  # No access or no alerts
        response.raise_for_status()

        return [self._summarize_alert(alert) for alert in response.json()]

class SecurityAnalysisAgent:
    """Aggregates Dependabot and code-scanning alerts; holds no state."""

    # Sort rank, highest first. The two lists are ranked separately, so the
    # advisory scale (critical/high/medium/low) and the code-scanning rule
    # scale (error/warning/note/none) only need to be ordered within
    # themselves.
    SEVERITY_PRIORITY = {
        "critical": 4,
        "high": 3,
        "error": 3,
        "medium": 2,
        "warning": 2,
        "low": 1,
        "note": 0,
        "info": 0,
        "none": 0,
    }
    # Label used when an alert carries no severity at all.
    UNKNOWN_SEVERITY = "unknown"

    @classmethod
    def _severity_of(cls, alert: Dict) -> str:
        """Return the alert's severity, or the unknown label when missing/null."""
        return alert.get("severity") or cls.UNKNOWN_SEVERITY

    @classmethod
    def _priority_of(cls, alert: Dict) -> int:
        """Return the sort rank for an alert; unrecognized severities rank 0."""
        return cls.SEVERITY_PRIORITY.get(cls._severity_of(alert).lower(), 0)

    def analyze_security_posture(self, dependabot_alerts: List[Dict], codeql_alerts: List[Dict]) -> Dict[str, Any]:
        """Count alerts per severity and per source.

        Returns:
            A dict with both totals, ``severity_breakdown`` mapping each
            severity to ``{"dependabot": n, "codeql": m}``, and an ISO
            timestamp. Alerts without a severity are counted under
            ``"unknown"``.
        """
        severity_counts = defaultdict(lambda: {"dependabot": 0, "codeql": 0})

        for alert in dependabot_alerts:
            severity_counts[self._severity_of(alert)]["dependabot"] += 1

        for alert in codeql_alerts:
            severity_counts[self._severity_of(alert)]["codeql"] += 1

        return {
            "total_dependabot_alerts": len(dependabot_alerts),
            "total_codeql_alerts": len(codeql_alerts),
            "severity_breakdown": dict(severity_counts),
            "analysis_timestamp": datetime.now().isoformat()
        }

    def get_top_vulnerabilities(self, dependabot_alerts: List[Dict], codeql_alerts: List[Dict], limit: int = 5) -> Dict[str, List]:
        """Return the ``limit`` most severe alerts from each source.

        Sorting is stable, so alerts of equal rank keep their input order.
        """
        sorted_dependabot = sorted(dependabot_alerts, key=self._priority_of, reverse=True)[:limit]
        sorted_codeql = sorted(codeql_alerts, key=self._priority_of, reverse=True)[:limit]

        return {
            "top_dependabot_alerts": sorted_dependabot,
            "top_codeql_alerts": sorted_codeql
        }

print("✅ Security analysis agents defined")


In [None]:
def _fetch_alerts_safely(fetch, owner, name, source, errors):
    """Call ``fetch(owner, name)``; on failure record the error and return [].

    The failure is appended to ``errors`` as "<source>: <message>" so that a
    problem with one alert source does not discard the other source's data.
    """
    try:
        return fetch(owner, name)
    except Exception as e:  # boundary: one repository must not abort the scan
        errors.append(f"{source}: {e}")
        return []


async def run_security_analysis(access_token: str, repositories: List[Dict]) -> Dict[str, Any]:
    """Run comprehensive security analysis on repositories.

    For every repository the Dependabot and code-scanning alerts are fetched
    independently, so a failure of one source keeps the alerts returned by
    the other.

    Args:
        access_token: GitHub token handed to both alert agents.
        repositories: Dicts carrying a ``full_name`` of the form "owner/name".

    Returns:
        A dict with ``security_posture``, ``top_vulnerabilities``,
        ``repo_security_data`` (per repository: both alert lists, plus an
        ``error`` string when any fetch failed), ``all_dependabot_alerts``
        and ``all_codeql_alerts``.
    """
    print("\n Starting Security Analysis...")

    # Initialize security agents
    dependabot_agent = DependabotAlertsAgent(access_token)
    codeql_agent = CodeQLAlertsAgent(access_token)
    analysis_agent = SecurityAnalysisAgent()

    all_dependabot_alerts = []
    all_codeql_alerts = []
    repo_security_data = {}

    print(f"Analyzing {len(repositories)} repositories for security alerts...")

    for repo in repositories:
        owner, name = repo['full_name'].split('/')
        print(f"   {repo['full_name']}", end=" ")

        errors = []
        dependabot_alerts = _fetch_alerts_safely(
            dependabot_agent.get_dependabot_alerts, owner, name, "Dependabot", errors)
        codeql_alerts = _fetch_alerts_safely(
            codeql_agent.get_codeql_alerts, owner, name, "CodeQL", errors)

        entry = {
            "dependabot_alerts": dependabot_alerts,
            "codeql_alerts": codeql_alerts
        }
        if errors:
            entry["error"] = "; ".join(errors)
        repo_security_data[repo['full_name']] = entry

        all_dependabot_alerts.extend(dependabot_alerts)
        all_codeql_alerts.extend(codeql_alerts)

        print(f"- Dependabot: {len(dependabot_alerts)}, CodeQL: {len(codeql_alerts)}")
        if errors:
            print(f"     Error: {entry['error']}")

    # Generate comprehensive analysis
    security_posture = analysis_agent.analyze_security_posture(all_dependabot_alerts, all_codeql_alerts)
    top_vulnerabilities = analysis_agent.get_top_vulnerabilities(all_dependabot_alerts, all_codeql_alerts)

    return {
        "security_posture": security_posture,
        "top_vulnerabilities": top_vulnerabilities,
        "repo_security_data": repo_security_data,
        "all_dependabot_alerts": all_dependabot_alerts,
        "all_codeql_alerts": all_codeql_alerts
    }


In [None]:
# Run security analysis on the repositories from previous analysis.
# Guarded so the cell can be run on its own: it only proceeds when the main
# workflow has already produced `results` with a 'repositories' entry.
if 'results' in globals() and 'repositories' in results:
    # Re-authenticate to get access token (the workflow does not keep its
    # token, and the auth decorator is configured with force_authentication=True).
    token = await authenticate_github_interactive(access_token="")
    security_results = await run_security_analysis(token, results['repositories'])
    
    # Display security analysis results
    print("\n" + "="*60)
    print("SECURITY ANALYSIS RESULTS")
    print("="*60)
    
    posture = security_results['security_posture']
    print(f"Total Dependabot Alerts: {posture['total_dependabot_alerts']}")
    print(f"Total CodeQL Alerts: {posture['total_codeql_alerts']}")
    
    if posture['severity_breakdown']:
        print("\nSeverity Breakdown:")
        # NOTE(review): .upper() assumes every severity key is a string - confirm
        # that alerts never carry a null severity.
        for severity, counts in posture['severity_breakdown'].items():
            print(f"  {severity.upper()}: Dependabot={counts['dependabot']}, CodeQL={counts['codeql']}")
    
    # Show top vulnerabilities (only the first three of each list are printed,
    # with the text truncated to 80 characters)
    top_vulns = security_results['top_vulnerabilities']
    
    if top_vulns['top_dependabot_alerts']:
        print("\n Top Dependabot Alerts:")
        for i, alert in enumerate(top_vulns['top_dependabot_alerts'][:3], 1):
            print(f"  {i}. [{alert['severity'].upper()}] {alert['package']} - {alert['summary'][:80]}...")
    
    if top_vulns['top_codeql_alerts']:
        print("\n Top CodeQL Alerts:")
        for i, alert in enumerate(top_vulns['top_codeql_alerts'][:3], 1):
            print(f"  {i}. [{alert['severity'].upper()}] {alert['rule_id']} - {alert['description'][:80]}...")
    
    # Update results with security data (mutates the `results` dict in place)
    results['security_analysis'] = security_results
    
else:
    print("⚠️  Please run the main workflow first to get repository data")

In [None]:
# Export security analysis results.
# Only offered when the security-analysis cell has defined `security_results`.
if 'security_results' in globals():
    # Only the exact answer "y" (after strip/lower) triggers the export.
    export_security = input("\nExport security analysis results? (y/n): ").strip().lower()
    
    if export_security == 'y':
        # The name is used as typed; an empty answer falls back to "security_analysis".
        security_filename = input("Enter security report filename (default: security_analysis): ").strip() or "security_analysis"
        
        # Timestamp suffix gives each export its own file name.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        security_json_filename = f"{security_filename}_{timestamp}.json"
        
        with open(security_json_filename, 'w') as f:
            # default=str stringifies any value json cannot serialize natively.
            json.dump(security_results, f, indent=2, default=str)
        
        print(f"✅ Security analysis exported to: {security_json_filename}")
    else:
        print("Security results not exported.")
else:
    print("No security analysis results to export")

In [None]:
from observability_setup import setup_agentcore_observability, log_agent_session
import uuid

# Setup AgentCore observability.
# Both helpers come from the local `observability_setup` module; what they
# configure or record is not visible from this notebook.
setup_agentcore_observability()

# Generate session ID for tracking: the first 8 characters of a random UUID4
# string, so it is short rather than guaranteed unique.
session_id = str(uuid.uuid4())[:8]
print(f"📊 Session ID: {session_id}")

# Log agent activity
log_agent_session(session_id, "SecurityAnalysisAgent", "initialized")
