文档说明：本文档为中国上市公司数据爬虫文档。   
为完成作业，体验2种数据爬取方式，我方采用从api爬取和从网页爬取，分别获取上市公司的信息及其关联的行业信息。   

- 数据来源：东方财富   
- 数据爬取时间：20250529   
- 最后产出文件：data/data_raw.csv


In [1]:
# 引用库
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import pandas as pd
import os
import time
import random
import requests
import os

1. api爬取中国a股的所有股票

In [2]:
# stock_api_crawler.py

class StockAPICrawler:
    def __init__(self):
        self.BASE_URL = "https://quote.eastmoney.com"
        self.LIST_API = "https://push2.eastmoney.com/api/qt/clist/get"
        self.DETAIL_API = "https://push2.eastmoney.com/api/qt/stock/get"
        # 获取当前工作目录（Jupyter环境下无__file__，用cwd）
        script_dir = os.getcwd()
        # 获取父目录路径
        parent_dir = os.path.dirname(script_dir)
        # 构建data文件夹路径（从父目录进入data）
        self.data_dir = os.path.join(parent_dir, "data")
        os.makedirs(self.data_dir, exist_ok=True)
        # 数据文件路径
        self.api_data_path = os.path.join(self.data_dir, "api_data.csv")
        
        self.HEADERS = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
            "Referer": "https://quote.eastmoney.com/",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Connection": "keep-alive",
        }
        
        self.LIST_PARAMS = {
            "pn": 1,
            "pz": 100,
            "po": 1,
            "np": 1,
            "ut": "bd1d9ddb04089700cf9c27f6f742625",
            "fltt": 2,
            "invt": 2,
            "fid": "f3",
            "fs": "m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048",
            "fields": "f2,f12,f13,f14,f189",
            "_": int(time.time() * 1000),
        }
        
        self.chrome_options = Options()
        self.chrome_options.add_argument("--headless")
        self.chrome_options.add_argument("--disable-gpu")
        self.chrome_options.add_argument("--no-sandbox")
        
        self.session = requests.Session()
        self.session.headers.update(self.HEADERS)
        self._init_cookies()
    
    def _init_cookies(self):
        try:
            self.session.get(self.BASE_URL, timeout=10)
            print("成功初始化Cookie")
        except Exception as e:
            print(f"初始化Cookie失败: {e}")
    
    def _get_timestamp(self):
        return int(time.time() * 1000)
    
    def _random_delay(self):
        time.sleep(random.uniform(1.5, 2.5))
    
    def _safe_request(self, url, params=None, method="GET"):
        try:
            response = self.session.request(method, url, params=params, timeout=15)
            response.raise_for_status()
            if "text/html" in response.headers.get("Content-Type", ""):
                return response.text
            return response.json()
        except Exception as e:
            print(f"请求失败: {e}")
            return None
    
    def get_total_pages(self):
        params = self.LIST_PARAMS.copy()
        params["pn"] = 1
        params["pz"] = 1
        data = self._safe_request(self.LIST_API, params)
        if data and data.get("data") and data["data"].get("total"):
            total = data["data"]["total"]
            return (total + self.LIST_PARAMS["pz"] - 1) // self.LIST_PARAMS["pz"]
        print("警告：未获取到总页数，默认返回1页")
        return 1
    
    def parse_stock_exchange(self, stock_code, market_code):
        exchange = "未知交易所"
        board = "未知板块"
        if market_code == 1:
            exchange = "上海交易所"
            board = "科创板" if stock_code.startswith("688") else "主板"
        else:  # 深市或北交所
            if stock_code.startswith("30"):
                exchange = "深圳交易所"
                board = "创业板"
            elif stock_code.startswith("0"):
                exchange = "深圳交易所"
                board = "主板"
            else:
                exchange = "北京交易所"
                board = "北交所"
        return exchange, board
    
    def format_listing_date(self, timestamp):
        if not timestamp or timestamp == '-':
            return "未知日期"
        return str(timestamp)
    
    def get_stock_detail(self, stock_code, market_code):
        secid = f"{market_code}.{stock_code}"
        params = {
            "ut": "fa5fd1943c7b386f1734de88e5951d5",
            "fields": "f189",
            "secid": secid,
            "_": self._get_timestamp()
        }
        data = self._safe_request(self.DETAIL_API, params)
        return data["data"].get("f189", "") if data and data.get("data") else ""
    
    def crawl_page(self, page_num):
        print(f"\n爬取第 {page_num} 页...")
        params = self.LIST_PARAMS.copy()
        params["pn"] = page_num
        data = self._safe_request(self.LIST_API, params)
        if not data or not data.get("data") or not data["data"].get("diff"):
            print(f"第 {page_num} 页无数据")
            return []
        
        stock_list = data["data"]["diff"]
        print(f"第 {page_num} 页共有 {len(stock_list)} 只股票")
        stocks = []
        
        for i, item in enumerate(stock_list):
            try:
                stock_code = item.get("f12", "")
                market_code = item.get("f13", 0)
                stock_name = item.get("f14", "")
                stock_price = item.get("f2", "")
                raw_list_time = item.get("f189", "")
                
                listing_time = self.format_listing_date(self.get_stock_detail(stock_code, market_code) or raw_list_time)
                exchange, board = self.parse_stock_exchange(stock_code, market_code)
                
                stocks.append({
                    "股票代码": stock_code,
                    "股票名称": stock_name,
                    "交易所": exchange,
                    "板块": board,
                    "上市时间": listing_time,
                    "最新价": stock_price
                })
                
                print(f"  {i+1}/{len(stock_list)}: {stock_name} ({stock_code}) | 上市时间: {listing_time} | 最新价: {stock_price}")
                
            except Exception as e:
                print(f"解析失败: {e}")
                continue
        
        return stocks
    
    def save_to_csv(self, data):
        if not data:
            print("无数据可保存")
            return
        
        df = pd.DataFrame(data)
        columns = ["股票代码", "股票名称", "交易所", "板块", "上市时间", "最新价"]
        df[columns].to_csv(self.api_data_path, index=False, encoding="utf-8-sig")
        print(f"\nAPI数据已保存至: {self.api_data_path}，共 {len(df)} 条记录")
    
    def crawl_all(self, max_pages=None):
        total_pages = self.get_total_pages()
        if max_pages:
            total_pages = min(max_pages, total_pages)
            print(f"限制爬取前 {total_pages} 页")
        
        all_data = []
        for page in range(1, total_pages + 1):
            page_data = self.crawl_page(page)
            all_data.extend(page_data)
            self._random_delay()  # 每页之间延迟
        
        self.save_to_csv(all_data)
        return all_data

