In [20]:
from dataclasses import dataclass
import hashlib
import json
from pathlib import Path
import psutil
import time
from tqdm.auto import tqdm
import logging
@dataclass() 
class ExperimentConfig:
    # 核心参数
    num_bands: int = 4
    band_size: int = 16
    threshold: float = 0.8
    simhash_bits: int = 64
    ngram_range: tuple = (1, 3)
    chunk_size: int = 10000
    
    # 路径参数
    raw_data_root: str = "D:/spring2025/UCUG2011-Discrete-Math/project 1/discrete-math-project-1/src/data"
    processed_dir: str = "D:/spring2025/UCUG2011-Discrete-Math/Duplication_detecting/processeddata_optimizedsimhash"

    def __post_init__(self):
        """参数校验"""
        if self.num_bands * self.band_size != self.simhash_bits:
            raise ValueError("num_bands * band_size must equal simhash_bits")
        if not (0 < self.threshold <= 1):
            raise ValueError("Threshold must be in (0.0, 1.0]")

    @property
    def param_hash(self) -> str:
        """包含所有参数的哈希标识"""
        param_dict = {
            "num_bands": self.num_bands,
            "band_size": self.band_size,
            "threshold": round(self.threshold, 2),
            "simhash_bits": self.simhash_bits,
            "ngram_range": self.ngram_range,
            "chunk_size": self.chunk_size
        }
        return hashlib.md5(json.dumps(param_dict, sort_keys=True).encode()).hexdigest()[:10]
        
    @property
    def params_dict(self) -> dict:
        """获取可序列化的参数字典"""
        return {
            'num_bands': self.num_bands,
            'band_size': self.band_size,
            'threshold': self.threshold,
            'simhash_bits': self.simhash_bits,
            'ngram_range': str(self.ngram_range),
            'chunk_size': self.chunk_size
        }
    
    @property
    def experiment_dir(self) -> Path:
        """动态实验目录"""
        dir_path = Path(self.processed_dir) / f"exp_{self.param_hash}"
        dir_path.mkdir(parents=True, exist_ok=True)
        return dir_path
    
    preprocessed_path = Path(processed_dir) / "preprocessed.csv"
    
    @property
    def signature_path(self) -> Path:
        """动态签名文件路径"""
        return self.experiment_dir / f"signatures_{self.param_hash}.parquet"

    @property
    def candidates_path(self) -> Path:
        """动态候选对路径"""
        return self.experiment_dir / f"candidates_{self.param_hash}.csv"
    
    @property
    def performance_log_path(self) -> Path:
        """动态性能评估路径"""
        return self.experiment_dir / f"performance.json"
    
    def __post_init__(self):
        """参数校验并创建基础目录"""
        if self.num_bands * self.band_size != self.simhash_bits:
            raise ValueError("num_bands * band_size must equal simhash_bits")
        if not (0 < self.threshold <= 1):
            raise ValueError("Threshold must be in (0.0, 1.0]")
        
        # 新增：确保基础目录存在
        Path(self.processed_dir).mkdir(parents=True, exist_ok=True)

def configure_logging():
    """带自动目录创建的日志配置"""
    log_dir = Path(ExperimentConfig.processed_dir)
    log_file = log_dir / 'process.log'
    
    # 创建目录（如果不存在）
    log_dir.mkdir(parents=True, exist_ok=True)
    
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    logging.info(f"日志系统已初始化，日志文件: {log_file}")

# 调用配置函数
configure_logging()

2025-04-19 21:18:22,408 [INFO] 日志系统已初始化，日志文件: D:\spring2025\UCUG2011-Discrete-Math\Duplication_detecting\processeddata_optimizedsimhash\process.log


In [21]:
def monitor_performance(func):
    def wrapper(*args, **kwargs):
        start_time = time.monotonic()
        process = psutil.Process()
        start_mem = process.memory_info().rss
        
        result = func(*args, **kwargs)
        
        # 计算资源消耗
        elapsed = time.monotonic() - start_time
        end_mem = process.memory_info().rss
        peak_mem = (end_mem - start_mem) // 1024 // 1024  # 转换为MB
        
        # 自动捕获配置对象
        config = next((a for a in args if isinstance(a, ExperimentConfig)), None)
        
        if config:
            # 记录到实验结果
            performance_data = {
                "stage": func.__name__,
                "time_sec": round(elapsed, 2),
                "peak_memory_mb": peak_mem,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
            }
            
            # 确保目录存在
            config.experiment_dir.mkdir(parents=True, exist_ok=True)
            
            # 合并历史日志（如果存在）
            log_path = config.performance_log_path
            existing_logs = []
            if log_path.exists():
                try:
                    with open(log_path, "r") as f:
                        existing_logs = json.load(f)
                except json.JSONDecodeError:
                    logging.warning(f"性能日志 {log_path} 损坏，重置文件")

            # 追加新记录
            existing_logs.append(performance_data)
            
            # 安全写入
            try:
                with open(log_path, "w") as f:
                    json.dump(existing_logs, f, indent=2)
            except IOError as e:
                logging.error(f"写入性能日志失败: {str(e)}")

        
        return result
    return wrapper

