In [28]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException
from time import sleep
from random import uniform
import os
import pandas as pd

In [29]:
# 定义两个类，'房地产公司'类继承'企业'类，多了供应商信息
class Enterprise:
    """A company looked up on the site through a shared Selenium browser.

    Typical use is ``search()`` -> ``get_code()`` -> ``get_financial_lists()``
    and finally ``info()`` to export what was collected.
    """

    def __init__(self, name, selenium_browser):
        """
        :param name: company name; used as the search keyword and as the output file name
        :param selenium_browser: Selenium WebDriver used for every page interaction
        """
        self.browser = selenium_browser
        self.name = name
        self.code = None  # organisation code text, filled in by get_code()
        self.financialTables = None  # path of the saved HTML file, filled in by get_financial_lists()

    @staticmethod
    def _new_window_handle(all_handles, current_handle):
        """Return the first handle in ``all_handles`` that is not ``current_handle``, or None."""
        for handle in all_handles:
            if handle != current_handle:
                return handle
        return None

    def search(self):
        """Search for ``self.name`` and switch to the tab opened by the first result.

        Prints a message and returns without switching when no result link is
        found or when clicking it does not open a new tab.
        """
        # 1. Every search opens a new tab, so close every tab except the current one.
        current_window_handle = self.browser.current_window_handle
        for window_handle in self.browser.window_handles:
            if window_handle != current_window_handle:
                self.browser.switch_to.window(window_handle)
                # Known fragile spot: may raise if the browser pops up another window meanwhile.
                self.browser.close()
        self.browser.switch_to.window(current_window_handle)

        # 2. Type the company name into the search box.
        search = self.browser.find_element(By.NAME, 'key')
        # Clear the box first, otherwise the names pile up. Send at least the
        # historical 20 backspaces, and more when the leftover text is longer.
        leftover = search.get_attribute("value") or ""
        for _ in range(max(len(leftover), 20)):
            search.send_keys(Keys.BACKSPACE)
        search.send_keys(self.name)
        search.send_keys(Keys.ENTER)

        # 3. Click the first search result.
        sleep(uniform(2, 5))
        # A "recently viewed" div appears when there is recent search history,
        # which shifts the result link to the second XPath.
        xpath_1 = "/html/body/div/div[2]/div[2]/div[3]/div/div[2]/div/table/tr[1]/td[3]/div/span/span[1]/a"
        xpath_2 = "/html/body/div/div[2]/div[2]/div[3]/div/div[2]/div/table/tr[1]/td[3]/div[2]/span/span[1]/a"
        first = None
        for xpath in (xpath_1, xpath_2):
            try:
                first = self.browser.find_element(By.XPATH, xpath)
                break
            except NoSuchElementException:
                continue
        if first is None:
            print("无法点击第一个搜索结果")
            return
        first.click()

        # 4. The result opens in a new tab. Poll briefly for it instead of
        # indexing window_handles[1], which fails when the tab is not there yet.
        new_handle = None
        for _ in range(10):
            new_handle = self._new_window_handle(self.browser.window_handles, current_window_handle)
            if new_handle is not None:
                break
            sleep(0.5)
        if new_handle is None:
            print(f"{self.name} 搜索结果没有在新页面打开")
            return
        self.browser.switch_to.window(new_handle)

    def get_code(self):
        """Read the organisation code from the current page into ``self.code``.

        Prints a notice and leaves ``self.code`` unchanged when the element is missing.
        """
        xpath = '/html/body/div/div[2]/div[5]/div[2]/div/div[2]/section[2]/div[2]/table/tr[4]/td[2]/span/span[1]'
        try:
            target = self.browser.find_element(By.XPATH, xpath)
            self.code = target.text
            print(f"{self.name} 组织代码：{self.code}")
        except NoSuchElementException:
            print(f"{self.name} 没有组织代码")

    def get_financial_lists(self, path="财务报表HTML/"):
        """Save the financial-table HTML of the current page under ``./DATA/<path>``.

        Writes ``<name>.html`` and records its path in ``self.financialTables``
        when the table exists; otherwise writes a ``<name>.txt`` marker
        containing ``null``.

        :param path: sub-directory of ``./DATA/`` (with trailing slash) to write into
        """
        # The absolute XPath proved unreliable, so match on the class instead.
        xpath = "//div[@class='financial-table']"
        directory = r"./DATA/" + path
        os.makedirs(directory, exist_ok=True)
        try:
            target = self.browser.find_element(By.XPATH, xpath)
        except NoSuchElementException:
            file_path = directory + self.name + ".txt"
            with open(file_path, 'w', encoding='gbk') as file_in:
                file_in.write("null")
            print(f"{self.name} 没有找到财务报表")
            return

        # Fetch the markup before opening the file so a failure here cannot
        # leave an empty .html behind. The element itself cannot be stored.
        html = target.get_attribute("outerHTML")
        file_path = directory + self.name + ".html"
        # Keep GBK output, but write characters GBK cannot encode as numeric
        # character references instead of raising in the middle of the write.
        with open(file_path, 'w', encoding='gbk', errors='xmlcharrefreplace') as file_in:
            file_in.write(html)
        self.financialTables = file_path
        print(f"{self.name} 财务报表导入成功")

    def info(self, dataList: list):
        """Append a dict with this company's name, report path and code to ``dataList``."""
        information = {
            "Name": self.name,
            "FinancialList": self.financialTables,
            "Code": self.code,
        }
        dataList.append(information)


