# CC2 : OncoPlan - Corrigé Type

**Cours :** IA101 - Intelligence Artificielle (EPF 2026)
**Auteur :** Équipe Pédagogique

Ce notebook présente une solution complète pour le devoir OncoPlan. Il couvre :
1.  **Symbolique** : Ontologie RDF et Planification OR-Tools.
2.  **Probabiliste** : Modélisation Bayésienne avec Pyro.
3.  **Bonus** : Traçabilité via Hashage.

## Installation des Dépendances

In [1]:
!pip install rdflib ortools pyro-ppl torch pandas matplotlib seaborn

Accès refusé.


In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from rdflib import Graph, Literal, RDF, URIRef, Namespace
from rdflib.namespace import FOAF, XSD
from ortools.sat.python import cp_model
import torch
import pyro
import pyro.distributions as dist
from pyro.infer import SVI, Trace_ELBO, Predictive
from pyro.optim import Adam
import hashlib
import json

# Configuration
sns.set_style("whitegrid")
pyro.set_rng_seed(101)

---

## Partie 1 : Le Pharmacien Symbolique

### 1.1. Ontologie des Médicaments (RDFLib)

In [3]:
# Création du Namespace
ONCO = Namespace("http://example.org/onco/")
g = Graph()

# Définition des classes et propriétés
g.add((ONCO.Medicament, RDF.type, ONCO.Class))
g.add((ONCO.toxicite_renale, RDF.type, RDF.Property))
g.add((ONCO.incompatible_avec, RDF.type, RDF.Property))

# Peuplement de la base de connaissances
medicaments = {
    "Cisplatine": {"toxicite_renale": True, "incompatible": ["Gentamicine"]},
    "5-FU": {"toxicite_renale": False, "incompatible": []},
    "Docetaxel": {"toxicite_renale": False, "incompatible": ["Ketoconazole"]},
    "Gentamicine": {"toxicite_renale": True, "incompatible": ["Cisplatine"]}
}

for med_name, props in medicaments.items():
    med_uri = ONCO[med_name]
    g.add((med_uri, RDF.type, ONCO.Medicament))
    g.add((med_uri, ONCO.toxicite_renale, Literal(props["toxicite_renale"])))
    for inc in props["incompatible"]:
        g.add((med_uri, ONCO.incompatible_avec, ONCO[inc]))

print(f"Ontologie chargée avec {len(g)} triplets.")

Ontologie chargée avec 14 triplets.


In [4]:
def verifier_prescription(protocole_noms, patient_insuffisance_renale):
    alertes = []
    
    # Vérification Néphrotoxicité
    if patient_insuffisance_renale:
        for med in protocole_noms:
            med_uri = ONCO[med]
            if (med_uri, ONCO.toxicite_renale, Literal(True)) in g:
                alertes.append(f"ALERTE: {med} est néphrotoxique pour ce patient !")
    
    # Vérification Incompatibilités
    for i, med1 in enumerate(protocole_noms):
        for med2 in protocole_noms[i+1:]:
            uri1 = ONCO[med1]
            uri2 = ONCO[med2]
            if (uri1, ONCO.incompatible_avec, uri2) in g:
                alertes.append(f"ALERTE: Incompatibilité détectée entre {med1} et {med2}")
                
    return alertes

# Test
print("--- Test 1 : Patient Sain, Protocole OK ---")
print(verifier_prescription(["Cisplatine", "5-FU"], False))

print("\n--- Test 2 : Patient Insuffisant Rénal ---")
print(verifier_prescription(["Cisplatine"], True))

print("\n--- Test 3 : Incompatibilité ---")
print(verifier_prescription(["Cisplatine", "Gentamicine"], False))

--- Test 1 : Patient Sain, Protocole OK ---
[]

--- Test 2 : Patient Insuffisant Rénal ---
['ALERTE: Cisplatine est néphrotoxique pour ce patient !']

--- Test 3 : Incompatibilité ---
['ALERTE: Incompatibilité détectée entre Cisplatine et Gentamicine']


### 1.2. Planification des Cures (OR-Tools)