# 单独运行此文件时执行
if __name__ == "__main__":
    crawler = StockAPICrawler()
    crawler.crawl_all()

成功初始化Cookie

爬取第 1 页...
第 1 页共有 100 只股票
  1/100: N古麒 (001390) | 上市时间: 20250529 | 最新价: 32.0
  2/100: 三友科技 (834475) | 上市时间: 20200727 | 最新价: 24.67
  3/100: 路桥信息 (837748) | 上市时间: 20230816 | 最新价: 38.38
  4/100: 天晟新材 (300169) | 上市时间: 20110125 | 最新价: 8.26
  5/100: 睿智医药 (300149) | 上市时间: 20101222 | 最新价: 8.56
  6/100: 贝斯美 (300796) | 上市时间: 20191115 | 最新价: 11.51
  7/100: 广康生化 (300804) | 上市时间: 20230627 | 最新价: 41.26
  8/100: 拉卡拉 (300773) | 上市时间: 20190425 | 最新价: 23.75
  9/100: 概伦电子 (688206) | 上市时间: 20211228 | 最新价: 27.77
  10/100: 广立微 (301095) | 上市时间: 20220805 | 最新价: 56.74
  11/100: 霍普股份 (301024) | 上市时间: 20210728 | 最新价: 37.97
  12/100: 益方生物-U (688382) | 上市时间: 20220725 | 最新价: 29.1
  13/100: 雄帝科技 (300546) | 上市时间: 20160928 | 最新价: 22.08
  14/100: 舒泰神 (300204) | 上市时间: 20110415 | 最新价: 23.04
  15/100: 万马科技 (300698) | 上市时间: 20170831 | 最新价: 41.59
  16/100: 超捷股份 (301005) | 上市时间: 20210601 | 最新价: 36.61
  17/100: 久盛电气 (301082) | 上市时间: 20211027 | 最新价: 27.19
  18/100: 四方精创 (300468) | 上市时间: 20150527 | 最新价: 23.11
  19

