# EDC-MCP-LLM Integration - Interactive Testing Notebook

**Autore:** Lorenzo - Principal Data Architect @ NTT Data Italia

Questo notebook permette di testare interattivamente le funzionalità del sistema EDC-MCP-LLM:
- Ricerca asset nel catalogo EDC
- Recupero dettagli asset con enhancement AI
- Costruzione alberi di lineage
- Analisi impatto modifiche
- Generazione checklist operative
- Arricchimento documentazione

---

## 1. Setup Ambiente

Installiamo le dipendenze necessarie e configuriamo l'ambiente.

In [None]:
# Installazione dipendenze
!pip install -q aiohttp anthropic pydantic pydantic-settings python-dotenv nest_asyncio

In [None]:
# Import necessari
import asyncio
import aiohttp
import base64
import json
import ssl
import csv
from io import StringIO
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from enum import Enum
import warnings

# Needed to run async code inside Colab/Jupyter
import nest_asyncio
nest_asyncio.apply()

# NOTE(review): the original comment described this as "disable SSL warnings",
# but the filter below silences every Python warning, not only SSL-related ones.
warnings.filterwarnings('ignore')

print("[OK] Import completati")

## 2. Configurazione

Inserisci le tue credenziali EDC e le configurazioni LLM.

**IMPORTANTE:** Non salvare il notebook con le credenziali reali!

In [None]:
# ============================================
# CONFIGURAZIONE - MODIFICA QUESTI VALORI
# ============================================

@dataclass
class EDCConfig:
    """Connection settings for the EDC REST API.

    The two properties derive the HTTP Basic credentials and the default
    request headers from the stored fields each time they are read.
    """
    base_url: str = "https://edc.collaudo.servizi.allitude.it:9086/access"
    username: str = "Administrator"
    password: str = "YOUR_PASSWORD_HERE"  # <-- put the real password here
    api_version: str = "2"
    request_timeout: int = 30

    @property
    def authorization_header(self) -> str:
        """Return the ``Basic <base64(username:password)>`` header value."""
        raw_credentials = f"{self.username}:{self.password}".encode()
        token = base64.b64encode(raw_credentials).decode()
        return "Basic " + token

    @property
    def headers(self) -> dict:
        """Return the default headers sent with every EDC request."""
        static_headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
            "User-Agent": "EDC-MCP-LLM Notebook",
        }
        # Authorization stays first so the key order matches the original dict.
        return {"Authorization": self.authorization_header, **static_headers}


class LLMProvider(str, Enum):
    """LLM back ends selectable in ``LLMConfig``; values are lowercase ids."""
    TINYLLAMA = "tinyllama"
    CLAUDE = "claude"
    GEMMA3 = "gemma3"
    MOCK = "mock"  # For testing without an LLM


@dataclass
class LLMConfig:
    """LLM configuration: provider choice, credentials/endpoint and generation limits."""
    provider: LLMProvider = LLMProvider.MOCK
    claude_api_key: str = ""  # <-- fill in when using Claude
    ollama_base_url: str = "http://localhost:11434"  # For a local TinyLlama/Gemma3
    model_name: str = "claude-sonnet-4-20250514"
    max_tokens: int = 4000
    temperature: float = 0.1


# Instantiate the configurations with their default values
edc_config = EDCConfig()
llm_config = LLMConfig()

print("[OK] Configurazione caricata")
print(f"    EDC URL: {edc_config.base_url}")
print(f"    LLM Provider: {llm_config.provider.value}")

In [None]:
# Opzionale: Carica credenziali da Google Colab Secrets
# Decommentare se usi Colab con secrets configurati

# from google.colab import userdata
# edc_config.password = userdata.get('EDC_PASSWORD')
# llm_config.claude_api_key = userdata.get('CLAUDE_API_KEY')
# print("[OK] Credenziali caricate da Colab Secrets")

## 3. Client EDC

Classe client per interagire con le API EDC.

