In [1]:
# 导入必要的库
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, TimestampType, ArrayType, BooleanType
from pyspark.ml.feature import Tokenizer, StopWordsRemover
import re
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

print("库导入完成！")


库导入完成！


In [2]:
# 初始化 Spark Session
spark = SparkSession.builder \
    .appName("TweetAnalysis_DataCleaning") \
    .master("local[*]") \
    .config("spark.driver.memory", "16g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

sc = spark.sparkContext
print(f"Spark Version: {spark.version}")
print(f"Available cores: {sc.defaultParallelism}")

# 加载10%采样数据（速度更快）
sample_data_path = "/home/jovyan/work/data/processed/the-reddit-climate-change-dataset-comments-ten-percent.parquet"
df_raw = spark.read.parquet(sample_data_path)
df_raw.cache()

print(f"10%采样数据加载完成，共 {df_raw.count():,} 条记录")


Spark Version: 3.5.0
Available cores: 20
10%采样数据加载完成，共 460,070 条记录


In [3]:
# 定义文本清洗函数
def create_text_cleaning_udf():
    """创建文本清洗的UDF函数"""
    def clean_text_func(text):
        if text is None:
            return None
        
        text = str(text)
        
        # 1. 转换为小写
        text = text.lower()
        
        # 2. 去除URL
        text = re.sub(r'https?://\S+|www\.\S+', '', text)
        
        # 3. 去除Reddit特有的格式
        text = re.sub(r'/u/\w+', '', text)  # 去除用户名
        text = re.sub(r'/r/\w+', '', text)  # 去除子版块名
        text = re.sub(r'&gt;', '', text)   # 去除引用符号
        text = re.sub(r'&lt;', '', text)
        text = re.sub(r'&amp;', 'and', text)
        
        # 4. 去除HTML标签
        text = re.sub(r'<.*?>', '', text)
        
        # 5. 去除特殊字符，保留字母、数字、空格和基本标点
        text = re.sub(r'[^a-zA-Z0-9\s\.\,\!\?\;\:]', ' ', text)
        
        # 6. 去除多余的空格
        text = re.sub(r'\s+', ' ', text).strip()
        
        # 7. 过滤过短的文本
        if len(text) < 10:
            return None
            
        return text
    
    return F.udf(clean_text_func, StringType())

# 创建UDF
clean_text_udf = create_text_cleaning_udf()
print("文本清洗UDF创建完成！")


文本清洗UDF创建完成！


In [4]:
# 执行数据清洗流程
print("=== 开始数据清洗流程 ===")

# 1. 添加时间戳列
print("1. 处理时间戳...")
df_cleaned = df_raw.withColumn("timestamp", F.from_unixtime(F.col("created_utc")))

# 2. 去除重复记录（基于id列）
print("2. 去除重复记录...")
initial_count = df_cleaned.count()
df_cleaned = df_cleaned.dropDuplicates(['id'])
after_dedup_count = df_cleaned.count()
print(f"   去重前: {initial_count:,} 条")
print(f"   去重后: {after_dedup_count:,} 条")
print(f"   删除了 {initial_count - after_dedup_count:,} 条重复记录")

# 3. 处理缺失值
print("3. 处理缺失值...")
# 删除body为空的记录（这是我们分析的核心字段）
df_cleaned = df_cleaned.filter(F.col("body").isNotNull())
df_cleaned = df_cleaned.filter(F.col("body") != "")
after_null_count = df_cleaned.count()
print(f"   删除空评论后: {after_null_count:,} 条")

# 4. 应用文本清洗
print("4. 应用文本清洗...")
df_cleaned = df_cleaned.withColumn("cleaned_body", clean_text_udf(F.col("body")))

# 5. 过滤清洗后为空的记录
df_cleaned = df_cleaned.filter(F.col("cleaned_body").isNotNull())
after_text_clean_count = df_cleaned.count()
print(f"   文本清洗后: {after_text_clean_count:,} 条")

print(f"\n总计删除了 {initial_count - after_text_clean_count:,} 条记录")
print(f"清洗完成率: {after_text_clean_count/initial_count*100:.2f}%")


=== 开始数据清洗流程 ===
1. 处理时间戳...
2. 去除重复记录...
   去重前: 460,070 条
   去重后: 460,070 条
   删除了 0 条重复记录
3. 处理缺失值...
   删除空评论后: 460,070 条
4. 应用文本清洗...
   文本清洗后: 459,175 条

总计删除了 895 条记录
清洗完成率: 99.81%


In [5]:
# 文本分词和停用词处理
print("=== 文本分词和停用词处理 ===")

# 1. 分词
print("1. 执行分词...")
tokenizer = Tokenizer(inputCol="cleaned_body", outputCol="tokens_raw")
df_tokenized = tokenizer.transform(df_cleaned)

# 2. 去除停用词
print("2. 去除停用词...")
remover = StopWordsRemover(inputCol="tokens_raw", outputCol="tokens_cleaned")
df_tokenized = remover.transform(df_tokenized)

# 3. 过滤掉分词后为空的记录
df_tokenized = df_tokenized.filter(F.size(F.col("tokens_cleaned")) > 0)

print(f"分词处理后剩余: {df_tokenized.count():,} 条记录")

# 缓存结果
df_tokenized.cache()
print("数据已缓存到内存中")


=== 文本分词和停用词处理 ===
1. 执行分词...
2. 去除停用词...
分词处理后剩余: 459,171 条记录
数据已缓存到内存中


In [6]:
# 数据质量检查
print("=== 数据质量检查 ===")

# 1. 查看清洗前后的对比样本
print("1. 清洗前后对比样本:")
comparison_sample = df_tokenized.select("body", "cleaned_body", "tokens_cleaned").limit(3).collect()

for i, row in enumerate(comparison_sample):
    print(f"\n样本 {i+1}:")
    print(f"原文: {row['body'][:200]}...")
    print(f"清洗后: {row['cleaned_body'][:200]}...")
    print(f"分词结果: {row['tokens_cleaned'][:10]}...")

# 2. 统计信息
print("\n2. 清洗后数据统计:")
print(f"总记录数: {df_tokenized.count():,}")

# 文本长度分布
length_stats = df_tokenized.withColumn("cleaned_length", F.length("cleaned_body")) \
                          .select("cleaned_length") \
                          .describe()
print("\n清洗后文本长度统计:")
length_stats.show()

# 词汇数量分布
token_stats = df_tokenized.withColumn("token_count", F.size("tokens_cleaned")) \
                         .select("token_count") \
                         .describe()
print("分词后词汇数量统计:")
token_stats.show()


=== 数据质量检查 ===
1. 清洗前后对比样本:

样本 1:
原文: Maybe I was unclear, what I mean is, the Republicans lies include terrorists trying to kill us, gays molesting children and breaking up marriages, government g-men taking our guns and creating a polic...
清洗后: maybe i was unclear, what i mean is, the republicans lies include terrorists trying to kill us, gays molesting children and breaking up marriages, government g men taking our guns and creating a polic...
分词结果: ['maybe', 'unclear,', 'mean', 'is,', 'republicans', 'lies', 'include', 'terrorists', 'trying', 'kill']...

样本 2:
原文: Aaarrggh!!!  I be the copyright pyrate.  Free the information ya salty pups.



&lt;&lt;&lt;America is losing the free world" by Gideon Rachman

Published: January 4 2010 20:11 | Last updated: January...
清洗后: aaarrggh!!! i be the copyright pyrate. free the information ya salty pups. america is losing the free world by gideon rachman published: january 4 2010 20:11 last updated: january 4 2010 20:11 ever si...
分词结果: ['aaar

In [8]:
# 保存清洗后的数据
print("=== 保存清洗后的数据 ===")

# 选择需要保存的列
columns_to_save = [
    "id", 
    "`subreddit.name`",  # 使用反引号处理包含点号的列名
    "created_utc", 
    "timestamp",
    "body", 
    "cleaned_body", 
    "tokens_cleaned",
    "sentiment", 
    "score"
]

df_final = df_tokenized.select(*columns_to_save)

# 保存为Parquet格式
output_path = "/home/jovyan/work/data/processed/cleaned_comments.parquet"
print(f"正在保存到: {output_path}")

df_final.write.mode("overwrite").parquet(output_path)

print("数据保存完成！")
print(f"最终数据集包含 {df_final.count():,} 条清洗后的记录")

# 显示最终数据结构
print("\n最终数据结构:")
df_final.printSchema()


=== 保存清洗后的数据 ===
正在保存到: /home/jovyan/work/data/processed/cleaned_comments.parquet
数据保存完成！
最终数据集包含 459,171 条清洗后的记录

最终数据结构:
root
 |-- id: string (nullable = true)
 |-- subreddit.name: string (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- body: string (nullable = true)
 |-- cleaned_body: string (nullable = true)
 |-- tokens_cleaned: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sentiment: double (nullable = true)
 |-- score: long (nullable = true)



In [None]:
# 最终结果验证和检查
print("=== 最终清洗结果验证 ===")

# 1. 重新加载保存的数据进行验证
print("1. 验证保存的数据文件...")
df_saved = spark.read.parquet("/home/jovyan/work/data/processed/cleaned_comments.parquet")
saved_count = df_saved.count()
print(f"保存的数据记录数: {saved_count:,}")

# 2. 检查数据完整性
print("\n2. 数据完整性检查:")
print("各列的非空值统计:")
for col_name in df_saved.columns:
    # 处理包含点号的列名
    if "." in col_name:
        null_count = df_saved.filter(F.col(f"`{col_name}`").isNull()).count()
    else:
        null_count = df_saved.filter(F.col(col_name).isNull()).count()
    print(f"  {col_name}: {saved_count - null_count:,} 非空值 ({null_count:,} 空值)")

# 3. 随机抽样查看清洗结果
print("\n3. 随机抽样查看清洗结果:")
sample_data = df_saved.sample(0.001, seed=42).select(
    "id", "`subreddit.name`", "body", "cleaned_body", "tokens_cleaned", "sentiment"
).limit(3).collect()

for i, row in enumerate(sample_data):
    print(f"\n样本 {i+1} (ID: {row['id']}):")
    print(f"  子版块: {row['subreddit.name']}")
    print(f"  原文: {row['body'][:100]}...")
    print(f"  清洗后: {row['cleaned_body'][:100]}...")
    print(f"  分词数量: {len(row['tokens_cleaned'])} 个词")
    print(f"  情感得分: {row['sentiment']}")

# 4. 关键统计信息
print("\n4. 关键统计信息:")
stats = df_saved.select(
    F.min("created_utc").alias("earliest_timestamp"),
    F.max("created_utc").alias("latest_timestamp"),
    F.avg("sentiment").alias("avg_sentiment"),
    F.avg(F.size("tokens_cleaned")).alias("avg_tokens"),
    F.avg(F.length("cleaned_body")).alias("avg_text_length")
).collect()[0]

print(f"  时间范围: {stats['earliest_timestamp']} 到 {stats['latest_timestamp']}")
print(f"  平均情感得分: {stats['avg_sentiment']:.4f}")
print(f"  平均分词数量: {stats['avg_tokens']:.1f}")
print(f"  平均文本长度: {stats['avg_text_length']:.1f} 字符")

# 5. 子版块分布
print("\n5. 子版块数据分布:")
subreddit_dist = df_saved.groupBy("`subreddit.name`").count().orderBy(F.desc("count")).limit(10)
print("Top 10 子版块:")
subreddit_dist.show(10, False)

print("✅ 数据清洗流程完成！数据已准备好进行后续分析。")


=== 最终清洗结果验证 ===
1. 验证保存的数据文件...
保存的数据记录数: 459,171

2. 数据完整性检查:
各列的非空值统计:
  id: 459,171 非空值 (0 空值)
  subreddit.name: 459,171 非空值 (0 空值)
  created_utc: 459,171 非空值 (0 空值)
  timestamp: 459,171 非空值 (0 空值)
  body: 459,171 非空值 (0 空值)
  cleaned_body: 459,171 非空值 (0 空值)
  tokens_cleaned: 459,171 非空值 (0 空值)
  sentiment: 454,264 非空值 (4,907 空值)
  score: 459,171 非空值 (0 空值)

3. 随机抽样查看清洗结果:

样本 1 (ID: ebia1dn):
  子版块: politics
  原文: Climate change is a chinese hoax!!

...
  清洗后: climate change is a chinese hoax!!...
  分词数量: 4 个词
  情感得分: -0.3987

样本 2 (ID: ec7hk8x):
  子版块: politicalhumor
  原文: Frankly, America's love affair with moderates in general is what's scaring the shit out of me with c...
  清洗后: frankly, america s love affair with moderates in general is what s scaring the shit out of me with c...
  分词数量: 56 个词
  情感得分: 0.8381

样本 3 (ID: een386u):
  子版块: viennacircle
  原文: Climate change rates drop to 0%...
  清洗后: climate change rates drop to 0...
  分词数量: 5 个词
  情感得分: -0.2732

4. 关键统计信息:
  时间范

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 49566)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/usr/local/spark/python/pyspark/accumulators.py", line 271, in accum_updates
    num_updates =