[Feature request] Batch query support #28

Open
ghost opened this issue Oct 2, 2023 · 1 comment
Labels
enhancement New feature or request

ghost commented Oct 2, 2023

I made a few revisions with Cursor; the repo owner is welcome to borrow whatever is useful.

# -*- coding: utf-8 -*-
"""
ICP-Checker.py
Date: 2023-10-01
Author: soapffz
Adapted from: https://github.com/wongzeon/ICP-Checker

Batch-queries ICP filing information for domains. The script first obtains the
required cookie and token, then queries each input domain. Output covers both
filing records and domains that are not eligible for filing.

Main features:
1. Batch queries: domains can be read from a file and queried in bulk.
2. Filing lookup: for each domain, retrieves the filing record, including the
   organizer, domain, ICP license number, site filing number, domain type,
   pre-approval items, access restrictions, and approval date.
3. Ineligible domains: domains that cannot be filed produce a notice instead.
4. Query interval: batch mode pauses between queries (8 seconds in this
   version) to reduce the chance of being rate-limited.

Note: requires Python 3 and the third-party packages requests, opencv-python,
openpyxl, and tqdm.
"""

import re
import os
import cv2
import time
import base64
import hashlib
import requests
import openpyxl as xl
from openpyxl.styles import Alignment
import argparse
import sys
import logging
from tqdm import tqdm

# Configure logging with a single message-only handler writing to stdout
logging.basicConfig(level=logging.INFO, format="%(message)s", stream=sys.stdout)

# Bypass any system-level proxy settings
os.environ["no_proxy"] = "*"

# Command-line argument parsing
arg_parser = argparse.ArgumentParser(
    description="Check ICP for a domain or a list of domains from a file."
)
arg_parser.add_argument(
    "input", help="The domain or the file containing a list of domains."
)
args = arg_parser.parse_args()

# Directory where results are saved
output_directory = "./outs/"
if not os.path.exists(output_directory):
    os.makedirs(output_directory)

# Reuse a single requests.Session for connection pooling
http_session = requests.Session()


class CustomException(Exception):
    pass


def send_request(
    url, method="get", headers=None, data=None, json=None, timeout=(3.06, 27)
):
    """Send an HTTP request through the shared session."""
    try:
        response = http_session.request(
            method, url, headers=headers, data=data, json=json, timeout=timeout
        )
        return response
    except requests.RequestException as e:
        raise CustomException(f"Request failed: {e}")


def retrieve_cookies():
    """Fetch the anti-bot cookie (__jsluid_s) from beian.miit.gov.cn, retrying up to 3 times."""
    cookie_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32"
    }
    err_num = 0
    while err_num < 3:
        response = send_request("https://beian.miit.gov.cn/", headers=cookie_headers)
        try:
            cookie = requests.utils.dict_from_cookiejar(response.cookies)["__jsluid_s"]
            return cookie
        except KeyError:
            err_num += 1
            time.sleep(3)
    raise CustomException("Failed to obtain the cookie, please retry!")


def retrieve_token():
    """Obtain an auth token; the authKey is the MD5 of "testtest" plus the current millisecond timestamp."""
    timeStamp = round(time.time() * 1000)
    authSecret = "testtest" + str(timeStamp)
    authKey = hashlib.md5(authSecret.encode(encoding="UTF-8")).hexdigest()
    auth_data = {"authKey": authKey, "timeStamp": timeStamp}
    url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth"
    try:
        t_response = requests.post(
            url=url, data=auth_data, headers=base_header, timeout=(3.06, 27)
        ).json()
        token = t_response["params"]["bussiness"]  # sic: the API spells it "bussiness"
    except (requests.RequestException, KeyError, ValueError):
        return -1
    return token


def retrieve_check_pic(token):
    """Fetch the slide-captcha images and compute the slide distance."""
    url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImage"
    base_header["Accept"] = "application/json, text/plain, */*"
    base_header.update({"Content-Length": "0", "token": token})
    try:
        p_request = requests.post(
            url=url, data="", headers=base_header, timeout=(3.06, 27)
        ).json()
        p_uuid = p_request["params"]["uuid"]
        big_image = p_request["params"]["bigImage"]
        small_image = p_request["params"]["smallImage"]
    except (requests.RequestException, KeyError, ValueError):
        return -1
    # Decode the two images, write them to disk, and locate the gap position
    with open("bigImage.jpg", "wb") as f:
        f.write(base64.b64decode(big_image))
    with open("smallImage.jpg", "wb") as f:
        f.write(base64.b64decode(small_image))
    background_image = cv2.imread("bigImage.jpg", cv2.IMREAD_COLOR)
    fill_image = cv2.imread("smallImage.jpg", cv2.IMREAD_COLOR)
    position_match = cv2.matchTemplate(
        background_image, fill_image, cv2.TM_CCOEFF_NORMED
    )
    max_loc = cv2.minMaxLoc(position_match)[3][0]
    mouse_length = max_loc + 1
    os.remove("bigImage.jpg")
    os.remove("smallImage.jpg")
    check_data = {"key": p_uuid, "value": mouse_length}
    return check_data
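

# --- Illustrative sketch (not called by the script): how the slide distance is
# derived. cv2.matchTemplate with TM_CCOEFF_NORMED scores the small puzzle
# piece against every position in the background, and cv2.minMaxLoc gives the
# (x, y) of the best-scoring position; the x coordinate is the gap offset.
# The file paths are hypothetical examples.
def locate_gap(background_path, template_path):
    background = cv2.imread(background_path, cv2.IMREAD_COLOR)
    template = cv2.imread(template_path, cv2.IMREAD_COLOR)
    scores = cv2.matchTemplate(background, template, cv2.TM_CCOEFF_NORMED)
    best_x = cv2.minMaxLoc(scores)[3][0]  # [3] is max_loc, [0] its x coordinate
    return best_x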


def retrieve_sign(check_data, token):
    """Submit the computed slide distance and obtain the request sign."""
    check_url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage"
    base_header.update(
        {"Content-Length": "60", "token": token, "Content-Type": "application/json"}
    )
    try:
        pic_sign = requests.post(
            check_url, json=check_data, headers=base_header, timeout=(3.06, 27)
        ).json()
        sign = pic_sign["params"]
    except (requests.RequestException, KeyError, ValueError):
        return -1
    return sign


def query_base(info):
    # Filter out empty input and special characters; only - —《》. () are
    # allowed, for domains and company names respectively
    try:
        if info == "":
            raise ValueError("InputNone")
        info = re.sub("[^\\u4e00-\\u9fa5-A-Za-z0-9,-.()《》—()]", "", info)
        info = (
            info.replace(" ", "")
            .replace("https://www.", "")
            .replace("http://www.", "")
            .replace("http://", "")
        )
        input_zh = re.compile("[\u4e00-\u9fa5]")
        zh_match = input_zh.search(info)
        if zh_match:
            info_result = info
        else:
            # Check that the domain uses a TLD eligible for filing (TLD list synced 2022-01-06)
            input_url = re.compile(
                r"([^.]+)(?:\.(?:GOV\.cn|ORG\.cn|AC\.cn|MIL\.cn|NET\.cn|EDU\.cn|COM\.cn|BJ\.cn|TJ\.cn|SH\.cn|CQ\.cn|HE\.cn|SX\.cn|NM\.cn|LN\.cn|JL\.cn|HL\.cn|JS\.cn|ZJ\.cn|AH\.cn|FJ\.cn|JX\.cn|SD\.cn|HA\.cn|HB\.cn|HN\.cn|GD\.cn|GX\.cn|HI\.cn|SC\.cn|GZ\.cn|YN\.cn|XZ\.cn|SN\.cn|GS\.cn|QH\.cn|NX\.cn|XJ\.cn|TW\.cn|HK\.cn|MO\.cn|cn|REN|WANG|CITIC|TOP|SOHU|XIN|COM|NET|CLUB|XYZ|VIP|SITE|SHOP|INK|INFO|MOBI|RED|PRO|KIM|LTD|GROUP|BIZ|AUTO|LINK|WORK|LAW|BEER|STORE|TECH|FUN|ONLINE|ART|DESIGN|WIKI|LOVE|CENTER|VIDEO|SOCIAL|TEAM|SHOW|COOL|ZONE|WORLD|TODAY|CITY|CHAT|COMPANY|LIVE|FUND|GOLD|PLUS|GURU|RUN|PUB|EMAIL|LIFE|CO|FASHION|FIT|LUXE|YOGA|BAIDU|CLOUD|HOST|SPACE|PRESS|WEBSITE|ARCHI|ASIA|BIO|BLACK|BLUE|GREEN|LOTTO|ORGANIC|PET|PINK|POKER|PROMO|SKI|VOTE|VOTO|ICU|LA))",
                flags=re.IGNORECASE,
            )
            info_result = input_url.search(info)
            if info_result is None:
                if info.split(".")[0] == "":
                    raise ValueError("OnlyDomainInput")
                raise ValueError("ValidType")
            else:
                info_result = info_result.group()
        info_data = {
            "pageNum": "1",
            "pageSize": "40",
            "serviceType": 1,
            "unitName": info_result,
        }
        return info_data
    except ValueError as e:
        if str(e) == "InputNone" or str(e) == "OnlyDomainInput":
            logging.error(f"[-] Please enter a valid domain: {info}")
        else:
            logging.error(f"[-] {info} is not eligible for ICP filing")
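

# Illustrative examples of what query_base returns (inputs are assumptions):
#   query_base("https://www.example.com")  -> {..., "unitName": "example.com"}
#   query_base("某某公司")                  -> {..., "unitName": "某某公司"} (company-name search)
#   query_base("example.invalidtld")       -> logs an error and returns None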


def retrieve_beian_info(info_data, p_uuid, token, sign):
    global base_header
    domain_list = []
    info_url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition"
    base_header.update(
        {"Content-Length": "78", "uuid": p_uuid, "token": token, "sign": sign}
    )
    max_retries = 3
    for _ in range(max_retries):
        try:
            beian_info = requests.post(
                url=info_url, json=info_data, headers=base_header, timeout=(3.06, 27)
            ).json()
            if not beian_info["success"]:
                if beian_info["code"] in [401, 429]:
                    # On a 401 or 429, refresh the cookie, token, and sign, then retry
                    logging.info("[+] Fetching cookie, please wait…")
                    cookie = retrieve_cookies()
                    base_header = {
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32",
                        "Origin": "https://beian.miit.gov.cn",
                        "Referer": "https://beian.miit.gov.cn/",
                        "Cookie": f"__jsluid_s={cookie}",
                    }
                    logging.info("[+] Retrieving token, please wait…")
                    token = retrieve_token()
                    if token == -1:
                        raise CustomException("Failed to obtain the token")
                    logging.info("[+] Token retrieved, querying, please wait…")
                    check_data = retrieve_check_pic(token)
                    if check_data != -1:
                        sign = retrieve_sign(check_data, token)
                        p_uuid = check_data["key"]
                        if sign != -1:
                            base_header.update(
                                {
                                    "Content-Length": "78",
                                    "uuid": p_uuid,
                                    "token": token,
                                    "sign": sign,
                                }
                            )
                            continue
                logging.error(
                    f'[-] Request error: CODE {beian_info["code"]} MSG {beian_info["msg"]}'
                )
                return domain_list
            # On success, process the data and exit the loop
            # ... (data-processing code omitted)
            break
        except Exception as e:
            logging.error(f"[-] Unexpected error: {e}")
            return domain_list
    return domain_list
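

# --- Illustrative sketch (not wired in): exponential backoff could replace the
# fixed retry loop above to ease 429 rate limiting. The base delay and factor
# here are assumptions, not values known to avoid the long bans noted below.
def backoff_delays(base=10.0, factor=2.0, retries=3):
    """Yield increasing sleep intervals, e.g. 10, 20, 40 seconds."""
    delay = base
    for _ in range(retries):
        yield delay  # the caller would time.sleep(delay) before retrying
        delay *= factor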


def save_data(domain_list):
    """Print the final results and save them to an Excel workbook, adjusting the sheet formatting."""
    # Number of rows to write; an empty list means the domain has no filing
    # record, or the lookup failed
    total_row = len(domain_list)
    if total_row == 1:
        total_row = 0
    elif total_row == 0:
        logging.info("[!] The queried domain has no ICP filing")
        return
    logging.info(f"[+] Query results:\n\n{domain_list}")
    # Save the workbook to the outs folder under the current directory
    file_path = os.path.join(output_directory, "icp_info.xlsx")
    # Append to the workbook if it exists; otherwise create it and set the
    # titles, column widths, frozen panes, and text layout
    if os.path.exists(file_path):
        wb = xl.load_workbook(file_path)
        ws = wb["ICP Info"]
        max_row = ws.max_row
        start = max_row + 1
        total_row = total_row + start
        after_title = 0
    else:
        wb = xl.Workbook()
        ws = wb.active
        ws.title = "ICP Info"
        title_list = [
            "Organizer",
            "Domain",
            "ICP License No.",
            "Site Filing No.",
            "Domain Type",
            "Pre-approval Items",
            "Access Restricted",
            "Approval Date",
        ]
        for i in range(0, 8):
            ws.cell(1, i + 1).value = title_list[i]
        col_width = {
            "A": 45,
            "B": 40,
            "C": 22,
            "D": 24,
            "E": 9,
            "F": 15,
            "G": 13,
            "H": 21,
        }
        for k, v in col_width.items():
            ws.column_dimensions[k].width = v
        ws.freeze_panes = "A2"
        start = 0
        after_title = 2
    # Write the query data
    for j in range(start, total_row + 1):
        for k in range(0, 8):
            try:
                ws.cell(j + after_title, k + 1).value = domain_list[j - start][k]
            except IndexError:
                continue
    # Center all cells
    for row in range(ws.max_row):
        for col in range(ws.max_column):
            ws.cell(row + 1, col + 1).alignment = Alignment(
                horizontal="center", vertical="center"
            )
    try:
        wb.save(file_path)
    except PermissionError:
        logging.error(
            "[!] The results workbook is open, so the file cannot be written. Close it and run again!"
        )
        return -1
    logging.info(f"[+] Results saved to: {file_path}")
    return "OK"


def main(target):
    """Query a single domain, or batch-query domains from a file, refreshing
    the cookie, token, and captcha sign every 20 queries."""
    global base_header
    try:
        # Collect the domains to query: one per line if the input is a file,
        # otherwise treat the input itself as a domain
        if os.path.isfile(target):
            with open(target) as f:
                domains = [line.strip() for line in f if line.strip()]
        else:
            domains = [target]
        token = p_uuid = sign = None
        with tqdm(total=len(domains), ncols=70, position=0, leave=True) as pbar:
            for query_count, domain in enumerate(domains):
                # Refresh credentials on the first query and every 20 thereafter
                if query_count % 20 == 0:
                    logging.info("[+] Fetching cookie, please wait…")
                    cookie = retrieve_cookies()
                    base_header = {
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32",
                        "Origin": "https://beian.miit.gov.cn",
                        "Referer": "https://beian.miit.gov.cn/",
                        "Cookie": f"__jsluid_s={cookie}",
                    }
                    logging.info("[+] Retrieving token, please wait…")
                    token = retrieve_token()
                    if token == -1:
                        raise CustomException("Failed to obtain the token")
                    logging.info("[+] Token retrieved, querying, please wait…")
                    check_data = retrieve_check_pic(token)
                    if check_data == -1:
                        raise CustomException("Failed to obtain the captcha images")
                    sign = retrieve_sign(check_data, token)
                    p_uuid = check_data["key"]
                    if sign == -1:
                        raise CustomException("Failed to pass the captcha check")
                logging.info(f"\n[+] Querying {domain} …")
                info = query_base(domain)
                if info is None:
                    pbar.update()
                    continue
                domain_list = retrieve_beian_info(info, p_uuid, token, sign)
                if domain_list:
                    logging.info(f"\n{domain} filing info:\n{domain_list}")
                else:
                    logging.info(f"\n{domain} has no filing info or is not eligible")
                save_data(domain_list)
                pbar.update()
                time.sleep(8)  # interval between queries to reduce rate limiting
    except CustomException as e:
        logging.error(f"[-] {e}\n")


if __name__ == "__main__":
    main(args.input)
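
# Example usage (assuming the script is saved as ICP-Checker.py):
#   python ICP-Checker.py example.com   # query a single domain
#   python ICP-Checker.py domains.txt   # batch mode, one domain per line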

On a 401 the script regenerates the cookie and token; a 429 block is harsher: even with an 8- or 10-second interval between queries, the IP gets banned for a long time.

I also made a version that adds a proxy pool, but proxy IPs run into SSL problems.
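
For reference, a minimal sketch of wiring a proxy into requests looks like the following; the proxy URL is a placeholder, url/data/headers stand for the query call's usual arguments, and verify=False sidesteps the certificate errors at the cost of disabling TLS verification, so it is a workaround rather than a fix:

proxies = {"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"}  # placeholder proxy
response = requests.post(url, json=data, headers=headers, proxies=proxies, verify=False)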

@wongzeon
Owner

wongzeon commented Oct 2, 2023

Thanks, this is well written 👍

@wongzeon wongzeon added the enhancement New feature or request label Oct 2, 2023