In [None]:
class EDCClient:
    """
    Async client for the EDC catalog REST API (simplified notebook version).

    Holds one lazily created aiohttp session, an in-memory cache of asset
    details keyed by asset id, and request/cache-hit/error counters.
    """

    def __init__(self, config: "EDCConfig"):
        """Store the configuration; no network resource is created here."""
        self.config = config
        self.session: Optional[aiohttp.ClientSession] = None
        self._cache: Dict[str, Any] = {}
        self._stats = {
            'total_requests': 0,
            'cache_hits': 0,
            'api_errors': 0
        }

    async def _ensure_session(self):
        """Create the shared HTTP session if it is missing or closed."""
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
            # SECURITY NOTE(review): ssl=False turns off TLS certificate
            # verification for every request made through this session.
            # Kept as-is for the test environment; confirm before wider use.
            connector = aiohttp.TCPConnector(ssl=False, limit=10)
            self.session = aiohttp.ClientSession(
                headers=self.config.headers,
                timeout=timeout,
                connector=connector
            )

    @staticmethod
    def _normalize_links(raw_links: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
        """Reduce raw link entries to id/name/classType/association dicts.

        Entries without an ``id`` are skipped; a missing (or ``None``) list
        yields an empty result.
        """
        normalized = []
        for link in raw_links or []:
            link_id = link.get('id')
            if not link_id:
                continue
            normalized.append({
                'id': link_id,
                'name': link.get('name', link_id.split('/')[-1]),
                'classType': link.get('classType', 'Unknown'),
                'association': link.get('association', '')
            })
        return normalized

    async def bulk_search_assets(
        self,
        resource_name: str,
        name_filter: Optional[str] = None,
        asset_type: str = "com.infa.ldm.relational.Table"
    ) -> List[Dict]:
        """Search assets through the bulk API, which answers with CSV.

        Args:
            resource_name: EDC resource to search in.
            name_filter: optional case-insensitive substring matched against
                ``core.name`` (falling back to ``id`` when the name is empty).
            asset_type: class type sent as ``classTypes``.

        Returns:
            One dict per CSV row with ``id``, ``name`` and ``classType`` plus
            every raw CSV column.

        Raises:
            Exception: when the API answers with a status other than 200;
                transport and parsing errors propagate unchanged.
        """
        await self._ensure_session()

        bulk_url = f"{self.config.base_url}/1/catalog/data/bulk"

        params = {
            'resourceName': resource_name,
            'classTypes': asset_type,
            'facts': 'id,core.name,core.classType',
            'includeRefObjects': 'true'
        }

        self._stats['total_requests'] += 1

        try:
            async with self.session.get(bulk_url, params=params) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise Exception(f"API error {response.status}: {error_text[:200]}")

                csv_text = await response.text()
                items = list(csv.DictReader(StringIO(csv_text)))

                # Filter by name when requested. DictReader fills the missing
                # columns of short rows with None, so fall back to '' before
                # calling .upper().
                if name_filter:
                    needle = name_filter.upper()
                    items = [
                        item for item in items
                        if needle in (item.get('core.name') or item.get('id') or '').upper()
                    ]

                # Enrich results; the raw CSV columns are unpacked last, so a
                # raw column with the same key overrides the derived value.
                results = []
                for item in items:
                    asset_id = item.get('id') or ''
                    results.append({
                        'id': asset_id,
                        'name': item.get('core.name') or asset_id.split('/')[-1],
                        'classType': item.get('core.classType', 'Unknown'),
                        **item
                    })

                return results

        except Exception:
            self._stats['api_errors'] += 1
            raise

    async def get_asset_details(self, asset_id: str) -> Dict[str, Any]:
        """Fetch (or return from cache) the details of one asset.

        Args:
            asset_id: full EDC asset id.

        Returns:
            Dict with ``asset_id``, ``name``, ``classType``, ``description``,
            ``src_links``, ``dst_links``, ``facts`` and ``metadata`` (the raw
            item, or ``{}`` when the API returned no item).

        Raises:
            Whatever the HTTP call or JSON decoding raises; the error counter
            is incremented before re-raising.
        """
        # Serve cache hits before touching the network layer, so no HTTP
        # session is opened when no request is needed.
        if asset_id in self._cache:
            self._stats['cache_hits'] += 1
            return self._cache[asset_id]

        await self._ensure_session()

        self._stats['total_requests'] += 1

        objects_url = f"{self.config.base_url}/{self.config.api_version}/catalog/data/objects"

        params = [
            ('associations', 'core.DataSetDataFlow'),
            ('associations', 'core.DirectionalDataFlow'),
            ('includeDstLinks', 'true'),
            ('includeRefObjects', 'true'),
            ('includeSrcLinks', 'true'),
            ('offset', '0'),
            ('pageSize', '500'),
            ('q', f'id:{asset_id}')
        ]

        try:
            async with self.session.get(objects_url, params=params) as response:
                response.raise_for_status()
                data = await response.json()

                items = data.get('items', [])

                if not items:
                    # Nothing returned: build a placeholder named after the
                    # last path segment of the id.
                    result = {
                        'asset_id': asset_id,
                        'name': asset_id.split('/')[-1],
                        'classType': 'Unknown',
                        'description': '',
                        'src_links': [],
                        'dst_links': [],
                        'facts': [],
                        'metadata': {}
                    }
                else:
                    item = items[0]

                    name = (
                        item.get('name') or
                        item.get('core.name') or
                        asset_id.split('/')[-1]
                    )

                    # The description is the value of the first fact whose
                    # attribute id contains "description".
                    facts = item.get('facts') or []
                    description = next(
                        (
                            fact.get('value', '')
                            for fact in facts
                            if 'description' in (fact.get('attributeId') or '').lower()
                        ),
                        ""
                    )

                    result = {
                        'asset_id': asset_id,
                        'name': name,
                        'classType': item.get('classType', 'Unknown'),
                        'description': description,
                        'src_links': self._normalize_links(item.get('srcLinks')),
                        'dst_links': self._normalize_links(item.get('dstLinks')),
                        'facts': facts,
                        'metadata': item
                    }

                self._cache[asset_id] = result
                return result

        except Exception:
            self._stats['api_errors'] += 1
            raise

    def get_statistics(self) -> Dict[str, int]:
        """Return a copy of the request/cache/error counters."""
        return self._stats.copy()

    def clear_cache(self):
        """Drop every cached asset-details entry."""
        self._cache.clear()

    async def close(self):
        """Close the HTTP session if one is open."""
        if self.session and not self.session.closed:
            await self.session.close()


# Instantiate the client (the HTTP session is only opened on the first request)
edc_client = EDCClient(edc_config)
print("[OK] EDC Client creato")

## 4. Client LLM

Client per l'enhancement AI (supporta Claude, TinyLlama/Ollama e modalità Mock per testing).

In [None]:
class BaseLLMClient:
    """Base class for LLM clients; every method here raises NotImplementedError."""
    
    async def enhance_description(self, asset_name: str, technical_desc: str, **kwargs) -> str:
        """Return an enriched description for the asset (to be overridden)."""
        raise NotImplementedError
    
    async def analyze_change_impact(self, source_asset: str, change_type: str, **kwargs) -> Dict:
        """Return an impact-analysis dict for a change on ``source_asset`` (to be overridden)."""
        raise NotImplementedError
    
    async def generate_change_checklist(self, impact_analysis: Dict) -> Dict:
        """Return a checklist dict derived from an impact analysis (to be overridden)."""
        raise NotImplementedError
    
    async def enhance_documentation(self, asset_info: Dict, **kwargs) -> Dict:
        """Return enriched documentation fields for an asset (to be overridden)."""
        raise NotImplementedError


class MockLLMClient(BaseLLMClient):
    """Canned-answer LLM client for testing without any external API."""

    async def enhance_description(self, asset_name: str, technical_desc: str, **kwargs) -> str:
        """Return a fixed enriched description mentioning ``asset_name``."""
        prefix = f"[MOCK] Descrizione arricchita per {asset_name}: "
        return prefix + "Asset di tipo enterprise data utilizzato per operazioni business critiche."

    async def analyze_change_impact(self, source_asset: str, change_type: str, **kwargs) -> Dict:
        """Return a fixed MEDIUM-risk analysis mentioning the asset and change type."""
        business_note = (
            f'[MOCK] La modifica {change_type} su {source_asset} '
            'potrebbe impattare i processi downstream.'
        )
        analysis = {
            'risk_level': 'MEDIUM',
            'business_impact': business_note,
            'technical_impact': '[MOCK] Verificare integrita referenziale e dipendenze.',
        }
        analysis['recommendations'] = [
            'Eseguire backup prima della modifica',
            'Testare in ambiente non-produttivo',
            'Notificare i team downstream',
        ]
        analysis['testing_strategy'] = ['Unit test sui dati', 'Integration test']
        return analysis

    async def generate_change_checklist(self, impact_analysis: Dict) -> Dict:
        """Return a fixed checklist; ``impact_analysis`` is not inspected."""
        plain_tasks = {
            'governance_tasks': ['Sottometti change request', 'Ottieni approvazione'],
            'pre_change_tasks': ['Backup dati', 'Prepara rollback'],
            'execution_tasks': ['Implementa modifica', 'Verifica risultati'],
            'validation_tasks': ['Test qualita', 'Valida lineage'],
            'rollback_procedures': ['Restore da backup'],
            'stakeholder_notifications': ['Notifica team'],
            'monitoring_tasks': ['Monitora 24h'],
        }
        # Every task is tagged so mock output is recognisable at a glance.
        return {
            section: [f"[MOCK] {task}" for task in tasks]
            for section, tasks in plain_tasks.items()
        }

    async def enhance_documentation(self, asset_info: Dict, **kwargs) -> Dict:
        """Return fixed documentation fields, using the asset name when present."""
        asset_label = asset_info.get('name', 'Asset')
        documentation = dict(
            enhanced_description=f"[MOCK] {asset_label} e un componente critico del data layer.",
            business_purpose='[MOCK] Supporta operazioni business enterprise.',
            suggested_tags=['data-governance', 'enterprise', 'managed'],
            suggested_quality_rules=['Completezza dati', 'Validazione formati'],
            compliance_notes='[MOCK] Verificare requisiti GDPR.',
        )
        return documentation


class ClaudeLLMClient(BaseLLMClient):
    """Claude client backed by the Anthropic Python SDK.

    Only ``enhance_description``, ``analyze_change_impact`` and
    ``enhance_documentation`` call the model; ``generate_change_checklist``
    returns a fixed template.
    """

    # Prefix produced by _call_llm when the request fails.
    _ERROR_PREFIX = "Errore Claude:"

    def __init__(self, config: LLMConfig):
        """Build the synchronous Anthropic client.

        Raises:
            ImportError: when ``anthropic`` or ``httpx`` cannot be imported.
        """
        self.config = config
        try:
            import anthropic
            import httpx
        except ImportError as err:
            # Chain the original error so the missing module stays visible.
            raise ImportError("Installa anthropic: pip install anthropic") from err

        # SECURITY NOTE(review): verify=False disables TLS certificate
        # verification for calls that carry the API key. Kept unchanged here;
        # confirm it is really required before using outside a test setup.
        http_client = httpx.Client(
            timeout=httpx.Timeout(60.0),
            verify=False
        )
        self.client = anthropic.Anthropic(
            api_key=config.claude_api_key,
            http_client=http_client
        )

    async def _call_llm(self, prompt: str, system_msg: Optional[str] = None) -> str:
        """Send one user prompt and return the text of the first content block.

        The SDK client is synchronous, so the request runs in the default
        executor instead of blocking the event loop. Any failure is returned
        as an ``"Errore Claude: ..."`` string rather than raised.
        """
        def _blocking_request():
            return self.client.messages.create(
                model=self.config.model_name,
                max_tokens=self.config.max_tokens,
                temperature=self.config.temperature,
                system=system_msg or "Sei un esperto di data governance.",
                messages=[{"role": "user", "content": prompt}]
            )

        try:
            loop = asyncio.get_running_loop()
            message = await loop.run_in_executor(None, _blocking_request)
            return message.content[0].text
        except Exception as e:
            return f"Errore Claude: {str(e)}"

    @staticmethod
    def _extract_risk_level(text: str) -> str:
        """Return the first of CRITICAL/HIGH/LOW found as a whole word, else MEDIUM.

        A plain substring test would read "flow" or "allow" as LOW and
        "highlight" as HIGH, so neighbouring letters are excluded.
        """
        import re

        upper_text = text.upper()
        for label in ("CRITICAL", "HIGH", "LOW"):
            if re.search(rf"(?<![A-Z]){label}(?![A-Z])", upper_text):
                return label
        return "MEDIUM"

    async def enhance_description(self, asset_name: str, technical_desc: str, **kwargs) -> str:
        """Ask the model for a business-friendly description of the asset."""
        prompt = f"""Arricchisci questa descrizione tecnica con contesto business.

Asset: {asset_name}
Descrizione: {technical_desc or 'Non disponibile'}

Fornisci una descrizione business-friendly in italiano (max 4 frasi)."""
        return await self._call_llm(prompt)

    async def analyze_change_impact(self, source_asset: str, change_type: str, **kwargs) -> Dict:
        """Ask the model to assess a change and derive a risk level from its answer.

        Reads ``affected_lineage['downstream']`` and
        ``change_details['description']`` from ``kwargs`` when present.
        Recommendations and testing strategy are fixed lists.
        """
        downstream = kwargs.get('affected_lineage', {}).get('downstream', [])
        description = kwargs.get('change_details', {}).get('description', '')

        prompt = f"""Analizza l'impatto di questa modifica:

Asset: {source_asset}
Tipo: {change_type}
Descrizione: {description}
Asset downstream: {len(downstream)}

Fornisci: rischio (LOW/MEDIUM/HIGH/CRITICAL), impatto business, impatto tecnico, raccomandazioni."""

        response = await self._call_llm(prompt)

        # An error message is not a model assessment: keep the neutral default
        # instead of scanning it for risk keywords.
        if response.startswith(self._ERROR_PREFIX):
            risk = "MEDIUM"
        else:
            risk = self._extract_risk_level(response)

        return {
            'risk_level': risk,
            'business_impact': response[:400],
            'technical_impact': f"Potenziale impatto su {len(downstream)} asset downstream",
            'recommendations': ['Backup prima della modifica', 'Test in ambiente non-prod'],
            'testing_strategy': ['Test unitari', 'Validazione lineage']
        }

    async def generate_change_checklist(self, impact_analysis: Dict) -> Dict:
        """Return a fixed checklist template; ``impact_analysis`` is not inspected."""
        return {
            'governance_tasks': ['Sottometti change request', 'Ottieni approvazione'],
            'pre_change_tasks': ['Backup dati', 'Prepara rollback'],
            'execution_tasks': ['Implementa modifica', 'Verifica risultati'],
            'validation_tasks': ['Test qualita', 'Valida lineage'],
            'rollback_procedures': ['Restore da backup'],
            'stakeholder_notifications': ['Notifica team'],
            'monitoring_tasks': ['Monitora 24h']
        }

    async def enhance_documentation(self, asset_info: Dict, **kwargs) -> Dict:
        """Ask the model to enrich the documentation of an asset.

        Only ``enhanced_description`` comes from the model (first 500
        characters of the answer); the other fields are fixed values.
        """
        prompt = f"""Arricchisci la documentazione per:

Nome: {asset_info.get('name', '')}
Tipo: {asset_info.get('classType', '')}
Descrizione: {asset_info.get('description', 'Non disponibile')}

Fornisci: descrizione arricchita, scopo business, tag suggeriti, regole qualita."""

        response = await self._call_llm(prompt)

        return {
            'enhanced_description': response[:500],
            'business_purpose': 'Asset enterprise per operazioni data',
            'suggested_tags': ['data-governance', 'enterprise', 'managed'],
            'suggested_quality_rules': ['Completezza', 'Consistenza'],
            'compliance_notes': 'Verificare GDPR'
        }


class OllamaLLMClient(BaseLLMClient):
    """Client for a local Ollama server (TinyLlama/Gemma3).

    Only ``enhance_description`` calls the model; the other methods return
    fixed structures.
    """

    def __init__(self, config: LLMConfig):
        """Pick the Ollama model from the provider; the session is created lazily."""
        self.config = config
        self.base_url = config.ollama_base_url
        self.model = "tinyllama" if config.provider == LLMProvider.TINYLLAMA else "gemma3:4b"
        self.session: Optional[aiohttp.ClientSession] = None

    async def _ensure_session(self):
        """Create the HTTP session if it is missing or closed."""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()

    async def _call_llm(self, prompt: str) -> str:
        """POST the prompt to ``/api/generate`` and return the ``response`` field.

        Any failure is returned as an ``"Errore Ollama: ..."`` string rather
        than raised.
        """
        await self._ensure_session()

        url = f"{self.base_url}/api/generate"
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": self.config.temperature,
                "num_predict": self.config.max_tokens
            }
        }

        try:
            async with self.session.post(url, json=payload) as response:
                response.raise_for_status()
                result = await response.json()
                return result.get('response', '')
        except Exception as e:
            return f"Errore Ollama: {str(e)}"

    async def enhance_description(self, asset_name: str, technical_desc: str, **kwargs) -> str:
        """Ask the model to describe the asset in Italian."""
        prompt = f"Descrivi in italiano l'asset {asset_name}: {technical_desc or 'nessuna descrizione'}"
        return await self._call_llm(prompt)

    async def analyze_change_impact(self, source_asset: str, change_type: str, **kwargs) -> Dict:
        """Return a fixed MEDIUM-risk analysis; the model is not called."""
        return {
            'risk_level': 'MEDIUM',
            'business_impact': f'Modifica {change_type} su {source_asset}',
            'technical_impact': 'Verificare dipendenze',
            'recommendations': ['Backup', 'Test'],
            'testing_strategy': ['Unit test']
        }

    async def generate_change_checklist(self, impact_analysis: Dict) -> Dict:
        """Reuse the mock client's checklist template.

        The previous implementation looked up ``__wrapped__`` on a bound
        coroutine method, an attribute that does not exist, so every call
        raised AttributeError. Awaiting the mock method directly returns the
        template that code was trying to reach.
        """
        return await MockLLMClient().generate_change_checklist(impact_analysis)

    async def enhance_documentation(self, asset_info: Dict, **kwargs) -> Dict:
        """Return fixed documentation fields; the model is not called."""
        return {
            'enhanced_description': f"Asset {asset_info.get('name', '')}",
            'business_purpose': 'Data asset',
            'suggested_tags': ['data'],
            'suggested_quality_rules': ['Verifica dati'],
            'compliance_notes': 'Da verificare'
        }

    async def close(self):
        """Close the HTTP session if one is open."""
        if self.session and not self.session.closed:
            await self.session.close()


