In [1]:
import pyarrow.parquet as pq
import json
from typing import List, Dict, Generator
from tqdm import tqdm
from datasets import Dataset
import gc

# HDFS上Parquet文件的路径
hdfs_path = "viewfs://hadoop-lt-cluster/home/mmu_llm/dw/mmu_llm.db/customjtmath_2013_20/type=normal/part-04999-626445f5-ee23-4a80-b0bd-e35648f16988.c000.snappy.parquet"

def parse_json_content(json_str: str) -> str:
    try:
        json_data = json.loads(json_str)
        return json_data.get('content', '')
    except json.JSONDecodeError:
        return ''

def read_and_parse_parquet(file_path: str, max_chunks: int = None, max_length: int = 50) -> Generator[Dict[str, str], None, None]:
    try:
        parquet_file = pq.ParquetFile(file_path)
        print(f"文件包含 {parquet_file.num_row_groups} 个行组")
        print(f"文件模式: {parquet_file.schema}")
        
        for i in tqdm(range(min(max_chunks or float('inf'), parquet_file.num_row_groups))):
            table = parquet_file.read_row_group(i)
            df = table.to_pandas()
            if 'text' in df.columns:
                for text in df['text']:
                    content = parse_json_content(text)[:max_length]
                    if content:
                        yield {'content': content}
            print(f"处理完第 {i+1} 个行组")
            if max_chunks and i + 1 >= max_chunks:
                print(f"已达到指定的最大块数 {max_chunks}，停止读取")
                break
    except Exception as e:
        print(f"读取文件时出错: {e}")

# 读取并解析数据
parsed_data = list(read_and_parse_parquet(hdfs_path, max_chunks=1))[:10000]

if parsed_data:
    print(f"\n成功读取并解析数据")
    print(f"总共解析的数据条数: {len(parsed_data)}")
    print("前5条解析后的内容:")
    for item in parsed_data[:5]:
        print(item['content'][:100] + '...')  # 只打印每条内容的前100个字符

    # 将解析后的数据转换为Hugging Face Dataset格式
    dataset = Dataset.from_list(parsed_data)

    # 删除不再需要的变量以释放内存
    del parsed_data
    gc.collect()  # 强制进行垃圾回收

else:
    print("无法读取或解析数据")


  from .autonotebook import tqdm as notebook_tqdm
24/06/25 11:39:30 WARN util.NativeCodeLoader main: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/25 11:39:30 WARN shortcircuit.DomainSocketFactory main: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
24/06/25 11:39:30 INFO speed4j pool-simple-buffer-trigger-thread-[perf]: Statistics from 2024-06-25 11:38:30 to 2024-06-25 11:39:30
24/06/25 11:39:30 INFO speed4j pool-simple-buffer-trigger-thread-[perf]: Tag                                                           Avg(ms)      Min      Max  Std Dev     95th     99th   99.5th   Count
24/06/25 11:39:30 INFO speed4j pool-simple-buffer-trigger-thread-[perf]: dataarch.hdfs.suzhou03.mmu_llm.12755-dtmachine.2.6.0U60.3.4-cdh5.10.0-CLIENT-RELEASE.hadoop-lt-cluster     0.00     0.00     0.00     0.00     0.00     0.00     0.00       1
24/06/25 11:39:30 INFO speed4j pool-simple-buffer-trigger-thre

文件包含 7 个行组
文件模式: <pyarrow._parquet.ParquetSchema object at 0x7ff2c487f6c0>
required group field_id=-1 spark_schema {
  optional binary field_id=-1 text (String);
}



  0%|          | 0/1 [00:00<?, ?it/s]24/06/25 11:39:31 WARN hdfs.DFSClient main: hedgedFetchBlockByteRange waited 50ms to read from DatanodeInfoWithStorage[10.80.139.99:50010,DS-99e35677-7266-4195-ace9-3f3c1c8ca3e7,DISK] LocatedBlock{BP-1561302996-10.46.134.41-1572878936413:blk_54524707756_53542890497; getBlockSize()=268435456; corrupt=false; offset=0; activeIndex=3; locs=[DatanodeInfoWithStorage[10.80.139.99:50010,DS-99e35677-7266-4195-ace9-3f3c1c8ca3e7,DISK], DatanodeInfoWithStorage[10.80.122.216:50010,DS-122a497f-aa10-4676-8f3b-ccf3bc289174,DISK], DatanodeInfoWithStorage[10.80.139.102:50010,DS-a87049f5-11b7-4f0e-8be0-d817ee9b1a19,DISK]]} 4 145656881; spawning hedged read
  0%|          | 0/1 [00:05<?, ?it/s]

处理完第 1 个行组
已达到指定的最大块数 1，停止读取

成功读取并解析数据
总共解析的数据条数: 10000
前5条解析后的内容:
! 07/24/2000 mhamer /tmp/l-1-81-nc.onoff.bathnav! ...
! 07/24/2000 mhamer /tmp/l-4-79-sc.onoff.bathgravn...
! 07/24/2000 mhamer /tmp/l-7-77-wg.onoff.multichan...
! Zum Reiherhorst 32, Stelle Jenny
Ahoi,
wollte no...
! thread theory ♥
« previous entry | next entry »
...