class RealEstate(Enterprise):
    """Real-estate company: an ``Enterprise`` that also collects its supplier names."""

    def __init__(self, name, selenium_browser):
        """
        :param name: company name
        :param selenium_browser: Selenium WebDriver used for every page interaction
        """
        # The base class already sets browser, name, code and financialTables.
        super().__init__(name, selenium_browser)
        # Attribute keeps its historical spelling so existing users of it still work.
        self.suppilers = set()

    def next_page(self):
        """Return the supplier pager's ">" element, or None when there is none."""
        sleep(uniform(1, 3))
        pager_selector = "#supplierlist >  :nth-child(2) >   :nth-child(2)"
        try:
            pager = self.browser.find_element(By.CSS_SELECTOR, pager_selector)
        except NoSuchElementException:
            print("供应商：没有发现下一页")
            return None
        for nxt in pager.find_elements(By.XPATH, "./*"):
            if ">" in nxt.text:
                print("供应商：下一页")
                # The caller clicks the element; it is only located here.
                return nxt
        print("供应商：没有发现下一页")
        return None

    def get_supplier(self):
        """Add the supplier names on the current supplier page to ``self.suppilers``.

        Navigates to the business-information page first when the current URL
        does not already contain ``crun``.
        """
        # The position of the "business information" link varies, but the URL is regular:
        #   www.qcc.com/firm/<id>.html  -> company page
        #   www.qcc.com/crun/<id>.html  -> business-information page
        # Per the original author's note, the site has strict anti-crawling
        # measures and answers 405 when the same URL is requested too often.
        url = self.browser.current_url
        if "crun" not in url:
            url = url.replace("firm", "crun")
            self.browser.get(url)

        table = "#supplierlist >  :nth-child(2) >   :first-child"
        try:
            # Presence check only; the rows are read one by one below.
            self.browser.find_element(By.CSS_SELECTOR, table)
        except NoSuchElementException:
            print(f'{self.name} 没有供应商信息')
            return

        # The loop probes table rows 2 through 11 for a supplier link.
        row_selectors = [
            f"#supplierlist > div.app-ntable > table > tr:nth-child({i}) > td.left > div > span.cont > span > span > a"
            for i in range(2, 12)]
        for selector in row_selectors:
            try:
                link = self.browser.find_element(By.CSS_SELECTOR, selector)
            except NoSuchElementException:
                # Stop at the first row without a link, as the original code did.
                return
            supplier_name = link.text
            # Skip blank text so an empty supplier name is never recorded.
            if supplier_name.strip():
                self.suppilers.add(supplier_name)

    def get_all_supplier(self, max_page=15):
        """Collect suppliers from the first page and the following pages.

        :param max_page: upper bound on the number of pages read (default 15)
        """
        # NOTE(review): the original author marked the page cap as
        # "has a bug, to be fixed" without saying what the bug is.
        self.get_supplier()
        nxt = self.next_page()
        cont = 1
        while nxt is not None and cont < max_page:
            nxt.click()
            # next_page() sleeps before it looks for the pager, which also
            # gives the clicked page time to load before it is scraped.
            nxt = self.next_page()
            self.get_supplier()
            cont += 1

    def info(self, dataList: list):
        """Append a dict with name, report path, code and supplier list to ``dataList``."""
        information = {
            "Name": self.name,
            "FinancialList": self.financialTables,
            "Code": self.code,
            "suppliers": list(self.suppilers)
        }
        dataList.append(information)