In [22]:
import re
import pandas as pd
from datasets import load_dataset

def preprocess_document(text):
    # 1. 删除结构化标记（包括_START_xxx和_NEWLINE_）
    text = re.sub(
        r'_START[＿_]\w+\b\s*|_NEWLINE_',  # 同时匹配两种标记
        '', 
        text,
        flags=re.MULTILINE
    )
    
    # 2. 标准化为小写
    text = text.lower()
    
    # 3. 去除标点符号（保留字母数字和空格）
    text = re.sub(r'[^\w\s]', '', text)
    
    # 4. 合并连续空格并去除首尾空白
    return ' '.join(text.strip().split())

def preprocess_data(config: ExperimentConfig):
    logging.info("Starting Step 1: Data Preprocessing")
    
    # 加载原始数据
    test_files = [str(p) for p in Path(f"{config.raw_data_root}/test").glob("*.arrow")]
    val_files = [str(p) for p in Path(f"{config.raw_data_root}/validation").glob("*.arrow")]
    
    test_data = load_dataset("arrow", data_files=test_files, split="train")
    val_data = load_dataset("arrow", data_files=val_files, split="train")
    
    # 转换为Pandas并重命名列
    val_df = val_data.data.table.to_pandas().rename(columns={'wikidata_id': 'doc_id'})
    test_df = test_data.data.table.to_pandas().rename(columns={'wikidata_id': 'doc_id'})
    
    # 添加source列
    val_df['source'] = 'val'
    test_df['source'] = 'test'
    
    # 合并数据集
    merged_df = pd.concat([val_df[['doc_id', 'text', 'source']], 
                          test_df[['doc_id', 'text', 'source']]])
    
    # 分块处理文本（内存优化）
    chunk_size = 10000
    chunks = []
    
    with tqdm(total=len(merged_df), desc="Preprocessing Texts") as pbar:
        for i in range(0, len(merged_df), chunk_size):
            chunk = merged_df.iloc[i:i+chunk_size].copy()
            chunk['clean_text'] = chunk['text'].apply(
                lambda x: preprocess_document(str(x))
            )
            # 删除原始文本列释放内存
            chunk.drop(columns=['text'], inplace=True)
            chunks.append(chunk)
            pbar.update(len(chunk))
    
    processed_df = pd.concat(chunks)
    
    # 保存预处理结果
    processed_df[['doc_id', 'source', 'clean_text']].to_csv(
        ExperimentConfig.preprocessed_path,
        index=False,
        encoding='utf-8'
    )
    logging.info(f"Saved preprocessed data to {ExperimentConfig.preprocessed_path}")

In [30]:
import numpy as np
from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer
from sklearn.pipeline import make_pipeline
from joblib import Parallel, delayed

class OptimizedTFIDFSimHasher:
    def __init__(self, num_bits=64):
        self.num_bits = num_bits
        self._feature_cache = {}  # 特征哈希缓存
        
    def _get_feature_hash(self, feature_id: int) -> np.ndarray:
        """预计算特征哈希（使用稳定哈希）"""
        if feature_id not in self._feature_cache:
            # generate a 160-bit hash using SHA-1
            hash_bytes = hashlib.sha256(str(feature_id).encode()).digest()
            # convert to an integer and mask to num_bits bits
            hash_int = int.from_bytes(hash_bytes, byteorder='big')
            hash_int &= (1<<self.num_bits) - 1  # mask to num_bits bits
            byte_length = (self.num_bits + 7) // 8  # compute byte length
            hash_bytes = hash_int.to_bytes(byte_length, 'big')            # convert to binary representation using vectorized operations to avoid loops
            bits = np.unpackbits(np.frombuffer(hash_bytes, dtype=np.uint8))
            bits = bits[:self.num_bits]
            self._feature_cache[feature_id] = np.where(bits, 1, -1).astype(np.int8)
        return self._feature_cache[feature_id]

    def batch_generate(self, tfidf_matrix):
        num_docs= tfidf_matrix.shape[0]
        hash_vectors=np.zeros((num_docs, self.num_bits), dtype=np.float32)

        for doc_idx in range(num_docs):
            row = tfidf_matrix[doc_idx]
            indices = row.indices
            data = row.data
            feature_hashes = np.stack([self._get_feature_hash(i) for i in indices])
            hash_vectors[doc_idx]=(feature_hashes*data[:, np.newaxis]).sum(axis=0)
        
        binary_sigs = np.where(hash_vectors > 0,'1','0')
        return [int(''.join(row), 2) for row in binary_sigs]
    

