# Speckle Federator

The purpose of this notebook is to take 3 commits, join them into a new 4th commit and then use the embedded viewer to benchmark performance relative to overlaying them.

In [6]:
%%capture
# capture turns off the output for this cell which would just be the pip install log 🤫
%pip install specklepy
%pip install python-dotenv

%reload_ext dotenv
%dotenv

In [7]:
import os
HOST_SERVER = os.getenv('HOST_SERVER')
ACCESS_TOKEN = os.getenv('ACCESS_TOKEN')

In [8]:
from dotenv import load_dotenv

# specify the path to the .env file
dotenv_path = '/path/to/your/.env'

# load the .env file from the specified path
load_dotenv(dotenv_path)

# access the environment variable(s) defined in the .env file
my_var = os.getenv('MY_VAR')

In [11]:
from specklepy.api.client import SpeckleClient

client = SpeckleClient(host=HOST_SERVER)  # or whatever your host is
client.authenticate_with_token(ACCESS_TOKEN)  # or whatever your token is

client

SpeckleClient( server: https://speckle.xyz, authenticated: True )

I will choose some commits from the Speckle Server that have been sent as parts of a greater whole - in this case, a sample scooper from a Mars rover.

In [12]:
# Define 3 commits in a single stream.add()
commits = [
    "https://speckle.xyz/streams/7ce9010d71/commits/27a5df66a0",  # scooper bucket
    "https://speckle.xyz/streams/7ce9010d71/commits/f7ef7f5270",  # fixing plate
    "https://speckle.xyz/streams/7ce9010d71/commits/f6052eaa16",  # hinged arm
]

## Benchmark 1:
Generate an overlay URL that combines the 3 commits

`\streams\{STREAM_ID}\commits\{COMMIT_ID1}?overlay={COMMIT_ID2},{COMMIT_ID3}`

In [13]:
from specklepy.transports.server import ServerTransport
from specklepy.api.wrapper import StreamWrapper

wrappers = [StreamWrapper(commit_url) for commit_url in commits]

transport = ServerTransport(client=client, stream_id=wrappers[0].stream_id)

In [14]:
stream_id = wrappers[0].stream_id

commit_ids = [wrapper.commit_id for wrapper in wrappers]

transparency = True  # make the background transparent
autoload = True  # load the stream on page load
hide_controls = False  # hide the viewer tools button
hide_sidebar = True  # hide the sidebar
hide_selection_info = True  # hide the selection info

# url = https://speckle.xyz/embed?stream=ca99defd4b&commit=39009883ba&transparent=true&autoload=true&hidecontrols=true&hidesidebar=true&hideselectioninfo=true
embed_url = f"https://speckle.xyz/embed?stream={stream_id}&commit={commit_ids[0]}&overlay={','.join(commit_ids[1:])}&transparent={transparency}&autoload={autoload}&hidecontrols={hide_controls}&hidesidebar={hide_sidebar}&hideselectioninfo={hide_selection_info}"

In [15]:
embed_url

'https://speckle.xyz/embed?stream=7ce9010d71&commit=27a5df66a0&overlay=f7ef7f5270,f6052eaa16&transparent=True&autoload=True&hidecontrols=False&hidesidebar=True&hideselectioninfo=True'

In [39]:
from IPython.display import IFrame

IFrame(embed_url, width=400, height=300)

## Benchmark 2: Combine commits

Take the content of the 3 commits and commit that to speckle.

View that commit in the embedded viewer

In [18]:
commit_objects = [client.commit.get(stream_id, commit_id) for commit_id in commit_ids]

f"{commit_objects[0].authorName}: {commit_objects[0].message} on {commit_objects[0].createdAt}"

'Jonathon Broughton: Sent 197 elements from Navisworks. on 2022-12-30 23:26:33.611000+00:00'

If the 3 urls given are all commits then the first name object has a property called `referencedObject` which is the id of the wrapper object for the commit data.

In [19]:
referenced_objects = [r.referencedObject for r in commit_objects]

In [20]:
referenced_objects = [
    client.commit.get(stream_id, commit_id).referencedObject
    for commit_id in commit_ids
]

We can create a new Federation class which is essentially just adding a name for the collection.

In [40]:
from specklepy.objects import Base

class Federation(Base, speckle_type="Federation"):
    def __init__(self, **kwargs):
      self["Components"] = []

In [41]:
new_commit_object = Base(speckle_type="Federation")

new_commit_object["Components"] = [
    Base.of_type("reference", referencedId=commit_id)
    for commit_id in referenced_objects
]

For the Viewer to resolve this will require the closure table for each of the reference objects. These closures are used as a shortcut for it to handle the processable objects.

We can add a custom operation to our notebook to simply get this from the server.

Ideally we'd check the localTransport first to see if we have the closure table already, but for brevity we'll just get it from the server.

We'll make a straight GraphQL query of the commit


In [42]:
from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport


def get_closures(wrapper, object_id):
    client = Client(
        transport=RequestsHTTPTransport(
            url=f"{wrapper._account.serverInfo.url}/graphql", verify=True, retries=3
        )
    )

    query = gql(
        """ query Object($stream_id: String!, $object_id: String!) { 
            stream(id: $stream_id) { 
              object(id: $object_id) { 
                data 
              }
            }
          } """
    )
    params = {"stream_id": wrapper.stream_id, "object_id": object_id}
    return client.execute(query, variable_values=params)["stream"]["object"]["data"][
        "__closure"
    ]

The `new_commit_object` will need the `__closure` table from each of the commits were merging. We can use the `get_closures` function we created earlier to get this.

At this point we could refactor to always be using Lists rather than numbered variables, but for now we'll just add the closures to the new commit object.

In [43]:
closures = {
    k: v
    for d in [get_closures(wrappers[0], obj_id) for obj_id in referenced_objects]
    for k, v in d.items()
}
closures.update(dict.fromkeys(referenced_objects, 1))


new_commit_object["__closure"] = closures

Within a notebook it often helps to write defensive code that can be run repeatedly even if the method has already been run. This is because the notebook is stateful and can be run in any order. The following code checks if the `federated` branch exists and if not creates it.

In [44]:
from specklepy.transports.server import ServerTransport
from specklepy.api import operations
from specklepy.logging.exceptions import GraphQLException

def try_get_branch_or_create(client, stream_id, branch_name):
    try:
        client.branch.get(
            stream_id=stream_id, name=branch_name
        ) or client.branch.create(stream_id=stream_id, name=branch_name)
        return client.branch.get(
            stream_id=stream_id, name=branch_name)
    except GraphQLException:
        return client.branch.create(stream_id=stream_id, name=branch_name)


branch = try_get_branch_or_create(client, stream_id, "federated")

In [45]:
hash_2 = operations.send(base=new_commit_object, transports=[transport])

hash_2

'e654f22a34f02c5c6d8e34e0d6cf01e1'

This doesn't actually work as the default specklepy traversal strips props with __ prefix and neither does it resolve closure for Reference Objects. So we'll need to add a custom operation to the server to resolve this.

In [46]:
from typing import Any, Dict, List, Optional, Tuple
from specklepy.serialization.base_object_serializer import BaseObjectSerializer
from uuid import uuid4
import hashlib
import re
from enum import Enum
from specklepy.objects.base import Base, DataChunk
import ujson

PRIMITIVES = (int, float, str, bool)


def traverse_base(
    serializer: BaseObjectSerializer, base: Base, closures: Dict[str, Any] = {}
):
    if serializer.write_transports:
        for wt in serializer.write_transports:
            wt.begin_write()

    if not serializer.detach_lineage:
        serializer.detach_lineage = [True]

        serializer.lineage.append(uuid4().hex)
        object_builder = {"id": "", "speckle_type": "Base", "totalChildrenCount": 0}
        object_builder.update(speckle_type=base.speckle_type)
        obj, props = base, base.get_serializable_attributes()

        while props:
            prop = props.pop(0)
            value = getattr(obj, prop, None)
            chunkable = False
            detach = False

            # skip props marked to be ignored with "__" or "_"
            if prop.startswith(("__", "_")):
                continue

            # don't prepopulate id as this will mess up hashing
            if prop == "id":
                continue

            # only bother with chunking and detaching if there is a write transport
            if serializer.write_transports:
                dynamic_chunk_match = prop.startswith("@") and re.match(
                    r"^@\((\d*)\)", prop
                )
                if dynamic_chunk_match:
                    chunk_size = dynamic_chunk_match.groups()[0]
                    serializer._chunkable[prop] = (
                        int(chunk_size) if chunk_size else base._chunk_size_default
                    )

                chunkable = prop in base._chunkable
                detach = bool(
                    prop.startswith("@") or prop in base._detachable or chunkable
                )

            # 1. handle None and primitives (ints, floats, strings, and bools)
            if value is None or isinstance(value, PRIMITIVES):
                object_builder[prop] = value
                continue

            # NOTE: for dynamic props, this won't be re-serialised as an enum but as an int
            if isinstance(value, Enum):
                object_builder[prop] = value.value
                continue

            # 2. handle Base objects
            elif isinstance(value, Base):
                child_obj = serializer.traverse_value(value, detach=detach)
                if detach and serializer.write_transports:
                    ref_id = child_obj["id"]
                    object_builder[prop] = serializer.detach_helper(ref_id=ref_id)
                else:
                    object_builder[prop] = child_obj

            # 3. handle chunkable props
            elif chunkable and serializer.write_transports:
                chunks = []
                max_size = base._chunkable[prop]
                chunk = DataChunk()
                for count, item in enumerate(value):
                    if count and count % max_size == 0:
                        chunks.append(chunk)
                        chunk = DataChunk()
                    chunk.data.append(item)
                chunks.append(chunk)

                chunk_refs = []
                for c in chunks:
                    serializer.detach_lineage.append(detach)
                    ref_id, _ = serializer._traverse_base(c)
                    ref_obj = serializer.detach_helper(ref_id=ref_id)
                    chunk_refs.append(ref_obj)
                object_builder[prop] = chunk_refs

            # 4. handle all other cases
            else:
                child_obj = serializer.traverse_value(value, detach)
                object_builder[prop] = child_obj

            closure = {}
            # add closures & children count to the object
            detached = serializer.detach_lineage.pop()
            if serializer.lineage[-1] in serializer.family_tree:
                closure = {
                    ref: depth - len(serializer.detach_lineage)
                    for ref, depth in serializer.family_tree[
                        serializer.lineage[-1]
                    ].items()
                }

            ############ ADDING OUR MAGIC HERE #################################
            closure.update(closures)

            object_builder["totalChildrenCount"] = len(closure)

            obj_id = hashlib.sha256(ujson.dumps(object_builder).encode()).hexdigest()[
                :32
            ]

            object_builder["id"] = obj_id
            if closure:
                object_builder["__closure"] = serializer.closure_table[obj_id] = closure

            # write detached or root objects to transports
            if detached and serializer.write_transports:
                for t in serializer.write_transports:
                    t.save_object(
                        id=obj_id, serialized_object=ujson.dumps(object_builder)
                    )

            del serializer.lineage[-1]

            if serializer.write_transports:
                for wt in serializer.write_transports:
                    wt.end_write()

            return obj_id, object_builder

In [47]:
serializer = BaseObjectSerializer(write_transports=[transport])

obj_id, serialized_object = traverse_base(serializer, new_commit_object, closures)

In [48]:
serialized_object

{'id': '5e9ac0017b74034997dbe5fa45714a90',
 'speckle_type': 'Base',
 'totalChildrenCount': 482,
 'Components': [{'id': '8ca84c1c0447b4caaed8b622dad90263',
   'speckle_type': 'reference',
   'totalChildrenCount': 0,
   'applicationId': None,
   'referencedId': 'f048873d78d8833e1a2c0d7c2391a9bb',
   'units': None},
  {'id': 'e4b7f1ace651fa8a899d4860a0572af6',
   'speckle_type': 'reference',
   'totalChildrenCount': 0,
   'applicationId': None,
   'referencedId': 'de61f36d6a4c6b9713e445ab4d801ea9',
   'units': None},
  {'id': '5d1c1e466dd4df7ae76c7c9183b4317f',
   'speckle_type': 'reference',
   'totalChildrenCount': 0,
   'applicationId': None,
   'referencedId': '90f505f7625cd121e99af6e81a1a1013',
   'units': None}],
 '__closure': {'0042e47be89ba7af3cd0344012dd44fb': 6,
  '0225bdfc617ae2e2cfa3182e5f319026': 8,
  '03ab601e5a6e7743dbada875bd634a3d': 3,
  '04849987174c213dcfba897757bcf4b4': 6,
  '04b68bc41ce7aa7e58e088e997193684': 5,
  '062f59e346ab9ba7f59d60a46b4e421a': 4,
  '085d6f930431

In [49]:
commit_id2 = client.commit.create(
    branch_name=branch.name,
    stream_id=stream_id,
    object_id=obj_id,
    message="federated commit",
)

Once again we build the embed URL and display it.

In [50]:
embed_url2 = f"https://speckle.xyz/embed?stream={stream_id}&commit={commit_id2}&transparent={transparency}&autoload={autoload}&hidecontrols={hide_controls}&hidesidebar={hide_sidebar}&hideselectioninfo={hide_selection_info}"

from IPython.display import IFrame

IFrame(embed_url2, width=400, height=300)

In [51]:
embed_url2

'https://speckle.xyz/embed?stream=7ce9010d71&commit=53862c7bc6&transparent=True&autoload=True&hidecontrols=False&hidesidebar=True&hideselectioninfo=True'

This federation is kinda simple, kinda clunky and doesn't de-dupe at all as it is not even examining the content of the individual commits.

To do anything approaching this we need to:

- load the child members of each commit
- have a strategy for de-duping
- have a strategy for merging
- have a strategy for handling any other conflicts



## Benchmark 3: Combine commits and de-dupe

The next step is to extract the component objects from each commit and then de-dupe them. This is a bit more involved as we need to load the objects from the server and then compare them. We also need to be able to merge them.

In [22]:
from specklepy.api import operations

commit_data = [
    operations.receive(obj_id=ref_obj, remote_transport=wrap.get_transport())
    for ref_obj, wrap in zip(referenced_objects, wrappers)
]

We create 3 Base objects and create a commit essentially identical to Benchmark 2.

In [26]:
from specklepy.objects import Base

granular_commit_object = Base(speckle_type="Federation.Granular")
granular_commit_object["@Components"] = commit_data

... and hash them

In [27]:
hash3 = operations.send(base=another_commit_object, transports=[transport])

On the basis of some of the other discussions around Versions, Federations, Assemblies, Exchanges etc. We can store this new commit on a dedicated branch.

In [33]:
from specklepy.api import operations
from specklepy.logging.exceptions import GraphQLException

# A helper function to create a new branch if one doesn't exist, or return it if it does
def try_get_branch_or_create(client, stream_id, branch_name):
    try:
        client.branch.get(
            stream_id=stream_id, name=branch_name
        ) or client.branch.create(stream_id=stream_id, name=branch_name)
        return client.branch.get(
            stream_id=stream_id, name=branch_name)
    except GraphQLException:
        return client.branch.create(stream_id=stream_id, name=branch_name)


branch = try_get_branch_or_create(client, stream_id, "federated")

In [34]:
branch

Branch(id='45107b16b6', name='federated', description='No description provided', commits=Commits(totalCount=32, cursor=datetime.datetime(2023, 1, 12, 18, 5, 57, tzinfo=datetime.timezone.utc), items=[Commit( id: 322d9849b0, message: federated commit, referencedObject: 9aef9ae333044e2129f4ab443bd5f50c, authorName: Jonathon Broughton, branchName: federated, createdAt: 2023-01-16 14:34:11.672000+00:00 ), Commit( id: 6b6d7bc1fb, message: federated commit, referencedObject: 5e9ac0017b74034997dbe5fa45714a90, authorName: Jonathon Broughton, branchName: federated, createdAt: 2023-01-16 14:34:03.628000+00:00 ), Commit( id: 8671501e17, message: federated commit, referencedObject: 9aef9ae333044e2129f4ab443bd5f50c, authorName: Jonathon Broughton, branchName: federated, createdAt: 2023-01-12 18:46:30.143000+00:00 ), Commit( id: 8d1f6ccc12, message: federated commit, referencedObject: ca357c1a3580ff9203d4e11642c6a0d4, authorName: Jonathon Broughton, branchName: federated, createdAt: 2023-01-12 18:45:

And then we create a new commit that contains the resolved objects

In [36]:
commit_id3 = client.commit.create(

    branch_name=branch.name,
    stream_id=stream_id,
    object_id=hash3,
    message="federated commit",
)


In [37]:

embed_url3 = f"https://speckle.xyz/embed?stream={stream_id}&commit={commit_id3}&transparent={transparency}&autoload={autoload}&hidecontrols={hide_controls}&hidesidebar={hide_sidebar}&hideselectioninfo={hide_selection_info}"

from IPython.display import IFrame

IFrame(embed_url3, width=400, height=300)

In [38]:
embed_url3

'https://speckle.xyz/embed?stream=7ce9010d71&commit=c31bd79d1b&transparent=True&autoload=True&hidecontrols=False&hidesidebar=True&hideselectioninfo=True'