In [1]:
from environment_manager import *
import re
import System
import Grasshopper.Kernel as ghk
from Grasshopper.Kernel import IGH_Component, IGH_Param, GH_Document
from Grasshopper.Kernel import GH_DocumentIO

Create the version Environment

In [2]:
# class DataLoader:
#     def __init__(self, name, file_or_folder):
#         self.name = name
#         self.data_to_load = file_or_folder
#         self.env = load_create_environment(name)
#         self.gct = GHComponentTable
#         GHComponentTable.initialise()

name = "240308-initial_test"
env = load_create_environment(name)
gct = GHComponentTable
GHComponentTable.initialise()



Setting environment variables
Copying vanilla components
File copied successfully from C:\Users\jossi\Dropbox\Office_Work\Jos\GH_Graph_Learning\Grasshopper Components\240307-CoreComponents\vanilla_components.csv to ExtractionEnvironments\240308-initial_test\00-VanillaComponents\vanilla_components.csv.
Copying components
Copying gh files


In [3]:


def doc_save(doc):
    ghdio = GH_DocumentIO(doc)
    ghdio.Save()    



def remove_unwanted_items(doc, illegals_dict):
    illegals_set = set([str(v) for v in illegals_dict.values()])
    removables = [obj for obj in doc.Objects if str(obj.ComponentGuid) in illegals_set]
    for obj in removables:
        doc.RemoveObject(obj, True)
    return doc

def doc_save(doc):
    ghdio = GH_DocumentIO(doc)
    ghdio.Save()

def get_component_type(component):
    pattern = r"^(.*?)_OBSOLETE"
    match = re.search(pattern, str(component))
    return match.group(1) if match else None

def create_new_component(component_type):
    replacement_guids = GHComponentTable.get_guid_by_type(component_type)
    if replacement_guids:
        new_component_proxy = GHComponentTable.search_component_by_guid(System.Guid(replacement_guids[0]))
        if new_component_proxy:
            return new_component_proxy.CreateInstance()
    return None

def rewire_connections(old_component, new_component):
    old_component_obj = GHComponent_.convert_cls(old_component, ghk.IGH_Component)
    new_component_obj = GHComponent_.convert_cls(new_component, ghk.IGH_Component)

    if old_component_obj and new_component_obj:
        for input, new_input in zip(old_component_obj.Params.Input, new_component_obj.Params.Input):
            for source in input.Sources:
                new_input.AddSource(source)
        for output, new_output in zip(old_component_obj.Params.Output, new_component_obj.Params.Output):
            for recipient in output.Recipients:
                recipient.AddSource(new_output)

    else:
        old_param_obj = GHComponent_.convert_cls(old_component, ghk.IGH_Param)
        new_param_obj = GHComponent_.convert_cls(new_component, ghk.IGH_Param)

        if old_param_obj and new_param_obj:
            for source in old_param_obj.Sources:
                new_param_obj.AddSource(source)
            for recipient in old_param_obj.Recipients:
                recipient.AddSource(new_param_obj)

        else:
            print(f"Cannot rewire connections. Incompatible component types: {type(old_component)} and {type(new_component)}")

def replace_component(doc, old_component, new_component):
    if new_component:
        new_component.Attributes.Pivot = old_component.Attributes.Pivot
        doc.AddObject(new_component, False)
        rewire_connections(old_component, new_component)
        doc.RemoveObject(old_component, True)
        return True
    return False

def replace_obsolete_components(doc):
    for component in list(doc.Objects):
        if hasattr(component, 'Obsolete') and component.Obsolete:
            component_type = get_component_type(component)
            if component_type:
                new_component = create_new_component(component_type)
                if not replace_component(doc, component, new_component):
                    print(f"Could not replace obsolete component: {component.Name}")
    print("Replacement of obsolete components complete.")
    return doc

def remove_placeholder_components(doc, update=True):
    objects = doc.Objects
    placeholders = []
    for component in doc.Objects:
        a = GHComponentTable.get_guid_to_idx(str(component.ComponentGuid))
        if a is None:
            placeholders.append(component)
    if update:
        ghk.GH_Document.NewSolution(doc, False)

    for placeholder in placeholders:
        doc.RemoveObject(placeholder, False)

    return doc