In [5]:
def planifier_chimio(nb_cycles=4, duree_cycle=21, jours_admin=[1, 8], capacite_max=3, occupations_existantes={}):
    model = cp_model.CpModel()
    horizon = nb_cycles * duree_cycle + 10 # Marge
    
    # Variables : start_cycle[i] est le jour de début du cycle i (0-based)
    start_cycle = [model.NewIntVar(1, horizon, f'start_cycle_{i}') for i in range(nb_cycles)]
    
    # Contraintes
    for i in range(nb_cycles):
        # Contrainte de continuité entre cycles (sauf si report, mais ici on vise l'idéal)
        if i > 0:
            model.Add(start_cycle[i] == start_cycle[i-1] + duree_cycle)
            
        # Contrainte de capacité pour chaque jour d'administration
        for j_offset in jours_admin:
            # Le jour réel est start_cycle[i] + j_offset - 1
            # Note: C'est complexe en CP pur car l'index dépend d'une variable.
            # Simplification : On fixe le premier jour et on vérifie, ou on itère sur tous les jours possibles.
            pass
            
    # Approche simplifiée pour l'exercice : On cherche juste le jour de départ J1 du premier cycle
    # qui permet de caser tous les RDV sans conflit.
    
    # Variable de décision : Jour de début du traitement (entre 1 et 30)
    start_day = model.NewIntVar(1, 30, 'start_day')
    
    # Jours fériés (Dimanches) : supposons J1 = Lundi. Dimanche = 7, 14, 21...
    # On veut éviter que le jour d'administration tombe un dimanche.
    # En CP-SAT, on utilise AddModuloEquality pour calculer le reste de la division par 7.
    # Si J1 est un Lundi, alors les dimanches sont les multiples de 7 (7, 14, 21...).
    # Donc on veut : jour_abs % 7 != 0.
    
    # Vérifions pour chaque jour potentiel de départ
    solver = cp_model.CpSolver()
    
    for i in range(nb_cycles):
        for j_admin in jours_admin:
            # Jour absolu de l'administration
            jour_abs = start_day + i * duree_cycle + (j_admin - 1)
            
            # Contrainte : Pas de dimanche
            # On crée une variable intermédiaire pour le reste de la division par 7
            reste = model.NewIntVar(0, 6, f'reste_{i}_{j_admin}')
            model.AddModuloEquality(reste, jour_abs, 7)
            model.Add(reste != 0) # 0 correspond au Dimanche
            
            # Capacité : Vérifier occupations_existantes
            # Si occupations_existantes[jour] >= 3, alors jour_abs != jour
            for jour_occupe, charge in occupations_existantes.items():
                if charge >= capacite_max:
                    model.Add(jour_abs != jour_occupe)

    status = solver.Solve(model)
    
    if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
        debut = solver.Value(start_day)
        planning = []
        for i in range(nb_cycles):
            for j in jours_admin:
                planning.append(debut + i*21 + j - 1)
        return debut, planning
    else:
        return None, []

# Simulation d'occupations (Jours 10, 11, 12 sont complets)
occupations = {10: 3, 11: 3, 12: 3}
debut, planning = planifier_chimio(occupations_existantes=occupations)
print(f"Début optimal du traitement : Jour {debut}")
print(f"Jours d'administration : {planning}")

Début optimal du traitement : Jour 1
Jours d'administration : [1, 8, 22, 29, 43, 50, 64, 71]


---

## Partie 2 : Le Médecin Probabiliste (Pyro)

In [6]:
class OncoModel:
    def __init__(self):
        pass
        
    def model(self, doses, observations=None):
        # 1. Prior sur le profil du patient (0: Résistant, 1: Normal, 2: Sensible)
        # On pense que la plupart sont Normaux (60%), puis Sensibles (30%), puis Résistants (10%)
        profil_probs = torch.tensor([0.1, 0.6, 0.3])
        profil = pyro.sample("profil", dist.Categorical(profil_probs))
        
        # Facteurs de sensibilité selon le profil
        # Résistant: peu de toxicité, Normal: moyen, Sensible: fort
        sensibilite_map = torch.tensor([0.5, 1.0, 2.0])
        sensibilite = sensibilite_map[profil]
        
        # 2. Dynamique Temporelle
        toxicite_cumulee = 0.0
        taux_base_gb = 8000.0 # Taux normal de globules blancs
        
        for t, dose in enumerate(doses):
            # La toxicité augmente avec la dose * sensibilité
            # Et diminue naturellement (récupération) de 20% par pas de temps
            toxicite_cumulee = toxicite_cumulee * 0.8 + (dose * sensibilite * 0.1)
            
            # Le taux de GB baisse quand la toxicité monte
            # Modèle simple : GB = Base - (Toxicité * 100)
            mu_gb = taux_base_gb - (toxicite_cumulee * 1000.0)
            mu_gb = torch.max(mu_gb, torch.tensor(1000.0)) # Minimum vital
            
            # Observation (si disponible)
            obs = None
            if observations is not None and t < len(observations):
                obs = observations[t]
                
            # Likelihood
            pyro.sample(f"obs_gb_{t}", dist.Normal(mu_gb, 500.0), obs=obs)
            
    def guide(self, doses, observations=None):
        # Guide variationnel pour SVI
        # On veut estimer la distribution a posteriori de 'profil'
        profil_probs_post = pyro.param("profil_probs_post", torch.tensor([0.33, 0.33, 0.33]), constraint=dist.constraints.simplex)
        pyro.sample("profil", dist.Categorical(profil_probs_post))

# Instanciation
onco_model = OncoModel()

# Données simulées : Patient reçoit 100mg à T0, 0 à T1... 
# Observation : Chute brutale à T1 (J8)
doses_plan = torch.tensor([100.0, 0.0, 0.0]) # J1, J8, J15
observations_reelles = torch.tensor([7800.0, 2500.0]) # J1 Normal, J8 Neutropénie sévère

