In [None]:
import os
import re
import sys
import time
import ipaddress
import numpy as np
import capnp
import json
from collections import OrderedDict


# Workspace root; it is appended to sys.path, presumably so the project-local
# packages imported further down (ws_world, ws_devices, common) can be found.
ws_root = r"E:\bitbucket\rhea_ws"
sys.path += [ws_root]

# Text description of a FrameMark. The "base_pose" and "pose" entries are read
# from it below through Pose.fromRon to build the default `tool` matrix.
# NOTE(review): presumably RON (Rusty Object Notation) exported from another tool -- confirm.
ron = """FrameMark((
    o_center: (
        point: [
            0.18285007924661123,
            0.6709791384801527,
            0.03982208520828473,
        ],
        color: (((0, 0, 255, 255)), ((255, 255, 255, 255))),
        display: true,
        base_pose: Some((
            rotation: [
                0.2892680193133737,
                0.6922257131134943,
                0.6188778163770924,
                -0.23267536075690054,
            ],
            translation: [
                0.16989584832610768,
                0.4051250510944656,
                0.0752799672578123,
            ],
        )),
    ),
    the_dir: (
        point: [
            0.18092108459747158,
            0.6224851376558993,
            0.0467822560303599,
        ],
        color: (((0, 255, 255, 255)), ((0, 0, 0, 255))),
        display: true,
        base_pose: Some((
            rotation: [
                0.2892680193133737,
                0.6922257131134943,
                0.6188778163770924,
                -0.23267536075690054,
            ],
            translation: [
                0.16989584832610768,
                0.4051250510944656,
                0.0752799672578123,
            ],
        )),
    ),
    x_dir: ForwardZAndXIsGlobalZ,
    base_pose: Some((
        rotation: [
            0.2892680193133737,
            0.6922257131134943,
            0.6188778163770924,
            -0.23267536075690054,
        ],
        translation: [
            0.16989584832610768,
            0.4051250510944656,
            0.0752799672578123,
        ],
    )),
    pose: (
        rotation: [
            0.5448278608056937,
            0.5235865623619453,
            0.4722667557509078,
            -0.4538544097135168,
        ],
        translation: [
            0.18285007924661117,
            0.6709791384801526,
            0.03982208520828476,
        ],
    ),
    display_model: Frame,
    display: true,
))"""

# NOTE(review): the `if True:` wrapper has no runtime effect; presumably it stops
# import sorters/formatters from hoisting these project imports above the
# sys.path change -- confirm.
if True:
    from ws_world import World
    from ws_devices import Devices
    from common.utils_py import *


# Default tool matrix: inverse of the "base_pose" matrix multiplied by the "pose"
# matrix, both parsed from `ron`. It is the default `tool_matrix` argument of the
# delta helpers defined below.
# NOTE(review): `.homo` is presumably a 4x4 homogeneous transform -- confirm.
tool = (
    np.linalg.inv(Pose.fromRon(ron, "base_pose").homo) @ Pose.fromRon(ron, "pose").homo
)


# Show numpy arrays with four decimals and without scientific notation.
np.set_printoptions(precision=4, suppress=True)


def print(*args, **kwargs):
    """Print like the built-in ``print`` but render floats with four decimals.

    This definition shadows the built-in ``print`` for the rest of the module.

    Args:
        *args: Values to print. Every ``float`` instance (including subclasses)
            is formatted with ``"%.4f"``; all other values are passed through
            unchanged.
        **kwargs: Forwarded untouched to the built-in ``print`` (``sep``,
            ``end``, ``file``, ``flush``).
    """
    # Import the module explicitly: the ``__builtins__`` global is only a module
    # inside ``__main__``; elsewhere it is a plain dict without a ``print`` attribute.
    import builtins

    builtins.print(
        *("%.4f" % a if isinstance(a, float) else a for a in args), **kwargs
    )


