# 개인정보 가명처리 모델
> NER (Named Entity Recognition)


## 작업환경 설정

colab으로 생성한 노트북(.ipynb) 커널을 구글드라이브(google dirve) 경로에 마운트 합니다.


In [None]:
# 드라이브 마운트
from google.colab import drive
drive.mount('/content/drive')

작업 경로 설정

In [None]:
cd /content/drive/MyDrive/abc

### datasets 라이브러리 설치
- Hugging Face에서 제공하는 데이터셋 관리 라이브러리

In [None]:
!pip install datasets

### 라이브러리

python의 기본 라이브러리와 함께 Hugging Face의 datasets, transformers의 라이브러리를 불러 옵니다.

In [None]:
import datasets
import os, glob, json, random
import re

from tqdm import tqdm
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

import torch

from transformers import (AutoTokenizer,
                          AutoModelForTokenClassification,
                          Trainer,
                          TrainingArguments,
                          DataCollatorWithPadding)

from huggingface_hub import login

In [None]:
login(token='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')

## 데이터 전처리

### 데이터셋 로드
- huggin face 데이터셋

In [None]:
merge_ds=datasets.load_dataset('TeamUNIVA/NER_example')['train']

### 개인정보 식별자 추출

데이터(text)에서 식별자 형식(`{개인정보}`)으로 포함된 개인정보를 추출합니다.
- 정규표현식을 사용하여 개인정보 식별자를 추출
- 추출된 개인정보 식별자는 리스트(list)로 저장

In [None]:
refine_ds = []

for item in merge_ds:

    x = item

    # 중괄호로 감싸진 식별자를 추출하는 코드 | 정규표현식
    id_list = re.findall(r'\{.*?\}',x['assistant'])

    x['id_list'] = id_list

    refine_ds.append(x)

추출된 전체 개인정보 식별자를 확인
- 정의하지 않은 개인정보 식별자 포함 여부를 확인

In [None]:
list(set(pd.DataFrame(refine_ds)['id_list'].sum()))

데이터 필터링
- 정의하지 않은 개인정보 식별자를 포함한 데이터를 필터링

In [None]:
refine_ds = [item for item in refine_ds if '{WEIGHT}' not in item['id_list']]
refine_ds = [item for item in refine_ds if '{AGE}' not in item['id_list']]
refine_ds = [item for item in refine_ds if '{DRUG_NAME}' not in item['id_list']]

필터링된 개인정보 식별자를 correct_list로 생성
- 이후 NER에서 Entity를 정의하고 매핑(mapping)하는데 사용

In [None]:
correct_list = sorted(list(set(pd.DataFrame(refine_ds)['id_list'].sum())))
correct_list

### 데이터 EDA
> 데이터의 정량적인 정보를 확인
- 텍스트 길이분포
- 데이터별 라벨 개수
- 식별자별 개수

데이터프레임 생성 (pandas)

In [None]:
df = pd.DataFrame(refine_ds)
# 데이터별 식별자의 수량 추가
df['id_length'] = df['id_list'].apply(lambda X: len(X))

# 텍스트 길이 추가(어절)
df['text_len'] = df['assistant'].apply(lambda X: len(X.split(' ')))

df.tail()

### 데이터 시각화

 데이터별 식별자의 개수와 텍스트의 길이 분포를 확인

In [None]:
# 1 by 2 plot 생성
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# 첫 번째 plot - Num ID
axes[0].hist(df['id_length'], bins=6, color='skyblue', edgecolor='black', alpha=0.7)
axes[0].set_title('Distribution of Num ID', fontsize=16, fontweight='bold')
axes[0].set_xlabel('Num of ID', fontsize=12)
axes[0].set_ylabel('Frequency', fontsize=12)
axes[0].grid(axis='y', linestyle='--', alpha=0.7)

# ytick 간격 조정
max_frequency_id = axes[0].get_ylim()[1]
axes[0].set_yticks(np.arange(0, max_frequency_id + 1, step=400))