def create_llm_client(config: LLMConfig) -> BaseLLMClient:
    """Build the LLM client matching ``config.provider``.

    CLAUDE maps to the Claude client, TINYLLAMA and GEMMA3 to the Ollama
    client; MOCK and any other value fall back to the mock client.
    """
    provider = config.provider
    if provider == LLMProvider.MOCK:
        return MockLLMClient()
    if provider == LLMProvider.CLAUDE:
        return ClaudeLLMClient(config)
    if provider in (LLMProvider.TINYLLAMA, LLMProvider.GEMMA3):
        return OllamaLLMClient(config)
    # Unknown provider: same fallback as MOCK.
    return MockLLMClient()


# Instantiate the LLM client selected by llm_config.provider
llm_client = create_llm_client(llm_config)
print(f"[OK] LLM Client creato: {type(llm_client).__name__}")

---

## 5. Test Funzioni EDC

Ora possiamo testare le singole funzioni!

### 5.1 Search Assets

Cerca asset nel catalogo EDC usando l'API bulk.

In [None]:
async def search_assets(
    resource_name: str,
    name_filter: str = "",
    asset_type: str = "com.infa.ldm.relational.Table",
    max_results: int = 10
):
    """
    Search the EDC catalog and print the matches.

    Args:
        resource_name: EDC resource name (e.g. DataPlatform)
        name_filter: case-insensitive name filter; empty means no filter
        asset_type: asset class type to search for
        max_results: maximum number of results kept and printed

    Returns:
        The (truncated) list of matches, or [] when any error occurs; the
        error is printed, not raised.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print("SEARCH ASSETS")
    print(rule)
    print(f"Resource: {resource_name}")
    print(f"Filter: {name_filter or '(none)'}")
    print(f"Type: {asset_type}")
    print(f"{rule}\n")

    try:
        found = await edc_client.bulk_search_assets(
            resource_name=resource_name,
            name_filter=name_filter or None,
            asset_type=asset_type
        )

        shown = found[:max_results]

        # The count refers to the truncated list, not to the total matches.
        print(f"Trovati {len(shown)} asset:\n")

        for position, asset in enumerate(shown, 1):
            print(f"{position}. {asset.get('name', 'N/A')}")
            print(f"   Type: {asset.get('classType', 'N/A')}")
            print(f"   ID: {asset.get('id', 'N/A')}")
            print()

        return shown

    except Exception as e:
        print(f"[ERRORE] {e}")
        return []

In [None]:
# RUN: search for tables with GARANZIE in the name
results = await search_assets(
    resource_name="DataPlatform",
    name_filter="GARANZIE",
    max_results=10
)

In [None]:
# RUN: search for all Views (no name filter)
results = await search_assets(
    resource_name="DataPlatform",
    name_filter="",
    asset_type="com.infa.ldm.relational.View",
    max_results=5
)

### 5.2 Get Asset Details

Recupera dettagli completi di un asset con enhancement AI.

In [None]:
async def get_asset_details(asset_id: str, enhance_with_ai: bool = True):
    """
    Fetch one asset's details, print a summary and optionally an AI description.

    Args:
        asset_id: full EDC asset id
        enhance_with_ai: when True, also print an LLM-enriched description

    Returns:
        The details dict from the EDC client, or None when any error occurs;
        the error is printed, not raised.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print("GET ASSET DETAILS")
    print(rule)
    print(f"Asset ID: {asset_id}")
    print(f"AI Enhancement: {enhance_with_ai}")
    print(f"{rule}\n")

    try:
        details = await edc_client.get_asset_details(asset_id)

        print(f"Nome: {details['name']}")
        print(f"Tipo: {details['classType']}")
        print(f"Descrizione originale: {details['description'] or '(nessuna)'}")
        print(f"\nUpstream links: {len(details['src_links'])}")
        print(f"Downstream links: {len(details['dst_links'])}")
        print(f"Facts: {len(details['facts'])} items")

        # Preview at most five links per side, upstream first.
        previews = (
            ('src_links', "\nUpstream (sorgenti):", "<-"),
            ('dst_links', "\nDownstream (destinazioni):", "->"),
        )
        for key, heading, arrow in previews:
            links = details[key]
            if not links:
                continue
            print(heading)
            for link in links[:5]:
                print(f"  {arrow} {link['name']} ({link['classType']})")

        if enhance_with_ai:
            print("\n--- AI Enhancement ---")
            enhanced = await llm_client.enhance_description(
                asset_name=details['name'],
                technical_desc=details['description']
            )
            print(f"Descrizione arricchita:\n{enhanced}")

        return details

    except Exception as e:
        print(f"[ERRORE] {e}")
        return None

