In [25]:
import polars as pl
import sys

def inspect_parquet_file(file_path, sample_size=20000):
    """
    查看parquet文件的数据格式并提取样本
    
    参数:
    file_path: parquet文件路径
    sample_size: 提取的样本大小
    
    返回:
    sample: 提取的数据样本
    """
    print(f"\n=== 查看文件: {file_path} ===")
    
    # 使用lazy loading读取文件
    lazy_df = pl.scan_parquet(file_path)
    
    # 计算文件总数据量（高效方法，不加载所有数据）
    print("\n计算文件总数据量...")
    total_rows = lazy_df.select(pl.count()).collect()[0, 0]
    print(f"文件总数据量: {total_rows} 行")

    # 提取样本数据
    print(f"\n提取前{sample_size}行数据...")
    sample = lazy_df.head(sample_size).collect()
    
    # 打印样本信息
    print(f"样本数据行数: {len(sample)}")
    print(f"样本数据列数: {len(sample.columns)}")
    
    return sample

def save_to_csv(df, file_path):
    """
    保存DataFrame到CSV文件，兼容不同版本的Polars
    
    参数:
    df: DataFrame对象
    file_path: 保存路径
    """
    try:
        # 尝试使用to_csv方法
        df.to_csv(file_path)
        print(f"成功保存到: {file_path}")
    except AttributeError:
        # 如果to_csv方法不存在，尝试其他方法
        print("to_csv方法不存在，尝试其他保存方式...")
        try:
            # 尝试使用write_csv方法（旧版本Polars）
            df.write_csv(file_path)
            print(f"成功保存到: {file_path}")
        except AttributeError:
            # 如果还是失败，转换为Pandas DataFrame再保存
            print("尝试转换为Pandas DataFrame后保存...")
            pd_df = df.to_pandas()
            pd_df.to_csv(file_path, index=False)
            print(f"成功保存到: {file_path}")

if __name__ == "__main__":
    # 打印Polars版本
    print(f"Polars版本: {pl.__version__}")
    
    # 定义文件路径
    train_file = "/home/zhuangzhuohan/sleep_data/train_series.parquet"
    test_file = "/home/zhuangzhuohan/sleep_data/test_series.parquet"
    
    # 查看训练数据
    train_sample = inspect_parquet_file(train_file)
    
    # 查看测试数据
    test_sample = inspect_parquet_file(test_file)
    
    # 可选：保存样本为CSV文件以便后续查看
    save_csv = input("\n是否将样本保存为CSV文件？(y/n): ")
    if save_csv.lower() == 'y':
        save_to_csv(train_sample, "/home/zhuangzhuohan/sleep_data/train_series_parquet_sample.csv")
        save_to_csv(test_sample, "/home/zhuangzhuohan/sleep_data/test_series_parquet_sample.csv")
        print("样本已保存为CSV文件")

Polars版本: 1.37.1

=== 查看文件: /home/zhuangzhuohan/sleep_data/train_series.parquet ===

计算文件总数据量...
文件总数据量: 127946340 行

提取前20000行数据...
样本数据行数: 20000
样本数据列数: 5

=== 查看文件: /home/zhuangzhuohan/sleep_data/test_series.parquet ===

计算文件总数据量...
文件总数据量: 450 行

提取前20000行数据...
样本数据行数: 450
样本数据列数: 5


(Deprecated in version 0.20.5)
  total_rows = lazy_df.select(pl.count()).collect()[0, 0]


In [26]:
import polars as pl

def clean_train_events(input_file, output_file):
    """
    清理train_events.csv文件，移除没有step和timestamp数据的行，不修改原始文件
    
    参数:
    input_file: 输入文件路径
    output_file: 输出文件路径
    """
    print(f"开始处理文件: {input_file}")
    
    # 读取CSV文件（不修改原始文件）
    df = pl.read_csv(input_file)
    
    # 打印原始数据信息
    print(f"原始数据行数: {len(df)}")
    print(f"原始数据列数: {len(df.columns)}")
    
    # 检查每列的空值情况
    print("\n各列空值情况:")
    for col in df.columns:
        null_count = df[col].is_null().sum()
        print(f"  {col}: {null_count} 个空值")
    
    # 过滤掉step或timestamp为空的行
    filtered_df = df.filter(
        pl.col('step').is_not_null() & pl.col('timestamp').is_not_null()
    )
    
    # 打印过滤后的数据信息
    print(f"\n过滤后数据行数: {len(filtered_df)}")
    print(f"移除的行数: {len(df) - len(filtered_df)}")
    
    # 保存过滤后的数据到新文件（不修改原始文件）
    filtered_df.write_csv(output_file)
    print(f"\n过滤后的数据已保存到: {output_file}")
    
    # 打印前5行数据，确认格式正确
    print("\n过滤后的数据前5行:")
    print(filtered_df.head())

