## 导入必要库

In [2]:
import os
import pandas as pd
import numpy as np
import ast
import re
import gc
from pathlib import Path
from cleantext import clean

## 定义数据目录与文件列表

In [3]:
# Root folder that holds the pipeline's files; adjust to the actual location.
data_dir = Path('./output')

# Merged input archives, one per collection window (file names only).
merged_files = [
    "20220227-0302_merged.csv.gz",
    "20220330-0405_merged.csv.gz",
    "20220518-0524_merged.csv.gz",
    "20220623-0701_merged.csv.gz",
    "20220930-1006_merged.csv.gz",
    "20221109-1115_merged.csv.gz",
    "20230301-0305_merged.csv.gz",
    "20230518-0524_merged.csv.gz"
]

# Output directory for the cleaned files.
# parents=True also creates data_dir itself when it is missing; without it,
# mkdir raises FileNotFoundError on a machine where ./output does not exist yet.
clean_output_dir = data_dir / "cleaned"
clean_output_dir.mkdir(parents=True, exist_ok=True)

## 定义统一的字段列表

In [4]:
# Unified column list. When passed as the all_cols argument of
# load_and_unify_columns, every name listed here is guaranteed to exist in the
# returned frame, in exactly this order.
all_columns = [
    "userid",
    "username",
    "acctdesc",
    "location",
    "following",
    "followers",
    "totaltweets",
    "usercreatedts",
    "tweetid",
    "tweetcreatedts",
    "retweetcount",
    "text",
    "hashtags",
    "language",
    "coordinates",
    "favorite_count",
    "is_retweet",
    "original_tweet_id",
    "original_tweet_userid",
    "original_tweet_username",
    "in_reply_to_status_id",
    "in_reply_to_user_id",
    "in_reply_to_screen_name",
    "is_quote_status",
    "quoted_status_id",
    "quoted_status_userid",
    "quoted_status_username",
    "extractedts",
]

## 数据加载与字段补齐

In [5]:
def load_and_unify_columns(filepath, all_cols):
    """Read a gzip-compressed CSV and return it with exactly the columns in all_cols.

    Args:
        filepath: Path of the gzip-compressed, UTF-8 encoded CSV file.
        all_cols: Column names the result must contain, in the desired order.

    Returns:
        A DataFrame restricted to all_cols, in that order. Columns absent from
        the file are added: "is_retweet" and "is_quote_status" are filled with
        False, every other missing column with NaN.
    """
    frame = pd.read_csv(filepath, compression='gzip', encoding='utf-8', engine='python')

    # Flag columns default to False when absent; everything else is "unknown" (NaN).
    flag_columns = ("is_retweet", "is_quote_status")
    absent = [name for name in all_cols if name not in frame.columns]
    for name in absent:
        frame[name] = False if name in flag_columns else np.nan

    # Selecting by the full list both reorders and drops any extra columns.
    return frame[all_cols]

## 数据清洗

In [None]:
# Pre-compiled pattern that captures the value of every quoted "text" key
# (single or double quotes; the key is matched case-insensitively).
HASHTAG_TEXT_RE = re.compile(r'["\']text["\']\s*:\s*["\']([^"\']+)["\']', re.IGNORECASE)

def parse_and_clean_hashtags_regex(hashtags_str):
    """Extract the unique, lower-cased hashtag texts from a serialized hashtag field.

    Args:
        hashtags_str: String holding the serialized hashtag entries; each tag is
            read from a quoted ``text`` key. Missing values (NaN/None) and any
            other non-string input are treated as "no hashtags".

    Returns:
        A list of distinct tags, stripped and lower-cased, in order of first
        appearance. Empty list when nothing is found.
    """
    # Non-string covers NaN/None from empty CSV cells; calling .strip() on those would fail.
    if not isinstance(hashtags_str, str) or not hashtags_str.strip():
        return []

    normalized = (tag.strip().lower() for tag in HASHTAG_TEXT_RE.findall(hashtags_str))

    # dict.fromkeys de-duplicates while keeping first-seen order. A set would give
    # an order that changes between interpreter runs (string hash randomization),
    # making the saved output non-reproducible.
    return list(dict.fromkeys(tag for tag in normalized if tag))