def preprocess_and_replace(doc, illegals_dict, overwrite=True):
    doc = remove_unwanted_items(doc, illegals_dict)
    doc = replace_obsolete_components(doc)
    doc = remove_placeholder_components(doc)  # Add this line to remove placeholder components

    if overwrite:
        try:
            doc_save(doc)
        except Exception as e:
            print(f"Error in saving {doc.Name}: {e}")
    #         move the file to the error bin
                
    return doc
    
    print("Preprocessing, replacement, and placeholder removal complete.")
    return doc

In [4]:
class GHNode_:
    def __init__(self, obj: ghk.IGH_DocumentObject):
        self.obj = obj
        self.category = obj.Category
        self.name = obj.Name
        self.id = str(obj.InstanceGuid)
        self.position = obj.Attributes.Pivot if hasattr(obj.Attributes, "Pivot") else None
        self.uid = f"{self.category}_{self.name}_{self.id[-5:]}"
        # Assuming global_idx is somehow related to GHComponentTable, which might need instance reference
        self.global_idx = GHComponentTable.component_to_idx(self)  # This requires GHComponentTable method adjustment
        self.graph_id = None

    def get_recipients(self):
        """To be implemented by the subclass"""
        pass

    def __str__(self):
        return f"{self.uid}"

    def __repr__(self):
        return f"<GHNode {self.__str__()}>"

    def log_properties(self):
        log = {
            f"Category: {self.category}, "
            f"Name: {self.name}, "
            f"ID: {self.id[-5:]}, "
            f"Position: {self.position}"
            f"Global: {self.global_idx}"
        }
        # This method seems intended for logging or debugging, consider how it's used and adapt accordingly.

In [5]:
class GHParam_:

    def __init__(self, obj):
        self.obj = obj
        self.parent = GHNode(ghk.IGH_DocumentObject(obj).Attributes.GetTopLevel.DocObject)
        self.name = obj.Name
        self.datamapping = int(obj.DataMapping)  # enumerator 0:none, 1:Flatten, 2:Graft
        self.pkind = obj.Kind  # the kind: floating (top level), input (parameter tied to component as input), output (parameter tied to a component as an output
        self.dataEmitter = obj.IsDataProvider  # boolean stating whether this object is able to emit data
        # self.typ = obj.Type
        self.typname = obj.TypeName  # human-readable descriptor of this parameter
        self.optional = obj.Optional  # gets whether this parameter is optional to the functioning of the component
        # logging.info(f'GHComponent {self.parent.name} Params: {self.log_properties()}')

    @property
    def recipients(self):
        # if there are no recipents to this parameter, return none
        return [rcp for rcp in self.obj.Recipients] if len(self.obj.Recipients) > 0 else None

    @property
    def sources(self):
        # if there are no recipents to this parameter, return none
        return [rcp for rcp in self.obj.Sources] if len(self.obj.Sources) > 0 else None

    @property
    def data(self):
        return self.obj.VolatileData.DataDescription(False, False)

    def log_properties(self):
        properties = (
            f"Name: {self.name}, "
            f"DataMapping: {self.datamapping}, "
            f"Kind: {self.pkind}, "
            f"DataEmitter: {self.dataEmitter}, "
            f"TypeName: {self.typname}, "
            f"Optional: {self.optional}, "
            f"Data: {self.data}, "
        )
        return properties

    def __str__(self):
        repr_obj = ghk.IGH_DocumentObject(self.obj)
        return f"param:{self.name}"

    def __repr__(self):
        return f"<GHParam {self.__str__()}>"