def packageJointDict(joint: list, pose=None):
    """Wrap one joint vector in a record dictionary.

    Args:
        joint: Joint values stored under ``"joints"``.
        pose: Optional value stored under ``"pose"`` (``None`` by default).

    Returns:
        A dict with the keys ``"joints"``, ``"pose"``, ``"at"`` (the current
        ``time.time()``) and ``"duration"`` (always ``100``).
    """
    return {
        "joints": joint,
        "pose": pose,
        "at": time.time(),
        "duration": 100,
    }


def readTracker(file):
    """Read the joint vectors stored in a tracker JSON file.

    Args:
        file: Path of a JSON file whose top-level ``"0. Default"`` entry is a
            list of records that each carry a ``"joints"`` value.

    Returns:
        The ``"joints"`` values in file order.
    """
    with open(file, "r") as handle:
        records = json.load(handle)["0. Default"]
        return [record["joints"] for record in records]


def saveTracker(jointList, step=0):
    """Write joint vectors to a time-stamped ``.tracker`` JSON file.

    The file name is built from the current month/day, ``step`` and the current
    time of day, inside the hard-coded ``E:\\posePath`` directory.

    Args:
        jointList: Iterable of joint vectors; each one is wrapped with
            ``packageJointDict`` before being stored under ``"0. Default"``.
        step: Value embedded in the file name.
    """
    clock = time.strftime("%H%M%S", time.localtime())
    date_tag = time.strftime("%m%d", time.localtime())
    target = rf"E:\posePath\\shovel_{date_tag}_{step}_" + clock + ".tracker"

    payload = {"0. Default": [packageJointDict(entry) for entry in jointList]}

    with open(target, "w") as out:
        out.write(json.dumps(payload))


def savePathDict(file, pathDict):
    """Serialize ``pathDict`` as JSON and write it to ``file`` (overwriting it).

    Args:
        file: Destination path.
        pathDict: JSON-serializable object to store.
    """
    with open(file, "w") as out:
        out.write(json.dumps(pathDict))


def savePath(step, pathDict):
    """Write ``pathDict`` to a time-stamped ``.path`` JSON file.

    The file name is built from the current month/day, ``step`` and the current
    time of day, inside the hard-coded ``E:\\posePath`` directory.

    Args:
        step: Value embedded in the file name.
        pathDict: JSON-serializable path dictionary; it is not modified.
    """
    timestr = time.strftime("%H%M%S", time.localtime())
    month_day = time.strftime("%m%d", time.localtime())

    file = rf"E:\posePath\\shovel_{month_day}_{step}_" + timestr + ".path"
    with open(file, "w") as writer:
        writer.write(json.dumps(pathDict))
    # The former trailing `pathDict = {}` only rebound the local name and never
    # cleared the caller's dictionary, so it was removed as dead code.


def loadPathDict(file):
    """Load and return the JSON content of ``file``.

    Args:
        file: Path of a JSON file, e.g. one written by ``savePathDict``.

    Returns:
        The decoded JSON object.
    """
    with open(file, "r") as source:
        return json.load(source)


def printPose(pose):
    """Print the values of ``pose`` on one line, comma-separated, with three decimals."""
    formatted = [f"{value:.3f}" for value in pose]
    print(",".join(formatted))


def printPoseList(poseList, offset=0):
    """Print every entry of ``poseList`` on its own numbered line.

    Args:
        poseList: Iterable of numeric sequences; values are shown with four
            decimals, comma-separated.
        offset: Added to the zero-based position to form the printed number.
    """
    for position, row in enumerate(poseList):
        line = ",".join(f"{x:.4f}" for x in row)
        print(position + offset, line)


def printDict(pathDict, offset=0):
    """Print one numbered ``index :  key value`` line per dictionary entry.

    Args:
        pathDict: Mapping whose items are printed in iteration order.
        offset: Added to the zero-based position to form the printed number.
    """
    for position, entry in enumerate(pathDict.items()):
        key, value = entry
        print(position + offset, ": ", key, value)


def rads2degrees(rads):
    """Convert an iterable of angles from radians to degrees.

    Args:
        rads: Iterable of angles in radians.

    Returns:
        A new list with each angle expressed in degrees.
    """
    return [angle / np.pi * 180 for angle in rads]


