In [1]:
import yaml
import re

import pprint
from pullenti.ner.ProcessorService import ProcessorService
from pullenti.ner.SourceOfAnalysis import SourceOfAnalysis
from pullenti.Sdk import Sdk

from natasha import (
    Segmenter,
    NewsEmbedding,
    NewsNERTagger,
    Doc
)

Sdk.initialize_all()

def load_patterns_from_yaml(yaml_file):
    with open(yaml_file, 'r', encoding='utf-8') as file:
        patterns_config = yaml.safe_load(file)
    return patterns_config['masking_patterns'][0]['regex']

def mask_pattern(text, yaml_config, mask_prefix):
    patterns = load_patterns_from_yaml(yaml_config)
    mask_dict = {}
    mask_count = 1
    for pattern in patterns:
        for match in re.finditer(pattern, text, flags=re.IGNORECASE):
            masked_label = f"{{{mask_prefix}_{mask_count}}}"
            mask_dict[masked_label] = match.group()
            text = text.replace(match.group(), masked_label, 1)
            mask_count += 1
    return text, mask_dict

def mask_file_paths(text):
    return mask_pattern(text, "app/config/mask_file_paths.yaml", "FILE_PATH")

def mask_token_hashes(text):
    return mask_pattern(text, "app/config/mask_token_hashes.yaml", "TOKEN_HASH")

def mask_ips(text):
    return mask_pattern(text, "app/config/mask_ips.yaml", "IP")

def mask_urls(text):
    return mask_pattern(text, "app/config/mask_urls.yaml", "URL")

def mask_passport_ID(text):
    return mask_pattern(text, "app/config/mask_passport_ID.yaml", "PASSPORT_ID")

def mask_passport_ID_text(text):
    return mask_pattern(text, "app/config/mask_passport_ID_text.yaml", "PASSPORT_TEXT_ID")

def mask_passport_issuing_authority(text):
    return mask_pattern(text, "app/config/mask_passport_issuing_authority.yaml", "PASSPORT_ISSUING_AUTHORITY")

def mask_account_numbers(text):
    return mask_pattern(text, "app/config/mask_account_numbers.yaml", "ACCOUNT_NUM")

def mask_account_transactions(text):
    return mask_pattern(text, "app/config/mask_account_transactions.yaml", "ACCOUNT_TRANS")

def mask_money(text):
    return mask_pattern(text, "app/config/mask_money.yaml", "MONEY")

def mask_broker_account_numbers(text):
    return mask_pattern(text, "app/config/mask_broker_account_numbers.yaml", "BROKER_ACCT")

def mask_user_identifiers(text):
    return mask_pattern(text, "app/config/mask_user_identifiers.yaml", "USER_ID")

def mask_security_keys(text):
    return mask_pattern(text, "app/config/mask_security_keys.yaml", "SECURITY_KEY")

def mask_passwords(text):
    return mask_pattern(text, "app/config/mask_passwords.yaml", "PASSWORD")

def mask_strong_passwords(text):
    return mask_pattern(text, "app/config/mask_strong_passwords.yaml", "STRONG_PASSWORD")

def mask_code_words(text):
    return mask_pattern(text, "app/config/mask_code_words.yaml", "CODE_WORD")

def mask_otps(text):
    return mask_pattern(text, "app/config/mask_otps.yaml", "OTP")

def mask_cvc_cvvs(text):
    return mask_pattern(text, "app/config/mask_cvc_cvvs.yaml", "CVC_CVV")

def mask_inns(text):
    return mask_pattern(text, "app/config/mask_inns.yaml", "INN")

def mask_bics(text):
    return mask_pattern(text, "app/config/mask_bics.yaml", "BIC")

def mask_swift_bics(text):
    return mask_pattern(text, "app/config/mask_swift_bics.yaml", "SWIFT_BIC")

def mask_numbers(text):
    return mask_pattern(text, "app/config/mask_numbers.yaml", "NUM")

def mask_dates(text):
    return mask_pattern(text, "app/config/mask_dates.yaml", "DATE")

def mask_mobile_phones(text):
    return mask_pattern(text, "app/config/mask_mobile_phones.yaml", "MOBILE_PHONE")

def mask_home_phones(text):
    return mask_pattern(text, "app/config/mask_home_phones.yaml", "HOME_PHONE")

def mask_emails(text):
    return mask_pattern(text, "app/config/mask_emails.yaml", "EMAIL")

def pullenti_address(txt: str, type_mask: str, address_dict: dict, address_counter: int, char_counter: int = 0):
    with ProcessorService.create_processor() as proc:
        ar = proc.process(SourceOfAnalysis(txt), None, None)
        for e0_ in ar.entities:
            if e0_.type_name == type_mask:
                for i in e0_.occurrence:
                    replacement = f'{{ADDRESS_{address_counter}}}'
                    address_dict[replacement] = txt[i.begin_char - char_counter:i.end_char + 1 - char_counter]
                    txt = txt[:i.begin_char - char_counter] + replacement + txt[i.end_char - char_counter + 1:]
                    char_counter += (i.end_char - i.begin_char + 1) - len(replacement)
                    address_counter += 1
    return txt, address_dict, address_counter

def masking_address(txt: str, address_counter: int = 1):
    address_dict = {}
    types_mask = ["ADDRESS"]

    for type_mask in types_mask:
        txt, address_dict, address_counter = pullenti_address(txt, type_mask, address_dict, address_counter)

    return txt, address_dict