In [None]:
# RUN: details of one specific asset
asset_id = "DataPlatform://ORAC51/DWHEVO/IFR_WK_GARANZIE_SOFFERENZE_DT_AP"

details = await get_asset_details(asset_id, enhance_with_ai=True)

### 5.3 Get Immediate Lineage

Recupera il lineage immediato (1 livello) upstream e/o downstream.

In [None]:
async def get_immediate_lineage(asset_id: str, direction: str = "both"):
    """
    Print and return the one-level lineage of an asset.

    Args:
        asset_id: asset id
        direction: "upstream", "downstream" or "both"; any other value
            selects neither side

    Returns:
        A list of link dicts, each with an added 'direction' key, or [] when
        any error occurs; the error is printed, not raised.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print("GET IMMEDIATE LINEAGE")
    print(rule)
    print(f"Asset ID: {asset_id}")
    print(f"Direction: {direction}")
    print(f"{rule}\n")

    # (side, heading, arrow, details key) in the order they are printed.
    sides = (
        ("upstream", "UPSTREAM (sorgenti)", "<-", 'src_links'),
        ("downstream", "\nDOWNSTREAM (destinazioni)", "->", 'dst_links'),
    )

    try:
        details = await edc_client.get_asset_details(asset_id)

        collected = []
        for side, heading, arrow, key in sides:
            if direction not in (side, "both"):
                continue
            links = details[key]
            print(f"{heading}: {len(links)} asset")
            print("-" * 40)
            for link in links:
                print(f"  {arrow} {link['name']}")
                print(f"     Type: {link['classType']}")
                print(f"     Association: {link['association']}")
                print(f"     ID: {link['id']}")
                print()
                collected.append({**link, 'direction': side})

        return collected

    except Exception as e:
        print(f"[ERRORE] {e}")
        return []

In [None]:
# RUN: immediate lineage in both directions
asset_id = "DataPlatform://ORAC51/DWHEVO/IFR_WK_GARANZIE_SOFFERENZE_DT_AP"

lineage = await get_immediate_lineage(asset_id, direction="both")

### 5.4 Build Lineage Tree

Costruisce l'albero completo del lineage (multi-livello).

In [None]:
class TreeNode:
    """One node of a lineage tree, holding its children in insertion order."""

    def __init__(self, id: str, code: str, name: str = "", class_type: str = ""):
        """Create a leaf node; children are appended by the caller."""
        self.id = id
        self.code = code
        self.name = name
        self.class_type = class_type
        self.children: List['TreeNode'] = []

    def get_total_nodes(self) -> int:
        """Return the number of nodes in this subtree, this node included."""
        total = 1
        for child in self.children:
            total += child.get_total_nodes()
        return total

    def get_max_depth(self) -> int:
        """Return the height of this subtree; a leaf has depth 1."""
        deepest_child = max(
            (child.get_max_depth() for child in self.children),
            default=0,
        )
        return 1 + deepest_child


async def build_lineage_tree(
    asset_id: str,
    direction: str = "upstream",
    max_depth: int = 3,
    visited: set = None,
    current_depth: int = 0,
    code: str = "001"
) -> Optional[TreeNode]:
    """
    Recursively build a lineage tree rooted at ``asset_id``.

    Args:
        asset_id: id of the root asset
        direction: "upstream" follows src_links; any other value follows
            dst_links
        max_depth: maximum number of levels, root included
        visited: ids already expanded; shared by the whole recursion, so an
            asset appears at most once in the resulting tree
        current_depth: depth of this call (0 for the root)
        code: hierarchical code of this node; children append a 3-digit index

    Returns:
        The node for ``asset_id``, or None when the asset was already
        visited, the depth limit is reached, or fetching it failed (a
        warning is printed in that last case).
    """
    seen = set() if visited is None else visited

    # Cycle guard first, then the depth limit.
    if asset_id in seen or current_depth >= max_depth:
        return None

    seen.add(asset_id)

    try:
        details = await edc_client.get_asset_details(asset_id)

        node = TreeNode(
            id=asset_id,
            code=code,
            name=details['name'],
            class_type=details['classType']
        )

        link_key = 'src_links' if direction == "upstream" else 'dst_links'

        # Codes follow the link position, so skipped children leave gaps.
        for position, link in enumerate(details[link_key], start=1):
            subtree = await build_lineage_tree(
                asset_id=link['id'],
                direction=direction,
                max_depth=max_depth,
                visited=seen,
                current_depth=current_depth + 1,
                code=f"{code}{position:03d}"
            )
            if subtree:
                node.children.append(subtree)

        return node

    except Exception as e:
        print(f"[Warning] Errore per {asset_id}: {e}")
        return None


def print_tree(node: TreeNode, indent: int = 0):
    """Print the tree in pre-order, two spaces of indentation per level."""
    pending = [(node, indent)]
    while pending:
        current, level = pending.pop()
        padding = "  " * level
        print(f"{padding}[{current.code}] {current.name} ({current.class_type})")
        # Push children reversed so they are popped in their original order.
        pending.extend((child, level + 1) for child in reversed(current.children))

In [None]:
async def get_lineage_tree(asset_id: str, direction: str = "upstream", max_depth: int = 3):
    """
    Build a lineage tree, print timing, statistics and structure.

    Returns:
        The root node, or None when no tree could be built.
    """
    import time

    rule = "=" * 60
    print(f"\n{rule}")
    print("BUILD LINEAGE TREE")
    print(rule)
    print(f"Asset ID: {asset_id}")
    print(f"Direction: {direction}")
    print(f"Max Depth: {max_depth}")
    print(f"{rule}\n")

    started_at = time.time()
    root = await build_lineage_tree(
        asset_id=asset_id,
        direction=direction,
        max_depth=max_depth
    )
    elapsed = time.time() - started_at

    if not root:
        print("Nessun lineage trovato.")
        return root

    print(f"Albero costruito in {elapsed:.2f}s\n")
    print("Statistiche:")
    print(f"  - Nodi totali: {root.get_total_nodes()}")
    print(f"  - Profondita max: {root.get_max_depth()}")
    print("\nStruttura:\n")
    print_tree(root)
    return root

In [None]:
# RUN: build the upstream lineage tree
asset_id = "DataPlatform://ORAC51/DWHEVO/IFR_WK_GARANZIE_SOFFERENZE_DT_AP"

tree = await get_lineage_tree(asset_id, direction="upstream", max_depth=2)

### 5.5 Analyze Change Impact

Analizza l'impatto di una modifica usando AI.

In [None]:
async def analyze_change_impact(
    asset_id: str,
    change_type: str,
    change_description: str
):
    """
    Analyse the impact of a change on an asset and print the result.

    Args:
        asset_id: asset that will be changed
        change_type: kind of change (column_drop, data_type_change, deprecation, etc.)
        change_description: detailed description of the change

    Returns:
        The impact dict from the LLM client, or None when any error occurs;
        the error is printed, not raised.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print("ANALYZE CHANGE IMPACT")
    print(rule)
    print(f"Asset ID: {asset_id}")
    print(f"Change Type: {change_type}")
    print(f"Description: {change_description}")
    print(f"{rule}\n")

    try:
        # The downstream links are the assets the change may affect.
        details = await edc_client.get_asset_details(asset_id)
        downstream = details['dst_links']

        print(f"Asset downstream impattati: {len(downstream)}")

        impact = await llm_client.analyze_change_impact(
            source_asset=asset_id,
            change_type=change_type,
            change_details={'description': change_description},
            affected_lineage={'downstream': downstream}
        )

        print("\n--- RISULTATO ANALISI ---\n")
        print(f"LIVELLO RISCHIO: {impact['risk_level']}")
        print("\nImpatto Business:")
        print(f"  {impact['business_impact']}")
        print("\nImpatto Tecnico:")
        print(f"  {impact['technical_impact']}")

        numbered_sections = (
            ("\nRaccomandazioni:", 'recommendations'),
            ("\nStrategia di Testing:", 'testing_strategy'),
        )
        for heading, key in numbered_sections:
            print(heading)
            for number, entry in enumerate(impact.get(key, []), 1):
                print(f"  {number}. {entry}")

        return impact

    except Exception as e:
        print(f"[ERRORE] {e}")
        return None

