In [None]:
import sys
!{sys.executable} -m pip install git+https://github.com/kermitt2/grobid_client_python.git

In [None]:
!pip install grobid-client-python
!pip install requests
!pip install ocrmypdf
!pip install PyPDF2
!pip install grobid-client
!pip install beautifulsoup4
!pip install PyMuPDF
!pip install tiktoken protobuf
!pip install sentencepiece
!pip install git+https://github.com/huggingface/transformers.git
!pip install transformers torch
!pip install accelerate
!pip install rapidfuzz

In [None]:
"""
PDF Mining Contract Extraction Pipeline
---------------------------------------
This script processes scanned mining contracts in PDF format to extract
structured information: contract title, country, company, and resources.

It combines GROBID full-text extraction, CamemBERT for NER, and fuzzy matching for company resolution.
"""

import os
import json
import requests
import fitz  # PyMuPDF for reading PDFs
import re
from bs4 import BeautifulSoup
from transformers import CamembertTokenizer, CamembertForTokenClassification, pipeline
from collections import Counter
from rapidfuzz import fuzz, process as rapidfuzz_process

# --- Input locations, reference lists, GROBID endpoint and output file ---
# Folder that main() lists for *.pdf files.
PDF_FOLDER = "D:/MLAIM/S4/PDFs_Extraction/contrats_scannées1"
# One-entry-per-line text files read through load_list().
COUNTRY_LIST_FILE = "D:/MLAIM/S4/PDFs_Extraction/countries_fr.txt"
RESOURCE_LIST_FILE = "D:/MLAIM/S4/PDFs_Extraction/resources.txt"
COMPANY_LIST_FILE = "D:/MLAIM/S4/PDFs_Extraction/societes.txt"
# Endpoint each PDF is POSTed to.
GROBID_URL = "http://localhost:8070/api/processFulltextDocument"
# File the collected results are dumped to.
OUTPUT_JSON = "combined_extraction_output.json"

# --- Heuristic keyword lists ---
# Whole-word markers (matched case-insensitively by is_company) that make an
# ORG entity look like a company name.
company_keywords = [
    # English legal forms and generic business words
    "Ltd",
    "Inc",
    "Corp",
    "LLC",
    "Company",
    "Corporation",
    "Enterprises",
    "Group",
    "Holdings",
    "Services",
    "Solutions",
    "Industries",
    "Systems",
    "Technologies",
    "Partners",
    "Consulting",
    "Management",
    "Trading",
    "Operations",
    # French legal forms and generic business words
    "SARL",
    "SA",
    "SAS",
    "SNC",
    "Entreprise",
    "Compagnie",
    "Société",
    "Groupe",
    "Conseil",
    "Commerce",
    "Gestion",
    "Opérations",
    "Sprl",
    "Office",
]

# Prefixes a cleaned, lower-cased heading must start with to count as a title.
title_keywords = [
    # French
    "contrat",
    "convention",
    "accord",
    "protocole",
    "entente",
    # English
    "contract",
    "agreement",
    "memorandum",
]

# Substrings that disqualify a heading from being used as the title.
exclusion_keywords = [
    "république",
    "ministère",
    "province",
    "direction",
]

# --- Load CamemBERT model for NER ---
# Tokenizer and token-classification model are both loaded from the same
# checkpoint name; this happens once, at import/cell-execution time.
print("[Init] Loading CamemBERT NER model...")
model_name = "Jean-Baptiste/camembert-ner"
tokenizer = CamembertTokenizer.from_pretrained(model_name)
model = CamembertForTokenClassification.from_pretrained(model_name)
# `aggregation_strategy="simple"` is the supported replacement for the
# deprecated `grouped_entities=True` flag: tokens are still merged into entity
# spans that expose the "entity_group" and "word" keys read further down.
ner_pipeline = pipeline("ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple")

# --- Utility functions ---