# Inférence SVI
pyro.clear_param_store()
svi = SVI(onco_model.model, onco_model.guide, Adam({"lr": 0.01}), loss=Trace_ELBO())

# Pour l'inférence SVI, il est crucial d'aligner les données d'entrée (doses) avec les observations.
# Si on passe des doses futures pour lesquelles on n'a pas d'observation, Pyro va considérer ces observations manquantes
# comme des variables latentes à inférer. Or, notre guide ne couvre pas ces variables, ce qui provoquerait une erreur.
# On tronque donc le plan de doses pour ne garder que l'historique passé correspondant aux observations réelles.
doses_inf = doses_plan[:len(observations_reelles)]

print("Début de l'inférence...")
for step in range(1000):
    loss = svi.step(doses_inf, observations_reelles)
    if step % 200 == 0:
        print(f"Step {step} : Loss = {loss}")

# Résultats
post_probs = pyro.param("profil_probs_post").detach()
print(f"\nProbabilités a posteriori du profil :")
print(f"Résistant : {post_probs[0]:.2f}")
print(f"Normal    : {post_probs[1]:.2f}")
print(f"Sensible  : {post_probs[2]:.2f}")

if post_probs[2] > 0.8:
    print("=> DIAGNOSTIC : Patient hypersensible. Réduction de dose impérative.")

Début de l'inférence...
Step 0 : Loss = 111.35245561599731
Step 200 : Loss = 111.15741121768951
Step 400 : Loss = 66.71360492706299
Step 600 : Loss = 66.87260314822197
Step 800 : Loss = 67.00849670171738

Probabilités a posteriori du profil :
Résistant : 0.92
Normal    : 0.04
Sensible  : 0.04


### 2.2. Prédiction et Prise de Décision

In [7]:
# Simulation de l'avenir avec le profil inféré
predictive = Predictive(onco_model.model, guide=onco_model.guide, num_samples=100, return_sites=["obs_gb_3"])

# Scénario 1 : Maintien de la dose (100mg)
doses_futures_standard = torch.tensor([100.0, 0.0, 0.0, 100.0]) 
samples_std = predictive(doses_futures_standard)
# Debug : Afficher les clés disponibles si erreur
# print(samples_std.keys())
gb_std = samples_std['obs_gb_3'] # GB à la prochaine injection
risque_std = (gb_std < 2000).float().mean()

# Scénario 2 : Réduction de dose (50mg)
doses_futures_reduites = torch.tensor([100.0, 0.0, 0.0, 50.0])
samples_red = predictive(doses_futures_reduites)
gb_red = samples_red['obs_gb_3']
risque_red = (gb_red < 2000).float().mean()

print(f"Risque de neutropénie sévère (<2000) avec dose standard : {risque_std*100:.1f}%")
print(f"Risque de neutropénie sévère (<2000) avec dose réduite : {risque_red*100:.1f}%")

Risque de neutropénie sévère (<2000) avec dose standard : 98.0%
Risque de neutropénie sévère (<2000) avec dose réduite : 15.0%


---

## Partie 3 : Bonus Smart Contract

In [8]:
class OncoContract:
    def __init__(self, medecin_id, patient_id):
        self.medecin_id = medecin_id
        self.patient_id = patient_id
        self.plan_hash = None
        self.signatures = {"medecin": False, "patient": False}
        
    def enregistrer_plan(self, plan_data):
        # Sérialisation stable
        plan_str = json.dumps(plan_data, sort_keys=True)
        self.plan_hash = hashlib.sha256(plan_str.encode()).hexdigest()
        print(f"Plan enregistré. Hash : {self.plan_hash}")
        
    def signer(self, role, cle_privee_simulee):
        # Simulation basique de signature
        if role in self.signatures:
            self.signatures[role] = True
            print(f"Signé par {role}.")
            
    def verifier_execution(self, plan_propose):
        plan_str = json.dumps(plan_propose, sort_keys=True)
        hash_verif = hashlib.sha256(plan_str.encode()).hexdigest()
        
        if hash_verif != self.plan_hash:
            return "ERREUR : Le plan a été modifié !"
        if not all(self.signatures.values()):
            return "ERREUR : Plan non signé par toutes les parties."
        return "SUCCÈS : Administration autorisée."

# Test
contract = OncoContract("DrHouse", "P004")
plan_final = {"J1": "Cisplatine 50mg", "J8": "Repos"}
contract.enregistrer_plan(plan_final)
contract.signer("medecin", "key123")
contract.signer("patient", "key456")

print(contract.verifier_execution(plan_final))
print(contract.verifier_execution({"J1": "Cisplatine 100mg"})) # Tentative de fraude

Plan enregistré. Hash : da26c9774d16722ec30d452830ae409ad0e8304f3fe9ea31c8d738d14d17332e
Signé par medecin.
Signé par patient.
SUCCÈS : Administration autorisée.
ERREUR : Le plan a été modifié !
