diff --git a/invokeai/backend/web/__init__.py b/invokeai/backend/web/__init__.py deleted file mode 100644 index c57600f72b7..00000000000 --- a/invokeai/backend/web/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -""" -Initialization file for the web backend. -""" -from .invoke_ai_web_server import InvokeAIWebServer diff --git a/invokeai/backend/web/invoke_ai_web_server.py b/invokeai/backend/web/invoke_ai_web_server.py deleted file mode 100644 index 88eb77551f6..00000000000 --- a/invokeai/backend/web/invoke_ai_web_server.py +++ /dev/null @@ -1,1654 +0,0 @@ -import base64 -import glob -import io -import json -import math -import mimetypes -import os -import shutil -import traceback -from pathlib import Path -from threading import Event -from uuid import uuid4 - -import eventlet -from compel.prompt_parser import Blend -from flask import Flask, make_response, redirect, request, send_from_directory -from flask_socketio import SocketIO -from PIL import Image -from PIL.Image import Image as ImageType -from werkzeug.utils import secure_filename - -import invokeai.backend.util.logging as logger -import invokeai.frontend.web.dist as frontend - -from .. import Generate -from ..args import APP_ID, APP_VERSION, Args, calculate_init_img_hash -from ..generator import infill_methods -from ..globals import Globals, global_converted_ckpts_dir, global_models_dir -from ..image_util import PngWriter, retrieve_metadata -from ...frontend.merge.merge_diffusers import merge_diffusion_models -from ..prompting import ( - get_prompt_structure, - get_tokens_for_prompt_object, -) -from ..stable_diffusion import PipelineIntermediateState -from .modules.get_canvas_generation_mode import get_canvas_generation_mode -from .modules.parameters import parameters_to_command - -# Loading Arguments -opt = Args() -args = opt.parse_args() - -# Set the root directory for static files and relative paths -args.root_dir = os.path.expanduser(args.root_dir or "..") -if not os.path.isabs(args.outdir): - args.outdir = os.path.join(args.root_dir, args.outdir) - -# normalize the config directory relative to root -if not os.path.isabs(opt.conf): - opt.conf = os.path.normpath(os.path.join(Globals.root, opt.conf)) - - -class InvokeAIWebServer: - def __init__(self, generate: Generate, gfpgan, codeformer, esrgan) -> None: - self.host = args.host - self.port = args.port - - self.generate = generate - self.gfpgan = gfpgan - self.codeformer = codeformer - self.esrgan = esrgan - - self.canceled = Event() - self.ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg"} - - def allowed_file(self, filename: str) -> bool: - return "." in filename and filename.rsplit(".", 1)[1].lower() in self.ALLOWED_EXTENSIONS - - def run(self): - self.setup_app() - self.setup_flask() - - def setup_flask(self): - # Fix missing mimetypes on Windows - mimetypes.add_type("application/javascript", ".js") - mimetypes.add_type("text/css", ".css") - # Socket IO - engineio_logger = True if args.web_verbose else False - max_http_buffer_size = 10000000 - - socketio_args = { - "logger": logger, - "engineio_logger": engineio_logger, - "max_http_buffer_size": max_http_buffer_size, - "ping_interval": (50, 50), - "ping_timeout": 60, - } - - if opt.cors: - _cors = opt.cors - # convert list back into comma-separated string, - # be defensive here, not sure in what form this arrives - if isinstance(_cors, list): - _cors = ",".join(_cors) - if "," in _cors: - _cors = _cors.split(",") - socketio_args["cors_allowed_origins"] = _cors - - self.app = Flask(__name__, static_url_path="", static_folder=frontend.__path__[0]) - - self.socketio = SocketIO(self.app, **socketio_args) - - # Keep Server Alive Route - @self.app.route("/flaskwebgui-keep-server-alive") - def keep_alive(): - return {"message": "Server Running"} - - # Outputs Route - self.app.config["OUTPUTS_FOLDER"] = os.path.abspath(args.outdir) - - @self.app.route("/outputs/") - def outputs(file_path): - return send_from_directory(self.app.config["OUTPUTS_FOLDER"], file_path) - - # Base Route - @self.app.route("/") - def serve(): - if args.web_develop: - return redirect("http://127.0.0.1:5173") - else: - return send_from_directory(self.app.static_folder, "index.html") - - @self.app.route("/upload", methods=["POST"]) - def upload(): - try: - data = json.loads(request.form["data"]) - filename = "" - # check if the post request has the file part - if "file" in request.files: - file = request.files["file"] - # If the user does not select a file, the browser submits an - # empty file without a filename. - if file.filename == "": - return make_response("No file selected", 400) - filename = file.filename - elif "dataURL" in data: - file = dataURL_to_bytes(data["dataURL"]) - if "filename" not in data or data["filename"] == "": - return make_response("No filename provided", 400) - filename = data["filename"] - else: - return make_response("No file or dataURL", 400) - - kind = data["kind"] - - if kind == "init": - path = self.init_image_path - elif kind == "temp": - path = self.temp_image_path - elif kind == "result": - path = self.result_path - elif kind == "mask": - path = self.mask_image_path - else: - return make_response(f"Invalid upload kind: {kind}", 400) - - if not self.allowed_file(filename): - return make_response( - f'Invalid file type, must be one of: {", ".join(self.ALLOWED_EXTENSIONS)}', - 400, - ) - - secured_filename = secure_filename(filename) - - uuid = uuid4().hex - truncated_uuid = uuid[:8] - - split = os.path.splitext(secured_filename) - name = f"{split[0]}.{truncated_uuid}{split[1]}" - - file_path = os.path.join(path, name) - - if "dataURL" in data: - with open(file_path, "wb") as f: - f.write(file) - else: - file.save(file_path) - - mtime = os.path.getmtime(file_path) - - pil_image = Image.open(file_path) - - if "cropVisible" in data and data["cropVisible"] == True: - visible_image_bbox = pil_image.getbbox() - pil_image = pil_image.crop(visible_image_bbox) - pil_image.save(file_path) - - (width, height) = pil_image.size - - thumbnail_path = save_thumbnail(pil_image, os.path.basename(file_path), self.thumbnail_image_path) - - response = { - "url": self.get_url_from_image_path(file_path), - "thumbnail": self.get_url_from_image_path(thumbnail_path), - "mtime": mtime, - "width": width, - "height": height, - } - - return make_response(response, 200) - - except Exception as e: - self.handle_exceptions(e) - return make_response("Error uploading file", 500) - - self.load_socketio_listeners(self.socketio) - - if args.gui: - logger.info("Launching Invoke AI GUI") - try: - from flaskwebgui import FlaskUI - - FlaskUI( - app=self.app, - socketio=self.socketio, - server="flask_socketio", - width=1600, - height=1000, - port=self.port, - ).run() - except KeyboardInterrupt: - import sys - - sys.exit(0) - else: - useSSL = args.certfile or args.keyfile - logger.info("Started Invoke AI Web Server") - if self.host == "0.0.0.0": - logger.info( - f"Point your browser at http{'s' if useSSL else ''}://localhost:{self.port} or use the host's DNS name or IP address." - ) - else: - logger.info("Default host address now 127.0.0.1 (localhost). Use --host 0.0.0.0 to bind any address.") - logger.info(f"Point your browser at http{'s' if useSSL else ''}://{self.host}:{self.port}") - if not useSSL: - self.socketio.run(app=self.app, host=self.host, port=self.port) - else: - self.socketio.run( - app=self.app, - host=self.host, - port=self.port, - certfile=args.certfile, - keyfile=args.keyfile, - ) - - def setup_app(self): - self.result_url = "outputs/" - self.init_image_url = "outputs/init-images/" - self.mask_image_url = "outputs/mask-images/" - self.intermediate_url = "outputs/intermediates/" - self.temp_image_url = "outputs/temp-images/" - self.thumbnail_image_url = "outputs/thumbnails/" - # location for "finished" images - self.result_path = args.outdir - # temporary path for intermediates - self.intermediate_path = os.path.join(self.result_path, "intermediates/") - # path for user-uploaded init images and masks - self.init_image_path = os.path.join(self.result_path, "init-images/") - self.mask_image_path = os.path.join(self.result_path, "mask-images/") - # path for temp images e.g. gallery generations which are not committed - self.temp_image_path = os.path.join(self.result_path, "temp-images/") - # path for thumbnail images - self.thumbnail_image_path = os.path.join(self.result_path, "thumbnails/") - # txt log - self.log_path = os.path.join(self.result_path, "invoke_logger.txt") - # make all output paths - [ - os.makedirs(path, exist_ok=True) - for path in [ - self.result_path, - self.intermediate_path, - self.init_image_path, - self.mask_image_path, - self.temp_image_path, - self.thumbnail_image_path, - ] - ] - - def load_socketio_listeners(self, socketio): - @socketio.on("requestSystemConfig") - def handle_request_capabilities(): - logger.info("System config requested") - config = self.get_system_config() - config["model_list"] = self.generate.model_manager.list_models() - config["infill_methods"] = infill_methods() - socketio.emit("systemConfig", config) - - @socketio.on("searchForModels") - def handle_search_models(search_folder: str): - try: - if not search_folder: - socketio.emit( - "foundModels", - {"search_folder": None, "found_models": None}, - ) - else: - ( - search_folder, - found_models, - ) = self.generate.model_manager.search_models(search_folder) - socketio.emit( - "foundModels", - {"search_folder": search_folder, "found_models": found_models}, - ) - except Exception as e: - self.handle_exceptions(e) - print("\n") - - @socketio.on("addNewModel") - def handle_add_model(new_model_config: dict): - try: - model_name = new_model_config["name"] - del new_model_config["name"] - model_attributes = new_model_config - if len(model_attributes["vae"]) == 0: - del model_attributes["vae"] - update = False - current_model_list = self.generate.model_manager.list_models() - if model_name in current_model_list: - update = True - - logger.info(f"Adding New Model: {model_name}") - - self.generate.model_manager.add_model( - model_name=model_name, - model_attributes=model_attributes, - clobber=True, - ) - self.generate.model_manager.commit(opt.conf) - - new_model_list = self.generate.model_manager.list_models() - socketio.emit( - "newModelAdded", - { - "new_model_name": model_name, - "model_list": new_model_list, - "update": update, - }, - ) - logger.info(f"New Model Added: {model_name}") - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("deleteModel") - def handle_delete_model(model_name: str): - try: - logger.info(f"Deleting Model: {model_name}") - self.generate.model_manager.del_model(model_name) - self.generate.model_manager.commit(opt.conf) - updated_model_list = self.generate.model_manager.list_models() - socketio.emit( - "modelDeleted", - { - "deleted_model_name": model_name, - "model_list": updated_model_list, - }, - ) - logger.info(f"Model Deleted: {model_name}") - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("requestModelChange") - def handle_set_model(model_name: str): - try: - logger.info(f"Model change requested: {model_name}") - model = self.generate.set_model(model_name) - model_list = self.generate.model_manager.list_models() - if model is None: - socketio.emit( - "modelChangeFailed", - {"model_name": model_name, "model_list": model_list}, - ) - else: - socketio.emit( - "modelChanged", - {"model_name": model_name, "model_list": model_list}, - ) - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("convertToDiffusers") - def convert_to_diffusers(model_to_convert: dict): - try: - if model_info := self.generate.model_manager.model_info(model_name=model_to_convert["model_name"]): - if "weights" in model_info: - ckpt_path = Path(model_info["weights"]) - original_config_file = Path(model_info["config"]) - model_name = model_to_convert["model_name"] - model_description = model_info["description"] - else: - self.socketio.emit("error", {"message": "Model is not a valid checkpoint file"}) - else: - self.socketio.emit("error", {"message": "Could not retrieve model info."}) - - if not ckpt_path.is_absolute(): - ckpt_path = Path(Globals.root, ckpt_path) - - if original_config_file and not original_config_file.is_absolute(): - original_config_file = Path(Globals.root, original_config_file) - - diffusers_path = Path(ckpt_path.parent.absolute(), f"{model_name}_diffusers") - - if model_to_convert["save_location"] == "root": - diffusers_path = Path(global_converted_ckpts_dir(), f"{model_name}_diffusers") - - if model_to_convert["save_location"] == "custom" and model_to_convert["custom_location"] is not None: - diffusers_path = Path(model_to_convert["custom_location"], f"{model_name}_diffusers") - - if diffusers_path.exists(): - shutil.rmtree(diffusers_path) - - self.generate.model_manager.convert_and_import( - ckpt_path, - diffusers_path, - model_name=model_name, - model_description=model_description, - vae=None, - original_config_file=original_config_file, - commit_to_conf=opt.conf, - ) - - new_model_list = self.generate.model_manager.list_models() - socketio.emit( - "modelConverted", - { - "new_model_name": model_name, - "model_list": new_model_list, - "update": True, - }, - ) - logger.info(f"Model Converted: {model_name}") - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("mergeDiffusersModels") - def merge_diffusers_models(model_merge_info: dict): - try: - models_to_merge = model_merge_info["models_to_merge"] - model_ids_or_paths = [self.generate.model_manager.model_name_or_path(x) for x in models_to_merge] - merged_pipe = merge_diffusion_models( - model_ids_or_paths, - model_merge_info["alpha"], - model_merge_info["interp"], - model_merge_info["force"], - ) - - dump_path = global_models_dir() / "merged_models" - if model_merge_info["model_merge_save_path"] is not None: - dump_path = Path(model_merge_info["model_merge_save_path"]) - - os.makedirs(dump_path, exist_ok=True) - dump_path = dump_path / model_merge_info["merged_model_name"] - merged_pipe.save_pretrained(dump_path, safe_serialization=1) - - merged_model_config = dict( - model_name=model_merge_info["merged_model_name"], - description=f'Merge of models {", ".join(models_to_merge)}', - commit_to_conf=opt.conf, - ) - - if vae := self.generate.model_manager.config[models_to_merge[0]].get("vae", None): - logger.info(f"Using configured VAE assigned to {models_to_merge[0]}") - merged_model_config.update(vae=vae) - - self.generate.model_manager.import_diffuser_model(dump_path, **merged_model_config) - new_model_list = self.generate.model_manager.list_models() - - socketio.emit( - "modelsMerged", - { - "merged_models": models_to_merge, - "merged_model_name": model_merge_info["merged_model_name"], - "model_list": new_model_list, - "update": True, - }, - ) - logger.info(f"Models Merged: {models_to_merge}") - logger.info(f"New Model Added: {model_merge_info['merged_model_name']}") - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("requestEmptyTempFolder") - def empty_temp_folder(): - try: - temp_files = glob.glob(os.path.join(self.temp_image_path, "*")) - for f in temp_files: - try: - os.remove(f) - thumbnail_path = os.path.join( - self.thumbnail_image_path, - os.path.splitext(os.path.basename(f))[0] + ".webp", - ) - os.remove(thumbnail_path) - except Exception as e: - socketio.emit("error", {"message": f"Unable to delete {f}: {str(e)}"}) - pass - - socketio.emit("tempFolderEmptied") - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("requestSaveStagingAreaImageToGallery") - def save_temp_image_to_gallery(url): - try: - image_path = self.get_image_path_from_url(url) - new_path = os.path.join(self.result_path, os.path.basename(image_path)) - shutil.copy2(image_path, new_path) - - if os.path.splitext(new_path)[1] == ".png": - metadata = retrieve_metadata(new_path) - else: - metadata = {} - - pil_image = Image.open(new_path) - - (width, height) = pil_image.size - - thumbnail_path = save_thumbnail(pil_image, os.path.basename(new_path), self.thumbnail_image_path) - - image_array = [ - { - "url": self.get_url_from_image_path(new_path), - "thumbnail": self.get_url_from_image_path(thumbnail_path), - "mtime": os.path.getmtime(new_path), - "metadata": metadata, - "width": width, - "height": height, - "category": "result", - } - ] - - socketio.emit( - "galleryImages", - {"images": image_array, "category": "result"}, - ) - - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("requestLatestImages") - def handle_request_latest_images(category, latest_mtime): - try: - base_path = self.result_path if category == "result" else self.init_image_path - - paths = [] - - for ext in ("*.png", "*.jpg", "*.jpeg"): - paths.extend(glob.glob(os.path.join(base_path, ext))) - - image_paths = sorted(paths, key=lambda x: os.path.getmtime(x), reverse=True) - - image_paths = list( - filter( - lambda x: os.path.getmtime(x) > latest_mtime, - image_paths, - ) - ) - - image_array = [] - - for path in image_paths: - try: - if os.path.splitext(path)[1] == ".png": - metadata = retrieve_metadata(path) - else: - metadata = {} - - pil_image = Image.open(path) - (width, height) = pil_image.size - - thumbnail_path = save_thumbnail(pil_image, os.path.basename(path), self.thumbnail_image_path) - - image_array.append( - { - "url": self.get_url_from_image_path(path), - "thumbnail": self.get_url_from_image_path(thumbnail_path), - "mtime": os.path.getmtime(path), - "metadata": metadata.get("sd-metadata"), - "dreamPrompt": metadata.get("Dream"), - "width": width, - "height": height, - "category": category, - } - ) - except Exception as e: - socketio.emit("error", {"message": f"Unable to load {path}: {str(e)}"}) - pass - - socketio.emit( - "galleryImages", - {"images": image_array, "category": category}, - ) - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("requestImages") - def handle_request_images(category, earliest_mtime=None): - try: - page_size = 50 - - base_path = self.result_path if category == "result" else self.init_image_path - - paths = [] - for ext in ("*.png", "*.jpg", "*.jpeg"): - paths.extend(glob.glob(os.path.join(base_path, ext))) - - image_paths = sorted(paths, key=lambda x: os.path.getmtime(x), reverse=True) - - if earliest_mtime: - image_paths = list( - filter( - lambda x: os.path.getmtime(x) < earliest_mtime, - image_paths, - ) - ) - - areMoreImagesAvailable = len(image_paths) >= page_size - image_paths = image_paths[slice(0, page_size)] - - image_array = [] - for path in image_paths: - try: - if os.path.splitext(path)[1] == ".png": - metadata = retrieve_metadata(path) - else: - metadata = {} - - pil_image = Image.open(path) - (width, height) = pil_image.size - - thumbnail_path = save_thumbnail(pil_image, os.path.basename(path), self.thumbnail_image_path) - - image_array.append( - { - "url": self.get_url_from_image_path(path), - "thumbnail": self.get_url_from_image_path(thumbnail_path), - "mtime": os.path.getmtime(path), - "metadata": metadata.get("sd-metadata"), - "dreamPrompt": metadata.get("Dream"), - "width": width, - "height": height, - "category": category, - } - ) - except Exception as e: - logger.info(f"Unable to load {path}") - socketio.emit("error", {"message": f"Unable to load {path}: {str(e)}"}) - pass - - socketio.emit( - "galleryImages", - { - "images": image_array, - "areMoreImagesAvailable": areMoreImagesAvailable, - "category": category, - }, - ) - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("generateImage") - def handle_generate_image_event(generation_parameters, esrgan_parameters, facetool_parameters): - try: - # truncate long init_mask/init_img base64 if needed - printable_parameters = { - **generation_parameters, - } - - if "init_img" in generation_parameters: - printable_parameters["init_img"] = printable_parameters["init_img"][:64] + "..." - - if "init_mask" in generation_parameters: - printable_parameters["init_mask"] = printable_parameters["init_mask"][:64] + "..." - - logger.info(f"Image Generation Parameters:\n\n{printable_parameters}\n") - logger.info(f"ESRGAN Parameters: {esrgan_parameters}") - logger.info(f"Facetool Parameters: {facetool_parameters}") - - self.generate_images( - generation_parameters, - esrgan_parameters, - facetool_parameters, - ) - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("runPostprocessing") - def handle_run_postprocessing(original_image, postprocessing_parameters): - try: - logger.info(f'Postprocessing requested for "{original_image["url"]}": {postprocessing_parameters}') - - progress = Progress() - - socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - original_image_path = self.get_image_path_from_url(original_image["url"]) - - image = Image.open(original_image_path) - - try: - seed = original_image["metadata"]["image"]["seed"] - except KeyError: - seed = "unknown_seed" - pass - - if postprocessing_parameters["type"] == "esrgan": - progress.set_current_status("common.statusUpscalingESRGAN") - elif postprocessing_parameters["type"] == "gfpgan": - progress.set_current_status("common.statusRestoringFacesGFPGAN") - elif postprocessing_parameters["type"] == "codeformer": - progress.set_current_status("common.statusRestoringFacesCodeFormer") - - socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - if postprocessing_parameters["type"] == "esrgan": - image = self.esrgan.process( - image=image, - upsampler_scale=postprocessing_parameters["upscale"][0], - denoise_str=postprocessing_parameters["upscale"][1], - strength=postprocessing_parameters["upscale"][2], - seed=seed, - ) - elif postprocessing_parameters["type"] == "gfpgan": - image = self.gfpgan.process( - image=image, - strength=postprocessing_parameters["facetool_strength"], - seed=seed, - ) - elif postprocessing_parameters["type"] == "codeformer": - image = self.codeformer.process( - image=image, - strength=postprocessing_parameters["facetool_strength"], - fidelity=postprocessing_parameters["codeformer_fidelity"], - seed=seed, - device="cpu" if str(self.generate.device) == "mps" else self.generate.device, - ) - else: - raise TypeError(f'{postprocessing_parameters["type"]} is not a valid postprocessing type') - - progress.set_current_status("common.statusSavingImage") - socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - postprocessing_parameters["seed"] = seed - metadata = self.parameters_to_post_processed_image_metadata( - parameters=postprocessing_parameters, - original_image_path=original_image_path, - ) - - command = parameters_to_command(postprocessing_parameters) - - (width, height) = image.size - - path = self.save_result_image( - image, - command, - metadata, - self.result_path, - postprocessing=postprocessing_parameters["type"], - ) - - thumbnail_path = save_thumbnail(image, os.path.basename(path), self.thumbnail_image_path) - - self.write_log_message( - f'[Postprocessed] "{original_image_path}" > "{path}": {postprocessing_parameters}' - ) - - progress.mark_complete() - socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - socketio.emit( - "postprocessingResult", - { - "url": self.get_url_from_image_path(path), - "thumbnail": self.get_url_from_image_path(thumbnail_path), - "mtime": os.path.getmtime(path), - "metadata": metadata, - "dreamPrompt": command, - "width": width, - "height": height, - }, - ) - except Exception as e: - self.handle_exceptions(e) - - @socketio.on("cancel") - def handle_cancel(): - logger.info("Cancel processing requested") - self.canceled.set() - - # TODO: I think this needs a safety mechanism. - @socketio.on("deleteImage") - def handle_delete_image(url, thumbnail, uuid, category): - try: - logger.info(f'Delete requested "{url}"') - from send2trash import send2trash - - path = self.get_image_path_from_url(url) - thumbnail_path = self.get_image_path_from_url(thumbnail) - - send2trash(path) - send2trash(thumbnail_path) - - socketio.emit( - "imageDeleted", - {"url": url, "uuid": uuid, "category": category}, - ) - except Exception as e: - self.handle_exceptions(e) - - # App Functions - def get_system_config(self): - model_list: dict = self.generate.model_manager.list_models() - active_model_name = None - - for model_name, model_dict in model_list.items(): - if model_dict["status"] == "active": - active_model_name = model_name - - return { - "model": "stable diffusion", - "model_weights": active_model_name, - "model_hash": self.generate.model_hash, - "app_id": APP_ID, - "app_version": APP_VERSION, - } - - def generate_images(self, generation_parameters, esrgan_parameters, facetool_parameters): - try: - self.canceled.clear() - - step_index = 1 - prior_variations = ( - generation_parameters["with_variations"] if "with_variations" in generation_parameters else [] - ) - - actual_generation_mode = generation_parameters["generation_mode"] - original_bounding_box = None - - progress = Progress(generation_parameters=generation_parameters) - - self.socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - """ - TODO: - If a result image is used as an init image, and then deleted, we will want to be - able to use it as an init image in the future. Need to handle this case. - """ - - """ - Prepare for generation based on generation_mode - """ - if generation_parameters["generation_mode"] == "unifiedCanvas": - """ - generation_parameters["init_img"] is a base64 image - generation_parameters["init_mask"] is a base64 image - - So we need to convert each into a PIL Image. - """ - - init_img_url = generation_parameters["init_img"] - - original_bounding_box = generation_parameters["bounding_box"].copy() - - initial_image = dataURL_to_image(generation_parameters["init_img"]).convert("RGBA") - - """ - The outpaint image and mask are pre-cropped by the UI, so the bounding box we pass - to the generator should be: - { - "x": 0, - "y": 0, - "width": original_bounding_box["width"], - "height": original_bounding_box["height"] - } - """ - - generation_parameters["bounding_box"]["x"] = 0 - generation_parameters["bounding_box"]["y"] = 0 - - # Convert mask dataURL to an image and convert to greyscale - mask_image = dataURL_to_image(generation_parameters["init_mask"]).convert("L") - - actual_generation_mode = get_canvas_generation_mode(initial_image, mask_image) - - """ - Apply the mask to the init image, creating a "mask" image with - transparency where inpainting should occur. This is the kind of - mask that prompt2image() needs. - """ - alpha_mask = initial_image.copy() - alpha_mask.putalpha(mask_image) - - generation_parameters["init_img"] = initial_image - generation_parameters["init_mask"] = alpha_mask - - # Remove the unneeded parameters for whichever mode we are doing - if actual_generation_mode == "inpainting": - generation_parameters.pop("seam_size", None) - generation_parameters.pop("seam_blur", None) - generation_parameters.pop("seam_strength", None) - generation_parameters.pop("seam_steps", None) - generation_parameters.pop("tile_size", None) - generation_parameters.pop("force_outpaint", None) - elif actual_generation_mode == "img2img": - generation_parameters["height"] = original_bounding_box["height"] - generation_parameters["width"] = original_bounding_box["width"] - generation_parameters.pop("init_mask", None) - generation_parameters.pop("seam_size", None) - generation_parameters.pop("seam_blur", None) - generation_parameters.pop("seam_strength", None) - generation_parameters.pop("seam_steps", None) - generation_parameters.pop("tile_size", None) - generation_parameters.pop("force_outpaint", None) - generation_parameters.pop("infill_method", None) - elif actual_generation_mode == "txt2img": - generation_parameters["height"] = original_bounding_box["height"] - generation_parameters["width"] = original_bounding_box["width"] - generation_parameters.pop("strength", None) - generation_parameters.pop("fit", None) - generation_parameters.pop("init_img", None) - generation_parameters.pop("init_mask", None) - generation_parameters.pop("seam_size", None) - generation_parameters.pop("seam_blur", None) - generation_parameters.pop("seam_strength", None) - generation_parameters.pop("seam_steps", None) - generation_parameters.pop("tile_size", None) - generation_parameters.pop("force_outpaint", None) - generation_parameters.pop("infill_method", None) - - elif generation_parameters["generation_mode"] == "img2img": - init_img_url = generation_parameters["init_img"] - init_img_path = self.get_image_path_from_url(init_img_url) - generation_parameters["init_img"] = Image.open(init_img_path).convert("RGB") - - def image_progress(intermediate_state: PipelineIntermediateState): - if self.canceled.is_set(): - raise CanceledException - - nonlocal step_index - nonlocal generation_parameters - nonlocal progress - - step = intermediate_state.step - if intermediate_state.predicted_original is not None: - # Some schedulers report not only the noisy latents at the current timestep, - # but also their estimate so far of what the de-noised latents will be. - sample = intermediate_state.predicted_original - else: - sample = intermediate_state.latents - - generation_messages = { - "txt2img": "common.statusGeneratingTextToImage", - "img2img": "common.statusGeneratingImageToImage", - "inpainting": "common.statusGeneratingInpainting", - "outpainting": "common.statusGeneratingOutpainting", - } - - progress.set_current_step(step + 1) - progress.set_current_status(f"{generation_messages[actual_generation_mode]}") - progress.set_current_status_has_steps(True) - - if ( - generation_parameters["progress_images"] - and step % generation_parameters["save_intermediates"] == 0 - and step < generation_parameters["steps"] - 1 - ): - image = self.generate.sample_to_image(sample) - metadata = self.parameters_to_generated_image_metadata(generation_parameters) - command = parameters_to_command(generation_parameters) - - (width, height) = image.size - - path = self.save_result_image( - image, - command, - metadata, - self.intermediate_path, - step_index=step_index, - postprocessing=False, - ) - - step_index += 1 - self.socketio.emit( - "intermediateResult", - { - "url": self.get_url_from_image_path(path), - "mtime": os.path.getmtime(path), - "metadata": metadata, - "width": width, - "height": height, - "generationMode": generation_parameters["generation_mode"], - "boundingBox": original_bounding_box, - }, - ) - - if generation_parameters["progress_latents"]: - image = self.generate.sample_to_lowres_estimated_image(sample) - (width, height) = image.size - width *= 8 - height *= 8 - img_base64 = image_to_dataURL(image, image_format="JPEG") - self.socketio.emit( - "intermediateResult", - { - "url": img_base64, - "isBase64": True, - "mtime": 0, - "metadata": {}, - "width": width, - "height": height, - "generationMode": generation_parameters["generation_mode"], - "boundingBox": original_bounding_box, - }, - ) - - self.socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - def image_done(image, seed, first_seed, attention_maps_image=None): - if self.canceled.is_set(): - raise CanceledException - - nonlocal generation_parameters - nonlocal esrgan_parameters - nonlocal facetool_parameters - nonlocal progress - - nonlocal prior_variations - - """ - Tidy up after generation based on generation_mode - """ - # paste the inpainting image back onto the original - if generation_parameters["generation_mode"] == "inpainting": - image = paste_image_into_bounding_box( - Image.open(init_img_path), - image, - **generation_parameters["bounding_box"], - ) - - progress.set_current_status("common.statusGenerationComplete") - - self.socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - all_parameters = generation_parameters - postprocessing = False - - if "variation_amount" in all_parameters and all_parameters["variation_amount"] > 0: - first_seed = first_seed or seed - this_variation = [[seed, all_parameters["variation_amount"]]] - all_parameters["with_variations"] = prior_variations + this_variation - all_parameters["seed"] = first_seed - elif "with_variations" in all_parameters: - all_parameters["seed"] = first_seed - else: - all_parameters["seed"] = seed - - if self.canceled.is_set(): - raise CanceledException - - if esrgan_parameters: - progress.set_current_status("common.statusUpscaling") - progress.set_current_status_has_steps(False) - self.socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - image = self.esrgan.process( - image=image, - upsampler_scale=esrgan_parameters["level"], - denoise_str=esrgan_parameters["denoise_str"], - strength=esrgan_parameters["strength"], - seed=seed, - ) - - postprocessing = True - all_parameters["upscale"] = [ - esrgan_parameters["level"], - esrgan_parameters["denoise_str"], - esrgan_parameters["strength"], - ] - - if self.canceled.is_set(): - raise CanceledException - - if facetool_parameters: - if facetool_parameters["type"] == "gfpgan": - progress.set_current_status("common.statusRestoringFacesGFPGAN") - elif facetool_parameters["type"] == "codeformer": - progress.set_current_status("common.statusRestoringFacesCodeFormer") - - progress.set_current_status_has_steps(False) - self.socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - if facetool_parameters["type"] == "gfpgan": - image = self.gfpgan.process( - image=image, - strength=facetool_parameters["strength"], - seed=seed, - ) - elif facetool_parameters["type"] == "codeformer": - image = self.codeformer.process( - image=image, - strength=facetool_parameters["strength"], - fidelity=facetool_parameters["codeformer_fidelity"], - seed=seed, - device="cpu" if str(self.generate.device) == "mps" else self.generate.device, - ) - all_parameters["codeformer_fidelity"] = facetool_parameters["codeformer_fidelity"] - - postprocessing = True - all_parameters["facetool_strength"] = facetool_parameters["strength"] - all_parameters["facetool_type"] = facetool_parameters["type"] - - progress.set_current_status("common.statusSavingImage") - self.socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - # restore the stashed URLS and discard the paths, we are about to send the result to client - all_parameters["init_img"] = ( - init_img_url if generation_parameters["generation_mode"] == "img2img" else "" - ) - - if "init_mask" in all_parameters: - # TODO: store the mask in metadata - all_parameters["init_mask"] = "" - - if generation_parameters["generation_mode"] == "unifiedCanvas": - all_parameters["bounding_box"] = original_bounding_box - - metadata = self.parameters_to_generated_image_metadata(all_parameters) - - command = parameters_to_command(all_parameters) - - (width, height) = image.size - - generated_image_outdir = ( - self.result_path - if generation_parameters["generation_mode"] in ["txt2img", "img2img"] - else self.temp_image_path - ) - - path = self.save_result_image( - image, - command, - metadata, - generated_image_outdir, - postprocessing=postprocessing, - ) - - thumbnail_path = save_thumbnail(image, os.path.basename(path), self.thumbnail_image_path) - - logger.info(f'Image generated: "{path}"\n') - self.write_log_message(f'[Generated] "{path}": {command}') - - if progress.total_iterations > progress.current_iteration: - progress.set_current_step(1) - progress.set_current_status("common.statusIterationComplete") - progress.set_current_status_has_steps(False) - else: - progress.mark_complete() - - self.socketio.emit("progressUpdate", progress.to_formatted_dict()) - eventlet.sleep(0) - - parsed_prompt, _ = get_prompt_structure(generation_parameters["prompt"]) - with self.generate.model_context as model: - tokens = ( - None - if type(parsed_prompt) is Blend - else get_tokens_for_prompt_object(model.tokenizer, parsed_prompt) - ) - attention_maps_image_base64_url = ( - None if attention_maps_image is None else image_to_dataURL(attention_maps_image) - ) - - self.socketio.emit( - "generationResult", - { - "url": self.get_url_from_image_path(path), - "thumbnail": self.get_url_from_image_path(thumbnail_path), - "mtime": os.path.getmtime(path), - "metadata": metadata, - "dreamPrompt": command, - "width": width, - "height": height, - "boundingBox": original_bounding_box, - "generationMode": generation_parameters["generation_mode"], - "attentionMaps": attention_maps_image_base64_url, - "tokens": tokens, - }, - ) - eventlet.sleep(0) - - progress.set_current_iteration(progress.current_iteration + 1) - - self.generate.prompt2image( - **generation_parameters, - step_callback=image_progress, - image_callback=image_done, - ) - - except KeyboardInterrupt: - # Clear the CUDA cache on an exception - self.empty_cuda_cache() - self.socketio.emit("processingCanceled") - raise - except CanceledException: - # Clear the CUDA cache on an exception - self.empty_cuda_cache() - self.socketio.emit("processingCanceled") - pass - except Exception as e: - # Clear the CUDA cache on an exception - self.empty_cuda_cache() - logger.error(e) - self.handle_exceptions(e) - - def empty_cuda_cache(self): - if self.generate.device.type == "cuda": - import torch.cuda - - torch.cuda.empty_cache() - - def parameters_to_generated_image_metadata(self, parameters): - try: - # top-level metadata minus `image` or `images` - metadata = self.get_system_config() - # remove any image keys not mentioned in RFC #266 - rfc266_img_fields = [ - "type", - "postprocessing", - "sampler", - "prompt", - "seed", - "variations", - "steps", - "cfg_scale", - "threshold", - "perlin", - "step_number", - "width", - "height", - "extra", - "seamless", - "hires_fix", - ] - - rfc_dict = {} - - for item in parameters.items(): - key, value = item - if key in rfc266_img_fields: - rfc_dict[key] = value - - postprocessing = [] - - rfc_dict["type"] = parameters["generation_mode"] - - # 'postprocessing' is either null or an - if "facetool_strength" in parameters: - facetool_parameters = { - "type": str(parameters["facetool_type"]), - "strength": float(parameters["facetool_strength"]), - } - - if parameters["facetool_type"] == "codeformer": - facetool_parameters["fidelity"] = float(parameters["codeformer_fidelity"]) - - postprocessing.append(facetool_parameters) - - if "upscale" in parameters: - postprocessing.append( - { - "type": "esrgan", - "scale": int(parameters["upscale"][0]), - "denoise_str": int(parameters["upscale"][1]), - "strength": float(parameters["upscale"][2]), - } - ) - - rfc_dict["postprocessing"] = postprocessing if len(postprocessing) > 0 else None - - # semantic drift - rfc_dict["sampler"] = parameters["sampler_name"] - - # 'variations' should always exist and be an array, empty or consisting of {'seed': seed, 'weight': weight} pairs - variations = [] - - if "with_variations" in parameters: - variations = [{"seed": x[0], "weight": x[1]} for x in parameters["with_variations"]] - - rfc_dict["variations"] = variations - - if rfc_dict["type"] == "img2img": - rfc_dict["strength"] = parameters["strength"] - rfc_dict["fit"] = parameters["fit"] # TODO: Noncompliant - rfc_dict["orig_hash"] = calculate_init_img_hash(self.get_image_path_from_url(parameters["init_img"])) - rfc_dict["init_image_path"] = parameters["init_img"] # TODO: Noncompliant - - metadata["image"] = rfc_dict - - return metadata - - except Exception as e: - self.handle_exceptions(e) - - def parameters_to_post_processed_image_metadata(self, parameters, original_image_path): - try: - current_metadata = retrieve_metadata(original_image_path)["sd-metadata"] - postprocessing_metadata = {} - - """ - if we don't have an original image metadata to reconstruct, - need to record the original image and its hash - """ - if "image" not in current_metadata: - current_metadata["image"] = {} - - orig_hash = calculate_init_img_hash(self.get_image_path_from_url(original_image_path)) - - postprocessing_metadata["orig_path"] = (original_image_path,) - postprocessing_metadata["orig_hash"] = orig_hash - - if parameters["type"] == "esrgan": - postprocessing_metadata["type"] = "esrgan" - postprocessing_metadata["scale"] = parameters["upscale"][0] - postprocessing_metadata["denoise_str"] = parameters["upscale"][1] - postprocessing_metadata["strength"] = parameters["upscale"][2] - elif parameters["type"] == "gfpgan": - postprocessing_metadata["type"] = "gfpgan" - postprocessing_metadata["strength"] = parameters["facetool_strength"] - elif parameters["type"] == "codeformer": - postprocessing_metadata["type"] = "codeformer" - postprocessing_metadata["strength"] = parameters["facetool_strength"] - postprocessing_metadata["fidelity"] = parameters["codeformer_fidelity"] - - else: - raise TypeError(f"Invalid type: {parameters['type']}") - - if "postprocessing" in current_metadata["image"] and isinstance( - current_metadata["image"]["postprocessing"], list - ): - current_metadata["image"]["postprocessing"].append(postprocessing_metadata) - else: - current_metadata["image"]["postprocessing"] = [postprocessing_metadata] - - return current_metadata - - except Exception as e: - self.handle_exceptions(e) - - def save_result_image( - self, - image, - command, - metadata, - output_dir, - step_index=None, - postprocessing=False, - ): - try: - pngwriter = PngWriter(output_dir) - - number_prefix = pngwriter.unique_prefix() - - uuid = uuid4().hex - truncated_uuid = uuid[:8] - - seed = "unknown_seed" - - if "image" in metadata: - if "seed" in metadata["image"]: - seed = metadata["image"]["seed"] - - filename = f"{number_prefix}.{truncated_uuid}.{seed}" - - if step_index: - filename += f".{step_index}" - if postprocessing: - filename += ".postprocessed" - - filename += ".png" - - path = pngwriter.save_image_and_prompt_to_png( - image=image, - dream_prompt=command, - metadata=metadata, - name=filename, - ) - - return os.path.abspath(path) - - except Exception as e: - self.handle_exceptions(e) - - def make_unique_init_image_filename(self, name): - try: - uuid = uuid4().hex - split = os.path.splitext(name) - name = f"{split[0]}.{uuid}{split[1]}" - return name - except Exception as e: - self.handle_exceptions(e) - - def calculate_real_steps(self, steps, strength, has_init_image): - import math - - return math.floor(strength * steps) if has_init_image else steps - - def write_log_message(self, message): - """Logs the filename and parameters used to generate or process that image to log file""" - try: - message = f"{message}\n" - with open(self.log_path, "a", encoding="utf-8") as file: - file.writelines(message) - - except Exception as e: - self.handle_exceptions(e) - - def get_image_path_from_url(self, url): - """Given a url to an image used by the client, returns the absolute file path to that image""" - try: - if "init-images" in url: - return os.path.abspath(os.path.join(self.init_image_path, os.path.basename(url))) - elif "mask-images" in url: - return os.path.abspath(os.path.join(self.mask_image_path, os.path.basename(url))) - elif "intermediates" in url: - return os.path.abspath(os.path.join(self.intermediate_path, os.path.basename(url))) - elif "temp-images" in url: - return os.path.abspath(os.path.join(self.temp_image_path, os.path.basename(url))) - elif "thumbnails" in url: - return os.path.abspath(os.path.join(self.thumbnail_image_path, os.path.basename(url))) - else: - return os.path.abspath(os.path.join(self.result_path, os.path.basename(url))) - except Exception as e: - self.handle_exceptions(e) - - def get_url_from_image_path(self, path): - """Given an absolute file path to an image, returns the URL that the client can use to load the image""" - try: - if "init-images" in path: - return os.path.join(self.init_image_url, os.path.basename(path)) - elif "mask-images" in path: - return os.path.join(self.mask_image_url, os.path.basename(path)) - elif "intermediates" in path: - return os.path.join(self.intermediate_url, os.path.basename(path)) - elif "temp-images" in path: - return os.path.join(self.temp_image_url, os.path.basename(path)) - elif "thumbnails" in path: - return os.path.join(self.thumbnail_image_url, os.path.basename(path)) - else: - return os.path.join(self.result_url, os.path.basename(path)) - except Exception as e: - self.handle_exceptions(e) - - def save_file_unique_uuid_name(self, bytes, name, path): - try: - uuid = uuid4().hex - truncated_uuid = uuid[:8] - - split = os.path.splitext(name) - name = f"{split[0]}.{truncated_uuid}{split[1]}" - - file_path = os.path.join(path, name) - - os.makedirs(os.path.dirname(file_path), exist_ok=True) - - newFile = open(file_path, "wb") - newFile.write(bytes) - - return file_path - except Exception as e: - self.handle_exceptions(e) - - def handle_exceptions(self, exception, emit_key: str = "error"): - self.socketio.emit(emit_key, {"message": (str(exception))}) - print("\n") - traceback.print_exc() - print("\n") - - -class Progress: - def __init__(self, generation_parameters=None): - self.current_step = 1 - self.total_steps = ( - self._calculate_real_steps( - steps=generation_parameters["steps"], - strength=generation_parameters["strength"] if "strength" in generation_parameters else None, - has_init_image="init_img" in generation_parameters, - ) - if generation_parameters - else 1 - ) - self.current_iteration = 1 - self.total_iterations = generation_parameters["iterations"] if generation_parameters else 1 - self.current_status = "common.statusPreparing" - self.is_processing = True - self.current_status_has_steps = False - self.has_error = False - - def set_current_step(self, current_step): - self.current_step = current_step - - def set_total_steps(self, total_steps): - self.total_steps = total_steps - - def set_current_iteration(self, current_iteration): - self.current_iteration = current_iteration - - def set_total_iterations(self, total_iterations): - self.total_iterations = total_iterations - - def set_current_status(self, current_status): - self.current_status = current_status - - def set_is_processing(self, is_processing): - self.is_processing = is_processing - - def set_current_status_has_steps(self, current_status_has_steps): - self.current_status_has_steps = current_status_has_steps - - def set_has_error(self, has_error): - self.has_error = has_error - - def mark_complete(self): - self.current_status = "common.statusProcessingComplete" - self.current_step = 0 - self.total_steps = 0 - self.current_iteration = 0 - self.total_iterations = 0 - self.is_processing = False - - def to_formatted_dict( - self, - ): - return { - "currentStep": self.current_step, - "totalSteps": self.total_steps, - "currentIteration": self.current_iteration, - "totalIterations": self.total_iterations, - "currentStatus": self.current_status, - "isProcessing": self.is_processing, - "currentStatusHasSteps": self.current_status_has_steps, - "hasError": self.has_error, - } - - def _calculate_real_steps(self, steps, strength, has_init_image): - return math.floor(strength * steps) if has_init_image else steps - - -class CanceledException(Exception): - pass - - -def copy_image_from_bounding_box(image: ImageType, x: int, y: int, width: int, height: int) -> ImageType: - """ - Returns a copy an image, cropped to a bounding box. - """ - with image as im: - bounds = (x, y, x + width, y + height) - im_cropped = im.crop(bounds) - return im_cropped - - -def dataURL_to_image(dataURL: str) -> ImageType: - """ - Converts a base64 image dataURL into an image. - The dataURL is split on the first comma. - """ - image = Image.open( - io.BytesIO( - base64.decodebytes( - bytes( - dataURL.split(",", 1)[1], - "utf-8", - ) - ) - ) - ) - return image - - -def image_to_dataURL(image: ImageType, image_format: str = "PNG") -> str: - """ - Converts an image into a base64 image dataURL. - """ - buffered = io.BytesIO() - image.save(buffered, format=image_format) - mime_type = Image.MIME.get(image_format.upper(), "image/" + image_format.lower()) - image_base64 = f"data:{mime_type};base64," + base64.b64encode(buffered.getvalue()).decode("UTF-8") - return image_base64 - - -def dataURL_to_bytes(dataURL: str) -> bytes: - """ - Converts a base64 image dataURL into bytes. - The dataURL is split on the first comma. - """ - return base64.decodebytes( - bytes( - dataURL.split(",", 1)[1], - "utf-8", - ) - ) - - -def paste_image_into_bounding_box( - recipient_image: ImageType, - donor_image: ImageType, - x: int, - y: int, - width: int, - height: int, -) -> ImageType: - """ - Pastes an image onto another with a bounding box. - """ - with recipient_image as im: - bounds = (x, y, x + width, y + height) - im.paste(donor_image, bounds) - return recipient_image - - -def save_thumbnail( - image: ImageType, - filename: str, - path: str, - size: int = 256, -) -> str: - """ - Saves a thumbnail of an image, returning its path. - """ - base_filename = os.path.splitext(filename)[0] - thumbnail_path = os.path.join(path, base_filename + ".webp") - - if os.path.exists(thumbnail_path): - return thumbnail_path - - thumbnail_width = size - thumbnail_height = round(size * (image.height / image.width)) - - image_copy = image.copy() - image_copy.thumbnail(size=(thumbnail_width, thumbnail_height)) - - image_copy.save(thumbnail_path, "WEBP") - - return thumbnail_path diff --git a/invokeai/backend/web/modules/__init__.py b/invokeai/backend/web/modules/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/invokeai/backend/web/modules/create_cmd_parser.py b/invokeai/backend/web/modules/create_cmd_parser.py deleted file mode 100644 index 856522989b7..00000000000 --- a/invokeai/backend/web/modules/create_cmd_parser.py +++ /dev/null @@ -1,56 +0,0 @@ -import argparse -import os - -from ...args import PRECISION_CHOICES - - -def create_cmd_parser(): - parser = argparse.ArgumentParser(description="InvokeAI web UI") - parser.add_argument( - "--host", - type=str, - help="The host to serve on", - default="localhost", - ) - parser.add_argument("--port", type=int, help="The port to serve on", default=9090) - parser.add_argument( - "--cors", - nargs="*", - type=str, - help="Additional allowed origins, comma-separated", - ) - parser.add_argument( - "--embedding_path", - type=str, - help="Path to a pre-trained embedding manager checkpoint - can only be set on command line", - ) - # TODO: Can't get flask to serve images from any dir (saving to the dir does work when specified) - # parser.add_argument( - # "--output_dir", - # default="outputs/", - # type=str, - # help="Directory for output images", - # ) - parser.add_argument( - "-v", - "--verbose", - action="store_true", - help="Enables verbose logging", - ) - parser.add_argument( - "--precision", - dest="precision", - type=str, - choices=PRECISION_CHOICES, - metavar="PRECISION", - help=f'Set model precision. Defaults to auto selected based on device. Options: {", ".join(PRECISION_CHOICES)}', - default="auto", - ) - parser.add_argument( - "--free_gpu_mem", - dest="free_gpu_mem", - action="store_true", - help="Force free gpu memory before final decoding", - ) - - return parser diff --git a/invokeai/backend/web/modules/get_canvas_generation_mode.py b/invokeai/backend/web/modules/get_canvas_generation_mode.py deleted file mode 100644 index 6d680016e7a..00000000000 --- a/invokeai/backend/web/modules/get_canvas_generation_mode.py +++ /dev/null @@ -1,113 +0,0 @@ -from typing import Literal, Union - -from PIL import Image, ImageChops -from PIL.Image import Image as ImageType - - -# https://stackoverflow.com/questions/43864101/python-pil-check-if-image-is-transparent -def check_for_any_transparency(img: Union[ImageType, str]) -> bool: - if type(img) is str: - img = Image.open(str) - - if img.info.get("transparency", None) is not None: - return True - if img.mode == "P": - transparent = img.info.get("transparency", -1) - for _, index in img.getcolors(): - if index == transparent: - return True - elif img.mode == "RGBA": - extrema = img.getextrema() - if extrema[3][0] < 255: - return True - return False - - -def get_canvas_generation_mode( - init_img: Union[ImageType, str], init_mask: Union[ImageType, str] -) -> Literal["txt2img", "outpainting", "inpainting", "img2img",]: - if type(init_img) is str: - init_img = Image.open(init_img) - - if type(init_mask) is str: - init_mask = Image.open(init_mask) - - init_img = init_img.convert("RGBA") - - # Get alpha from init_img - init_img_alpha = init_img.split()[-1] - init_img_alpha_mask = init_img_alpha.convert("L") - init_img_has_transparency = check_for_any_transparency(init_img) - - if init_img_has_transparency: - init_img_is_fully_transparent = True if init_img_alpha_mask.getbbox() is None else False - - """ - Mask images are white in areas where no change should be made, black where changes - should be made. - """ - - # Fit the mask to init_img's size and convert it to greyscale - init_mask = init_mask.resize(init_img.size).convert("L") - - """ - PIL.Image.getbbox() returns the bounding box of non-zero areas of the image, so we first - invert the mask image so that masked areas are white and other areas black == zero. - getbbox() now tells us if the are any masked areas. - """ - init_mask_bbox = ImageChops.invert(init_mask).getbbox() - init_mask_exists = False if init_mask_bbox is None else True - - if init_img_has_transparency: - if init_img_is_fully_transparent: - return "txt2img" - else: - return "outpainting" - else: - if init_mask_exists: - return "inpainting" - else: - return "img2img" - - -def main(): - # Testing - init_img_opaque = "test_images/init-img_opaque.png" - init_img_partial_transparency = "test_images/init-img_partial_transparency.png" - init_img_full_transparency = "test_images/init-img_full_transparency.png" - init_mask_no_mask = "test_images/init-mask_no_mask.png" - init_mask_has_mask = "test_images/init-mask_has_mask.png" - - print( - "OPAQUE IMAGE, NO MASK, expect img2img, got ", - get_canvas_generation_mode(init_img_opaque, init_mask_no_mask), - ) - - print( - "IMAGE WITH TRANSPARENCY, NO MASK, expect outpainting, got ", - get_canvas_generation_mode(init_img_partial_transparency, init_mask_no_mask), - ) - - print( - "FULLY TRANSPARENT IMAGE NO MASK, expect txt2img, got ", - get_canvas_generation_mode(init_img_full_transparency, init_mask_no_mask), - ) - - print( - "OPAQUE IMAGE, WITH MASK, expect inpainting, got ", - get_canvas_generation_mode(init_img_opaque, init_mask_has_mask), - ) - - print( - "IMAGE WITH TRANSPARENCY, WITH MASK, expect outpainting, got ", - get_canvas_generation_mode(init_img_partial_transparency, init_mask_has_mask), - ) - - print( - "FULLY TRANSPARENT IMAGE WITH MASK, expect txt2img, got ", - get_canvas_generation_mode(init_img_full_transparency, init_mask_has_mask), - ) - - -if __name__ == "__main__": - main() diff --git a/invokeai/backend/web/modules/parameters.py b/invokeai/backend/web/modules/parameters.py deleted file mode 100644 index 8ab74adb92b..00000000000 --- a/invokeai/backend/web/modules/parameters.py +++ /dev/null @@ -1,82 +0,0 @@ -import argparse - -from .parse_seed_weights import parse_seed_weights - -SAMPLER_CHOICES = [ - "ddim", - "ddpm", - "deis", - "lms", - "lms_k", - "pndm", - "heun", - "heun_k", - "euler", - "euler_k", - "euler_a", - "kdpm_2", - "kdpm_2_a", - "dpmpp_2s", - "dpmpp_2s_k", - "dpmpp_2m", - "dpmpp_2m_k", - "dpmpp_2m_sde", - "dpmpp_2m_sde_k", - "dpmpp_sde", - "dpmpp_sde_k", - "unipc", -] - - -def parameters_to_command(params): - """ - Converts dict of parameters into a `invoke.py` REPL command. - """ - - switches = list() - - if "prompt" in params: - switches.append(f'"{params["prompt"]}"') - if "steps" in params: - switches.append(f'-s {params["steps"]}') - if "seed" in params: - switches.append(f'-S {params["seed"]}') - if "width" in params: - switches.append(f'-W {params["width"]}') - if "height" in params: - switches.append(f'-H {params["height"]}') - if "cfg_scale" in params: - switches.append(f'-C {params["cfg_scale"]}') - if "sampler_name" in params: - switches.append(f'-A {params["sampler_name"]}') - if "seamless" in params and params["seamless"] == True: - switches.append(f"--seamless") - if "hires_fix" in params and params["hires_fix"] == True: - switches.append(f"--hires") - if "init_img" in params and len(params["init_img"]) > 0: - switches.append(f'-I {params["init_img"]}') - if "init_mask" in params and len(params["init_mask"]) > 0: - switches.append(f'-M {params["init_mask"]}') - if "init_color" in params and len(params["init_color"]) > 0: - switches.append(f'--init_color {params["init_color"]}') - if "strength" in params and "init_img" in params: - switches.append(f'-f {params["strength"]}') - if "fit" in params and params["fit"] == True: - switches.append(f"--fit") - if "facetool" in params: - switches.append(f'-ft {params["facetool"]}') - if "facetool_strength" in params and params["facetool_strength"]: - switches.append(f'-G {params["facetool_strength"]}') - elif "gfpgan_strength" in params and params["gfpgan_strength"]: - switches.append(f'-G {params["gfpgan_strength"]}') - if "codeformer_fidelity" in params: - switches.append(f'-cf {params["codeformer_fidelity"]}') - if "upscale" in params and params["upscale"]: - switches.append(f'-U {params["upscale"][0]} {params["upscale"][1]}') - if "variation_amount" in params and params["variation_amount"] > 0: - switches.append(f'-v {params["variation_amount"]}') - if "with_variations" in params: - seed_weight_pairs = ",".join(f"{seed}:{weight}" for seed, weight in params["with_variations"]) - switches.append(f"-V {seed_weight_pairs}") - - return " ".join(switches) diff --git a/invokeai/backend/web/modules/parse_seed_weights.py b/invokeai/backend/web/modules/parse_seed_weights.py deleted file mode 100644 index 7e15d4e166e..00000000000 --- a/invokeai/backend/web/modules/parse_seed_weights.py +++ /dev/null @@ -1,47 +0,0 @@ -def parse_seed_weights(seed_weights): - """ - Accepts seed weights as string in "12345:0.1,23456:0.2,3456:0.3" format - Validates them - If valid: returns as [[12345, 0.1], [23456, 0.2], [3456, 0.3]] - If invalid: returns False - """ - - # Must be a string - if not isinstance(seed_weights, str): - return False - # String must not be empty - if len(seed_weights) == 0: - return False - - pairs = [] - - for pair in seed_weights.split(","): - split_values = pair.split(":") - - # Seed and weight are required - if len(split_values) != 2: - return False - - if len(split_values[0]) == 0 or len(split_values[1]) == 1: - return False - - # Try casting the seed to int and weight to float - try: - seed = int(split_values[0]) - weight = float(split_values[1]) - except ValueError: - return False - - # Seed must be 0 or above - if not seed >= 0: - return False - - # Weight must be between 0 and 1 - if not (weight >= 0 and weight <= 1): - return False - - # This pair is valid - pairs.append([seed, weight]) - - # All pairs are valid - return pairs diff --git a/invokeai/backend/web/modules/test_images/init-img_full_transparency.png b/invokeai/backend/web/modules/test_images/init-img_full_transparency.png deleted file mode 100644 index 6cdeada6095..00000000000 Binary files a/invokeai/backend/web/modules/test_images/init-img_full_transparency.png and /dev/null differ diff --git a/invokeai/backend/web/modules/test_images/init-img_opaque.png b/invokeai/backend/web/modules/test_images/init-img_opaque.png deleted file mode 100644 index a45aec75ed4..00000000000 Binary files a/invokeai/backend/web/modules/test_images/init-img_opaque.png and /dev/null differ diff --git a/invokeai/backend/web/modules/test_images/init-img_partial_transparency.png b/invokeai/backend/web/modules/test_images/init-img_partial_transparency.png deleted file mode 100644 index 348e59fc8ac..00000000000 Binary files a/invokeai/backend/web/modules/test_images/init-img_partial_transparency.png and /dev/null differ diff --git a/invokeai/backend/web/modules/test_images/init-mask_has_mask.png b/invokeai/backend/web/modules/test_images/init-mask_has_mask.png deleted file mode 100644 index 88fe0729501..00000000000 Binary files a/invokeai/backend/web/modules/test_images/init-mask_has_mask.png and /dev/null differ diff --git a/invokeai/backend/web/modules/test_images/init-mask_no_mask.png b/invokeai/backend/web/modules/test_images/init-mask_no_mask.png deleted file mode 100644 index 2aecd3ea7db..00000000000 Binary files a/invokeai/backend/web/modules/test_images/init-mask_no_mask.png and /dev/null differ diff --git a/pyproject.toml b/pyproject.toml index 6e5b7549146..e13ae607b97 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,15 +45,10 @@ dependencies = [ "dynamicprompts", "easing-functions", "einops", - "eventlet", "facexlib", "fastapi==0.88.0", "fastapi-events==0.8.0", "fastapi-socketio==0.0.10", - "flask==2.1.3", - "flask_cors==3.0.10", - "flask_socketio==5.3.0", - "flaskwebgui==1.0.3", "huggingface-hub>=0.11.1", "invisible-watermark~=0.2.0", # needed to install SDXL base and refiner using their repo_ids "matplotlib", # needed for plotting of Penner easing functions