def fix_encoding(text: str) -> str:
    """Repair UTF-8 text that was wrongly decoded as Latin-1 (mojibake).

    The text is re-encoded as Latin-1 and decoded as UTF-8. When that round
    trip is impossible (the text contains characters outside Latin-1, or the
    resulting bytes are not valid UTF-8) the input is returned unchanged.

    Args:
        text: Text to repair.

    Returns:
        The repaired text, or ``text`` itself when no repair applies.
    """
    if not isinstance(text, str):
        # Keep the best-effort contract: anything that is not text is handed
        # back untouched instead of raising.
        return text
    try:
        return text.encode("latin1").decode("utf-8")
    except UnicodeError:
        # Covers both UnicodeEncodeError and UnicodeDecodeError without
        # swallowing KeyboardInterrupt/SystemExit like a bare `except:` would.
        return text

def load_list(file_path: str) -> list:
    """Load a list from a .txt file (one entry per line).

    Each line is stripped and lower-cased; blank lines are skipped.

    Args:
        file_path: Path of the UTF-8 text file to read.

    Returns:
        The non-empty lines, in file order.
    """
    # "utf-8-sig" reads plain UTF-8 as well as files saved with a byte-order
    # mark; with plain "utf-8" the BOM would stay glued to the first entry and
    # that entry could never be matched.
    with open(file_path, "r", encoding="utf-8-sig") as f:
        return [line.strip().lower() for line in f if line.strip()]

def clean_text(text):
    """Strip special characters and normalise whitespace.

    Everything except word characters, whitespace, hyphens and apostrophes
    (straight or typographic) is removed; runs of whitespace are then collapsed
    to a single space and the ends are trimmed.
    """
    without_specials = re.sub(r"[^\w\s\-’']", '', text)
    single_spaced = re.sub(r'\s+', ' ', without_specials)
    return single_spaced.strip()

def extract_title_from_text(text: str) -> str:
    """Pick the contract title out of first-page text using heuristics.

    Among the first 100 lines, the first line that is all upper-case, has at
    least two words, starts (once cleaned and lower-cased) with a title keyword
    and contains no exclusion keyword is returned, stripped. If no line
    qualifies, the stripped first line is returned, or "" for empty text.
    """
    all_lines = text.splitlines()
    title_prefixes = tuple(title_keywords)

    for candidate in all_lines[:100]:  # Titles are expected near the top
        looks_like_heading = candidate.isupper() and len(candidate.split()) >= 2
        if looks_like_heading:
            normalized = clean_text(candidate).lower()
            if normalized.startswith(title_prefixes):
                excluded = any(term in normalized for term in exclusion_keywords)
                if not excluded:
                    return candidate.strip()

    if all_lines:
        return all_lines[0].strip()
    return ""

def extract_first_page_text(pdf_path: str) -> str:
    """Extract plain text from the first page of a PDF using PyMuPDF.

    Args:
        pdf_path: Path of the PDF file to open.

    Returns:
        The stripped text of page 0, or "" when the document has no pages or
        cannot be read (the error is printed, not raised).
    """
    try:
        # The context manager closes the document (and its file handle) on
        # every exit path, including the early return for empty documents.
        with fitz.open(pdf_path) as doc:
            if len(doc) == 0:
                return ""
            return doc.load_page(0).get_text().strip()
    except Exception as e:
        # Best effort: an unreadable PDF must not abort the whole batch.
        print(f"Error reading {pdf_path}: {e}")
        return ""

def extract_keywords(text: str, keywords: list) -> list:
    """Return the keywords that occur in the text as whole words.

    The text is lower-cased before matching, so keywords are expected to be
    lower-case already. Each keyword is reported at most once, in the order in
    which it appears in ``keywords``, so the result is stable between runs.

    Args:
        text: Text to search.
        keywords: Candidate keywords.

    Returns:
        The keywords found, de-duplicated, in ``keywords`` order.
    """
    lowered = text.lower()
    # dict.fromkeys de-duplicates while preserving order; building the result
    # from a set would make the order depend on string hashing.
    return [
        word
        for word in dict.fromkeys(keywords)
        if re.search(r'\b' + re.escape(word) + r'\b', lowered)
    ]

def split_text(text, max_words=500):
    """Cut text into consecutive chunks of at most `max_words` words.

    Words are whitespace-separated tokens; each chunk is re-joined with single
    spaces. Empty text gives an empty list.
    """
    tokens = text.split()
    chunks = []
    for start in range(0, len(tokens), max_words):
        window = tokens[start:start + max_words]
        chunks.append(' '.join(window))
    return chunks

