In [1]:
import requests
import json
import csv
import time
import random
from datetime import datetime
from typing import List, Dict

class BilibiliRankCrawler:
    """Fetch a Bilibili partition ranking and export the flattened entries to CSV.

    The crawler queries the ``ranking/v2`` web API for the partition stored in
    ``self.rid``, flattens each ranking entry into a plain dict and can write
    the resulting rows to a CSV file.
    """

    # Placeholder stored whenever a location cannot be determined.
    UNKNOWN_LOCATION = '未知'

    def __init__(self):
        # Browser-like request headers sent with every API call.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Referer': 'https://www.bilibili.com/v/popular/rank/kichiku',
            'Cookie': 'buvid3=infoc;',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
        # Partition id of the kichiku section; the main kichiku partition (119) is used.
        self.rid = 119
        # Ranking endpoint.
        self.rank_url = "https://api.bilibili.com/x/web-interface/ranking/v2"

    def get_rank_data(self, page_size: int = 100) -> List[Dict]:
        """Download the ranking list for ``self.rid``.

        Args:
            page_size: Value sent as the ``ps`` query parameter.

        Returns:
            The list found under ``data.list`` in the JSON response, or an
            empty list when the request fails, the body is not valid JSON, the
            API reports a non-zero ``code``, or the payload has no list.
        """
        params = {
            'rid': self.rid,  # selects the partition
            'type': 'all',
            'ps': page_size,
        }

        print(f"正在获取B站鬼畜区排行榜数据，分区ID: {self.rid}")
        try:
            response = requests.get(self.rank_url, headers=self.headers, params=params, timeout=10)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            return []

        # Decoding is handled separately so a malformed body is reported as a
        # JSON problem rather than as a request failure; every JSON decode
        # error raised by ``Response.json`` is a ``ValueError``.
        try:
            data = response.json()
        except ValueError as e:
            print(f"JSON解析失败: {e}")
            return []

        if not isinstance(data, dict) or data.get('code') != 0:
            message = data.get('message') if isinstance(data, dict) else data
            print(f"API返回错误: {message}")
            return []

        # Guard against a payload without ``data.list`` instead of raising.
        payload = data.get('data')
        rank_list = payload.get('list') if isinstance(payload, dict) else None
        if not isinstance(rank_list, list):
            print("API返回数据格式异常: 缺少排行榜列表")
            return []

        print(f"成功获取到 {len(rank_list)} 条数据")
        return rank_list

    def _extract_location(self, item: Dict) -> str:
        """Return the location recorded on a ranking entry, or the unknown placeholder.

        A missing location must not discard the whole record, so every lookup
        here is tolerant of absent keys.
        """
        # NOTE(review): ``pub_location`` is assumed to be the key carrying the
        # publish location on ranking entries -- confirm against a live response.
        location = item.get('pub_location')
        if not location:
            # Fall back to the field the previous implementation read.
            legacy = item.get('copyright_info')
            if isinstance(legacy, dict):
                location = legacy.get('original')
        return location if location else self.UNKNOWN_LOCATION

    def parse_video_details(self, rank_list: List[Dict]) -> List[Dict]:
        """Flatten raw ranking entries into CSV-ready dicts.

        Args:
            rank_list: Raw entries as returned by ``get_rank_data``.

        Returns:
            One dict per successfully parsed entry. ``rank`` is the 1-based
            position in ``rank_list``; entries missing a required field or
            holding malformed values are reported and skipped.
        """
        video_data_list = []

        # No request is issued inside this loop, so no throttling delay is needed.
        for i, item in enumerate(rank_list, 1):
            try:
                stat = item['stat']
                owner = item['owner']
                duration_seconds = item['duration']
                minutes, seconds = divmod(duration_seconds, 60)
                # Interpreted in the machine's local timezone.
                pubdate = datetime.fromtimestamp(item['pubdate'])

                video_data = {
                    'rank': i,
                    'title': item['title'],
                    'bvid': item['bvid'],
                    'aid': item['aid'],
                    'play': stat['view'],
                    'like': stat['like'],
                    'coins': stat['coin'],
                    'location': self._extract_location(item),
                    'duration': f"{minutes}:{seconds:02d}",  # m:ss
                    'duration_seconds': duration_seconds,
                    'up_name': owner['name'],
                    'up_mid': owner['mid'],
                    'comments': stat['reply'],
                    'favorites': stat['favorite'],
                    'shares': stat['share'],
                    'pubdate': pubdate.strftime('%Y-%m-%d %H:%M:%S'),
                }

                video_data_list.append(video_data)
                print(f"已解析第 {i} 个视频: {video_data['title'][:20]}...")

            except KeyError as e:
                print(f"解析第 {i} 个视频时缺少字段: {e}")
                continue
            except (TypeError, ValueError, AttributeError, OverflowError, OSError) as e:
                # Wrong value types, or a timestamp ``fromtimestamp`` rejects.
                print(f"解析第 {i} 个视频时出错: {e}")
                continue

        return video_data_list

    def save_to_csv(self, video_data_list: List[Dict], filename: str = "bilibili_kichiku_rank.csv"):
        """Write the parsed rows to ``filename`` and print a short preview.

        Args:
            video_data_list: Rows produced by ``parse_video_details``.
            filename: Destination path; the file is overwritten.

        Nothing is written when ``video_data_list`` is empty. Write failures
        are reported on stdout rather than raised.
        """
        if not video_data_list:
            print("没有数据可保存")
            return

        # Column order of the CSV file.
        fieldnames = [
            'rank', 'title', 'bvid', 'aid',
            'play', 'like', 'coins', 'location', 'duration',
            'up_name', 'up_mid', 'comments', 'favorites', 'shares',
            'pubdate', 'duration_seconds'
        ]

        try:
            # utf-8-sig writes a BOM at the start of the file.
            with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(video_data_list)
        except (OSError, ValueError, csv.Error) as e:
            # ValueError: DictWriter rejects rows carrying keys outside fieldnames.
            print(f"保存CSV文件时出错: {e}")
            return

        print(f"数据已保存到 {filename}")
        print(f"共保存 {len(video_data_list)} 条记录")

        # The preview is cosmetic; a formatting problem must not be reported
        # as a failed save, because the file is already on disk.
        print("\n数据预览（前5条）:")
        print("=" * 100)
        try:
            for i, data in enumerate(video_data_list[:5], 1):
                print(f"{i}. {data['title'][:30]}... | 播放: {data['play']:,} | "
                      f"点赞: {data['like']:,} | 投币: {data['coins']:,} | "
                      f"时长: {data['duration']} | UP: {data['up_name']}")
        except (KeyError, TypeError, ValueError) as e:
            print(f"数据预览失败: {e}")

    def get_up_location(self, up_mid: str) -> str:
        """Look up the ``place`` field of an uploader's profile (extra API call).

        Args:
            up_mid: Uploader id sent as the ``mid`` query parameter.

        Returns:
            The ``place`` value from the response, or the unknown placeholder
            when the request fails, the status is not 200, the API reports a
            non-zero ``code``, or the field is absent or empty.
        """
        up_info_url = "https://api.bilibili.com/x/space/acc/info"
        params = {'mid': up_mid}

        try:
            response = requests.get(up_info_url, headers=self.headers, params=params, timeout=5)
            if response.status_code != 200:
                return self.UNKNOWN_LOCATION
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # Best-effort lookup: report the problem and fall back.
            print(f"获取UP主属地失败: {e}")
            return self.UNKNOWN_LOCATION

        if not isinstance(data, dict) or data.get('code') != 0:
            return self.UNKNOWN_LOCATION

        profile = data.get('data')
        if not isinstance(profile, dict):
            return self.UNKNOWN_LOCATION
        return profile.get('place') or self.UNKNOWN_LOCATION

