In [1]:
import aiohttp
import asyncio
import json
import socket
from concurrent.futures import ThreadPoolExecutor
import nest_asyncio
nest_asyncio.apply()
%matplotlib widget
import matplotlib.pyplot as plt

In [2]:
session = aiohttp.ClientSession()

def scan_ip(ip):
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            s.connect((ip, 80))
            return ip
    except:
        return None

def get_online_ips():
    online_ips = []
    with ThreadPoolExecutor(max_workers=256) as executor:
        futures = []
        for i in range(2, 256):
            ip = f"192.168.31.{i}"
            futures.append(executor.submit(scan_ip, ip))
        for future in futures:
            result = future.result()
            if result:
                online_ips.append(result)
    return online_ips

async def call_get(url):
    try:
        async with session.get(url, timeout=5) as response:  # 设置了5秒的超时
            return url, await response.text(), response.status
    except Exception as e:
        return url, str(e), None

async def get_hosts_status(ips):
    tasks = []
    for i in ips:
        url = f"http://" + i + "/?status"
        tasks.append(call_get(url))
    
    results = await asyncio.gather(*tasks)

    online_robots = []
    for url, content, status in results:
        if status and "html" not in content and "HTML" not in content and content != "":
            online_robots.append(json.loads(content))

    return online_robots

def get_online_robots():
    online_robots = get_online_ips()
    return asyncio.run(get_hosts_status(online_robots))

def refesh_robot(robot):
    url, content, status = asyncio.run(call_get(f"http://{robot['ip']}/?status"))
    if status and "html" not in content:
        return json.loads(content)
    else:
        return None
    
def set_robot(robot, M1_position:int, M1_power:int, M2_position:int, M2_power:int, M3_direction:bool, M3_power:int):
    if M1_position < -2147483648 or M1_position > 2147483647 or M2_position < -2147483648 or M2_position > 2147483647:
        raise ValueError("Motor position should be in [-2147483648, 2147483647]")
    elif M1_power < 0 or M1_power > 255 or M2_power < 0 or M2_power > 255 or M3_power < 0 or M3_power > 255:
        raise ValueError("Motor power should be in [0, 255]")
    
    action = f"{int32_to_hex(M1_position)}{hex(M1_power)[2:].upper().zfill(2)}{int32_to_hex(M2_position)}{hex(M2_power)[2:].upper().zfill(2)}{int(M3_direction)}{hex(M3_power)[2:].upper().zfill(2)}"
    url, content, status = asyncio.run(call_get(f"http://{robot['ip']}/?set={action}"))
    if status and "html" not in content:
        return json.loads(content)
    else:
        return None
    
def calibrate_robot(robot, M1:bool, M2:bool):
    url, content, status = asyncio.run(call_get(f"http://{robot['ip']}/?calibrate={int(M1)}{int(M2)}"))
    if status and "html" not in content:
        return json.loads(content)
    else:
        return None
    
def reach_robot(robot, M1:bool, M2:bool):
    url, content, status = asyncio.run(call_get(f"http://{robot['ip']}/?reach={int(M1)}{int(M2)}"))
    if status and "html" not in content:
        return json.loads(content)
    else:
        return None
    
def pause_robot(robot, pause:bool):
    url, content, status = asyncio.run(call_get(f"http://{robot['ip']}/?pause{int(pause)}"))
    if status and "html" not in content:
        return json.loads(content)
    else:
        return None
    
def int32_to_hex(value: int):
    if value < 0:
        value = 0xFFFFFFFF + value + 1
    return hex(value)[2:].zfill(8).upper()
    
def get_power_robot(robot):
    url, content, status = asyncio.run(call_get(f"http://{robot['ip']}/?power"))
    if status and "html" not in content:
        raw_json = json.loads(content)
        raw_power_data = raw_json['power']
        int_power_data = [int(raw_power_data[i:i+4], 16) for i in range(0, len(raw_power_data), 4)]
        result_data = []
        for i in range(0, len(int_power_data), 6):
            bus_bus = float(int_power_data[i] >> 3) * 0.008
            right_bus = float(int_power_data[i+1] >> 3) * 0.008
            left_bus = float(int_power_data[i+2] >> 3) * 0.008
            bus_shunt = float(int_power_data[i+3] >> 3) * 0.00004
            right_shunt = float(int_power_data[i+4] >> 3) * 0.00004
            left_shunt = float(int_power_data[i+5] >> 3) * 0.00004
            bus_current = bus_shunt / 0.02
            right_current = right_shunt / 0.04
            left_current = left_shunt / 0.04
            result_data.append({
                "BUS_V": bus_bus,
                "RIGHT_V": right_bus,
                "LEFT_V": left_bus,
                "BUS_C": bus_current,
                "RIGHT_C": right_current,
                "LEFT_C": left_current,
            })
        return result_data
    else:
        return None
    