In [None]:
# RUN: impact analysis
asset_id = "DataPlatform://ORAC51/DWHEVO/IFR_WK_GARANZIE_SOFFERENZE_DT_AP"

impact = await analyze_change_impact(
    asset_id=asset_id,
    change_type="column_drop",
    change_description="Eliminazione della colonna FLAG_ATTIVO per deprecazione"
)

### 5.6 Generate Change Checklist

Genera una checklist operativa per implementare una modifica.

In [None]:
async def generate_change_checklist(
    asset_id: str,
    change_type: str,
    change_description: str
):
    """
    Build and print an operational checklist for a planned change.

    The asset details are fetched from ``edc_client``; their ``dst_links``
    entry is handed to ``llm_client.analyze_change_impact`` as the downstream
    lineage, and the resulting impact analysis is fed to
    ``llm_client.generate_change_checklist``. Every non-empty checklist
    section is then printed as a numbered list of unchecked items.

    Args:
        asset_id: ID of the asset being changed.
        change_type: Kind of change (forwarded unchanged to the LLM client).
        change_description: Free-text description of the change.

    Returns:
        The checklist object produced by the LLM client, or ``None`` when any
        exception was raised (the error is printed, not propagated).
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print("GENERATE CHANGE CHECKLIST")
    print(banner)
    print(f"Asset: {asset_id}")
    print(f"Modifica: {change_type} - {change_description}")
    print(f"{banner}\n")

    try:
        # The checklist is derived from an impact analysis, so run that first.
        asset_details = await edc_client.get_asset_details(asset_id)
        downstream_links = asset_details['dst_links']

        impact_report = await llm_client.analyze_change_impact(
            source_asset=asset_id,
            change_type=change_type,
            change_details={'description': change_description},
            affected_lineage={'downstream': downstream_links},
        )

        checklist = await llm_client.generate_change_checklist(impact_report)

        print("CHECKLIST OPERATIVA")
        print("=" * 40)

        # Checklist key -> printed heading; insertion order is the print order.
        section_headings = {
            'governance_tasks': 'Governance e Approvazioni',
            'pre_change_tasks': 'Preparazione Pre-Modifica',
            'execution_tasks': 'Esecuzione',
            'validation_tasks': 'Validazione e Test',
            'rollback_procedures': 'Procedure di Rollback',
            'stakeholder_notifications': 'Notifiche Stakeholder',
            'monitoring_tasks': 'Monitoring Post-Implementazione',
        }

        for section_key, heading in section_headings.items():
            entries = checklist.get(section_key, [])
            if not entries:
                # Missing or empty sections are omitted from the output.
                continue
            print(f"\n{heading}:")
            for position, entry in enumerate(entries, start=1):
                print(f"  [ ] {position}. {entry}")

    except Exception as e:
        # Notebook helper: report the failure instead of raising.
        print(f"[ERRORE] {e}")
        return None
    else:
        return checklist

In [None]:
# RUN: generate the checklist
# Top-level await: relies on the notebook kernel (not valid in a plain script).
# `checklist` holds the generated checklist, or None when the helper caught an error.
checklist = await generate_change_checklist(
    asset_id="DataPlatform://ORAC51/DWHEVO/IFR_WK_GARANZIE_SOFFERENZE_DT_AP",
    change_type="schema_change",
    change_description="Aggiunta nuova colonna DATA_SCADENZA"
)

### 5.7 Enhance Asset Documentation

Arricchisce la documentazione di un asset con AI.

In [None]:
async def enhance_asset_documentation(
    asset_id: str,
    business_domain: str = ""
):
    """
    Fetch an asset's metadata and print an AI-enriched version of its docs.

    The asset details come from ``edc_client.get_asset_details``. They are
    passed to ``llm_client.enhance_documentation`` together with the asset's
    ``src_links`` (as upstream lineage) and the business domain. The enriched
    description, business purpose, suggested tags, suggested quality rules and
    compliance notes are then printed.

    Args:
        asset_id: ID of the asset to document.
        business_domain: Business domain (e.g. GARANZIE, SOFFERENZE). An empty
            value is shown as ``(auto-detect)`` in the header and is forwarded
            to the LLM client unchanged.

    Returns:
        The enhancement result from the LLM client, or ``None`` when any
        exception was raised (the error is printed, not propagated).
    """
    banner = '=' * 60
    domain_label = business_domain or '(auto-detect)'
    print(f"\n{banner}")
    print("ENHANCE ASSET DOCUMENTATION")
    print(banner)
    print(f"Asset ID: {asset_id}")
    print(f"Business Domain: {domain_label}")
    print(f"{banner}\n")

    try:
        # Current metadata as stored in the catalog.
        asset_details = await edc_client.get_asset_details(asset_id)

        print(f"Asset: {asset_details['name']}")
        print(f"Tipo: {asset_details['classType']}")
        original_description = asset_details['description'] or '(nessuna)'
        print(f"Descrizione originale: {original_description}")

        enhanced = await llm_client.enhance_documentation(
            asset_info=asset_details,
            lineage_context={'upstream': asset_details['src_links']},
            business_context={'domain': business_domain},
        )

        print("\n--- DOCUMENTAZIONE ARRICCHITA ---\n")

        # Each caption is printed before its value is looked up, so a missing
        # key still leaves the caption on screen ahead of the error message.
        text_fields = (
            ("Descrizione Arricchita:", 'enhanced_description'),
            ("\nScopo Business:", 'business_purpose'),
        )
        for caption, field in text_fields:
            print(caption)
            print(f"  {enhanced[field]}")

        print("\nTag Suggeriti:")
        tags_line = ', '.join(enhanced['suggested_tags'])
        print(f"  {tags_line}")

        print("\nRegole Qualita Suggerite:")
        quality_rules = enhanced.get('suggested_quality_rules', [])
        for position, quality_rule in enumerate(quality_rules, start=1):
            print(f"  {position}. {quality_rule}")

        print("\nNote Compliance:")
        print(f"  {enhanced['compliance_notes']}")

    except Exception as e:
        # Notebook helper: report the failure instead of raising.
        print(f"[ERRORE] {e}")
        return None
    else:
        return enhanced

In [None]:
# RUN: enrich the documentation
# Top-level await: relies on the notebook kernel (not valid in a plain script).
# `enhanced_docs` holds the enhancement result, or None when the helper caught an error.
enhanced_docs = await enhance_asset_documentation(
    asset_id="DataPlatform://ORAC51/DWHEVO/IFR_WK_GARANZIE_SOFFERENZE_DT_AP",
    business_domain="GARANZIE"
)

---

## 6. Utility e Statistiche

In [None]:
def show_statistics():
    """
    Print usage counters of the EDC client and the active LLM setup.

    Reads ``total_requests``, ``cache_hits`` and ``api_errors`` from
    ``edc_client.get_statistics()``, derives the cache hit rate as a
    percentage of total requests (0 when there were no requests), and then
    prints the configured LLM provider value and the client's class name.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print("STATISTICHE SISTEMA")
    print(rule)

    counters = edc_client.get_statistics()

    print("\nEDC Client:")
    print(f"  - Total API calls: {counters['total_requests']}")
    print(f"  - Cache hits: {counters['cache_hits']}")
    print(f"  - API errors: {counters['api_errors']}")

    # Avoid dividing by zero when no request has been issued yet.
    if counters['total_requests'] > 0:
        hit_rate = counters['cache_hits'] / counters['total_requests'] * 100
    else:
        hit_rate = 0
    print(f"  - Cache hit rate: {hit_rate:.1f}%")

    print(f"\nLLM Provider: {llm_config.provider.value}")
    print(f"LLM Client: {type(llm_client).__name__}")