In [6]:
from typing import Union
class GHComponent_(GHNode_):
    """Subclass of GHNode that handles GH components that implement IGH_Component.
    Each GHComponent object should contain a list of input parameter and output parameter objects.
    These parameter objects have access to the sources and recipients of the parameter"""

    def __init__(self, obj):
        super().__init__(obj)
        self.obj: Union[IGH_Component or IGH_Param] = None
        self.iparams = []
        self.oparams = []
        self.recipients = []
        self.iparams_dict = {}
        self.oparams_dict = {}

        # Attempt to initialize parameters
        self.initialize_parameters(obj)
    
    def initialize_parameters(self, obj):
        if GHComponent_.is_cls(obj, ghk.IGH_Component):
            self.obj = IGH_Component(obj)
            self.iparams = [GHParam_(p) for p in self.obj.Params.Input]
            self.oparams = [GHParam_(p) for p in self.obj.Params.Output]
        
        elif GHComponent_.is_cls(obj, ghk.IGH_Param):
            self.obj = IGH_Param(obj)
            param = GHParam_(self.obj)
            self.iparams = [param]
            self.oparams = [param]
        else:
            print(obj)
            # self.obj = GHComponentTable.idx_to_component()

    @staticmethod
    def is_cls(obj, clas):
        try:
            clas(obj)
            return True
        except Exception:
            return False
    @staticmethod
    def convert_cls(obj, clas):
        try:
            return clas(obj)
        except Exception:
            return None
    @staticmethod
    def input_params(obj: IGH_Component):
        assert is_cls(obj, IGH_Component), "The component is not coming in as an IGH_Component"
        obj = convert_cls(obj, IGH_Component)
        parameters = []
        if obj.Params.Input:
            for p in obj.Params.Input:
                    parameters.append(GHParam_(p))
        return parameters
    @staticmethod
    def output_params(obj: IGH_Component):
        assert is_cls(obj, IGH_Component), "The component is not coming in as an IGH_Component"
        obj = convert_cls(obj, IGH_Component)
        parameters = []
        if obj.Params.Output:
            for p in obj.Params.Output:
                parameters.append(GHParam_(p))
        return parameters
    
    def get_connections(self, canvas):
        """Returns the source and recipient connections for this component"""
        source_connections = []
        recipient_connections = []

        # Handle connections to recipients from this component's output parameters
        if self.oparams is not None:
            for i, oparam in enumerate(self.oparams):  # Iterate over output parameters
                if oparam.recipients:  # Ensure there are recipients to consider
                    for r in oparam.recipients:
                        recipient = GHParam(r)
                        # Search the canvas for the corresponding objects. Remember to convert the InstanceGUID into a str
                        recipient_component = canvas.find_object_by_guid(str(recipient.parent.obj.Attributes.InstanceGuid))
                        if recipient_component is not None:
                            parent_instance = canvas.find_object_by_guid(recipient_component.id)
                            if parent_instance is not None:
                                recipient_parameter_index = recipient_component.iparams_dict.get(recipient.name)
                                if recipient_parameter_index is not None:
                                    recipient_conn = {
                                        'to': parent_instance,
                                        'edge': (i, recipient_parameter_index)
                                    }
                                    recipient_connections.append(recipient_conn)
        if self.iparams is not None:
            # Handle connections from sources to this component's input parameters
            for i, iparam in enumerate(self.iparams):  # Iterate over input parameters
                if iparam.sources:  # Ensure there are sources to consider
                    for s in iparam.sources:
                        source = GHParam(s)
                        # Search the canvas for the corresponding objects. Remember to convert the InstanceGUID into a str
                        source_component = canvas.find_object_by_guid(str(source.parent.obj.Attributes.InstanceGuid))
                        if source_component is not None:
                            source_instance = canvas.find_object_by_guid(source_component.id)
                            if source_instance is not None:
                                source_parameter_index = source_component.oparams_dict.get(source.name)
                                if source_parameter_index is not None:
                                    source_conn = {
                                        'from': source_instance,
                                        'edge': (source_parameter_index, i)
                                    }
                                    source_connections.append(source_conn)

        return source_connections, recipient_connections

    def print_conns(self, canvas):
        source_conns, recipient_conns = self.get_connections(canvas)
        print(f"Component: {self.name}")
        print(f"Sources: {source_conns}")
        print(f"Recipients: {recipient_conns}")

    def __str__(self):
        return f"Comp:{self.name}"
    
    def __repr__(self):
        return f"<GHNComponent {self.__str__()}>"
            

