In [11]:
import pandas as pd
import csv
import io
from typing import Dict, List

class SimpleTSVValidator:
    def __init__(self, exclude_keys=None):
        self.test_cases = []
        self.exclude_keys = exclude_keys if exclude_keys is not None else ['click_type']
        
    def load_tsv_file(self, file_path: str):
        try:
            # header=0 사용 (첫 번째 행이 헤더)
            df = pd.read_csv(file_path, sep='\t', encoding='utf-8', header=0)
            if len(df.columns) == 19:
                df = df.iloc[:,:-4]
            else:
                pass
            
            df.columns = df.columns.str.rstrip(',').str.strip()
            
            for col in df.columns:
                if df[col].dtype == 'object':
                    df[col] = df[col].astype(str).str.rstrip(',').str.strip()
            
            print(f"로드된 컬럼: {list(df.columns)}")  # 디버그용
            
            for idx, row in df.iterrows():
                def safe_str(value):
                    if pd.isna(value) or str(value).lower() == 'nan':
                        return ''
                    return str(value).strip()
                
                # 실제 TSV 구조에 맞춰 수정
                # key 컬럼 (정답 키들)
                expected_keys_str = safe_str(row['key'])
                expected_keys = [k.strip() for k in expected_keys_str.split(',') if k.strip() and k.strip().lower() != 'nan']
                
                # value 컬럼 (정답 값들) - 키와 매핑되도록 파싱
                expected_values_str = safe_str(row['value'])
                expected_values = self._parse_values_by_key_count(expected_values_str, len(expected_keys))
                
                # keys_combined 컬럼 (실제 키들) - page_id, act_type, click_type 포함
                actual_keys_str = safe_str(row['keys_combined'])
                actual_keys = [k.strip() for k in actual_keys_str.split(',') if k.strip() and k.strip().lower() != 'nan']
                
                # 제외할 키들 필터링
                actual_keys_filtered = [k for k in actual_keys if k not in self.exclude_keys]
                
                # values_combined 컬럼 (실제 값들) - page_id, act_type, click_type 값 포함
                values_str = safe_str(row['values_combined'])
                values = self._parse_values_by_key_count(values_str, len(actual_keys))
                
                # 실제 키-값 매핑 생성 (keys_combined에서 추출한 값들 사용)
                actual_key_value_map = {}
                for i, key in enumerate(actual_keys):
                    if i < len(values):
                        actual_key_value_map[key] = safe_str(values[i])
                    else:
                        actual_key_value_map[key] = ''
                
                test_case = {
                    'unique_id': idx + 1,
                    '기능': safe_str(row['기능']),
                    '경로': safe_str(row['경로']),
                    '활동': safe_str(row['활동']),
                    'page_id': safe_str(row['page_id']),
                    'act_type': safe_str(row['act_type.1']) if 'act_type.1' in row else safe_str(row.get('act_type', '')),
                    'click_type': safe_str(row['click_type.1']) if 'click_type.1' in row else safe_str(row.get('click_type', '')),
                    'click_text': safe_str(row.get('click_text', '')),
                    # 정답 값들 추가
                    'act_type_정답': safe_str(row.get('act_type', '')),
                    'click_type_정답': safe_str(row.get('click_type', '')),
                    'value_정답': safe_str(row['value']),
                    'expected_keys': expected_keys,
                    'expected_values': expected_values,
                    'actual_keys': actual_keys_filtered,
                    'actual_keys_original': actual_keys,
                    'values': values,
                    'actual_key_value_map': actual_key_value_map  # 실제 키-값 매핑 추가
                }
                self.test_cases.append(test_case)
                
        except Exception as e:
            print(f"❌ 오류: {e}")
            import traceback
            traceback.print_exc()
            raise
    
    def _parse_values_by_key_count(self, values_str: str, expected_count: int):
        if not values_str or expected_count <= 0:
            return []
        
        try:
            csv_reader = csv.reader(io.StringIO(values_str), delimiter=',', quotechar='"')
            parsed_row = next(csv_reader, [])
            cleaned_values = [v.strip() for v in parsed_row if v.strip() and v.strip().lower() != 'nan']
            
            if len(cleaned_values) == expected_count:
                return cleaned_values
            elif len(cleaned_values) > expected_count:
                return self._merge_excess_values(cleaned_values, expected_count)
            else:
                while len(cleaned_values) < expected_count:
                    cleaned_values.append('')
                return cleaned_values
        except:
            simple_split = [v.strip() for v in values_str.split(',') if v.strip() and v.strip().lower() != 'nan']
            if len(simple_split) == expected_count:
                return simple_split
            elif len(simple_split) > expected_count:
                return self._merge_excess_values(simple_split, expected_count)
            else:
                while len(simple_split) < expected_count:
                    simple_split.append('')
                return simple_split
    
    def _merge_excess_values(self, split_values: list, expected_count: int):
        if len(split_values) <= expected_count:
            return split_values
        
        result = split_values[:2] if expected_count > 2 else split_values[:1]
        
        if expected_count > 2:
            excess_count = len(split_values) - expected_count
            middle_start = 2
            middle_end = middle_start + excess_count + 1
            
            if middle_end < len(split_values):
                middle_parts = split_values[middle_start:middle_end]
                merged_middle = ', '.join(middle_parts)
                result.append(merged_middle)
                result.extend(split_values[middle_end:])
            else:
                remaining = split_values[middle_start:]
                merged_remaining = ', '.join(remaining)
                result.append(merged_remaining)
        else:
            remaining = split_values[len(result):]
            merged_remaining = ', '.join(remaining)
            result.append(merged_remaining)
        
        return result[:expected_count]
    
    def validate_and_export(self, output_file: str):
        results = []
        
        def safe_str_result(value):
            if pd.isna(value) or str(value).lower() == 'nan':
                return ''
            return str(value).strip()
        
        def compare_values(actual, expected):
            """두 값을 비교하여 PASS/FAIL 반환 (빈 값 처리 포함)"""
            actual_clean = safe_str_result(actual)
            expected_clean = safe_str_result(expected)
            
            # 둘 다 비어있으면 PASS
            if not actual_clean and not expected_clean:
                return 'PASS'
            # 하나만 비어있거나 값이 다르면 FAIL
            elif actual_clean != expected_clean:
                return 'FAIL'
            else:
                return 'PASS'
        
        for test_case in self.test_cases:
            unique_id = test_case['unique_id']
            actual_keys = test_case['actual_keys']
            expected_keys = test_case['expected_keys']
            expected_values = test_case['expected_values']
            values = test_case['values']
            actual_keys_original = test_case['actual_keys_original']
            actual_key_value_map = test_case['actual_key_value_map']
            
            # 정답 키-값 매핑 생성
            expected_key_value_map = {}
            for i, key in enumerate(expected_keys):
                if i < len(expected_values):
                    expected_key_value_map[key] = safe_str_result(expected_values[i])
                else:
                    expected_key_value_map[key] = ''
            
            if len(actual_keys) == 0 and len(expected_keys) > 0:
                # 로그 누락되어도 각 정답 키별로 행 생성
                for key in expected_keys:
                    expected_value = expected_key_value_map.get(key, '')
                    
                    results.append({
                        '고유번호': unique_id,
                        '기능': safe_str_result(test_case['기능']),
                        '경로': safe_str_result(test_case['경로']),
                        '활동': safe_str_result(test_case['활동']),
                        'page_id': safe_str_result(test_case['page_id']),
                        'act_type': safe_str_result(test_case['act_type']),
                        'act_type_정답': safe_str_result(test_case['act_type_정답']),
                        'click_type': safe_str_result(test_case['click_type']),
                        'click_type_정답': safe_str_result(test_case['click_type_정답']),
                        'click_text': safe_str_result(test_case['click_text']),
                        'key': '',  # 실제 키 (누락시)
                        'key_정답': key,  # 정답 키
                        'value': '',
                        'value_정답': expected_value,
                        'key-pass': 'FAIL',  # 키 존재 여부
                        'value-pass': 'FAIL',  # 값 일치 여부
                        'exist-pass': 'MISSING'  # 키 존재 여부
                    })
                continue
            
            if len(actual_keys) == 0 and len(expected_keys) == 0:
                results.append({
                    '고유번호': unique_id,
                    '기능': safe_str_result(test_case['기능']),
                    '경로': safe_str_result(test_case['경로']),
                    '활동': safe_str_result(test_case['활동']),
                    'page_id': safe_str_result(test_case['page_id']),
                    'act_type': safe_str_result(test_case['act_type']),
                    'act_type_정답': safe_str_result(test_case['act_type_정답']),
                    'click_type': safe_str_result(test_case['click_type']),
                    'click_type_정답': safe_str_result(test_case['click_type_정답']),
                    'click_text': safe_str_result(test_case['click_text']),
                    'key': '로그 누락',
                    'key_정답': 'NO_LOG_SPEC',
                    'value': '로그 사양 없음',
                    'value_정답': safe_str_result(test_case['value_정답']),
                    'key-pass': 'REVIEW',  # 키 존재 여부
                    'value-pass': 'REVIEW',  # 값 일치 여부
                    'exist-pass': 'REVIEW'  # 키 존재 여부
                })
                continue    
            
            # 정상 검증 케이스 - keys_combined에서 파싱된 값 사용
            key_value_map = actual_key_value_map  # keys_combined에서 파싱된 실제 키-값 매핑 사용
            
            actual_keys_set = set(actual_keys)
            expected_keys_set = set(expected_keys)
            
            for key in expected_keys:
                # 실제 값과 정답 값 가져오기
                actual_value = key_value_map.get(key, '')
                expected_value = expected_key_value_map.get(key, '')
                
                # 키 존재 여부 확인
                key_exists = key in actual_keys_set
                actual_key = key if key_exists else ''
                
                # 특별한 키들에 대한 정답값 처리 (기존 컬럼값과 비교)
                if key == 'page_id':
                    expected_value = safe_str_result(test_case['page_id'])
                elif key == 'act_type':
                    expected_value = safe_str_result(test_case['act_type_정답'])
                elif key == 'click_type':
                    expected_value = safe_str_result(test_case['click_type_정답'])
                
                # 키와 값 각각 평가
                key_pass = 'PASS' if key_exists else 'FAIL'
                exist_pass = 'PASS' if key_exists else ''
                
                if not key_exists:
                    value_pass = 'FAIL'  # 키가 없으면 값도 FAIL
                    exist_pass = 'MISSING' #6/16 추가가
                else:
                    value_pass = compare_values(actual_value, expected_value)
                
                results.append({
                    '고유번호': unique_id,
                    '기능': safe_str_result(test_case['기능']),
                    '경로': safe_str_result(test_case['경로']),
                    '활동': safe_str_result(test_case['활동']),
                    'page_id': safe_str_result(test_case['page_id']),
                    'act_type': safe_str_result(test_case['act_type']),
                    'act_type_정답': safe_str_result(test_case['act_type_정답']),
                    'click_type': safe_str_result(test_case['click_type']),
                    'click_type_정답': safe_str_result(test_case['click_type_정답']),
                    'click_text': safe_str_result(test_case['click_text']),
                    'key': actual_key,  # 실제 키 (있으면 키, 없으면 '로그 누락')
                    'key_정답': key,  # 정답 키
                    'value': safe_str_result(actual_value),
                    'value_정답': expected_value,
                    'key-pass': key_pass,  # 키 존재 여부
                    'value-pass': value_pass,  # 값 일치 여부
                    'exist-pass': exist_pass  # 키 존재 여부
                })
            
            # UNEXPECTED 키 처리
            unexpected_keys = actual_keys_set - expected_keys_set
            for key in unexpected_keys:
                actual_value = key_value_map.get(key, '')
                
                results.append({
                    '고유번호': unique_id,
                    '기능': safe_str_result(test_case['기능']),
                    '경로': safe_str_result(test_case['경로']),
                    '활동': safe_str_result(test_case['활동']),
                    'page_id': safe_str_result(test_case['page_id']),
                    'act_type': safe_str_result(test_case['act_type']),
                    'act_type_정답': safe_str_result(test_case['act_type_정답']),
                    'click_type': safe_str_result(test_case['click_type']),
                    'click_type_정답': safe_str_result(test_case['click_type_정답']),
                    'click_text': safe_str_result(test_case['click_text']),
                    'key': key,  # 실제 키 (UNEXPECTED는 실제로 존재하는 키)
                    'key_정답': '',  # UNEXPECTED 키에는 정답이 없음
                    'value': safe_str_result(actual_value),
                    'value_정답': '',  # UNEXPECTED 키에는 정답 값이 없음
                    'key-pass': 'UNEXPECTED',  # 키 존재 여부 (예상되지 않은 키)
                    'value-pass': 'UNEXPECTED',  # 값 일치 여부 (예상되지 않은 값)
                    'exist-pass': 'UNEXPECTED'  # 키 존재 여부 (예상되지 않은 키)
                })
        
        df_results = pd.DataFrame(results)
        
        # 컬럼 순서 정의
        column_order = [
            '고유번호', '기능', '경로', '활동', 'page_id', 
            'click_text',
            'act_type_정답',
            'act_type',
            'click_type_정답',
            'click_type',
            'key_정답',
            'key',
            'value_정답',
            'value',
            'key-pass',
            'value-pass',
            'exist-pass'  # 키와 값 각각의 평가 컬럼
        ]

        
        # 컬럼 순서 재정렬
        output_file
        df_results = df_results[column_order]
        df_results_retry = df_results.iloc[:,:14]
        df_results.to_excel(output_file, index=False, engine='openpyxl')
        df_results_retry.to_excel("./result/qa_result_retry.xlsx", index=False, engine='openpyxl')
        
        total = len(df_results)
        key_passed = len(df_results[df_results['key-pass'] == 'PASS'])
        value_passed = len(df_results[df_results['value-pass'] == 'PASS'])
        print(f"검증 완료: 전체 {total}개")
        print(f"키 통과: {key_passed}개 ({key_passed/total*100:.1f}%)")
        print(f"값 통과: {value_passed}개 ({value_passed/total*100:.1f}%)")
        
        return results

# 사용 예시
def run_validator():
    validator = SimpleTSVValidator(exclude_keys=['page_url', 'os_name'])
    validator.load_tsv_file("./notion-log_tsv/tester.tsv")
    validator.validate_and_export("./result/qa_result.xlsx")

if __name__ == "__main__":
    run_validator()

로드된 컬럼: ['기능', '경로', '활동', 'act_type', 'click_type', 'key', 'value', 'no', 'page_id', 'click_type.1', 'act_type.1', 'click_text', 'keys_combined', 'values_combined', 'key_count']
검증 완료: 전체 1325개
키 통과: 416개 (31.4%)
값 통과: 87개 (6.6%)


In [10]:
df = pd.read_csv("./notion-log_tsv/tester.tsv", sep='\t', encoding='utf-8', header=0)
if len(df.columns) == 19:
    df = df.iloc[:,:-4]
else :
    pass
len(df.columns)
# df.iloc[:,:-4]

19