if __name__ == "__main__":
    # 定义文件路径
    input_file = "/home/zhuangzhuohan/sleep_data/train_events.csv"
    output_file = "/home/zhuangzhuohan/sleep_data/train_events_clean.csv"
    
    # 执行清理操作
    clean_train_events(input_file, output_file)

开始处理文件: /home/zhuangzhuohan/sleep_data/train_events.csv
原始数据行数: 14508
原始数据列数: 5

各列空值情况:
  series_id: 0 个空值
  night: 0 个空值
  event: 0 个空值
  step: 4923 个空值
  timestamp: 4923 个空值

过滤后数据行数: 9585
移除的行数: 4923

过滤后的数据已保存到: /home/zhuangzhuohan/sleep_data/train_events_clean.csv

过滤后的数据前5行:
shape: (5, 5)
┌──────────────┬───────┬────────┬───────┬──────────────────────────┐
│ series_id    ┆ night ┆ event  ┆ step  ┆ timestamp                │
│ ---          ┆ ---   ┆ ---    ┆ ---   ┆ ---                      │
│ str          ┆ i64   ┆ str    ┆ i64   ┆ str                      │
╞══════════════╪═══════╪════════╪═══════╪══════════════════════════╡
│ 038441c925bb ┆ 1     ┆ onset  ┆ 4992  ┆ 2018-08-14T22:26:00-0400 │
│ 038441c925bb ┆ 1     ┆ wakeup ┆ 10932 ┆ 2018-08-15T06:41:00-0400 │
│ 038441c925bb ┆ 2     ┆ onset  ┆ 20244 ┆ 2018-08-15T19:37:00-0400 │
│ 038441c925bb ┆ 2     ┆ wakeup ┆ 27492 ┆ 2018-08-16T05:41:00-0400 │
│ 038441c925bb ┆ 3     ┆ onset  ┆ 39996 ┆ 2018-08-16T23:03:00-0400 │
└────────────

In [27]:
import polars as pl

def remove_timezone_from_timestamp(input_file, output_file):
    """
    移除train_events_clean.csv文件中timestamp字段的时区信息
    
    参数:
    input_file: 输入文件路径
    output_file: 输出文件路径
    """
    print(f"开始处理文件: {input_file}")
    
    # 读取CSV文件
    df = pl.read_csv(input_file)
    
    # 打印原始数据信息
    print(f"原始数据行数: {len(df)}")
    print(f"原始数据列数: {len(df.columns)}")
    
    # 处理timestamp字段，去除时区信息
    # 方法: 使用字符串替换，移除末尾的时区信息
    processed_df = df.with_columns(
        pl.col('timestamp').str.replace(r'[-+]\d{4}$', '').alias('timestamp')
    )
    
    # 保存处理后的数据到新文件
    processed_df.write_csv(output_file)
    print(f"\n处理后的数据已保存到: {output_file}")

if __name__ == "__main__":
    # 定义文件路径
    input_file = "/home/zhuangzhuohan/sleep_data/train_events_clean.csv"
    output_file = "/home/zhuangzhuohan/sleep_data/train_events_clean_no_timezone.csv"
    
    # 执行处理操作
    remove_timezone_from_timestamp(input_file, output_file)

开始处理文件: /home/zhuangzhuohan/sleep_data/train_events_clean.csv
原始数据行数: 9585
原始数据列数: 5

处理后的数据已保存到: /home/zhuangzhuohan/sleep_data/train_events_clean_no_timezone.csv


In [28]:
import polars as pl