def natasha(text: str) -> str:
    """
    Function to mask named entities in the text using Natasha library.
    """
    
    segmenter = Segmenter()
    emb = NewsEmbedding()
    ner_tagger = NewsNERTagger(emb)
    doc = Doc(text)
    doc.segment(segmenter)
    doc.tag_ner(ner_tagger)
    
    mask_dict, mask_text_natasha = mask_text(doc)
    
    return mask_text_natasha, mask_dict

def mask_text(doc: Doc) -> tuple[dict, str]:
    """
    Function to replace named entities with masks.
    """
    masks = {'PER': 'NAME', 'ORG': 'ORGANIZATION', 'LOC': 'LOCATION'}
    counts = {'PER': 0, 'ORG': 0, 'LOC': 0}
    masked_text = {}

    for span in doc.spans:
        counts[span.type] += 1
        mask = '{' + masks[span.type] + '_' + str(counts[span.type]) + '}'
        doc.text = doc.text.replace(span.text, mask, 1)
        masked_text[mask] = span.text

    return masked_text, doc.text

In [2]:
def apply_masking(text, config_path):
    with open(config_path, 'r', encoding='utf-8') as file:
        config = yaml.safe_load(file)
    
    mask_dict = {}
    for function_name in config['masking_order']:
        if function_name in globals():
            text, new_mask_dict = globals()[function_name](text)
            mask_dict.update(new_mask_dict)
        else:
            print(f"Warning: function '{function_name}' not found")
    
    return text, mask_dict

In [3]:
text_file_path = 'test_text.txt'
config_path = "app/masking_order.yaml"

with open(text_file_path, 'r', encoding='utf-8') as file:
    text = file.read()

# text = text.replace('\n', ' ')
    
masked_text, mask_dict = apply_masking(text, config_path)

In [4]:
print(masked_text)

Вот ваш текст, разделенный на абзацы для лучшей читабельности:

---

{NAME_6} клиент,

В соответствии с запрошенными категориями, представленным ниже информационный текст, содержащий все необходимые данные для территории {LOCATION_1}.

Российские паспортные данные: Фамилия: {NAME_1}: {NAME_{NUM_10}}: {NAME_3} рождения: {DATE_3} Место рождения: г. {LOCATION_2} Серия и номер паспорта: {PASSPORT_ID_1} Орган выдачи: {ORGANIZATION_1} {LOCATION_3} по г. {LOCATION_4} Код подразделения: {NUM_1}

Контактные данные: Мобильный телефон: {MOBILE_PHONE_1} Домашний телефон: {MOBILE_PHONE_2} Email: {EMAIL_1}

Сведения о маркетинговых компаниях: Маркетинговые компании: {ORGANIZATION_2}, {ORGANIZATION_3}, {ORGANIZATION_4}"

Адресные данные: переулок {LOCATION_5}, д. {NUM_9}, кв. {NUM_16} г. {LOCATION_6}, {LOCATION_7}, {NUM_2}
наб. р. {LOCATION_8}, д. {NUM_11}, г. {LOCATION_9}, {LOCATION_10}, {LOCATION_11}, {NUM_3}

Сведения о движении денежных средств: Номера счетов:{ACCOUNT_NUM_1},{ACCOUNT_NUM_2} Детал

In [5]:
mask_dict

{'{USER_ID_1}': 'Логин: IvanovSP',
 '{USER_ID_2}': 'логин IvanovSP,',
 '{USER_ID_3}': 'логин SmirnovAS,',
 '{USER_ID_4}': 'идентификатор пользователя: sdngjoj',
 '{PASSWORD_1}': 'Пароль: q1w2e3r4',
 '{PASSWORD_2}': 'пароль q1w2e3r4,',
 '{PASSWORD_3}': 'пароль z1x2c3v4b5n6m7,',
 '{CVC_CVV_1}': 'CVV: 789\n',
 '{CVC_CVV_2}': 'CVV: 392\n',
 '{EMAIL_1}': 'ivanov.sp@mail.ru',
 '{EMAIL_2}': 'ivanov.sp@mail.ru',
 '{EMAIL_3}': 'smirnov.as@mail.ru',
 '{LOCATION_1}': 'Российской Федерации',
 '{NAME_1}': 'Иванов Имя',
 '{NAME_2}': 'Сергей Отчество',
 '{NAME_3}': 'Петрович Дата',
 '{LOCATION_2}': 'Москва',
 '{ORGANIZATION_1}': 'УФМС',
 '{LOCATION_3}': 'России',
 '{LOCATION_4}': 'Москве',
 '{ORGANIZATION_2}': 'ООО "Маркетинг Плюс"',
 '{ORGANIZATION_3}': 'ООО "Рекламные Технологии"',
 '{ORGANIZATION_4}': 'ООО "Диджитал Солюшенс',
 '{LOCATION_5}': 'Строителей',
 '{LOCATION_6}': 'Владивосток',
 '{LOCATION_7}': 'Приморский край',
 '{LOCATION_8}': 'Волги',
 '{LOCATION_9}': 'Самара',
 '{LOCATION_10}': 'Са

In [6]:
import json
with open('masked.json', 'w') as f:
    json.dump(mask_dict, f)