2. 网页爬取   
- 爬取站点：东方财富官网    
- 爬取时间：20250529   
- 说明：api接口中的行业信息需要额外查询匹配，因此我方直接采用网页爬取，直接获取对应股票公司的“东财行业”和“证监会行业”2个行业字段。   
- 实现方案：页面爬取容易失败，为保障行业爬取可断点重连，我方采用针对2个文件进行比对，对行业爬取缺漏的公司的行业进行重新爬取   
    - 初始阶段：由于尚未爬取任何股票的行业信息，`missing_industry` 文件内容与 `api_data` 完全一致。  
    - 首次网页爬取后：将已获取的 `industry_data` 与 `api_data` 进行比对，自动识别缺漏的股票，并将其股票代码同步记录到 `missing_industry` 文件中。  
    - 针对 `missing_industry` 文件中的股票，继续进行网页爬取，爬取成功后实时补充进 `industry_data`，并再次比对，直至所有行业信息补齐，`missing_industry` 文件为空，实现断点续爬和数据完整性保障。

In [3]:
# 比对目前还有多少股票没有行业数据，缺少的需要爬取
class StockDataComparator:
    def __init__(self):
        # 获取当前工作目录（Jupyter环境下无__file__，用cwd）
        script_dir = os.getcwd()
        # 获取父目录路径
        parent_dir = os.path.dirname(script_dir)
        # 构建data文件夹路径（从父目录进入data）
        self.data_dir = os.path.join(parent_dir, "data")
        os.makedirs(self.data_dir, exist_ok=True)
        self.api_data_path = os.path.join(self.data_dir, "api_data.csv")
        self.industry_data_path = os.path.join(self.data_dir, "industry_data.csv")
        self.missing_data_path = os.path.join(self.data_dir, "missing_industry.csv")
        
    def compare_data(self):
        """比对两个CSV文件，找出需要重新爬取的股票"""
        if not os.path.exists(self.api_data_path):
            print(f"错误：未找到 {self.api_data_path}")
            return None
        
        if not os.path.exists(self.industry_data_path):
            print(f"错误：未找到 {self.industry_data_path}")
            return None
        
        api_df = pd.read_csv(self.api_data_path, on_bad_lines='skip')
        industry_df = pd.read_csv(self.industry_data_path, on_bad_lines='skip')
        
        api_df['股票代码'] = api_df['股票代码'].astype(str).str.zfill(6)
        industry_df['股票代码'] = industry_df['股票代码'].astype(str).str.zfill(6)
        
        api_codes = set(api_df['股票代码'])
        industry_codes = set(industry_df['股票代码'])
        missing_codes = api_codes - industry_codes
        
        print(f"API数据中共有 {len(api_codes)} 只股票")
        print(f"行业数据中已有 {len(industry_codes)} 只股票")
        print(f"缺失 {len(missing_codes)} 只股票的行业数据")
        
        # 先查找解析失败的股票代码
        failed_df = industry_df[
            (industry_df['东财行业'] == '解析失败') & 
            (industry_df['证监会行业'] == '解析失败')
        ]
        failed_codes = set(failed_df['股票代码'])

        # 如果有解析失败的股票，则从industry_data中删除这些行并保存
        if failed_codes:
            cleaned_industry_df = industry_df[~industry_df['股票代码'].isin(failed_codes)]
            cleaned_industry_df.to_csv(self.industry_data_path, index=False, encoding="utf-8-sig")
            print(f"已从 {self.industry_data_path} 中删除 {len(failed_codes)} 条解析失败的股票数据")
        
        print(f"有 {len(failed_codes)} 只股票的行业数据解析失败")
        
        codes_to_retry = missing_codes.union(failed_codes)
        print(f"总计需要重新爬取 {len(codes_to_retry)} 只股票")
        
        if codes_to_retry:
            retry_df = api_df[api_df['股票代码'].isin(codes_to_retry)].copy()
            retry_df['需要爬取'] = retry_df['股票代码'].apply(
                lambda x: '缺失数据' if x in missing_codes else '解析失败'
            )
            retry_df.to_csv(self.missing_data_path, index=False, encoding="utf-8-sig")
            print(f"需要重新爬取的股票已保存到 {self.missing_data_path}")
            
            exchange_counts = retry_df['交易所'].value_counts()
            print("\n按交易所统计：")
            for exchange, count in exchange_counts.items():
                print(f"{exchange}: {count} 只")
            
            return retry_df
        else:
            print("所有股票的行业数据都已完整且正确")
            return None

# 执行比对
if __name__ == "__main__":
    comparator = StockDataComparator()
    result_df = comparator.compare_data()


