## Read image data

In [30]:
import os
import pprint
from lib.metadata import comfy_metadata
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Union
from copy import deepcopy

pp = pprint.PrettyPrinter()


In [2]:
""" Move into comfy.py """


# If you need to add to this list, find new node types here. [print(nt) for nt in comfy_unique_node_types(FILE_PATH)]
# Maps a prompt-bearing node's class_type to the name of the input that holds its text.
PROMPT_NODE_TYPES = {
    'DPCombinatorialGenerator': 'text',
    'CLIPTextEncode': 'text',
    'Power Prompt (rgthree)': 'prompt',
}


def scrape_node(node):
    """Return the prompt text stored on a known prompt node.

    The node's ``class_type`` selects, via PROMPT_NODE_TYPES, which entry of
    ``node['inputs']`` is returned.

    Raises:
        Exception: if the node's class_type is not listed in PROMPT_NODE_TYPES.
    """
    input_name = PROMPT_NODE_TYPES.get(node['class_type'])
    if input_name is None:
        # Unknown node type: we do not know which input carries the text.
        raise Exception(f'Cannot parse node: {node!s}')
    return node['inputs'][input_name]

class ComfyImage:
    """Metadata read from a single image file via ``comfy_metadata``.

    Attributes:
        filename: The path passed to the constructor.
        prompt: First value returned by ``comfy_metadata``. The methods below
            treat it as a mapping of node id -> node dict, where each node
            dict has a ``class_type`` key.
        workflow: Second value returned by ``comfy_metadata``.
    """

    def __init__(self, filename: str):
        self.prompt, self.workflow = comfy_metadata(filename)
        self.filename = filename

    def nodes(self):
        """Return a new dict mapping node id -> node data (shallow copy of ``prompt``)."""
        return dict(self.prompt.items())

    def unique_nodes(self):
        """Return the sorted list of distinct ``class_type`` values used in the prompt."""
        return sorted({node['class_type'] for node in self.prompt.values()})

    def text_nodes(self):
        """Return the node dicts whose ``class_type`` is listed in PROMPT_NODE_TYPES."""
        # Iterate the values: iterating the mapping itself yields only the
        # node ids, which cannot be unpacked into (id, node) pairs.
        return [
            node
            for node in self.prompt.values()
            if node['class_type'] in PROMPT_NODE_TYPES
        ]

    def text_values(self):
        """Return a lazy iterator of the text scraped from every text node."""
        return map(scrape_node, self.text_nodes())



# Test single image

In [None]:

# TODO: Use sqlite dump
data_path = 'data/'

# Keep only the directory entries whose name ends in '.png'.
images = [name for name in os.listdir(data_path) if name.endswith('.png')]

print(f"Processing {len(images)} images")
# Build one ComfyImage per PNG; the path is data_path followed by the file name.
ImageObjects = [ComfyImage(f"{data_path}{name}") for name in images]
# 34.6s for 1300 images


In [None]:
# Select the first loaded image whose filename contains 'aerys'
# (raises IndexError when there is none).
img = list(filter(lambda candidate: 'aerys' in candidate.filename, ImageObjects))[0]

# img.workflow.keys()
# Observed workflow keys: 'last_node_id', 'last_link_id', 'nodes', 'links', 'groups',
# 'config', 'extra', 'version', 'widget_idx_map', 'seed_widgets'


# The workflow holds every node together with its UI props (color, size, position)
# as well as the node properties.
# This image's workflow has 85 nodes.
# len(img.workflow['nodes'])
img.workflow['nodes']


# The prompt has 48 nodes. It is a stripped-down map from node ID to a struct of values;
# the IDs are identical to the workflow's.
# The title lives in the metadata, e.g. '_meta': {'title': 'Negative'}},
# and 'class_type' gives the node type.
img.prompt

## Schemas


In [None]:

# Define a type for a reader function that takes a dict (the node's JSON) and returns a value.
ReaderFunc = Callable[[Dict[str, Any]], Any]


def _default_reader(node: Dict[str, Any]) -> Any:
    """Fallback reader: return ``node['inputs']['default']``, or None when absent."""
    return node.get('inputs', {}).get('default')