In [30]:
def getAllEnterpriseFinancialList(selenium_browser=None):
    """
    Crawl the financial statements of every enterprise listed in the CSV.

    Enterprises that already have a ``.html`` file (saved report) or a
    ``.txt`` file (no report available) in './DATA/财务报表HTML' are skipped.

    :param selenium_browser: WebDriver to crawl with; when omitted, the
        module-level ``browser`` is used
    """
    report_dir = r"./DATA/财务报表HTML"
    html_suffix = ".html"
    txt_suffix = ".txt"

    # List the directory once. Strip only the suffix, because a company name
    # may itself contain dots and must match the CSV entry exactly.
    downloaded = set()  # enterprises whose report is already saved
    no_data = set()  # enterprises known to have no report
    for filename in os.listdir(report_dir):
        if filename.endswith(html_suffix):
            downloaded.add(filename[:-len(html_suffix)])
        elif filename.endswith(txt_suffix):
            no_data.add(filename[:-len(txt_suffix)])

    # Reports still to download, in CSV order, each company at most once.
    df = pd.read_csv("./DATA/企业预警通/所有发行债券企业.csv", index_col=0)
    to_download = []
    queued = set()
    for company in df['企业名称']:
        if company in downloaded or company in no_data or company in queued:
            continue
        queued.add(company)
        to_download.append(company)

    # Print the size of each group.
    print(f'已经保存 {len(downloaded)} 家企业的财务数据')
    print(f'{len(no_data)} 家企业没有财务报表')
    print(f'待爬取 {len(to_download)} 家企业的财务数据')
    if not to_download:
        return

    # Start crawling.
    if selenium_browser is None:
        selenium_browser = browser
    for name in to_download:
        enterprise = Enterprise(name=name, selenium_browser=selenium_browser)
        enterprise.search()
        enterprise.get_code()
        enterprise.get_financial_lists()

In [None]:
# Run the crawl for every enterprise that still lacks a saved report.
# NOTE(review): the function reads a module-level `browser`, which this file
# only creates in the start-up cell further down; confirm that cell ran first.
getAllEnterpriseFinancialList()

In [None]:
if __name__ == "__main__":
    # Start the Selenium-driven Edge browser and open the site.
    browser = webdriver.Edge()
    browser.get("https://www.qcc.com")
    # Logging in and closing pop-ups is done by hand, so block until the user
    # confirms with "y". The same prompt is shown on every retry, and the
    # answer is accepted regardless of case or surrounding whitespace.
    prompt = "请登录并关闭弹窗，完成后请按 y\n"
    while input(prompt).strip().lower() != 'y':
        sleep(1)
    getAllEnterpriseFinancialList()

In [55]:
# Records of the crawled real-estate companies; the crawl loop below appends to it.
realEstateData = list()

In [None]:
# 读取 房地产公司名录
# Load the real-estate company list: one company name per line.
with open("房地产公司名录.txt", 'r', encoding='utf-8') as file:
    realEstateList = [
        RealEstate(name=line.strip(), selenium_browser=browser)
        for line in file
    ]

# The first six entries are skipped.
# NOTE(review): presumably they were handled in an earlier run; confirm before re-running.
for estate in realEstateList[6:]:
    estate.search()
    estate.get_code()
    estate.get_financial_lists()
    estate.get_all_supplier()
    # info() appends the company's record to the list it is given.
    estate.info(realEstateData)
    # Random pause between companies.
    sleep(uniform(3, 8))

In [46]:
# Make sure the "suppliers" entry of every record is a plain list.
lenth = len(realEstateData)
for i, record in enumerate(realEstateData):
    record["suppliers"] = list(record["suppliers"])

In [50]:
import json

# Persist the real-estate records as UTF-8 JSON, keeping non-ASCII text readable.
with open("./DATA/房地产公司信息.json", 'w', encoding='utf-8') as file:
    file.write(json.dumps(realEstateData, ensure_ascii=False))

In [78]:
# Load the saved real-estate records.
with open("./DATA/房地产公司信息.json", 'r', encoding='utf-8') as file:
    data = json.load(file)

# Union of the supplier names of every real-estate company.
suppliers = set()
for item in data:
    suppliers.update(item["suppliers"])

# Sort the names: the iteration order of a set of strings changes between
# interpreter runs, so a positional resume offset into the list is only
# meaningful when the order is stable. Blank names are dropped because they
# cannot be searched for.
suppliers = sorted(name for name in suppliers if name.strip())
suppliers = [Enterprise(name=i, selenium_browser=browser) for i in suppliers]

In [79]:
# Collected supplier records (rebinds `data`, which held the loaded JSON above).
data = list()

In [None]:
# Crawl each supplier, starting at list position 84, and print a running
# counter on one line as progress.
# NOTE(review): the offset looks like a manual resume point; confirm before re-running.
for i, supplier in enumerate(suppliers[84:]):
    print(f"{i},", end="")
    supplier.search()
    supplier.get_code()
    supplier.get_financial_lists(path="供应商财务报表/")
    # info() appends the supplier's record to the list it is given.
    supplier.info(data)

In [84]:
# Persist the supplier records as UTF-8 JSON, keeping non-ASCII text readable.
with open("./DATA/供应商公司信息.json", 'w', encoding='utf-8') as file:
    file.write(json.dumps(data, ensure_ascii=False))