def is_company(name):
    """Tell whether a string looks like a company name.

    True as soon as one entry of `company_keywords` occurs in `name` as a whole
    word, ignoring case.
    """
    for keyword in company_keywords:
        pattern = r'\b' + re.escape(keyword) + r'\b'
        if re.search(pattern, name, re.IGNORECASE):
            return True
    return False

def match_to_known_company(entity, known_companies, threshold=85):
    """Fuzzy match an organization name to a known company list.

    Args:
        entity: Organization name to resolve.
        known_companies: Candidate company names.
        threshold: Minimum `fuzz.token_sort_ratio` score for a match to count.

    Returns:
        The best-scoring known company, or None when the score is below
        `threshold` or there is nothing to match against.
    """
    best = rapidfuzz_process.extractOne(entity, known_companies, scorer=fuzz.token_sort_ratio)
    if best is None:
        # extractOne yields None for an empty choice list; unpacking it
        # directly would raise TypeError.
        return None
    match, score, _ = best
    return match if score >= threshold else None

# --- Core extraction function ---

def extract_contract_info(pdf_path: str, country_list: list, resource_list: list, company_list: list,
                          timeout=(10, 600)):
    """Process a single PDF file to extract structured contract information.

    The PDF is posted to GROBID, the returned XML is flattened to text, and
    that text is searched with keyword heuristics and the NER pipeline. The
    title comes from the first page as read by PyMuPDF.

    Args:
        pdf_path: Path of the PDF to process.
        country_list: Known country names (lower-case).
        resource_list: Known resource names (lower-case).
        company_list: Known company names (lower-case).
        timeout: `requests` timeout for the GROBID call, as
            (connect seconds, read seconds). Without a timeout an unresponsive
            server would block the whole batch forever.

    Returns:
        A dict with the keys "id", "fichier", "titre_contrat", "pays_cible",
        "ressources", "societe_exploitation" and "entites_principales", or
        None when GROBID does not answer 200 or any step raises (the error is
        printed).
    """
    try:
        # Step 1: send the PDF to the GROBID full-text endpoint
        with open(pdf_path, "rb") as pdf_file:
            response = requests.post(GROBID_URL, files={"input": pdf_file}, timeout=timeout)
        if response.status_code != 200:
            print(f"[Error] GROBID failed for {pdf_path}")
            return None

        raw_xml = response.text
        raw_text = fix_encoding(BeautifulSoup(raw_xml, "lxml").get_text())
        first_page_text = extract_first_page_text(pdf_path)

        # Step 2: Heuristic extraction
        title = extract_title_from_text(first_page_text)
        countries_heur = extract_keywords(raw_text, country_list)
        resources_heur = extract_keywords(raw_text, resource_list)

        # Step 3: Named Entity Recognition (NER)
        known_countries = set(country_list)  # O(1) membership test per entity
        country_counter = Counter()
        company_counter = Counter()  # known companies, keyed by matched name
        other_counter = Counter()    # company-looking ORGs with no known match

        for chunk in split_text(raw_text):
            for ent in ner_pipeline(chunk):
                word = ent['word'].strip()
                label = ent['entity_group']
                word_lower = word.lower()
                if label in ("LOC", "MISC") and word_lower in known_countries:
                    country_counter[word_lower] += 1
                elif label == "ORG":
                    matched = match_to_known_company(word_lower, company_list)
                    if matched:
                        company_counter[matched] += 1
                    elif is_company(word):
                        other_counter[word] += 1

        # The most frequently matched known company is taken as the operator.
        top_matches = company_counter.most_common(1)
        top_company = top_matches[0][0] if top_matches else ""

        return {
            "id": os.path.splitext(os.path.basename(pdf_path))[0].lower().strip(),
            "fichier": os.path.basename(pdf_path),
            "titre_contrat": title,
            "pays_cible": sorted(set(countries_heur) | set(country_counter)),
            "ressources": resources_heur,
            "societe_exploitation": top_company,
            "entites_principales": {
                "pays": list(country_counter.keys()),
                "societes_reconnues": dict(company_counter),
                "autres_orgs": dict(other_counter),
                # Re-checked with the original casing (the loop above matched
                # the lower-cased form).
                "societes_non_matchées": [c for c in other_counter if match_to_known_company(c, company_list) is None]
            }
        }

    except Exception as e:
        # Per-file boundary: report and skip so one bad PDF does not stop the batch.
        print(f"[Error] while processing {pdf_path}: {e}")
        return None