def main():
    """Fetch the ranking, flatten it, export a timestamped CSV and print a random sample."""
    banner = "=" * 60
    print(banner)
    print("B站鬼畜区排行榜数据采集器")
    print(banner)

    spider = BilibiliRankCrawler()

    # Step 1: download the raw ranking entries; stop when nothing came back.
    raw_entries = spider.get_rank_data(page_size=100)
    if not raw_entries:
        print("获取排行榜数据失败，程序退出")
        return

    # Step 2: flatten the entries; stop when none could be parsed.
    records = spider.parse_video_details(raw_entries)
    if not records:
        print("解析视频数据失败，程序退出")
        return

    # Step 3: export to a CSV whose name carries the current local time.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_name = f"bilibili_kichiku_rank_{stamp}.csv"
    spider.save_to_csv(records, output_name)

    print("\n" + banner)
    print("数据采集完成！")
    print(f"文件名: {output_name}")
    print(banner)

    # Step 4: print three random records for manual checking, if enough exist.
    if len(records) < 3:
        return

    print("\n随机选择的3个验证视频:")
    for position, picked in enumerate(random.sample(records, 3), 1):
        short_title = picked['title'][:40]
        print(f"\n{position}. 视频标题: {short_title}...")
        print(f"   视频链接: https://www.bilibili.com/video/{picked['bvid']}")
        print(f"   采集数据 - 点赞数: {picked['like']:,} | 投币数: {picked['coins']:,} | "
              f"播放量: {picked['play']:,} | 时长: {picked['duration']}")
        print(f"   UP主: {picked['up_name']}")

# Call main() only when this module's __name__ is "__main__", not when it is imported.
if __name__ == "__main__":
    main()

B站鬼畜区排行榜数据采集器
正在获取B站鬼畜区排行榜数据，分区ID: 22
API返回错误: 请求错误
获取排行榜数据失败，程序退出
