In [5]:
import json
import pandas as pd
from urllib.parse import parse_qs, urlparse, unquote
from typing import Dict, List
from datetime import datetime

class LogAnalyzer:
    def __init__(self):
        self.logs = []
        self.parsed_logs = []

    def parse_log_file_from_path(self, file_path: str, output_txt: str = None, output_xlsx: str = None):
        """
        파일 경로에서 로그 파일을 읽어서 분석하고 결과 출력
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                content = file.read()
            
            # 로그 파싱
            self._parse_log_content(content)
            
            # txt 파일 출력
            if output_txt:
                self._export_to_txt(output_txt)
            
            # Excel 파일 출력
            if output_xlsx:
                self._export_to_excel(output_xlsx)
                
        except FileNotFoundError:
            print(f"❌ 파일을 찾을 수 없습니다: {file_path}")
        except Exception as e:
            print(f"❌ 파일 읽기 오류: {e}")

    def _parse_log_content(self, content: str):
        """로그 내용 파싱"""
        self.logs = []
        self.parsed_logs = []
        lines = content.strip().split('\n')
        
        for i, line in enumerate(lines):
            if line.strip():
                try:
                    log_data = json.loads(line.strip())
                    self.logs.append((i+1, log_data))
                except json.JSONDecodeError as e:
                    print(f"JSON 파싱 오류 (라인 {i+1}): {e}")
        
        print(f"=== 총 {len(self.logs)}개의 로그 발견 ===\n")
        
        # 각 로그의 REQUEST 파라미터 분석
        for log_num, log_data in self.logs:
            if 'REQUEST' in log_data:
                params = self._parse_request_url(log_data['REQUEST'])
                if params:
                    parsed_log = {
                        'log_number': log_num,
                        'timestamp': log_data.get('LOGTIME', 'N/A'),
                        'params': params
                    }
                    self.parsed_logs.append(parsed_log)

    def _parse_request_url(self, request_url: str) -> Dict[str, str]:
        """REQUEST URL에서 파라미터를 파싱하고 한글을 올바르게 처리"""
        try:
            # 유니코드 이스케이프 디코딩
            url = request_url.encode('utf-8').decode('unicode_escape')
            parsed_url = urlparse(url)
            query_params = parse_qs(parsed_url.query, keep_blank_values=True)
            
            processed_params = {}
            for key, value_list in query_params.items():
                raw_value = value_list[0] if value_list else ''
                decoded_value = self._fix_korean_text(raw_value)
                processed_params[key] = decoded_value
            
            return processed_params
            
        except Exception as e:
            print(f"파싱 오류: {e}")
            return {}

    def _fix_korean_text(self, text: str) -> str:
        """깨진 한글을 올바르게 디코딩"""
        try:
            decoded = unquote(text, encoding='utf-8')
            
            if self._has_broken_korean(decoded):
                try:
                    fixed = decoded.encode('latin-1').decode('utf-8')
                    return fixed
                except:
                    try:
                        byte_data = bytes([ord(c) for c in decoded if ord(c) < 256])
                        fixed = byte_data.decode('utf-8', errors='ignore')
                        return fixed
                    except:
                        pass
            
            return decoded
            
        except Exception:
            return text

    def _has_broken_korean(self, text: str) -> bool:
        """깨진 한글 패턴이 있는지 확인"""
        broken_patterns = [
            'ë¶', 'ìë', 'ê¼¬', 'ë', 'í¼', 'ì¸', 'ê¸°', 'ê¸', 'ìì¹', 'ì', 'ê'
        ]
        return any(pattern in text for pattern in broken_patterns)

    def _export_to_txt(self, output_file: str):
        """파싱 결과를 txt 파일로 출력"""
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(f"=== 총 {len(self.logs)}개의 로그 발견 ===\n\n")
                
                for parsed_log in self.parsed_logs:
                    f.write("=" * 60 + "\n")
                    f.write(f"로그 #{parsed_log['log_number']} - {parsed_log['timestamp']}\n")
                    f.write("=" * 60 + "\n")
                    
                    params = parsed_log['params']
                    if params:
                        # URL 정보 (첫 번째 로그에서 추출)
                        for log_num, log_data in self.logs:
                            if log_num == parsed_log['log_number']:
                                if 'REQUEST' in log_data:
                                    url = log_data['REQUEST'].encode('utf-8').decode('unicode_escape')
                                    parsed_url = urlparse(url)
                                    f.write(f"URL: {parsed_url.netloc}{parsed_url.path}\n")
                                    f.write(f"총 파라미터 수: {len(params)}\n")
                                    f.write("-" * 50 + "\n")
                                break
                        
                        # 파라미터 출력
                        for key, value in params.items():
                            f.write(f"{key:20} : {value}\n")
                    else:
                        f.write("REQUEST 필드가 없습니다.\n")
                    
                    f.write("\n")
            
            print(f"✅ txt 파일 저장: {output_file}")
            
        except Exception as e:
            print(f"❌ txt 파일 저장 오류: {e}")

    def _export_to_excel(self, output_file: str):
        """파싱 결과를 Excel 파일로 출력 (page_id, click_type, act_type, click_text, key_list, value_list, len(key_list))"""
        try:
            excel_data = []
            
            for parsed_log in self.parsed_logs:
                params = parsed_log['params']
                
                # 각 로그에서 필요한 정보 추출
                page_id = params.get('page_id', '')
                click_type = params.get('click_type', '')
                act_type = params.get('act_type', '')
                click_text = params.get('click_text', '')
                
                # key_list와 value_list 생성
                key_list = list(params.keys())
                value_list = list(params.values())
                
                key_list_str = ', '.join(key_list)
                value_list_str = ', '.join(str(v) for v in value_list)
                key_count = len(key_list)
                
                excel_data.append({
                    'page_id': page_id,
                    'click_type': click_type,
                    'act_type': act_type,
                    'click_text': click_text,
                    'key_list': key_list_str,
                    'value_list': value_list_str,
                    'len(key_list)': key_count
                })
            
            # DataFrame 생성 및 Excel 저장
            df = pd.DataFrame(excel_data)
            df.to_excel(output_file, index=False, engine='openpyxl')
            
            print(f"✅ Excel 파일 저장: {output_file}")
            print(f"   총 {len(excel_data)}개 로그 처리됨")
            
        except Exception as e:
            print(f"❌ Excel 파일 저장 오류: {e}")

    def analyze_logs(self, file_path: str):
        """로그 분석 (기존 기능 유지)"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                content = file.read()
            self._parse_log_content(content)
            
            # 콘솔 출력
            for parsed_log in self.parsed_logs:
                print("=" * 60)
                print(f"로그 #{parsed_log['log_number']} - {parsed_log['timestamp']}")
                print("=" * 60)
                
                params = parsed_log['params']
                if params:
                    # URL 정보 출력
                    for log_num, log_data in self.logs:
                        if log_num == parsed_log['log_number']:
                            if 'REQUEST' in log_data:
                                url = log_data['REQUEST'].encode('utf-8').decode('unicode_escape')
                                parsed_url = urlparse(url)
                                print(f"URL: {parsed_url.netloc}{parsed_url.path}")
                                print(f"총 파라미터 수: {len(params)}")
                                print("-" * 50)
                            break
                    
                    # 파라미터 출력
                    for key, value in params.items():
                        print(f"{key:20} : {value}")
                else:
                    print("REQUEST 필드가 없습니다.")
                
                print()
                
        except FileNotFoundError:
            print(f"❌ 파일을 찾을 수 없습니다: {file_path}")
        except Exception as e:
            print(f"❌ 파일 읽기 오류: {e}")