# 두 번째 히스토그램 - Text length
axes[1].hist(df['text_len'], bins=50, color='skyblue', edgecolor='black', alpha=0.7)
axes[1].set_title('Distribution of Text Length', fontsize=16, fontweight='bold')
axes[1].set_xlabel('Text Length', fontsize=12)
axes[1].set_ylabel('Frequency', fontsize=12)
axes[1].grid(axis='y', linestyle='--', alpha=0.7)

# ytick 간격 조정
max_frequency_text = axes[1].get_ylim()[1]
axes[1].set_yticks(np.arange(0, max_frequency_text + 1, step=400))

plt.tight_layout()  # 그래프 간격을 자동으로 조정
plt.show()

전체 데이터이 식별자별 개수(plot)

In [None]:
# 데이터프레임으로 확인
# id_list의 합산값에 대한 빈도수를 계산
sum_value = pd.Series(df['id_list'].sum()).value_counts()

# 데이터프레임 출력
pd.DataFrame(sum_value)

In [None]:
# 빈도수를 막대그래프로 시각화 (bar plot)
plt.figure(figsize=(16,6))
sum_value.plot(kind='bar', color='skyblue', edgecolor='black')
plt.title('Frequency of IDs')
plt.xlabel('count')
plt.xticks(rotation=0)
plt.ylabel('Frequency')
plt.grid(axis='y', linestyle='--', alpha=0.7)

plt.show()

### 개인정보 DB 로드
- 난수 및 무작위 조합 방식으로 생성한 유형별 개인정보 DB
- 각 항목별로 일부 영역이 비식별화 처리됨

.json 포맷의 개인정보 DB 파일을 로드합니다.

In [None]:
with open('./private_db.json', 'r', encoding='utf-8') as f:
    db = json.load(f)

### BIO 표기법


BIO Entity 카테고리 생성
- NER으로 분류하는 Entity를 BIO 표기법으로 생성
- 분류해야하는 객체의 시작을 '`B-`', 객체의 중간과 끝을 '`I-`' 그리고 entitiy 분류되지 않는 객체는 `0`(int)로 표기
    - B- : Begin
    - I- : Inside
    - O :  Outside
- 앞에서 선언한 correct_list의 값을 for문으로 돌면서 처리함

In [None]:
# entitie 카테고리 생성

# 선행 식별자
bi0_list = ['I-','B-']

label_list = []
for i in range(len(bi0_list)):
    bi0 = bi0_list[i]
    for item in correct_list: # correct_list는 앞에서 처리한 전체 Entity 객체의 고유값
        lb = bi0 + item[1:-1]

        label_list.append(lb)

# 카테고리의 가정 첫번째는 O(Outside)에 해당하는 '0'을 설정
label_list.insert(0,'0')

print(len(label_list))

# 객체명의 순서의 기준 정렬을 통해 설정
label_list.sort()
label_list= list((label_list))
label_list

Entity index 매핑
- index는 Entity의 순서대로 설정

In [None]:
cat_idx_dict = {}
for idx, cat in enumerate(label_list):
    cat_idx_dict[cat] = idx
cat_idx_dict

### 개인정보 치환

데이터 치환함수
- 개인정보 DB를 입력 받아 각 개인정보 유형에 해당하는 정보를 랜덤으로 매칭함.
- 식별자로 개인정보가 포함된 텍스트를 매칭된 개인정보에 맞춰 치환하는 방식
- 개별 데이터와 개인정보 DB(db)를 인자로 받아 실행하는 함수

