# 00 数据接入与类型规范

目标：将原始 CSV 流式加载、统一字段类型、合并作者元数据，并写入 Parquet 缓存。

In [1]:
import sys
from pathlib import Path

# 将项目根目录添加到 Python 路径
project_root = Path('/workspace')
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))
    
print(f"✅ Python 路径已配置: {project_root}")

✅ Python 路径已配置: /workspace


## 步骤 1: 加载原始数据

In [2]:
from src import io, analysis, profiling
import polars as pl

# 加载原始推文数据 (LazyFrame)
raw_lf = io.scan_raw_tweets()
print(f"📊 数据 schema:")
print(raw_lf.collect_schema())

📊 数据 schema:
Schema([('pseudo_id', Int64), ('text', String), ('retweetCount', Int64), ('replyCount', Int64), ('likeCount', Int64), ('quoteCount', Int64), ('viewCount', Int64), ('bookmarkCount', Int64), ('createdAt', String), ('lang', String), ('isReply', String), ('pseudo_conversationId', Int64), ('pseudo_inReplyToUsername', String), ('pseudo_author_userName', Int64), ('quoted_pseudo_id', String), ('author_isBlueVerified', String)])


## 步骤 2: 数据清洗与类型规范化

In [3]:
# 收集数据并规范化布尔列
df = raw_lf.collect()
print(f"✅ 数据加载完成: {df.height:,} 行, {df.width} 列")

# 规范化布尔列
bool_columns = ['isReply', 'author_isBlueVerified']
df_cleaned = analysis.normalize_boolean_columns(df, bool_columns)
print(f"✅ 布尔列规范化完成: {bool_columns}")

✅ 数据加载完成: 508,954 行, 16 列
✅ 布尔列规范化完成: ['isReply', 'author_isBlueVerified']


## 步骤 3: 数据质量检查

In [4]:
# 缺失值检查
key_cols = ['pseudo_id', 'pseudo_author_userName', 'createdAt', 'text']
missing_stats = profiling.missingness_summary(df_cleaned, key_cols)
print("📊 缺失值统计 (top 5):")
print(missing_stats.head(5))

# 重复检查
dupes = profiling.duplicate_check(df_cleaned, ['pseudo_id'])
print(f"\n🔍 重复推文数: {dupes.height}")

📊 缺失值统计 (top 5):
shape: (5, 4)
┌──────────────┬────────────┬────────────┬────────┐
│ column       ┆ null_count ┆ null_ratio ┆ is_key │
│ ---          ┆ ---        ┆ ---        ┆ ---    │
│ str          ┆ i64        ┆ f64        ┆ bool   │
╞══════════════╪════════════╪════════════╪════════╡
│ pseudo_id    ┆ 0          ┆ 0.0        ┆ true   │
│ text         ┆ 0          ┆ 0.0        ┆ true   │
│ retweetCount ┆ 0          ┆ 0.0        ┆ false  │
│ replyCount   ┆ 0          ┆ 0.0        ┆ false  │
│ likeCount    ┆ 0          ┆ 0.0        ┆ false  │
└──────────────┴────────────┴────────────┴────────┘

🔍 重复推文数: 50


## 步骤 4: 合并作者元数据

In [5]:
# 加载作者信息
authors_df = io.read_well_known_authors()
print(f"📋 作者元数据: {authors_df.height} 位作者")

# 类型转换：将 pseudo_author_userName 转为字符串以匹配 author_userName
df_with_str_author = df_cleaned.with_columns(
    pl.col('pseudo_author_userName').cast(pl.Utf8).alias('pseudo_author_userName')
)

# 合并推文和作者数据（基于用户名）
df_with_authors = df_with_str_author.join(
    authors_df, 
    left_on='pseudo_author_userName', 
    right_on='author_userName',
    how='left'
)
print(f"✅ 数据合并完成: {df_with_authors.height:,} 行, {df_with_authors.width} 列")

📋 作者元数据: 4242 位作者
✅ 数据合并完成: 508,954 行, 23 列


## 步骤 5: 写入 Parquet 缓存

In [6]:
# 写入 parquet 文件供后续分析使用
output_path = Path("parquet/tweets_enriched.parquet")
io.materialize_parquet(df_with_authors.lazy(), output_path)

print(f"✅ Parquet 文件已生成: {output_path}")
print(f"📁 文件大小: {output_path.stat().st_size / 1024 / 1024:.2f} MB")

# 列出所有生成的 parquet 文件
print("\n📂 生成的 Parquet 文件:")
for f in io.list_parquet_files():
    print(f"  - {f}")

✅ Parquet 文件已生成: parquet/tweets_enriched.parquet
📁 文件大小: 58.00 MB

📂 生成的 Parquet 文件:
  - /workspace/src/notebooks/parquet/content_analysis.parquet
  - /workspace/src/notebooks/parquet/network_centrality.parquet
  - /workspace/src/notebooks/parquet/network_edges.parquet
  - /workspace/src/notebooks/parquet/topic_distribution.parquet
  - /workspace/src/notebooks/parquet/tweets_anomalies.parquet
  - /workspace/src/notebooks/parquet/tweets_daily.parquet
  - /workspace/src/notebooks/parquet/tweets_enriched.parquet
  - /workspace/src/notebooks/parquet/tweets_rolling.parquet
  - /workspace/src/notebooks/parquet/verified_comparison.parquet


## ✅ 数据接入完成！

接下来可以运行 `01_temporal_dynamics.ipynb` 进行时间序列分析。