# Print the statistics as soon as this cell runs.
show_statistics()

In [None]:
def switch_llm_provider(provider: str):
    """
    Switch the active LLM provider and rebuild the LLM client.

    The provider name is matched case-insensitively. On success the global
    ``llm_config.provider`` is updated and the global ``llm_client`` is
    replaced by a client built with ``create_llm_client(llm_config)``.

    If building the new client fails, the previous provider is restored on
    ``llm_config`` before the exception propagates, so the configuration
    never names a provider different from the client still in use.

    Args:
        provider: "mock", "claude", "tinyllama", "gemma3"

    Returns:
        None. An unknown provider name only prints the list of valid names.

    Raises:
        Exception: whatever ``create_llm_client`` raises, re-raised after the
            provider has been rolled back.
    """
    global llm_client, llm_config

    provider_map = {
        'mock': LLMProvider.MOCK,
        'claude': LLMProvider.CLAUDE,
        'tinyllama': LLMProvider.TINYLLAMA,
        'gemma3': LLMProvider.GEMMA3
    }

    requested = provider.lower()
    if requested not in provider_map:
        print(f"Provider non valido. Usa: {list(provider_map.keys())}")
        return

    previous_provider = llm_config.provider
    old_provider = previous_provider.value

    llm_config.provider = provider_map[requested]
    try:
        llm_client = create_llm_client(llm_config)
    except Exception:
        # Roll back: llm_client is still the old client, so the config must
        # keep describing it.
        llm_config.provider = previous_provider
        raise

    print(f"Provider cambiato: {old_provider} -> {llm_config.provider.value}")