In [None]:
def private_info_replace(x,db):

    # 추출된 식별자 정보
    ids = x['id_list']

    # 텍스트에서 개인정보 치환을 위한 식별자 set 생성
    replace_set = {}

    labeling_list = []
    labelings =[]
    for item in ids:
        key = item[1:-1] # 식별자의 중괄호를 파싱
        key = key.lower() # db에서 key를 대응시키기 위해 소문자로 변환

        value = random.choice(db[key]) # 식별자 키로 db에서 개인정보를 랜덤으로 매핑

        replace_set[item] = value # replace_set 업데이트

        labeling_dict ={
            item:value
        }
        labeling_list.append(labeling_dict)

    # 식별자가 포함된 텍스트를 위에서 매핑한 식별자:개인정보로 수정(replace)
    for old, new in replace_set.items():
        x['assistant'] = x['assistant'].replace(old, new)


    # 수정(replace) 내역을 별토의 key로 기록
    mapped_data = [{k: replace_set.get(k, v) for k, v in item.items()} for item in labeling_list]
    x['keywords'] = mapped_data

    return x

개인정보 치환 실행

In [None]:
# 함수 실행
new_data = [private_info_replace(item,db) for item in refine_ds]

# 처리된 데이터 확인
new_data[0]

BIO 표기를 적용하기 위한 labeling_info 생성
- 치환한 식별자:개인정보의 인덱스 값을 함께 산출

In [None]:
for item in new_data:
    labeling_info = []

    # 인덱스 기본 값 = 0
    start = 0

    # label의 기준이 되는 텍스트 (개인정보 포함 텍스트)
    text = item['assistant']


    for label in item['keywords']:
        key = list(label.keys())[0]
        value = list(label.values())[0]

        # 치환한 개인정보가 시작하는 위치값
        start = text.find(value,start)

        if start == -1:
            break

        # 치환한 개인정보가 시작하는 위치값, 시작값 + 개인정보 텍스트의 길이를 index로 설정
        index = [start,start+len(value)]

        # index 추출 순회를 위한 start값 업데이트
        start += len(value)

        data_dict ={
            'category': key[1:-1], # 식별자의 중괄호를 파싱
            'word':value, # 치환한 개인정보
            'location': index # 치환한 개인정보의 위치값
        }

        labeling_info.append(data_dict)

    item['labeling_info'] = labeling_info

### BIO 표기 적용

BIO 표기 적용 함수(string 단위)
- 텍스트의 글자 단위로 label을 적용하고 개인정보에 해당하는 텍스트는 앞에서 설정한 Entity를 표기
- Entity를 표기하는 텍스트의 가장 첫번째는 '`B-`', 나머지 영역을 '`I-`'로 표기
- Entity에 해당하지 않은 텍스트(string)은 '`0`'로 표기함

In [None]:
def exclude_MetaText(x):
    text = x['assistant']
    annotations = x['labeling_info']

    # BIO 표기법을 위한 레이블 초기화
    labels = ['0'] * len(text)

    # 개인정보의 위치값을 기반으로 BIO 표기
    for annotation in annotations:
        start,end = annotation['location']
        category = annotation['category']

        # start값이 텍스트의 길이보다 크거나 end값이 텍스트의 길이보다 길면 오류 메시지를 출력
        if start >= len(text) or end >= len(text):
            print('location values error')
            continue

        # start값은 'B-', 나머지 end값까지의 위치는 'I-'로 표기
        if start == end:
            labels[start] = f'B-{category}'
        else:
            labels[start] = f'B-{category}'

            for i in range(start+1, end):
                labels[i] = f'I-{category}'

    x['labels'] = labels

    return x

BIO 표기 함수 실행 & 데이터 재구조화

In [None]:
the_new_data = []

for item in new_data:
    result = exclude_MetaText(item) # 함수 적용
    text = result['assistant']
    labels = result['labels']

    data_dict={
        'text':text,
        'labeling_info': result['labeling_info'],
        'labels':labels
    }

    the_new_data.append(data_dict)

BIO 표기 결과 확인
- index : string : label의 순서로 출력