def deg2rad(degree):
    """Convert a single angle from degrees to radians."""
    half_turns = degree / 180
    return half_turns * np.pi


def deltaRadians(
    rads_or_Pose,
    tool_matrix=tool,
    LIFT_mm=0,
    INTO_mm=0,
    PAN_mm=0,
    R_LIFT_deg=0,
    R_INTO_deg=0,
    R_PAN_deg=0,
):
    """Offset a pose via ``Pose.TD`` and return the matching ``world.getDegrees`` result.

    Args:
        rads_or_Pose: Either a ``Pose`` (used as is) or a sequence of joint
            angles in radians, which is converted to degrees and turned into a
            pose with ``world.getPoseFromDeg``.
        tool_matrix: Matrix handed to ``Pose.TD``; defaults to the module-level
            ``tool``.
        LIFT_mm, INTO_mm, PAN_mm: Offsets passed unchanged to ``Pose.TD``.
        R_LIFT_deg, R_INTO_deg, R_PAN_deg: Rotations in degrees; converted to
            radians here before being passed to ``Pose.TD``.

    Returns:
        The value of ``world.getDegrees(<moved pose>.toP6(), radians=True)``.
        NOTE(review): presumably joint angles in radians -- confirm against World.
    """
    start_pose = (
        rads_or_Pose
        if isinstance(rads_or_Pose, Pose)
        else world.getPoseFromDeg(rads2degrees(rads_or_Pose))
    )

    # Callers give rotations in degrees; Pose.TD receives them in radians.
    rotations = [deg2rad(angle) for angle in (R_LIFT_deg, R_INTO_deg, R_PAN_deg)]
    moved_pose = start_pose.TD(tool_matrix, LIFT_mm, INTO_mm, PAN_mm, *rotations)

    return world.getDegrees(moved_pose.toP6(), radians=True)


def deltaRadiansList(
    rads_or_Pose_list,
    tool_matrix=tool,
    LIFT_mm=0,
    INTO_mm=0,
    PAN_mm=0,
    R_LIFT_deg=0,
    R_INTO_deg=0,
    R_PAN_deg=0,
):
    """Apply ``deltaRadians`` with the same offsets to every entry of a list.

    Args:
        rads_or_Pose_list: Iterable of inputs accepted by ``deltaRadians``.
        tool_matrix: Forwarded to ``deltaRadians``.
        LIFT_mm, INTO_mm, PAN_mm: Offsets forwarded to ``deltaRadians``.
        R_LIFT_deg, R_INTO_deg, R_PAN_deg: Rotations in degrees, forwarded to
            ``deltaRadians``.

    Returns:
        A new list with one ``deltaRadians`` result per input entry.
    """
    # The rotations are forwarded in degrees: deltaRadians performs the
    # degree-to-radian conversion itself, so converting here as well (as the
    # previous version did) applied deg2rad twice.
    return [
        deltaRadians(
            p,
            tool_matrix,
            LIFT_mm,
            INTO_mm,
            PAN_mm,
            R_LIFT_deg,
            R_INTO_deg,
            R_PAN_deg,
        )
        for p in rads_or_Pose_list
    ]


def partDeltaRadianList(
    rads_or_Pose_list,
    beginIndex,
    endIndex,
    tool_matrix=tool,
    LIFT_mm=0,
    INTO_mm=0,
    PAN_mm=0,
    R_LIFT_deg=0,
    R_INTO_deg=0,
    R_PAN_deg=0,
):
    """Replace a slice of a list, in place, with its ``deltaRadians`` results.

    Args:
        rads_or_Pose_list: Mutable list of inputs accepted by ``deltaRadians``;
            entries ``beginIndex`` up to (not including) ``endIndex`` are
            overwritten.
        beginIndex: First index to modify.
        endIndex: Index after the last one to modify (slice semantics).
        tool_matrix: Forwarded to ``deltaRadians``.
        LIFT_mm, INTO_mm, PAN_mm: Offsets forwarded to ``deltaRadians``.
        R_LIFT_deg, R_INTO_deg, R_PAN_deg: Rotations in degrees, forwarded to
            ``deltaRadians``.

    Returns:
        Always ``True``.
    """
    # The slice is a copy, so writing back into the list while iterating is safe.
    for index, p in enumerate(rads_or_Pose_list[beginIndex:endIndex]):
        # Rotations stay in degrees: deltaRadians converts them to radians itself,
        # so the previous extra deg2rad here converted them twice.
        rads_or_Pose_list[index + beginIndex] = deltaRadians(
            p,
            tool_matrix,
            LIFT_mm,
            INTO_mm,
            PAN_mm,
            R_LIFT_deg,
            R_INTO_deg,
            R_PAN_deg,
        )

    return True