# --- Main execution loop ---

def main():
    """Run the extraction over every PDF in PDF_FOLDER and save the results.

    The three reference lists are loaded once, each file whose name ends with
    ".pdf" (case-insensitive) is processed, and the non-empty results are
    written to OUTPUT_JSON. A warning is printed when nothing was extracted.
    """
    countries = load_list(COUNTRY_LIST_FILE)
    resources = load_list(RESOURCE_LIST_FILE)
    companies = load_list(COMPANY_LIST_FILE)

    extracted = []
    for filename in os.listdir(PDF_FOLDER):
        if not filename.lower().endswith(".pdf"):
            continue
        print(f"\n[Info] Processing {filename}...")
        pdf_path = os.path.join(PDF_FOLDER, filename)
        record = extract_contract_info(pdf_path, countries, resources, companies)
        if record:
            extracted.append(record)

    if not extracted:
        print("\n[Warning] No data extracted.")
        return

    with open(OUTPUT_JSON, "w", encoding="utf-8") as out_file:
        json.dump(extracted, out_file, ensure_ascii=False, indent=2)
    print(f"\n[Success] Extraction completed. Results saved to {OUTPUT_JSON}")


if __name__ == "__main__":
    main()


In [None]:
import json
import re

# Name of the JSON file holding the extracted data (written by the extraction cell)
EXTRACTED_JSON = "combined_extraction_output.json"

# Load the JSON data into `data`
with open(EXTRACTED_JSON, "r", encoding="utf-8") as f:
    data = json.load(f)

# Extract the numeric identifier from an entry's "id", "fichier" or "file_name" field
def extract_numeric_id(entry):
    """Return the first integer found in the entry's identifier.

    The identifier is the first non-empty value among "id", "fichier" (the
    file-name key written by the extraction step) and "file_name".

    Args:
        entry: Dict describing one extracted contract.

    Returns:
        The first run of digits as an int, or float("inf") when the identifier
        holds no digit, so that such entries sort last.
    """
    raw_id = entry.get("id") or entry.get("fichier") or entry.get("file_name") or ""
    # str() keeps the function usable on entries whose id is already numeric.
    id_str = str(raw_id).replace(".pdf", "").strip().lower()
    match = re.search(r'\d+', id_str)  # first number in the identifier
    return int(match.group()) if match else float("inf")

# Replace each textual "id" with its numeric identifier. Entries without any
# digit get None: storing float("inf") would be serialised as the non-standard
# JSON token `Infinity`, which strict JSON parsers reject.
for entry in data:
    num_id = extract_numeric_id(entry)
    entry["id"] = None if num_id == float("inf") else num_id

# Sort by ascending numeric id; entries without an id come last
sorted_data = sorted(data, key=lambda x: (x["id"] is None, x["id"] or 0))

# Save the cleaned, sorted file
with open("sorted_by_numeric_id_cleaned.json", "w", encoding="utf-8") as f:
    json.dump(sorted_data, f, ensure_ascii=False, indent=2)

print(f"[OK] Champs 'id' nettoyés et données triées. Enregistré dans 'sorted_by_numeric_id_cleaned.json'")


In [None]:
import json
import pandas as pd
from sklearn.metrics import classification_report
from sklearn.preprocessing import MultiLabelBinarizer
from fuzzywuzzy import fuzz
import unidecode

# File paths
EXTRACTED_JSON = "sorted_by_numeric_id_cleaned.json"       # Output of the automatic extraction
GROUND_TRUTH_XLSX = "contracts_labelized_example_new.xlsx" # Ground-truth data (manual annotations)

# Load the extracted data from the JSON file
with open(EXTRACTED_JSON, "r", encoding="utf-8") as f:
    extracted_data = json.load(f)

# Turn the JSON records into a DataFrame
extracted_df = pd.DataFrame(extracted_data)

