In [1]:
import re
import pandas as pd
from datetime import datetime
import json
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification
from typing import Dict, List


In [2]:
# Sample regulatory text used as the input for the extraction run further down.
# It is passed unchanged to extract_compliance_rules(); the wording and spacing
# (e.g. "31 March , 2026") are part of the input data.
regulatory_text = """
Warda Bidtha Inc. must report its Scope 1 emissions by 31 March , 2026.
The limit for Sector B in Tunisia is 50,000 tCO2e.
Green Energy Solutions Ltd. shall submit quarterly reports to the Environmental Protection Agency by the 15th of each quarter.
Maximum allowable emissions for manufacturing facilities is 25,000 tons CO2 equivalent per year.
LEONI received a penalty of $2.5 million for exceeding emission limits in Germany.
All companies in the automotive sector must comply with Euro 6 standards before 1 January, 2026.
AGIL Petroleum plc reported 45,200 tCO2e in their 2024 sustainability report.
The deadline for carbon offset submissions is 31 December, 2025.
Renewable energy targets for utilities: 30% by 2030, 50% by 2035.
Mohamed Aziz and his company Wicrosoft are responsible for reducing emissions by 20% by 2027. because before in 2023 they emitted 100,000 tCO2e and they got fined for $300,000.
Everyone must comply with CBAM regulations in Algeria by 1 September, 2025.
The industry F must reduce its methane emissions by 30% by 2030.
"""


In [3]:
class RegulatoryNERPipeline:
    """Wrapper around a Hugging Face "ner" pipeline used for entity extraction.

    Attributes:
        model_name: Identifier of the checkpoint this wrapper was configured
            with. It is set even when loading fails.
        ner_pipeline: The callable pipeline, or None when loading failed.
    """

    DEFAULT_MODEL_NAME = "dslim/bert-base-NER"

    def __init__(self, model_name: str = DEFAULT_MODEL_NAME):
        """Load the tokenizer and model and build the pipeline.

        Args:
            model_name: Checkpoint identifier passed to ``from_pretrained``.

        Loading errors are reported with ``print`` and leave ``ner_pipeline``
        set to None instead of raising.
        """
        # Assign both attributes before the risky part so that later reads
        # (e.g. of ``model_name``) never hit an AttributeError after a
        # failed load.
        self.model_name = model_name
        self.ner_pipeline = None
        try:
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForTokenClassification.from_pretrained(model_name)
            self.ner_pipeline = pipeline(
                "ner",
                model=model,
                tokenizer=tokenizer,
                aggregation_strategy="simple"
            )
        except Exception as e:
            print(f"Error initializing NER pipeline: {e}")

    def extract_entities(self, text: str) -> List[Dict]:
        """Run the pipeline on ``text`` and return its entity dicts.

        Args:
            text: Input text to analyse.

        Returns:
            Whatever the pipeline returns for ``text``; an empty list when the
            pipeline is unavailable, the text is blank, or the call fails.
        """
        if self.ner_pipeline is None:
            # Initialisation failed earlier; that error was already reported.
            return []
        if not text or not text.strip():
            return []
        try:
            return self.ner_pipeline(text)
        except Exception as e:
            # Stay best-effort (callers expect a list) but do not hide the cause.
            print(f"Error extracting entities: {e}")
            return []

# Shared NER wrapper instance; it is passed to ComplianceEntityExtractor and
# read again when the results are saved (model_name).
ner_extractor = RegulatoryNERPipeline()

Some weights of the model checkpoint at dslim/bert-base-NER were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use cuda:0