def remove_timezone_from_parquet(input_file, output_file):
    """
    移除parquet文件中timestamp字段的时区信息
    
    参数:
    input_file: 输入parquet文件路径
    output_file: 输出parquet文件路径
    """
    print(f"开始处理文件: {input_file}")
    
    # 使用lazy loading读取parquet文件
    lazy_df = pl.scan_parquet(input_file)
    
    # 获取文件架构
    schema = lazy_df.collect_schema()
    print("文件架构:")
    for col_name, col_type in schema.items():
        print(f"  {col_name}: {col_type}")
    
    # 计算文件总数据量
    total_rows = lazy_df.select(pl.count()).collect()[0, 0]
    print(f"文件总数据量: {total_rows} 行")
    
    # 处理数据，移除时区信息
    # 使用正则表达式替换掉末尾的时区信息（如-0400, +0530等）
    processed_lazy_df = lazy_df.with_columns(
        pl.col('timestamp').str.replace(r'[-+]\d{4}$', '').alias('timestamp')
    )
    
    # 收集并保存处理后的数据
    print("处理数据并保存...")
    processed_df = processed_lazy_df.collect()
    processed_df.write_parquet(output_file)
    
    print(f"\n处理后的数据已保存到: {output_file}")

if __name__ == "__main__":
    # 定义文件路径
    train_input = "/home/zhuangzhuohan/sleep_data/train_series.parquet"
    train_output = "/home/zhuangzhuohan/sleep_data/train_series_no_timezone.parquet"
    
    test_input = "/home/zhuangzhuohan/sleep_data/test_series.parquet"
    test_output = "/home/zhuangzhuohan/sleep_data/test_series_no_timezone.parquet"
    
    # 处理训练数据
    print("=== 处理训练数据 ===")
    remove_timezone_from_parquet(train_input, train_output)
    
    # 处理测试数据
    print("\n=== 处理测试数据 ===")
    remove_timezone_from_parquet(test_input, test_output)
    
    print("\n所有文件处理完成！")

=== 处理训练数据 ===
开始处理文件: /home/zhuangzhuohan/sleep_data/train_series.parquet
文件架构:
  series_id: String
  step: UInt32
  timestamp: String
  anglez: Float32
  enmo: Float32
文件总数据量: 127946340 行
处理数据并保存...


(Deprecated in version 0.20.5)
  total_rows = lazy_df.select(pl.count()).collect()[0, 0]



处理后的数据已保存到: /home/zhuangzhuohan/sleep_data/train_series_no_timezone.parquet

=== 处理测试数据 ===
开始处理文件: /home/zhuangzhuohan/sleep_data/test_series.parquet
文件架构:
  series_id: String
  step: UInt32
  timestamp: String
  anglez: Float32
  enmo: Float32
文件总数据量: 450 行
处理数据并保存...

处理后的数据已保存到: /home/zhuangzhuohan/sleep_data/test_series_no_timezone.parquet

所有文件处理完成！


In [29]:
import polars as pl
import sys

def inspect_parquet_file(file_path, sample_size=20000):
    """
    查看parquet文件的数据格式并提取样本
    
    参数:
    file_path: parquet文件路径
    sample_size: 提取的样本大小
    
    返回:
    sample: 提取的数据样本
    """
    print(f"\n=== 查看文件: {file_path} ===")
    
    # 使用lazy loading读取文件
    lazy_df = pl.scan_parquet(file_path)
    
    # 计算文件总数据量（高效方法，不加载所有数据）
    print("\n计算文件总数据量...")
    total_rows = lazy_df.select(pl.count()).collect()[0, 0]
    print(f"文件总数据量: {total_rows} 行")

    # 提取样本数据
    print(f"\n提取前{sample_size}行数据...")
    sample = lazy_df.head(sample_size).collect()
    
    # 打印样本信息
    print(f"样本数据行数: {len(sample)}")
    print(f"样本数据列数: {len(sample.columns)}")
    
    return sample

def save_to_csv(df, file_path):
    """
    保存DataFrame到CSV文件，兼容不同版本的Polars
    
    参数:
    df: DataFrame对象
    file_path: 保存路径
    """
    try:
        # 尝试使用to_csv方法
        df.to_csv(file_path)
        print(f"成功保存到: {file_path}")
    except AttributeError:
        # 如果to_csv方法不存在，尝试其他方法
        print("to_csv方法不存在，尝试其他保存方式...")
        try:
            # 尝试使用write_csv方法（旧版本Polars）
            df.write_csv(file_path)
            print(f"成功保存到: {file_path}")
        except AttributeError:
            # 如果还是失败，转换为Pandas DataFrame再保存
            print("尝试转换为Pandas DataFrame后保存...")
            pd_df = df.to_pandas()
            pd_df.to_csv(file_path, index=False)
            print(f"成功保存到: {file_path}")

