# Lesson 6: CI/CD and Pipeline Automation

## Learning Objectives
- Build automated ML pipelines
- Implement data and code versioning
- Automate model testing
- Integrate MLflow with CI/CD
- Implement automated validation

## Hands-on Exercise
Build a complete ML pipeline with automation and tests.

## 1. Environment Setup

In [None]:
import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.pipeline import Pipeline
import joblib
import json
import hashlib
from datetime import datetime
import os
import warnings
warnings.filterwarnings('ignore')

## 2. Data Pipeline

### Task 1: Create a data ingestion and validation pipeline

In [None]:
class DataPipeline:
    """Data processing pipeline"""
    
    def __init__(self, test_size=0.2, random_state=42):
        self.test_size = test_size
        self.random_state = random_state
        self.data_hash = None
        self.validation_results = {}
    
    def load_data(self):
        """Load the data"""
        print("[1/5] Loading data...")
        data = load_diabetes()
        X = pd.DataFrame(data.data, columns=data.feature_names)
        y = data.target
        return X, y
    
    def validate_data(self, X, y):
        """Validate data quality"""
        print("[2/5] Validating data...")
        validations = {}
        
        # Check for null values
        null_count = X.isnull().sum().sum()
        validations['no_nulls'] = bool(null_count == 0)
        
        # Check dimensions
        validations['correct_shape'] = len(X) == len(y)
        validations['min_samples'] = len(X) >= 100
        
        # Check data types
        validations['numeric_features'] = all(X.dtypes.apply(lambda x: np.issubdtype(x, np.number)))
        
        # Check for extreme outliers (|z-score| > 5)
        z_scores = np.abs((X - X.mean()) / X.std())
        extreme_outliers = (z_scores > 5).sum().sum()
        validations['no_extreme_outliers'] = bool(extreme_outliers == 0)
        
        self.validation_results = validations
        all_passed = all(validations.values())
        
        print(f"  Validations: {sum(validations.values())}/{len(validations)} passed")
        return all_passed, validations
    
    def compute_data_hash(self, X, y):
        """Compute a hash of the data for versioning"""
        print("[3/5] Computing data hash...")
        # Hash the raw bytes directly instead of their str() representation
        data_bytes = X.values.tobytes() + np.asarray(y).tobytes()
        self.data_hash = hashlib.md5(data_bytes).hexdigest()[:8]
        print(f"  Data hash: {self.data_hash}")
        return self.data_hash
    
    def split_data(self, X, y):
        """Split the data into training and test sets"""
        print("[4/5] Splitting data...")
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=self.test_size, random_state=self.random_state
        )
        print(f"  Train: {X_train.shape}, Test: {X_test.shape}")
        return X_train, X_test, y_train, y_test
    
    def execute(self):
        """Run the complete pipeline"""
        print("\n=== RUNNING DATA PIPELINE ===")
        
        # Load
        X, y = self.load_data()
        
        # Validate
        valid, validations = self.validate_data(X, y)
        if not valid:
            raise ValueError(f"Data validation failed: {validations}")
        
        # Hash
        data_hash = self.compute_data_hash(X, y)
        
        # Split
        X_train, X_test, y_train, y_test = self.split_data(X, y)
        
        print("[5/5] Data pipeline finished!\n")
        
        return {
            'X_train': X_train,
            'X_test': X_test,
            'y_train': y_train,
            'y_test': y_test,
            'data_hash': data_hash,
            'validations': validations
        }

# Run the pipeline
data_pipeline = DataPipeline()
data = data_pipeline.execute()
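
The idea behind `compute_data_hash` is that identical data always yields the same fingerprint, while any change to the data yields a different one. The sketch below shows that property using only the standard library. The `fingerprint` helper, the toy records, and the choice of SHA-256 over canonical JSON are assumptions made for illustration; they are not what the `DataPipeline` class does.

```python
import hashlib
import json


def fingerprint(rows, length=8):
    """Return a short, deterministic fingerprint for a list of records."""
    # Canonical JSON (sorted keys, fixed separators) so that equal data
    # always serializes to the same bytes.
    payload = json.dumps(rows, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:length]


# Hypothetical toy records, not the diabetes dataset.
rows_v1 = [{"age": 0.038, "bmi": 0.061}, {"age": -0.001, "bmi": -0.051}]
rows_v2 = [{"age": 0.038, "bmi": 0.061}, {"age": -0.001, "bmi": -0.050}]

# A copy with the same content produces the same fingerprint.
print(fingerprint(rows_v1) == fingerprint(list(rows_v1)))
# Changing a single value produces a different fingerprint.
print(fingerprint(rows_v1) == fingerprint(rows_v2))
```

Truncating the digest keeps run names readable, at the cost of a higher (still small) collision probability.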

## 3. Training Pipeline

### Task 2: Create an automated training pipeline

In [None]:
class TrainingPipeline:
    """Model training pipeline"""
    
    def __init__(self, experiment_name="cicd_pipeline"):
        self.experiment_name = experiment_name
        mlflow.set_experiment(experiment_name)
    
    def create_model(self, params):
        """Create the model as a scikit-learn Pipeline"""
        print("[1/4] Creating model...")
        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('regressor', RandomForestRegressor(**params))
        ])
        return pipeline
    
    def train_model(self, pipeline, X_train, y_train):
        """Train the model"""
        print("[2/4] Training model...")
        pipeline.fit(X_train, y_train)
        print("  Training finished")
        return pipeline
    
    def evaluate_model(self, pipeline, X_train, y_train, X_test, y_test):
        """Evaluate the model"""
        print("[3/4] Evaluating model...")
        
        # Predictions
        y_train_pred = pipeline.predict(X_train)
        y_test_pred = pipeline.predict(X_test)
        
        # Metrics
        metrics = {
            'train_rmse': np.sqrt(mean_squared_error(y_train, y_train_pred)),
            'test_rmse': np.sqrt(mean_squared_error(y_test, y_test_pred)),
            'train_r2': r2_score(y_train, y_train_pred),
            'test_r2': r2_score(y_test, y_test_pred),
            'train_mae': mean_absolute_error(y_train, y_train_pred),
            'test_mae': mean_absolute_error(y_test, y_test_pred)
        }
        
        # Cross-validation: convert each fold's MSE to RMSE before aggregating
        cv_scores = cross_val_score(
            pipeline, X_train, y_train, 
            cv=5, scoring='neg_mean_squared_error'
        )
        cv_rmse = np.sqrt(-cv_scores)
        metrics['cv_rmse_mean'] = cv_rmse.mean()
        metrics['cv_rmse_std'] = cv_rmse.std()
        
        print(f"  Test RMSE: {metrics['test_rmse']:.2f}")
        print(f"  Test R²: {metrics['test_r2']:.4f}")
        
        return metrics
    
    def execute(self, X_train, X_test, y_train, y_test, params, data_hash):
        """Run the training pipeline"""
        print("\n=== RUNNING TRAINING PIPELINE ===")
        
        with mlflow.start_run(run_name=f"automated_training_{data_hash}") as run:
            # Create and train
            pipeline = self.create_model(params)
            pipeline = self.train_model(pipeline, X_train, y_train)
            
            # Evaluate
            metrics = self.evaluate_model(pipeline, X_train, y_train, X_test, y_test)
            
            # Log to MLflow
            print("[4/4] Logging to MLflow...")
            mlflow.log_params(params)
            mlflow.log_param('data_hash', data_hash)
            mlflow.log_metrics(metrics)
            mlflow.sklearn.log_model(pipeline, "model")
            
            print("Training pipeline finished!\n")
            
            return {
                'pipeline': pipeline,
                'metrics': metrics,
                'run_id': run.info.run_id
            }

# Run the training pipeline
training_pipeline = TrainingPipeline()
params = {
    'n_estimators': 100,
    'max_depth': 10,
    'random_state': 42
}
training_result = training_pipeline.execute(
    data['X_train'], data['X_test'], 
    data['y_train'], data['y_test'],
    params, data['data_hash']
)
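
A note on aggregating cross-validation scores: with `scoring='neg_mean_squared_error'` each fold yields an MSE, and the spread of the per-fold RMSE is not the same number as the square root of the spread of the per-fold MSE. The worked arithmetic below uses hypothetical fold values (they are not results from this notebook) and only the standard library.

```python
import math
import statistics

# Hypothetical per-fold MSE values (what negating the sklearn scores yields).
fold_mse = [2500.0, 3600.0, 4900.0]

# Convert each fold to RMSE first, then aggregate.
fold_rmse = [math.sqrt(m) for m in fold_mse]
rmse_mean = statistics.fmean(fold_rmse)
rmse_std = statistics.pstdev(fold_rmse)  # population std, like NumPy's default ddof=0

# Taking the square root of the MSE spread gives a different number
# that is not the standard deviation of the RMSE.
sqrt_of_mse_std = math.sqrt(statistics.pstdev(fold_mse))

print(f"RMSE per fold: {fold_rmse}")
print(f"mean={rmse_mean:.2f} std={rmse_std:.2f} sqrt(std(MSE))={sqrt_of_mse_std:.2f}")
```

This matters for any stability check that compares the RMSE standard deviation against a fraction of the RMSE mean, because both quantities must be on the same scale.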

## 4. Test Pipeline

### Task 3: Implement automated tests

In [None]:
class ModelTestPipeline:
    """Model test pipeline"""
    
    def __init__(self):
        self.test_results = {}
    
    def test_model_performance(self, metrics, thresholds):
        """Test model performance"""
        tests = {}
        tests['test_r2_above_threshold'] = metrics['test_r2'] >= thresholds['min_r2']
        tests['test_rmse_below_threshold'] = metrics['test_rmse'] <= thresholds['max_rmse']
        tests['no_overfitting'] = abs(metrics['train_r2'] - metrics['test_r2']) <= thresholds['max_overfit_gap']
        return tests
    
    def test_model_stability(self, metrics):
        """Test model stability"""
        tests = {}
        tests['cv_low_variance'] = metrics['cv_rmse_std'] < metrics['cv_rmse_mean'] * 0.2
        return tests
    
    def test_model_predictions(self, model, X_test, y_test):
        """Test model predictions"""
        tests = {}
        predictions = model.predict(X_test)
        
        # Check that the predictions are valid
        tests['predictions_not_null'] = not np.any(np.isnan(predictions))
        tests['predictions_not_inf'] = not np.any(np.isinf(predictions))
        
        # Check the range of the predictions
        y_range = y_test.max() - y_test.min()
        pred_range = predictions.max() - predictions.min()
        tests['predictions_reasonable_range'] = abs(pred_range - y_range) / y_range < 0.5
        
        return tests
    
    def test_model_interface(self, model, X_test):
        """Test the model interface"""
        tests = {}
        
        # Test single-sample predict
        try:
            _ = model.predict(X_test.iloc[:1])
            tests['predict_single_sample'] = True
        except Exception:
            tests['predict_single_sample'] = False
        
        # Test batch predict
        try:
            _ = model.predict(X_test)
            tests['predict_batch'] = True
        except Exception:
            tests['predict_batch'] = False
        
        return tests
    
    def execute(self, model, metrics, X_test, y_test):
        """Run all tests"""
        print("\n=== RUNNING MODEL TEST PIPELINE ===")
        
        thresholds = {
            'min_r2': 0.3,
            'max_rmse': 100,
            'max_overfit_gap': 0.15
        }
        
        all_tests = {}
        
        # Performance tests
        print("[1/4] Testing performance...")
        perf_tests = self.test_model_performance(metrics, thresholds)
        all_tests.update({f'perf_{k}': v for k, v in perf_tests.items()})
        
        # Stability tests
        print("[2/4] Testing stability...")
        stab_tests = self.test_model_stability(metrics)
        all_tests.update({f'stab_{k}': v for k, v in stab_tests.items()})
        
        # Prediction tests
        print("[3/4] Testing predictions...")
        pred_tests = self.test_model_predictions(model, X_test, y_test)
        all_tests.update({f'pred_{k}': v for k, v in pred_tests.items()})
        
        # Interface tests
        print("[4/4] Testing interface...")
        intf_tests = self.test_model_interface(model, X_test)
        all_tests.update({f'intf_{k}': v for k, v in intf_tests.items()})
        
        self.test_results = all_tests
        
        # Summary
        total_tests = len(all_tests)
        # bool() keeps the count a plain int (NumPy bools would sum to np.int64,
        # which json.dump cannot serialize)
        passed_tests = sum(bool(v) for v in all_tests.values())
        
        print(f"\nResult: {passed_tests}/{total_tests} tests passed")
        
        # Show the tests that failed
        failed = {k: v for k, v in all_tests.items() if not v}
        if failed:
            print("\n⚠️  Failed tests:")
            for test_name in failed.keys():
                print(f"  - {test_name}")
        else:
            print("\n✅ All tests passed!")
        
        return {
            'all_passed': all(all_tests.values()),
            'passed_count': passed_tests,
            'total_count': total_tests,
            'results': all_tests
        }

# Run the tests
test_pipeline = ModelTestPipeline()
test_results = test_pipeline.execute(
    training_result['pipeline'],
    training_result['metrics'],
    data['X_test'],
    data['y_test']
)

## 5. Deployment Pipeline

### Task 4: Automate the deployment

In [None]:
class DeploymentPipeline:
    """Automated deployment pipeline"""
    
    def __init__(self, model_dir="/tmp/models"):
        self.model_dir = model_dir
        os.makedirs(model_dir, exist_ok=True)
    
    def validate_for_deployment(self, test_results):
        """Check whether the model is ready for deployment"""
        print("[1/4] Validating for deployment...")
        
        if not test_results['all_passed']:
            print("  ❌ Model failed the tests")
            return False
        
        print("  ✅ Model passed all tests")
        return True
    
    def package_model(self, model, metadata):
        """Package the model for deployment"""
        print("[2/4] Packaging model...")
        
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"model_{metadata['data_hash']}_{timestamp}"
        
        # Save the model
        model_path = os.path.join(self.model_dir, f"{model_name}.pkl")
        joblib.dump(model, model_path)
        
        # Save the metadata
        metadata_path = os.path.join(self.model_dir, f"{model_name}_metadata.json")
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)
        
        print(f"  Model saved: {model_name}")
        
        return {
            'model_path': model_path,
            'metadata_path': metadata_path,
            'model_name': model_name
        }
    
    def create_deployment_manifest(self, package_info, test_results, metrics):
        """Create the deployment manifest"""
        print("[3/4] Creating deployment manifest...")
        
        # Cast to built-in types so json.dump never receives NumPy scalars
        manifest = {
            'deployment_timestamp': datetime.now().isoformat(),
            'model_name': package_info['model_name'],
            'model_path': package_info['model_path'],
            'test_results': {
                'all_passed': bool(test_results['all_passed']),
                'passed_count': int(test_results['passed_count']),
                'total_count': int(test_results['total_count'])
            },
            'performance_metrics': {
                'test_r2': float(metrics['test_r2']),
                'test_rmse': float(metrics['test_rmse']),
                'test_mae': float(metrics['test_mae'])
            },
            'status': 'ready_for_production'
        }
        
        manifest_path = os.path.join(self.model_dir, f"{package_info['model_name']}_manifest.json")
        with open(manifest_path, 'w') as f:
            json.dump(manifest, f, indent=2)
        
        return manifest
    
    def execute(self, model, test_results, metrics, data_hash):
        """Run the deployment pipeline"""
        print("\n=== RUNNING DEPLOYMENT PIPELINE ===")
        
        # Validate
        if not self.validate_for_deployment(test_results):
            print("\n❌ Deployment aborted: model failed the tests")
            return None
        
        # Package
        metadata = {
            'data_hash': data_hash,
            'metrics': {k: float(v) for k, v in metrics.items()},
            'creation_date': datetime.now().isoformat()
        }
        package_info = self.package_model(model, metadata)
        
        # Create the manifest
        manifest = self.create_deployment_manifest(package_info, test_results, metrics)
        
        print("[4/4] Deployment finished!")
        print(f"\n✅ Model ready for production: {package_info['model_name']}\n")
        
        return {
            'package_info': package_info,
            'manifest': manifest
        }

# Run the deployment
deployment_pipeline = DeploymentPipeline()
deployment_result = deployment_pipeline.execute(
    training_result['pipeline'],
    test_results,
    training_result['metrics'],
    data['data_hash']
)
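
A packaged model file can be corrupted or replaced between packaging and serving. One common safeguard is to store a checksum of the artifact next to it and verify it before loading. The sketch below illustrates that idea with the standard library only; `sha256_of_file`, `write_manifest`, and `verify_artifact` are hypothetical helpers and are not part of `DeploymentPipeline`.

```python
import hashlib
import json
import os
import tempfile


def sha256_of_file(path, chunk_size=65536):
    """Hash a file in chunks so large artifacts do not need to fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def write_manifest(model_path, manifest_path):
    """Record the artifact name and checksum in a JSON manifest."""
    manifest = {
        "model_file": os.path.basename(model_path),
        "sha256": sha256_of_file(model_path),
    }
    with open(manifest_path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)
    return manifest


def verify_artifact(model_path, manifest_path):
    """Return True only if the file still matches the recorded checksum."""
    with open(manifest_path, encoding="utf-8") as f:
        manifest = json.load(f)
    return sha256_of_file(model_path) == manifest["sha256"]


with tempfile.TemporaryDirectory() as tmp:
    model_path = os.path.join(tmp, "model.pkl")
    manifest_path = os.path.join(tmp, "model_manifest.json")
    with open(model_path, "wb") as f:
        f.write(b"stand-in bytes for a serialized model")
    write_manifest(model_path, manifest_path)
    print(verify_artifact(model_path, manifest_path))  # file unchanged since packaging
    with open(model_path, "ab") as f:
        f.write(b"tampered")
    print(verify_artifact(model_path, manifest_path))  # file modified after packaging
```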

## 6. Complete Orchestrated Pipeline

### Task 5: Integrate all the pipelines

In [None]:
class MLPipelineOrchestrator:
    """Orchestrator for the complete ML pipeline"""
    
    def __init__(self, config):
        self.config = config
        self.results = {}
    
    def execute(self):
        """Run the complete pipeline"""
        print("\n" + "="*60)
        print("    STARTING COMPLETE ML PIPELINE")
        print("="*60)
        
        try:
            # 1. Data Pipeline
            data_pipeline = DataPipeline(
                test_size=self.config['test_size'],
                random_state=self.config['random_state']
            )
            data = data_pipeline.execute()
            self.results['data'] = data
            
            # 2. Training Pipeline
            training_pipeline = TrainingPipeline(
                experiment_name=self.config['experiment_name']
            )
            training_result = training_pipeline.execute(
                data['X_train'], data['X_test'],
                data['y_train'], data['y_test'],
                self.config['model_params'],
                data['data_hash']
            )
            self.results['training'] = training_result
            
            # 3. Test Pipeline
            test_pipeline = ModelTestPipeline()
            test_results = test_pipeline.execute(
                training_result['pipeline'],
                training_result['metrics'],
                data['X_test'],
                data['y_test']
            )
            self.results['tests'] = test_results
            
            # 4. Deployment Pipeline
            deployment_pipeline = DeploymentPipeline(
                model_dir=self.config['model_dir']
            )
            deployment_result = deployment_pipeline.execute(
                training_result['pipeline'],
                test_results,
                training_result['metrics'],
                data['data_hash']
            )
            self.results['deployment'] = deployment_result
            
            print("\n" + "="*60)
            print("    COMPLETE PIPELINE FINISHED SUCCESSFULLY")
            print("="*60)
            
            return self.results
            
        except Exception as e:
            print(f"\n❌ Pipeline error: {e}")
            raise

# Pipeline configuration
config = {
    'experiment_name': 'automated_cicd_pipeline',
    'test_size': 0.2,
    'random_state': 42,
    'model_params': {
        'n_estimators': 100,
        'max_depth': 10,
        'random_state': 42
    },
    'model_dir': '/tmp/deployed_models'
}

# Run the orchestrated pipeline
orchestrator = MLPipelineOrchestrator(config)
results = orchestrator.execute()
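
The orchestration pattern above (run stages in order, pass results forward, stop on the first failure) can be reduced to a small generic runner. The sketch below is one possible shape for it, assuming each step is a function that receives a shared context dict and returns new entries for it. `run_steps` and the stand-in steps are hypothetical and deliberately trivial.

```python
import time


def run_steps(steps, context=None):
    """Run (name, function) pairs in order, sharing results through a context dict."""
    context = dict(context or {})
    timings = {}
    for name, step in steps:
        start = time.perf_counter()
        try:
            # Each step returns a dict of new context entries (or None).
            context.update(step(context) or {})
        except Exception as exc:
            # Fail fast and say which step broke.
            raise RuntimeError(f"step '{name}' failed: {exc}") from exc
        timings[name] = time.perf_counter() - start
    return context, timings


# Hypothetical stand-in steps; a real pipeline would load data and fit a model.
def load(ctx):
    return {"rows": [1.0, 2.0, 3.0, 4.0]}


def train(ctx):
    return {"model_mean": sum(ctx["rows"]) / len(ctx["rows"])}


def evaluate(ctx):
    errors = [abs(r - ctx["model_mean"]) for r in ctx["rows"]]
    return {"mae": sum(errors) / len(errors)}


context, timings = run_steps([("load", load), ("train", train), ("evaluate", evaluate)])
print(context["mae"], list(timings))
```

Recording a duration per step is a cheap first answer to the question of where a slow pipeline spends its time.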

## 7. Simulating CI/CD with GitHub Actions

### Task 6: Create a CI/CD configuration file

In [None]:
# Create an example GitHub Actions workflow
github_actions_workflow = """
name: ML Pipeline CI/CD

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  schedule:
    - cron: '0 0 * * 0'  # Weekly

jobs:
  train-and-deploy:
    runs-on: ubuntu-latest
    
    steps:
    - uses: actions/checkout@v4
    
    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.11'
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
    
    - name: Run data pipeline
      run: |
        python scripts/data_pipeline.py
    
    - name: Train model
      run: |
        python scripts/train_model.py
    
    - name: Run tests
      run: |
        python -m pytest tests/
    
    - name: Deploy model
      if: success()
      run: |
        python scripts/deploy_model.py
      env:
        MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
"""

# Save the workflow
workflow_dir = "/tmp/github_workflows"
os.makedirs(workflow_dir, exist_ok=True)
with open(f"{workflow_dir}/ml_pipeline.yml", 'w') as f:
    f.write(github_actions_workflow)

print("GitHub Actions workflow created:")
print(github_actions_workflow)
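
In GitHub Actions, a `run:` step is marked as failed when its command exits with a non-zero status, and by default later steps are then skipped. That is the mechanism that keeps a failing model away from the deploy step. The sketch below shows a quality-gate entry point built around that convention. The `gate` and `main` functions, the command-line flags, and passing metrics as a JSON string are assumptions for illustration; the metric values are made up.

```python
import argparse
import json
import sys


def gate(metrics, min_r2, max_rmse):
    """Return a list of human-readable failures; an empty list means the gate passes."""
    failures = []
    if metrics["test_r2"] < min_r2:
        failures.append(f"test_r2 {metrics['test_r2']:.3f} < {min_r2}")
    if metrics["test_rmse"] > max_rmse:
        failures.append(f"test_rmse {metrics['test_rmse']:.2f} > {max_rmse}")
    return failures


def main(argv=None):
    parser = argparse.ArgumentParser(description="Quality gate for a trained model")
    parser.add_argument("--metrics", required=True, help="JSON with test_r2 and test_rmse")
    parser.add_argument("--min-r2", type=float, default=0.3)
    parser.add_argument("--max-rmse", type=float, default=100.0)
    args = parser.parse_args(argv)

    failures = gate(json.loads(args.metrics), args.min_r2, args.max_rmse)
    for failure in failures:
        print(f"FAILED: {failure}", file=sys.stderr)
    # 0 tells the CI runner the step succeeded; any other value fails it.
    return 1 if failures else 0


# Demonstration with made-up metrics. A real script would end with sys.exit(main()).
print(main(["--metrics", '{"test_r2": 0.44, "test_rmse": 54.3}']))
print(main(["--metrics", '{"test_r2": 0.12, "test_rmse": 54.3}']))
```

Returning the status from `main` instead of calling `sys.exit` inside it keeps the function easy to unit test.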

## 8. Execution Report

### Task 7: Generate a complete report

In [None]:
def generate_pipeline_report(results):
    """Generate a complete pipeline report"""
    
    report = f"""
╔══════════════════════════════════════════════════════════╗
║                PIPELINE EXECUTION REPORT                 ║
╚══════════════════════════════════════════════════════════╝

Date/Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1. DATA PIPELINE
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Data Hash:       {results['data']['data_hash']}
Train Samples:   {len(results['data']['X_train'])}
Test Samples:    {len(results['data']['X_test'])}
Features:        {results['data']['X_train'].shape[1]}

Validations:
"""
    
    for validation, passed in results['data']['validations'].items():
        status = "✅" if passed else "❌"
        report += f"  {status} {validation}\n"
    
    metrics = results['training']['metrics']
    report += f"""
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2. TRAINING PIPELINE
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Run ID:          {results['training']['run_id']}

Metrics:
  Train RMSE:    {metrics['train_rmse']:.2f}
  Test RMSE:     {metrics['test_rmse']:.2f}
  Train R²:      {metrics['train_r2']:.4f}
  Test R²:       {metrics['test_r2']:.4f}
  CV RMSE:       {metrics['cv_rmse_mean']:.2f} ± {metrics['cv_rmse_std']:.2f}

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
3. TEST PIPELINE
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Tests Run:         {results['tests']['total_count']}
Tests Passed:      {results['tests']['passed_count']}
Status:            {'✅ PASSED' if results['tests']['all_passed'] else '❌ FAILED'}
"""
    
    if results['deployment']:
        report += f"""
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
4. DEPLOYMENT PIPELINE
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Status:          ✅ DEPLOYED
Model Name:      {results['deployment']['package_info']['model_name']}
Model Path:      {results['deployment']['package_info']['model_path']}
Deployment Time: {results['deployment']['manifest']['deployment_timestamp']}
"""
    else:
        report += """
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
4. DEPLOYMENT PIPELINE
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Status:          ❌ NOT DEPLOYED (tests failed)
"""
    
    report += """
╔══════════════════════════════════════════════════════════╗
║                      END OF REPORT                       ║
╚══════════════════════════════════════════════════════════╝
"""
    
    return report

# Generate the report
report = generate_pipeline_report(results)
print(report)

# Save the report
with open('/tmp/pipeline_report.txt', 'w', encoding='utf-8') as f:
    f.write(report)

## 9. Additional Exercises

### Practice Challenges:

1. **DVC Integration**: Use DVC for data versioning:
   ```bash
   dvc init
   dvc add data/
   git add data.dvc .gitignore
   ```

2. **Pre-commit Hooks**: Configure hooks to validate code before each commit
3. **Docker**: Containerize the entire pipeline
4. **Kubernetes**: Deploy the pipeline to a K8s cluster
5. **Airflow**: Orchestrate the pipeline with Apache Airflow
6. **Model Versioning**: Implement semantic versioning for models
7. **Automatic Rollback**: Implement a rollback if the new model fails
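
As a starting point for challenges 6 and 7, the sketch below combines a semantic version bump with an in-memory registry that refuses worse candidates and can roll back to the previous version. Everything here is hypothetical: `bump`, `ModelRegistry`, the RMSE values, and the mapping of change types to version parts (for example, a retrain on new data as a minor bump) are illustrative assumptions rather than an established convention.

```python
def bump(version, part):
    """Bump a 'MAJOR.MINOR.PATCH' string; part is 'major', 'minor' or 'patch'."""
    major, minor, patch = (int(x) for x in version.split("."))
    if part == "major":
        return f"{major + 1}.0.0"
    if part == "minor":
        return f"{major}.{minor + 1}.0"
    if part == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown part: {part}")


class ModelRegistry:
    """In-memory registry that keeps the promotion history for rollback."""

    def __init__(self):
        self.history = []  # list of (version, test_rmse), newest last

    @property
    def current(self):
        return self.history[-1] if self.history else None

    def promote(self, version, test_rmse, tolerance=0.0):
        """Promote a candidate unless its RMSE is worse than the current model's."""
        if self.current is not None and test_rmse > self.current[1] + tolerance:
            return False
        self.history.append((version, test_rmse))
        return True

    def rollback(self):
        """Drop the current version and return the one that becomes active."""
        if len(self.history) < 2:
            raise RuntimeError("no previous version to roll back to")
        self.history.pop()
        return self.current


registry = ModelRegistry()
registry.promote("1.0.0", test_rmse=56.0)
registry.promote(bump("1.0.0", "minor"), test_rmse=54.5)             # better, promoted
rejected = registry.promote(bump("1.1.0", "patch"), test_rmse=58.0)  # worse, refused
print(registry.current, rejected)
print(registry.rollback())
```

A production version would persist the history (for example in a model registry service) instead of keeping it in memory.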

### Recommended Project Structure:

```
ml-project/
├── .github/
│   └── workflows/
│       └── ml_pipeline.yml
├── data/
│   ├── raw/
│   └── processed/
├── models/
├── notebooks/
├── scripts/
│   ├── data_pipeline.py
│   ├── train_model.py
│   ├── test_model.py
│   └── deploy_model.py
├── tests/
│   ├── test_data.py
│   ├── test_model.py
│   └── test_api.py
├── requirements.txt
├── setup.py
└── README.md
```

### Questions for Reflection:

1. How can you guarantee reproducibility in ML pipelines?
2. Which tests are essential before deployment?
3. How do you handle long-running pipelines?
4. When should you use orchestration rather than simple scripts?
5. How do you monitor pipelines in production?