In [None]:
idx = 67
for i in range(len(the_new_data[idx]['text'])):
    a = the_new_data[idx]['text'][i]
    b = the_new_data[idx]['labels'][i]

    print(f'{i}:{a}:{b}')

### BIO 표기 수정 (token 단위)

토크나이저 로드
- klue/roberta-base

In [None]:
from transformers import AutoTokenizer
model_path = 'klue/roberta-base'
tokenizer = AutoTokenizer.from_pretrained(model_path)

label 정렬/수정 함수
- string 단위로 적용된 BIO 표기를 token 단위로 수정하는 함수
- 한글이 포함되지 않는 Entity의 오류(한글포함)오류를 수정하는 함수
    - 정규표현식

In [None]:
# BIO label을 token 단위로 수정하는 함수
def allign_tokens(x):
    text = x['text']
    char_labels = x['labels']

    # 텍스트 인코딩
    encoding = tokenizer.encode_plus(text, return_offsets_mapping=True)
    tokens = tokenizer.convert_ids_to_tokens(encoding["input_ids"])
    token_indices = encoding["offset_mapping"][1:-1]  # [CLS]와 [SEP]의 오프셋을 제외

    # 각 토큰의 인덱스를 추출하고 인덱스에 해당하는 label(Entity)를 적용
    token_labels = []
    for i, (start_char, end_char) in enumerate(token_indices):
        token_label = char_labels[start_char]
        token_labels.append(token_label)

    x['tokens'] = tokens
    x['token_labels'] = token_labels
    x['token_length'] = len(token_labels)
    return x

# 한글이 포함되지 않는 Entity의 오류를 수정하는 함수
def fix_label(x):
    tokens = x['tokens']
    labels = x['token_labels']

    # 한글이 포함되지 않는 Entity 리스트
    cat_list_nonkor = ['B-BANK_ACCOUNT', 'B-CELL_PHONE', 'B-CREDIT_CARD', 'B-DRIVER_LICENSE', 'B-EMAIL', 'B-PASSPORT', 'B-PHONE', 'B-RRN', 'B-SNS', 'I-BANK_ACCOUNT', 'I-CELL_PHONE', 'I-CREDIT_CARD', 'I-DRIVER_LICENSE', 'I-EMAIL', 'I-PASSPORT', 'I-PHONE', 'I-RRN', 'I-SNS']
    korean_pattern = re.compile(r'[가-힣]') # 한글 포함 여부를 확인하는 정규표현식

    for i in range(len(labels)):
        token = tokens[i+1]
        label = labels[i]

        if label in cat_list_nonkor and bool(korean_pattern.search(token)):
            x['token_labels'][i] = '0'
        else:
            x['token_labels'][i] = label

    return x

label 정렬/수정 함수 적용

In [None]:
rds = [allign_tokens(item) for item in tqdm(the_new_data)]
tds = [fix_label(item) for item in tqdm(rds)]

데이터 확인

In [None]:
idx = 67
print(tds[idx]['text'])
for i in range(len(tds[idx]['tokens'])):

    try:
        a = the_new_data[idx]['tokens'][i]
        b = the_new_data[idx]['token_labels'][i-1]

    # if b != '0':
        print(f'{i}:{a}:{b}')
    except Exception as e:
        print(i)

label 추가 수정
- 스페셜 토큰([CLS],[SEP])의 label을 -100으로 처리
- 스페셜 토큰은 각각 토크나이징된 데이터의 가장 첫번째, 가장 마지막에 위치 함.

In [None]:
def add_labels(x):
    token_labels = x['token_labels']

    labels = [-100] # [CLS] 토큰 label
    for lab in token_labels:
        idx = cat_idx_dict[lab]
        labels.append(idx)

    labels.append(-100) # [SEP] 토큰 label 처리
    labels = np.array(labels)
    x['labels'] = labels
    return x

# 함수 실행
lds = [add_labels(item) for item in tds]

데이터 확인

In [None]:
# idx = 67