if __name__ == "__main__":
    # 打印Polars版本
    print(f"Polars版本: {pl.__version__}")
    
    # 定义文件路径
    train_file = "/home/zhuangzhuohan/sleep_data/train_series_no_timezone.parquet"
    test_file = "/home/zhuangzhuohan/sleep_data/test_series_no_timezone.parquet"
    
    # 查看训练数据
    train_sample = inspect_parquet_file(train_file)
    
    # 查看测试数据
    test_sample = inspect_parquet_file(test_file)
    
    # 可选：保存样本为CSV文件以便后续查看
    save_csv = input("\n是否将样本保存为CSV文件？(y/n): ")
    if save_csv.lower() == 'y':
        save_to_csv(train_sample, "/home/zhuangzhuohan/sleep_data/train_series_no_timezone_parquet_sample.csv")
        save_to_csv(test_sample, "/home/zhuangzhuohan/sleep_data/test_series_no_timezone_parquet_sample.csv")
        print("样本已保存为CSV文件")

Polars版本: 1.37.1

=== 查看文件: /home/zhuangzhuohan/sleep_data/train_series_no_timezone.parquet ===

计算文件总数据量...
文件总数据量: 127946340 行

提取前20000行数据...
样本数据行数: 20000
样本数据列数: 5

=== 查看文件: /home/zhuangzhuohan/sleep_data/test_series_no_timezone.parquet ===

计算文件总数据量...
文件总数据量: 450 行

提取前20000行数据...
样本数据行数: 450
样本数据列数: 5


(Deprecated in version 0.20.5)
  total_rows = lazy_df.select(pl.count()).collect()[0, 0]


to_csv方法不存在，尝试其他保存方式...
成功保存到: /home/zhuangzhuohan/sleep_data/train_series_no_timezone_parquet_sample.csv
to_csv方法不存在，尝试其他保存方式...
成功保存到: /home/zhuangzhuohan/sleep_data/test_series_no_timezone_parquet_sample.csv
样本已保存为CSV文件


In [30]:
import polars as pl

def validate_timezone_removal(input_file):
    """
    验证parquet文件中所有timestamp数据是否都去掉了时区
    
    参数:
    input_file: 输入parquet文件路径
    """
    print(f"开始验证文件: {input_file}")
    
    # 使用lazy loading读取parquet文件
    lazy_df = pl.scan_parquet(input_file)
    
    # 获取文件总数据量
    total_rows = lazy_df.select(pl.count()).collect()[0, 0]
    print(f"文件总数据量: {total_rows} 行")
    
    # 定义时区模式的正则表达式
    timezone_pattern = r'[-+]\d{4}$'
    
    # 检查是否存在时区信息
    # 方法: 计算包含时区模式的行数
    timezone_count = lazy_df.filter(
        pl.col('timestamp').str.contains(timezone_pattern)
    ).select(pl.count()).collect()[0, 0]
    
    # 打印验证结果
    print(f"包含时区信息的行数: {timezone_count}")
    print(f"不包含时区信息的行数: {total_rows - timezone_count}")
    
    # 如果存在包含时区信息的行，打印前5个例子
    if timezone_count > 0:
        print("\n包含时区信息的前5个例子:")
        timezone_examples = lazy_df.filter(
            pl.col('timestamp').str.contains(timezone_pattern)
        ).select('timestamp').head().collect()
        print(timezone_examples)
    else:
        print("\n✓ 所有timestamp数据都已成功移除时区信息!")
    
    # 打印一些处理后的timestamp示例，确认格式正确
    print("\n处理后的timestamp示例:")
    examples = lazy_df.select('timestamp').head().collect()
    print(examples)

if __name__ == "__main__":
    # 定义文件路径
    input_file = "/home/zhuangzhuohan/sleep_data/train_series_no_timezone.parquet"
    
    # 执行验证
    validate_timezone_removal(input_file)

开始验证文件: /home/zhuangzhuohan/sleep_data/train_series_no_timezone.parquet
文件总数据量: 127946340 行
包含时区信息的行数: 0
不包含时区信息的行数: 127946340

✓ 所有timestamp数据都已成功移除时区信息!

处理后的timestamp示例:
shape: (5, 1)
┌─────────────────────┐
│ timestamp           │
│ ---                 │
│ str                 │
╞═════════════════════╡
│ 2018-08-14T15:30:00 │
│ 2018-08-14T15:30:05 │
│ 2018-08-14T15:30:10 │
│ 2018-08-14T15:30:15 │
│ 2018-08-14T15:30:20 │
└─────────────────────┘


(Deprecated in version 0.20.5)
  total_rows = lazy_df.select(pl.count()).collect()[0, 0]