In [2]:
import pyarrow.parquet as pq
import json
from typing import List, Dict, Generator
from tqdm import tqdm
from datasets import Dataset
import matplotlib.pyplot as plt
from cycler import cycler
from src.text_clustering import ClusterClassifier
import os
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from datasets import config
import gc

# Configure HuggingFace datasets cache
config.HF_DATASETS_CACHE = "/mmu_nlp_hdd/suzhou03/data/model_zoo/hugging_face/datasets/cache"

# Create ./data directory (if it doesn't exist)
data_dir = "./data"
os.makedirs(data_dir, exist_ok=True)

# Assume dataset is already loaded and contains a 'content' column
texts = dataset["content"]

# Set custom color scheme
colors = [
    "#0F0A0A", "#FF6600", "#FFBE00", "#496767", "#87A19E",
    "#FF9200", "#0F3538", "#F8E08E", "#0F2021", "#FAFAF0"
]
plt.rcParams['axes.prop_cycle'] = cycler(color=colors)

# Create ClusterClassifier instance
cc = ClusterClassifier(embed_device="cuda")  # Use "cuda" if GPU is available, else use "cpu"

# Function to embed texts in batches
def batch_embed_texts(texts, batch_size=1000):
    all_embs = []
    for i in tqdm(range(0, len(texts), batch_size)):
        batch_texts = texts[i:i + batch_size]
        embs = cc.embed(batch_texts)
        all_embs.append(embs)
        gc.collect()  # Force garbage collection to free memory
    return np.vstack(all_embs)

# Embed texts in batches
print("Embedding texts...")
embs = batch_embed_texts(texts)

# Perform clustering on the combined embeddings
print("Starting text clustering...")
clustering_result = cc.cluster(embs)

# Print the clustering result to understand its structure
print(f"Clustering result: {clustering_result}")

# Assuming the clustering_result contains embeddings, labels, and summaries
labels = clustering_result[0]
summaries = clustering_result[1]

# Display results
print("Clustering complete, generating visualization...")

def custom_show(embs, labels):
    # Use PCA to reduce embeddings to 2D if they're not already
    if embs.shape[1] > 2:
        pca = PCA(n_components=2)
        embs_2d = pca.fit_transform(embs)
    else:
        embs_2d = embs

    fig, ax = plt.subplots(figsize=(12, 8), dpi=300)
    
    unique_labels = np.unique(labels)
    for i, label in enumerate(unique_labels):
        mask = labels == label
        ax.scatter(embs_2d[mask, 0], embs_2d[mask, 1], c=[colors[i % len(colors)]],
                   label=f'Cluster {label}', s=0.75, alpha=0.8)
    
    ax.legend()
    ax.set_title("Text Clustering Visualization")
    plt.show()

# Call the custom show function
custom_show(embs, labels)

# Save results
save_path = os.path.join(data_dir, "cc_parquet_data")
cc.save(save_path)
print(f"Clustering model saved to {save_path}")

# Print and save cluster summaries
print("\nCluster Summaries:")
summary_path = os.path.join(data_dir, "cluster_summaries.txt")
with open(summary_path, 'w', encoding='utf-8') as f:
    for i, summary in enumerate(summaries):
        print(f"Cluster {i}: {summary}")
        f.write(f"Cluster {i}: {summary}\n")
print(f"Cluster summaries saved to {summary_path}")

# Example: Predict clusters for new texts
new_texts = ["This is a new math problem", "This is another text about history"]
cluster_labels, embeddings = cc.infer(new_texts, top_k=1)
print("\nCluster labels for new texts:")
for text, label in zip(new_texts, cluster_labels):
    if isinstance(label, (list, np.ndarray)):
        cluster = label[0]
    else:
        cluster = label
    print(f"Text: '{text}' -> Cluster: {cluster}")

# Save clustering results to CSV file
results_df = pd.DataFrame({
    'text': texts,
    'cluster': labels
})
results_path = os.path.join(data_dir, "clustering_results.csv")
results_df.to_csv(results_path, index=False)
print(f"\nClustering results saved to {results_path}")

# Save visualization image
plt.savefig(os.path.join(data_dir, "cluster_visualization.png"))
print(f"Cluster visualization saved to {os.path.join(data_dir, 'cluster_visualization.png')}")

print("\nText clustering analysis complete. All output files have been saved to the ./data directory.")


INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-MiniLM-L6-v2


Embedding texts...


Batches: 100%|██████████| 16/16 [00:01<00:00, 10.70it/s]
Batches: 100%|██████████| 16/16 [00:00<00:00, 82.78it/s]
Batches: 100%|██████████| 16/16 [00:00<00:00, 78.71it/s]
Batches: 100%|██████████| 16/16 [00:00<00:00, 79.02it/s]
Batches: 100%|██████████| 16/16 [00:00<00:00, 75.67it/s]
Batches: 100%|██████████| 16/16 [00:00<00:00, 71.19it/s]
Batches: 100%|██████████| 16/16 [00:00<00:00, 75.68it/s]
Batches: 100%|██████████| 16/16 [00:00<00:00, 82.71it/s]
Batches: 100%|██████████| 16/16 [00:00<00:00, 83.46it/s]
Batches: 100%|██████████| 16/16 [00:00<00:00, 82.69it/s]
100%|██████████| 10/10 [00:05<00:00,  1.90it/s]
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Starting text clustering...
Using DBSCAN (eps, nim_samples)=((0.08,), 50)


: 

: 