# Advanced Integrations with ghops

This notebook explores advanced integration patterns that combine ghops features into larger automation solutions. It covers building custom actions, integrating with CI/CD systems, and extending ghops with plugins.

## Table of Contents
1. [Combining Clustering with Workflows](#combining)
2. [Building Custom Actions](#custom-actions)
3. [CI/CD Integration](#cicd)
4. [Plugin Development](#plugins)
5. [API Integration Patterns](#api-patterns)
6. [Event-Driven Automation](#event-driven)
7. [Cross-Platform Orchestration](#cross-platform)
8. [Real-World Use Cases](#use-cases)
9. [Performance Optimization](#performance)
10. [Exercises](#exercises)

## Setup and Imports

In [None]:
import subprocess
import json
import yaml
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import tempfile
import os
import time
from datetime import datetime, timedelta
import asyncio
import requests
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
import networkx as nx

# Setup
plt.style.use('seaborn-v0_8-darkgrid')
workspace = tempfile.mkdtemp(prefix="ghops_advanced_")
print(f"Workspace: {workspace}")

# Helper functions
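def run_command(cmd):
    """Run a shell command and return (stdout, stderr, returncode)."""
    # shell=True is convenient in a notebook; only pass trusted command strings
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    return result.stdout, result.stderr, result.returncode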

def parse_jsonl(output):
    results = []
    for line in output.strip().split('\n'):
        if line:
            try:
                results.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return results

## 1. Combining Clustering with Workflows {#combining}

Learn how to use clustering results to drive intelligent workflow decisions.

In [None]:
# Intelligent maintenance workflow using clustering
cluster_driven_workflow = {
    "name": "cluster-driven-maintenance",
    "description": "Use clustering to optimize maintenance operations",
    "version": "1.0.0",
    
    "steps": [
        # Step 1: Analyze repository clusters
        {
            "name": "analyze_clusters",
            "action": "ghops.cluster.analyze",
            "params": {
                "path": "~/projects",
                "algorithm": "similarity",
                "threshold": 0.8
            }
        },
        
        # Step 2: Identify duplicate repositories
        {
            "name": "find_duplicates",
            "action": "ghops.cluster.duplicates",
            "input": "${analyze_clusters.output}",
            "params": {
                "min_similarity": 0.9
            }
        },
        
        # Step 3: Process each cluster differently
        {
            "name": "process_clusters",
            "action": "foreach_cluster",
            "input": "${analyze_clusters.clusters}",
            "steps": [
                {
                    "name": "determine_strategy",
                    "action": "custom.determine_cluster_strategy",
                    "params": {
                        "cluster": "${item}",
                        "rules": {
                            "high_similarity": "consolidate",
                            "medium_similarity": "standardize",
                            "low_similarity": "maintain_separately"
                        }
                    }
                },
                {
                    "name": "apply_strategy",
                    "action": "switch",
                    "on": "${determine_strategy.output}",
                    "cases": {
                        "consolidate": {
                            "action": "workflow.run",
                            "params": {
                                "workflow": "consolidation-workflow",
                                "inputs": {"repos": "${item.members}"}
                            }
                        },
                        "standardize": {
                            "action": "workflow.run",
                            "params": {
                                "workflow": "standardization-workflow",
                                "inputs": {"repos": "${item.members}"}
                            }
                        },
                        "maintain_separately": {
                            "action": "workflow.run",
                            "params": {
                                "workflow": "individual-maintenance",
                                "inputs": {"repos": "${item.members}"}
                            }
                        }
                    }
                }
            ]
        },
        
        # Step 4: Generate optimization report
        {
            "name": "generate_report",
            "action": "report.optimization",
            "params": {
                "clusters": "${analyze_clusters.output}",
                "duplicates": "${find_duplicates.output}",
                "actions_taken": "${process_clusters.output}"
            }
        }
    ]
}

print("Cluster-Driven Workflow Structure:")
print(json.dumps(cluster_driven_workflow, indent=2)[:1000] + "...")
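
The workflow above passes data between steps with `${step.field}` placeholders. This notebook does not show how ghops resolves them, so the following is only a minimal sketch of one way such references could be expanded, assuming step results are kept in a plain nested dictionary. The names `lookup`, `resolve`, and `step_results` are hypothetical and not part of ghops.

```python
import re
from typing import Any, Dict

# Matches "${step_name.field.subfield}" placeholders (assumed syntax)
_REF = re.compile(r"\$\{([A-Za-z_][\w.]*)\}")

def lookup(path: str, results: Dict[str, Any]) -> Any:
    """Walk a dotted path such as 'analyze_clusters.output' through nested dicts."""
    value: Any = results
    for part in path.split("."):
        value = value[part]
    return value

def resolve(value: Any, results: Dict[str, Any]) -> Any:
    """Recursively replace placeholders in strings, dicts, and lists."""
    if isinstance(value, str):
        whole = _REF.fullmatch(value)
        if whole:
            # A string that is only a placeholder keeps the referenced value's type
            return lookup(whole.group(1), results)
        # Placeholders embedded in longer text are interpolated as strings
        return _REF.sub(lambda m: str(lookup(m.group(1), results)), value)
    if isinstance(value, dict):
        return {k: resolve(v, results) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve(v, results) for v in value]
    return value

# Hypothetical results recorded after the first step has run
step_results = {
    "analyze_clusters": {
        "output": {"count": 2},
        "clusters": [["api-v1", "api-v2"], ["docs-site"]],
    }
}
params = {
    "clusters": "${analyze_clusters.clusters}",
    "label": "found ${analyze_clusters.output.count} clusters",
}
resolved = resolve(params, step_results)
print(resolved)
```

Keeping the original type for whole-string placeholders means a list of clusters arrives at the next step as a list rather than as its string representation.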

In [None]:
# Simulate cluster-based decision making
@dataclass
class Cluster:
    id: int
    members: List[str]
    similarity: float
    characteristics: Dict[str, Any] = field(default_factory=dict)

class ClusterStrategy:
    """Determines optimal strategy for repository clusters"""
    
    @staticmethod
    def analyze(cluster: Cluster) -> Dict[str, Any]:
        strategy = {
            "cluster_id": cluster.id,
            "action": None,
            "priority": "medium",
            "estimated_savings": 0
        }
        
        if cluster.similarity > 0.9:
            strategy["action"] = "consolidate"
            strategy["priority"] = "high"
            strategy["estimated_savings"] = len(cluster.members) * 10  # Hours per month
            strategy["recommendations"] = [
                "Merge duplicate repositories",
                "Create shared library for common code",
                "Archive redundant repos"
            ]
        elif cluster.similarity > 0.7:
            strategy["action"] = "standardize"
            strategy["priority"] = "medium"
            strategy["estimated_savings"] = len(cluster.members) * 5
            strategy["recommendations"] = [
                "Align coding standards",
                "Standardize dependencies",
                "Create common CI/CD pipeline"
            ]
        else:
            strategy["action"] = "optimize_individually"
            strategy["priority"] = "low"
            strategy["estimated_savings"] = len(cluster.members) * 2
            strategy["recommendations"] = [
                "Maintain separate workflows",
                "Focus on individual optimization"
            ]
        
        return strategy

# Create sample clusters
sample_clusters = [
    Cluster(0, ["api-v1", "api-v2", "api-legacy"], 0.95),
    Cluster(1, ["frontend-react", "frontend-vue"], 0.75),
    Cluster(2, ["ml-pipeline", "data-processor"], 0.60),
    Cluster(3, ["docs-site", "blog", "portfolio"], 0.85)
]

# Analyze strategies
strategies = [ClusterStrategy.analyze(cluster) for cluster in sample_clusters]

# Display results
strategy_df = pd.DataFrame(strategies)
print("Cluster Strategy Analysis:")
print("=" * 70)
for idx, row in strategy_df.iterrows():
    cluster = sample_clusters[idx]
    print(f"\nCluster {row['cluster_id']}: {', '.join(cluster.members[:2])}...")
    print(f"  Similarity: {cluster.similarity:.2f}")
    print(f"  Strategy: {row['action'].upper()}")
    print(f"  Priority: {row['priority']}")
    print(f"  Est. Savings: {row['estimated_savings']} hours/month")
    if 'recommendations' in row:
        print("  Recommendations:")
        for rec in row['recommendations'][:2]:
            print(f"    • {rec}")
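
The workflow's `rules` and `cases` and the thresholds in `ClusterStrategy.analyze` can be read as one lookup chain from similarity score to sub-workflow. The sketch below spells that chain out. Pairing the 0.9 and 0.7 thresholds with the band names `high_similarity`, `medium_similarity`, and `low_similarity` is an assumption made for illustration, and `similarity_band` and `workflow_for` are hypothetical helpers.

```python
from typing import Dict

def similarity_band(similarity: float) -> str:
    """Map a similarity score to a band (thresholds taken from ClusterStrategy.analyze)."""
    if similarity > 0.9:
        return "high_similarity"
    if similarity > 0.7:
        return "medium_similarity"
    return "low_similarity"

# Copied from the "rules" block of the determine_strategy step
RULES: Dict[str, str] = {
    "high_similarity": "consolidate",
    "medium_similarity": "standardize",
    "low_similarity": "maintain_separately",
}

# Copied from the "cases" block of the apply_strategy step
CASES: Dict[str, str] = {
    "consolidate": "consolidation-workflow",
    "standardize": "standardization-workflow",
    "maintain_separately": "individual-maintenance",
}

def workflow_for(similarity: float) -> str:
    """Resolve a similarity score to the name of the sub-workflow to run."""
    return CASES[RULES[similarity_band(similarity)]]

for score in (0.95, 0.75, 0.60, 0.85):
    print(f"{score:.2f} -> {workflow_for(score)}")
```

Because the comparisons are strict, a cluster with similarity exactly 0.9 falls into the medium band.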

## 2. Building Custom Actions {#custom-actions}

Create reusable custom actions for ghops workflows.

In [None]:
# Custom Action Framework
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

class CustomAction(ABC):
    """Base class for custom ghops actions"""
    
    def __init__(self, name: str, version: str = "1.0.0"):
        self.name = name
        self.version = version
        self.metadata = {
            "created": datetime.now().isoformat(),
            "author": "ghops"
        }
    
    @abstractmethod
    def validate_params(self, params: Dict[str, Any]) -> bool:
        """Validate action parameters"""
        pass
    
    @abstractmethod
    def execute(self, context: Dict[str, Any], params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the action"""
        pass
    
    def run(self, context: Dict[str, Any], params: Dict[str, Any]) -> Dict[str, Any]:
        """Run the action with validation"""
        if not self.validate_params(params):
            raise ValueError(f"Invalid parameters for action {self.name}")
        
        result = {
            "action": self.name,
            "version": self.version,
            "start_time": datetime.now().isoformat()
        }
        
        try:
            output = self.execute(context, params)
            result["status"] = "success"
            result["output"] = output
        except Exception as e:
            result["status"] = "failed"
            result["error"] = str(e)
        
        result["end_time"] = datetime.now().isoformat()
        return result

# Example Custom Actions
class CodeQualityAction(CustomAction):
    """Analyze code quality across repositories"""
    
    def validate_params(self, params: Dict[str, Any]) -> bool:
        return "path" in params
    
    def execute(self, context: Dict[str, Any], params: Dict[str, Any]) -> Dict[str, Any]:
        path = params["path"]
        
        # Simulate code quality analysis
        metrics = {
            "complexity": np.random.randint(1, 10),
            "maintainability": np.random.randint(60, 100),
            "test_coverage": np.random.randint(40, 95),
            "code_smells": np.random.randint(0, 20),
            "duplicates": np.random.randint(0, 10),
            "technical_debt": np.random.randint(0, 100)
        }
        
        grade = "A" if metrics["maintainability"] > 80 else "B" if metrics["maintainability"] > 60 else "C"
        
        return {
            "path": path,
            "metrics": metrics,
            "grade": grade,
            "recommendations": self._generate_recommendations(metrics)
        }
    
    def _generate_recommendations(self, metrics: Dict[str, int]) -> List[str]:
        recommendations = []
        if metrics["complexity"] > 7:
            recommendations.append("Refactor complex functions")
        if metrics["test_coverage"] < 70:
            recommendations.append("Increase test coverage")
        if metrics["code_smells"] > 10:
            recommendations.append("Address code smells")
        return recommendations

class DependencyAuditAction(CustomAction):
    """Audit dependencies for security and updates"""
    
    def validate_params(self, params: Dict[str, Any]) -> bool:
        return "path" in params
    
    def execute(self, context: Dict[str, Any], params: Dict[str, Any]) -> Dict[str, Any]:
        path = params["path"]
        
        # Simulate dependency audit
        vulnerabilities = [
            {"package": "requests", "severity": "high", "version": "2.25.0", "fix": "2.28.0"},
            {"package": "pyyaml", "severity": "medium", "version": "5.3", "fix": "5.4"}
        ] if np.random.random() > 0.5 else []
        
        outdated = [
            {"package": "numpy", "current": "1.19.0", "latest": "1.24.0"},
            {"package": "pandas", "current": "1.2.0", "latest": "2.0.0"}
        ]
        
        return {
            "path": path,
            "vulnerabilities": vulnerabilities,
            "outdated_packages": outdated,
            "total_dependencies": np.random.randint(10, 50),
            "risk_level": "high" if vulnerabilities else "low"
        }

# Register custom actions
custom_actions = {
    "code_quality": CodeQualityAction("code_quality"),
    "dependency_audit": DependencyAuditAction("dependency_audit")
}

# Test custom actions
print("Testing Custom Actions:")
print("=" * 60)

for action_name, action in custom_actions.items():
    result = action.run({}, {"path": "/test/repo"})
    print(f"\n{action_name}:")
    print(f"  Status: {result['status']}")
    if 'output' in result:
        output = result['output']
        if 'grade' in output:
            print(f"  Grade: {output['grade']}")
        if 'risk_level' in output:
            print(f"  Risk Level: {output['risk_level']}")
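
The `custom_actions` dictionary above is filled in by hand. One alternative, sketched here under the assumption that actions can be plain functions taking a `params` dictionary, is to register them with a decorator. `ACTION_REGISTRY`, `register_action`, and `run_action` are hypothetical names for illustration, not ghops APIs.

```python
from typing import Any, Callable, Dict

ActionFunc = Callable[[Dict[str, Any]], Dict[str, Any]]

# Maps an action name (as a workflow would reference it) to its implementation
ACTION_REGISTRY: Dict[str, ActionFunc] = {}

def register_action(name: str) -> Callable[[ActionFunc], ActionFunc]:
    """Decorator that adds a function to the registry under the given name."""
    def decorator(func: ActionFunc) -> ActionFunc:
        if name in ACTION_REGISTRY:
            raise ValueError(f"Action already registered: {name}")
        ACTION_REGISTRY[name] = func
        return func
    return decorator

@register_action("custom.line_count")
def line_count(params: Dict[str, Any]) -> Dict[str, Any]:
    """Toy action: count the lines in params['text']."""
    return {"lines": len(params["text"].splitlines())}

def run_action(name: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Look up and run an action, reporting failures instead of raising."""
    func = ACTION_REGISTRY.get(name)
    if func is None:
        return {"action": name, "status": "failed", "error": f"Unknown action: {name}"}
    try:
        return {"action": name, "status": "success", "output": func(params)}
    except Exception as exc:
        return {"action": name, "status": "failed", "error": str(exc)}

ok = run_action("custom.line_count", {"text": "a\nb\n"})
missing = run_action("custom.line_count", {})   # required key absent
unknown = run_action("custom.nope", {})         # name never registered
print(ok, missing["status"], unknown["status"])
```

The result dictionaries deliberately mirror the `status`/`output`/`error` shape returned by `CustomAction.run`, so both styles could feed the same reporting code.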

## 3. CI/CD Integration {#cicd}

Integrate ghops with popular CI/CD systems.

In [None]:
# GitHub Actions Integration
# Raw string: keeps the trailing "\" line continuations inside the YAML intact
github_action_workflow = r"""
name: ghops Repository Management
on:
  schedule:
    - cron: '0 6 * * *'  # Daily at 06:00 UTC
  push:
    branches: [main]
  pull_request:  # Required for the "Comment on PR" step to run
  workflow_dispatch:

jobs:
  repository-analysis:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      
      - name: Install ghops
        run: pip install ghops
      
      - name: Run Repository Clustering
        run: |
          ghops cluster analyze . --algorithm similarity > clusters.json
          ghops cluster duplicates . > duplicates.json
      
      - name: Generate Report
        run: |
          ghops report generate \
            --clusters clusters.json \
            --duplicates duplicates.json \
            --format markdown > report.md
      
      - name: Upload Report
        uses: actions/upload-artifact@v4
        with:
          name: repository-report
          path: report.md
      
      - name: Comment on PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const report = fs.readFileSync('report.md', 'utf8');
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: report
            });
"""

# GitLab CI Integration
gitlab_ci_config = """
stages:
  - analysis
  - optimization
  - deploy

variables:
  GHOPS_CONFIG: /etc/ghops/config.yaml

repository-analysis:
  stage: analysis
  image: python:3.9
  before_script:
    - pip install ghops
  script:
    - ghops list ${CI_PROJECT_DIR} > repos.json
    - ghops status --format json > status.json
    - ghops cluster analyze ${CI_PROJECT_DIR} > clusters.json
  artifacts:
    paths:
      - repos.json
      - status.json
      - clusters.json
    expire_in: 1 week

consolidation:
  stage: optimization
  image: python:3.9
  before_script:
    - pip install ghops
  dependencies:
    - repository-analysis
  script:
    - ghops workflow run consolidation-workflow --input clusters.json
  only:
    - schedules
"""

# Jenkins Pipeline
jenkins_pipeline = """
pipeline {
    agent any
    
    environment {
        GHOPS_HOME = "${WORKSPACE}/.ghops"
    }
    
    stages {
        stage('Setup') {
            steps {
                sh 'pip install ghops'
                sh 'ghops config init'
            }
        }
        
        stage('Analysis') {
            parallel {
                stage('Clustering') {
                    steps {
                        sh 'ghops cluster analyze ${WORKSPACE}'
                    }
                }
                stage('Status Check') {
                    steps {
                        sh 'ghops status ${WORKSPACE}'
                    }
                }
                stage('Dependencies') {
                    steps {
                        sh 'ghops audit dependencies ${WORKSPACE}'
                    }
                }
            }
        }
        
        stage('Workflow') {
            steps {
                sh 'ghops workflow run maintenance-workflow'
            }
        }
    }
    
    post {
        always {
            archiveArtifacts artifacts: '*.json,*.md'
        }
    }
}
"""

# Display CI/CD configurations
print("CI/CD Integration Examples:")
print("=" * 60)
print("\n1. GitHub Actions:")
print(github_action_workflow[:500] + "...")
print("\n2. GitLab CI:")
print(gitlab_ci_config[:400] + "...")
print("\n3. Jenkins Pipeline:")
print(jenkins_pipeline[:400] + "...")
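
Each CI system only picks up its configuration from a conventional location in the repository. The sketch below writes configuration strings to those locations inside a temporary directory. The helper `write_ci_configs` and the file name `ghops.yml` are illustrative assumptions, and the short YAML strings are placeholders standing in for the full configurations defined above.

```python
import tempfile
from pathlib import Path
from typing import Dict, List

# Conventional locations each CI system reads its configuration from
CI_CONFIG_PATHS: Dict[str, str] = {
    "github": ".github/workflows/ghops.yml",  # any *.yml name in this folder works
    "gitlab": ".gitlab-ci.yml",
    "jenkins": "Jenkinsfile",
}

def write_ci_configs(repo_root: Path, configs: Dict[str, str]) -> List[Path]:
    """Write each configuration string to its conventional path under repo_root."""
    written = []
    for system, content in configs.items():
        target = repo_root / CI_CONFIG_PATHS[system]
        target.parent.mkdir(parents=True, exist_ok=True)
        # Triple-quoted strings start with a newline; drop it before writing
        target.write_text(content.lstrip("\n"), encoding="utf-8")
        written.append(target)
    return written

# Placeholder configs; in the notebook these would be the strings defined above
repo_root = Path(tempfile.mkdtemp(prefix="ghops_ci_"))
written = write_ci_configs(repo_root, {
    "github": "\nname: demo\non: [push]\n",
    "gitlab": "\nstages:\n  - analysis\n",
})
for path in written:
    print(path.relative_to(repo_root))
```

In this notebook the call would look like `write_ci_configs(Path(workspace), {"github": github_action_workflow, "gitlab": gitlab_ci_config, "jenkins": jenkins_pipeline})`.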

## 4. Plugin Development {#plugins}

Create plugins to extend ghops functionality.

In [None]:
# ghops Plugin Framework
from abc import ABC, abstractmethod
import importlib.util
import inspect

class GhopsPlugin(ABC):
    """Base class for ghops plugins"""
    
    def __init__(self):
        self.name = self.__class__.__name__
        self.version = getattr(self, '__version__', '1.0.0')
        self.description = self.__doc__ or "No description"
        self.commands = {}
        self.hooks = {}
        self._register_commands()
        self._register_hooks()
    
    @abstractmethod
    def _register_commands(self):
        """Register plugin commands"""
        pass
    
    def _register_hooks(self):
        """Register lifecycle hooks"""
        pass
    
    def register_command(self, name: str, func, description: str = ""):
        """Register a new command"""
        self.commands[name] = {
            'function': func,
            'description': description,
            'params': inspect.signature(func).parameters
        }
    
    def register_hook(self, event: str, func):
        """Register an event hook"""
        if event not in self.hooks:
            self.hooks[event] = []
        self.hooks[event].append(func)

# Example Plugin: Security Scanner
class SecurityScannerPlugin(GhopsPlugin):
    """Advanced security scanning for repositories"""
    
    __version__ = '2.0.0'
    
    def _register_commands(self):
        self.register_command('scan', self.scan_repository, 
                            'Scan repository for security issues')
        self.register_command('audit', self.audit_dependencies,
                            'Audit dependencies for vulnerabilities')
        self.register_command('secrets', self.check_secrets,
                            'Check for exposed secrets')
    
    def _register_hooks(self):
        self.register_hook('pre_commit', self.pre_commit_scan)
        self.register_hook('post_clone', self.post_clone_scan)
    
    def scan_repository(self, path: str, deep: bool = False):
        """Comprehensive security scan"""
        results = {
            'path': path,
            'scan_date': datetime.now().isoformat(),
            'issues': [],
            'score': 100
        }
        
        # Simulate scanning
        checks = [
            ('Hardcoded credentials', 'high', -30),
            ('Outdated dependencies', 'medium', -15),
            ('Missing security headers', 'low', -5),
            ('Unencrypted sensitive data', 'critical', -50)
        ]
        
        for check, severity, impact in checks:
            if np.random.random() > 0.7:  # 30% chance of finding issue
                results['issues'].append({
                    'type': check,
                    'severity': severity,
                    'impact': impact,
                    'file': f"src/{np.random.choice(['main.py', 'config.yaml', '.env'])}"
                })
                results['score'] += impact
        
        return results
    
    def audit_dependencies(self, path: str):
        """Audit project dependencies"""
        return {
            'vulnerable_packages': np.random.randint(0, 5),
            'outdated_packages': np.random.randint(0, 10),
            'total_packages': np.random.randint(20, 50)
        }
    
    def check_secrets(self, path: str):
        """Check for exposed secrets"""
        patterns = ['API_KEY', 'SECRET_KEY', 'PASSWORD', 'TOKEN']
        found = [p for p in patterns if np.random.random() > 0.8]
        return {'exposed_secrets': found}
    
    def pre_commit_scan(self, files: List[str]):
        """Scan files before commit"""
        print(f"Scanning {len(files)} files before commit...")
        return all(self._quick_scan(f) for f in files)
    
    def post_clone_scan(self, repo_path: str):
        """Scan repository after cloning"""
        print(f"Security scanning cloned repository: {repo_path}")
        return self.scan_repository(repo_path)
    
    def _quick_scan(self, file: str):
        """Quick security scan for a single file"""
        return np.random.random() > 0.1  # 90% pass rate

# Example Plugin: Metrics Collector
class MetricsCollectorPlugin(GhopsPlugin):
    """Collect and analyze repository metrics"""
    
    def _register_commands(self):
        self.register_command('collect', self.collect_metrics,
                            'Collect repository metrics')
        self.register_command('analyze', self.analyze_trends,
                            'Analyze metric trends')
    
    def collect_metrics(self, path: str):
        """Collect various repository metrics"""
        return {
            'lines_of_code': np.random.randint(1000, 10000),
            'commit_count': np.random.randint(10, 500),
            'contributor_count': np.random.randint(1, 20),
            'open_issues': np.random.randint(0, 50),
            'code_complexity': np.random.uniform(1, 10),
            'test_coverage': np.random.uniform(30, 95)
        }
    
    def analyze_trends(self, metrics_history: List[Dict]):
        """Analyze trends in metrics over time"""
        if not metrics_history:
            return {"error": "No historical data"}
        
        # Simulate trend analysis
        trends = {
            'code_growth': 'increasing',
            'complexity_trend': 'stable',
            'coverage_trend': 'improving',
            'activity_level': 'high'
        }
        return trends

# Plugin Manager
class PluginManager:
    """Manage ghops plugins"""
    
    def __init__(self):
        self.plugins = {}
        self.enabled_plugins = set()
    
    def register(self, plugin: GhopsPlugin):
        """Register a new plugin"""
        self.plugins[plugin.name] = plugin
        print(f"Registered plugin: {plugin.name} v{plugin.version}")
    
    def enable(self, plugin_name: str):
        """Enable a plugin"""
        if plugin_name in self.plugins:
            self.enabled_plugins.add(plugin_name)
            print(f"Enabled plugin: {plugin_name}")
        else:
            print(f"Plugin not found: {plugin_name}")
    
    def execute_command(self, plugin_name: str, command: str, **kwargs):
        """Execute a plugin command"""
        if plugin_name not in self.enabled_plugins:
            return {"error": f"Plugin {plugin_name} is not enabled"}
        
        plugin = self.plugins[plugin_name]
        if command not in plugin.commands:
            return {"error": f"Command {command} not found in plugin {plugin_name}"}
        
        func = plugin.commands[command]['function']
        return func(**kwargs)
    
    def list_plugins(self):
        """List all registered plugins"""
        return [{
            'name': p.name,
            'version': p.version,
            'description': p.description,
            'enabled': p.name in self.enabled_plugins,
            'commands': list(p.commands.keys())
        } for p in self.plugins.values()]

# Initialize and test plugins
manager = PluginManager()

# Register plugins
security_plugin = SecurityScannerPlugin()
metrics_plugin = MetricsCollectorPlugin()

manager.register(security_plugin)
manager.register(metrics_plugin)

# Enable plugins
manager.enable('SecurityScannerPlugin')
manager.enable('MetricsCollectorPlugin')

# List plugins
print("\nRegistered Plugins:")
print("=" * 60)
for plugin in manager.list_plugins():
    status = "✓" if plugin['enabled'] else "✗"
    print(f"{status} {plugin['name']} v{plugin['version']}")
    print(f"  {plugin['description']}")
    print(f"  Commands: {', '.join(plugin['commands'])}")

# Test plugin execution
print("\nPlugin Execution Test:")
print("=" * 60)
result = manager.execute_command('SecurityScannerPlugin', 'scan', path='/test/repo')
print(f"Security Score: {result.get('score', 'N/A')}/100")
if result.get('issues'):
    print(f"Issues Found: {len(result['issues'])}")
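
The plugins above register lifecycle hooks such as `pre_commit` and `post_clone`, but the `PluginManager` shown here never fires them. The following is a minimal sketch of how hooks might be dispatched, assuming every handler for an event is called in registration order and that one failing handler should not stop the others. `HookDispatcher` is a hypothetical class, not part of ghops.

```python
from typing import Any, Callable, Dict, List

class HookDispatcher:
    """Collects hook handlers per event and calls them on demand."""

    def __init__(self) -> None:
        self._hooks: Dict[str, List[Callable[..., Any]]] = {}

    def register(self, event: str, func: Callable[..., Any]) -> None:
        """Attach a handler to an event name."""
        self._hooks.setdefault(event, []).append(func)

    def trigger(self, event: str, *args: Any, **kwargs: Any) -> List[Any]:
        """Call every handler for the event and collect the return values."""
        results: List[Any] = []
        for func in self._hooks.get(event, []):
            try:
                results.append(func(*args, **kwargs))
            except Exception as exc:
                # Record the failure and keep going with the remaining handlers
                results.append({"error": str(exc)})
        return results

dispatcher = HookDispatcher()
# Deterministic stand-ins for the randomised scans used in the plugin above
dispatcher.register("pre_commit", lambda files: all(not f.endswith(".env") for f in files))
dispatcher.register("pre_commit", lambda files: len(files))

ok = dispatcher.trigger("pre_commit", ["main.py", "config.yaml"])
blocked = dispatcher.trigger("pre_commit", ["main.py", ".env"])
print(ok, blocked, dispatcher.trigger("post_clone", "/tmp/repo"))
```

A manager could build such a dispatcher by iterating over each enabled plugin's `hooks` dictionary and registering every function it finds.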

## 5. API Integration Patterns {#api-patterns}

Learn patterns for integrating with external APIs.

In [None]:
# API Integration Framework
import asyncio
from typing import Optional, Dict, Any, List
from enum import Enum
import hashlib
import time

class RateLimiter:
    """Rate limiter for API calls"""
    
    def __init__(self, calls_per_second: float = 1.0):
        self.calls_per_second = calls_per_second
        self.min_interval = 1.0 / calls_per_second
        self.last_call = 0
    
    async def acquire(self):
        """Wait if necessary to respect rate limit"""
        current = time.time()
        # Reserve the next slot before sleeping so that concurrent callers
        # (for example via asyncio.gather) queue up instead of waking together
        scheduled = max(current, self.last_call + self.min_interval)
        self.last_call = scheduled

        if scheduled > current:
            await asyncio.sleep(scheduled - current)

class CacheStrategy(Enum):
    NONE = "none"
    MEMORY = "memory"
    DISK = "disk"
    REDIS = "redis"

class APIClient:
    """Base API client with common patterns"""
    
    def __init__(self, base_url: str, api_key: Optional[str] = None,
                 rate_limit: float = 10.0, cache_strategy: CacheStrategy = CacheStrategy.MEMORY):
        self.base_url = base_url
        self.api_key = api_key
        self.rate_limiter = RateLimiter(rate_limit)
        self.cache_strategy = cache_strategy
        self.cache = {} if cache_strategy == CacheStrategy.MEMORY else None
        self.stats = {
            'requests': 0,
            'cache_hits': 0,
            'errors': 0
        }
    
    def _cache_key(self, endpoint: str, params: Dict) -> str:
        """Generate cache key for request"""
        key_str = f"{endpoint}:{json.dumps(params, sort_keys=True)}"
        return hashlib.md5(key_str.encode()).hexdigest()
    
    async def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Make GET request with caching and rate limiting"""
        params = params or {}
        
        # Check cache
        if self.cache_strategy == CacheStrategy.MEMORY:
            cache_key = self._cache_key(endpoint, params)
            if cache_key in self.cache:
                self.stats['cache_hits'] += 1
                return self.cache[cache_key]
        
        # Rate limiting
        await self.rate_limiter.acquire()
        
        # Simulate API call
        self.stats['requests'] += 1
        
        # Mock response
        response = {
            'endpoint': endpoint,
            'params': params,
            'data': self._generate_mock_data(endpoint),
            'timestamp': datetime.now().isoformat()
        }
        
        # Cache response
        if self.cache_strategy == CacheStrategy.MEMORY:
            self.cache[cache_key] = response
        
        return response
    
    def _generate_mock_data(self, endpoint: str) -> Any:
        """Generate mock data based on endpoint"""
        if 'repos' in endpoint:
            return [{'id': i, 'name': f'repo_{i}'} for i in range(5)]
        elif 'user' in endpoint:
            return {'id': 1, 'username': 'testuser', 'repos': 42}
        else:
            return {'status': 'ok'}
    
    def get_stats(self) -> Dict[str, int]:
        """Get client statistics"""
        return self.stats.copy()

# Specialized API Clients
class GitHubAPIClient(APIClient):
    """GitHub API client with specific methods"""
    
    def __init__(self, token: str):
        super().__init__(
            base_url="https://api.github.com",
            api_key=token,
            rate_limit=5000/3600  # 5000 requests per hour
        )
    
    async def get_user_repos(self, username: str) -> List[Dict]:
        """Get user repositories"""
        response = await self.get(f"/users/{username}/repos")
        return response['data']
    
    async def get_repo_stats(self, owner: str, repo: str) -> Dict:
        """Get repository statistics"""
        endpoints = [
            f"/repos/{owner}/{repo}",
            f"/repos/{owner}/{repo}/stats/contributors",
            f"/repos/{owner}/{repo}/languages"
        ]
        
        results = await asyncio.gather(*[self.get(ep) for ep in endpoints])
        
        return {
            'info': results[0]['data'],
            'contributors': results[1]['data'],
            'languages': results[2]['data']
        }

class PyPIAPIClient(APIClient):
    """PyPI API client for package information"""
    
    def __init__(self):
        super().__init__(
            base_url="https://pypi.org/pypi",
            rate_limit=10.0  # Conservative rate limit
        )
    
    async def get_package_info(self, package_name: str) -> Dict:
        """Get package information from PyPI"""
        response = await self.get(f"/{package_name}/json")
        return response['data']
    
    async def check_updates(self, packages: List[tuple]) -> List[Dict]:
        """Check for package updates"""
        updates = []
        
        for package, current_version in packages:
            info = await self.get_package_info(package)
            latest = info.get('version', '0.0.0')  # Mock version
            
            if latest != current_version:
                updates.append({
                    'package': package,
                    'current': current_version,
                    'latest': latest,
                    'update_available': True
                })
        
        return updates

# Test API clients
async def test_api_clients():
    print("Testing API Clients:")
    print("=" * 60)
    
    # GitHub client
    github = GitHubAPIClient("mock_token")
    repos = await github.get_user_repos("octocat")
    print(f"\nGitHub: Found {len(repos)} repositories")
    
    # PyPI client
    pypi = PyPIAPIClient()
    packages = [("requests", "2.25.0"), ("numpy", "1.19.0")]
    updates = await pypi.check_updates(packages)
    print(f"\nPyPI: {len(updates)} packages have updates available")
    
    # Show statistics
    print("\nAPI Client Statistics:")
    print(f"GitHub: {github.get_stats()}")
    print(f"PyPI: {pypi.get_stats()}")

# Run async test
import nest_asyncio
nest_asyncio.apply()
loop = asyncio.get_event_loop()
loop.run_until_complete(test_api_clients())
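
The in-memory cache relies on `_cache_key` producing the same key for logically identical requests. The short sketch below copies that method into a standalone function (named `cache_key` here for illustration) to show the effect of `sort_keys=True`: the order in which parameters are supplied does not change the key, while different values or endpoints do.

```python
import hashlib
import json
from typing import Any, Dict

def cache_key(endpoint: str, params: Dict[str, Any]) -> str:
    """Standalone copy of APIClient._cache_key for demonstration."""
    key_str = f"{endpoint}:{json.dumps(params, sort_keys=True)}"
    return hashlib.md5(key_str.encode()).hexdigest()

# Same parameters in a different order map to the same key
key_a = cache_key("/users/octocat/repos", {"page": 1, "per_page": 50})
key_b = cache_key("/users/octocat/repos", {"per_page": 50, "page": 1})

# A different parameter value or endpoint yields a different key
key_c = cache_key("/users/octocat/repos", {"page": 2, "per_page": 50})
key_d = cache_key("/users/octocat", {"page": 1, "per_page": 50})

print(key_a == key_b, key_a == key_c, key_a == key_d)
```

MD5 serves here only as a compact fingerprint for dictionary keys, not as a security measure.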

## 6. Event-Driven Automation {#event-driven}

Build event-driven automation systems with ghops.

In [None]:
# Event-Driven Architecture
from typing import Callable, List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
import asyncio
from enum import Enum

class EventType(Enum):
    REPO_CREATED = "repo.created"
    REPO_UPDATED = "repo.updated"
    REPO_DELETED = "repo.deleted"
    COMMIT_PUSHED = "commit.pushed"
    PR_OPENED = "pr.opened"
    PR_MERGED = "pr.merged"
    ISSUE_CREATED = "issue.created"
    WORKFLOW_STARTED = "workflow.started"
    WORKFLOW_COMPLETED = "workflow.completed"
    CLUSTER_DETECTED = "cluster.detected"
    SECURITY_ALERT = "security.alert"

@dataclass
class Event:
    """Represents an event in the system"""
    type: EventType
    timestamp: datetime = field(default_factory=datetime.now)
    source: str = "ghops"
    data: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

class EventBus:
    """Central event bus for publishing and subscribing to events"""
    
    def __init__(self):
        self.subscribers: Dict[EventType, List[Callable]] = {}
        self.event_history: List[Event] = []
        self.filters: List[Callable] = []
    
    def subscribe(self, event_type: EventType, handler: Callable):
        """Subscribe to an event type"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
        print(f"Subscribed {handler.__name__} to {event_type.value}")
    
    def add_filter(self, filter_func: Callable[[Event], bool]):
        """Add a global event filter"""
        self.filters.append(filter_func)
    
    async def publish(self, event: Event):
        """Publish an event to all subscribers"""
        # Apply filters
        for filter_func in self.filters:
            if not filter_func(event):
                return  # Event filtered out
        
        # Store in history
        self.event_history.append(event)
        
        # Notify subscribers
        if event.type in self.subscribers:
            handlers = self.subscribers[event.type]
            
            # Run handlers concurrently
            tasks = [self._run_handler(handler, event) for handler in handlers]
            await asyncio.gather(*tasks)
    
    async def _run_handler(self, handler: Callable, event: Event):
        """Run an event handler safely"""
        try:
            if asyncio.iscoroutinefunction(handler):
                await handler(event)
            else:
                handler(event)
        except Exception as e:
            print(f"Error in handler {handler.__name__}: {e}")
    
    def get_history(self, event_type: Optional[EventType] = None,
                   limit: int = 100) -> List[Event]:
        """Get event history"""
        history = self.event_history
        if event_type:
            history = [e for e in history if e.type == event_type]
        return history[-limit:]

# Event Handlers
class AutomationHandlers:
    """Collection of automation event handlers"""
    
    @staticmethod
    async def on_repo_created(event: Event):
        """Handle repository creation"""
        repo = event.data.get('repository', {})
        print(f"  → Setting up new repository: {repo.get('name')}")
        
        # Automated setup tasks
        tasks = [
            "Initialize CI/CD pipeline",
            "Add standard .gitignore",
            "Create README template",
            "Set up branch protection"
        ]
        
        for task in tasks:
            await asyncio.sleep(0.1)  # Simulate work
            print(f"    ✓ {task}")
    
    @staticmethod
    def on_security_alert(event: Event):
        """Handle security alerts"""
        alert = event.data
        severity = alert.get('severity', 'unknown')
        
        print(f"  ⚠️ SECURITY ALERT: {severity.upper()}")
        print(f"    Repository: {alert.get('repository')}")
        print(f"    Issue: {alert.get('issue')}")
        
        if severity == 'critical':
            print("    🚨 Initiating emergency response protocol")
    
    @staticmethod
    async def on_cluster_detected(event: Event):
        """Handle cluster detection"""
        cluster = event.data
        similarity = cluster.get('similarity', 0)
        
        if similarity > 0.9:
            print(f"  🔍 High similarity cluster detected: {similarity:.1%}")
            print(f"    Members: {cluster.get('members', [])}")
            print("    → Triggering consolidation workflow")
    
    @staticmethod
    def on_workflow_completed(event: Event):
        """Handle workflow completion"""
        workflow = event.data
        status = workflow.get('status')
        duration = workflow.get('duration', 0)
        
        icon = "✅" if status == 'success' else "❌"
        print(f"  {icon} Workflow '{workflow.get('name')}' completed")
        print(f"    Duration: {duration:.1f}s")
        print(f"    Status: {status}")

# Create and configure event bus
event_bus = EventBus()
handlers = AutomationHandlers()

# Subscribe handlers to events
event_bus.subscribe(EventType.REPO_CREATED, handlers.on_repo_created)
event_bus.subscribe(EventType.SECURITY_ALERT, handlers.on_security_alert)
event_bus.subscribe(EventType.CLUSTER_DETECTED, handlers.on_cluster_detected)
event_bus.subscribe(EventType.WORKFLOW_COMPLETED, handlers.on_workflow_completed)

# Add event filter (only process events from last hour)
def recent_events_filter(event: Event) -> bool:
    age = datetime.now() - event.timestamp
    return age.total_seconds() < 3600  # 1 hour

event_bus.add_filter(recent_events_filter)

# Simulate events
async def simulate_events():
    print("\nSimulating Events:")
    print("=" * 60)
    
    events = [
        Event(
            type=EventType.REPO_CREATED,
            data={'repository': {'name': 'new-project', 'language': 'python'}}
        ),
        Event(
            type=EventType.SECURITY_ALERT,
            data={'severity': 'critical', 'repository': 'api-server', 
                  'issue': 'SQL injection vulnerability detected'}
        ),
        Event(
            type=EventType.CLUSTER_DETECTED,
            data={'similarity': 0.92, 'members': ['service-a', 'service-b', 'service-c']}
        ),
        Event(
            type=EventType.WORKFLOW_COMPLETED,
            data={'name': 'deployment-pipeline', 'status': 'success', 'duration': 142.3}
        )
    ]
    
    for event in events:
        print(f"\n📡 Event: {event.type.value}")
        await event_bus.publish(event)
        await asyncio.sleep(0.5)

# Run simulation
loop = asyncio.get_event_loop()
loop.run_until_complete(simulate_events())

# Show event history
print("\nEvent History:")
print("=" * 60)
for event in event_bus.get_history(limit=5):
    print(f"{event.timestamp.strftime('%H:%M:%S')} - {event.type.value}")
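
The `EventBus` above prints handler errors and moves on. A minimal sketch of an alternative, assuming you would rather collect failures for later inspection: `asyncio.gather(..., return_exceptions=True)` returns raised exceptions as values, so one failing handler cannot cancel its siblings. The names `dispatch`, `ok_handler`, and `bad_handler` are hypothetical and not part of ghops.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List, Tuple


async def dispatch(handlers: List[Callable],
                   event: Dict[str, Any]) -> List[Tuple[str, Exception]]:
    """Run sync and async handlers for one event; return (handler name, error) pairs."""

    async def call(handler: Callable) -> None:
        if inspect.iscoroutinefunction(handler):
            await handler(event)
        else:
            handler(event)  # sync handlers run inline on the event loop

    # return_exceptions=True turns raised exceptions into result values
    results = await asyncio.gather(*(call(h) for h in handlers),
                                   return_exceptions=True)
    return [(h.__name__, r) for h, r in zip(handlers, results)
            if isinstance(r, Exception)]


seen: List[str] = []


async def ok_handler(event):   # hypothetical async handler
    await asyncio.sleep(0)
    seen.append(event["type"])


def bad_handler(event):        # hypothetical sync handler that always fails
    raise ValueError("handler blew up")


failures = asyncio.run(dispatch([bad_handler, ok_handler], {"type": "repo.created"}))
print(failures)
```

This notebook applies `nest_asyncio` earlier, so `asyncio.run` works inside a cell; without it, use `await dispatch(...)` instead.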

## 7. Cross-Platform Orchestration {#cross-platform}

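Coordinate a single release across several platforms (version control, package registries, cloud, and Kubernetes) from one orchestrator. All platform calls in this section are simulated.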

In [None]:
# Cross-Platform Orchestration System
@dataclass
class Platform:
    name: str
    type: str
    api_endpoint: str
    capabilities: List[str] = field(default_factory=list)
    credentials: Optional[Dict] = None

class PlatformOrchestrator:
    """Orchestrate operations across multiple platforms"""
    
    def __init__(self):
        self.platforms = {}
        self.workflows = {}
        self._register_platforms()
    
    def _register_platforms(self):
        """Register supported platforms"""
        platforms = [
            Platform("GitHub", "vcs", "https://api.github.com",
                    ["repos", "issues", "actions", "releases"]),
            Platform("GitLab", "vcs", "https://gitlab.com/api/v4",
                    ["repos", "ci", "registry"]),
            Platform("PyPI", "registry", "https://pypi.org",
                    ["publish", "search", "stats"]),
            Platform("DockerHub", "registry", "https://hub.docker.com",
                    ["images", "push", "scan"]),
            Platform("AWS", "cloud", "https://aws.amazon.com",
                    ["deploy", "lambda", "s3"]),
            Platform("Kubernetes", "orchestration", "https://k8s.io",
                    ["deploy", "scale", "monitor"])
        ]
        
        for platform in platforms:
            self.platforms[platform.name] = platform
    
    async def orchestrate_release(self, project: str, version: str):
        """Orchestrate a release across multiple platforms"""
        print(f"\n🎭 Orchestrating release for {project} v{version}")
        print("=" * 60)
        
        steps = [
            ("GitHub", "Create release tag", self._github_release),
            ("GitLab", "Mirror to GitLab", self._gitlab_mirror),
            ("PyPI", "Publish package", self._pypi_publish),
            ("DockerHub", "Build & push image", self._docker_push),
            ("AWS", "Deploy to Lambda", self._aws_deploy),
            ("Kubernetes", "Update deployment", self._k8s_update)
        ]
        
        results = []
        for platform, action, func in steps:
            print(f"\n📍 {platform}: {action}")
            try:
                result = await func(project, version)
                results.append((platform, "success", result))
                print(f"  ✓ Success: {result}")
            except Exception as e:
                results.append((platform, "failed", str(e)))
                print(f"  ✗ Failed: {e}")
            
            await asyncio.sleep(0.5)  # Simulate processing time
        
        return results
    
    async def _github_release(self, project: str, version: str):
        """Create GitHub release"""
        return f"Created release v{version} with 3 assets"
    
    async def _gitlab_mirror(self, project: str, version: str):
        """Mirror to GitLab"""
        return f"Mirrored {project} to GitLab"
    
    async def _pypi_publish(self, project: str, version: str):
        """Publish to PyPI"""
        return f"Published {project}-{version}.tar.gz"
    
    async def _docker_push(self, project: str, version: str):
        """Push Docker image"""
        return f"Pushed {project}:{version} to registry"
    
    async def _aws_deploy(self, project: str, version: str):
        """Deploy to AWS"""
        if np.random.random() > 0.8:  # 20% chance of failure
            raise Exception("Lambda function update failed")
        return f"Deployed to Lambda function {project}-prod"
    
    async def _k8s_update(self, project: str, version: str):
        """Update Kubernetes deployment"""
        return f"Updated deployment {project} to v{version}"
    
    def get_platform_matrix(self) -> pd.DataFrame:
        """Get platform capability matrix"""
        all_capabilities = set()
        for platform in self.platforms.values():
            all_capabilities.update(platform.capabilities)
        
        matrix_data = []
        for platform in self.platforms.values():
            row = {'Platform': platform.name, 'Type': platform.type}
            for cap in sorted(all_capabilities):  # sorted for a stable column order
                row[cap] = '✓' if cap in platform.capabilities else ''
            matrix_data.append(row)
        
        return pd.DataFrame(matrix_data)

# Initialize orchestrator
orchestrator = PlatformOrchestrator()

# Display platform matrix
print("Platform Capability Matrix:")
print("=" * 80)
matrix = orchestrator.get_platform_matrix()
print(matrix.to_string(index=False))

# Simulate cross-platform release
async def test_orchestration():
    results = await orchestrator.orchestrate_release("awesome-project", "2.0.0")
    
    print("\n\n📊 Orchestration Summary:")
    print("=" * 60)
    
    success_count = sum(1 for _, status, _ in results if status == "success")
    total = len(results)
    
    print(f"Success Rate: {success_count}/{total} ({success_count/total*100:.0f}%)")
    
    for platform, status, detail in results:
        icon = "✓" if status == "success" else "✗"
        print(f"{icon} {platform}: {detail}")

# Run orchestration test
loop = asyncio.get_event_loop()
loop.run_until_complete(test_orchestration())
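
`orchestrate_release` carries on with later platforms even after a step fails. A hedged sketch of a different policy: stop at the first failure and undo the completed steps in reverse order (often called the saga or compensating-transaction pattern). The `run_with_rollback` and `make_step` helpers and the step names are hypothetical; real undo actions depend on each platform.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

# (name, action, undo): both callables are zero-argument coroutines
Step = Tuple[str, Callable[[], Awaitable[str]], Callable[[], Awaitable[None]]]


async def run_with_rollback(steps: List[Step]) -> Tuple[bool, List[str]]:
    """Run steps in order; on failure, undo completed steps in reverse order."""
    log: List[str] = []
    done: List[Step] = []
    for step in steps:
        name, action, _undo = step
        try:
            await action()
            log.append(f"done:{name}")
            done.append(step)
        except Exception as exc:
            log.append(f"failed:{name}:{exc}")
            for prev_name, _action, undo in reversed(done):
                await undo()
                log.append(f"undone:{prev_name}")
            return False, log
    return True, log


def make_step(name: str, fail: bool = False) -> Step:
    """Build a simulated step (hypothetical; makes no real platform calls)."""
    async def action() -> str:
        if fail:
            raise RuntimeError("simulated failure")
        return name

    async def undo() -> None:
        return None

    return (name, action, undo)


ok, log = asyncio.run(run_with_rollback([
    make_step("tag"), make_step("publish"), make_step("deploy", fail=True),
]))
print(ok, log)
```

Whether to continue or roll back is a design choice: continuing maximizes partial progress, while rolling back keeps the platforms consistent with each other.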

## 8. Real-World Use Cases {#use-cases}

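Two scenarios show how the patterns combine: a refactoring coordinated across several repositories, and a security-driven dependency update campaign. Both use randomly generated data in place of real scans.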

In [None]:
# Real-World Use Case: Multi-Repository Refactoring
class MultiRepoRefactoring:
    """Coordinate refactoring across multiple repositories"""
    
    def __init__(self, repos: List[str]):
        self.repos = repos
        self.changes = {}
        self.validation_results = {}
    
    async def plan_refactoring(self, old_pattern: str, new_pattern: str):
        """Plan refactoring changes"""
        print(f"Planning refactoring: '{old_pattern}' → '{new_pattern}'")
        print("=" * 60)
        
        for repo in self.repos:
            # Simulate finding occurrences
            occurrences = np.random.randint(0, 20)
            files_affected = np.random.randint(0, 10)
            
            self.changes[repo] = {
                'occurrences': occurrences,
                'files': files_affected,
                'estimated_time': occurrences * 0.5  # minutes
            }
            
            print(f"  {repo}: {occurrences} occurrences in {files_affected} files")
        
        total_occurrences = sum(c['occurrences'] for c in self.changes.values())
        total_time = sum(c['estimated_time'] for c in self.changes.values())
        
        print(f"\nTotal: {total_occurrences} changes, estimated {total_time:.1f} minutes")
        return self.changes
    
    async def validate_changes(self):
        """Validate proposed changes"""
        print("\nValidating changes...")
        
        for repo in self.repos:
            # Simulate validation
            tests_pass = np.random.random() > 0.2  # 80% pass rate
            build_success = np.random.random() > 0.1  # 90% success rate
            
            self.validation_results[repo] = {
                'tests': 'pass' if tests_pass else 'fail',
                'build': 'success' if build_success else 'failed',
                'safe': tests_pass and build_success
            }
        
        safe_repos = [r for r, v in self.validation_results.items() if v['safe']]
        print(f"  Safe to refactor: {len(safe_repos)}/{len(self.repos)} repositories")
        
        return self.validation_results
    
    async def execute_refactoring(self):
        """Execute refactoring on validated repositories (rollback is only simulated)"""
        print("\nExecuting refactoring...")
        
        completed = []
        failed = []
        
        for repo in self.repos:
            if self.validation_results.get(repo, {}).get('safe', False):
                # Simulate refactoring
                success = np.random.random() > 0.05  # 95% success rate
                
                if success:
                    completed.append(repo)
                    print(f"  ✓ {repo}: Refactoring complete")
                else:
                    failed.append(repo)
                    print(f"  ✗ {repo}: Refactoring failed, rolling back")
            else:
                print(f"  ⚠ {repo}: Skipped (validation failed)")
        
        return {'completed': completed, 'failed': failed}

# Real-World Use Case: Dependency Update Campaign
class DependencyUpdateCampaign:
    """Manage dependency updates across portfolio"""
    
    def __init__(self):
        self.vulnerabilities = {}
        self.updates = {}
        self.priorities = {}
    
    async def scan_vulnerabilities(self, repos: List[str]):
        """Scan for security vulnerabilities"""
        print("Scanning for vulnerabilities...")
        
        critical_packages = ['log4j', 'openssl', 'requests']
        
        for repo in repos:
            vulns = []
            for pkg in critical_packages:
                if np.random.random() > 0.7:  # 30% chance of vulnerability
                    vulns.append({
                        'package': pkg,
                        'severity': np.random.choice(['low', 'medium', 'high', 'critical']),
                        'cve': f"CVE-2024-{np.random.randint(1000, 9999)}"
                    })
            
            self.vulnerabilities[repo] = vulns
        
        total_vulns = sum(len(v) for v in self.vulnerabilities.values())
        critical = sum(1 for r in self.vulnerabilities.values() 
                      for v in r if v['severity'] == 'critical')
        
        print(f"  Found {total_vulns} vulnerabilities ({critical} critical)")
        return self.vulnerabilities
    
    def prioritize_updates(self):
        """Prioritize update order"""
        for repo, vulns in self.vulnerabilities.items():
            # Calculate priority score
            score = 0
            for vuln in vulns:
                if vuln['severity'] == 'critical':
                    score += 100
                elif vuln['severity'] == 'high':
                    score += 50
                elif vuln['severity'] == 'medium':
                    score += 20
                else:
                    score += 5
            
            self.priorities[repo] = score
        
        # Sort by priority
        sorted_repos = sorted(self.priorities.items(), key=lambda x: x[1], reverse=True)
        return sorted_repos

# Test real-world scenarios
print("Real-World Integration Scenarios")
print("=" * 80)

# Scenario 1: Multi-repo refactoring
print("\n📝 Scenario 1: Multi-Repository Refactoring")
print("-" * 60)

repos = [f"service-{chr(65+i)}" for i in range(5)]
refactoring = MultiRepoRefactoring(repos)

async def run_refactoring():
    await refactoring.plan_refactoring("OldAPIClient", "NewAPIClient")
    await refactoring.validate_changes()
    result = await refactoring.execute_refactoring()
    print(f"\nRefactoring complete: {len(result['completed'])} succeeded, {len(result['failed'])} failed")

loop = asyncio.get_event_loop()
loop.run_until_complete(run_refactoring())

# Scenario 2: Dependency updates
print("\n🔒 Scenario 2: Security Dependency Update Campaign")
print("-" * 60)

campaign = DependencyUpdateCampaign()

async def run_campaign():
    await campaign.scan_vulnerabilities(repos)
    priorities = campaign.prioritize_updates()
    
    print("\nUpdate Priority Order:")
    for rank, (repo, score) in enumerate(priorities[:3], 1):
        print(f"  {rank}. {repo} (priority score: {score})")

loop.run_until_complete(run_campaign())
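
The scoring in `prioritize_updates` is a weighted sum over severities. Here is the same arithmetic as a table-driven sketch with fixed input, so the result is reproducible. The weights are the ones used above; `SEVERITY_WEIGHTS`, `priority_score`, and the sample data are illustrative names, not ghops APIs.

```python
from typing import Dict, List

# Weights copied from prioritize_updates; any other severity scores 5
SEVERITY_WEIGHTS: Dict[str, int] = {"critical": 100, "high": 50, "medium": 20}
DEFAULT_WEIGHT = 5


def priority_score(vulns: List[dict]) -> int:
    """Sum the severity weights of a repository's vulnerabilities."""
    return sum(SEVERITY_WEIGHTS.get(v["severity"], DEFAULT_WEIGHT) for v in vulns)


sample = {  # hypothetical scan results
    "service-A": [{"severity": "critical"}, {"severity": "medium"}],
    "service-B": [{"severity": "high"}, {"severity": "low"}],
    "service-C": [],
}

ranked = sorted(((repo, priority_score(v)) for repo, v in sample.items()),
                key=lambda item: item[1], reverse=True)
for rank, (repo, score) in enumerate(ranked, 1):
    print(f"{rank}. {repo} (priority score: {score})")
```

With these inputs `service-A` scores 100 + 20 = 120 and `service-B` scores 50 + 5 = 55, so a single critical finding outranks any pair of lower-severity ones.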

## 9. Performance Optimization {#performance}

Compare sequential, thread-pool, and batched processing on a simulated workload, and measure the effect of caching repeated calls.

In [None]:
# Performance optimization strategies
import time
import concurrent.futures
from functools import lru_cache, wraps

def measure_time(func):
    """Decorator that prints the elapsed time and returns (result, duration)"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        print(f"  {func.__name__}: {duration:.3f}s")
        return result, duration
    return wrapper

class PerformanceOptimizer:
    """Optimize ghops integration performance"""
    
    # measure_time lives at module level: calling a staticmethod object as a
    # decorator inside the class body fails before Python 3.10
    
    @measure_time
    def sequential_processing(self, items: List[Any], processor):
        """Process items sequentially"""
        results = []
        for item in items:
            results.append(processor(item))
        return results
    
    @measure_time
    def parallel_processing(self, items: List[Any], processor, max_workers=4):
        """Process items in parallel"""
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(processor, items))
        return results
    
    @measure_time
    def batch_processing(self, items: List[Any], processor, batch_size=10):
        """Process items in batches"""
        results = []
        for i in range(0, len(items), batch_size):
            batch = items[i:i+batch_size]
            results.extend(processor(batch))
        return results
    
    @lru_cache(maxsize=128)
    def cached_expensive_operation(self, key: str):
        """Cache expensive operations"""
        time.sleep(0.1)  # Simulate expensive operation
        return f"Result for {key}"

# Test performance optimizations
print("Performance Optimization Comparison")
print("=" * 60)

optimizer = PerformanceOptimizer()

# Simulate processing function
def process_item(item):
    time.sleep(0.01)  # Simulate work
    return item * 2

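def process_batch(batch):
    # Simulated cost grows linearly with batch size, so expect roughly 1x versus
    # sequential here; real batching gains come from amortizing per-call overhead
    time.sleep(0.01 * len(batch))
    return [item * 2 for item in batch]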

# Test data
test_items = list(range(100))

print("\nProcessing 100 items:")
print("-" * 40)

# Sequential
seq_results, seq_time = optimizer.sequential_processing(test_items, process_item)

# Parallel
par_results, par_time = optimizer.parallel_processing(test_items, process_item, max_workers=10)

# Batch
batch_results, batch_time = optimizer.batch_processing(test_items, process_batch, batch_size=20)

# Calculate speedup
print("\nSpeedup Analysis:")
print(f"  Parallel vs Sequential: {seq_time/par_time:.2f}x faster")
print(f"  Batch vs Sequential: {seq_time/batch_time:.2f}x faster")

# Cache effectiveness
print("\nCache Effectiveness:")
cache_keys = ['key1', 'key2', 'key1', 'key3', 'key2', 'key1']  # Some repeated

start = time.time()
for key in cache_keys:
    optimizer.cached_expensive_operation(key)
cached_time = time.time() - start

print(f"  Processing {len(cache_keys)} items (with cache): {cached_time:.3f}s")
print(f"  Cache info: {optimizer.cached_expensive_operation.cache_info()}")

# Optimization recommendations
print("\n💡 Optimization Recommendations:")
print("=" * 60)
recommendations = [
    "Use parallel processing for I/O-bound operations",
    "Implement caching for expensive API calls",
    "Batch database operations to reduce overhead",
    "Use connection pooling for network requests",
    "Implement circuit breakers for external services",
    "Use async/await for concurrent operations",
    "Profile code to identify bottlenecks"
]

for i, rec in enumerate(recommendations, 1):
    print(f"{i}. {rec}")
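
One of the recommendations above is a circuit breaker for external services. A minimal sketch, assuming a consecutive-failure threshold and a fixed cooldown: after `max_failures` failures in a row, calls are rejected without touching the service until `reset_after` seconds have passed. `CircuitBreaker`, `CircuitOpenError`, and their parameters are hypothetical names; production libraries usually add more states and per-exception rules.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, max_failures: int = 3, reset_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.clock = clock                # injectable so tests need not sleep
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after:
                raise CircuitOpenError("circuit open; call skipped")
            self.opened_at = None         # cooldown elapsed: allow a trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = self.clock()
            raise
        self.failures = 0                 # any success resets the count
        return result


now = [0.0]                               # fake clock for the demonstration
breaker = CircuitBreaker(max_failures=2, reset_after=10.0, clock=lambda: now[0])


def flaky():
    raise ConnectionError("service down")


for _ in range(2):                        # two failures open the circuit
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass

rejected = False
try:
    breaker.call(flaky)                   # rejected without calling flaky()
except CircuitOpenError:
    rejected = True

now[0] = 11.0                             # cooldown has passed
recovered = breaker.call(lambda: "ok")
print(rejected, recovered)
```

Injecting the clock keeps the example deterministic; in real use the default `time.monotonic` is a reasonable choice because it is unaffected by system clock changes.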

## 10. Exercises {#exercises}

Practice advanced integration techniques.

### Exercise 1: Custom Integration
Create a custom integration between ghops and a third-party service.

In [None]:
# TODO: Create a custom integration
# Requirements:
# 1. Define a new platform or service
# 2. Implement authentication
# 3. Create API client with rate limiting
# 4. Build custom actions
# 5. Handle errors gracefully

class MyCustomIntegration:
    """Your custom integration here"""
    pass

# Test your integration
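
Requirement 3 asks for rate limiting. One common approach is a token bucket. The sketch below is a hypothetical starting point, not a ghops API: `TokenBucket`, `try_acquire`, and the parameter names are assumptions, and a real client would wait or retry rather than only report `False`.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow about `rate` calls per second, with bursts of at most `capacity`."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock                # injectable so tests need not sleep
        self.tokens = float(capacity)
        self.updated = clock()

    def try_acquire(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


now = [0.0]                               # fake clock for the demonstration
bucket = TokenBucket(rate=2.0, capacity=2, clock=lambda: now[0])

burst = [bucket.try_acquire() for _ in range(3)]   # third call exceeds the burst
now[0] = 0.5                                       # half a second refills one token
after_wait = bucket.try_acquire()
print(burst, after_wait)
```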

### Exercise 2: Event-Driven Workflow
Design an event-driven workflow that responds to repository events.

In [None]:
# TODO: Create an event-driven workflow
# Requirements:
# 1. Define custom events
# 2. Create event handlers
# 3. Implement event filtering
# 4. Add error recovery
# 5. Create event analytics

# Your implementation here

### Exercise 3: Performance Tuning
Optimize a slow integration workflow.

In [None]:
# TODO: Optimize this slow workflow
# Current workflow processes repositories one by one
# Goal: Achieve 5x speedup

def slow_workflow(repos: List[str]):
    """Slow workflow that needs optimization"""
    results = []
    for repo in repos:
        # Slow operations
        status = check_status(repo)  # 1s
        metrics = collect_metrics(repo)  # 2s
        analysis = analyze_code(repo)  # 3s
        results.append({'status': status, 'metrics': metrics, 'analysis': analysis})
    return results

# Your optimized version
def optimized_workflow(repos: List[str]):
    """Your optimized implementation"""
    pass
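
`slow_workflow` calls `check_status`, `collect_metrics`, and `analyze_code`, which this notebook does not define. The stand-in stubs below are a sketch that lets you time a baseline. They assume the durations given in the comments, scaled down by a hypothetical `SCALE` factor, and their return values are placeholders.

```python
import time
from typing import Dict, List

SCALE = 0.001  # hypothetical: 1 "second" in the exercise becomes 1 ms here


def check_status(repo: str) -> str:
    time.sleep(1 * SCALE)
    return "ok"


def collect_metrics(repo: str) -> Dict[str, int]:
    time.sleep(2 * SCALE)
    return {"name_length": len(repo)}


def analyze_code(repo: str) -> str:
    time.sleep(3 * SCALE)
    return f"analysis of {repo}"


def baseline(repos: List[str]) -> List[dict]:
    """Same loop as slow_workflow, repeated here so the block runs on its own."""
    return [{"status": check_status(r),
             "metrics": collect_metrics(r),
             "analysis": analyze_code(r)} for r in repos]


start = time.perf_counter()
results = baseline(["service-A", "service-B"])
print(f"baseline: {time.perf_counter() - start:.3f}s for {len(results)} repos")
```

At the original durations each repository costs 1 + 2 + 3 = 6 s when run sequentially. Overlapping the three calls within one repository is bounded by the slowest call (3 s, a 2x gain), so reaching 5x also requires parallelism across repositories.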

## Cleanup

In [None]:
# Clean up workspace
import shutil
if 'workspace' in locals() and os.path.exists(workspace):
    shutil.rmtree(workspace)
    print(f"Cleaned up workspace: {workspace}")
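
For short-lived scratch space, an alternative sketch: `tempfile.TemporaryDirectory` removes the directory when the `with` block exits, even if an error occurs, so no separate cleanup cell is needed. It does not suit the notebook-wide `workspace` above, which must outlive a single cell. The `scratch` and `marker` names are illustrative.

```python
import os
import tempfile

with tempfile.TemporaryDirectory(prefix="ghops_advanced_") as scratch:
    marker = os.path.join(scratch, "marker.txt")
    with open(marker, "w") as fh:
        fh.write("temporary data")
    existed_inside = os.path.exists(marker)

# The directory and its contents are gone once the block exits
removed_after = not os.path.exists(scratch)
print(existed_inside, removed_after)
```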

## Summary

This notebook covered:
- Combining clustering with workflow automation
- Building custom actions and plugins
- Integrating with CI/CD systems
- Creating event-driven automation
- Cross-platform orchestration
- API integration patterns
- Performance optimization techniques
- Real-world integration scenarios

## Key Takeaways

1. **Intelligent Automation**: Use clustering to drive smart workflow decisions
2. **Extensibility**: Custom actions and plugins extend ghops capabilities
3. **CI/CD Integration**: Seamlessly integrate with existing DevOps pipelines
4. **Event-Driven**: React to changes in real-time with event-driven patterns
5. **Cross-Platform**: Orchestrate operations across multiple services
6. **Performance**: Optimization techniques can provide significant speedups
7. **Real-World**: Apply patterns to solve actual development challenges

## Next Steps

- **Notebook 5**: Data Analysis and Visualization - Deep dive into repository analytics
- Build your own custom integrations
- Contribute to the ghops ecosystem
- Share your workflows with the community