In [4]:
class ComplianceEntityExtractor:
    """Extract compliance-related entities from regulatory text.

    Two sources are combined: entities returned by the wrapped NER object
    (organizations, dates, locations, persons) and regular-expression matches
    for domain-specific items (emission limits, penalties, percentages, dates,
    sectors, standards).
    """

    # NER entities scoring below this value are discarded.
    MIN_CONFIDENCE = 0.5

    # Maps an upper-cased NER ``entity_group`` label to its result bucket.
    _NER_BUCKETS = {
        'ORG': 'organizations',
        'ORGANIZATION': 'organizations',
        'DATE': 'dates',
        'TIME': 'dates',
        'LOC': 'locations',
        'LOCATION': 'locations',
        'PER': 'persons',
        'PERSON': 'persons',
    }

    # (result bucket / pattern key, regex flags, name of the key that receives
    # capture group 1). ``None`` as the key name marks the date entries, which
    # carry a fixed 'type' tag instead of a captured value.
    _PATTERN_SPECS = (
        ('emission_limits', re.IGNORECASE, 'value'),
        ('financial_penalties', re.IGNORECASE, 'value'),
        ('percentages', 0, 'value'),
        ('dates', re.IGNORECASE, None),
        # No IGNORECASE here: the keyword is case-insensitive through an inline
        # flag, while the sector letter must stay uppercase.
        ('sectors', 0, 'sector'),
        ('standards', re.IGNORECASE, 'standard'),
    )

    def __init__(self, ner_pipeline: "RegulatoryNERPipeline"):
        """Store the NER wrapper and define the regex patterns.

        Args:
            ner_pipeline: Object exposing ``extract_entities(text)`` that
                returns a list of entity dicts.
        """
        self.ner_pipeline = ner_pipeline
        # Patterns for extracting specific regulatory entities:
        # 'emission_limits': a number (commas/decimals allowed) followed either by a
        #     mass unit (t/ton(s)/tonne(s)) and an emission keyword, or directly by
        #     CO2/CO2e. A bare "1 emissions" (as in "Scope 1 emissions") is not a limit.
        # 'financial_penalties': a dollar amount (commas/decimals allowed) with an
        #     optional multiplier (million/thousand/k/m) that must end on a word
        #     boundary, so the "m" of "must" is not taken for "million".
        # 'percentages': a number (decimals allowed) followed by a percent sign.
        # 'dates': "Day Month Year" with an optional "by"/"before" prefix; the comma
        #     before the year is optional and may be surrounded by spaces.
        # 'deadlines': deadline expressions, e.g. "deadline is 31 December, 2025".
        #     Defined but not applied by _extract_pattern_entities.
        # 'sectors': "sector"/"industry" followed by one uppercase letter, e.g. "Sector B".
        # 'standards': regulatory standards, e.g. "CBAM" or "Euro 6".
        self.patterns = {
            'emission_limits': (
                r'(\d+(?:,\d{3})*(?:\.\d+)?)\s*'
                r'(?:(?:tonnes?|tons?|t)\s*(?:of\s+)?(?:CO2e?|carbon|emissions?)|CO2e?)'
            ),
            'financial_penalties': r'\$(\d+(?:,\d{3})*(?:\.\d+)?)(?:\s*(?:million|thousand|k|m)\b)?',
            'percentages': r'(\d+(?:\.\d+)?)\s*%',
            'dates': (
                r'(?:(?:by|before)\s+)?\b\d{1,2}\s+'
                r'(?:January|February|March|April|May|June|July|August|September|October|November|December)'
                r'(?:\s*,\s*|\s+)\d{4}'
            ),
            'deadlines': r'(?:deadline|due|by|before|until)\s+(?:is\s+)?([^.]+)',
            'sectors': r'(?i:sector|industry)\s+([A-Z])\b',
            'standards': r'(Euro\s+\d+|CBAM|SEC\s+filing\s+\d+-[A-Z]+)',
        }

    def extract_compliance_entities(self, text: str) -> Dict:
        """Run NER and regex extraction over ``text``.

        Args:
            text: Regulatory text to analyse.

        Returns:
            Dict with one list per entity category plus 'raw_entities' (the
            unfiltered NER output). A failure in one extraction stage is
            reported with ``print`` and does not discard the other stage's
            results.
        """
        results = {
            'organizations': [],
            'dates': [],
            'emission_limits': [],
            'financial_penalties': [],
            'percentages': [],
            'locations': [],
            'persons': [],
            'standards': [],
            'sectors': [],
            'raw_entities': []
        }

        # Stage 1: model-based entities. A failure here must not prevent the
        # regex stage from running.
        try:
            entities = self.ner_pipeline.extract_entities(text) or []
        except Exception as e:
            print(f"NER extraction failed, continuing with pattern extraction only: {e}")
            entities = []
        results['raw_entities'] = entities

        try:
            for entity in entities:
                confidence = entity.get('score', 0)
                if confidence < self.MIN_CONFIDENCE:
                    continue
                bucket = self._NER_BUCKETS.get(entity.get('entity_group', '').upper())
                if bucket is None:
                    # Label this extractor does not track.
                    continue
                results[bucket].append({
                    'text': entity.get('word', '').strip(),
                    # Plain float so the value serialises as a JSON number even
                    # if the score is some other numeric scalar type.
                    'confidence': float(confidence),
                    'start': entity.get('start', 0),
                    'end': entity.get('end', 0)
                })
        except Exception as e:
            print(f"Error while processing NER entities: {e}")

        # Stage 2: custom entities matched with regular expressions.
        try:
            self._extract_pattern_entities(text, results)
        except Exception as e:
            print(f"Error while extracting pattern entities: {e}")
        return results

    def _extract_pattern_entities(self, text: str, results: Dict):
        """Append regex matches for every pattern category to ``results``.

        Args:
            text: Text to scan.
            results: Result dict (mutated in place); must already contain a
                list for each pattern bucket.

        Each entry holds the matched 'text', its 'start'/'end' offsets in
        ``text`` and either the first capture group (under 'value', 'sector'
        or 'standard') or, for dates, the tag 'type': 'pattern_extracted'.
        """
        for bucket, flags, group_key in self._PATTERN_SPECS:
            for match in re.finditer(self.patterns[bucket], text, flags):
                record = {'text': match.group(0)}
                if group_key is None:
                    record['type'] = 'pattern_extracted'
                else:
                    record[group_key] = match.group(1)
                record['start'] = match.start()
                record['end'] = match.end()
                results[bucket].append(record)