# 사용 예시
def run_example():
    """사용 예시"""
    analyzer = LogAnalyzer()
    
    # 1. 로그 분석 및 txt, Excel 파일 생성
    analyzer.parse_log_file_from_path(
        file_path="log_file.txt",
        output_txt="./result/log_analysis_result.txt",
        output_xlsx="./result/log_analysis_result.xlsx"
    )
    
    # 2. 콘솔에 로그 분석 결과 출력만 하고 싶은 경우
    # analyzer.analyze_logs("log_file.txt")

# 기존 QA 검증 클래스 (기존 기능 유지)
class LogQAValidator:
    def __init__(self):
        self.scenarios = []
        self.logs = []
        self.unique_keys = []
        
    def load_scenario_csv(self, csv_file_path: str):
        """CSV 파일에서 시나리오 로드"""
        self.scenarios = []
        df = pd.read_csv(csv_file_path, encoding='utf-8')
        
        # 컬럼 구성에 따라 고유키 조합 결정
        columns = df.columns.tolist()
        if 'page_id' in columns:
            self.unique_keys = ['page_id', 'click_type', 'act_type']
            print("🔑 고유키 조합: page_id + click_type + act_type")
        elif 'click_text' in columns:
            self.unique_keys = ['click_text', 'click_type', 'act_type']
            print("🔑 고유키 조합: click_text + click_type + act_type")
        else:
            raise ValueError("지원하지 않는 시나리오 형식입니다. page_id 또는 click_text 컬럼이 필요합니다.")
        
        for _, row in df.iterrows():
            key_list = [k.strip() for k in str(row['key_list']).split(',') if k.strip()]
            scenario = {
                'expected_keys': key_list
            }
            
            # 고유키 값들 추가
            for key in self.unique_keys:
                scenario[key] = str(row[key]) if pd.notna(row[key]) else ''
            
            self.scenarios.append(scenario)
        
        print(f"✅ 시나리오 로드: {len(self.scenarios)}개")

    def parse_logs_from_file(self, log_file_path: str):
        """로그 txt 파일에서 직접 파싱"""
        try:
            with open(log_file_path, 'r', encoding='utf-8') as file:
                log_content = file.read()
            self.parse_logs(log_content)
            print(f"✅ 로그 파일 로드: {log_file_path}")
        except FileNotFoundError:
            print(f"❌ 파일을 찾을 수 없습니다: {log_file_path}")
        except Exception as e:
            print(f"❌ 파일 읽기 오류: {e}")

    def parse_logs(self, log_content: str):
        """로그 파싱"""
        self.logs = []
        for i, line in enumerate(log_content.strip().split('\n')):
            if not line.strip():
                continue
                
            try:
                log_data = json.loads(line.strip())
                if 'REQUEST' not in log_data:
                    continue
                    
                params = self._parse_url(log_data['REQUEST'])
                if params:
                    self.logs.append({
                        'line': i + 1,
                        'timestamp': log_data.get('LOGTIME', ''),
                        'params': params
                    })
            except json.JSONDecodeError:
                continue
                
        print(f"✅ 로그 파싱: {len(self.logs)}개")

    def _parse_url(self, request_url: str) -> Dict[str, str]:
        """URL 파라미터 파싱"""
        try:
            url = request_url.encode('utf-8').decode('unicode_escape')
            parsed = urlparse(url)
            query_params = parse_qs(parsed.query, keep_blank_values=True)
            
            result = {}
            for key, values in query_params.items():
                if values:
                    decoded = unquote(values[0], encoding='utf-8')
                    # 한글 깨짐 복구 시도
                    try:
                        if any(ord(c) > 127 for c in decoded):
                            decoded = decoded.encode('latin-1').decode('utf-8')
                    except:
                        pass
                    result[key] = decoded
            return result
        except:
            return {}

    def validate_and_export(self, output_file: str):
        """검증 수행 및 결과 XLSX 출력"""
        results = []
        
        for scenario in self.scenarios:
            # 시나리오와 매칭되는 로그 찾기
            matched_logs = self._find_matching_logs(scenario)
            
            if not matched_logs:
                # 매칭되는 로그가 없는 경우
                for key in scenario['expected_keys']:
                    result = {
                        'key': key,
                        'value': 'NOT_FOUND',
                        'pass': 'FAIL'
                    }
                    
                    # 고유키 값들 추가
                    for uk in self.unique_keys:
                        result[uk] = scenario.get(uk, '')
                    
                    results.append(result)
                continue
            
            # 각 매칭된 로그에 대해 검증
            for log in matched_logs:
                actual_keys = set(log['params'].keys())
                expected_keys = set(scenario['expected_keys'])
                
                # 예상 키들 검증
                for key in expected_keys:
                    value = log['params'].get(key, 'MISSING')
                    pass_status = 'PASS' if key in actual_keys else 'FAIL'
                    
                    result = {
                        'key': key,
                        'value': value,
                        'pass': pass_status
                    }
                    
                    # 고유키 값들 추가
                    for uk in self.unique_keys:
                        result[uk] = scenario.get(uk, '')
                    
                    results.append(result)
                
                # 예상치 못한 키들 (선택사항)
                unexpected_keys = actual_keys - expected_keys
                for key in unexpected_keys:
                    result = {
                        'key': key,
                        'value': log['params'][key],
                        'pass': 'UNEXPECTED'
                    }
                    
                    # 고유키 값들 추가
                    for uk in self.unique_keys:
                        result[uk] = scenario.get(uk, '')
                    
                    results.append(result)

        # XLSX 출력 - 컬럼 순서 정렬
        df_results = pd.DataFrame(results)
        
        # 고유키 조합에 따른 컬럼 순서 정의
        if 'page_id' in self.unique_keys:
            column_order = ['page_id', 'click_type', 'act_type', 'key', 'value', 'pass']
        else:  # click_text 기반
            column_order = ['click_text', 'click_type', 'act_type', 'key', 'value', 'pass']
        
        # 컬럼 순서 적용
        df_results = df_results[column_order]
        df_results.to_excel(output_file, index=False, engine='openpyxl')
        
        # 요약 출력
        self._print_summary(results)
        print(f"✅ 결과 저장: {output_file}")

    def _find_matching_logs(self, scenario: Dict) -> List[Dict]:
        """시나리오와 매칭되는 로그 찾기"""
        matched = []
        
        for log in self.logs:
            params = log['params']
            
            # 동적 고유키로 매칭
            is_match = True
            for key in self.unique_keys:
                if params.get(key, '') != scenario.get(key, ''):
                    is_match = False
                    break
            
            if is_match:
                matched.append(log)
        
        return matched

    def _print_summary(self, results: List[Dict]):
        """결과 요약 출력"""
        df = pd.DataFrame(results)
        
        total = len(df)
        passed = len(df[df['pass'] == 'PASS'])
        failed = len(df[df['pass'] == 'FAIL'])
        unexpected = len(df[df['pass'] == 'UNEXPECTED'])
        
        print(f"\n📊 검증 결과:")
        print(f"   전체: {total}개")
        print(f"   통과: {passed}개")
        print(f"   실패: {failed}개")
        print(f"   예상외: {unexpected}개")
        print(f"   성공률: {passed/total*100:.1f}%" if total > 0 else "   성공률: 0%")

if __name__ == "__main__":
    # 로그 분석 및 파일 출력 예시
    print("=== 로그 분석 및 파일 출력 ===")
    run_example()

=== 로그 분석 및 파일 출력 ===
JSON 파싱 오류 (라인 94): Expecting ',' delimiter: line 1 column 618 (char 617)
=== 총 385개의 로그 발견 ===

✅ txt 파일 저장: ./result/log_analysis_result.txt
✅ Excel 파일 저장: ./result/log_analysis_result.xlsx
   총 385개 로그 처리됨