r_idx = random.choice(range(1990))
print(lds[r_idx]['text'])
for i in range(len(lds[r_idx]['tokens'])):

    try:
        a = lds[r_idx]['tokens'][i]
        b = lds[r_idx]['labels'][i]

    # if b != '0':
        print(f'{i}:{a}:{b}')
    except Exception as e:
        print(i)

## 데이터 임베딩
> 앞서 전처리한 데이터를 Dataset 형식으로 변환하고 train, valid, test로 분할


### Dataset 형식으로 변환
- hugging face의 datasets 라이브러리
- train set, validation set, test set으로 데이터 split


In [None]:
from datasets import Dataset
ds = Dataset.from_list(lds)

# 데이터 split
sds = ds.train_test_split(0.2)
ssds = sds['test'].train_test_split(0.5)
sds['valid'], sds['test'] = ssds['train'], ssds['test']

# 데이터 분할 확인
sds

### 학습 모델(토크나이저) 로드

In [None]:
# 학습 모델 id (hugging face model ID)
model_path = 'klue/roberta-small'

# 토크나이저 로드
tokenizer = AutoTokenizer.from_pretrained(model_path)

# 모델 로드
num_labels=len(cat_idx_dict) # 모델이 분류하는 entity의 수(갯수)를 정의
model = AutoModelForTokenClassification.from_pretrained(model_path, num_labels=num_labels)

# 모델의 토큰 임베딩 크기를 토크나이저의 크기로 조정
model.resize_token_embeddings(len(tokenizer))

데이터 인코딩 함수
- batch 처리 옵션을 적용 : truncation, padding

In [None]:
def tokenize_and_encode(examples):
    return tokenizer(examples["text"], truncation=True, padding='max_length')

In [None]:
# labels 패딩 함수
def pad_labels(data):
    seq_length = data['input_ids'].size(0)

    # labels 텐서의 길이를 가져옵니다.
    labels_length = data['labels'].size(0)

    # labels 텐서를 input_ids의 길이와 동일하게 패딩합니다.
    if labels_length < seq_length:
        padding_length = seq_length - labels_length
        padding_tensor = torch.full((padding_length,), -100, dtype=torch.long)
        data['labels'] = torch.cat([data['labels'], padding_tensor])

    # for k, v in data.items():
    #     data[k] = v.unsqueeze(0)
    return data

def adjust_labels_length(data):
    # input_ids의 tensor 길이 (sequence length).
    seq_length = data['input_ids'].size(0)

    # labels의 tensor 길이.
    labels_length = data['labels'].size(0)

    # tensor 길이 조정
    if labels_length > seq_length:
        data['labels'] = data['labels'][:seq_length]

    return data

### 데이터 임베딩

토크나이징 함수 적용
- torch 포맷으로 변환

In [None]:
ds_enc = sds.map(tokenize_and_encode, batched=True)
ds_enc.set_format("torch")

데이터별 길이 맞춤(tensor 길이)

In [None]:
ds_enc = ds_enc.map(pad_labels, num_proc=8, batched=False)
ds_enc = ds_enc.map(adjust_labels_length, num_proc=8, batched=False)

## NER 모델 학습


### Train augment 설정

In [None]:
EPOCHS = 3 # Epochs 설정
project_name = f'NER_{EPOCHS}epochs' # 학습 실험 프로젝트 명명
output_dir = f'./project/{project_name}' # 모델 저장경로 설정