# Module-level extractor built on the shared NER wrapper; extract_compliance_rules() uses it.
compliance_extractor = ComplianceEntityExtractor(ner_extractor)

In [5]:
def extract_compliance_rules(text: str) -> Dict:
    """Extract compliance entities from ``text`` with the module-level extractor.

    Args:
        text: Regulatory text to analyse.

    Returns:
        The dict produced by ``compliance_extractor.extract_compliance_entities``,
        or an empty dict if that call raises.
    """
    try:
        return compliance_extractor.extract_compliance_entities(text)
    except Exception as e:
        # Stay best-effort (downstream code reads the result with .get()),
        # but report the failure instead of hiding it.
        print(f"Error extracting compliance rules: {e}")
        return {}

# Run the extraction on the sample text; `results` feeds the analysis, DataFrame and export cells.
results = extract_compliance_rules(regulatory_text)


In [6]:
# Scale factors for the multiplier words/suffixes that may follow a penalty amount.
_PENALTY_MULTIPLIERS = {
    'million': 1_000_000,
    'm': 1_000_000,
    'thousand': 1_000,
    'k': 1_000,
}


def _penalty_amount(penalty: Dict) -> float:
    """Return the numeric amount of one penalty entry, applying any multiplier found at the end of its 'text'."""
    amount = float(penalty['value'].replace(',', ''))
    suffix = re.search(r'(million|thousand|k|m)\s*$', str(penalty.get('text', '')), re.IGNORECASE)
    if suffix:
        amount *= _PENALTY_MULTIPLIERS[suffix.group(1).lower()]
    return amount