@dataclass
class SchemaNode:
    """Describes where one value lives inside a prompt's node data.

    Attributes:
        role: Label for what the value represents (e.g. "seed").
        node_id: ID of the node that holds the value.
        input_name: Key under the node's ``inputs`` to read when no custom
            reader is supplied.
        node_type: Optional free-form description of the node.
        reader: Callable applied to the node's data to extract the value.
            Defaults to ``_default_reader``.
    """
    role: str
    node_id: str
    input_name: str
    node_type: Optional[str] = None
    reader: ReaderFunc = field(default=_default_reader)

    def read_value(self, node_data: Dict[str, Any]) -> Any:
        """
        Extract this node's value from ``node_data``.

        If ``input_name`` is non-empty and no custom reader was set, returns
        ``node_data["inputs"][input_name]`` (None when missing). Otherwise the
        result of ``self.reader(node_data)`` is returned.
        """
        # Identity check against the named default: comparing with a freshly
        # created lambda is never true, which would make this branch dead.
        if self.reader is _default_reader and self.input_name:
            return node_data.get('inputs', {}).get(self.input_name)
        return self.reader(node_data)

@dataclass
class Schema:
    """One workflow schema version: a SchemaNode locator for each tracked setting.

    Field order matters: the dataclass-generated ``__init__`` accepts the
    fields positionally in the order declared here.
    """
    # Version label for this schema.
    version: str
    # Locators for the individual settings; each one is a SchemaNode.
    checkpoint_node: SchemaNode
    seed_node: SchemaNode
    positive_node: SchemaNode
    negative_node: SchemaNode
    loras_node: SchemaNode
    steps_node: SchemaNode
    sampler_node: SchemaNode
    scheduler_node: SchemaNode
    aspect_ratio_node: SchemaNode
    swap_dimensions_node: SchemaNode


In [None]:

# Schema 3 (most recent)
# Each entry names the node ID and the input key under that node's "inputs" to read.
# NOTE(review): the node_type strings below look like descriptive labels rather than
# exact class_type values (compare 'Power Prompt (rgthree)' in PROMPT_NODE_TYPES) — confirm.
Schema3 = Schema(
    version="3.0",
    checkpoint_node=SchemaNode(
        role="checkpoint",
        node_id="474",
        input_name="ckpt_name",
        node_type="Checkpoint Loader",
    ),
    seed_node=SchemaNode(role="seed", node_id="463", input_name="seed"),
    positive_node=SchemaNode(
        role="positive",
        node_id="553",
        input_name="text",
        node_type="CLIP Text Encode for Positive Prompt",
    ),
    negative_node=SchemaNode(
        role="negative",
        node_id="448",
        input_name="customtext",
        node_type="CR Prompt Text for Negative Prompt",
    ),
    loras_node=SchemaNode(
        role="loras",
        node_id="340",
        input_name="prompt",
        node_type="Power Prompt (rgthree) for Lora",
    ),
    # Steps, sampler and scheduler are all read from the same node (445).
    steps_node=SchemaNode(role="steps", node_id="445", input_name="steps_total"),
    sampler_node=SchemaNode(role="sampler", node_id="445", input_name="sampler_name"),
    scheduler_node=SchemaNode(role="scheduler", node_id="445", input_name="scheduler"),
    # Aspect ratio and the swap-dimensions flag are both read from node 346.
    aspect_ratio_node=SchemaNode(
        role="aspect_ratio", node_id="346", input_name="aspect_ratio"
    ),
    swap_dimensions_node=SchemaNode(
        role="swap_dimensions", node_id="346", input_name="swap_dimensions"
    ),
)



In [None]:
# Schema 2 had loras combined into a single node.
# We'll need to extract them somehow.
def read_loras(node_data: Dict[str, Any]) -> Union[str, None]:
    """
    Custom reader that pulls the first LoRA tag out of the node's 'prompt' input.

    Returns the substring running from the first "<lora:" up to and including
    the next ">", or None when the prompt has no such complete tag.
    """
    text = node_data.get('inputs', {}).get('prompt', '')
    # Split around the first opening marker; an empty separator means "not found".
    _, opener, remainder = text.partition("<lora:")
    if not opener:
        return None
    # The tag ends at the first ">" that follows the opening marker.
    tag_body, closer, _ = remainder.partition(">")
    if not closer:
        return None
    return opener + tag_body + closer

def remove_loras(node_data: Dict[str, Any]) -> str:
    """
    Custom reader that returns the node's 'prompt' input with every
    "<lora:...>" tag removed and surrounding whitespace stripped.
    """
    import re  # local import: this cell runs before the notebook's own `import re`

    prompt = node_data.get('inputs', {}).get('prompt', '')
    return re.sub(r'<lora:[^>]*>', '', prompt).strip()


# In generation-2 workflows one input (node 340, "prompt") carries both the LoRA
# tags and the positive prompt, so each role gets its own reader over that input.
loras_node_gen2 = SchemaNode(role="loras", node_id="340", input_name="prompt", reader=read_loras)
prompt_node_gen2 = SchemaNode(role="positive", node_id="340", input_name="prompt", reader=remove_loras)


