In [3]:
# -*- coding: utf-8 -*-
"""
SYSTÈME COMPLET DE DÉTECTION ET ANONYMISATION RGPD - VERSION MAROC
"""

# =============================================
# 1. INSTALLATION DES DÉPENDANCES
# =============================================
!pip install faker pandas spacy presidio-analyzer presidio-anonymizer plotly
!python -m spacy download fr_core_news_sm

# =============================================
# 2. IMPORT DES LIBRAIRIES
# =============================================
import pandas as pd
from faker import Faker
import random
from collections import Counter
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern
from presidio_anonymizer import AnonymizerEngine
import plotly.express as px

# =============================================
# 3. CONFIGURATION INITIALE
# =============================================
fake = Faker('fr_FR')

# Niveaux de sensibilité
sensitivity_levels = {
    "CREDIT_CARD": "critique",
    "IBAN_CODE": "élevé",
    "ID_MAROC": "critique",
    "PERSON": "moyen",
    "PHONE_NUMBER": "moyen",
    "EMAIL_ADDRESS": "moyen",
    "LOCATION": "faible",
    "DATE_TIME": "faible",
    "DEFAULT": "faible"
}

# =============================================
# 4. GÉNÉRATION DE DONNÉES DE TEST MAROCAINES
# =============================================
def generate_maroc_cin():
    """Génère un CIN marocain : 2 lettres suivies de 5 chiffres (ex: AB12345)"""
    letters = ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=2))
    digits = ''.join(random.choices('0123456789', k=5))
    return letters + digits

def generate_maroc_phone():
    """Génère un numéro de téléphone marocain valide"""
    return f"+212{random.randint(600000000, 699999999)}"

def generate_maroc_record():
    """Génère un enregistrement fictif marocain"""
    record_type = random.choice(["transaction", "identité", "email", "contrat"])

    if record_type == "transaction":
        text = (
            f"Transaction #{fake.random_number(digits=6)}:\n"
            f"- Client : {fake.name()}\n"
            f"- Montant : {random.randint(100, 10000)} MAD\n"
            f"- Destinataire : IBAN {fake.iban()}\n"
            f"- Date : {fake.date_this_year()}"
        )
        entities = ["PERSON", "IBAN_CODE", "DATE_TIME"]

    elif record_type == "identité":
        address = fake.address().replace('\n', ', ')
        text = (
            f"Document d'identité :\n"
            f"- Nom : {fake.name()}\n"
            f"- CIN : {generate_maroc_cin()}\n"
            f"- Date de naissance : {fake.date_of_birth().strftime('%d/%m/%Y')}\n"
            f"- Adresse : {address}"
        )
        entities = ["PERSON", "ID_MAROC", "DATE_TIME", "LOCATION"]

    elif record_type == "email":
        text = (
            f"Email de {fake.company()} :\n"
            f"Objet : Facture #{fake.random_number(digits=6)}\n"
            f"Bonjour {fake.name()}, votre paiement par carte "
            f"{fake.credit_card_provider()} terminée par {fake.credit_card_number()[-4:]} "
            f"a été enregistré le {fake.date_this_month()}."
        )
        entities = ["PERSON", "CREDIT_CARD", "DATE_TIME"]

    elif record_type == "contrat":
        address = fake.address().replace('\n', ', ')
        phone = generate_maroc_phone()
        email = fake.email()
        text = (
            f"Contrat de location :\n"
            f"- Locataire : {fake.name()}\n"
            f"- CIN : {generate_maroc_cin()}\n"
            f"- Téléphone : {phone}\n"
            f"- Adresse : {address}\n"
            f"- Email : {email}"
        )
        entities = ["PERSON", "ID_MAROC", "PHONE_NUMBER", "LOCATION", "EMAIL_ADDRESS"]


    return {"text": text, "expected_entities": entities}