(Deprecated in version 0.20.5)
  ).select(pl.count()).collect()[0, 0]


In [22]:
import polars as pl

def filter_train_series_by_events():
    """
    找出train_events_clean_no_timezone.csv中没有但train_series_no_timezone.parquet中有的series_id
    并从train_series_no_timezone.parquet中移除这些series_id对应的数据，生成新文件
    """
    print("开始处理数据...")
    
    # 定义文件路径
    train_series_path = "/home/zhuangzhuohan/sleep_data/train_series_no_timezone.parquet"
    train_events_path = "/home/zhuangzhuohan/sleep_data/train_events_clean_no_timezone.csv"
    output_path = "/home/zhuangzhuohan/sleep_data/train_series_new_no_timezone.parquet"
    
    # 1. 读取train_events_clean_no_timezone.csv中的series_id
    print("读取train_events_clean_no_timezone.csv中的series_id...")
    train_events = pl.read_csv(train_events_path)
    events_series_ids = train_events.select('series_id').unique()
    print(f"train_events_clean_no_timezone.csv中的series_id数量: {len(events_series_ids)}")
    
    # 2. 读取train_series_no_timezone.parquet中的series_id
    print("读取train_series_no_timezone.parquet中的series_id...")
    train_series = pl.scan_parquet(train_series_path)
    series_series_ids = train_series.select('series_id').unique().collect()
    print(f"train_series_no_timezone.parquet中的series_id数量: {len(series_series_ids)}")
    
    # 3. 找出train_series中有但train_events中没有的series_id
    print("找出差异的series_id...")
    # 使用anti join找出只在train_series中存在的series_id
    diff_series_ids = series_series_ids.join(
        events_series_ids,
        on='series_id',
        how='anti'
    )
    print(f"train_series中有但train_events中没有的series_id数量: {len(diff_series_ids)}")
    print("这些series_id是:")
    print(diff_series_ids)
    
    # 4. 从train_series中过滤掉这些差异的series_id
    print("过滤train_series数据...")
    # 先获取需要保留的series_id列表
    keep_series_ids = events_series_ids.to_series().to_list()
    # 过滤数据
    filtered_train_series = train_series.filter(
        pl.col('series_id').is_in(keep_series_ids)
    )
    
    # 5. 计算过滤前后的数据量
    print("计算数据量...")
    before_count = train_series.select(pl.len()).collect()[0, 0]
    after_count = filtered_train_series.select(pl.len()).collect()[0, 0]
    print(f"过滤前数据行数: {before_count}")
    print(f"过滤后数据行数: {after_count}")
    print(f"移除的数据行数: {before_count - after_count}")
    
    # 6. 将过滤后的数据保存为新文件
    print("保存过滤后的数据...")
    filtered_train_series.sink_parquet(output_path)
    print(f"新文件已保存到: {output_path}")
    
    # 7. 验证新文件
    print("验证新文件...")
    new_train_series = pl.scan_parquet(output_path)
    new_series_ids = new_train_series.select('series_id').unique().collect()
    print(f"新文件中的series_id数量: {len(new_series_ids)}")
    print(f"新文件中的数据行数: {new_train_series.select(pl.len()).collect()[0, 0]}")
    
    print("\n处理完成!")

if __name__ == "__main__":
    filter_train_series_by_events()

开始处理数据...
读取train_events_clean_no_timezone.csv中的series_id...
train_events_clean_no_timezone.csv中的series_id数量: 269
读取train_series_no_timezone.parquet中的series_id...
train_series_no_timezone.parquet中的series_id数量: 277
找出差异的series_id...
train_series中有但train_events中没有的series_id数量: 8
这些series_id是:
shape: (8, 1)
┌──────────────┐
│ series_id    │
│ ---          │
│ str          │
╞══════════════╡
│ 2fc653ca75c7 │
│ e11b9d69f856 │
│ 89c7daa72eee │
│ a3e59c2ce3f6 │
│ c7b1283bb7eb │
│ c5d08fc3e040 │
│ 0f9e60a8e56d │
│ 390b487231ce │
└──────────────┘
过滤train_series数据...
计算数据量...
过滤前数据行数: 127946340
过滤后数据行数: 124822080
移除的数据行数: 3124260
保存过滤后的数据...
新文件已保存到: /home/zhuangzhuohan/sleep_data/train_series_new_no_timezone.parquet
验证新文件...
新文件中的series_id数量: 269
新文件中的数据行数: 124822080