def partDeltaPathDict(
    pathDict,
    beginIndex,
    endIndex,
    tool_matrix=tool,
    LIFT_mm=0,
    INTO_mm=0,
    PAN_mm=0,
    R_LIFT_deg=0,
    R_INTO_deg=0,
    R_PAN_deg=0,
):
    """Return a copy of a path dictionary with a range of points offset.

    Args:
        pathDict: Mapping keyed by ``"0"``, ``"1"``, ... (``str(i)`` for every
            ``i`` below ``len(pathDict)``); each value is indexable as
            ``[joints, speed]``.
        beginIndex: First point index to offset.
        endIndex: Last point index to offset (inclusive).
        tool_matrix: Forwarded to ``deltaRadians``.
        LIFT_mm, INTO_mm, PAN_mm: Offsets forwarded to ``deltaRadians``.
        R_LIFT_deg, R_INTO_deg, R_PAN_deg: Rotations in degrees, forwarded to
            ``deltaRadians``.

    Returns:
        A new dict. Points inside the range get a new ``[joints, speed]`` list
        holding the ``deltaRadians`` result and the original speed; all other
        points reference the original values. Each changed point is printed.
    """
    retDict = {}

    for i in range(len(pathDict)):
        key = str(i)
        if beginIndex <= i <= endIndex:
            # Rotations stay in degrees: deltaRadians converts them to radians
            # itself, so the previous extra deg2rad here converted them twice.
            retDict[key] = [
                deltaRadians(
                    pathDict[key][0],
                    tool_matrix,
                    LIFT_mm,
                    INTO_mm,
                    PAN_mm,
                    R_LIFT_deg,
                    R_INTO_deg,
                    R_PAN_deg,
                ),
                pathDict[key][1],
            ]
            print(" after ", i, retDict[key])
        else:
            retDict[key] = pathDict[key]

    return retDict


def runPath(pathDict):
    """Queue every point of ``pathDict`` on the global ``world`` and start the move.

    Args:
        pathDict: Mapping whose values are indexable as ``[joints, speed]``;
            points are queued in the mapping's iteration order.
    """
    world.clearPathPoints()

    for point in pathDict.values():
        world.addPathPointsRadian(point[0], speed=point[1])

    world.pathMove()


def packagePathDict(jointDict, speedDict, defaultSpeed=10):
    """Combine joints and speeds into a ``{str(key): [joints, speed]}`` dictionary.

    Args:
        jointDict: Mapping of key to joint values.
        speedDict: Mapping looked up by the zero-based *position* of each entry
            in ``jointDict`` (not by its key).
        defaultSpeed: Speed used when ``speedDict`` has no entry for a position.

    Returns:
        A new dict keyed by ``str(key)`` for each key of ``jointDict``.
    """
    return {
        str(key): [joints, speedDict.get(position, defaultSpeed)]
        for position, (key, joints) in enumerate(jointDict.items())
    }


def editPathDictSpeed(pathDict, speedKVList):
    """Overwrite, in place, the speed of selected points of ``pathDict``.

    Args:
        pathDict: Mapping of ``str`` key to a mutable ``[joints, speed]`` entry.
        speedKVList: Mapping of point key to new speed; each key is converted
            with ``str()`` before the lookup in ``pathDict``.
    """
    for point_key in speedKVList:
        new_speed = speedKVList[point_key]
        pathDict[str(point_key)][1] = new_speed