In [7]:
def test_component(file):
    doc = GHProcessor.get_ghdoc(str(file))
    for i, component in enumerate(doc.Objects):
        print(f'Loading... {i+1}/{len(list(doc.Objects))}', end='\r')
        a = GHComponent_(component)
        if a is None:
            print(component)
    print(f"Processed {len(doc.Objects)} components in {file.name}")


# Test Single

In [12]:
import shutil
from pathlib import Path

run_preprocessing = True
error_bin = Path(r"C:\Users\jossi\Dropbox\Office_Work\Jos\GH_Graph_Learning\TTD\Error_Bin")

def preprocess_single(file: Path, overwrite=True) -> GH_Document:
    illegals_dict = {
        "Bifocals": "aced9701-8be9-4860-bc37-7e22622afff4",
        "Group": "c552a431-af5b-46a9-a8a4-0fcbc27ef596",
        "Sketch": "2844fec5-142d-4381-bd5d-4cbcef6d6fed",
        "Cluster": "f31d8d7a-7536-4ac8-9c96-fde6ecda4d0a",
        "Scribble": "7f5c6c55-f846-4a08-9c9a-cfdc285cc6fe"
    }


    doc = GHProcessor.get_ghdoc(str(file))
    try:
        doc = preprocess_and_replace(doc, illegals_dict, overwrite)
        return doc
    except Exception as e:
        print(f"Error in preprocessing {file.name}: {e}")
        
        shutil.move(file, error_bin / file.name + "_error")
        a = GH_DocumentIO(doc).SaveQuiet(str(error_bin / file.name))
        print(a)
    

file = Path(r"C:\Users\jossi\Dropbox\Office_Work\Jos\GH_Graph_Learning\TTD\Test_many\265ee37e22db.gh")
preprocess_single(file, True)
        

Replacement of obsolete components complete.
Preprocessing, replacement, and placeholder removal complete.


<Grasshopper.Kernel.GH_Document object at 0x00000241A6F45480>

# Test Many

## Test preprocessing

In [None]:
run_preprocessing = False
def preprocess(folder: Path,  overwrite=True) -> GH_Document:
    assert isinstance(folder, Path), "The folder must be a Path object"
    illegals_dict = {
    "Bifocals": "aced9701-8be9-4860-bc37-7e22622afff4",
    "Group": "c552a431-af5b-46a9-a8a4-0fcbc27ef596",
    "Sketch": "2844fec5-142d-4381-bd5d-4cbcef6d6fed",
    "Cluster": "f31d8d7a-7536-4ac8-9c96-fde6ecda4d0a",
    "Scribble": "7f5c6c55-f846-4a08-9c9a-cfdc285cc6fe"
    }   
    for file in folder.iterdir():
        if file.suffix == ".gh":
            print(f"Preprocessing {file.name}...")
            doc = GHProcessor.get_ghdoc(str(file))
            doc = preprocess_and_replace(doc, illegals_dict, overwrite)
            print(f"Preprocessing of {file.name} complete.")
    print("finished processing all files")

folder = Path(r"C:\Users\jossi\Dropbox\Office_Work\Jos\GH_Graph_Learning\TTD\Test_many")
print(type(folder))
if run_preprocessing:
    preprocess(folder)


## Test Node Conversion

In [None]:
# for file in folder.iterdir():
#     if file.suffix == ".gh":
#         print(f"Processing {file.name}...")
#         doc = GHProcessor.get_ghdoc(str(file))
#         for i, component in enumerate(doc.Objects):
#             print(f'Loading... {i+1}/{len(list(doc.Objects))}', end='\r')
#             a = GHComponent_(component)
#             if a is None:
#                 print(component)
#         print(f"Processed {len(doc.Objects)} components in {file.name}")

In [None]:
# file = r"C:\Users\jossi\Dropbox\Office_Work\Jos\GH_Graph_Learning\TTD\missing comps\missingcomps.gh"
# doc = GHProcessor.get_ghdoc(file)
# for component in doc.Objects:
#     a = GHComponentTable.get_guid_to_idx(str(component.ComponentGuid))
#     if a is None:
#         print(component)
#     print(GHComponent_(component))