# Load the Excel file holding the manual labels
ground_truth_df = pd.read_excel(GROUND_TRUTH_XLSX)

# Normalise the ids of both sources (keep only the first run of digits)
extracted_df["id"] = extracted_df["id"].astype(str).str.extract(r'(\d+)')[0]
ground_truth_df["id"] = ground_truth_df["id"].astype(str).str.extract(r'(\d+)')[0]

# Rows whose id holds no digit end up with a null key. pandas matches null keys
# against each other when merging, which would pair unrelated contracts, so
# those rows are dropped first.
extracted_df = extracted_df.dropna(subset=["id"])
ground_truth_df = ground_truth_df.dropna(subset=["id"])

# Prefix the extracted columns so they cannot be confused with the manual labels
extracted_df = extracted_df.rename(columns={
    "titre_contrat": "pred_titre_contrat",
    "pays_cible": "pred_pays_cible",
    "societe_exploitation": "pred_societe_exploitation",
    "ressources": "pred_ressources"
})

# Join both data sets on the id
merged_df = pd.merge(ground_truth_df, extracted_df, on="id", how="inner")
print(f"Found {len(merged_df)} matched rows")

# Nothing to evaluate without matches: stop here
if merged_df.empty:
    print("No matching IDs found.")
    # SystemExit is raised directly instead of calling exit(), which is the
    # interactive helper injected by `site` and is not meant for programs.
    raise SystemExit

# --- NORMALISATION HELPERS --- #

# Text normalisation (lower-case, accents removed, trimmed)
def normalize_text(s):
    """Normalise a string, or every string of a list.

    Strings are lower-cased, stripped and passed through `unidecode`. Lists are
    normalised element by element, dropping the non-string elements. Any other
    value is returned as is.
    """
    if isinstance(s, list):
        return [normalize_text(item) for item in s if isinstance(item, str)]
    if isinstance(s, str):
        return unidecode.unidecode(s.lower().strip())
    return s

# Normalise every label/prediction column that is present in the merged frame
for col in ["titre_contrat", "pred_titre_contrat",
            "societe_exploitation", "pred_societe_exploitation",
            "pays_cible", "pred_pays_cible",
            "ressources", "pred_ressources"]:
    # Columns missing from merged_df are skipped rather than raising
    if col in merged_df.columns:
        merged_df[col] = merged_df[col].apply(normalize_text)

# --- FUZZY MATCHING FOR THE SINGLE-VALUE TEXT FIELDS --- #

# Fuzzy comparison based on the FuzzyWuzzy token-set similarity ratio
def fuzzy_match(str1, str2, threshold=85):
    """Tell whether two strings are similar enough.

    Returns False when either argument is not a string; otherwise True when
    `fuzz.token_set_ratio` is at least `threshold`.
    """
    both_are_text = isinstance(str1, str) and isinstance(str2, str)
    if not both_are_text:
        return False
    similarity = fuzz.token_set_ratio(str1, str2)
    return similarity >= threshold

# Apply the fuzzy matching row by row to the titles and to the companies
merged_df["titre_contrat_match"] = merged_df.apply(
    lambda row: fuzzy_match(row["titre_contrat"], row["pred_titre_contrat"]), axis=1)
merged_df["societe_exploitation_match"] = merged_df.apply(
    lambda row: fuzzy_match(row["societe_exploitation"], row["pred_societe_exploitation"]), axis=1)

# Print the fuzzy accuracy: the mean of the boolean match columns
print(f"Accuracy (titre_contrat) with fuzzy matching: {merged_df['titre_contrat_match'].mean():.2f}")
print(f"Accuracy (societe_exploitation) with fuzzy matching: {merged_df['societe_exploitation_match'].mean():.2f}")

# --- MULTI-LABEL MATCHING FOR THE COUNTRIES --- #

# Aliases for frequent country spellings, mapped to one canonical label.
# Keys are written without accents because the lookup happens after
# normalize_text has been applied.
country_map = {
    "republique d'afrique du sud": "afrique du sud",
    "iles vierges britanniques": "british virgin island",
    "british virgin island": "british virgin island",
    # NOTE(review): "rdc" usually abbreviates "République démocratique du Congo",
    # which is a different country from "République du Congo" — confirm that
    # merging both under one label matches the ground-truth labelling.
    "rdc": "republique du congo",
    "republique du congo": "republique du congo",
}