# Esempio: switch_llm_provider("claude")

In [None]:
# Cache cleanup
def clear_all_cache():
    """Clear the EDC client's cache via ``edc_client.clear_cache()`` and print a confirmation."""
    edc_client.clear_cache()
    print("Cache pulita!")


# clear_all_cache()

---

## 7. Cleanup

Esegui questa cella alla fine per chiudere le connessioni.

In [None]:
# Close connections
async def cleanup():
    """Await ``edc_client.close()`` and print a confirmation message."""
    await edc_client.close()
    print("Connessioni chiuse.")

# Top-level await: relies on the notebook kernel (not valid in a plain script).
await cleanup()

---

## Note

### Prerequisiti
- Accesso alla rete dove si trova l'istanza EDC (VPN se necessario)
- Credenziali EDC valide
- (Opzionale) API key Claude per enhancement AI avanzato
- (Opzionale) Ollama in esecuzione per TinyLlama/Gemma3 locale

### Provider LLM disponibili
- **mock**: Per testing senza API esterne (default)
- **claude**: Anthropic Claude API (richiede API key)
- **tinyllama**: TinyLlama via Ollama locale
- **gemma3**: Google Gemma3 4B via Ollama locale

### Asset ID Format
```
DataPlatform://CONNECTION/SCHEMA/TABLE_NAME
```

Esempio:
```
DataPlatform://ORAC51/DWHEVO/IFR_WK_GARANZIE_SOFFERENZE_DT_AP
```