# Create a deep copy of Schema3 and modify it for Schema2
Schema2 = deepcopy(Schema3)
Schema2.version = "2.0"
Schema2.loras_node = loras_node_gen2
Schema2.positive_node = prompt_node_gen2


#TODO: Identify when PromptToSchedule nodes are used.

prompt_schedule_gen2 = SchemaNode(role="prompt", node_id="445", input_name="text")


# Some old workflows used Combinatorial Prompts
prompt_schedule_gen05 = SchemaNode(role="prompt", node_id="", input_name="text", node_type="DPCombinatorialGenerator")




# Some prompts can look for Primitive -> ClipTextEncode -> KSampler



In [None]:



# Sample node payload used to exercise SchemaNode.read_value.
example_node_data = {
    "inputs": {
        "text": "Example positive prompt",
        "prompt": "<lora:G3NSHIN IL.safetensors:1> additional text",
        "seed": 123456789,
        "default": "Fallback value"
    }
}

# Read through Schema3's node definitions: no bare positive_node / loras_node /
# seed_node names exist at module level, only the attributes on the schema.
print("Positive Node value:", Schema3.positive_node.read_value(example_node_data))
print("LoRAs Node value:", Schema3.loras_node.read_value(example_node_data))
print("Seed Node value:", Schema3.seed_node.read_value(example_node_data))

# Analyze Loras

In [67]:
# LoRA Regex, matches strings as follows
# <lora:TeradaOchiko.safetensors:1.0>
# <lora:Personal/Hex/HexXL_v5_FewerYellow.safetensors:0.5>
import re
from collections import Counter
# Captures (path, weight) from tags such as <lora:Personal/Hex/HexXL_v5_FewerYellow.safetensors:0.5>
loraMatch = re.compile(r'<lora:(.*?):([0-9\.]+)>')


# Number of images in which each LoRA (by bare file name) is referenced.
loraCounter = Counter()

for img in ImageObjects:
    item340 = img.prompt.get('340')
    if item340 is None:
        print("Unable to find node 340 in " + img.filename)
        continue
    if item340.get('class_type') != 'Power Prompt (rgthree)':
        # Reported but not skipped: the node may still carry a usable prompt.
        print("Unexpected node type: ")
        print(img.filename)
    pprompt = item340.get('inputs', {}).get('prompt')
    if not isinstance(pprompt, str):
        # Missing input, or a non-text value: nothing to scan, so report
        # and move on instead of crashing the whole run.
        print("No prompt text on node 340 in " + img.filename)
        continue
    # Match Loras
    for lora_path, _weight in loraMatch.findall(pprompt):
        # Normalize Windows separators, drop directories and the file extension.
        lora_name = lora_path.replace("\\", "/").split('/')[-1].replace('.safetensors', '')
        loraCounter[lora_name] += 1


In [70]:

# Plain dict of the LoRA counts, inserted in the order most_common() yields them.
loraCounterOut = {lora: total for lora, total in loraCounter.most_common()}

# Write the counts as an indented JSON object, preserving that order.
with open('data/loraCounter_damn.json', 'w') as f:
    json.dump(loraCounterOut, f, indent=4)

In [4]:

import os
import pprint
pp = pprint.PrettyPrinter()
import json
from collections import Counter

# Sum the per-LoRA counts from every .json file found directly in 'data'.
loraCounter = Counter()

for path in os.listdir('data'):
    if not path.endswith('.json'):
        continue
    with open(f'data/{path}') as f:
        data = json.load(f)
    # Each file is expected to hold a {lora name: count} object.
    for lora, value in data.items():
        loraCounter[lora] += value


In [None]:
# Bare expression: presumably here so the notebook cell displays the Counter.
loraCounter

In [12]:

# Each entry is [substrings to look for in a LoRA name, {matching LoRA name: count}].
targets = {
    'Sirius': [['Sirius', 'bl4ckdr4g'], {}],
    'Bats': [['Bats', 'Aerys', 'TanBat'], {}],
    'Hex': [['Hex'], {}],
    'PixelSketcher': [['PixelSketcher'], {}],
    'Valziel': [['Valziel'], {}],
    'Zaush': [['Zaush'], {}],
    'Tricksta': [['Tricksta'], {}],
}


for name, matchData in targets.items():
    # Collect every counted LoRA whose name contains at least one of the substrings.
    matchData[1].update(
        (loraName, count)
        for loraName, count in loraCounter.items()
        if any(match in loraName for match in matchData[0])
    )
    # Replace the mapping with one ordered from highest to lowest count.
    matchData[1] = dict(sorted(matchData[1].items(), key=lambda pair: pair[1], reverse=True))






In [None]:
# Bare expression: presumably here so the notebook cell displays the grouped counts.
targets