def clean_tweet_text(text):
    """Normalize one tweet's text with cleantext's ``clean`` and collapse whitespace.

    Args:
        text: Raw tweet text.

    Returns:
        The cleaned text with every whitespace run reduced to a single space
        and no leading or trailing whitespace.
    """
    options = dict(
        fix_unicode=True,            # repair potential Unicode problems
        to_ascii=True,               # convert to ASCII, which helps drop non-English characters
        lower=True,                  # lower-case
        no_line_breaks=True,         # remove line breaks
        no_urls=True,                # remove URLs
        no_emails=True,              # remove e-mail addresses
        no_phone_numbers=True,       # remove phone numbers
        no_numbers=False,            # keep numbers (set True to remove them)
        no_digits=False,             # keep digit characters
        no_currency_symbols=True,    # remove currency symbols
        no_punct=True,               # remove punctuation
        # Removed items are replaced by the empty string, not a placeholder token.
        replace_with_url="",
        replace_with_email="",
        replace_with_phone_number="",
        replace_with_number="",
        replace_with_currency_symbol="",
        # Emoji option is left off. NOTE(review): an earlier comment on this flag
        # said emoji are removed, which contradicts False — confirm the intent.
        no_emoji=False,
        lang="en",                   # treat the text as English
    )
    normalized = clean(text, **options)

    # Squeeze any remaining whitespace runs and trim both ends.
    return re.sub(r'\s+', ' ', normalized).strip()


# Clean every merged archive in turn and write the result next to the inputs,
# under clean_output_dir. Each file is processed independently so that only one
# frame is held in memory at a time.
for f in merged_files:
    filepath = data_dir / 'merged' / f
    print(f"处理文件: {filepath.name}")

    # NOTE(review): load_and_unify_columns(filepath, all_columns) is not applied
    # here, so the saved columns are whatever the input file contains — confirm
    # that this is intended.
    df = pd.read_csv(filepath, compression='gzip', encoding='utf-8', engine='python')

    # Build the cleaned frame as a chain of non-mutating steps. Assigning columns
    # or using inplace=True on a boolean-filtered slice is the pattern that
    # triggers pandas' SettingWithCopyWarning; .assign always works on a fresh copy.
    df = (
        df.loc[df['language'] == 'en']                        # English tweets only
        .drop_duplicates(subset='tweetid', keep='first')      # one row per tweet id
        .assign(
            # Unparseable timestamps become NaT (errors='coerce').
            tweetcreatedts=lambda d: pd.to_datetime(d['tweetcreatedts'], errors='coerce'),
            extractedts=lambda d: pd.to_datetime(d['extractedts'], errors='coerce'),
        )
        # Drop rows without a valid creation time or without text.
        .dropna(subset=['tweetcreatedts', 'text'])
    )

    # Clean the text with cleantext, then drop tweets that end up empty.
    # (A single filter is enough; applying the same filter twice is a no-op.)
    df = df.assign(text=df['text'].apply(clean_tweet_text))
    df = df[df['text'].str.strip() != '']

    # Extract and normalize the hashtags into a list per tweet.
    df = df.assign(hashtags=df['hashtags'].apply(parse_and_clean_hashtags_regex))

    df = df.reset_index(drop=True)

    # Save the cleaned data under the same name with the suffix swapped.
    clean_file = clean_output_dir / f.replace('_merged.csv.gz', '_cleaned.csv.gz')
    df.to_csv(clean_file, index=False, compression='gzip')
    print(f"已保存清洗后的数据到: {clean_file}")

    # Release the frame before loading the next file.
    del df
    gc.collect()

print("所有文件的清洗与预处理完成。")

处理文件: 20220227-0302_merged.csv.gz
