### 가명처리 모듈 파이썬 오픈소스 라이브러리 구현 프로젝트

**가명정보**

개인정보 일부를 삭제, 대체하는 등 가명처리함으로써 원래 상태로 복원하기 위한 추가정보의 사용, 결합 없이는 특정 개인을 알아볼 수 없는 정보

**가명정보 처리**

가명처리와 달리 가명정보 처리는 가명정보의 수집, 생성, 연계, 연동, 기록, 저장, 보유, 가공, 편집, 검색, 출력, 정정, 복구, 이용, 제공, 공개, 파기, 그밖에 이와 유사한 행위를 말한다. 

**신용정보의 이용에 관한 법률 제2조제15호 및 제16호에 의한 가명정보**

15. “가명처리”란 추가정보를 사용하지 아니하고는 특정 개인인 신용정보주체를 알아볼 수 없도록 개인신용정보를 처리(그 처리 결과가 다음 각 목의 어느 하나에 해당하는 경우로서 제40조의2제1항 및 제2항에 따라 그 추가정보를 분리하여 보관하는 등 특정 개인인 신용정보주체를 알아볼 수 없도록 개인신용정보를 처리한 경우를 포함한다)하는 것을 말한다.
    
가. 어떤 신용정보주체와 **다른 신용정보주체가 구별**되는 경우
    
나. 하나의 정보집합물(정보를 체계적으로 관리하거나 처리할 목적으로 일정한 규칙에 따라 구성되거나 배열된 둘 이상의 정보들을 말한다. 이하 같다)에서나 서로 다른 둘 이상의 정보집합물 간에서 어떤 신용정보주체에 관한 **둘 이상의 정보가 연계되거나 연동**되는 경우
    
다. 가목 및 나목과 유사한 경우로서 대통령령으로 정하는 경우
    
16. “가명정보”란 가명처리한 개인신용정보를 말한다.

**개인식별정보와 개인식별가능정보**

|  | 식별정보 | 식별가능정보 |
| --- | --- | --- |
| 정의 | 특정 개인과 직접적으로 연결되는 정보 | 다른 항목과 결합하는 경우 식별가능성이 높아지는 항목 해당 정보를 처리하는 자(이용 또는 제공하는 자)를 기준으로 판단 |
| 예시 | 성명, 고유식별정보, 의료기록번호, 건강기록번호, (개인)휴대전화번호, (개인)전자우편주소 등 | 성별, 연령, 거주 지역, 국적, 직업, 위치정보 등 |

https://github.com/ksydata/pseudonymizer/tree/main

---

**가명처리의 5단계 절차**

1. 목적 설정 등 사전 준비 단계
    
    개인정보를 가명처리하는 목적을 설정한다. 개인정보 처리방침 및 내부관리계획 등 필요한 기본 서류를 작성한다. 정보주체의 동의 없이 가명처리하기 위해서는 1️⃣ 통계 작성, 2️⃣ 과학적 연구(상업 목적 연구 포함), 3️⃣ 공익적 기록 보존이라는 3가지 목적으로 한정된다. 

2. **위험성 검토 단계**
    
    가명처리 대상 데이터의 식별 위험성을 분석 · 평가한다. 데이터 식별가능성은 1️⃣ 데이터 자체의 식별 위험성(data, context side)과 2️⃣ 처리 환경의 안전성에 따른 식별 위험성으로 구분된다. 
    
    데이터 자체의 식별 위험성은 1️⃣ 성명 · 고유식별정보 등 데이터 식별성이 일정 수준 이상인지, 2️⃣ 특이정보(희귀 성씨 등)가 포함되어 있는지, 3️⃣ 식별될 경우 사회적 파장이 클 수 있는 정보(유명인의 정보 등)에 대한 종합 결론을 말한다. 
    
    처리 환경의 식별 위험성은 1️⃣ 데이터의 활용 형태(내부 이용 또는 외부 제공), 2️⃣ 처리 장소(다른 정보의 접근이 제한된 장소에서 처리되는지 여부 등),  3️⃣ 처리 방법(가명정보를 다른 정보와 연계하는지 등)을 고려해 판단한다. 
    
    위험성 검토 결과는 보고서로 작성되어 적정성 검토 시 활용된다.

3. **가명처리 단계**

    가명처리 기법과 처리 수준을 결정 · 실행한다. 대표적인 가명처리 기술은 대체 · 삭제 ·  범주화다. 1️⃣ 대체는 개인식별정보를 암호화된 일련번호(해시값)으로 대신한다. 2️⃣ 삭제는 개인정보 전부 또는 일부(행, 열, 로컬 등)를 없애는 것이다. 3️⃣ 범주화는 연속형 변수를 일정 단위로 묶는 것이다. 이외에도 4️⃣ 양쪽 끝에 치우친 정보를 삭제 또는 경계치를 입력하는 상하단 코딩 기법과 프라이버시 보호 모델(KLT 모델)이 주로 활용된다.

4. 적정성 검토 단계  
    
    내부 또는 외부 검토위원들은 가명처리 결과의 적적성을 최종 논의한다. 3명 이상의 검토위원이 도출한 최종 검토 결과를 개인정보 처리자에게 전달한다. 

5. 안전한 관리 단계
    
    가명정보[개인정보]에 대한 안전성 확보 조치를 이행하고, 처리내역을 기록 ·  보관한다. 특정 개인의 재식별 위험을 모니터링하는 것이 중요하다. 가명정보의 보유기간이 도래하면 지체없이 파기하여야 한다.

In [1]:
import pymysql
from sqlalchemy import create_engine
# pip install mysqlclient

from abc import ABC, abstractmethod
from typing import *
import re

from prettytable import PrettyTable
import pandas as pd
pd.options.display.float_format = '{:.10f}'.format
import pprint

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

**0.파이썬 스크립트를 통해 MySQL 데이터베이스에 접속하여 테이블을 조회하는 방식**

* import ⇢ db접속 ⇢ cursor 생성* ⇢ sql문 작성 ⇢ sql문 실행 ⇢ 실행 결과 확정(commit) ⇢ 연결 해제
* 검색해온 데이터를 key:value 타입으로 가지고 오는 pymysul.cursor.Dictionary
* insert, update, delete의 경우 자원 닫아주기 전에 commit을 해야 DB에 작업 내용이 저장
```
CURSOR = PseudonymDB.cursor()
SQL = """SELECT * FROM DATABASE.TABLE"""
CURSOR.execute(SQL)
PseudonymDB.commit()
PseudonymDB.close()
```

In [None]:
class PreprocessQuery:
    """데이터 전처리 목적의 SQL 쿼리 클래스"""
    def __init__(self, pw):
        self._pw = pw
        self.connection = None
        self.Cursor = None
        self.SQL = None
    
    def connectMySQL(self, 
                     serverIP: str, port_num: int, user_name: str, database_name: str, kr_encoder: str):
        """MySQL DBMS 데이터베이스에 접속: 서버IP주소, 사용자명, 계정 암호, 데이터베이스명, 한글 인코딩 방식"""
        try:
            self.connection = pymysql.connect(
                host = serverIP, port = port_num,
                user = user_name, password = self._pw,
                db = database_name, charset = kr_encoder
            )
            self.Cursor = self.makeCursor(self.connection)
        except pymysql.Error as e:
            print(f"Error Connecting to MySQL from Python: {e}")
    
    def makeCursor(self, connect):
        """커서 생성"""
        return connect.cursor()
    
    def dataQueryLanguage(self, sql):
        """SQL 쿼리문 작성"""
        self.SQL = f"{sql}"
    
    def queryExecute(self):
        """SQL 쿼리문 실행 및 예외처리"""
        try:
            self.Cursor.execute(self.SQL)
            actionOutput = self.Cursor.fetchall()
            return actionOutput
        except pymysql.Error as e:
            print(f"Error Executing Query: {e}")
    
    def queryCommit(self):
        """실행 결과 확정"""
        self.Cursor.execute(self.SQL)
        self.Cursor.commit()
    
    def closeConnection(self):
        """연결 및 커서 닫기"""
        if self.Cursor:
            self.Cursor.close()
        if self.connection:
            self.connection.close()

In [None]:
queryObject = PreprocessQuery(pw = "0123")

In [None]:
queryObject.connectMySQL( 
    serverIP = "localhost", port_num = 3306, user_name = "root", database_name = "FINANCIALCONSUMER", kr_encoder = "utf8")

In [None]:
SQL = input("SQL 쿼리문 입력변수 = ")
queryObject.dataQueryLanguage(sql = SQL)
results = queryObject.queryExecute()

In [None]:
results

In [None]:
DQL = """SELECT 
    SUBSCRIPTION_MONTH,
    MEMBERSHIP_GRADE,
    SUBSCRIPTION_SALES * 100 / TOTAL_SALES AS PCT_TOTAL_SALES
FROM (
    SELECT 
        T1.SUBSCRIPTION_MONTH,
        T1.MEMBERSHIP_GRADE,
        T1.SUBSCRIPTION_SALES,
        SUM(T2.SUBSCRIPTION_SALES) AS TOTAL_SALES
    FROM (
        SELECT 
            DATE_FORMAT(STR_TO_DATE(SUBSCRIPTION_DATE, '%Y%m%d'), '%Y-%m') AS SUBSCRIPTION_MONTH,
            MEMBERSHIP_GRADE,
            SUM(SUBSCRIPTION_FEE) AS SUBSCRIPTION_SALES
        FROM 
            DATA_MOBILE_COMMUNICATION
        GROUP BY 
            SUBSCRIPTION_MONTH, MEMBERSHIP_GRADE
    ) AS T1
    JOIN (
        SELECT 
            DATE_FORMAT(STR_TO_DATE(SUBSCRIPTION_DATE, '%Y%m%d'), '%Y-%m') AS SUBSCRIPTION_MONTH,
            MEMBERSHIP_GRADE,
            SUM(SUBSCRIPTION_FEE) AS SUBSCRIPTION_SALES
        FROM 
            DATA_MOBILE_COMMUNICATION
        WHERE 
            MEMBERSHIP_GRADE IN ('VIP', 'VVIP')
        GROUP BY 
            SUBSCRIPTION_MONTH, MEMBERSHIP_GRADE
    ) AS T2
    ON T1.SUBSCRIPTION_MONTH = T2.SUBSCRIPTION_MONTH
    WHERE T1.MEMBERSHIP_GRADE IN ('VIP', 'VVIP')
    GROUP BY 1, 2, 3
) T3
ORDER BY 1, 2;
"""

In [None]:
# DQL = input("SQL 쿼리문 입력변수 = ")
queryObject.dataQueryLanguage(sql = DQL)
results = queryObject.queryExecute()

In [None]:
if results:
    # PrettyTable에 결과를 추가할 때 각 행의 값이 컬럼 수와 일치해야 함
    # ValueError: Row has incorrect number of values, (actual) 5!=1 (expected)
    columns = [ [desc[0] for desc in queryObject.Cursor.description] ]
    table = PrettyTable(*columns)
    
    for row in results:
        row_list = list(row)
        table.add_row(row_list)
    print(table)
    # ???
    # 원인은 DB에 애초에 잘못 저장된 테이블의 가입일자 컬럼에 있었음
    # 해결은 모바일 통신서비스 가입일자가 YYYYMMDD 형식이므로 일단 가변문자 타입인 VARCHAR로 선언한 후
    # DATE_FORMAT(time, timestamp)으로 형식 변환 등 수행

In [None]:
# 월간 전체 (통신서비스 가입) 매출액 대비 멤버십 회원등급별 매출액 비율 시각화  
plt.figure(figsize = (50, 10))
plt.plot(
    pd.DataFrame(results).loc[pd.DataFrame(results)[1] == "VIP", 0],
    pd.DataFrame(results).loc[pd.DataFrame(results)[1] == "VIP", 2],
    color = "green"
)
# plt.show()
# plt.figure(figsize = (50, 10))
plt.plot(
    pd.DataFrame(results).loc[pd.DataFrame(results)[1] == "VVIP", 0],
    pd.DataFrame(results).loc[pd.DataFrame(results)[1] == "VVIP", 2],
    color = "orange"
)
plt.show()

---

In [2]:
engine = create_engine(
    "mysql://root:0123@localhost/FINANCIALCONSUMER", 
    convert_unicode = True)