def analyze_compliance_patterns(results: Dict) -> Dict:
    """Summarise extraction results into counts, deadlines and financial exposure.

    Args:
        results: Extraction output; missing categories are treated as empty.

    Returns:
        Dict with:
            'summary': entity counts per category,
            'insights': human-readable statements for non-empty findings,
            'key_deadlines': date texts that contain a deadline keyword,
            'financial_exposure': sum of all penalty amounts, with
                multipliers such as "million" applied.
    """
    analysis = {
        'summary': {},
        'insights': [],
        'key_deadlines': [],
        'financial_exposure': 0
    }
    analysis['summary'] = {
        'total_organizations': len(results.get('organizations', [])),
        'total_dates': len(results.get('dates', [])),
        'total_emission_limits': len(results.get('emission_limits', [])),
        'total_financial_penalties': len(results.get('financial_penalties', [])),
        'total_locations': len(results.get('locations', [])),
        'total_standards': len(results.get('standards', []))
    }

    for penalty in results.get('financial_penalties', []):
        try:
            analysis['financial_exposure'] += _penalty_amount(penalty)
        except (ValueError, KeyError, AttributeError, TypeError):
            # Entry without a usable numeric value: skip it.
            continue

    # Whole-word match so that a keyword embedded in a longer word does not count.
    deadline_keyword = re.compile(r'\b(?:deadline|by|before|due)\b', re.IGNORECASE)
    for date in results.get('dates', []):
        date_text = date.get('text', '')
        if deadline_keyword.search(date_text):
            analysis['key_deadlines'].append(date_text)

    summary = analysis['summary']
    if summary['total_organizations'] > 0:
        analysis['insights'].append(f"Found {summary['total_organizations']} organizations mentioned in regulatory text")
    if summary['total_emission_limits'] > 0:
        analysis['insights'].append(f"Identified {summary['total_emission_limits']} emission limits/targets")
    if analysis['financial_exposure'] > 0:
        analysis['insights'].append(f"Total financial exposure: ${analysis['financial_exposure']:,.2f}")
    if analysis['key_deadlines']:
        analysis['insights'].append(f"Found {len(analysis['key_deadlines'])} critical deadlines")

    return analysis


# Analyse the extraction results. To cover additional regulatory texts, extract
# each one separately and combine the results before analysing them.
analysis = analyze_compliance_patterns(results)

In [7]:
def create_compliance_dataframe(results: Dict) -> pd.DataFrame:
    """Flatten extraction results into one row per entity.

    Args:
        results: Mapping of entity category to a list of entity dicts. The
            'raw_entities' category is skipped.

    Returns:
        DataFrame with the columns entity_type, text, confidence, start, end,
        value, sector and standard. Fields an entity does not have are left
        empty. The columns are present even when there are no entities, so a
        CSV export always has a header row.
    """
    # 'text' defaults to an empty string; every other optional field to None.
    field_defaults = {
        'text': '',
        'confidence': None,
        'start': None,
        'end': None,
        'value': None,
        'sector': None,
        'standard': None,
    }
    columns = ['entity_type', *field_defaults]

    rows = []
    for entity_type, entities in results.items():
        if entity_type == 'raw_entities':
            continue
        for entity in entities:
            row = {'entity_type': entity_type}
            for field, default in field_defaults.items():
                row[field] = entity.get(field, default)
            rows.append(row)

    # Explicit columns keep the schema stable for empty input.
    return pd.DataFrame(rows, columns=columns)

# Flat table of all extracted entities; written to CSV in the export cell.
df = create_compliance_dataframe(results)

In [8]:
# Save the entity table to a CSV file.
csv_filename = "regulatory_entities_extraction.csv"
df.to_csv(csv_filename, index=False)
print(f"Entities saved to: {csv_filename}")

# Save the extraction results and the analysis to a JSON file.
results_filename = "regulatory_compliance_results.json"
# Build the payload before opening the file: opening in 'w' mode truncates it,
# so a failure while assembling the payload would otherwise leave an empty file.
json_results = {
    'extraction_results': results,
    'analysis': analysis,
    'model_info': {
        # getattr: the attribute may be missing if the NER wrapper failed to initialise.
        'model_name': getattr(ner_extractor, 'model_name', None),
        'extraction_timestamp': datetime.now().isoformat()
    }
}
with open(results_filename, 'w', encoding='utf-8') as f:
    # default=str converts values json cannot encode natively into strings.
    json.dump(json_results, f, indent=2, default=str)

print(f"Complete results saved to: {results_filename}")

Entities saved to: regulatory_entities_extraction.csv
Complete results saved to: regulatory_compliance_results.json