training_args = TrainingArguments(
    output_dir=output_dir, # 모델 저장 경로
    num_train_epochs=EPOCHS, # 학습 Epochs
    per_device_train_batch_size=4, # 학습 batch size
    per_device_eval_batch_size=4, # 검증 batch size
    warmup_steps=200, # warmup_steps
    weight_decay=0.01,
    logging_dir='./logs',
    logging_strategy="epoch", # 학습 중, 성능 평가 실행 기준
    load_best_model_at_end=True, # 학습 모델(checkpoint) 저장 옵션 / 학습이 끝난 후 가장 좋은 모델 로드
    metric_for_best_model="eval_loss", # 가장 좋은 모델을 결정할 metric 기준
    lr_scheduler_type="cosine_with_restarts", # scheduler 설정
    greater_is_better=False, # 모델 저장 기준의 모니터링 방향. 점수 정렬 기준
    save_strategy='epoch', # 학습 모델(checkpoint) 저장 단위
    save_total_limit=1, # 저장할 학습 모델(checkpoint) 개수

    eval_strategy='epoch',  # Evaluation 실행 기준
    logging_steps=10, # 학습 중 성능 지표 출력 기준
    disable_tqdm=False, # 학습 진행률 표시 여부
    report_to="none",
)



### 성능 지표(benchmark) 산출 함수
- metric : accuracy, f1-score

In [None]:
from sklearn.metrics import f1_score, accuracy_score

def ner_compute_metrics(eval_preds):
    logits, labels = eval_preds
    predictions = np.argmax(logits, axis=-1)

    # 패딩이나 특별 토큰이 아닌 경우에 대한 실제 레이블 및 예측값 추출
    true_labels = []
    pred_labels = []

    for label_list, prediction in zip(labels, predictions):
        for label, pred in zip(label_list, prediction):
            # 무시할 토큰이 아니라면 리스트에 추가
            if label != -100:  # 여기서 -100은 무시해야 할 레이블 ID입니다.
                true_labels.append(label)
                pred_labels.append(pred)

    # 토큰 수준에서의 정확도 및 F1 점수 계산
    accuracy = accuracy_score(true_labels, pred_labels)
    f1 = f1_score(true_labels, pred_labels, average='micro')  # NER에서는 'micro' 평균을 사용하는 것이 일반적입니다.

    return {
        'accuracy': accuracy,
        'f1': f1,
    }

### trainer 설정

In [None]:
trainer = Trainer(model=model, # 학습 모델
                  args=training_args, # train augments
                  train_dataset=ds_enc["train"], # 학습 데이터셋
                  eval_dataset=ds_enc["valid"], # 평가 데이터셋
                  tokenizer=tokenizer, # 토크나이저
                  compute_metrics=ner_compute_metrics # 성능 측정 함수
                  )

print(f'project : {project_name}')
print(f'output_dir : {output_dir}')

### NER 학습 실행
- training_args 등에서 설정한 hyper parameter로 학습을 실행
- epoch 단위 성능 평가 실행
- 학습 완료 후, best model에 해당하는 checkpoint와 tokenizer 저장
    - checkpoint 1개 저장

In [None]:
print('\n')
print(f'project : {project_name}')

# 학습 실행(trainer)
trainer.train()

# 토크나이저 저장(학습 완료 후, 모델의 저장 경로를 확인하여 저장)
tokenizer_path = glob.glob(os.path.join(training_args.output_dir,'*'))
print(tokenizer_path)
tokenizer.save_pretrained(tokenizer_path[0])

## 성능 측정
> 학습한 모델의 benchmark와 추론 테스트를 통해 성능을 확인

### 라이브러리

In [None]:
from transformers import pipeline
from sklearn.metrics import f1_score, accuracy_score

### 학습 모델 로드

In [None]:
# 학습 모델(checkpoint) 경로
ckp_model='./project/NER_3epochs/checkpoint-798'

# 토크나이저 로드
tokenizer = AutoTokenizer.from_pretrained(ckp_model)

# 모델 로드
model = AutoModelForTokenClassification.from_pretrained(ckp_model, num_labels=num_labels)

### benchmark 측정


benchmark 측정 함수
- accuracy와 f1 score를 출력