API数据中共有 5720 只股票
行业数据中已有 5720 只股票
缺失 0 只股票的行业数据
有 0 只股票的行业数据解析失败
总计需要重新爬取 0 只股票
所有股票的行业数据都已完整且正确


In [4]:
# 从东方财富的网页抓取行业信息

class StockIndustryCrawler:
    def __init__(self):
        self.COMPANY_INFO_URL = "https://emweb.securities.eastmoney.com/pc_hsf10/pages/index.html"
        # 获取当前工作目录（Jupyter环境下无__file__，用cwd）
        script_dir = os.getcwd()
        # 获取父目录路径
        parent_dir = os.path.dirname(script_dir)
        # 构建data文件夹路径（从父目录进入data）
        self.data_dir = os.path.join(parent_dir, "data")
        os.makedirs(self.data_dir, exist_ok=True)
        # 数据文件路径保持不变
        self.api_data_path = os.path.join(self.data_dir, "api_data.csv")
        self.missing_data_path = os.path.join(self.data_dir, "missing_industry.csv")  # 待爬取的股票
        self.industry_data_path = os.path.join(self.data_dir, "industry_data.csv")    # 行业数据
        
        # 浏览器配置
        self.chrome_options = webdriver.ChromeOptions()
        self.chrome_options.add_argument("--headless")
        self.chrome_options.add_argument("--disable-gpu")
        self.chrome_options.add_argument("--no-sandbox")
        self.driver = webdriver.Chrome(options=self.chrome_options)
        
        # 市场代码映射
        self.market_map = {
            "上海交易所": 1,
            "深圳交易所": 0,
            "北京交易所": 2
        }
    
    def _random_delay(self):
        """随机延迟防反爬（2-4秒）"""
        time.sleep(random.uniform(0, 1))
    
    def parse_industry_from_html(self, stock_code, market_code):
        """从网页解析行业信息"""
        market_abbr = {"0": "SZ", "1": "SH", "2": "BJ"}.get(str(market_code), "SZ")
        url = f"{self.COMPANY_INFO_URL}?type=web&code={market_abbr}{stock_code}#/gsgk"
        
        try:
            self.driver.get(url)
            self._random_delay()
            soup = BeautifulSoup(self.driver.page_source, "html.parser")
            
            # 解析行业信息
            east_money_industry = soup.find("th", string="所属东财行业").find_next("td").text.strip() if soup.find("th", string="所属东财行业") else "未找到"
            csrc_industry = soup.find("th", string="所属证监会行业").find_next("td").text.strip() if soup.find("th", string="所属证监会行业") else "未找到"
            
            return {
                "股票代码": stock_code,
                "东财行业": east_money_industry,
                "证监会行业": csrc_industry
            }
            
        except Exception as e:
            print(f"解析 {stock_code} 行业信息失败: {e}")
            return {
                "股票代码": stock_code,
                "东财行业": "解析失败",
                "证监会行业": "解析失败"
            }
    
    def load_missing_data(self):
        """加载missing_industry.csv中的股票数据"""
        if not os.path.exists(self.missing_data_path):
            print("错误：未找到待爬取的股票数据文件 (missing_industry.csv)")
            return None
        
        missing_df = pd.read_csv(self.missing_data_path, on_bad_lines='skip')
        
        # 确保股票代码为字符串类型并补零到6位
        missing_df['股票代码'] = missing_df['股票代码'].astype(str).str.zfill(6)
        
        #print(f"加载待爬取的股票数据：共 {len(missing_df)} 只股票")
        return missing_df
    
    def crawl_missing_industry_data(self):
        """从missing_industry.csv读取股票代码并爬取行业信息"""
        missing_df = self.load_missing_data()
        if missing_df is None or len(missing_df) == 0:
            print("没有需要爬取的股票数据")
            return
        
        # 检查已存在的行业数据，避免重复爬取
        existing_codes = set()
        if os.path.exists(self.industry_data_path):
            existing_df = pd.read_csv(self.industry_data_path, on_bad_lines='skip')
            existing_codes = set(existing_df['股票代码'].astype(str).str.zfill(6))
            print(f"已存在 {len(existing_codes)} 只股票的行业数据")
        
        # 筛选出需要爬取的股票
        stocks_to_crawl = missing_df[~missing_df['股票代码'].isin(existing_codes)]
        total = len(stocks_to_crawl)
        
        if total == 0:
            print("所有待爬取的股票数据已存在")
            return
        
        print(f"开始爬取 {total} 只股票的行业信息...")
        
        # 分批爬取，每10只保存一次
        batch_size = 10
        for i in range(0, total, batch_size):
            batch = stocks_to_crawl.iloc[i:i+batch_size]
            industry_list = []
            
            for idx, row in batch.iterrows():
                stock_code = row['股票代码']
                exchange = row['交易所']
                market_code = self.market_map.get(exchange, 0)
                print(f"正在爬取 {i+idx+1}/{total}: {stock_code} ({exchange})")
                
                info = self.parse_industry_from_html(stock_code, market_code)
                industry_list.append(info)
            
            # 追加保存到industry_data.csv
            self._append_to_industry_data(industry_list)
        
        print(f"行业数据爬取完成，共添加 {total} 条记录到 {self.industry_data_path}")
    
    def _append_to_industry_data(self, data):
        """追加数据到行业数据文件"""
        df = pd.DataFrame(data)
        if not os.path.exists(self.industry_data_path):
            df.to_csv(self.industry_data_path, index=False, encoding="utf-8-sig")
        else:
            df.to_csv(self.industry_data_path, mode="a", header=False, index=False, encoding="utf-8-sig")
    
    def close_browser(self):
        """关闭浏览器驱动"""
        self.driver.quit()