In [32]:
import csv
import numpy as np
from pathlib import Path
from collections import defaultdict
from typing import List, Union
import random
from pybloom_live import ScalableBloomFilter

class SimHashLSHProcessor:
    def __init__(self, config):
        """
        增强型LSH处理器，适配预处理流水线
        :param num_bands: 分桶数（必须满足 num_bands * band_size == 64）
        :param band_size: 每桶比特数
        :param threshold: 相似度阈值 (0.0~1.0)
        """
        self.config=config
        self._validate_config()
        self.num_bands = config.num_bands
        self.band_size = config.band_size
        self.threshold = config.threshold
        self.simhash_bits=config.simhash_bits
        self.inverted_index = defaultdict(set)
        self.signatures = {}
        self.doc_metadata = {}
        self.seen_pairs = ScalableBloomFilter(
        initial_capacity=1_000_000, 
        error_rate=1e-6
        ) 

    def _validate_config(self):
        """参数校验"""
        if self.config.num_bands * self.config.band_size != self.config.simhash_bits:
            raise ValueError(
                f"num_bands({self.config.num_bands}) * band_size({self.config.band_size}) "
                f"must equal to simhash_bits({self.config.simhash_bits})"
            )
        if not (0 < self.config.threshold <= 1):
            raise ValueError(f"Threshold must be in (0.0, 1.0], got {self.config.threshold}")

    def load_signatures(self, parquet_path: Union[str, Path]):
        """
        从Parquet文件加载预处理生成的签名数据
        :param parquet_path: 预处理输出文件路径
        """
        try:
            data = pd.read_parquet(parquet_path)
        except FileNotFoundError:
            logging.error(f"签名文件不存在: {parquet_path}")
            raise
        except Exception as e:
            logging.error(f"加载Parquet失败: {str(e)}")
            raise
        
        # 数据格式验证
        required_fields = {'doc_id', 'source', 'simhash'}
        if not required_fields.issubset(data.columns):
            missing=required_fields - set(data.columns)
            raise KeyError(f"Missing required fields: {missing}")
        
                # 加载到内存
        for row in data.itertuples():
            if len(row.simhash) != self.config.simhash_bits:
                raise ValueError(
                    f"Length of the signature of the file {row.doc_id} is not equal to {self.config.simhash_bits}. "
                    f"Expected: {self.config.simhash_bits}."
                )
                
            self.signatures[row.doc_id] = row.simhash
            self.doc_metadata[row.doc_id] = {'source': row.source}

    def _band_signature(self, binary_str: str) -> List[str]:
        """优化分桶策略，适配预处理格式"""
        return [
            hashlib.sha1(binary_str[i*self.band_size : (i+1)*self.band_size].encode()).hexdigest()
            for i in range(self.num_bands)
        ]

    def _hamming_similarity(self, sig1: str, sig2: str) -> float:
        """向量化汉明距离计算"""
        arr1 = np.frombuffer(sig1.encode(), 'u1') - ord('0')
        arr2 = np.frombuffer(sig2.encode(), 'u1') - ord('0')
        return 1 - np.count_nonzero(arr1 != arr2) / self.simhash_bits
    


    def generate_candidates(self, output_path: str):
        """
        生成候选对并保存为CSV
        :param output_path: 输出文件路径
        """
        seen_pairs = ScalableBloomFilter(
            initial_capacity=1_000_000, 
            error_rate=1e-7
        )
        # 建立倒排索引
        self.inverted_index.clear()
        for doc_id, sig in tqdm(self.signatures.items(), desc="Building Inverted Index"):
            for band_hash in self._band_signature(sig):
                self.inverted_index[band_hash].add(doc_id)


        stats = {'total': 0, 'valid':0}
        BUFFER_SIZE = 10_000  # 批量写入缓冲区大小
        buffer = []
        
        with open(output_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(["doc_id1", "source1", "doc_id2", "source2", "similarity"])
            
            for doc_id1 in tqdm(self.signatures, desc="生成候选对"):
                candidate_ids = set()
                for band_hash in self._band_signature(self.signatures[doc_id1]):
                    candidate_ids.update(self.inverted_index.get(band_hash,set()))
                
                stats['total'] += len(candidate_ids)
                
                for doc_id2 in candidate_ids:
                    if doc_id1 >= doc_id2:
                        continue
                    
                    pair_key = f"{doc_id1}_{doc_id2}"
                    if pair_key in seen_pairs:
                        continue
                    seen_pairs.add(pair_key)
                    similarity = self._hamming_similarity(
                        self.signatures[doc_id1], 
                        self.signatures[doc_id2]
                    )
                    
                    if similarity >= self.config.threshold:
                        seen_pairs.add(pair_key)
                        stats['valid'] += 1
                        
                        source1 = self.doc_metadata[doc_id1]['source']
                        source2 = self.doc_metadata[doc_id2]['source']
                        writer.writerow([
                            doc_id1, source1, 
                            doc_id2, source2, 
                            f"{similarity:.4f}"
                        ])

                        if buffer:
                            writer.writerows
        
        logging.info(f"生成候选对完成: 总候选 {stats['total']}, 有效 {stats['valid']}")
        return {
            'total_candidates': stats['total'],
            'valid_candidates': stats['valid'],
        }

In [25]:
class UnionFind:
    def __init__(self, elements):
        self.parent = {e: e for e in elements}
        self.rank = {e: 1 for e in elements}
    
    def find(self, x):
        if self.parent[x] != x:
            self.parent[x] = self.find(self.parent[x])
        return self.parent[x]
    
    def union(self, x, y):
        root_x = self.find(x)
        root_y = self.find(y)
        
        if root_x != root_y:
            if self.rank[root_x] > self.rank[root_y]:
                self.parent[root_y] = root_x
            else:
                self.parent[root_x] = root_y
                if self.rank[root_x] == self.rank[root_y]:
                    self.rank[root_y] += 1

    def get_components(self):
        components =defaultdict(list)
        for elem in self.parent:
            root = self.find(elem)
            components[root].append(elem)
        return components

In [33]:
class OnlineIDF:
    def __init__(self):
        self.doc_count = 0
        self.feature_df = np.zeros(2**20)  # 对应哈希空间
        
    def partial_fit(self, X):
        # X是HashingVectorizer输出的计数矩阵
        self.doc_count += X.shape[0]
        # 统计每个特征出现的文档数
        self.feature_df += (X > 0).sum(axis=0).A1  # 转换为ndarray
    
    def get_idf(self):
        # 计算IDF值
        idf = np.log((self.doc_count + 1) / (self.feature_df + 1)) + 1
        return idf

In [35]:
class ExperimentRunner:
    def __init__(self, base_dir=ExperimentConfig.processed_dir):
        self.base_dir = Path(base_dir)
        self.current_results = {}
        self.summary_path = self.base_dir / "experiment_summary.csv" 
        self._init_summary_file()
    
    def _init_summary_file(self):
        """初始化空CSV文件"""
        if not self.summary_path.exists():
            columns = [
                'experiment_id', 'timestamp',
                *ExperimentConfig().params_dict.keys(),
                'simhash_time_sec', 'lsh_time_sec',
                'total_time_sec', 'peak_memory_mb',
                'total_candidates', 'valid_candidates',
                'total_duplicate_ratio', 'cross_set_ratio',
                'intra_val_ratio', 'intra_test_ratio',
            ]
            pd.DataFrame(columns=columns).to_csv(self.summary_path, index=False)

    def _update_summary(self, config: ExperimentConfig, metrics: dict):
        if self.summary_path.exists():
            existing_df = pd.read_csv(self.summary_path)
        else:
            existing_df = pd.DataFrame()

        new_row = {
            'experiment_id': config.param_hash,
            'timestamp': pd.Timestamp.now().isoformat(),
            **config.params_dict,
            **metrics
        }
        updated_df = pd.concat([existing_df, pd.DataFrame([new_row])], ignore_index=True)
        updated_df.to_csv(self.summary_path, index=False)

    def run_experiment(self, config: ExperimentConfig):
        """执行完整实验流程"""
        self.current_results = {}
        # exp_dir = self.base_dir / config.param_hash  
        # exp_dir.mkdir(parents=True, exist_ok=True)
        
        # 动态生成路径
        logging.info(f"签名文件路径: {config.signature_path}")
        logging.info(f"候选对路径: {config.candidates_path}")
        
        # 执行各阶段并记录性能
        if config.preprocessed_path.exists():
            logging.info(f"使用现有预处理文件: {config.preprocessed_path}")
        else:
            logging.info(f"开始预处理数据: {config.preprocessed_path}")
            self._run_preprocessing(config)

        simhash_start = time.time()
        self._run_simhash(config)
        simhash_time = time.time() - simhash_start
        logging.info(f"SimHash签名生成耗时: {simhash_time:.2f}秒")
        
        lsh_start = time.time()
        candidate_path=self._run_lsh(config)
        lsh_time = time.time() - lsh_start

        dup_metrics= self._calculate_duplication_metrics(candidate_path)
        

        try:
            with open(config.performance_log_path) as f:
                perf_data = json.load(f)
                peak_memory = max(log['peak_memory_mb'] for log in perf_data)
        except FileNotFoundError:
            logging.error("性能日志未生成，请检查预处理阶段")
            peak_memory = -1  # 标记异常值
        except Exception as e:
            logging.error(f"读取性能日志失败: {str(e)}")
            peak_memory = -1
            
        summary_metrics={
            **config.params_dict,
            'simhash_time_sec': round(simhash_time, 2),
            'lsh_time_sec': round(lsh_time, 2),
            'total_time_sec': round(simhash_time + lsh_time, 2),
            'peak_memory_mb': peak_memory,
            'total_candidates': self.current_results.get('total_candidates', 0),
            'valid_candidates': self.current_results.get('valid_candidates', 0),
            **dup_metrics
        }
        # 保存实验结果
        self._update_summary(config, summary_metrics)
        
    def _run_preprocessing(self, config):
        Path(config.processed_dir).mkdir(parents=True, exist_ok=True)
        
        if config.preprocessed_path.exists():
            logging.info(f"使用现有预处理文件: {config.preprocessed_path}")
            return
        
        # 执行完整预处理
        preprocess_data(config)
    
    @monitor_performance
    def _run_simhash(self, config):
        """优化后的签名生成流程"""
        # 加载预处理后的数据
        preprocessed_path = Path(ExperimentConfig.preprocessed_path)
        if not preprocessed_path.exists():
            raise FileNotFoundError(f"预处理文件不存在: {preprocessed_path}")
        
        online_idf= OnlineIDF()
        vectorizer = HashingVectorizer(
        ngram_range=config.ngram_range,
        stop_words='english',
        n_features=2**20,  # 增加哈希空间避免冲突
        alternate_sign=True, 
        binary=False
        )
        idf_reader=pd.read_csv(preprocessed_path, memory_map=True, usecols=['clean_text'],chunksize=config.chunk_size)
        for chunk in idf_reader:
            texts= chunk['clean_text'].astype('string').tolist()
            X= vectorizer.transform(texts)
            online_idf.partial_fit(X)

        tfidf_trans= TfidfTransformer()
        tfidf_trans.idf_= online_idf.get_idf()

        dtype={'doc_id': 'string', 'source': 'category', 'clean_text': 'string'}
        data_reader=pd.read_csv(preprocessed_path, usecols=['doc_id', 'source', 'clean_text'], 
                                dtype=dtype, chunksize=10000,memory_map=True)

        signature_chunks=[]
        for chunk in data_reader:
            chunk=chunk.astype(dtype)
            texts=chunk['clean_text'].tolist()
            
            X= vectorizer.fit_transform(texts)
            X_tfidf = tfidf_trans.transform(X)

            hasher= OptimizedTFIDFSimHasher(num_bits=config.simhash_bits)
            signatures = hasher.batch_generate(X_tfidf)

            chunk_df = pd.DataFrame({
                'doc_id': chunk['doc_id'],
                'source': chunk['source'],
                'simhash': [format(sig, f'0{config.simhash_bits}b') for sig in signatures]
            })
            signature_chunks.append(chunk_df)
            del chunk_df, X, X_tfidf#, signatures  # 释放内存

        signature_df = pd.concat(signature_chunks, ignore_index=True) 

        sample = signature_df.sample(min(1000, len(signature_df)), random_state=42) 
        assert all(sample['simhash'].str.len() == config.simhash_bits), "签名长度异常"
            
        
        # 保存签名文件
        config.signature_path.parent.mkdir(parents=True, exist_ok=True)
        signature_df.to_parquet(
            config.signature_path,
            engine='pyarrow',
            compression='snappy',
            index=False
        )
        logging.info(f"SimHash签名已保存至: {config.signature_path}")
    
    @monitor_performance
    def _run_lsh(self, config):
        '''generate candidates'''
        processor = SimHashLSHProcessor(config=config)
        try:
            processor.load_signatures(config.signature_path)
        except Exception as e:
            logging.error(f"加载签名失败: {str(e)}")
            raise
        result=processor.generate_candidates(config.candidates_path)
        self.current_results.update({
            'total_candidates': result['total_candidates'],
            'valid_candidates': result['valid_candidates']
        })

        return config.candidates_path
    
    def _calculate_duplication_metrics(self, candidates_path: Path) -> dict:
        """计算详细的重复率指标"""

        candidates = pd.read_csv(candidates_path)
        preprocessed_path = Path(ExperimentConfig.preprocessed_path)
        preprocessed = pd.read_csv(preprocessed_path)
        total_docs = len(preprocessed)
        
        # 构建并查集
        uf = UnionFind(preprocessed['doc_id'].tolist())
        
        # 合并所有候选对
        for _, row in candidates.iterrows():
            uf.union(row['doc_id1'], row['doc_id2'])
        
        # 统计各连通分量
        components = uf.get_components()
        
        # 初始化统计指标
        cross_set_duplicates = 0    # 跨数据集重复文档数
        intra_val_duplicates = 0    # 验证集内部重复文档数
        intra_test_duplicates = 0   # 测试集内部重复文档数
        duplicate_docs =set()

        # 分析每个连通分量
        for component in components.values():
            component_size = len(component)
            if component_size < 2:
                continue  # 忽略单文档组
            
            # 统计来源分布
            sources = preprocessed[preprocessed['doc_id'].isin(component)]['source']
            has_val = 'val' in sources.values
            has_test = 'test' in sources.values
            
            # 统计重复文档数（去除基准计数）
            duplicate_docs.update(component)
            
            if has_val and has_test:
                cross_set_duplicates += component_size  # 整个分量都算作跨集重复
            elif has_val:
                intra_val_duplicates += component_size
            elif has_test:
                intra_test_duplicates += component_size
        
        # 计算重复率（基于总文档数）
        return {
            'total_docs': total_docs,
            'total_duplicate_ratio': len(duplicate_docs) / total_docs if total_docs else 0,
            'cross_set_ratio': cross_set_duplicates / total_docs if total_docs else 0,
            'intra_val_ratio': intra_val_duplicates / total_docs if total_docs else 0,
            'intra_test_ratio': intra_test_duplicates / total_docs if total_docs else 0

        }

In [36]:
def main():
    runner = ExperimentRunner()

    # 手动定义参数组合 (已验证 num_bands * band_size = simhash_bits)
    specified_params = [
         {'num_bands': 4, 'band_size': 16, 'threshold': 0.8, 'simhash_bits': 64, 'ngram_range': (1,3), 'chunk_size': 10000},
        # {'num_bands': 4, 'band_size': 16, 'threshold': 0.75, 'simhash_bits': 64, 'ngram_range': (1,3), 'chunk_size': 10000},
        # {'num_bands': 4, 'band_size': 16, 'threshold': 0.7, 'simhash_bits': 64, 'ngram_range': (1,3), 'chunk_size': 10000},
        # {'num_bands': 8, 'band_size': 8, 'threshold': 0.8, 'simhash_bits': 64, 'ngram_range': (1,3), 'chunk_size': 10000},
        # {'num_bands': 8, 'band_size': 16, 'threshold': 0.8, 'simhash_bits': 128, 'ngram_range': (1,3), 'chunk_size': 10000},
        # {'num_bands': 4, 'band_size': 32, 'threshold': 0.8, 'simhash_bits': 128, 'ngram_range': (1,3), 'chunk_size': 10000},
        # {'num_bands': 4, 'band_size': 16, 'threshold': 0.8, 'simhash_bits': 64, 'ngram_range': (2,4), 'chunk_size': 10000},
        # {'num_bands': 8, 'band_size': 16, 'threshold': 0.8, 'simhash_bits': 128, 'ngram_range': (2,4), 'chunk_size': 10000},
        # {'num_bands': 4, 'band_size': 16, 'threshold': 0.8, 'simhash_bits': 64, 'ngram_range': (1,3), 'chunk_size': 50000},
        # {'num_bands': 8, 'band_size': 16, 'threshold': 0.8, 'simhash_bits': 128, 'ngram_range': (1,3), 'chunk_size': 50000},
        # {'num_bands': 4, 'band_size': 16, 'threshold': 0.8, 'simhash_bits': 64, 'ngram_range': (1,3), 'chunk_size': 100000},
        # {'num_bands': 8, 'band_size': 16, 'threshold': 0.8, 'simhash_bits': 128, 'ngram_range': (1,3), 'chunk_size': 100000},
    ]

    print(f"总实验数: {len(specified_params)}")
    
    for i, params in enumerate(tqdm(specified_params, desc="Running Specific Experiments")):
            config = ExperimentConfig(
                num_bands=params['num_bands'],
                band_size=params['band_size'],
                threshold=params['threshold'],
                simhash_bits=params['simhash_bits'],
                ngram_range=params['ngram_range'],
                chunk_size=params['chunk_size']
            )
            
            print(f"\n=== 实验 {i+1}/{len(specified_params)} ===")
            print(f"参数哈希: {config.param_hash}")
            print(f"配置详情: {params}")
            
            # 运行实验
            runner.run_experiment(config)
            
            # 显示性能摘要
            print("\n[性能摘要]")
            print(f"总耗时: {runner.current_results.get('total_time_sec', 'N/A')}s")
            print(f"内存峰值: {runner.current_results.get('peak_memory_mb', 'N/A')}MB")
            print(f"候选对总数: {runner.current_results.get('total_candidates', 'N/A')}")
            print("-"*50)
            
        # except Exception as e:
        #     print(f"\n!!! 实验 {i+1} 失败: {str(e)}")
        #     print(f"失败参数: {params}")
            if config.signature_path.exists():
                print(f"已生成签名文件大小: {config.signature_path.stat().st_size} bytes")
            continue  # 继续执行后续实验



def load_performance_log(log_path: Path) -> list:
    """安全加载性能日志"""
    if not log_path.exists():
        return []
    try:
        with open(log_path, "r") as f:
            return json.load(f)
    except json.JSONDecodeError:
        print(f"警告: 性能日志 {log_path} 格式错误")
        return []
    except Exception as e:
        print(f"加载日志失败: {str(e)}")
        return []

if __name__ == "__main__":
    main()

总实验数: 1


Running Specific Experiments:   0%|          | 0/1 [00:00<?, ?it/s]2025-04-19 21:27:00,671 [INFO] 签名文件路径: D:\spring2025\UCUG2011-Discrete-Math\Duplication_detecting\processeddata_optimizedsimhash\exp_dc3a27f2da\signatures_dc3a27f2da.parquet
2025-04-19 21:27:00,672 [INFO] 候选对路径: D:\spring2025\UCUG2011-Discrete-Math\Duplication_detecting\processeddata_optimizedsimhash\exp_dc3a27f2da\candidates_dc3a27f2da.csv
2025-04-19 21:27:00,673 [INFO] 使用现有预处理文件: D:\spring2025\UCUG2011-Discrete-Math\Duplication_detecting\processeddata_optimizedsimhash\preprocessed.csv



=== 实验 1/1 ===
参数哈希: dc3a27f2da
配置详情: {'num_bands': 4, 'band_size': 16, 'threshold': 0.8, 'simhash_bits': 64, 'ngram_range': (1, 3), 'chunk_size': 10000}


2025-04-19 21:42:38,712 [INFO] SimHash签名已保存至: D:\spring2025\UCUG2011-Discrete-Math\Duplication_detecting\processeddata_optimizedsimhash\exp_dc3a27f2da\signatures_dc3a27f2da.parquet
2025-04-19 21:42:38,841 [INFO] SimHash签名生成耗时: 938.17秒
Building Inverted Index: 100%|██████████| 325871/325871 [00:03<00:00, 108190.62it/s]
生成候选对: 100%|██████████| 325871/325871 [07:48<00:00, 696.20it/s]
2025-04-19 21:50:30,964 [INFO] 生成候选对完成: 总候选 26528983, 有效 5530
  updated_df = pd.concat([existing_df, pd.DataFrame([new_row])], ignore_index=True)
Running Specific Experiments: 100%|██████████| 1/1 [24:13<00:00, 1453.53s/it]


[性能摘要]
总耗时: N/As
内存峰值: N/AMB
候选对总数: 26528983
--------------------------------------------------
已生成签名文件大小: 11449630 bytes





In [37]:
from pathlib import Path
import pandas as pd
from tqdm import tqdm

def generate_text_matched_candidates(config: ExperimentConfig):
    """为候选对生成包含文本内容的增强文件"""
    # 加载预处理数据建立文本映射
    text_lookup = pd.read_csv(config.preprocessed_path,
                            usecols=['doc_id', 'clean_text']
                            ).set_index('doc_id')['clean_text'].to_dict()

    # 生成增强后的候选文件路径
    enhanced_path = config.candidates_path.with_name(
        config.candidates_path.name.replace(".csv", "_text.csv"))

    # 分块处理候选对
    chunks = pd.read_csv(config.candidates_path, chunksize=10000)
    header = True

    with tqdm(desc="生成带文本的候选对") as pbar:
        for chunk in chunks:
            # 添加文本列
            chunk['text1'] = chunk['doc_id1'].map(text_lookup)
            chunk['text2'] = chunk['doc_id2'].map(text_lookup)

            # 过滤无效匹配（约0.1%的数据）
            valid_mask = chunk['text1'].notna() & chunk['text2'].notna()
            chunk = chunk[valid_mask].copy()

            # 保存增强数据
            chunk[['doc_id1', 'doc_id2', 'text1', 'text2']].to_csv(
                enhanced_path, 
                mode='w' if header else 'a',
                header=header,
                index=False
            )
            header = False
            pbar.update(len(chunk))

    print(f"增强文件已生成：{enhanced_path}")
    return enhanced_path

def batch_generate_text_matches(base_dir=ExperimentConfig.processed_dir):
    """批量处理所有实验的候选对"""
    base_path = Path(base_dir)
    summary_df = pd.read_csv(base_path / "experiment_summary.csv")

    for _, row in tqdm(summary_df.iterrows(), total=len(summary_df)):
        config = ExperimentConfig(
            num_bands=row['num_bands'],
            band_size=row['band_size'],
            threshold=row['threshold'],
            simhash_bits=row['simhash_bits'],
            ngram_range=eval(row['ngram_range']),  # 将字符串转换为元组
            chunk_size=row['chunk_size']
        )

        if config.candidates_path.exists():
            generate_text_matched_candidates(config)
        else:
            print(f"跳过未找到的候选文件：{config.candidates_path}")

# 使用示例
if __name__ == "__main__":
    # 处理单个实验
    config = ExperimentConfig()  # 使用默认参数
    generate_text_matched_candidates(config)

    # 批量处理所有实验
    batch_generate_text_matches()

生成带文本的候选对: 5530it [00:00, 11469.21it/s]


增强文件已生成：D:\spring2025\UCUG2011-Discrete-Math\Duplication_detecting\processeddata_optimizedsimhash\exp_dc3a27f2da\candidates_dc3a27f2da_text.csv


生成带文本的候选对: 5530it [00:00, 11131.59it/s]


增强文件已生成：D:\spring2025\UCUG2011-Discrete-Math\Duplication_detecting\processeddata_optimizedsimhash\exp_dc3a27f2da\candidates_dc3a27f2da_text.csv


100%|██████████| 1/1 [00:09<00:00,  9.55s/it]


In [None]:
# import time
# from sklearn.model_selection import ParameterGrid
# from tqdm.auto import tqdm
# import pandas as pd

# def main():
#     """主函数：执行多组参数实验"""
#     # 初始化实验运行器
#     runner = ExperimentRunner(base_dir="experiments")
    
#     # 定义参数搜索空间
#     param_grid = {
#         'num_bands': [4, 8, 16],
#         'band_size': [16, 8, 4],  # 需确保 num_bands * band_size = simhash_bits
#         'threshold': [0.7, 0.75, 0.8],
#         'simhash_bits': [64],     # 固定参数示例
#         'ngram_range': [(1,3), (2,4)],
#         'chunk_size': [10000, 50000]
#     }

#     # 生成有效参数组合
    # valid_params = []
    # for params in ParameterGrid(param_grid):
    #     # 跳过无效组合（例如 num_bands * band_size != simhash_bits）
    #     if params['num_bands'] * params['band_size'] != params['simhash_bits']:
    #         continue
    #     valid_params.append(params)
    
    # print(f"总实验数: {len(valid_params)}")
    
#     # 准备结果表格
#     result_df = pd.DataFrame(columns=[
#         'param_hash', 'num_bands', 'band_size', 'threshold',
#         'simhash_bits', 'ngram_range', 'chunk_size',
#         'total_candidates', 'valid_candidates', 'duplicate_rate',
#         'time_cost', 'memory_peak'
#     ])
    
#     # 执行实验
#     for i, params in enumerate(tqdm(valid_params, desc="Running Experiments")):
#         try:
#             # 创建配置对象
#             config = ExperimentConfig(
#                 num_bands=params['num_bands'],
#                 band_size=params['band_size'],
#                 threshold=params['threshold'],
#                 simhash_bits=params['simhash_bits'],
#                 ngram_range=params['ngram_range'],
#                 chunk_size=params['chunk_size']
#             )
            
#             # 运行实验
#             start_time = time.time()
#             runner.run_experiment(config)
#             elapsed = time.time() - start_time
            
#             # 收集结果
#             result_row = {
#                 'param_hash': config.param_hash,
#                 'num_bands': config.num_bands,
#                 'band_size': config.band_size,
#                 'threshold': config.threshold,
#                 'simhash_bits': config.simhash_bits,
#                 'ngram_range': str(config.ngram_range),  # 转换为字符串方便存储
#                 'chunk_size': config.chunk_size,
#                 'total_candidates': runner.current_results['total_candidates'],
#                 'valid_candidates': runner.current_results['valid_candidates'],
#                 'duplicate_rate': runner.current_results['duplicate_rate'],
#                 'time_cost': elapsed,
#                 'memory_peak': max([log['peak_memory_mb'] for log in config.performance_log])
#             }
            
#             # 保存到表格
#             result_df = pd.concat([result_df, pd.DataFrame([result_row])], ignore_index=True)
            
#             # 实时保存进度
#             result_df.to_csv("experiments/summary.csv", index=False)
            
#         except Exception as e:
#             print(f"\n参数组合 {params} 执行失败: {str(e)}")
#             continue
    
#     # 分析最佳参数
#     print("\n===== 实验结果分析 =====")
#     print("耗时最短配置:")
#     print(result_df.loc[result_df['time_cost'].idxmin()][['num_bands', 'band_size', 'time_cost']])
    
#     print("\n重复率最高配置:")
#     print(result_df.loc[result_df['duplicate_rate'].idxmax()][['threshold', 'duplicate_rate']])
    
#     # 生成可视化报告
#     generate_report(result_df)

# def generate_report(df):
#     """生成可视化报告（示例）"""
#     import matplotlib.pyplot as plt
    
#     # 耗时与分块大小关系
#     plt.figure(figsize=(10,6))
#     for ngram in df['ngram_range'].unique():
#         subset = df[df['ngram_range'] == ngram]
#         plt.scatter(subset['chunk_size'], subset['time_cost'], label=ngram)
#     plt.xlabel('Chunk Size')
#     plt.ylabel('Time Cost (s)')
#     plt.legend()
#     plt.savefig("experiments/time_vs_chunk.png")
    
#     # 内存使用分布
#     plt.figure(figsize=(10,6))
#     df['memory_peak'].plot(kind='hist', bins=20)
#     plt.xlabel('Peak Memory Usage (MB)')
#     plt.savefig("experiments/memory_dist.png")

# if __name__ == "__main__":
#     main()