In [3]:
robots = get_online_robots()
print(robots)

[{'status': 'ok', 'ip': '192.168.31.170', 'mac': 'D8:3B:DA:C3:87:96', 'm1_current': 18, 'm1_target': 0, 'm2_current': 9, 'm2_target': 0, 'm1_power': 255, 'm2_power': 255, 'm3_power': 0, 'm1_error': 0, 'm2_error': 0, 'battery': 1.2, 'pause': False}]


In [4]:
robot = robots[0]
print(refesh_robot(robot))

{'status': 'ok', 'ip': '192.168.31.170', 'mac': 'D8:3B:DA:C3:87:96', 'm1_current': -14, 'm1_target': 0, 'm2_current': -7, 'm2_target': 0, 'm1_power': 255, 'm2_power': 255, 'm3_power': 0, 'm1_error': 0, 'm2_error': 0, 'battery': 1.21, 'pause': False}


In [5]:
print(set_robot(robots[0], 4000, 255, 4000, 255, True, 0))
#print(set_robot(robots[1], -1000, 255, -1000, 255, True, 0))

{'status': 'set ok', 'ip': '192.168.31.170', 'mac': 'D8:3B:DA:C3:87:96', 'm1_current': -14, 'm1_target': 4000, 'm2_current': -7, 'm2_target': 4000, 'm1_power': 255, 'm2_power': 255, 'm3_power': 0, 'm1_error': 0, 'm2_error': 0, 'battery': 1.2, 'pause': False}


In [6]:
print(calibrate_robot(robot, True, True))

{'status': 'calibrate ok', 'ip': '192.168.31.170', 'mac': 'D8:3B:DA:C3:87:96', 'm1_current': 3, 'm1_target': 0, 'm2_current': 4, 'm2_target': 0, 'm1_power': 255, 'm2_power': 255, 'm3_power': 0, 'm1_error': 0, 'm2_error': 0, 'battery': 1.18, 'pause': False}


In [7]:
robot = robots[0]
data = get_power_robot(robot)

# 计算功率 (W)
bus_w = [item['BUS_V'] * item['BUS_C'] for item in data]
top_w = [item['RIGHT_V'] * item['RIGHT_C'] for item in data]
bot_w = [item['LEFT_V'] * item['LEFT_C'] for item in data]

# 创建x轴数据点
x = range(len(data))

# 绘制图表
plt.figure(figsize=(10, 6))
plt.plot(x, bus_w, label='BUS')
plt.plot(x, top_w, label='RIGHT')
plt.plot(x, bot_w, label='LEFT')

plt.title('Power (W) over time')
plt.xlabel('Time')
plt.ylabel('Power (W)')
plt.legend()
plt.grid(True)

# 显示图表
plt.show()

KeyError: 'power'

In [None]:
robots = get_online_robots()
sorted_robots = sorted(robots, key=lambda x: x['mac'])
for i in sorted_robots:
    print(i)
# print(set_robot(sorted_robots[0], -300, 255, -300, 255, True, 0))

In [None]:
print(set_robot(sorted_robots[4], -200, 255, -200, 255, True, 0))

In [None]:
robots = get_online_robots()
sorted_robots = sorted(robots, key=lambda x: x['mac'])
# for i in sorted_robots:
#     print(pause_robot(i, True))
    
set_robot(sorted_robots[0], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[1], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[2], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[3], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[4], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[5], 0, 255, 0, 255, True, 0)

In [None]:
import time
sorted_robots = sorted(robots, key=lambda x: x['mac'])

shrink = -4000

# 全部收缩，然后全部伸展
set_robot(sorted_robots[0], shrink, 255, shrink, 255, True, 0)
set_robot(sorted_robots[1], shrink, 255, shrink, 255, True, 0)
set_robot(sorted_robots[2], shrink, 255, shrink, 255, True, 0)
set_robot(sorted_robots[3], shrink, 255, shrink, 255, True, 0)
set_robot(sorted_robots[4], shrink, 255, shrink, 255, True, 0)
set_robot(sorted_robots[5], shrink, 255, shrink, 255, True, 0)
time.sleep(3)
set_robot(sorted_robots[0], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[1], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[2], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[3], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[4], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[5], 0, 255, 0, 255, True, 0)
time.sleep(3)