conn = engine.connect()

  engine = create_engine(


In [3]:
DATA_FINANCE = pd.read_sql_table("DATA_FINANCE", conn)
DATA_RETAIL = pd.read_sql_table("DATA_RETAIL", conn)
DATA_MOBILE_COMMUNICATION = pd.read_sql_table("DATA_MOBILE_COMMUNICATION", conn)
DATA_JOIN_CARDPAYMENT = pd.read_sql_table("DATA_JOIN_CARDPAYMENT", conn)
DATA_JOIN_ACCOMODATIONAPP = pd.read_sql_table("DATA_JOIN_ACCOMODATIONAPP", conn)

---

1. 구현의 핵심 목표는 **가명처리 패키지를 구조화하고 필요한 기능을 제공하기 위해 디렉토리 구조와 객체 및 속성을 설계**함에 있다.

In [4]:
# ./pseudonymizer/

class Pseudonymizer(ABC):
    """가명처리 추상 클래스 및 추상 메서드 선언"""
    @abstractmethod
    def pseudonymizeData(self, value):
        """확장성을 갖춘 가명처리 클래스를 만들어 특정 가명처리 기법으로 구체화하기 위한 추상 메서드"""
        pass

---

2. **MaskingPseudonymizer | 마스킹 적용(정규표현식을 주로 활용)**

* 성명: 직접 식별자에 대해 마스킹을 적용하여 식별성을 낮출 수 있음. 단, 이름이 세글자가 아닌 경우에는 어떻게 할지(두글자이거나 네글자 이상일 경우. 한국인이 아닌 외국인일 경우 어떻게 처리할지)

* 나이 : 간접 식별자에 대해 마스킹을 적용할 수는 있지만 마스킹 방식을 거의 사용하지 않음

* 질병코드 : 민감 질병에 대해 마스킹을 적용하여 식별성을 낮춘 사례

* 사업자 등록번호 :

    (1) 법인의 종류를 알 수 없도록 가운데 마스킹(2자릿수)
    
    (2) 법인의 종류만 구분가능하도록 맨 마지막 마스킹(5자릿수)
    
* 이메일 주소 : 메일발신기관만 구분가능하도록 마스킹하거나 발신자와 발신기관 모두 구분할 수 없도록 마스킹

* 특수문자 체크 정규식
  const regExp = /[\{\}\[\]\/?.,;:|\)*~`!^\-_+<>@\#$%&\\\=\(\'\"]/g;

* 모든 공백 체크 정규식
  const regExp = /\s/g;

* 숫자만 체크 정규식
  const regExp = /[0-9]/g;

* 이메일 체크 정규식
  const regExp = /^[0-9a-zA-Z]([-_\.]?[0-9a-zA-Z])*@[0-9a-zA-Z]([-_\.]?[0-9a-zA-Z])*\.[a-zA-Z]{2,3}$/i;
  
* 핸드폰번호 정규식
  const regExp = /^\d{3}-\d{3,4}-\d{4}$/;

* 일반 전화번호 정규식
  const regExp = /^\d{2,3}-\d{3,4}-\d{4}$/;

* 아이디나 비밀번호 정규식
  const regExp = /^[a-z0-9_]{4,20}$/;

* 휴대폰번호 체크 정규식
  const regExp = /^01([0|1|6|7|8|9]?)-?([0-9]{3,4})-?([0-9]{4})$/;

In [None]:
# ./pseudonymizer/pseudonymizers/nameMasking.py

# from pseudonymizer.pseudonymizer import Pseudonymizer

class NameMaskingModule(Pseudonymizer):
    """
    성명 마스킹 클래스
    ------------------
    성명 식별자에 직접 마스킹을 적용하여 식별성을 낮추는 구체 클래스
    3자릿수가 넘는 이름은 특정인에 대한 식별가능성이 높아지므로 4자릿수로 마스킹
    """
    def pseudonymizeData(self, name):
        name_list = list(name)
        if 2 <= len(name) <= 3:
            return name_list[0] + "*"*(len(name)-1)
        else: 
            # len(name) > 4:
            return name_list[0] + "*"*3

In [None]:
# ./pseudonymizer/pseudonymizers/emailMasking.py

# from pseudonymizer.pseudonymizer import Pseudonymizer
# import re
# from typing import *

class EmailMaskingModule(Pseudonymizer):
    """
    이메일 마스킹 클래스
    --------------------
    이메일 주소의 메일 발신자 또는 발신기관을 구분할 수 없도록 하는 구체 클래스
    """
    def __init__(self, masking_domain: bool):
        self.masking_domain = masking_domain
        
    def pseudonymizeData(self, email):
        pattern_match = re.match(
            r"^[a-zA-z0-9]([-_\.]?[a-zA-Z0-9])*@[a-zA-Z0-9]+\(.[a-zA-Z]{2,3})", email)
            # .: 정확히 1개 문자 매칭
            # * : 앞 패턴이 0개 이상이어야 함
            # ? : 앞 패턴이 없거나 하나이어야 함
            # + : 1회 이상 반복되는 패턴을 매칭
            # \. : 도메인과 최상위 도메인(TLD)에 대한 구분자 마침표 
            # {,} : 중괄호 안에 표기된 범위만큼 반복되는 패턴을 매칭. {3,5}는 3~5회 매칭을 의미함
            
        if pattern_match:
            local_part = pattern_match.group(0)
            domain_part = pattern_match.group(1)
            tld_part = pattern_math.group(2)
            # local_part, domain_part = email.split("@")
            
            if self.masking_domain:
                masked_local_part = re.sub(r"\S", "*", local_part)
                masked_domain_part = re.sub(r"\S", "*", domain_part)
                return masked_local_part + "@" + masked_domain_part + tld_part
            else:
              # self.masking_domain = False:
                masked_local_part = re.sub(r"\S", "*", local_part)
                return masked_local_part + "@" + domain_part + tld_part

        else:
            print("입력받은 { }은 이메일 패턴에 매칭되지 않아 마스킹할 수 없습니다.".format(email))
    
        # 이메일의 표준은 인터넷 표준 기구(IETF, Internet Engineering Task Force)에서 정의
        # re.sub(pattern, replace, text)

In [None]:
# ./pseudonymizer/pseudonymizers/residentNumMasking.py

# from pseudonymizer.pseudonymizer import Pseudonymizer

class ResidentNumberMaskingModule(Pseudonymizer):
    """
    주민등록번호 마스킹 클래스
    --------------------
    주민등록번호 뒷자리를 복원할 수 없는 비가역성 기법으로 개인의 식별을 방지하는 구체 클래스
    """        
    def pseudonymizeData(self, resident_number):
        """한국 주민등록번호의 정규표현식을 기준으로 패턴 매칭이 되는 경우 뒷자리 7자리 중 
        성별정보 1자리를 제외한 나머지 출생지정보에 대한 6자리에 대한 마스킹을 수행하는 메서드"""
        pattern_match = re.match(r"^\d{6}-[1-8]\d{6}$", resident_number)
        
        if pattern_match:
            front_part, rear_part = resident_number.split("-")
            return front_part + rear_part[0] + "*"*6
            
        else:
            print("입력받은 { }은 주민등록번호 패턴에 매칭되지 않아 마스킹할 수 없습니다.".format(resident_number))

In [None]:
# ./pseudonymizer/pseudonymizers/businessMasking.py

# from pseudonymizer.pseudonymizer import Pseudonymizer
# from typing import *

class BusinessNumberMaskingModule(Pseudonymizer):
    """
    사업자등록번호 마스킹 클래스
    --------------------
    사업자등록번호의 일부(2번째 혹은 3번째 자리)를 복원할 수 없는 비가역성 기법으로 개인의 식별을 방지하는 구체 클래스
    """
    def __init__(self, masking_part: str):
        self.masking_part = masking_part
        
    def pseudonymizeData(self, business_number):
        """한국 사업자등록번호의 정규표현식을 기준으로 패턴 매칭이 되는 경우 마스킹을 수행하는 메서드"""
        pattern_match = re.match(r"^\d{3}-\d{2}-\d{5}$", business_number)
        # ex) 124-86-23875
        
        if pattern_match:
            front_part, middle_part, rear_part = business_number.split("-")
            
           # 2번째 자리인 법인의 등록지역을 마스킹할 때
            if self.masking_part == "middle":
                return front_part + "*"*2 + rear_part
           # 3번째 자리인 법인의 일련번호를 마스킹할 때
            elif self.masking_part == "rear":
                return front_part + middle_part + "*"*5
           # 1번째 자리인 법인의 유형만을 남기고 마스킹할 때
            elif self.masking_part == "both":
                return front_part + "*"*2 + "-" + "*"*5
        else:
            print("입력받은 { }은 사업자등록번호 패턴에 매칭되지 않아 마스킹할 수 없습니다.".format(business_number))

In [None]:
# ./pseudonymizer/pseudonymizers/businessMasking.py

# from pseudonymizer.pseudonymizer import Pseudonymizer
# from typing import *

class AddressMaskingModule(Pseudonymizer):
    """
    주소 마스킹 클래스
    --------------------
    시군구 읍면동 단위의 일부를 복원할 수 없는 비가역성 기법으로 제거하는 구체 클래스
    """
    def __init__(self, masking_loca: str):
        self.masking_loca = masking_loca
        
    def pseudonymizeData(self, address: str):
        """주소의 일부에 대한 마스킹을 수행하는 메서드"""    
        # 시군구 단위
        if self.masking_loca == "시군구":
            return 
        # 시군구 읍면동/행정동 단위
        elif self.masking_part == "읍면동":
            return 
        else:
            print("입력받은 { }은 주소가 아닙니다.".format(address))

In [None]:
# ./pseudonymizer/pseudonymizers/phoneNumMasking.py

# from pseudonymizer.pseudonymizer import Pseudonymizer
# from typing import *

class PhoneNumberMaskingModule(Pseudonymizer):
    """
    연락처(휴대전화번호 혹은 일반전화번호) 마스킹 클래스
    --------------------
    3번째 자리를 복원할 수 없는 비가역성 기법으로 개인의 식별을 방지하는 구체 클래스
    특히 연관된 다른 정보(생년월일, 기념일, 가족 전화번호, 기존 통화내역 등)와 쉽게 결합하여 사용자가 누구인지 식별가능하다는 점에서 개인정보에 해당함
    """   
    def pseudonymizeData(self, phone_number):
        """전화번호의 정규표현식을 기준으로 패턴 매칭이 되는 경우 마스킹을 수행하는 메서드"""
        pattern_match = re.match(r"^\[0-9]-\[0-9]-\d{4}$", phone_number)
        
        if pattern_match:
            front_part, middle_part, rear_part = phone_number.split("-")
            return front_part + middle_part + "*"*4
        # 다만, 전화번호 마지막 4자리는 ****와 같은 기호로 대체하지 않고 전체를 해시값으로 암호화하기도 한다는 점에 유의하여야 함
        # 암호화와 복호화는 알고리즘 및 키 관리 등 복잡한 과정이 필요하므로 높은 보안이 필요한 경우 고려함
        else:
            print("입력받은 { }은 전화번호 패턴에 매칭되지 않아 마스킹할 수 없습니다.".format(phone_number))

In [None]:
# ./pseudonymizer/pseudonymizers/maskingPseudonymizer.py

# from pseudonymizer.pseudonymizers.nameMasking import NameMaskingModule
# from pseudonymizer.pseudonymizers.emailMasking import EmailMaskingModule
# from pseudonymizer.pseudonymizers.residentNumMasking import ResidentNumberMaskingModule
# from pseudonymizer.pseudonymizers.businessNumMasking import BusinessNumberMaskingModule
# from pseudonymizer.pseudonymizers.phoneNumMasking import PhoneNumberMaskingModule

# import re
# from typing import *

    
class MaskingPseudonymizer(Pseudonymizer):
    def __init__(self, data_type: str, masking_domain: bool, masking_part: str):
        """data_type은 향후 pseudonymizer.py에서 Pseudonymn 실행 클래스의 
        self._dataframe[column] 개인식별정보의 유형으로 이름, 이메일, 주민등록번호, 사업자등록번호 중 하나로 선언"""
        self.data_type = datatype,
        self.email_masker = EmailMaskingModule(masking_domain)
        self.name_masker = NameMaskingModule()
        self.resident_num_masker = ResidentNumberMaskingModule()
        self.business_num_masker = BusinessNumberMaskingModule(masking_part)
        self.phone_num_masker = PhoneNumberMaskingModule()

    def pseudonymizeData(self, data):
        if data_type == "name":
            return self.name_masker.pseudonymzieData(data)
        elif data_type == "email":
            return self.email_masker.pseudonymzieData(data)
        elif data_type == "resident_number":
            return self.resident_num_masker.pseudonymizeData(data)
        elif data_type == "business_number":
            return self.business_num_masker.pseudonymizeData(data)
        elif data_type == "phone_number":
            return self.phone_num_masker.pseudonymizeData(data)
        else:
            raise ValueError("유효한 마스킹 대상 개인식별정보 데이터 타입이 아닙니다.")

---

3. **RoundingPseudonymizer**
* 일반라운딩은 np.round()함수로 처리가능하므로 구현의 의미 없으므로 제외하며,
* 제어라운딩은 라운딩 적용 전후의 항목 합계를 일치시키면서 소요되는 높은 연산량으로 실무에서 활용하지 않으므로 제외
* 랜덤라운딩만 구현할 예정
* 나이, 신장, 소득, 카드지출금액, 유동인구, 사용자 수 등에 적용

In [None]:
# ./pseudonymizer/pseudonymizers/randomRoundingPseudonymizer.py

# from pseudonymizer.pseudonymizer import Pseudonymizer
# from typing import *

class RandomRoundingPseudonymizer(Pseudonymizer):
    """
    랜덤라운딩 구체 클래스
    ----------------------
    데이터의 길이가 일정하지 않은 경우 값의 크기에 따라 처리 단위를 다르게 올림, 내림, 반올림하는 가명처리 기법
    """
    def __init__(self, rounding_type):
        self.rounding_type = rounding_type
        
    def pseudonymizeData(self, numeric):
        """수치데이터를 실제 수 기준으로 자릿수 올림 또는 내림하여 일반화(범주화)하는 메서드"""
        if self.rounding_type == "round_up":
            return numeric if numeric == int(numeric) else int(numeric)+1
        elif self.rounding_type == "round_down":
            return int(numeric)
        elif self.rounding_type == "round":
            decimal_part = numeric - int(numeric)
            return int(numeric)+1 if decimal_part >= 0.5 else int(numeric)
        else:
            raise ValueError("입력받은 {}은 유효한 라운딩 방법이 아닙니다.".format(rounding_type))

---

4. **PrivacyPreservingModel | 동질 집합(Equivalent class) 찾기와 프라이버시 보호 모델에 따른 PPDM(Privacy Preserving Data Mining)**

* 데이터 비식별화 : 식별방지(식별자 제거) 및 추론방지(프라이버시 모델 준수)
* 범주화: 연속형 변수를 일정 단위로 묶는 것
* 프라이버시 보호 모델(KLT 모델)을 주로 활용하여 개인식별가능정보의 동질성 집합(QI)에 대한 비식별조치를 수행하는 것(K-익명성, L-다양성, T-근접성)
* 출처: 박준범 외 2인, 관계형 데이터베이스에서 데이터 그룹화를 이용한 익명화 처리 기법, 한국전자통신연구원 25권 3호, 2015.6.

In [7]:
# ./pseudonymizer/pseudonymizer/deidentificationTechnique/equivalent_class.py

class EquivalentClass:
    """개인식별가능정보 속성을 기준으로 데이터를 그룹화하는 부모 클래스
    
    준식별자를 이용한 그룹화 기법 의사코드
    --------------------------------------
    data grouping using quasi-identifier
    Input : PI(Personal Information)
    Output : Grouped PI

    grouped_PI = dict()
    for identifier, quasi in PI.items():
        key = quasi[0] + quasi[1] + quasi[2] + quasi[3]
        if key in grouping_PI:
            grouping_PI[key].append(identifier)
        else:
            grouping_PI[key] = []
            grouping_PI[key].append[identifier]

        return grouping_PI
    """
    def __init__(self, dataframe):
        self._dataframe = dataframe
        self.equivalent_class = {}

    def __str__(self):
        # __repr__
        """캡슐화된 데이터셋의 속성(컬럼)정보를 반환하는 메서드"""
        return self._dataframe.info()
    
    def categorizeEquivalentClass(self, attributes: List[str]):
        """각 행(레코드)에 대한 개인식별가능정보 속성(컬럼)들 사이에 동질 집합을 확인하는 메서드"""
        groupby_data = self._dataframe.groupby(attributes)
        
        for group, data in groupby_data:
            if len(group) > 1:
                key = tuple(group)
                # 딕셔너리에서 키 값으로 리스트(동적 타입)는 사용할 수 없으므로 튜플로 변환
                self.equivalent_class[key] = data.index.tolist()
                # 동질 집합에 해당하는 행(레코드)의 인덱스 번호를 키 값으로 조회되도록 저장

    def removeDuplicatesInEquivalentClass(self):
        """각 동질집합 내 레코드 간 중복된 행을 제거하는 메서드"""
        for group_key, index_value in self.equivalent_class.items():
            unique_record = self._dataframe.loc[index_value, :].drop_duplicates()
            # set(self._dataframe.loc[index_value, :])
            self.equivalent_class[group_key] = unique_record.index.tolist()

In [None]:
# ./pseudonymizer/pseudonymizer/deidentificationTechnique/kAnonimity.py

# from pseudonymizer.pseudonymizer.deidentificationTechnique.equivalent_class import EquivalentClass
# from typing import *

class K_Anonymity(EquivalentClass):
    """개별 레코드가 최소한 K개 이상 동일한 속성값을 가지도록 하는 K-익명성 클래스
    
    데이터 그룹화가 적용된 k-익명성 알고리즘 의사코드
    -------------------------------------------------
    basic k-anonymity algorithm
    Input : grouped_PI, limited_k
    Output : k_data
    
    k_data = dict()
    for key, identifiers in grouped_PI.items():
        k_anonymity = len(identifiers)
        if k_anonymity >= limited_k:
            k_data[k] = identifiers
    return k_data
    """
    def __init__(self, dataframe):
        super().__init__(dataframe)
        self.K_data = None
        
    def applyKAnonymity(self, K: int, attributes: List[str]) -> Dict:
        K_data = dict()
        # EquivalentClass 클래스의 categorizeEquivalentClass 메서드 호출
        super().categorizeEquivalentClass(attributes)

        for group_key, index_value in self.equivalent_class.items():
            K_anonymity = len(index_value)
            # index_value = identifiers
            if K_anonymity >= K:
                K_data[group_key] = index_value
            else:
                print(group_key, len(index_value))
        self.K_data = K_data

In [None]:
# K_DATA_FINANCE = K_Anonymity(dataframe = DATA_FINANCE)
# K_DATA_FINANCE.applyKAnonymity(K = 250, attributes = ["AGE", "TF_PENSION"])
# K_DATA_FINANCE.K_data.keys()

In [None]:
# ./pseudonymizer/pseudonymizer/deidentificationTechnique/lDiversity.py

# from pseudonymizer.pseudonymizer.deidentificationTechnique.equivalent_class import EquivalentClass
# from typing import *

class L_Diversity(K_Anonymity):
    """각 동질집합 내 특정 민감 속성의 빈도가 L값 이상의 다양성을 가지도록 하는 L-다양성 클래스
    k-익명성 보호 모델 적용 결과에 l-다양성 보호 모델을 적용
    
    k-익명성 처리가 그룹 단위로 구현된 상황에서 l-다양성 알고리즘 의사코드
    ----------------------------------------------------------------------
    basic l-diversity algorithm
    Input : k_data, limited_l
    Output : l_data
    
    l_data = dict()
    for key, identifiers in k_data.items():
        l_list = []
        for identifier in identifiers:
            # k익명성을 만족하는 데이터의 식별자값을 가지고 
            user_info = data[identifier]
            # 해당 식별자값의 민감정보를 가져오는 부분
            user_sa = user_info[4]
            if user_sa in l_list:
                pass
    """
    def __init__(self, dataframe):
        """모듈의 유연성을 제공하기 위해 K익명성 클래스를 확장하여 손자 클래스로 정의"""
        super().__init__(dataframe)
        self.L_data = None
        self.sensitive_attribute = None
        self.LocalL_data = None
        
    def applyLDiversity(self, K: int, L: int, attributes: List[str], sensitive_attribute: str):
        """두 모형을 동시에 적용할 경우 중복이 발생할 가능성이 높아 조합적인 보호 모델을 설계하여 중복을 최소화하는 메서드"""
        super().applyKAnonymity(K, attributes)
        L_data = dict()
        self.sensitive_attribute = sensitive_attribute
        
        for group_key, index_value in self.K_data.items():
            unique_sensitive_values = self._dataframe.loc[index_value, 
                                                          sensitive_attribute].unique()
            # self._dataframe.iloc[index_value, self._dataframe.columns.get_loc(column_name)]
            if len(unique_sensitive_values) >= L:
                L_data[group_key] = index_value
            else:
                print(group_key, len(unique_sensitive_values))
        self.L_data = L_data
    
    def applyLocalLDiversity(self, local_L: int):
        """특정 민감정보의 속성값이 일부 레코드(행)에 집중되는 문제에 따라
        전체적으로 안전한 다양성을 확보할 수 있도록 l-로컬 다양성을 적용하는 메서드"""
        LocalL_data = dict()
        
        for group_key, index_value in self.L_data.items():
            count_local_diversity = self._dataframe.loc[index_value, self.sensitive_attribute].value_counts()
            if count_local_diversity.min() >= local_L: 
                LocalL_data[group_key] = index_value
            else:
                for sensitive_attr, freq in count_local_diversity.items():
                    if freq == count_local_diversity.min():
                        print(group_key, sensitive_attr, freq)
                    else:
                        pass
        self.LocalL_data = LocalL_data

In [None]:
# L_DATA_FINANCE = L_Diversity(dataframe = DATA_FINANCE)
# L_DATA_FINANCE.applyLDiversity(K = 5, L = 8, attributes = ["AGE", "TF_PENSION", "TF_LOAN"], sensitive_attribute = "HOME_TYPE")
# L_DATA_FINANCE.applyLocalLDiversity(local_L = 5)

In [None]:
# Counter(DATA_FINANCE.loc[L_DATA.get((23, 'N', 'N')), "HOME_TYPE"])
# DATA_FINANCE.loc[L_DATA.get((50, 'Y', 'Y')), "HOME_TYPE"]

---

**4.1. 전체 집단과 동질 집합의 경험분포 계산: 연속형 변수, 범주형 변수**

X 벡터의 empirical pdf(경험분포의 누적확률밀도함수) = P,
Y 벡터의 empirical pdf(경험분포의 누적확률밀도함수) = Q
$P = [p_1, p_2, \dots, p_m],\ Q = [q_1, q_2, \dots, q_m]$

**4.2. 두 민감정보 확률분포의 차이: EMD(토지이동거리)**

민감정보 타입이 연속형이라면, 
$EMD(P, Q) = \frac{1}{m-1} \Sigma_{i}^{m-1} |\Sigma_{j=1}^{i}(q_j - p_j)|$

민감정보 타입이 범주형이라면, 
$E'(P, Q) = \frac{1}{2} \Sigma_{i}^{m} |p_i - q_i)|$

범주형 변수의 경우:

범주형 변수의 경우, 각 카테고리의 빈도를 기록하여 분포를 표현합니다.
이 경우, 각 카테고리 간의 이동 거리는 단순히 카테고리 간의 거리로 정의됩니다.
각 카테고리의 확률 분포를 계산한 후, EMD를 계산할 수 있습니다.

연속형 변수의 경우, 주어진 데이터를 히스토그램이나 누적 분포 함수로 변환하여 분포를 표현할 수 있습니다.
이 경우, 누적 분포 함수를 사용하여 누적 확률 분포를 계산한 후, EMD를 계산할 수 있습니다.
다음은 범주형 변수와 연속형 변수 각각에 대한 EMD를 계산하는 과정을 예시로 설명한 것입니다.

범주형 변수의 EMD 계산:
범주형 변수의 카테고리와 해당 카테고리의 빈도를 파악합니다.
각 카테고리의 빈도를 확률 분포로 변환합니다.
변환된 확률 분포를 사용하여 EMD를 계산합니다.

연속형 변수의 EMD 계산:
연속형 변수의 데이터를 히스토그램으로 변환하거나 누적 분포 함수로 변환합니다.
변환된 히스토그램 또는 누적 분포 함수를 사용하여 EMD를 계산합니다.
이러한 과정을 통해 각 변수 유형에 따라 적절한 방법으로 EMD를 계산할 수 있습니다. 변수의 유형을 고려하여 데이터를 준비하고 EMD를 계산하는 것이 중요합니다.

In [None]:
# ./pseudonymizer/pseudonymizer/deidentificationTechnique/tCloseness.py

# from pseudonymizer.pseudonymizer.deidentificationTechnique.equivalent_class import EquivalentClass

# from typing import *
# from scipy.stats import wasserstein_distance

class T_Closeness(EquivalentClass):
    """민감 정보(SA)의 분포를 전체 데이터 셋의 분포와 유사하도록 하는 T-근접성 클래스
    
    l-다양성과 달리 민감정보를 원본 그대로 배열에 저장한 후 
    데이터를 내림차순 정렬하여 (확률분포의 차이)
    데이터의 분포도를 측정하는 t-근접성 알고리즘 의사코드
    -----------------------------------------------------
    basic Earth Mover's Distance algorithms
    Input : t_list
    Output : EMD
    
    total_range = []
    for n in range(100):
        total_range.append(n)
        # 설정된 배열에 정수가 순서대로 추가
        total_length = len(total_info)
        
        static_part = total_length / len(t_list)
        # EMD(데이터의 분산 정도)를 계산하기 위해 나눗셈
        extra_part = float(static_part) % float(len(t_list))
        extra_part = extra_part.split(".")[0]
        # 나누어 떨어지지 않는 여분으로 연산해주어야 할 때 계산

        balance_value = len(t_list) - (extra_part)
        # 데이터의 분산도 측정
        # 여분의 연산으로 하는 부분(나누어 떨어지지 않는 수)에 대하여
        # 배열의 마지막 부분에서 처리
    """
    def __init__(self, dataframe):
        super().__init__(dataframe)
        self.T_data = None
        self.sensitive_attribute = None
        self.tolerance = None
        
    def checkSensitivesDistribution(self, data_set, sensitive_attribute: str):
        """개인식별가능정보(준식별자)의 모든 가능한 조합 n개의 관심 대상값
        (sensitive_attribute)의 분포와 전체 집단의 분포의 거리 최댓값이 <= t로 규정할 때
        분포를 계산하는 메서드"""
        sensitive_vector = data_set[sensitive_attribute]
        
        # 민감속성의 고유한 값과 그 값의 비율 {v: count/len(V)}
        if sensitive_vector.dtype in ["int", "float"]:
            ordered_vector = np.sort(sensitive_vector)
            distribution = {v: n/len(ordered_vector)  for (v, n)
                            in zip(ordered_vector, list(range(len(ordered_vector))))}
        elif sensitive_vector.dtype == "object":
            distribution = {v: count/len(v) for (v, count) 
                            in Counter(sensitive_vector).items()}
        elif sensitive_vector.dtype == "category":
            distribution = {v: count/len(v) for (v, count) 
                            in Counter(sensitive_vector).items()}
        else: ValueError("입력받은 {}은 유효한 자료형이 아닙니다.".format(
            data_set[self.sensitive_attribute].dtype))
        return distribution
    
    def earthMoversDistance(self, qi_dist, total_dist):
        """scipy.wasserstein_distance(data_sensitivity, data_population)"""
        # eucdistance = np.sqrt((qi_dist - total_dist)**2)
        emdistance = np.sum(np.abs(qi_dist - total_dist))
        return emdistance

    def applyTCloseness(self, quasi_identifiers: List[str], tolerance: float, sensitive_attribute: str):
        """tolerance: 허용가능한 확률분포 차이의 범위를 정의하여 T-근접성을 적용하는 메서드"""
        T_data = dict()

        if 0 <= tolerance <= 1: 
            # threshold
            vector = np.array(self._dataframe[sensitive_attribute])
            super().categorizeEquivalentClass(quasi_identifiers)

            for group_key, index_value in self.equivalent_class.items():
                # 1. Empirical Cummulative Probability Distribution
                qi_distribution[group_key] = self.checkSensitivesDistribution(vector[index_value], sensitive_attribute)
                total_distribution[group_key] = self.checkSensitivesDistribution(vector, sensitive_attribute)
                    # self._dataframe.loc[index_value, sensitive_attribute]
                    # .value_count(normalize = True) = .value_counts() / sum 

                # 2. Earth's Mover Distance
                emd = self.earthMoversDistance(qi_distribution, total_distribution)

                # 3.
                if emd < tolerance:
                    T_data[group_key] = index_value
                else:
                    print(group_key, len(unique_sensitive_values))
            self.T_data = T_data
        else: 
            ValueError("입력받은 {}은 허용가능한 동질집합과 전체집단 간 확률분포 차이의 범위로서 유효하지 않습니다.".format(tolerance))

In [None]:
# t = T_Closeness(dataframe = DATA_FINANCE)
# t.applyTCloseness(quasi_identifiers = ["HOME_TYPE", "TF_LOAN", "TF_PENSION"], tolerance = 0.1, sensitive_attribute = "AMT_CREDITLOAN")

---

5.1. **일반화 클래스**

In [None]:
# from pseudonymizer.pseudonymizer import Pseudonymizer
from datetime import datetime

class CategorizationOfCharacter(Pseudonymizer):
    """문자형으로 저장된 정보에 대하여 상위의 개념으로 범주화하는 가명처리기법 구체클래스"""    
    def __init__(self, category_type: str):
        self.category_type = category_type
    
    def pseudonymizeData(self, input_string: str, category_mapping: dict):
        """식별성이 높은 그룹을 하나로 묶는 메서드
        일반적으로 나이 + 주소 + 성별 조합(동질 집합)이 재식별 가능성 있음
        주소 | 시군구만 남기고 읍면동 단위 삭제
        
        입력값
        ------
        실행 클래스 Pseudonym 자체에서 열 벡터 자체가 아닌 **특정 열의 개별 레코드 형태**로 
        입력받아 가명처리 기법을 적용하므로 별도로 자료형으로 제약 요건을 두지 아니함
        입력값을 특정 범주로 분류하기 위한 전제 조건
        : 실행 클래스에 범주 파악을 위한 Counter함수 적용하는 메서드 정의해야 함
        
        출력값
        ------
        조건에서 정의한 특정속성의 개별레코드별 그룹값 할당
        """
        if self.category_type == "date":
            return self.pseudonymizeDate(input_string)
        elif self.category_type == "user_definition":
            return self.pseudonymizeDefinition(input_string, category_mapping)
        else:
            raise ValueError(f"{self.category_type}은 유효한 범주화 기법 적용 유형이 아닙니다.")
    
    def pseudonymizeDate(self, date_time):
        """개인과 관련된 날짜 정보(자격 취득일자, 합격일 등)는 연 단위로 처리"""
        if isinstance(date_time, datetime):
            # datetime.datetime객체일 경우 날짜 문자열로 반환
            # 2023-06-03
            date_time = date_time.strftime("%Y-%m")
            return date_time
        try:
            # 날짜 문자열의 길이를 확인하여 연월일인지 연월인지 구분
            if len(date_time) == 8:
            # 20230603
                date = datetime.strptime(date_time, "%Y%m%d")
                return date.strftime("%Y-%m")  
                # 일시 삭제 후 연월로 변환
            elif len(date_time) == 6:
            # 202306
                date = datetime.strptime(date_time, "%Y%m")
                return date.strftime("%Y-%m")  
                # datetime 형식이 아닐 때, 연월로 변환
            else:
                raise ValueError(f"{date_time}은 유효한 날짜 형식이 아닙니다.")
        except ValueError:
            return f"{date_time}은 유효한 날짜 형식이 아닙니다."  
            # 에러 출력문 수정
    
    def pseudonymizeDefinition(self, string_tobeclassified, category_mapping: dict):
        """직접 특정 범주에 속하는 문자열 리스트를 딕셔너리 키, 값으로 입력
        서울특별시 141,704개의 고유필지 → 2023년 기준 서울특별시 1,650개의 골목상권코드으로 그룹핑할 수 있도록 유형화
        코스피 상장주식회사 종목 810개 → 24개 업종 분류로 범주화"""
        for category, string_list in category_mapping.items():
            # key는 범주이면서 value는 문자열 리스트일 때
            if string_tobeclassified in string_list: 
                # 입력받은 문자열이 for루프에 걸린 문자열 리스트의 원소인 경우 해당 범주형 반환
                # 접근 연산 시간복잡도를 줄이기 위한 시도는?
                # 현재의 배열과 같이 링크드 리스트의 경우 원하는 노드에 접근하는 시간은 몇 번째 인덱스인지에 비례
                return category
        return "other types"
            # 없으면 기타
            

내장함수 map + set 자료형 활용

In [None]:
def map_pseudonymizeDefinition(self, string_tobeclassified, category_mapping: dict):

    def make_category(input_string, category_set):
        if input_string in category_set:
            return category
        return "other types"

    re_li = list(map(make_category, string_tobeclassified, category, string_set in category_mapping.items())
    
    # for category, string_set in category_mapping.items():
    #     if string_tobeclassified in string_set:
    #         return category
    # return "other types"

In [None]:
# test_date_object = CategorizationOfCharacter(category_type="user_definition")
# test_define_result = DATA_RETAIL["GENDER"].apply(lambda data: test_date_object.pseudonymizeData(input_string=data, category_mapping={"M": set(["male"]), "F": set(["female"])}))

In [None]:
def pseudonymizeDefinition3(self, string_tobeclassified, category_mapping: dict):
    for category, string_list in category_mapping.items():
        linked_list = deque(string_list)

        if string_tobeclassified in linked_list:
            return category

    return "other types"

In [None]:
def pseudonymizeDefinition4(self, string_tobeclassified, category_mapping: dict):
    for category, string_list in category_mapping.items():
        string_set = set(string_list)

        if string_tobeclassified in string_set:
            return category

    return "other types"

pseudonymizeDefinition 메서드 numpy 단위 연산 튜토리얼 코드
- 보통 numpy array 연산이 일반 for문보다 속도가 빠르기 때문에, pandas 데이터를 다루는 클래스 및 함수의 특성상 numpy 연산을 시도하기 적합하다고 판단함

In [None]:
def pseudonymizeDefinition2(series_tobeclassified, category_mapping: dict):
    """
    pseudonymizeDefinition 함수를, pandas column 단위로 파라미터를 넣어 numpy로 연산하는 함수
    보통 numpy 연산이 일반 for문보다 속도가 빠르기 때문에, pandas 데이터를 다루는 특성상 numpy 연산을 시도함
    """
    result = np.full_like(series_tobeclassified, "other types")
    for category, string_list in category_mapping.items():
        result = np.where(np.isin(series_tobeclassified, string_list), category, result)

    return result

In [None]:
# tr = pseudonymizeDefinition2(DATA_RETAIL["GENDER"], {"M": ["male"], "F": ["female"]})
# tr, len(tr)

In [None]:
# di = {"M": ["male", "Man"], "F": ["female", "Woman"]}
# data = np.array(["male", "male", "female", "female", "male"])
# for category, string_list in di.items():
    # res = np.where(np.isin(data, string_list), category, "nn")

기존 일반화 메서드 pseudonymizeDefinition 코드

In [None]:
# test_date_object = CategorizationOfCharacter(category_type="user_definition")
# test_define_result = DATA_RETAIL["GENDER"].apply(lambda data: test_date_object.pseudonymizeData(input_string=data, category_mapping={"M": ["male"], "F": ["female"]}))
# test_define_result

날짜 정보 범주화 객체

In [None]:
# test_date_object = CategorizationOfCharacter(category_type = "date")
# test_date_category = DATA_RETAIL["JOIN_DATE"].apply(lambda data: test_date_object.pseudonymizeData(input_string = data, category_mapping = None))
# display(test_date_category.head(5))
# display(test_date_category[test_date_category.str.contains("유효한 날짜 형식이 아닙니다.")].head(5))
# test_date_category

사용자 정의 특정 그룹에 해당하는 문자열 리스트를 딕셔너리 키, 값으로 범주화

In [None]:
# test_date_object = CategorizationOfCharacter(category_type="user_definition")
# test_define_result = DATA_RETAIL["GENDER"].apply(lambda data: test_date_object.pseudonymizeData(input_string=data, category_mapping={"M": ["male"], "F": ["female"]}))
# DATA_RETAIL["GENDER"].head(5)
# Counter(test_define_result)

In [None]:
# from pseudonymizer.pseudonymizer import Pseudonymizer
# from datetime import datetime

class CategorizationOfNumeric(Pseudonymizer):
    """수치(연속형) 데이터를 임의의 수를 기준으로 범위(범주형)으로 설정하는 가명처리기법 구체클래스 """
    def __init__(self, numeric_type):
        self.numeric_type = numeric_type
    
    def pseudonymizeData(self, input_numeric: float, grouping_standard):
        """식별성이 높은 그룹을 하나로 묶는 메서드"""
        if self.numeric_type == "age":
            return self.pseudonymizeAge(input_numeric, grouping_standard)
        elif self.numeric_type == "income":
            return self.pseudonymizeIncome(input_numeric, grouping_standard)
        elif self.numeric_type == "user_definition":
            return self.pseudonymizeDefinition(input_numeric, grouping_standard)
        else: 
            raise ValueError(f"{self.numeric_type}은 유효한 범주화 기법 적용 유형이 아닙니다.")
    
    def pseudonymizeAge(self, birthday, grouping_standard):
        """연령 범주화 메서드
        일반적으로 나이 + 주소 + 성별 조합(동질 집합)이 재식별 가능성 있으므로
        5세, 10세 단위 또는 초중후반으로 만 나이 범주화"""
        currentdate = datetime.now().date()
        if (currentdate.month, currentdate.day) < (birthday.month, birthday.day):
        # 아직 현재 날짜가 생일 전인 경우 만 나이 계산 시 한 살 제외
            age = currentdate.year - birthday.year - 1
        else:
            age = currentdate.year - birthday.year
        
        if grouping_standard in ["3bin", "5bin", "10bin"]:
            if grouping_standard == "3bin":
            # 0,1,2(초반) / 3,4,5,6(중반) / 7,8,9(후반)
                sort = (age % 10) // 3
                range = (age // 10) * 10
                if sort == 0:
                    return f"{range}대 초반"
                elif sort == 1:
                    return f"{range}대 중반"
                elif sort == 2:
                    return f"{range}대 후반"
                else: 
                    return
            elif grouping_standard == "5bin":
            # 0,1,2,3,4(초반) / 5,6,7,8,9(후반)
                return f"{(age // 10) * 10}대 초반" if (age % 10) < 5 else f"{(age // 10) * 10}대 후반"
            elif grouping_standard == "10bin":
            # 10대~100대
                return f"{(age // 10) * 10}대"
            else:
                raise ValueError("입력받은 {}은 연령 범주화 기준으로 유효하지 않습니다.".format(grouping_standard))
        
    def pseudonymizeIncome(self, income, grouping_standard):
        """소득금액 범주화 메서드
        소득을 전체 대상자를 9분위(2024년 건강보험료 1인 기준 소득분위)로 균등 분할"""
        if grouping_standard is None:
            threshold_list= [1841500, 2025500, 2675000, 2897000, 3120000, 3343000, 3566000, 3789000, 4012000]
            for index, threshold in enumerate(threshold_list, start = 1):
                if income <= threshold:
                    return f"{index}분위"
        else:
        # While grouping_standard True:
            grouping_standard.sort()
            # 오름차순 정렬 시 일반적으로 사용하는 버블 정렬은 O(N**2)이므로 
            # 시간복잡도 낮추려면 병합 정렬 O(NlogN) 활용 -> 추후 merge_sort() 메서드 적용하여 리팩토링
            for index, grouping_standard in enumerate(grouping_standard, start = 1):
                if income <= grouping_standard:
                    return f"{index}분위"
        
    def pseudonymizeDefinition(self, numeric_tobeclassified, category_mapping: dict):
        """사용자가 직접 구간을 설정하도록 하는 범주화 메서드
        intervals: [(0, 1000), (1000, 5000), (5000, 10000)]
        """
        for category, interval in category_mapping.items():
            if not isinstance(interval, tuple) or len(interval) != 2:
                return f"유효하지 않은 구간 {interval}입니다."
            
            lower, upper = interval # tuple(lower, upper)
            if lower <= numeric_tobeclassified <= upper:
                return category
            return "other types"

In [None]:
# test_date_object = CategorizationOfCharacter(category_type="user_definition")
# test_define_result = DATA_RETAIL["GENDER"].apply(lambda data: test_date_object.pseudonymizeData(input_string=data, category_mapping={"M": ["male"], "F": ["female"]}))

In [None]:
# numeric_category = CategorizationOfNumeric(numeric_type="age")
# test_numeric_result = DATA_RETAIL["AGE"].apply(lambda data: numeric_category.pseudonymizeData(input_numeric=data, grouping_standard="5bin"))

수치형 데이터 범위 설정 클래스

In [None]:
# from pseudonymizer.pseudonymizer import Pseudonymizer
# from datetime import datetime

class CategorizationOfColumn(Pseudonymizer):
    """수치(연속형) 데이터를 임의의 수를 기준으로 범위(범주형)으로 설정하는 가명처리기법 구체클래스 """
    def __init__(self, numeric_type, grouping_standard, right, ascending):
        self.numeric_type = numeric_type
        self.grouping_standard = grouping_standard
        self.right = right
        self.ascending = ascending
    
    def pseudonymizeData(self, df):
        """식별성이 높은 그룹을 하나로 묶는 메서드"""
        if self.numeric_type == "bin":
            return self.pseudonymizeAmountbyBin(df, self.grouping_standard, self.right, self.ascending)
        elif self.numeric_type == "pct":
            return self.pseudonymizeAmountbyPct(df, self.grouping_standard, self.right, self.ascending)        
        else: 
            raise ValueError(f"{self.numeric_type}은 유효한 범주화 기법 적용 유형이 아닙니다.")

    def makeLabels(self, num_type, df, grouping_standard, ascending: bool):
        """범주화 중 필요한 label을 만들어주는 클래스"""
        if num_type == "pct":

            labels = []
            
            for i in range(len(grouping_standard)):
                if i == 0:
                    label = f"{grouping_standard[i]} 미만"
                    labels.append(label)
                elif 0 < i < len(grouping_standard) - 1:
                    label = f"{grouping_standard[i-1]} ~ {grouping_standard[i]}"
                    labels.append(label)
                else:
                    label1 = f"{grouping_standard[i-1]} ~ {grouping_standard[i]}"
                    labels.append(label1)
                    label2 = f"{grouping_standard[i]} 이상"
                    labels.append(label2)

            if ascending == False:
                return labels.reverse()
            elif ascending == True:
                return labels
            else:
                raise ValueError("ascending 파라미터는 True / False 형태로 입력해 주십시오.")


        elif num_type == "bin":

            labels = []
            
            category_values = pd.cut(df, bins = grouping_standard).value_counts(sort=False)

            for category in list(category_values.index):
                start_value = category.left
                end_value = category.right
                labels.append(f"{start_value} ~ {end_value}")

            if ascending == False:
                return labels.reverse()
            elif ascending == True:
                return labels

        else:
            raise ValueError(f"{num_type}은 유효한 범주화 기법 적용 유형이 아닙니다.")

    
    def pseudonymizeAmountbyBin(self, df, grouping_standard, right: bool, ascending: bool):
        """기타 금액 구간별 범주화 메서드
        신용공여금액(예: 한도/건별대출, 담보대출, 리스/카드할부금융서비스 등)의 일정 급간화
        다만, pd.cut과의 차별점 없으며, pandas 내장 함수를 활용하여 범주화하는 것이 훨씬 효율적

        - 등급명 전부 입력 받고 그 등급명대로 매칭해주는 케이스 (O)
        - 등급명 없이 갯수만 입력받고, 등급명은 “10~30”, “30~50” 이런 식으로 매칭해주는 케이스 (O)
        - "이상 ~ 미만" 과 "초과 ~ 이하" 중 선택지 부여하는 케이스 (각각 right = False, right = True) (O)
        - 등급을 오름차순(작은것 ~ 큰것) / 내림차순(큰것 ~ 작은것) 중 어느 순으로 매길지 고르는 케이스 (레이블 순서를 뒤집으면 내림차순이 됨)
            -> ascending = True 면 오름차순, 아니면 내림차순 (O)
        - 해당 선택지 및 케이스들을 메서드 1개로 올릴지, 메서드 여러개로 분리할지 결정 필요
        """
        if isinstance(grouping_standard, list):
            num = len(grouping_standard)
            df = pd.cut(df, bins = num, labels = grouping_standard, right = right)
            return df

        elif isinstance(grouping_standard, int):
            labels = self.makeLabels("bin", df, grouping_standard, ascending = ascending)                              
            df = pd.cut(df, bins = grouping_standard, labels = labels, right = right)
            return df

        else:
            raise ValueError("grouping_standard는 list 또는 int 타입으로 입력해 주십시오.")
                
        
    def pseudonymizeAmountbyPct(self, df, grouping_standard, right: bool, ascending: bool):
        """기타 금액 백분위에 의한 범주화 메서드
        개인사업자의 추청매출액/평당월임대료를 백분위수에 따라 매출등급화(90~100%, 65~90%, 35~65%, 10~35%, 0~10%)

        - 각 레코드별 등수 구하기 (중복값은 가장 낮은 순위로)
        - 백분위수 = ((등수 - 1) / 컬럼 레코드 갯수) * 100
        - 백분위수별로 pd.cut
        """
        rank_df = df
        rank_df["rank"] = df.rank(method='min')
        
        percentiles = []

        for rank in rank_df["rank"]:
            percentile = ((rank - 1) / len(df['rank'])) * 100
            percentiles.append(percentile)

        rank_df["percentile"] = percentiles
        new_labels = self.makeLabels("pct", df, grouping_standard, ascending)
        bins = [0] + grouping_standard + [100]
        result_df = pd.cut(rank_df["percentile"], bins = bins, labels = new_labels, right = right)
        return result_df

In [None]:
# tt = CategorizationOfColumn(numeric_type="bin", grouping_standard = ["A", "B", "C", "D", "E"], right = False, ascending = True)
# tt3 = CategorizationOfColumn(numeric_type="bin", grouping_standard = 5, right = False, ascending = False)
# tt3 = tt.pseudonymizeData(df = DATA_FINANCE["AMT_CREDITCARD_PAYMENT"])

# Test Scipt for pseudonymizeAmountbyBin method
# pseudonym_data = Pseudonym(dataframe = DATA_FINANCE)
# pseudonym_data.addDictionary(column = "AMT_CREDITCARD_PAYMENT", pseudonymizers = [tt])
# pseudonym_data.addDictionary(column = "AMT_CREDITCARD_PAYMENT", pseudonymizers = [tt3])
# pseudonym_data.pseudonymizeData()
# pseudonym_data.getPseudonymizedDataframe()

pseudonymizeAmountbyPct 메서드 테스트

In [None]:
# Test Scipt for pseudonymizeAmountbyPct method
# tt2 = CategorizationOfColumn(numeric_type="pct", grouping_standard = [20, 50, 70, 80], right = True, ascending = False)
# pseudonym_data = Pseudonym(dataframe = DATA_FINANCE)
# pseudonym_data.addDictionary(column = "AMT_CASHADVANCE_PAYMENT", pseudonymizers = [tt2])
# pseudonym_data.pseudonymizeData()
# pseudonym_data.getPseudonymizedDataframe()

부분총계 클래스

* 총계처리:(총합집계, 범주/범위화기법과혼용)
    - 추정매출액=원시매출액*추정현금비율*카드사시장점유비(MS)
    - 지역별업종추정평균매출액=Σ(업소별추정매출액)/업소수
    - 매출범위로환산:5,314,000원→추정매출5,000천원∼6,000천원

* 총계: (합계, 평균, 빈도, 추세, 추정)
    - 소득: 개인의소득→ 소지역단위로총계화
    - 예) <삼봉아파트101동구역> 
    - 홍길동연수입34,100,000원 / 성춘향연수입27,000,000원 / 심학도연수입41,000,000원
    - → 삼봉아파트101동연수입(총계, 평균, 등록고객수, 작년대비성장, 대푯값)
    - 적용예시) 26

In [59]:
# from pseudonymizer.pseudonymizer import Pseudonymizer

class MicroAggregation(Pseudonymizer):
    """특정 그룹의 속성에서 정확한 통계값을 확인하는 가명처리기법 구체 클래스
       클래스를 통해 도출할 부분 통계값: 평균, 총합, 중간값, 최댓값, 최솟값, 최빈값
    """
    def __init__(self, calulate_type, quasi_identifier):
        self.calculate_type = calulate_type
        self.quasi_identifier = quasi_identifier

    def pseudonymizeData(self, data, attribute: str, equivalent_class: dict()):
        """Pseudonym 클래스 내에서 클래스 실행하는 메서드"""
        qi_index_list = equivalent_class[self.quasi_identifier]
        attribute_location = data.columns.get_loc(attribute)
        pseudonymize_data = data.iloc[qi_index_list, attribute_location]
    
        if self.calculate_type == "average":
            # 동질집합 특정 준식별자의 속성에 대한 평균값 구하는 메서드
            data.iloc[qi_index_list, attribute_location] = pseudonymize_data.mean()
            return data[attribute]
        elif self.calculate_type == "sum":
            # 동질집합 특정 준식별자의 속성에 대한 총계 구하는 메서드
            data.iloc[qi_index_list, attribute_location] = pseudonymize_data.sum()
            return data[attribute]
        elif self.calculate_type == "median":
            # 동질집합 특정 준식별자의 속성에 대한 중간값 구하는 메서드
            data.iloc[qi_index_list, attribute_location] = pseudonymize_data.median()
            return data[attribute]
        elif self.calculate_type == "max":
            # 동질집합 특정 준식별자의 속성에 대한 최댓값 구하는 메서드
            data.iloc[qi_index_list, attribute_location] = pseudonymize_data.max()
            return data[attribute]
        elif self.calculate_type == "min":
            # 동질집합 특정 준식별자의 속성에 대한 최솟값 구하는 메서드
            data.iloc[qi_index_list, attribute_location] = pseudonymize_data.min()
            return data[attribute]
        elif self.calculate_type == "freqency":
            # 동질집합 특정 준식별자의 속성에 대한 최빈값 구하는 메서드
            data.iloc[qi_index_list, attribute_location] = pseudonymize_data.mode()
            return data[attribute]
        else:
            raise ValueError(f"{self.calculate_type}은 유효한 부분총계 기법 적용 유형이 아닙니다.")    

In [60]:
EC = EquivalentClass(DATA_FINANCE)
EC.categorizeEquivalentClass(
    attributes = ["GENDER", "TF_LOAN", "TF_PENSION"])

In [61]:
ec_dictionary = EC.equivalent_class
EC.equivalent_class.keys()

dict_keys([('female', 'N', 'N'), ('female', 'N', 'Y'), ('female', 'Y', 'N'), ('female', 'Y', 'Y'), ('male', 'N', 'N'), ('male', 'N', 'Y'), ('male', 'Y', 'N'), ('male', 'Y', 'Y')])

In [62]:
print(ec_dictionary[('female', 'Y', 'N')], " ")

[6, 32, 41, 45, 57, 86, 101, 108, 115, 143, 149, 180, 195, 204, 228, 253, 255, 280, 282, 289, 323, 336, 338, 346, 349, 381, 464, 493, 530, 571, 590, 595, 677, 704, 710, 723, 733, 740, 822, 874, 877, 908, 973, 978, 979, 990, 994, 1024, 1077, 1099, 1106, 1114, 1140, 1158, 1188, 1200, 1257, 1340, 1358, 1385, 1400, 1403, 1408, 1424, 1449, 1462, 1495, 1527, 1585, 1597, 1599, 1642, 1679, 1681, 1690, 1698, 1726, 1736, 1742, 1761, 1764, 1772, 1840, 1841, 1848, 1888, 1891, 1895, 1905, 1940, 1983, 2000, 2086, 2107, 2122, 2131, 2198, 2200, 2220, 2248, 2265, 2304, 2333, 2351, 2354, 2378, 2414, 2428, 2430, 2450, 2452, 2462, 2470, 2574, 2604, 2634, 2649, 2690, 2692, 2772, 2776, 2781, 2834, 2849, 2910, 2936, 2982, 2992, 3003, 3030, 3039, 3077, 3154, 3202, 3255, 3278, 3300, 3326, 3365, 3453, 3550, 3556, 3557, 3560, 3601, 3613, 3650, 3657, 3674, 3698, 3740, 3762, 3764, 3770, 3789, 3812, 3883, 3911, 3922, 3944, 3950, 3963, 3965, 4014, 4083, 4106, 4135, 4146, 4165, 4178, 4201, 4205, 4219, 4224, 4242, 424

In [63]:
MA = MicroAggregation(
    calulate_type = "average", quasi_identifier = ('female', 'Y', 'N'))

In [64]:
MA.pseudonymizeData(
    data = DATA_FINANCE, attribute = "CREDIT_SCORE", equivalent_class = ec_dictionary)

0       441.0000000000
1       515.0000000000
2       269.0000000000
3       254.0000000000
4       590.0000000000
             ...      
94995   868.0000000000
94996   875.0000000000
94997   294.0000000000
94998   986.0000000000
94999    44.0000000000
Name: CREDIT_SCORE, Length: 95000, dtype: float64

In [None]:
# DATA_FINANCE.iloc[
    # PseudonymizeFinanceData.equivalent_class[('비거주용 건물(상가', 'Y', 'Y')], :][
    # (DATA_FINANCE["AMT_CREDITLOAN"] >= 0) & (DATA_FINANCE["AMT_CASHADVANCE_PAYMENT"] > 8000000)] 

# DATA_FINANCE.iloc[
    # PseudonymizeFinanceData.equivalent_class[('영업 겸용 단독주택', 'Y', 'Y') ], :][
    # (DATA_FINANCE["AMT_CREDITLOAN"] >= 0) & (DATA_FINANCE["AMT_CASHADVANCE_PAYMENT"] > 8000000)] 

# 94,999명 중 대출실행고객은 9466명
# ('영업 겸용 단독주택', 'Y', 'Y') ('비거주용 건물(상가', 'Y', 'Y)

---

5.2. **일반화 클래스 - 이상치 탐색 및 처리**

로컬 일반화
- minimality attack
- 모든 레코드에 대해 일반화 하는 것이 아니고 특정 집단만 일반화하는 기술

In [None]:
# from pseudonymizer.pseudonymizer import Pseudonymizer

class LocalGeneralization(Pseudonymizer):
    """분포상의 특이성으로 인해 식별 가능성이 높아지는 경우 해당 부분만 일반화를 적용하는 가명처리기법 구체클래스"""
    def pseudonymizeData(self, index_list: List[int]):
        pass

상하단 코딩

- 양끝단의 특이값이 식별성이 높으므로 이를 그룹화
- 결국 하위의 공통된 특성을 찾아 상위 개념으로 묶는 기법
(특정 정보를 해당 그룹의 대푯값이나 구간값으로 변환)

In [None]:
class TopandBottomCoding(Pseudonymizer):
    """적은 수의 분포를 가진 양 끝단의 정보를 범주화 등의 기법을 적용하여 식별성을 낮추는 가명처리기법 구체클래스"""
    def __init__(self, outlier_type, bounded_value):
        self.outlier_type = outlier_type
        self.bounded_value = bounded_value
    
    def pseudonymizeData(self, dataseries):
        if self.outlier_type == "IQR":
            return self.pseudonymizeOutlierIQR(dataseries)
        elif self.outlier_type == "Pct":
            return self.pseudonymizeOutlierPct(dataseries)
        else: 
            raise ValueError(f"{self.outlier_type}은 유효한 상하단코딩 적용 유형이 아닙니다.")
    
    def pseudonymizeOutlierIQR(self, dataseries):
        """
        상자도표(boxplot)을 이용한 Q3 + 1.5IQR(상위 경계) 이상이거나 Q1 - 1.5IQR(하위 경계) 이하인 관측값
        다만, 다변량 설정이나 고차원 데이터에는 유용성이 떨어진다는 한계
        """
        Q1, Q2, Q3 = self.calculateQuartiles(dataseries)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5*IQR
        upper_bound = Q3 + 1.5*IQR
        
        datatopcoding = dataseries.apply(
          lambda x: x if x <= upper_bound
          else upper_bound)
        databottomcoding = datatopcoding.apply(
          lambda x: x if x >= lower_bound
          else lower_bound)
        return databottomcoding

    def pseudonymizeOutlierPct(self, dataseries):
        """
        백분율을 기준으로 상하단 경계값을 설정
        """
        if self.bounded_value < 0.5:
            lower_bound = self.bounded_value
            upper_bound = 1 - lower_bound
        else:
            upper_bound = self.bounded_value
            lower_bound = 1 - upper_bound
            
        # dataseries = self.quickSorting(dataseries)
        datareturn = dataseries.apply(
            lambda x: x if lower_bound <= x <= upper_bound 
            else lower_bound if x < lower_bound 
            else upper_bound)
        return datareturn


    @staticmethod
    def calculateMedian(self, data):
        """중간값(백분위50%) 계산하는 메서드
        정적 메서드를 선언함으로써
        클래스의 네임스페이스 내 속하는 유틸리티 함수를 제공하며 해당 클래스의 인스턴스를 생성하지 않고(의존성 없이) 필요한 기능 수행"""
        n = len(data)
        # 전체 길이가 홀수일 때(2로 나눈 나머지가 0일 때, 2로 나눈 몫의 위치에 해당하는 중간값)
        if n % 2 != 0:
            return data[n//2]
        # 전체 길이가 짝수일 때
        else:
            return (data[n//2-1] + data[n//2])/2
        
    @staticmethod
    def quickSorting(self, data):
        pass

    @staticmethod
    def calculateQuartiles(self, dataseries):
        """백분위25%(Q1)과 백분위75%(Q3)를 중간값을 이용하여 구하는 메서드"""
        data = dataseries.to_list()
        # 빠른 quick정렬 알고리즘(heap, merge으로도 대체 가능)
        datasorted = self.quickSorting(data)
        
        # 백분위 25%(가장 작은 수부터 중간값까지 수의 중간값), 백분위 75%(중간값부터 가장 큰 수까지 수의 중간값)
        Quartile2 = self.calculateMedian(datasorted)
        Quartile1 = self.calculateMedian(datasorted[:Quartile2])
        Quartile3 = self.calculateMedian(datasorted[Quartile2:])
        return Quartile1, Quartile2, Quartile3

In [None]:
# ./pseudonymizer/pseudonymizer/deidentificationTechnique/equivalent_class.py

class EquivalentClass:
    """개인식별가능정보 속성을 기준으로 데이터를 그룹화하는 부모 클래스
    
    준식별자를 이용한 그룹화 기법 의사코드
    --------------------------------------
    data grouping using quasi-identifier
    Input : PI(Personal Information)
    Output : Grouped PI

    grouped_PI = dict()
    for identifier, quasi in PI.items():
        key = quasi[0] + quasi[1] + quasi[2] + quasi[3]
        if key in grouping_PI:
            grouping_PI[key].append(identifier)
        else:
            grouping_PI[key] = []
            grouping_PI[key].append[identifier]

        return grouping_PI
    """
    def __init__(self, dataframe):
        self._dataframe = dataframe
        self.equivalent_class = {}

    def __str__(self):
        # __repr__
        """캡슐화된 데이터셋의 속성(컬럼)정보를 반환하는 메서드"""
        return self._dataframe.info()
    
    def categorizeEquivalentClass(self, attributes: List[str]):
        """각 행(레코드)에 대한 개인식별가능정보 속성(컬럼)들 사이에 동질 집합을 확인하는 메서드"""
        groupby_data = self._dataframe.groupby(attributes)
        
        for group, data in groupby_data:
            if len(group) > 1:
                key = tuple(group)
                # 딕셔너리에서 키 값으로 리스트(동적 타입)는 사용할 수 없으므로 튜플로 변환
                self.equivalent_class[key] = data.index.tolist()
                # 동질 집합에 해당하는 행(레코드)의 인덱스 번호를 키 값으로 조회되도록 저장

    def removeDuplicatesInEquivalentClass(self):
        """각 동질집합 내 레코드 간 중복된 행을 제거하는 메서드"""
        for group_key, index_value in self.equivalent_class.items():
            unique_record = self._dataframe.loc[index_value, :].drop_duplicates()
            # set(self._dataframe.loc[index_value, :])
            self.equivalent_class[group_key] = unique_record.index.tolist()

차분 프라이버시

* 제약조건 : 데이터 공개 메커니즘만으로는 임의의 레코드가 포함된 데이터셋과 그렇지 않은 데이터셋을 구별하기 어려워야 한다.

* 통계적 확률응답모형을 응용하여 원본 데이터에 노이즈를 추가하는 방법

    (1) 라플라스 메커니즘
        
    * 라플라스 분포(laplace distribution)로부터 파생된 랜덤 노이즈 값
    
    * 분산이 개인정보의 민감도(sensitivity)/ε 상수와 비례하도록 설정된 라플라스 분포로부터 생성한 임의의 잡음
    
    * $f(x) = \frac{1}{2b}exp[\frac{-|x-\mu|}{b}]$
        
    (2) 가우스 메커니즘

    * ε 값과 관계된 평균과 분산을 가지는 가우스 분포로부터 생성한 임의의 잡음을 더하여(noise addition) 변조된 결과를 데이터 이용자에게 제공
    
    * $f(x) = \frac1{\sqrt{2\pi\sigma^2}}{exp}^\frac{x-\mu}{\sigma}$

In [None]:
# ./pseudonymizer/pseudonymizer/deidentificationTechnique/differentialPrivacy.py

# from pseudonymizer.pseudonymizer.deidentificationTechnique.equivalentClass import EquivalentClass
# from typing import *
# from scipy.stats import laplace

class DifferentialPrivacy(EquivalentClass):
    """라플라스 메커니즘 적용 차분 프라이버시 클래스
    차등적 정보보호 기능을 수행"""
    def __init__(self, dataframe, self.ratio_bounded: float):
        super().__init__(dataframe)
        # 동질집합을 키로하고 인덱스 번호를 값으로 하는 신뢰구간 초과하는 값을 
        self.upperoutlier_dictionary = {}
        self.loweroutlier_dictionary = {}
        self.sensitive_attribute = None
        
        # ratio_bounded = epsilon(개인정보(가명정보) 보호 수준 결정)
        self.ratio_bounded = self.ratio_bounded
    
    def dataDeviatingfromCI(self, boundary: float, attributes: List[str], sensitive_attribute: str):
        """동질집합 내 평균에서 양쪽 3표준편차의 범위 99.7%에 들지 않는 민감정보 행 번호만 별도로 추출하는 메서드
        가설검정 기반에서 노이즈를 입력하는 방식인 GDP(Gaussian Differential Privacy) 방법
        이 방식은 통계 가설 관점에서 주어진 유의수준에서 최소의 제2종 오류를 trade-off function으로 나타냄으로써 차분 정보보호를 적용
        """
        super().categorizeEquivalentClass(attributes)
        self.sensitive_attribute = sensitive_attribute
        for group_key, index_value in self.equivalent_class.items():
            mu = np.nanmean(self._dataframe.loc[index_value, sensitive_attribute])
            sigma = np.nanstd(self._dataframe.loc[index_value, sensitive_attribute])
            
            for i in index_value:
                x = self._dataframe.loc[i, self.sensitive_attribute]
                if mu-boundary*sigma <= x <= mu+boundary*sigma:
                    pass
                elif x > mu+boundary*sigma:
                    self.upperoutlier_dictionary.setdefault(group_key, []).append(i)
                elif x < mu-boundary*sigma:
                    self.loweroutlier_dictionary.setdefault(group_key, []).append(i)
                else:
                    raise ValueError(f"{x}은 유효한 수가 아닙니다.")
    
    def dataGlobalSensitivity(self):
        """특정 레코드(식별가능한 개인) 유무에 따른 민감도 산출하는 메서드
        특정 결과를 얻기 위한 쿼리 K를 각 데이터에 적용한 결과인 K(D1)와 K(D2)가 동일한 분포 S에 속할
        확률의 비율(두 데이터 분포의 차이)을 일정 수준(epsilon)보다 작도록 함
        """
        for group_key, outlier_list self.upperoutlier_dictionary.items():
            for i in outlier_list:
                group_data, exception_data = 0, 0
                group_list = self.equivalent_class[group_key]
                exception_list = group_list.remove(i)
                
                group_data = self._dataframe.loc[group_list, self.sensitive_attribute]
                exception_data = self._dataframe.loc[exception_list, self.sensitive_attribute]
                
                pseudonymize_data = self.laplaceMechanism(
                    group_data, exception_data, self._dataframe.loc[i, self.sensitive_attribute])
                self._dataframe.loc[i, self.sensitive_attribute] = pseudonymize_data
                
                
    @staticmethod
    def estimateLaplaceParameters(self, data):
        """라플라스 분포의 모수 평균(mu)과 스케일(beta)을 추정하는 메서드"""
        mu = np.nanmean(data)
        beta = np.mean(np.abs(data - mu))
        return mu, beta

    @staticmethod
    def laplacePDF(self, x, mu, beta):
        """라플라스 연속확률분포 확률밀도함수
        확률분포(확률변수가 특정한 값을 가질 확률을 나타내는 함수)"""
        return (1 / (2*beta)) * exp(-abs(x-mu) / beta)

    @staticmethod
    def calculateProbabilityRatio(self, include_data, exclude_data):
        """확률변수 x가 라플라스 분포에 속할 확률과 특정 행의 포함 여부 데이터 간 비율을 계산하는 메서드"""
        # 두 데이터의 평균과 스케일 파라미터를 추정
        mu_include, beta_include = self.estimateLaplaceParameters(include_data)
        mu_exclude, beta_exclude = self.estimateLaplaceParameters(exclude_data)
        
        # 두 데이터의 라플라스 분포에 속할 확률 계산
        prob_include = self.laplacePDF(include_data, mu_include, beta_include)
        prob_exclude = self.laplacePDF(exclude_data, mu_exclude, beta_exclude)
        prob_ratio = prob_include / prob_exclude
        
        # 두 데이터의 라플라스 분포에 속할 확률의 비율 계산
        return prob_ratio

    @staticmethod
    def laplaceMechanism(self, include_data, exclude_data, particular_record):
        """라플라스 메커니즘을 적용하여 특정 데이터 행에 랜덤 노이즈값을 추가하는 메서드"""
        # 전역 민감도 계산
        sensitivity = self.calculateProbabilityRatio(include_data, exclude_data)
        # 사용자 지정 개인정보 보호 수전 하이퍼파라미터 엡실론을 통해 스케일 파라미터 정의
        beta = sensitivity / self.ratio_bounded
        # 평균 0, 베타를 분산으로 가지는 라플라스 분포에 속하는 랜덤 난수 추출
        noise = np.random.laplace(0, beta, len(include_data))
        
        # 이상치에 노이즈 추가
        return particular_record + noise
    

In [None]:
# ./pseudonymizer/pseudonymizer/deidentificationTechnique/differentialPrivacy.py

# from pseudonymizer.pseudonymizer.deidentificationTechnique.equivalentClass import EquivalentClass
# from typing import *
# from scipy.stats import laplace

class GaussianDifferentialPrivacy(EquivalentClass):
    """가우시안 메커니즘 적용 차분 프라이버시 클래스"""
    
    def __init__(self, dataframe, self.ratio_bounded: float):
        super().__init__(dataframe)
        # 동질집합을 키로하고 인덱스 번호를 값으로 하는 신뢰구간 초과하는 값을 
        self.upperoutlier_dictionary = {}
        self.loweroutlier_dictionary = {}
        self.sensitive_attribute = None
        
        # ratio_bounded = epsilon(개인정보(가명정보) 보호 수준 결정)
        self.ratio_bounded = self.ratio_bounded
    
    def dataDeviatingfromCI(self, boundary: float, attributes: List[str], sensitive_attribute: str):
        """동질집합 내 평균에서 양쪽 3표준편차의 범위 99.7%에 들지 않는 민감정보 행 번호만 별도로 추출하는 메서드
        가설검정 기반에서 노이즈를 입력하는 방식인 GDP(Gaussian Differential Privacy) 방법
        이 방식은 통계 가설 관점에서 주어진 유의수준에서 최소의 제2종 오류를 trade-off function으로 나타냄으로써 차분 정보보호를 적용
        """
        super().categorizeEquivalentClass(attributes)
        self.sensitive_attribute = sensitive_attribute
        for group_key, index_value in self.equivalent_class.items():
            mu = np.nanmean(self._dataframe.loc[index_value, sensitive_attribute])
            sigma = np.nanstd(self._dataframe.loc[index_value, sensitive_attribute])
            
            for i in index_value:
                x = self._dataframe.loc[i, self.sensitive_attribute]
                if mu-boundary*sigma <= x <= mu+boundary*sigma:
                    pass
                elif x > mu+boundary*sigma:
                    self.upperoutlier_dictionary.setdefault(group_key, []).append(i)
                elif x < mu-boundary*sigma:
                    self.loweroutlier_dictionary.setdefault(group_key, []).append(i)
                else:
                    raise ValueError(f"{x}은 유효한 수가 아닙니다.")
    
    def dataGlobalSensitivity(self):
        """특정 레코드(식별가능한 개인) 유무에 따른 민감도 산출하는 메서드
        특정 결과를 얻기 위한 쿼리 K를 각 데이터에 적용한 결과인 K(D1)와 K(D2)가 동일한 분포 S에 속할
        확률의 비율(두 데이터 분포의 차이)을 일정 수준(epsilon)보다 작도록 함
        """
        for group_key, outlier_list self.upperoutlier_dictionary.items():
            for i in outlier_list:
                group_data, exception_data = 0, 0
                group_list = self.equivalent_class[group_key]
                exception_list = group_list.remove(i)
                
                group_data = self._dataframe.loc[group_list, self.sensitive_attribute]
                exception_data = self._dataframe.loc[exception_list, self.sensitive_attribute]
                
                pseudonymize_data = self.gaussianMechanism(
                    group_data, exception_data, self._dataframe.loc[i, self.sensitive_attribute])
                self._dataframe.loc[i, self.sensitive_attribute] = pseudonymize_data
                
    @staticmethod
    def estimateGaussianParameters(self, data):
        """정규분포의 모수 평균(mu)과 표준편차(sigma)를 추정하는 메서드"""
        mu = np.nanmean(data)
        sigma = np.nanstd(data)
        return mu, sigma
    
    @staticmethod
    def gaussianPDF(self, x, mu, sigma):
        """정규분포(가우시안) 확률밀도함수
        확률분포(확률변수가 특정한 값을 가질 확률을 나타내는 함수)"""
        return (1 / sqrt(2*pi*sigma**2)) * exp(-0.5*((x-mu) / sigma)**2)

    @staticmethod
    def calculateProbabilityRatio(self, include_data, exclude_data):
        """확률변수 x가 정규분포에 속할 확률과 특정 행의 포함 여부 데이터 간 비율을 계산하는 메서드"""
        # 두 데이터의 평균과 표준편차 파라미터를 추정
        mu_include, beta_include = self.estimateGaussianParameters(include_data)
        mu_exclude, beta_exclude = self.estimateGaussianParameters(exclude_data)
        
        # 두 데이터의 라플라스 분포에 속할 확률 계산
        prob_include = self.gaussianPDF(include_data, mu_include, beta_include)
        prob_exclude = self.gaussianPDF(exclude_data, mu_exclude, beta_exclude)
        prob_ratio = prob_include / prob_exclude
        
        # 두 데이터의 정규분포에 속할 확률의 비율 계산
        return prob_ratio
    
    @staticmethod
    def laplaceMechanism(self, include_data, exclude_data, particular_record):
        """가우스 메커니즘을 적용하여 특정 데이터 행에 랜덤 노이즈값을 추가하는 메서드"""
        # 전역 민감도 계산
        sensitivity = self.calculateProbabilityRatio(include_data, exclude_data)
        # 사용자 지정 개인정보 보호 수전 하이퍼파라미터 엡실론을 통해 스케일 파라미터 정의
        beta = sensitivity / self.ratio_bounded
        # 평균 0, 베타를 분산으로 가지는 라플라스 분포에 속하는 랜덤 난수 추출
        noise = np.random.normal(0, sigma, len(data))
        
        # 이상치에 노이즈 추가
        return particular_record + noise
    

In [None]:
# DATA_JOIN_CARDPAYMENT["PAYMENT_DATE"] = DATA_JOIN_CARDPAYMENT["PAYMENT_DATE"].dt.strftime("%Y-%m")
DATA_JOIN_CARDPAYMENT.head(3)
# attributes = ["GENDER", "PAYMENT_DATE", "AFFILIGATESTORE_INDUSTRY_CODE"]
# sensitive_attribute = "AMT_CREDITCARD_PAYMENT"

In [None]:
LDPobject = DifferentialPrivacy(dataframe = DATA_JOIN_CARDPAYMENT)

In [None]:
LDPobject.dataDeviatingfromCI(attributes = ["GENDER", "PAYMENT_DATE", "AFFILIGATESTORE_INDUSTRY_CODE"],
                              sensitive_attribute = "AMT_CREDITCARD_PAYMENT", boundary = 2)

In [None]:
LDPobject.upperoutlier_dictionary

In [None]:
LDPobject.loweroutlier_dictionary

In [None]:
# from pseudonymizer.pseudonymizer.deidentificationTechnique.equivalent_class import EquivalentClass
# from pseudonymizer.pseudonymizer.deidentificationTechnique.kAnonimity import K_Anonymity
# from pseudonymizer.pseudonymizer.deidentificationTechnique.lDiversity import L_Diversity
# from pseudonymizer.pseudonymizer.deidentificationTechnique.tCloseness import T_Closeness

# from typing import *

class PrivacyPreservingModel:
    """개인식별가능정보 속성을 기준으로 그룹화된 데이터로 프라이버시 보호 모델을 적용하여 정량적인 위험성을 규정하는 실행 클래스"""
    def __init__(self, dataframe):
        self._dataframe = dataframe
        self.equivalnt_class = EquivalentClass(self._dataframe)
        self.Kanonymity = K_Anonymity(self._dataframe)
        self.Ldiversity = L_Diversity(self._dataframe)
        self.Tcloseness = T_Closeness(self._dataframe)
        
    def applyKAnonymityOrLDiversity(self, method: str, **kwargs):
        """K-익명성과 L-다양성 모델을 선택적으로 적용하는 메서드
        input
        -----
        method: 프라이버시 보호 모델 메서드를 받고, 
        keyword arguments에 딕셔너리 형식으로 각 기법에 필요한 파라미터를 받아옴"""
        if method == "K":
            self.Kanonymity.applyKAnonymity(**kwargs)
            # K: int, attribute: List[str]
            return self.Kanonymity.K_data
        elif method == "L":
            self.Ldiversity.applyLDiversity(**kwargs)
            # K: int, L: int, attribute: List[str], sensitive_attribute
            return self.Ldiversity.L_data
        
    def applyLocalLDiversity(self, LocalL: int, **kwargs):
        self.Ldiversity.applyLDiversity(**kwargs)
        # K: int, L: int, attribute: List[str], sensitive_attribute
        self.Ldiversity.applyLocalLDiversity(LocalL)
        # local_L: int
        return self.Ldiversity.LocalL_data

    def applyTCloseness(self, aquasi_identifiers: List[str], sensitive_attribute: str, tolerance: float):
        self.Tcloseness.applyTCloseness(quasi_identifiers, tolerance, sensitive_attribute)
        return self.Tcloseness.T_data
    
    def applyDPrivacy(self):
        """차분 프라이버시 기법(차등적 정보보호 기능)을 적용하는 메서드"""
        pass
    
    def __str__(self):
        """동질집합에 대한 정보를 문자열로 반환하는 메서드"""
        return str(self.equivalent_class)
    
# if __name__ == "__main__":

---

6. **가명처리 실행 클래스**

In [None]:
# ./pseudonymizer/pseudonymizer.py

# from abc import ABC, ABCMeta, abstractmethod
# import pandas as pd
    
class Pseudonym:
    def __init__(self, dataframe):
        """원본정보(재현데이터)와 가명처리 구체 클래스를 인스턴스 변수로 선언하는(초기화) 생성자"""
        self._dataframe = dataframe
        self.equivalent_class = {}
        self._pseudonymizers = []
        self._pseudonymDictionary = {}
        
    def __str__(self):
        # __repr__
        """캡슐화된 데이터셋의 속성(컬럼)정보를 반환하는 메서드"""
        return self._dataframe.info()
    
    def categorizeEquivalentClass(self, attributes: List[str]):
        """각 행(레코드)에 대한 개인식별가능정보 속성(컬럼)들 사이에 동질 집합을 확인하는 메서드
        Pseudonym(dataframe).equivalent_class.keys()를 통해 동질집합 확인"""
        groupby_data = self._dataframe.groupby(attributes)
        for group, data in groupby_data:
            if len(group) > 1:
                key = tuple(group)
                # 딕셔너리에서 키 값으로 리스트(동적 타입)는 사용할 수 없으므로 튜플로 변환
                self.equivalent_class[key] = data.index.tolist()
                # 동질 집합에 해당하는 행(레코드)의 인덱스 번호를 키 값으로 조회되도록 저장
                
    def countEquivalentClass(self):
        for group_key, index_value in self.equivalent_class.items():
            print(group_key, len(index_value))
            
    def addPseudonymizer(self, pseudonymizer):
        """가명처리 추상 클래스에 대한 자식 클래스를 입력받는 pseudonymizer파라미터를 가지는 메서드"""
        if isinstance(pseudonymizer, Pseudonymizer):
            self._pseudonymizers.append(pseudonymizer)
        else:
            print("입력받은 {} 기술은 가명처리 기법에 추가할 수 없습니다.".format(pseudonymizer))
    
    def addDictionary(self, column, pseudonymizers):
        """가명처리를 수행할 데이터 컬럼명과 해당 열에 적용할 여러 가명처리 기법 리스트를 입력받아 다양한 비식별 조치를 수행할 수 있도록 지정하는 메서드"""
        self._pseudonymDictionary[column] = pseudonymizers
        
    def pseudonymizeData(self):
        """가명처리 기법을 해당 컬럼에 적용하는 메서드(apply함수를 활용하여 데이터프레임 모든 행, 특정 열에 비식별조치를 취하는 접근방식) """
        for column, pseudonymizers in self._pseudonymDictionary.items():
            for pseudonymizer in pseudonymizers:
                if isinstance(pseudonymizer, CategorizationOfColumn):
                    self._dataframe[column] = pseudonymizer.pseudonymizeData(self._dataframe[column])
                elif isinstance(pseudonymizer, MicroAggregation):
                    self._dataframe[column] = pseudonymizer.pseudonymizeData(self._dataframe, column, self.equivalent_class)
                else:
                    self._dataframe[column] = self._dataframe[column].apply(pseudonymizer.pseudonymizeData)

    def getPseudonymizedDataframe(self):
        """가명처리 데이터 반환"""
        return self._dataframe

In [None]:
PseudonymizeFinanceData = Pseudonym(dataframe = DATA_FINANCE)

In [None]:
print(PseudonymizeFinanceData)

In [None]:
PseudonymizeFinanceData.categorizeEquivalentClass(attributes = ["HOME_TYPE", "TF_LOAN", "TF_PENSION"])

In [None]:
PseudonymizeFinanceData.equivalent_class.keys()

In [None]:
PseudonymizeFinanceData.countEquivalentClass()

In [None]:
PseudonymizeFinanceData.equivalent_class[('비거주용 건물(상가', 'Y', 'Y')]

In [None]:
PseudonymizeFinanceData.getPseudonymizedDataframe()

In [None]:
# DATA_FINANCE.iloc[PseudonymizeFinanceData.equivalent_class[('비거주용 건물(상가', 'Y', 'Y')], :]
DATA_FINANCE["AMT_CASHADVANCE_PAYMENT"].describe()

In [None]:
DATA_FINANCE[DATA_FINANCE["AMT_CREDITLOAN"] > 0] # 94,999명 중 대출실행고객은 9466명

In [None]:
DATA_FINANCE.iloc[
    PseudonymizeFinanceData.equivalent_class[('비거주용 건물(상가', 'Y', 'Y')], :][
    (DATA_FINANCE["AMT_CREDITLOAN"] >= 0) & (DATA_FINANCE["AMT_CASHADVANCE_PAYMENT"] > 8000000)] 

DATA_FINANCE.iloc[
    PseudonymizeFinanceData.equivalent_class[('영업 겸용 단독주택', 'Y', 'Y') ], :][
    (DATA_FINANCE["AMT_CREDITLOAN"] >= 0) & (DATA_FINANCE["AMT_CASHADVANCE_PAYMENT"] > 8000000)] 

# 94,999명 중 대출실행고객은 9466명
# ('영업 겸용 단독주택', 'Y', 'Y') ('비거주용 건물(상가', 'Y', 'Y)

7. **가명처리 모듈 활용 예제 스크립트**

```
# ./pseudonymizer/example/example_usage.py

from pseudonymizer.pseudonymizer import Pseudonym
from pseudonymizer.pseudonymizers.{MODULE.py} import {MODULE}Pseudonymizer


# DB 데이터프레임 생성
Engine = create_engine(
    "mysql://{user_name}:{password}@localhost/{database}".format(root, 0123, FINANCIALCONSUMER), 
    convert_unicode = True)
Connection = Engine.connect()

DATA = pd.read_sql_table("DATA", Connection)


# Pseudonymizer 추상클래스 인스턴스 생성 및 추가
{MODULE}_pseudonymizer = {MODULE}Pseudonymizer
pseudonym_instance = Pseudonym(dataframe = DATA)

# Pseudonymizer의 인스턴스를 입력변수로 설정
pseudonym_instance.addPseudonymizer(pseudonymizer = {MODULE}_pseudonymizer)

# DATA의 특정 컬럼에 가명처리 기법을 적용하기 위한 Dictionary에 입력
pseudonym_instance.addDictionary(column = "PHONE_NUMBER", pseudonymizers = [{MODULE}_pseudonymizer])

# 가명처리 수행
pseudonym_instance.pseudonymizeData()

# 가명처리 데이터프레임 반환
print(pseudonym_instance.getPseudonymizedDataframe())
```

8. **가명처리 결합**

* 결합키(이름, 생년월일, 연락처 등을 활용하여 생성한 암호화된 해시값), 결합키연계정보(결합키가 동일한 정보를 결합할 수 있도록 서로 다른 결합신청자의 결합키를 연계한 정보)


* 가명처리시 사용하는 추가정보는 무엇인가요?
    - 가명처리에 사용되는 추가정보는 가명 생성 및 복원에 사용되는 정보를 의미합니다. 가명처리 알고리즘 및 방법에 따라 필요한 추가정보는 다를 수 있으며, 이러한 정보가 제3자에게 노출되면 가명처리된 정보를 원래 상태로 되돌릴 수 있으므로 보호가 필요합니다.

* 다음은 가명처리에 사용할 수 있는 추가정보의 예시입니다:
    - 키(Key): 암호화 및 해시 과정에서 사용되는 키로, 해당 키를 알면 가명처리된 정보를 복호화하거나 원래의 정보로 복원할 수 있습니다.
    - 매핑 테이블: 원래의 정보와 가명처리된 정보 간의 대응 관계를 보여주는 테이블로, 이를 통해 가명처리된 정보를 원래의 정보로 매핑할 수 있습니다.
    - 알고리즘 세부사항: 가명처리에 사용된 특정 알고리즘의 설정이나 파라미터로, 이러한 세부사항을 알면 원래의 정보를 유추하는 데 도움이 될 수 있습니다.