class Path(object):
    """A robot path held as ``{key: [joints, speed]}`` plus joint and speed views.

    The path can be loaded from a ``.path`` file (``loadPathDict``), built from
    a ``.tracker`` file (``readTracker``, every point getting ``defaultSpeed``),
    or taken from an in-memory dictionary.
    """

    def __init__(self, path_or_tracker_file="", dict=None, step=0, defaultSpeed=10):
        """Create the path.

        Args:
            path_or_tracker_file: File name ending in ``.path`` or ``.tracker``;
                any other value makes ``dict`` the data source.
            dict: Path dictionary used when no recognised file is given;
                ``None`` yields an empty path.
            step: Value embedded in the names of files written by ``savePath``
                and ``saveTracker``.
            defaultSpeed: Speed given to every point read from a ``.tracker``
                file.
        """
        self._pathDict = {}
        self._jointDict = {}
        self._speedDict = {}
        self._step = step
        self._defaultSpeed = defaultSpeed
        self.init(path_or_tracker_file, dict)

    def init(self, path_or_tracker_file, dict):
        """(Re)load the path from a file or from ``dict``; see ``__init__``."""
        if path_or_tracker_file.endswith(".path"):
            self.setByDict(loadPathDict(path_or_tracker_file))
        elif path_or_tracker_file.endswith(".tracker"):
            # Drop entries left over from a previous load before refilling.
            self._jointDict.clear()
            self._speedDict.clear()
            for index, p in enumerate(readTracker(path_or_tracker_file)):
                self._jointDict[str(index)] = p
                self._speedDict[str(index)] = self._defaultSpeed
            # packagePathDict looks speeds up by integer position, so the
            # string-keyed speed view never matches and every point receives
            # the default speed -- the same value stored in the view above.
            self._pathDict = packagePathDict(
                self._jointDict, self._speedDict, self._defaultSpeed
            )
        else:
            self.setByDict(dict)

    def setByDict(self, pathDict):
        """Use ``pathDict`` as the path and rebuild the joint and speed views.

        Args:
            pathDict: Mapping of key to ``[joints, speed]``; ``None`` is treated
                as an empty path instead of raising ``AttributeError``.
        """
        if pathDict is None:
            pathDict = {}
        self._pathDict = pathDict
        # Clear first so keys of a previously loaded path do not linger.
        self._jointDict.clear()
        self._speedDict.clear()
        for key, entry in self._pathDict.items():
            self._jointDict[key] = entry[0]
            self._speedDict[key] = entry[1]

    def savePath(self):
        """Write the path dictionary to a time-stamped ``.path`` file."""
        savePath(self._step, self._pathDict)

    def saveTracker(self):
        """Write the joint vectors to a time-stamped ``.tracker`` file."""
        # saveTracker iterates its argument and stores each item as "joints";
        # passing the dict itself would store the keys, so pass the values.
        saveTracker(list(self._jointDict.values()), step=self._step)

    def pathDict(self):
        """Return the underlying ``{key: [joints, speed]}`` dictionary (not a copy)."""
        return self._pathDict

    def jointDict(self):
        """Return the ``{key: joints}`` view (not a copy)."""
        return self._jointDict

    def speedDict(self):
        """Return the ``{key: speed}`` view (not a copy)."""
        return self._speedDict

    def run(self):
        """Execute the path through ``runPath``."""
        runPath(self._pathDict)

    def deltaPathDict(
        self,
        begin=0,
        end=0,
        tool_matrix=tool,
        LIFT_mm=0,
        INTO_mm=0,
        PAN_mm=0,
        R_LIFT_deg=0,
        R_INTO_deg=0,
        R_PAN_deg=0,
    ):
        """Offset points ``begin`` through ``end`` (inclusive) of this path.

        All arguments are forwarded to ``partDeltaPathDict``; the resulting
        dictionary replaces the stored path.
        """
        shifted = partDeltaPathDict(
            self._pathDict,
            begin,
            end,
            tool_matrix,
            LIFT_mm,
            INTO_mm,
            PAN_mm,
            R_LIFT_deg,
            R_INTO_deg,
            R_PAN_deg,
        )
        # partDeltaPathDict returns a new dictionary rather than editing its
        # input, so the result must be stored for the call to have any effect.
        self.setByDict(shifted)

    def insertDict(self, key, pose_speed):
        """Unfinished: prints every ``(index, (key, value))`` pair of the path.

        TODO: ``key`` and ``pose_speed`` are not used yet and nothing is
        inserted; the method always returns an empty dict.
        """
        tmpDict = {}
        for k, v in enumerate(self._pathDict.items()):

            print(k, v)
            # tmpDict[k] = v

        return tmpDict