处理完成!


In [31]:
import polars as pl

def count_series_ids_in_all_files():
    """
    查看多个文件中series_id的数量
    """
    print("开始统计所有文件中的series_id数量...")
    
    # 定义文件路径
    files = {
        "train_series_no_timezone": "/home/zhuangzhuohan/sleep_data/train_series_no_timezone.parquet",
        "train_events_clean_no_timezone": "/home/zhuangzhuohan/sleep_data/train_events_clean_no_timezone.csv",
        "test_series_no_timezone": "/home/zhuangzhuohan/sleep_data/test_series_no_timezone.parquet",
        "train_series_original": "/home/zhuangzhuohan/sleep_data/train_series.parquet",
        "train_events_original": "/home/zhuangzhuohan/sleep_data/train_events.csv",
        "test_series_original": "/home/zhuangzhuohan/sleep_data/test_series.parquet",
        "train_series_new": "/home/zhuangzhuohan/sleep_data/train_series_new_no_timezone.parquet"
    }
    
    # 统计每个文件的series_id数量
    for name, path in files.items():
        print(f"\n统计 {name} 中的series_id数量:")
        
        try:
            if path.endswith('.parquet'):
                # 读取parquet文件
                df = pl.scan_parquet(path)
                count = df.select('series_id').unique().collect()
            else:
                # 读取csv文件
                df = pl.read_csv(path)
                count = df.select('series_id').unique()
            
            print(f"  唯一series_id数量: {len(count)}")
            
        except Exception as e:
            print(f"  错误: {str(e)}")
    
    print("\n统计完成!")

if __name__ == "__main__":
    count_series_ids_in_all_files()

开始统计所有文件中的series_id数量...

统计 train_series_no_timezone 中的series_id数量:


  唯一series_id数量: 277

统计 train_events_clean_no_timezone 中的series_id数量:
  唯一series_id数量: 269

统计 test_series_no_timezone 中的series_id数量:
  唯一series_id数量: 3

统计 train_series_original 中的series_id数量:
  唯一series_id数量: 277

统计 train_events_original 中的series_id数量:
  唯一series_id数量: 277

统计 test_series_original 中的series_id数量:
  唯一series_id数量: 3

统计 train_series_new 中的series_id数量:
  唯一series_id数量: 269

统计完成!


In [32]:
import polars as pl

def check_series_id_consistency():
    """
    检查train_series_new_no_timezone.parquet和train_events_clean_no_timezone.csv中
    每个series_id是否都在另一个文件中存在
    """
    print("开始检查series_id一致性...")
    
    # 定义文件路径
    train_series_path = "/home/zhuangzhuohan/sleep_data/train_series_new_no_timezone.parquet"
    train_events_path = "/home/zhuangzhuohan/sleep_data/train_events_clean_no_timezone.csv"
    
    # 1. 读取train_series_new_no_timezone.parquet中的series_id
    print("读取train_series_new_no_timezone.parquet...")
    train_series = pl.scan_parquet(train_series_path)
    series_ids = train_series.select('series_id').unique().collect()
    print(f"train_series_new_no_timezone中的唯一series_id数量: {len(series_ids)}")
    
    # 2. 读取train_events_clean_no_timezone.csv中的series_id
    print("读取train_events_clean_no_timezone.csv...")
    train_events = pl.read_csv(train_events_path)
    events_ids = train_events.select('series_id').unique()
    print(f"train_events_clean_no_timezone中的唯一series_id数量: {len(events_ids)}")
    
    # 3. 检查train_series中的series_id是否都在train_events中存在
    print("\n检查train_series中的series_id是否都在train_events中存在:")
    series_only = series_ids.join(events_ids, on='series_id', how='anti')
    if len(series_only) > 0:
        print(f"  发现 {len(series_only)} 个series_id只在train_series中存在:")
        print(series_only)
    else:
        print("  ✓ 所有train_series中的series_id都在train_events中存在")
    
    # 4. 检查train_events中的series_id是否都在train_series中存在
    print("\n检查train_events中的series_id是否都在train_series中存在:")
    events_only = events_ids.join(series_ids, on='series_id', how='anti')
    if len(events_only) > 0:
        print(f"  发现 {len(events_only)} 个series_id只在train_events中存在:")
        print(events_only)
    else:
        print("  ✓ 所有train_events中的series_id都在train_series中存在")
    
    # 5. 检查共同存在的series_id数量
    common_ids = series_ids.join(events_ids, on='series_id', how='inner')
    print(f"\n共同存在的series_id数量: {len(common_ids)}")
    
    print("\n检查完成!")