In [None]:
# benchmark 함수
from sklearn.metrics import f1_score, accuracy_score
import numpy as np
def ner_compute_metrics(eval_preds):
    logits, labels = eval_preds
    predictions = np.argmax(logits, axis=-1)

    # 패딩이나 특별 토큰이 아닌 경우에 대한 실제 레이블 및 예측값 추출
    true_labels = []
    pred_labels = []

    for label_list, prediction in zip(labels, predictions):
        for label, pred in zip(label_list, prediction):
            # 무시할 토큰이 아니라면 리스트에 추가
            if label != -100:  # 여기서 -100은 무시해야 할 레이블 ID입니다.
                true_labels.append(label)
                pred_labels.append(pred)

    # 토큰 수준에서의 정확도 및 F1 점수 계산
    accuracy = accuracy_score(true_labels, pred_labels)
    f1 = f1_score(true_labels, pred_labels, average='micro')  # NER에서는 'micro' 평균을 사용하는 것이 일반적입니다.

    return {
        'accuracy': accuracy,
        'f1': f1,
    }

Benchmark Augmemnt 설정

In [None]:
args = TrainingArguments(
    'eval',
    fp16=True,
    report_to='none',
    per_device_eval_batch_size=1
)



benchmark trainer 설정

In [None]:
from transformers import DataCollatorWithPadding

trainer = Trainer(model=model,
                  args=args,
                  tokenizer=tokenizer,
                  compute_metrics=ner_compute_metrics
                 )

benchmark 실행 & 결과 확인

In [None]:
# benchmark 데이터셋
test_data = ds_enc['test']

# benchmark 실행
benchmark_result = trainer.predict(test_data)

# benchmark 결과 확인
benchmark_result.metrics

### 모델 추론 테스트 (pipeline)
> 학습한 모델을 NER 파이프라인으로 로드하여 텍스트 입력에 대한 결과를 확인

### NER 파이프라인

파이프라인 로드

In [None]:
# 테스트 텍스트 -> 개인정보가 포함된 텍스트
original_text=test_data[3]['text']

# NER 파이프라인 실행
ner = pipeline("ner", model=ckp_model, tokenizer=tokenizer, grouped_entities=True,device=0)# 파이프라인 입력

# NER 파이프라인 실행
result = ner(original_text)

NER 결과 확인
- NER 모델의 결과는 entity_group, score, word, start, end 로 구성 됨
    - entity_group : Entity label
    - score : Entity 추론 점수
    - word : 객체 텍스트
    - start : word의 시작 index
    - end : word의 종료 index


In [None]:
result

## 개인정보 비식별화 처리
> 텍스트(개인정보가 포함된 텍스트)를 파이프라인으로 불러온 NER 모델에 입력하여 개인정보가 비식별화 처리된 텍스트를 반환하는 프로세스를 구축
- entity_group을 Entity 객체로 다시 매핑 (LABEL_1 -> ADDRESS)
- 'B-', 'I-'로 나눠진 Entity는 병합하여 하나의 객체로 처리
- 의미없는 Entity(LABEL_0,0)은 출력되지 않도록 함.
- 병합 처리된 entity_group, word, index(strart,end)를 기준으로 텍스트(개인정보가 포함된 텍스트)의 가명처리(비식별화)를 진행

### NER(pipeline) 결과 처리 함수
- postprocess



In [None]:
# 라벨을 카테고리로 매핑하는 함수
cat_list = list(cat_idx_dict.keys())

# entity_group 라벨을 entity 명으로 변환하는 함수
def label_changer(x,origin_text):
    result_list = []
    for item in x:

        # entity_group이 LABEL_0이 아닌 값만 추출
        if item['entity_group'] != 'LABEL_0':
            label_num = int(item['entity_group'].split('_')[-1])

            # category list에서 LABEL에 맞는 값을 찾아 치환
            entity_group = cat_list[label_num]

            # 치환한 LABEL, 입력된 텍스트에서 추출된 단어, 단어의 위치 값으로 dict 저장
            data_dict = {
                'entity_group':entity_group,
                'word':origin_text[item['start']:item['end']],
                'location':[item['start'],item['end']]
            }

            result_list.append(data_dict)

    return result_list

