# audit.log 파일 정제 
- 2025년 9월 23일(화) - teams 

In [None]:
# pandas 출력 옵션 설정 - 60줄까지 표시
pd.set_option('display.max_rows', 60)
print("pandas 출력 옵션이 60줄로 설정되었습니다.")

In [1]:
import re
import json

# 로그 파일 경로
log_file = "/home/kongju/DATA/DREAM/teams/0923_audit.log"

# 카운트 변수
json_count = 0
keyval_count = 0
other_count = 0

# 정규식 패턴 정의
json_pattern = re.compile(r"AUDIT_PAYLOAD\s*{")   # JSON 형식 로그
keyval_pattern = re.compile(r"timestamp=\d{4}-\d{2}-\d{2}")  # Key=Value 로그

# 파일 읽기
with open(log_file, "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue

        if json_pattern.search(line):
            json_count += 1
        elif keyval_pattern.search(line):
            keyval_count += 1
        else:
            other_count += 1

# 결과 출력
print("로그 형식별 카운트")
print(f"JSON 형식 (AUDIT_PAYLOAD): {json_count} 건 * 각 200건 = {json_count * 200} 건")
print(f"Key=Value 형식 (timestamp=...): {keyval_count} 건")
print(f"그 외 기타 형식: {other_count} 건")


로그 형식별 카운트
JSON 형식 (AUDIT_PAYLOAD): 4 건 * 각 200건 = 800 건
Key=Value 형식 (timestamp=...): 11200 건
그 외 기타 형식: 4382 건


In [2]:
# AUDIT_PAYLOAD 추출용 정규식 (중괄호 { } 까지 포함)
audit_pattern = re.compile(r"AUDIT_PAYLOAD\s+(\{.*\})")

# 파일 읽기
with open(log_file, "r", encoding="utf-8") as f:
    lines = f.readlines()

block_idx = 0
for line in lines:
    match = audit_pattern.search(line)
    if match:
        block_idx += 1
        json_str = match.group(1)

        try:
            data = json.loads(json_str)   # JSON 파싱
            rows = data.get("rows", [])
            print(f"[블록 {block_idx}] rows 케이스 개수: {len(rows)} 건")
        except json.JSONDecodeError as e:
            print(f"[블록 {block_idx}] JSON 파싱 오류: {e}")

[블록 1] rows 케이스 개수: 200 건
[블록 2] rows 케이스 개수: 200 건
[블록 3] rows 케이스 개수: 200 건
[블록 4] rows 케이스 개수: 200 건


In [None]:
import re, json, csv, html
from pathlib import Path
import pandas as pd

log_path = Path(log_file)  # Path 객체로 변환
out_path = Path("/home/kongju/DREAM/SYSTEM_LOGS/audit/output/0923_audit.csv")


In [26]:
# 원하는 컬럼 순서 (no 컬럼을 맨 앞에 추가)
column_order = ['no', 'timestamp', 'caseData', 'index', 'caseResult', 'caseUser', 'caseType']

rows_out = []

# 한글 처리를 위해 여러 인코딩 시도
def detect_encoding(file_path):
    encodings = ['cp949', 'euc-kr', 'utf-8', 'utf-8-sig']
    for encoding in encodings:
        try:
            with file_path.open("r", encoding=encoding, errors="strict") as f:
                # 첫 1000줄 테스트해서 한글이 있는지 확인
                for i, line in enumerate(f):
                    if i > 1000:
                        break
                # 성공하면 인코딩 이름만 반환
                return encoding
        except:
            continue
    # 모든 인코딩이 실패하면 utf-8 반환
    return "utf-8"

def process_multiline_case(lines_buffer, debug=False):
    """여러 줄에 걸친 key=value 케이스를 처리"""
    if not lines_buffer:
        return None
    
    if debug:
        print(f"=== 멀티라인 케이스 처리 ===")
        for i, line in enumerate(lines_buffer):
            print(f"  {i}: '{line.strip()}'")
    
    kv_dict = {}
    for line in lines_buffer:
        line = line.strip()
        if "=" in line:
            k, v = line.split("=", 1)
            k = k.strip()
            v = v.strip()
            kv_dict[k] = v
            if debug:
                print(f"    파싱: {k} = '{v}'")
    
    if debug:
        print(f"  최종 딕셔너리: {kv_dict}")
    
    # 최소한 timestamp가 있어야 유효한 케이스
    if 'timestamp' not in kv_dict or not kv_dict['timestamp']:
        if debug:
            print("  --> timestamp 없음, 무시")
        return None
    
    # 최소한 2개 이상의 필드가 있어야 유효한 케이스로 간주
    valid_fields = sum(1 for v in kv_dict.values() if v and v.strip())
    if valid_fields < 2:
        if debug:
            print(f"  --> 유효 필드 {valid_fields}개, 무시")
        return None
    
    result = {
        "no": len(rows_out) + 1,
        "timestamp": kv_dict.get("timestamp", ""),
        "caseData": kv_dict.get("caseData", ""),
        "index": kv_dict.get("index", ""),
        "caseResult": kv_dict.get("caseResult", ""),
        "caseUser": kv_dict.get("caseUser", ""),
        "caseType": kv_dict.get("caseType", ""),
    }
    
    if debug:
        print(f"  --> 유효한 케이스: {result}")
        print("=" * 40)
    
    return result

def is_indented_line(line):
    """들여쓰기된 라인인지 확인 (공백이나 탭으로 시작)"""
    return line.startswith('  ') or line.startswith('\t')

def is_key_value_line(line):
    """key=value 형태의 라인인지 확인"""
    stripped = line.strip()
    return '=' in stripped and not stripped.startswith('=')

# 적절한 인코딩 감지
detected_encoding = detect_encoding(log_path)
print(f"사용된 인코딩: {detected_encoding}")

# 멀티라인 케이스 처리를 위한 버퍼
multiline_buffer = []
processed_count = 0
skipped_incomplete = 0
debug_mode = True  # 처음 몇 개 케이스만 디버그 출력

# 감지된 인코딩으로 파일 열기
with log_path.open("r", encoding=detected_encoding, errors="replace") as f:
    for line_num, line in enumerate(f, 1):
        original_line = line
        line = line.rstrip('\n\r')  # 개행문자만 제거, 공백은 유지
        
        # 빈 줄 처리
        if not line.strip():
            # 빈 줄이 나오면 현재 멀티라인 케이스 완료
            if multiline_buffer:
                case = process_multiline_case(multiline_buffer, debug_mode and processed_count < 5)
                if case:
                    rows_out.append(case)
                    processed_count += 1
                else:
                    skipped_incomplete += 1
                multiline_buffer = []
            continue

        # --- 1. JSON 형식 (AUDIT_PAYLOAD) 처리 ---
        if "AUDIT_PAYLOAD" in line:
            # 이전에 쌓인 멀티라인 케이스가 있다면 처리
            if multiline_buffer:
                case = process_multiline_case(multiline_buffer, debug_mode and processed_count < 5)
                if case:
                    rows_out.append(case)
                    processed_count += 1
                else:
                    skipped_incomplete += 1
                multiline_buffer = []
            
            try:
                json_start = line.index("AUDIT_PAYLOAD") + len("AUDIT_PAYLOAD")
                json_str = line[json_start:].strip()
                first_brace = json_str.find("{")
                if first_brace == -1:
                    continue
                s = json_str[first_brace:]
                depth, end_idx = 0, None
                for i, ch in enumerate(s):
                    if ch == "{":
                        depth += 1
                    elif ch == "}":
                        depth -= 1
                        if depth == 0:
                            end_idx = i
                            break
                if end_idx is None:
                    continue
                payload = json.loads(s[: end_idx + 1])
            except Exception as e:
                if debug_mode:
                    print(f"JSON 파싱 오류 (라인 {line_num}): {e}")
                continue

            for r in payload.get("rows", []):
                row = {
                    "no": len(rows_out) + 1,
                    "timestamp": html.unescape(r.get("logDatetime", "")).replace("\xa0", " ").strip(),
                    "caseData": r.get("caseData", ""),
                    "index": r.get("index", ""),
                    "caseResult": r.get("caseResult", ""),
                    "caseUser": r.get("caseUser", ""),
                    "caseType": r.get("caseType", ""),
                }
                rows_out.append(row)
                processed_count += 1

        # --- 2. 한 줄 Key=Value 형식 처리 ---
        elif line.strip().startswith("timestamp=") and " caseData=" in line:
            # 이전에 쌓인 멀티라인 케이스가 있다면 처리
            if multiline_buffer:
                case = process_multiline_case(multiline_buffer, debug_mode and processed_count < 5)
                if case:
                    rows_out.append(case)
                    processed_count += 1
                else:
                    skipped_incomplete += 1
                multiline_buffer = []
            
            # 한 줄에 모든 key=value가 있는 경우
            parts = line.split()
            kv_dict = {}
            
            # 더 정확한 파싱을 위해 정규식 사용
            import re
            # key=value 패턴을 찾되, value에 공백이 포함될 수 있음을 고려
            pattern = r'(\w+)=([^=]*?)(?=\s+\w+=|$)'
            matches = re.findall(pattern, line)
            
            for key, value in matches:
                kv_dict[key.strip()] = value.strip()
            
            if debug_mode and processed_count < 5:
                print(f"=== 한줄 케이스 (라인 {line_num}) ===")
                print(f"  원본: '{line.strip()}'")
                print(f"  파싱 결과: {kv_dict}")
            
            if 'timestamp' in kv_dict:
                row = {
                    "no": len(rows_out) + 1,
                    "timestamp": kv_dict.get("timestamp", ""),
                    "caseData": kv_dict.get("caseData", ""),
                    "index": kv_dict.get("index", ""),
                    "caseResult": kv_dict.get("caseResult", ""),
                    "caseUser": kv_dict.get("caseUser", ""),
                    "caseType": kv_dict.get("caseType", ""),
                }
                rows_out.append(row)
                processed_count += 1
                
                if debug_mode and processed_count <= 5:
                    print(f"  --> 저장됨: {row}")
                    print("=" * 40)
            else:
                skipped_incomplete += 1

        # --- 3. 멀티라인 Key=Value 형식 처리 ---
        elif line.strip().startswith("timestamp="):
            # 이전에 쌓인 멀티라인 케이스가 있다면 처리
            if multiline_buffer:
                case = process_multiline_case(multiline_buffer, debug_mode and processed_count < 5)
                if case:
                    rows_out.append(case)
                    processed_count += 1
                else:
                    skipped_incomplete += 1
                multiline_buffer = []
            
            # 새로운 멀티라인 케이스 시작
            multiline_buffer = [line]
            
        elif multiline_buffer and (is_indented_line(line) or is_key_value_line(line)):
            # 현재 멀티라인 케이스에 추가
            if debug_mode and processed_count < 5:
                print(f"멀티라인 추가 (라인 {line_num}): '{line.strip()}'")
            multiline_buffer.append(line)
        
        elif multiline_buffer:
            # 멀티라인 케이스가 진행 중인데 관련 없는 줄을 만난 경우
            # 현재 케이스를 완료하고 새로운 줄 처리
            case = process_multiline_case(multiline_buffer, debug_mode and processed_count < 5)
            if case:
                rows_out.append(case)
                processed_count += 1
            else:
                skipped_incomplete += 1
            multiline_buffer = []
        
        # 진행상황 출력
        if line_num % 1000 == 0:
            print(f"처리 중... {line_num}번째 라인, 유효한 케이스: {len(rows_out)}개")

# 파일 끝에서 남은 멀티라인 케이스 처리
if multiline_buffer:
    case = process_multiline_case(multiline_buffer, debug_mode)
    if case:
        rows_out.append(case)
        processed_count += 1
    else:
        skipped_incomplete += 1

# CSV 저장 (지정한 column_order 적용)
with out_path.open("w", encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=column_order)
    writer.writeheader()
    writer.writerows(rows_out)

print(f"✅ 유효한 케이스 {len(rows_out)}건을 CSV로 저장했습니다 → {out_path}")
print(f"⚠️ 무시된 불완전한 케이스: {skipped_incomplete}건")
print(f"📊 전체 처리 통계:")
print(f"   - JSON 케이스: {sum(1 for row in rows_out if 'AUDIT_PAYLOAD' in str(row))}건")
print(f"   - 한줄 key=value 케이스: {processed_count - skipped_incomplete - len(rows_out)}건")
print(f"   - 멀티라인 케이스: 나머지")

# 처음 5개 결과 샘플 출력
print(f"\n🔍 처음 5개 케이스 샘플:")
for i, row in enumerate(rows_out[:5]):
    print(f"  {i+1}. timestamp: '{row['timestamp']}', caseData: '{row['caseData'][:30]}...', caseUser: '{row['caseUser']}'")

사용된 인코딩: utf-8
처리 중... 1000번째 라인, 유효한 케이스: 923개
처리 중... 2000번째 라인, 유효한 케이스: 1047개
처리 중... 3000번째 라인, 유효한 케이스: 1172개
처리 중... 4000번째 라인, 유효한 케이스: 1295개
처리 중... 7000번째 라인, 유효한 케이스: 2136개
처리 중... 8000번째 라인, 유효한 케이스: 3082개
처리 중... 9000번째 라인, 유효한 케이스: 4019개
처리 중... 10000번째 라인, 유효한 케이스: 4972개
처리 중... 11000번째 라인, 유효한 케이스: 5922개
처리 중... 12000번째 라인, 유효한 케이스: 6878개
처리 중... 13000번째 라인, 유효한 케이스: 7854개
처리 중... 14000번째 라인, 유효한 케이스: 8844개
처리 중... 15000번째 라인, 유효한 케이스: 9834개
처리 중... 16000번째 라인, 유효한 케이스: 10824개
처리 중... 17000번째 라인, 유효한 케이스: 11814개
✅ 유효한 케이스 12000건을 CSV로 저장했습니다 → /home/kongju/DREAM/audit/output/0923_audit.csv
⚠️ 무시된 불완전한 케이스: 0건
📊 전체 처리 통계:
   - JSON 케이스: 0건
   - 한줄 key=value 케이스: 0건
   - 멀티라인 케이스: 나머지

🔍 처음 5개 케이스 샘플:
  1. timestamp: '2025-09-23  14:41:48', caseData: 'TEST_IDP, 시동 시 테스트...', caseUser: 'TEST_IDP'
  2. timestamp: '2025-09-23  14:41:48', caseData: 'TEST_IDP, 시동 시 테스트...', caseUser: 'TEST_IDP'
  3. timestamp: '2025-09-23  14:41:48', caseData: 'TEST_IDP, 시동 시 테스트...', caseUser

- 엑셀 파일로 저장

In [27]:
import pandas as pd

# Excel 파일로 저장
excel_path = out_path.with_suffix('.xlsx')
df = pd.DataFrame(rows_out)
df.to_excel(excel_path, index=False)
print(f"Saved {len(rows_out)} case rows to: {excel_path}")


Saved 12000 case rows to: /home/kongju/DREAM/audit/output/0923_audit.xlsx


In [29]:
print(df.shape)
df.columns


(12000, 7)


Index(['no', 'timestamp', 'caseData', 'index', 'caseResult', 'caseUser', 'caseType'], dtype='object')

In [30]:
print(f"caseUser 고유 값 개수: {df['caseUser'].nunique()}")
df['caseUser'].value_counts()

caseUser 고유 값 개수: 5


caseUser
TEST_IDP    11542
ssoadmin      274
ssouser       124
usertest       52
ssodream        8
Name: count, dtype: int64

In [33]:
df['caseUser'].unique()

array(['TEST_IDP', 'ssoadmin', 'usertest', 'ssouser', 'ssodream'],
      dtype=object)

In [34]:
print(f"caseType 고유 값 개수: {df['caseType'].nunique()}")
df['caseType'].value_counts()

caseType 고유 값 개수: 35


caseType
암호키 파기             2318
감사 기능 시작/종료     2077
암호모듈 자가시험       2057
암호키 생성             1137
SSO프로세스 확인        1054
SSO모듈 무결성 검증     1051
??? ??                   607
???? ????                331
?? ?? ??/??              321
SSO???? ??               173
SSO?? ??? ??             172
관리자 로그인 요청       138
암호 연산                 66
사용자 로그인 요청        63
인증토큰 생성             58
?? ??                     50
사용자 연계 요청          50
??? ??? ??                40
관리자 로그아웃           34
비밀정보 파기             34
??? ?? ??                 32
???? ??                   31
??? ????                  24
감사정보 설정 변경        14
사용자 로그아웃           13
???? ?? ??                10
사용자 정책 변경           8
관리자 정보 변경           8
??? ?? IP ??               5
클라이언트 정보 변경       4
사용자 정보 변경           4
관리자 접속 IP 변경        4
관리자 정책 변경           4
????? ?? ??                4
관리자 비밀번호 변경       4
Name: count, dtype: int64