# Turn a string or a list into a cleaned list
def to_list(x):
    """Coerce a cell value into a list of stripped strings.

    A string is split on commas and its empty pieces are dropped; a list keeps
    only its string elements, stripped; anything else gives an empty list.
    """
    if isinstance(x, list):
        return [str(item).strip() for item in x if isinstance(item, str)]
    if isinstance(x, str):
        pieces = (piece.strip() for piece in x.split(","))
        return [piece for piece in pieces if piece]
    return []

# Apply the alias mapping
def map_countries(lst):
    """Replace each country by its `country_map` entry, keeping unknown names as they are."""
    canonical = []
    for country in lst:
        canonical.append(country_map.get(country, country))
    return canonical

# Build the list-valued columns used for the multi-label evaluation:
# to list -> normalised -> aliases mapped to canonical labels
merged_df["true_pays"] = merged_df["pays_cible"].apply(to_list).apply(normalize_text).apply(map_countries)
merged_df["pred_pays"] = merged_df["pred_pays_cible"].apply(to_list).apply(normalize_text).apply(map_countries)

# Keep only the rows where both the true and the predicted list are non-empty
valid_pays_df = merged_df[
    merged_df["true_pays"].apply(bool) & merged_df["pred_pays"].apply(bool)
]

# Multi-label evaluation with classification_report
if valid_pays_df.empty:
    print("\nNo valid rows with non-empty 'pays_cible' and 'pred_pays_cible'.")
else:
    # The binarizer is fitted on the ground-truth labels only.
    # NOTE(review): predicted labels that never occur in the ground truth are
    # therefore not represented in y_pred (scikit-learn is expected to ignore
    # them with a warning) — confirm this is the intended scoring.
    mlb = MultiLabelBinarizer()
    y_true = mlb.fit_transform(valid_pays_df["true_pays"])
    y_pred = mlb.transform(valid_pays_df["pred_pays"])
    # Number of ground-truth occurrences per class
    support = y_true.sum(axis=0)

    # Only report the frequent classes (at least 3 ground-truth examples)
    min_support = 3
    keep_indices = [i for i, count in enumerate(support) if count >= min_support]
    class_names = [mlb.classes_[i] for i in keep_indices]

    print(f"\nClassification Report for 'pays_cible' (filtered, ≥{min_support} samples):")
    if keep_indices:
        # Restrict both indicator matrices to the retained class columns
        y_true_filtered = y_true[:, keep_indices]
        y_pred_filtered = y_pred[:, keep_indices]
        print(classification_report(y_true_filtered, y_pred_filtered, target_names=class_names, zero_division=0))
    else:
        print("No classes with enough support to report.")

# --- RESOURCE EVALUATION --- #

if "ressources" in merged_df.columns and "pred_ressources" in merged_df.columns:
    # Build the resource lists (note: "pred_ressources" is overwritten in place)
    merged_df["true_ressources"] = merged_df["ressources"].apply(to_list).apply(normalize_text)
    merged_df["pred_ressources"] = merged_df["pred_ressources"].apply(to_list).apply(normalize_text)

    # Keep only the rows where both the true and the predicted list are non-empty
    valid_res_df = merged_df[
        merged_df["true_ressources"].apply(bool) & merged_df["pred_ressources"].apply(bool)
    ]

    if valid_res_df.empty:
        print("\nNo valid rows with non-empty 'ressources' and 'pred_ressources'.")
    else:
        # Binarizer fitted on the ground-truth labels only; unlike the country
        # report, no minimum-support filter is applied here.
        mlb_res = MultiLabelBinarizer()
        y_true_res = mlb_res.fit_transform(valid_res_df["true_ressources"])
        y_pred_res = mlb_res.transform(valid_res_df["pred_ressources"])

        print("\nClassification Report for 'ressources':")
        print(classification_report(y_true_res, y_pred_res, target_names=mlb_res.classes_, zero_division=0))
else:
    print("\nMissing 'ressources' or 'pred_ressources' columns in merged_df.")