# BIO 표기로 나눠진 entity_group 병합 함수
def predict_printer(x):
    merged_results = []
    current_entity = None
    current_word = ''
    current_location = []

    for item in x:
        entity_group = item['entity_group']
        word = item['word']
        location = item['location']

        if entity_group.startswith('B-'):
            if current_entity:
                # 이전 결과 추가
                merged_results.append({
                    'entity_group': current_entity[2:],  # 'B-' 제거
                    'word': current_word,
                    'location': [current_location[0], current_location[1]]  # 시작과 끝 위치
                })

            # 새 엔티티 시작
            current_entity = entity_group
            current_word = word
            current_location = location
        elif entity_group.startswith('I-') and current_entity and entity_group[2:] == current_entity[2:]:
            # 같은 엔티티 그룹의 I-일 경우 병합
            current_word += word
            current_location[1] = location[1]  # 끝 위치 업데이트

    # 마지막 엔티티 결과 추가
    if current_entity:
        merged_results.append({
            'entity_group': current_entity[2:],  # 'B-' 제거
            'word': current_word,
            'location': [current_location[0], current_location[1]]
        })

    return merged_results


NER postprocess 함수 테스트
- 'B-', 'I-'로 구분된 객체명의 병합을 확인

In [None]:
result = ner(original_text)

print('\n')
print('+++++++++++++++++\n')
print('### 텍스트 ###')
print(original_text)
ner_result = label_changer(result,original_text)
ner_result = predict_printer(ner_result)

ner_result

print(ner_result)
print('\n+++++++++++++++++\n')
print('### NER ###')
for item in ner_result:
    print(item['entity_group'],':',item['word'])

### 텍스트 가명처리 코드

In [None]:
target_text = original_text  # 변환 텍스트

# 역순 정렬로 대체: 위치 겹침 방지
for entity in sorted(ner_result, key=lambda x: x['location'][0], reverse=True):
    category = entity['entity_group']
    location = entity['location']
    word = target_text[location[0]:location[1]]
    entity['word'] = word  # 병합된 word 값 사용
    target_text = target_text[:location[0]] + f'#@{category}#' + target_text[location[1]:]

data_dict = {
    'text': original_text,
    'prediction': ner_result,
    'de_identification_text': target_text
}

print(data_dict['de_identification_text'])

### 개인정보 가명처리 코드(for 문)
- 테스트 데이터의 text를 순회하며 개인정보 가명처리를 적용
- 최종적으로 출력되는 데이터 입력되는 텍스트, 가명처리 정보, 가명처리된 텍스트로 구성
    - text : 입력 텍스트
    - prediction : 가명처리 정보(전처리된 모델의 추론결과)
    - de_identification_text : 가명처리된 텍스트

In [None]:
test_data = ds_enc['test']

ner_results = []

for item in tqdm(test_data):
    original_text = item['text']
    result = ner(original_text)
    ner_result = label_changer(result,original_text)
    ner_result = predict_printer(ner_result)
    # print(ner_result)

    target_text = original_text  # 변환 텍스트

    # 역순 정렬로 대체: 위치 겹침 방지
    for entity in sorted(ner_result, key=lambda x: x['location'][0], reverse=True):
        category = entity['entity_group']
        location = entity['location']

        word = target_text[location[0]:location[1]] # 텍스트 파싱(index 기준)
        entity['word'] = word  # 병합된 word 값 사용
        target_text = target_text[:location[0]] + f'#@{category}#' + target_text[location[1]:]

    data_dict = {
        'text': original_text,
        'prediction': ner_result,
        'de_identification_text': target_text
    }

    ner_results.append(data_dict)

print(len(ner_results))

### 가명처리 결과 확인
- Dataframe(pandas)

In [None]:
df = pd.DataFrame(ner_results)

df