# 主程序入口
if __name__ == "__main__":
    crawler = StockIndustryCrawler()
    crawler.crawl_missing_industry_data()
    crawler.close_browser()
    print("任务完成！")

已存在 5720 只股票的行业数据
所有待爬取的股票数据已存在
任务完成！


3. 数据合并

In [5]:
# 获取当前工作目录（Jupyter环境下无__file__，用cwd）
script_dir = os.getcwd()
# 获取父目录路径
parent_dir = os.path.dirname(script_dir)
# 构建data文件夹路径（从父目录进入data）
data_dir = os.path.join(parent_dir, "data")
os.makedirs(data_dir, exist_ok=True)
        
# 定义文件路径
api_data_path = os.path.join(data_dir, "api_data.csv")
industry_data_path = os.path.join(data_dir, "industry_data.csv")
output_path = os.path.join(data_dir, "data_raw.csv")

# 检查文件是否存在
if not all([os.path.exists(path) for path in [api_data_path, industry_data_path]]):
    print("错误：api_data.csv 或 industry_data.csv 不存在")
    exit(1)

# 读取数据并去重
df_api = pd.read_csv(api_data_path).drop_duplicates()
df_industry = pd.read_csv(industry_data_path).drop_duplicates()

print(f"df_api去重后行数: {len(df_api)}")
print(f"df_industry去重后行数: {len(df_industry)}")

# 数据清洗：确保股票代码为字符串类型并统一格式
df_api["股票代码"] = df_api["股票代码"].astype(str).str.zfill(6)
df_industry["股票代码"] = df_industry["股票代码"].astype(str).str.zfill(6)

# 合并数据（内连接，仅保留两个表中都存在的股票代码）
merged_df = pd.merge(
    df_api,
    df_industry,
    on="股票代码",
    how="inner"
)

# 选择目标列（直接使用"证监会行业"）
merged_df = merged_df.filter([
    "股票代码", "股票名称", "交易所", "板块",
    "上市时间", "最新价", "东财行业", "证监会行业"
])

# 处理行业为空的情况（包括空值、无效格式等）
def is_industry_empty(x):
    return pd.isnull(x) or str(x).strip() == "" or str(x).strip() == "解析失败" or "-" not in x

# 检查"证监会行业"
merged_df["行业为空"] = merged_df["东财行业"].apply(is_industry_empty) | merged_df["证监会行业"].apply(is_industry_empty)
industry_empty_count = merged_df["行业为空"].sum()

# 保存结果
merged_df.to_csv(output_path, index=False, encoding="utf-8-sig")

# 输出统计结果
print(f"合并后数据总行数: {len(merged_df)}")
print(f"证监会行业为空的行数: {industry_empty_count}")

df_api去重后行数: 5720
df_industry去重后行数: 5720
合并后数据总行数: 5720
证监会行业为空的行数: 11


个人感悟：
1. api爬取比页面访问更快，如果api没有加密解密，且api暴露，可以采用api爬取方式
2. 网页爬取需要考虑防爬虫机制，因为东方财富防爬机制较弱，我方间隔时间仅0-1s
3. 不管哪种爬取方式，早上3-7点见爬都是最快的
4. 原始数据爬取存在爬漏情况，需要检查