def shake():
    """Run a short back-and-forth path around the current pose.

    The current joint state is read from the global ``world``; four offset
    targets (INTO +/-2 mm and R_INTO +/-5 deg) are derived with
    ``deltaRadians`` and queued, interleaved with returns to the start state,
    before ``world.pathMove()`` is called.
    """
    world.clearPathPoints()
    crt_pose = world.getDegrees(world.getPose(), radians=True)

    forward = deltaRadians(crt_pose, INTO_mm=2)
    backward = deltaRadians(crt_pose, INTO_mm=-2)
    tilt_pos = deltaRadians(crt_pose, R_INTO_deg=5)
    tilt_neg = deltaRadians(crt_pose, R_INTO_deg=-5)

    # Visiting order of the shake; every point uses the same motion settings.
    sequence = (
        tilt_pos,
        tilt_neg,
        crt_pose,
        forward,
        backward,
        tilt_neg,
        tilt_pos,
        crt_pose,
        backward,
        forward,
        crt_pose,
    )
    for target in sequence:
        world.addPathPointsRadian(target, speed=80, smooth=1, acc=100, dec=100)

    world.pathMove()

In [None]:
# Create the module-level `world` that the path helpers above refer to, then
# call its mountDevices() and startGrab() set-up methods.
world = World(ws_root)
world.mountDevices()
world.startGrab()

# Test tube: 9454.5 - 8215.5 = 1239.0 mg; remaining: 8402 - 8215.5 = 186.5; anhydrous copper sulfate
# Test tube: 9483 - 8216 = 1267 mg

In [None]:
# Query the current pose and joints; as the last expression of the cell, only
# the getJoints() result is displayed by the notebook.
world.getPose()
world.getJoints()

In [None]:
# Stored coordinate-frame pose of the spoon
# NOTE(review): six values, presumably x, y, z followed by three rotation
# components -- confirm order and units against World.movePose.
p_load = np.array([197.26, 340.77, 83.02,  1.7296, -0.6842, 3.0043])
world.movePose(p_load)

In [None]:
# Step 1: load the recorded tracker points, then override the speed of points 0-13.
pp1 = Path(r"E:\posePath\shovel_0603_1_1745.tracker", step=1)
pp_1 = pp1.pathDict()
# Point index -> speed; applied in place to pp_1 by editPathDictSpeed.
speedKVList1 = {
            0:50,
            1:40,
            2:30,
            3:20,
            4:12,
            5:8,
            6:3,
            7:3,  
            8:3,  
            9:3,  
            10:3,  
            11:3,  
            12:3,  
            13:3
               }
editPathDictSpeed(pp_1, speedKVList1)
printDict(pp_1) 

In [None]:
# Step 2: load the recorded tracker points with a default speed of 2.
pp2 = Path(r"E:\posePath\shovel_0603_2_1900.tracker", step=2, defaultSpeed=2)
p_2 = pp2.pathDict()
# Point index -> speed; applied in place to p_2 by editPathDictSpeed.
speedKVList2 = {
            3:2,
               }

editPathDictSpeed(p_2, speedKVList2)
printDict(p_2) 