# 扭曲，然后全部伸展
set_robot(sorted_robots[0], shrink, 255, 0, 255, True, 0)
set_robot(sorted_robots[1], 0, 255, shrink, 255, True, 0)
set_robot(sorted_robots[2], shrink, 255, 0, 255, True, 0)
set_robot(sorted_robots[3], 0, 255, shrink, 255, True, 0)
set_robot(sorted_robots[4], shrink, 255, 0, 255, True, 0)
set_robot(sorted_robots[5], 0, 255, shrink, 255, True, 0)
time.sleep(3)
set_robot(sorted_robots[0], 0, 255, shrink, 255, True, 0)
set_robot(sorted_robots[1], shrink, 255, 0, 255, True, 0)
set_robot(sorted_robots[2], 0, 255, shrink, 255, True, 0)
set_robot(sorted_robots[3], shrink, 255, 0, 255, True, 0)
set_robot(sorted_robots[4], 0, 255, shrink, 255, True, 0)
set_robot(sorted_robots[5], shrink, 255, 0, 255, True, 0)
time.sleep(3)
set_robot(sorted_robots[0], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[1], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[2], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[3], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[4], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[5], 0, 255, 0, 255, True, 0)
time.sleep(3)

# 单侧扭曲，然后全部伸展
set_robot(sorted_robots[0], shrink, 255, 0, 255, True, 0)
set_robot(sorted_robots[1], shrink, 255, 0, 255, True, 0)
set_robot(sorted_robots[2], shrink, 255, 0, 255, True, 0)
set_robot(sorted_robots[3], shrink, 255, 0, 255, True, 0)
set_robot(sorted_robots[4], shrink, 255, 0, 255, True, 0)
set_robot(sorted_robots[5], shrink, 255, 0, 255, True, 0)
time.sleep(3)
set_robot(sorted_robots[0], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[1], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[2], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[3], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[4], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[5], 0, 255, 0, 255, True, 0)
time.sleep(3)
set_robot(sorted_robots[0], 0, 255, shrink, 255, True, 0)
set_robot(sorted_robots[1], 0, 255, shrink, 255, True, 0)
set_robot(sorted_robots[2], 0, 255, shrink, 255, True, 0)
set_robot(sorted_robots[3], 0, 255, shrink, 255, True, 0)
set_robot(sorted_robots[4], 0, 255, shrink, 255, True, 0)
set_robot(sorted_robots[5], 0, 255, shrink, 255, True, 0)
time.sleep(3)

set_robot(sorted_robots[0], shrink, 255, shrink, 255, True, 0)
set_robot(sorted_robots[1], shrink, 255, shrink, 255, True, 0)
set_robot(sorted_robots[2], shrink, 255, shrink, 255, True, 0)
set_robot(sorted_robots[3], shrink, 255, shrink, 255, True, 0)
set_robot(sorted_robots[4], shrink, 255, shrink, 255, True, 0)
set_robot(sorted_robots[5], shrink, 255, shrink, 255, True, 0)
time.sleep(3)
set_robot(sorted_robots[0], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[1], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[2], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[3], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[4], 0, 255, 0, 255, True, 0)
set_robot(sorted_robots[5], 0, 255, 0, 255, True, 0)
time.sleep(3)


In [7]:
robot = robots[0]
print(set_robot(robot, -0, 255, -3000, 255, True, 255))

{'status': 'set ok', 'ip': '192.168.31.170', 'mac': 'D8:3B:DA:C3:87:96', 'm1_current': 18, 'm1_target': 0, 'm2_current': 20, 'm2_target': -3000, 'm1_power': 255, 'm2_power': 255, 'm3_power': 255, 'm1_error': 0, 'm2_error': 0, 'battery': 1.2, 'pause': False}


In [8]:
robot = robots[0]
print(set_robot(robot, 0, 255, 0, 255, True, 0))

{'status': 'set ok', 'ip': '192.168.31.170', 'mac': 'D8:3B:DA:C3:87:96', 'm1_current': 18, 'm1_target': 0, 'm2_current': -3011, 'm2_target': 0, 'm1_power': 255, 'm2_power': 255, 'm3_power': 0, 'm1_error': 0, 'm2_error': 0, 'battery': 1.15, 'pause': False}