if __name__ == "__main__":
    check_series_id_consistency()

开始检查series_id一致性...
读取train_series_new_no_timezone.parquet...
train_series_new_no_timezone中的唯一series_id数量: 269
读取train_events_clean_no_timezone.csv...
train_events_clean_no_timezone中的唯一series_id数量: 269

检查train_series中的series_id是否都在train_events中存在:
  ✓ 所有train_series中的series_id都在train_events中存在

检查train_events中的series_id是否都在train_series中存在:
  ✓ 所有train_events中的series_id都在train_series中存在

共同存在的series_id数量: 269

检查完成!


In [33]:
import polars as pl

def validate_event_alternation():
    """
    验证train_events_clean_no_timezone.csv中onset和wakeup事件的交替性
    1. 检查每个series_id和night分组中onset和wakeup数量是否相等
    2. 检查事件是否交替出现（onset后是wakeup，wakeup后是onset）
    """
    print("开始验证事件交替性...")
    
    # 定义文件路径
    file_path = "/home/zhuangzhuohan/sleep_data/train_events_clean_no_timezone.csv"
    
    # 读取文件
    train_events = pl.read_csv(file_path)
    
    # 确保必要的列存在
    required_cols = ['series_id', 'night', 'event', 'timestamp']
    for col in required_cols:
        if col not in train_events.columns:
            print(f"错误: 文件中不存在{col}列!")
            return
    
    # 转换timestamp为datetime类型
    train_events = train_events.with_columns(
        pl.col('timestamp').str.to_datetime()
    )
    
    # 按series_id、night分组，并按timestamp排序
    print("按series_id和night分组并排序...")
    grouped_events = train_events.group_by(['series_id', 'night']).agg([
        pl.col('event').sort_by('timestamp').alias('sorted_events')
    ])
    
    # 验证每个分组
    print("验证每个分组的事件交替性...")
    issues = []
    
    for row in grouped_events.iter_rows(named=True):
        series_id = row['series_id']
        night = row['night']
        events = row['sorted_events']
        
        # 检查1: onset和wakeup数量是否相等
        onset_count = events.count('onset')
        wakeup_count = events.count('wakeup')
        
        if onset_count != wakeup_count:
            issues.append({
                'series_id': series_id,
                'night': night,
                'issue': f'事件数量不匹配: onset={onset_count}, wakeup={wakeup_count}'
            })
            continue
        
        # 检查2: 事件是否交替出现
        valid_sequence = True
        for i, event in enumerate(events):
            if i == 0:
                # 第一个事件应该是onset
                if event != 'onset':
                    valid_sequence = False
                    issues.append({
                        'series_id': series_id,
                        'night': night,
                        'issue': f'第一个事件应该是onset，实际是{event}'
                    })
                    break
            else:
                # 检查事件交替
                prev_event = events[i-1]
                if (prev_event == 'onset' and event != 'wakeup') or \
                   (prev_event == 'wakeup' and event != 'onset'):
                    valid_sequence = False
                    issues.append({
                        'series_id': series_id,
                        'night': night,
                        'issue': f'事件交替错误: {prev_event}后应该是{["wakeup" if prev_event == "onset" else "onset"][0]}，实际是{event}'
                    })
                    break
    
    # 输出验证结果
    print(f"\n验证完成! 总共检查了 {len(grouped_events)} 个夜晚的事件序列")
    
    if len(issues) == 0:
        print("✓ 所有事件序列都符合要求:")
        print("  - 每个夜晚的onset和wakeup数量相等")
        print("  - 事件交替出现（onset后是wakeup，wakeup后是onset）")
        print("  - 每个夜晚以onset开始，以wakeup结束")
    else:
        print(f"✗ 发现 {len(issues)} 个问题:")
        for issue in issues[:10]:  # 只显示前10个问题
            print(f"  series_id: {issue['series_id']}, night: {issue['night']}, 问题: {issue['issue']}")
        
        if len(issues) > 10:
            print(f"  ... 还有 {len(issues) - 10} 个问题未显示")
    
    return len(issues) == 0

if __name__ == "__main__":
    validate_event_alternation()

开始验证事件交替性...
按series_id和night分组并排序...
验证每个分组的事件交替性...