# Génération du dataset
data = [generate_maroc_record() for _ in range(300)]
df = pd.DataFrame(data)
# Génération de 300 entrées
data = []
for _ in range(300):
    try:
        data.append(generate_maroc_record())
    except Exception as e:
        print(f"Erreur lors de la génération : {e}")
        continue

df = pd.DataFrame(data)

# Export CSV avec encodage UTF-8
df.to_csv("financial_pii_dataset.csv", index=False, encoding='utf-8')

# Vérification
print(" Dataset généré avec succès !")
print(f"Nombre d'enregistrements : {len(df)}")


# =============================================
# 5. CONFIGURATION DE PRESIDIO AVEC CIN MAROCAIN
# =============================================
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

# Recognizer pour CIN Marocain
cin_recognizer = PatternRecognizer(
    supported_entity="ID_MAROC",
    patterns=[Pattern(name="CIN Maroc", regex=r"\b[A-Z]{2}\d{5}\b", score=0.85)],
    supported_language="fr"
)
analyzer.registry.add_recognizer(cin_recognizer)

# Recognizer pour téléphone marocain
phone_recognizer = PatternRecognizer(
    supported_entity="PHONE_NUMBER",
    patterns=[Pattern(name="Téléphone Maroc", regex=r"\+2126\d{8}\b", score=0.85)],
    supported_language="fr"
)
analyzer.registry.add_recognizer(phone_recognizer)

# =============================================
# 6. ANALYSE ET ANONYMISATION
# =============================================
def analyze_and_anonymize(text):
    results = analyzer.analyze(text=text, language='en')
    anonymized = anonymizer.anonymize(text=text, analyzer_results=results)
    entities = [r.entity_type for r in results]
    sensitivity = [sensitivity_levels.get(e, "faible") for e in entities]
    return anonymized.text, entities, sensitivity

df[['anonymized_text', 'detected_entities', 'sensitivity_level']] = df['text'].apply(
    lambda x: pd.Series(analyze_and_anonymize(x))
)

# =============================================
# 7. VISUALISATION DES RÉSULTATS
# =============================================
all_entities = [e for row in df['detected_entities'] for e in row]
entity_counts = Counter(all_entities)

all_sensitivities = [s for row in df['sensitivity_level'] for s in row]
sensitivity_counts = Counter(all_sensitivities)

fig1 = px.bar(
    x=list(entity_counts.keys()),
    y=list(entity_counts.values()),
    title="Types de données sensibles détectées",
    labels={'x': 'Type de donnée', 'y': 'Occurrences'},
    color=list(entity_counts.keys())
)

fig2 = px.pie(
    names=list(sensitivity_counts.keys()),
    values=list(sensitivity_counts.values()),
    title="Répartition des niveaux de sensibilité",
    color=list(sensitivity_counts.keys()),
    color_discrete_map={
        "critique": "red",
        "élevé": "orange",
        "moyen": "yellow",
        "faible": "green"
    }
)

fig1.show()
fig2.show()

# =============================================
# 8. EXPORT CSV
# =============================================
df.to_csv("resultats_rgpd_maroc.csv", index=False, encoding="utf-8-sig")

print(f"""
ANALYSE MAROCAINE TERMINÉE
-------------------------------
Documents analysés : {len(df)}
Données sensibles détectées : {len(all_entities)}
Types uniques détectés : {len(entity_counts)}

Fichier exporté : resultats_rgpd_maroc.csv
""")


Collecting fr-core-news-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/fr_core_news_sm-3.8.0/fr_core_news_sm-3.8.0-py3-none-any.whl (16.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.3/16.3 MB[0m [31m76.3 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('fr_core_news_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
✅ Dataset généré avec succès !
Nombre d'enregistrements : 300





✅ ANALYSE MAROCAINE TERMINÉE
-------------------------------
Documents analysés : 300
Données sensibles détectées : 1636
Types uniques détectés : 13

Fichier exporté : resultats_rgpd_maroc.csv