In [None]:
# Step 3: load the recorded tracker points with a default speed of 2.
pp3 = Path(r"E:\posePath\shovel_0603_3_1800.tracker", step=3, defaultSpeed=2)
p_3 = pp3.pathDict()
# The bare string below is an earlier speed table kept as an inert expression;
# it is evaluated and discarded, so it has no effect.
""" speedKVList3 = {
    0:4, 1:6, 2:6, 3:6, 4:6, 5:6, 
    6:6, 7:6, 8:6, 9:20, 10:100, 
    11:100, 12:80, 13:80, 14:80
    
               } """
# Point index -> speed for points 7-23; applied in place to p_3.
speedKVList3 = {
    7:6, 8:30, 9:30, 10:30, 
    11:20, 12:20, 13:30, 14:30, 15:30, 
    16:30, 17:30, 18:40, 19:60, 20:50, 
    21:50, 22:50, 23:50
               }

editPathDictSpeed(p_3, speedKVList3)
printDict(p_3) 

In [None]:
# Build p_1 from the step-1 path with points 3 through 6 (inclusive) offset by
# PAN 1 and INTO 6; pp_1 itself is not replaced.
p_1 = partDeltaPathDict(pp_1, 3, 6, PAN_mm=1, INTO_mm=6)
# NOTE(review): the list below looks like previously tried (PAN, INTO) pairs -- confirm.
# PAN,INTO [-3, 0] [3,0] [0,8] 

In [None]:
# Execute the offset step-1 path.
runPath(p_1)

In [None]:
# Run the shake motion around the current pose.
shake()

In [None]:
# Execute the step-2 path.
runPath(p_2)

In [None]:
# Execute the step-3 path.
runPath(p_3)

In [None]:
# Start recording on the "h" and "v" channels into their own directories.
# NOTE(review): the meaning of the second argument (2000) is not visible here -- confirm.
world.hStartRecord(r"D:\records\powder_photos\hcam", 2000)
world.vStartRecord(r"D:\records\powder_photos\vcam", 2000)

155702

In [None]:
# Trigger one capture on both channels, tagged with the current time of day
# (HHMMSS) as an integer.
# NOTE(review): the name `id` shadows the built-in id() for the rest of the session.
id = int(time.strftime("%H%M%S", time.localtime()))
world.hTrigger(id, count=1)
world.vTrigger(id, count=1)

In [None]:
# Stop recording on both channels.
world.hStopRecord()
world.vStopRecord()

In [None]:
# Repeats the star import from the first cell.
from common.utils_py import *

# Register the two schema directories with capnp's import hook and reset its
# global schema parser before importing the weigh-module schema.
# NOTE(review): presumably `node_dev_mt_weigh_module_capnp` is resolved from a
# .capnp file in these directories by the hook -- confirm.
capnp.add_import_hook([os.path.join(ws_root, "common", "types"),
    os.path.join(ws_root, "node", "dev", "mt_weigh_module", "types")])
capnp.cleanup_global_schema_parser()
import node_dev_mt_weigh_module_capnp  # type: ignore

In [None]:
# Placeholder globals named after the callback parameters.
# A third placeholder, `time = ""`, was removed: it rebound the module-level
# `time` name and broke every later time.time() / time.strftime() call in the
# helpers defined earlier in this notebook.
topic_name = ""
msg = ""

def weightCallback(topic_name, msg, time):
    """Decode one ``WeightData`` message and print it.

    Args:
        topic_name: Passed by the subscriber; not used here.
        msg: Raw bytes given to ``WeightData.from_bytes``.
        time: Passed by the subscriber; not used here.
    """
    decoder = node_dev_mt_weigh_module_capnp.WeightData
    with decoder.from_bytes(msg) as decoded:
        print("weightMsg", decoded)


# Subscribe to the weight stream and register weightCallback as its handler.
# NOTE(review): BlobSubscriber presumably comes from the `common.utils_py` star import -- confirm.
c_sub = BlobSubscriber("/dev/weight/stream", "node_dev_mt_weigh_module:WeightData")
c_sub.set_callback(weightCallback)

