In [1]:
import json,sys,logging,os,re,subprocess
from typing import Optional, List, Tuple, Mapping
import numpy as np
from PIL import Image
from onnxruntime import InferenceSession, SessionOptions, GraphOptimizationLevel
import concurrent.futures

# Keep the original variable, and also set the offline switches that
# huggingface_hub / transformers document. They are read when those libraries
# are imported, so this has to run before any such import.
os.environ["HF_OFFLINE"] = "1"
os.environ["HF_HUB_OFFLINE"] = "1"
os.environ["TRANSFORMERS_OFFLINE"] = "1"

def resize(pic: Image.Image, size: int, keep_ratio: bool = True) -> Image.Image:
    """Resize an image so that both sides are multiples of 4.

    Args:
        pic: Source image.
        size: Target edge length in pixels. With ``keep_ratio`` the shorter
            edge is scaled to this length (before rounding down to a multiple
            of 4); otherwise both edges are set to it.
        keep_ratio: Preserve the aspect ratio when true, otherwise produce a
            square of ``size`` x ``size``.

    Returns:
        A new image resampled with bilinear interpolation.
    """
    if not keep_ratio:
        target_size = (size, size)
    else:
        min_edge = min(pic.size)
        target_size = (int(pic.size[0] / min_edge * size), int(pic.size[1] / min_edge * size))
    # Round each side down to a multiple of 4, but never below 4: a side that
    # floors to 0 would make the resize call fail.
    target_size = tuple(max(4, (edge // 4) * 4) for edge in target_size)
    return pic.resize(target_size, resample=Image.Resampling.BILINEAR)

def to_tensor(pic: Image.Image):
    """Convert an image to a float32 array in channel-first layout, scaled to [0, 1]."""
    width, height = pic.size
    channels = len(pic.getbands())
    pixels: np.ndarray = np.array(pic, np.uint8, copy=True)
    # (height, width, channels) -> (channels, height, width)
    channel_first = pixels.reshape(height, width, channels).transpose((2, 0, 1))
    return channel_first.astype(np.float32) / 255

def fill_background(pic: Image.Image, background: str = 'white') -> Image.Image:
    """Return an RGB version of ``pic`` with transparency composited over ``background``.

    RGB images are returned unchanged. Any other mode is converted to RGBA and
    pasted onto a solid canvas, using the image itself as the paste mask.
    A falsy ``background`` falls back to ``'white'``.
    """
    if pic.mode == 'RGB':
        return pic
    rgba = pic if pic.mode == 'RGBA' else pic.convert('RGBA')
    canvas = Image.new('RGBA', rgba.size, background or 'white')
    canvas.paste(rgba, (0, 0), rgba)
    return canvas.convert('RGB')

def image_to_tensor(pic: Image.Image, size: int = 512, keep_ratio: bool = True, background: str = 'white'):
    """Prepare an image for the model: flatten transparency, resize, convert to an array.

    Args:
        pic: Source image.
        size: Edge length forwarded to ``resize``.
        keep_ratio: Whether ``resize`` preserves the aspect ratio.
        background: Fill colour used behind transparent pixels.

    Returns:
        The array produced by ``to_tensor`` for the resized image.
    """
    opaque = fill_background(pic, background)
    resized = resize(opaque, size, keep_ratio)
    return to_tensor(resized)

def open_onnx_model(ckpt: str, provider: str) -> InferenceSession:
    """Create an ONNX Runtime session for a model file.

    Args:
        ckpt: Path to the ``.onnx`` file.
        provider: Name of the execution provider to request,
            e.g. ``"CUDAExecutionProvider"``.

    Returns:
        The inference session, built with all graph optimizations enabled.
    """
    options = SessionOptions()
    options.graph_optimization_level = GraphOptimizationLevel.ORT_ENABLE_ALL
    session = InferenceSession(ckpt, options, [provider])
    # Log only once the session exists, so a failed load is never reported as loaded.
    logging.info(f'Model {ckpt!r} loaded with provider {provider!r}')
    active_providers = session.get_providers()
    if provider not in active_providers:
        logging.warning(f'Provider {provider!r} is not active for {ckpt!r}; using {active_providers!r}')
    return session

def load_classes(onnx_model_path) -> List[str]:
    """Load the contents of ``classes.json`` from the model directory."""
    with open(os.path.join(onnx_model_path, 'classes.json'), 'r', encoding='utf-8') as handle:
        class_names = json.load(handle)
    return class_names

def get_tags_from_image(pic: Image.Image, onnx_model_path, model, threshold: float = 0.7, size: int = 512, keep_ratio: bool = False):
    """Run the tagger on one image and return the tags scoring at least ``threshold``.

    The model's ``output`` is passed through a sigmoid, paired with the class
    names from ``classes.json`` and ordered by descending score, then by name.

    Returns:
        A dict mapping tag name to score (as ``float``), in that order.
    """
    tensor = image_to_tensor(pic, size, keep_ratio)
    # Add a leading batch dimension of 1.
    batch = tensor.reshape(1, *tensor.shape)

    logits, = model.run(['output'], {'input': batch})

    scores = (1 / (1 + np.exp(-logits))).reshape(-1)
    class_names = load_classes(onnx_model_path)
    ranked = [(class_names[index], score) for index, score in enumerate(scores)]
    ranked.sort(key=lambda item: (-item[1], item[0]))
    return {name: float(score) for name, score in ranked if score >= threshold}

def image_to_mldanbooru_tags(filtered_tags, use_spaces: bool, use_escape: bool, include_ranks: bool, score_descend: bool) -> str:
    """Format a tag-to-score mapping as a comma-separated string.

    Args:
        filtered_tags: Mapping of tag name to score.
        use_spaces: Replace underscores in tag names with spaces.
        use_escape: Prefix backslashes and parentheses with a backslash.
        include_ranks: Emit each tag as ``(tag:score)`` with three decimals.
        score_descend: Sort by descending score, then by tag name; otherwise
            keep the mapping's own order.

    Returns:
        The formatted tags joined with ``", "`` (empty string for no tags).
    """
    # Built once per call rather than once per tag.
    special_chars = re.compile(r'([\\()])')

    tags_pairs = filtered_tags.items()
    if score_descend:
        tags_pairs = sorted(tags_pairs, key=lambda x: (-x[1], x[0]))

    text_items = []
    for tag, score in tags_pairs:
        tag_outformat = tag
        if use_spaces:
            tag_outformat = tag_outformat.replace('_', ' ')
        if use_escape:
            tag_outformat = special_chars.sub(r'\\\1', tag_outformat)
        if include_ranks:
            tag_outformat = f"({tag_outformat}:{score:.3f})"
        text_items.append(tag_outformat)
    return ', '.join(text_items)

def process_main(image_path, onnx_model_path, model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend):
    """Tag one image file and return the formatted tag string.

    Args:
        image_path: Path of the image to open.
        onnx_model_path: Model directory containing ``classes.json``.
        model: Inference session used for tagging.
        threshold, size, keep_ratio: Forwarded to ``get_tags_from_image``.
        use_spaces, use_escape, include_ranks, score_descend: Forwarded to
            ``image_to_mldanbooru_tags``.
    """
    # The context manager closes the file handle once tagging has consumed the image.
    with Image.open(image_path) as input_image:
        filtered_tags = get_tags_from_image(input_image, onnx_model_path, model, threshold, size, keep_ratio)
    return image_to_mldanbooru_tags(filtered_tags, use_spaces, use_escape, include_ranks, score_descend)

def process_image(image_path, onnx_model_path, model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend, output_path=None, extension="ml_danbooru"):
    """Tag one image and write the tags to ``<image name>.<extension>``.

    Args:
        image_path: Path of the image to tag.
        output_path: Directory for the tag file; created if missing. When
            ``None`` the file is written next to the image.
        extension: File extension of the tag file, without the dot.
        Remaining arguments are forwarded to ``process_main``.
    """
    base_name = os.path.splitext(os.path.basename(image_path))[0]
    if output_path is None:
        target_dir = os.path.dirname(image_path)
    else:
        os.makedirs(output_path, exist_ok=True)
        target_dir = output_path
    output_file_path = os.path.join(target_dir, f"{base_name}.{extension}")
    # process_main is the per-image tagger; main() only accepts parsed CLI arguments.
    result_text = process_main(image_path, onnx_model_path, model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend)
    with open(output_file_path, 'w', encoding='utf-8') as output_file:
        output_file.write(result_text)

def process_images(image_paths, onnx_model_path, model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend, output_path=None, batch_size=16, extension="ml_danbooru"):
    """Tag many images on a thread pool, writing one tag file per image.

    Args:
        image_paths: Paths of the images to tag.
        output_path: Directory for the tag files, forwarded to ``process_image``.
        batch_size: Accepted for backward compatibility. Every image is
            submitted to the pool regardless of this value.
        extension: Tag file extension, forwarded to ``process_image``.
        Remaining arguments are forwarded to ``process_image``.

    A failure on one image is printed together with its path and does not
    stop the remaining images.
    """
    with concurrent.futures.ThreadPoolExecutor() as executor:
        path_by_future = {
            executor.submit(
                process_image, path, onnx_model_path, model, threshold, size, keep_ratio,
                use_spaces, use_escape, include_ranks, score_descend, output_path, extension,
            ): path
            for path in image_paths
        }

        for future in concurrent.futures.as_completed(path_by_future):
            try:
                future.result()
            except Exception as e:
                print(f"Error processing image {path_by_future[future]}: {e}")

def get_image_paths(temporarily_dir, train_data_dir):
    """Extract the training zip archive and list the images inside it.

    Args:
        temporarily_dir: Directory the archive is extracted into; created if missing.
        train_data_dir: Either a directory containing exactly one ``.zip``
            file, or the path of a ``.zip`` file.

    Returns:
        Sorted paths of the image files found in
        ``<temporarily_dir>/<archive name without extension>``.

    Exits with status 1 when no single archive can be determined.
    """
    if os.path.isdir(train_data_dir):
        zip_files = [name for name in os.listdir(train_data_dir) if name.endswith('.zip')]
        if len(zip_files) != 1:
            print("zip文件必须有且仅能有一个")
            sys.exit(1)
        zip_file_path = os.path.join(train_data_dir, zip_files[0])
    elif train_data_dir.endswith('.zip'):
        zip_file_path = train_data_dir
    else:
        # Previously this case fell through and failed on an unbound variable.
        print("train_data_dir 必须是包含单个zip文件的目录或zip文件路径")
        sys.exit(1)

    os.makedirs(temporarily_dir, exist_ok=True)
    # NOTE(review): unzip_zipfile is not defined in the visible part of this
    # notebook; it must be provided elsewhere. The archive password is hard-coded.
    unzip_zipfile(zip_file_path, temporarily_dir, password="shiertier")

    base_name = os.path.splitext(os.path.basename(zip_file_path))[0]
    pic_dir = os.path.join(temporarily_dir, base_name)
    image_extensions = (".png", ".jpg", ".jpeg", ".webp", ".bmp")
    # Lower-casing the name also accepts mixed-case extensions such as ".Png".
    return [
        os.path.join(pic_dir, name)
        for name in sorted(os.listdir(pic_dir))
        if name.lower().endswith(image_extensions)
    ]


def main(args):
    """Validate the parsed CLI options, then tag a zip archive of images or a single image.

    Args:
        args: Namespace with the attributes ``temporarily_dir``,
            ``train_data_dir``, ``pic_path``, ``onnx_model_path``,
            ``onnx_model_name``, ``output_path``, ``batch_size``, ``extension``,
            ``threshold``, ``size``, ``keep_ratio``, ``use_spaces``,
            ``use_escape``, ``include_ranks`` and ``score_descend``.

    Exits with status 1 when ``onnx_model_path`` is missing, or when
    ``train_data_dir`` and ``pic_path`` are both given or both missing.
    """
    def usage_error(message):
        """Print the message (and CLI help when available), then exit with status 1."""
        print(message)
        # `parser` only exists when the file runs as a script; skip the help text otherwise.
        cli_parser = globals().get("parser")
        if cli_parser is not None:
            cli_parser.print_help()
        sys.exit(1)

    temporarily_dir = args.temporarily_dir
    train_data_dir = args.train_data_dir
    pic_path = args.pic_path
    onnx_model_path = args.onnx_model_path
    onnx_model_name = args.onnx_model_name
    output_path = args.output_path
    batch_size = args.batch_size
    extension = args.extension
    threshold = args.threshold
    size = args.size
    keep_ratio = args.keep_ratio
    use_spaces = args.use_spaces
    use_escape = args.use_escape
    include_ranks = args.include_ranks
    score_descend = args.score_descend

    if onnx_model_path is None:
        print("请先下载仓库https://huggingface.co/deepghs/ml-danbooru-onnx，并指定存储目录为onnx_model_path")
        sys.exit(1)

    if train_data_dir is not None and pic_path is not None:
        usage_error("错误：不能同时指定 --train_data_dir 和 --pic_path。")

    if train_data_dir is None and pic_path is None:
        usage_error("错误：--train_data_dir 和 --pic_path 至少需要指定一个。")

    model_file = os.path.join(onnx_model_path, onnx_model_name)
    if train_data_dir is not None:
        if temporarily_dir is None:
            temporarily_dir = os.path.join(os.path.expanduser("~"), "tarin")
            os.makedirs(temporarily_dir, exist_ok=True)
        image_paths = get_image_paths(temporarily_dir, train_data_dir)
        ml_model = open_onnx_model(model_file, "CUDAExecutionProvider")
        process_images(image_paths, onnx_model_path, ml_model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend, output_path, batch_size, extension)
    else:
        ml_model = open_onnx_model(model_file, "CUDAExecutionProvider")
        process_image(pic_path, onnx_model_path, ml_model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend, output_path, extension)

if __name__ == "__main__":
    # Imported here because the notebook's import cell does not provide argparse.
    import argparse

    parser = argparse.ArgumentParser(description="程序描述")
    parser.add_argument("--temporarily_dir", default=None, help="指定临时目录，当使用压缩包时，图片将解压到这里")
    parser.add_argument("--train_data_dir", default=None, help="指定训练数据目录或目录内的zip文件")
    parser.add_argument("--pic_path", default=None, help="指定单张图片路径")
    parser.add_argument("--onnx_model_path", default=None, help="指定ONNX模型路径")
    parser.add_argument("--onnx_model_name", default="ml_caformer_m36_dec-5-97527.onnx", help="指定ONNX模型名称")
    parser.add_argument("--output_path", default=None, help="指定输出路径")
    parser.add_argument("--batch_size", type=int, default=1, help="指定批处理大小")
    parser.add_argument("--extension", default="ml_danbooru", help="指定文件扩展名")
    parser.add_argument("--threshold", type=float, default=0.68, help="指定阈值")
    parser.add_argument("--size", type=int, default=512, help="指定图片大小")
    # The flags below default to True, so each gets a --no_* counterpart;
    # without it the option could never be switched off from the command line.
    parser.add_argument("--keep_ratio", default=True, action="store_true", help="保持图片比例")
    parser.add_argument("--no_keep_ratio", dest="keep_ratio", action="store_false", help="不保持图片比例")
    parser.add_argument("--use_spaces", default=True, action="store_true", help="使用空格替代下划线")
    parser.add_argument("--no_use_spaces", dest="use_spaces", action="store_false", help="保留下划线")
    parser.add_argument("--use_escape", default=False, action="store_true", help="在标签中转义斜杠和括号")
    parser.add_argument("--include_ranks", default=False, action="store_true", help="在输出文本中包含排名")
    parser.add_argument("--score_descend", default=True, action="store_true", help="按分数降序排列标签")
    parser.add_argument("--no_score_descend", dest="score_descend", action="store_false", help="不按分数重新排序")

    args = parser.parse_args()
    main(args)

In [None]:
def main(args):
    """Validate the parsed CLI options, then tag a zip archive of images or a single image.

    Args:
        args: Namespace with the attributes ``temporarily_dir``,
            ``train_data_dir``, ``pic_path``, ``onnx_model_path``,
            ``onnx_model_name``, ``output_path``, ``batch_size``, ``extension``,
            ``threshold``, ``size``, ``keep_ratio``, ``use_spaces``,
            ``use_escape``, ``include_ranks`` and ``score_descend``.

    Exits with status 1 when ``onnx_model_path`` is missing, or when
    ``train_data_dir`` and ``pic_path`` are both given or both missing.
    """
    def usage_error(message):
        """Print the message (and CLI help when available), then exit with status 1."""
        print(message)
        # `parser` only exists when the file runs as a script; skip the help text otherwise.
        cli_parser = globals().get("parser")
        if cli_parser is not None:
            cli_parser.print_help()
        sys.exit(1)

    temporarily_dir = args.temporarily_dir
    train_data_dir = args.train_data_dir
    pic_path = args.pic_path
    onnx_model_path = args.onnx_model_path
    onnx_model_name = args.onnx_model_name
    output_path = args.output_path
    batch_size = args.batch_size
    extension = args.extension
    threshold = args.threshold
    size = args.size
    keep_ratio = args.keep_ratio
    use_spaces = args.use_spaces
    use_escape = args.use_escape
    include_ranks = args.include_ranks
    score_descend = args.score_descend

    if onnx_model_path is None:
        print("请先下载仓库https://huggingface.co/deepghs/ml-danbooru-onnx，并指定存储目录为onnx_model_path")
        sys.exit(1)

    if train_data_dir is not None and pic_path is not None:
        usage_error("错误：不能同时指定 --train_data_dir 和 --pic_path。")

    if train_data_dir is None and pic_path is None:
        usage_error("错误：--train_data_dir 和 --pic_path 至少需要指定一个。")

    model_file = os.path.join(onnx_model_path, onnx_model_name)
    if train_data_dir is not None:
        if temporarily_dir is None:
            temporarily_dir = os.path.join(os.path.expanduser("~"), "tarin")
            os.makedirs(temporarily_dir, exist_ok=True)
        image_paths = get_image_paths(temporarily_dir, train_data_dir)
        ml_model = open_onnx_model(model_file, "CUDAExecutionProvider")
        process_images(image_paths, onnx_model_path, ml_model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend, output_path, batch_size, extension)
    else:
        ml_model = open_onnx_model(model_file, "CUDAExecutionProvider")
        process_image(pic_path, onnx_model_path, ml_model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend, output_path, extension)



if __name__ == "__main__":
    # Imported here because the notebook's import cell does not provide argparse.
    import argparse

    parser = argparse.ArgumentParser(description="程序描述")
    parser.add_argument("--temporarily_dir", default=None, help="指定临时目录，当使用压缩包时，图片将解压到这里")
    parser.add_argument("--train_data_dir", default=None, help="指定训练数据目录或目录内的zip文件")
    parser.add_argument("--pic_path", default=None, help="指定单张图片路径")
    parser.add_argument("--onnx_model_path", default=None, help="指定ONNX模型路径")
    parser.add_argument("--onnx_model_name", default="ml_caformer_m36_dec-5-97527.onnx", help="指定ONNX模型名称")
    parser.add_argument("--output_path", default=None, help="指定输出路径")
    parser.add_argument("--batch_size", type=int, default=1, help="指定批处理大小")
    parser.add_argument("--extension", default="ml_danbooru", help="指定文件扩展名")
    parser.add_argument("--threshold", type=float, default=0.68, help="指定阈值")
    parser.add_argument("--size", type=int, default=512, help="指定图片大小")
    # The flags below default to True, so each gets a --no_* counterpart;
    # without it the option could never be switched off from the command line.
    parser.add_argument("--keep_ratio", default=True, action="store_true", help="保持图片比例")
    parser.add_argument("--no_keep_ratio", dest="keep_ratio", action="store_false", help="不保持图片比例")
    parser.add_argument("--use_spaces", default=True, action="store_true", help="使用空格替代下划线")
    parser.add_argument("--no_use_spaces", dest="use_spaces", action="store_false", help="保留下划线")
    parser.add_argument("--use_escape", default=False, action="store_true", help="在标签中转义斜杠和括号")
    parser.add_argument("--include_ranks", default=False, action="store_true", help="在输出文本中包含排名")
    parser.add_argument("--score_descend", default=True, action="store_true", help="按分数降序排列标签")
    parser.add_argument("--no_score_descend", dest="score_descend", action="store_false", help="不按分数重新排序")

    args = parser.parse_args()
    main(args)

In [None]:
# Usage example: tag a list of images with the ML-Danbooru model.
# Replace the placeholder paths with real image files.
image_paths = ["path/to/image1.jpg", "path/to/image2.jpg"]
onnx_model_path = "/models/ML-Danbooru"
onnx_model_name = "ml_caformer_m36_dec-5-97527.onnx"
threshold = 0.7
size = 960  # edge length handed to the resize step
keep_ratio = True  # keep the aspect ratio
use_spaces = False  # replace underscores with spaces when True
use_escape = True  # escape backslashes and parentheses in tags
include_ranks = False  # when True each tag is written as "(tag:score)"
score_descend = True  # order tags by score
print("加载ml模型")
# The session is created once; the earlier duplicate load was removed.
ml_model = open_onnx_model(os.path.join(onnx_model_path, onnx_model_name), "CUDAExecutionProvider")
print("ml模型加载成功")
process_images(image_paths, onnx_model_path, ml_model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend, batch_size=16)


In [None]:
# Load the ML-Danbooru tagging model
onnx_model_name = "ml_caformer_m36_dec-5-97527.onnx"
onnx_model_path = "/models/ML-Danbooru"

# Tagging options used by the cells below
size = 960  # edge length handed to the resize step
keep_ratio = True  # keep the aspect ratio
threshold = 0.7
score_descend = True  # order tags by score
include_ranks = False  # when True each tag is written as "(tag:score)"
use_escape = True  # escape backslashes and parentheses in tags
use_spaces = False  # replace underscores with spaces when True

print("加载ml模型")
ml_model = open_onnx_model(
    os.path.join(onnx_model_path, onnx_model_name),
    "CUDAExecutionProvider",
)
print("ml模型加载成功")

In [None]:
# Tag a single image with the ML-Danbooru model
image_path = "/gemini/code/3.png"
# process_main is the per-image tagger; main() only accepts parsed CLI arguments.
result_text = process_main(image_path, onnx_model_path, ml_model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend)
print(result_text)

In [None]:
!pip install flask

In [None]:
#加载ShareGPT4V-13B
import os
import json
import time
import torch
from PIL import Image
import concurrent.futures
from flask import Flask, request, jsonify
from llava.model.builder import load_pretrained_model
from llava.conversation import conv_templates, SeparatorStyle
from llava.mm_utils import process_images, tokenizer_image_token, KeywordsStoppingCriteria

os.environ["HF_OFFLINE"] = "1"
# Also set the offline switches documented by huggingface_hub / transformers.
# NOTE(review): these are read at import time, so they may only take effect
# if set before those libraries are first imported — confirm.
os.environ["HF_HUB_OFFLINE"] = "1"
os.environ["TRANSFORMERS_OFFLINE"] = "1"

model_path = "/models/ShareGPT4V-13B"
# Load the model.
# reset_parameters is replaced with a no-op on Linear and LayerNorm before the
# model is built — presumably to skip random initialisation of weights that
# are overwritten by the checkpoint; TODO confirm.
setattr(torch.nn.Linear, "reset_parameters", lambda self: None)
setattr(torch.nn.LayerNorm, "reset_parameters", lambda self: None)
tokenizer, model, image_processor, context_len = load_pretrained_model(model_path, None, "llava-v1.5-7b", False, False)

def get_image_paths(directory):
    """Recursively collect the paths of image files under ``directory``.

    A file counts as an image when its name ends with ``.jpg``, ``.jpeg``,
    ``.png``, ``.webp`` or ``.gif``, compared case-insensitively so that names
    such as ``photo.JPG`` are included.

    Returns:
        A list of paths in the order ``os.walk`` yields them.
    """
    image_suffixes = (".jpg", ".jpeg", ".png", ".webp", ".gif")
    return [
        os.path.join(root, file)
        for root, _dirs, files in os.walk(directory)
        for file in files
        if file.lower().endswith(image_suffixes)
    ]

def get_responce(prompt_int, image_file, temperature=0.7, top_p=0.4, max_new_tokens=512, num_beams=1):
    """Generate the model's answer to a prompt about one image.

    Args:
        prompt_int: User text placed after the ``<image>`` placeholder.
        image_file: Path or file object accepted by ``Image.open``.
        temperature: Sampling temperature; sampling is disabled when it is not
            greater than 0.
        top_p, max_new_tokens, num_beams: Forwarded to ``model.generate``.

    Returns:
        The decoded text generated after the prompt, stripped of surrounding
        whitespace and of a trailing ``</s>``.

    Relies on the notebook-level ``tokenizer``, ``model`` and ``image_processor``.
    """
    prompt_use = "A chat between a curious human and an artificial intelligence assistant, the content of the chat and it should be accurate. \nUSER: <image>\n"
    prompt = prompt_use + prompt_int + "\nASSISTANT:"
    # convert() returns a new in-memory image, so the file can be closed right away.
    with Image.open(image_file) as image:
        images = [image.convert("RGB")]
    images_tensor = process_images(images, image_processor, model.config).to(model.device, dtype=torch.float16)
    # -200 is passed as the image token index — presumably llava's IMAGE_TOKEN_INDEX; TODO confirm.
    # The ids go to the model's device, matching images_tensor, instead of a hard-coded .cuda().
    input_ids = tokenizer_image_token(prompt, tokenizer, -200, return_tensors="pt").unsqueeze(0).to(model.device)
    stopping_criteria = KeywordsStoppingCriteria(["</s>"], tokenizer, input_ids)
    with torch.inference_mode():
        output_ids = model.generate(
            input_ids,
            images=images_tensor,
            do_sample=temperature > 0,
            temperature=temperature,
            top_p=top_p,
            num_beams=num_beams,
            max_new_tokens=max_new_tokens,
            use_cache=True,
            stopping_criteria=[stopping_criteria],
        )
    # Decode only the tokens generated after the prompt.
    input_token_len = input_ids.shape[1]
    outputs = tokenizer.batch_decode(output_ids[:, input_token_len:], skip_special_tokens=True)[0]
    outputs = outputs.strip()
    if outputs.endswith("</s>"):
        outputs = outputs[: -len("</s>")]
    return outputs

def process_text(outputs):
    """Collapse the text to one line and drop repeated sentences.

    Lines are joined with spaces, the text is split on ``.``, each piece is
    stripped, duplicates after the first occurrence are removed, and the
    pieces are joined back with ``.``.
    """
    single_line = ' '.join(outputs.splitlines()).strip()
    # dict.fromkeys keeps the first occurrence of each piece, in order.
    distinct = dict.fromkeys(piece.strip() for piece in single_line.split('.'))
    return '.'.join(distinct)

def process_savetxt(output, image_file_path):
    """Write the cleaned-up text to a ``.txt`` file next to the image.

    Args:
        output: Text to pass through ``process_text`` before saving.
        image_file_path: Path of the image; the text file uses the same
            directory and base name with a ``.txt`` extension.
    """
    file_name = os.path.splitext(os.path.basename(image_file_path))[0]
    txt_file_path = os.path.join(os.path.dirname(image_file_path), file_name + ".txt")
    # Explicit UTF-8 so the result does not depend on the platform's default encoding.
    with open(txt_file_path, 'w', encoding='utf-8') as f:
        f.write(process_text(output))

def process_request(output):
    """Wrap the text in the response payload ``{"output": ...}``."""
    return {"output": output}




In [None]:
# TODO: replace with the prompt to send alongside the image; the assignment
# previously had no value, which is a syntax error.
prompt_int = "describe the picture."
temperature = 0.7


In [None]:
from flask import Flask, request, jsonify
app = Flask(__name__)


@app.route('/api', methods=['POST'])
def handle_request():
    """Tag the uploaded image, then ask the vision-language model to describe it.

    Form fields: ``prompt_int`` (default ``"tell me about"``), ``temperature``,
    ``top_p`` and ``max_new_tokens``. The image is sent as the ``image`` file.

    Returns:
        JSON ``{"output": <answer>}`` on success, or ``{"error": ...}`` with
        status 400 when the image is missing or a numeric field cannot be parsed.
    """
    # Check for the upload before touching request.files['image'].
    if 'image' not in request.files:
        return jsonify({'error': '你需要发送一个图片'}), 400
    try:
        temperature = float(request.form.get('temperature', '0.7'))
        top_p = float(request.form.get('top_p', '0.4'))
        max_new_tokens = int(request.form.get('max_new_tokens', '512'))
    except ValueError as exc:
        return jsonify({'error': f'invalid generation parameter: {exc}'}), 400
    prompt_default = "tell me about"
    prompt_int = request.form.get('prompt_int', prompt_default)
    num_beams = 1

    image_save_path = "/gemini/code/image"
    os.makedirs(image_save_path, exist_ok=True)
    # The upload is stored under a timestamped name.
    image_file_dir = os.path.join(image_save_path, f"{time.time()}_input.jpg")
    request.files['image'].save(image_file_dir)

    # process_main is the per-image tagger; main() only accepts parsed CLI arguments.
    result_text = process_main(image_file_dir, onnx_model_path, ml_model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend)
    prompt_int2 = "The main character in the picture is "+prompt_int+", The name of the role must be mentioned in the answer. This is the result of marking through the marker:"+result_text+". For all tags, they must be used when describing the image (synonyms can be used). Then describe the picture."
    print(prompt_int2)
    answer = get_responce(prompt_int2, image_file_dir, temperature, top_p, max_new_tokens, num_beams)
    print(answer)
    return jsonify(process_request(answer))

if __name__ == '__main__':
    # NOTE(review): host 0.0.0.0 listens on every network interface.
    app.run(host='0.0.0.0', port=5000)

In [None]:
image_file_dir = "/gemini/code/image/1703243535.1733773_input.jpg"
# Use process_main (main() only accepts parsed CLI arguments) and the ONNX
# tagger session ml_model rather than the LLaVA `model`.
result_text = process_main(image_file_dir, onnx_model_path, ml_model, threshold, size, keep_ratio, use_spaces, use_escape, include_ranks, score_descend)

In [None]:
#对单张图片进行处理，返回txt
result = "1girl, red_eyes, solo, official_alternate_costume, red_dress, silver_hair, white_background, dress, looking_at_viewer, long_sleeves, long_hair, simple_background, aqua_headwear, hat, breasts, bangs, nail_polish, parted_lips, medium_breasts, bare_shoulders, hair_over_one_eye, black_neckwear, off_shoulder, clothing_cutout, ascot, detached_sleeves, upper_body, black_headwear, holding, artist_name, hand_up, wide_sleeves, weibo_username, water, twitter_username, sitting"
prompt_int = "This is the result of marking through the marker:"+result+". For all tags, they must be used when describing the image (synonyms can be used). Then describe the picture."
#prompt_int = "describe the picture."
image_file_path = "/gemini/code/3.png"
get_responce(prompt_int, image_file_path, temperature=0.7, top_p=0.4, max_new_tokens=512, num_beams=1)

In [3]:
# List all entries, including dotfiles, in the Hugging Face hub cache snapshot
# directory of Lin-Chen/ShareGPT4V-13B_Pretrained_vit-large336-l12.
!ls -a /root/.cache/huggingface/hub/models--Lin-Chen--ShareGPT4V-13B_Pretrained_vit-large336-l12/snapshots/738faab4f9b1f76d62408d5ea7f36d55f5e55464

.  ..  config.json  preprocessor_config.json  pytorch_model.bin


In [1]:
# Copy preprocessor_config.json from /gemini/code into the cached snapshot directory.
!cp /gemini/code/preprocessor_config.json /root/.cache/huggingface/hub/models--Lin-Chen--ShareGPT4V-13B_Pretrained_vit-large336-l12/snapshots/738faab4f9b1f76d62408d5ea7f36d55f5e55464

In [None]:
# NOTE(review): image_directory is not defined anywhere in the visible notebook,
# and image_paths is not used below — confirm whether this line is still needed.
image_paths = get_image_paths(image_directory)
# prompt_int and image_file_path are expected to come from earlier cells.
output = get_responce(prompt_int, image_file_path, temperature=0.7, top_p=0.4, max_new_tokens=512, num_beams=1)
# Disabled: save the answer to a .txt file next to the image.
#process_savetxt(output, image_file_path)
print(output)