验证完成! 总共检查了 4795 个夜晚的事件序列
✗ 发现 5 个问题:
  series_id: 44a41bba1ee7, night: 10, 问题: 事件数量不匹配: onset=0, wakeup=1
  series_id: efbfc4526d58, night: 7, 问题: 事件数量不匹配: onset=0, wakeup=1
  series_id: 154fe824ed87, night: 30, 问题: 事件数量不匹配: onset=0, wakeup=1
  series_id: 0ce74d6d2106, night: 20, 问题: 事件数量不匹配: onset=1, wakeup=0
  series_id: f8a8da8bdd00, night: 17, 问题: 事件数量不匹配: onset=0, wakeup=1


In [34]:
import polars as pl

def validate_event_alternation():
    """
    验证train_events_clean_no_timezone.csv中onset和wakeup事件的交替性
    1. 检查每个series_id和night分组中onset和wakeup数量是否相等
    2. 检查事件是否交替出现（onset后是wakeup，wakeup后是onset）
    """
    print("开始验证事件交替性...")
    
    # 定义文件路径
    file_path = "/home/zhuangzhuohan/sleep_data/train_events_clean_new_no_timezone.csv"
    
    # 读取文件
    train_events = pl.read_csv(file_path)
    
    # 确保必要的列存在
    required_cols = ['series_id', 'night', 'event', 'timestamp']
    for col in required_cols:
        if col not in train_events.columns:
            print(f"错误: 文件中不存在{col}列!")
            return
    
    # 转换timestamp为datetime类型
    train_events = train_events.with_columns(
        pl.col('timestamp').str.to_datetime()
    )
    
    # 按series_id、night分组，并按timestamp排序
    print("按series_id和night分组并排序...")
    grouped_events = train_events.group_by(['series_id', 'night']).agg([
        pl.col('event').sort_by('timestamp').alias('sorted_events')
    ])
    
    # 验证每个分组
    print("验证每个分组的事件交替性...")
    issues = []
    
    for row in grouped_events.iter_rows(named=True):
        series_id = row['series_id']
        night = row['night']
        events = row['sorted_events']
        
        # 检查1: onset和wakeup数量是否相等
        onset_count = events.count('onset')
        wakeup_count = events.count('wakeup')
        
        if onset_count != wakeup_count:
            issues.append({
                'series_id': series_id,
                'night': night,
                'issue': f'事件数量不匹配: onset={onset_count}, wakeup={wakeup_count}'
            })
            continue
        
        # 检查2: 事件是否交替出现
        valid_sequence = True
        for i, event in enumerate(events):
            if i == 0:
                # 第一个事件应该是onset
                if event != 'onset':
                    valid_sequence = False
                    issues.append({
                        'series_id': series_id,
                        'night': night,
                        'issue': f'第一个事件应该是onset，实际是{event}'
                    })
                    break
            else:
                # 检查事件交替
                prev_event = events[i-1]
                if (prev_event == 'onset' and event != 'wakeup') or \
                   (prev_event == 'wakeup' and event != 'onset'):
                    valid_sequence = False
                    issues.append({
                        'series_id': series_id,
                        'night': night,
                        'issue': f'事件交替错误: {prev_event}后应该是{["wakeup" if prev_event == "onset" else "onset"][0]}，实际是{event}'
                    })
                    break
    
    # 输出验证结果
    print(f"\n验证完成! 总共检查了 {len(grouped_events)} 个夜晚的事件序列")
    
    if len(issues) == 0:
        print("✓ 所有事件序列都符合要求:")
        print("  - 每个夜晚的onset和wakeup数量相等")
        print("  - 事件交替出现（onset后是wakeup，wakeup后是onset）")
        print("  - 每个夜晚以onset开始，以wakeup结束")
    else:
        print(f"✗ 发现 {len(issues)} 个问题:")
        for issue in issues[:10]:  # 只显示前10个问题
            print(f"  series_id: {issue['series_id']}, night: {issue['night']}, 问题: {issue['issue']}")
        
        if len(issues) > 10:
            print(f"  ... 还有 {len(issues) - 10} 个问题未显示")
    
    return len(issues) == 0

if __name__ == "__main__":
    validate_event_alternation()

开始验证事件交替性...
按series_id和night分组并排序...
验证每个分组的事件交替性...

验证完成! 总共检查了 4790 个夜晚的事件序列
✓ 所有事件序列都符合要求:
  - 每个夜晚的onset和wakeup数量相等
  - 事件交替出现（onset后是wakeup，wakeup后是onset）
  - 每个夜晚以onset开始，以wakeup结束
