In [None]:
import time
import datetime
from functools import partial

from tqdm.notebook import tqdm, trange
from rich.console import Console
import numpy as np
import ipywidgets as widgets

from mmwave.fmc4030 import FMC4030, Braket
from mmwave.fmc4030.bracket import cal_running_time
from mmwave.mmwave import MMWaveCmd
from mmwave.util import turn_toml
from mmwave.config import short_range_cfg as mmw_cfg

print = Console().print


def cal_last(use_num, step):
    return (use_num // step + 1) * step - use_num


def cal_all_time(axis_info, step_num):
    all_time = 0
    for axi_info in axis_info:
        axi_time = cal_running_time(*axi_info) * step_num
        all_time += axi_time
    all_time += step_num * 0.1
    return all_time


inslider_style = {"description_width": "initial", "width": "50%"}
inslider_layout = widgets.Layout(width="50%")

IntSlider = partial(widgets.IntSlider, style=inslider_style, layout=inslider_layout)

# %matplotlib ipympl

In [None]:
offset_x = IntSlider(value=0, min=0, max=970, step=1, description="水平扫描偏移(mm)")
offset_y = IntSlider(value=0, min=0, max=1970, step=1, description="垂直扫描偏移(mm)")

fmc4030 = FMC4030()
with Braket(fmc4030) as braket:
    with braket.break_conrtol():
        time.sleep(2)
    braket.jog_x(0)
    braket.jog_y(0)
    braket.home_axis()


@widgets.interact(offset_x=offset_x, offset_y=offset_y)
def to_offset(offset_x, offset_y):
    with Braket(fmc4030) as braket:
        braket.jog_x(offset_x)
        braket.jog_y(offset_y)

In [None]:
sweep_length = IntSlider(value=900, min=0, max=970, step=1, description="水平扫描长度(mm)")
sample_points = IntSlider(value=901, min=1, max=901, step=1, description="水平采样点数(mm)")  # 采样点数

sweep_speed = IntSlider(value=40, min=0, max=80, step=1, description="水平扫描速度(mm/s)")

sweep_lines = IntSlider(value=23, min=1, max=300, step=1, description="垂直扫描线数")
sweep_dy = IntSlider(value=70, min=1, max=86, step=1, description="垂直扫描间距(mm)")


@widgets.interact(
    sweep_length=sweep_length, sample_points=sample_points, sweep_speed=sweep_speed, sweep_lines=sweep_lines, sweep_dy=sweep_dy
)
def set_config(sweep_length, sample_points, sweep_speed, sweep_lines, sweep_dy):
    framePeriodicity = 1000 * sweep_length / sweep_speed / (sample_points - 1)  # 水平采样周期 ms

    print(f"framePriodicity: {framePeriodicity} ms")

    axis_info = [
        (sweep_length, sweep_speed, 250, 250),
        (sweep_dy, 150, 200, 200),
        (sweep_length, 200, 250, 250),
    ]

    record_time = cal_all_time(axis_info, sweep_lines)
    record_time += 5
    print(f"record_time:{record_time} s")

    num_frames = int(record_time * 1000 / framePeriodicity)
    print(f"num_frames:{num_frames}")

    mmw_cfg.mimo.frame.framePeriodicity = framePeriodicity
    mmw_cfg.mimo.frame.numLoops = 4
    mmw_cfg.mimo.frame.numFrames = num_frames

    mmw_cfg.bracket.profile.dx = sweep_length / sample_points
    mmw_cfg.bracket.profile.dy = sweep_dy
    mmw_cfg.bracket.profile.row = sweep_lines
    mmw_cfg.bracket.profile.col = sample_points
    mmw_cfg.bracket.profile.timestamps = "timestamps.txt"
    mmw_cfg.bracket.profile.offset_time = -0.920  # 手动偏移校准
    mmw_cfg.bracket.profile.record_time = record_time

In [None]:
@widgets.interact_manual()
def sweep():
    now_date = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    mmw_cfg.bracket.profile.run_time = now_date
    print(now_date)
    config_path = f"./config_{now_date}.toml"
    turn_toml(config_path, mmw_cfg.model_dump())

    mmwave = MMWaveCmd(config_file=config_path)
    mmwave_dir = f"outdoor_{now_date}"
    timestamps_path = mmwave_dir + ".txt"

    record_time = mmw_cfg.bracket.profile.record_time
    framePeriodicity = mmw_cfg.mimo.frame.framePeriodicity

    time_list = []
    with mmwave.record(mmwave_dir, record_time):
        st = time.time()
        with Braket(fmc4030) as braket:
            for y_line in trange(sweep_lines.value):
                time.sleep(cal_last(time.time() - st, framePeriodicity / 1000))
                record_start = time.time() - st
                braket.jog_x(sweep_length.value + offset_x.value, speed=sweep_speed.value, acc=250, dec=250)
                record_end = time.time() - st
                braket.jog_y((y_line + 1) * sweep_dy.value + offset_y.value)
                braket.jog_x(0 + offset_x.value, acc=250, dec=250)
                tqdm.write(f"{(record_start, record_end)}")
                time_list.append((record_start, record_end))

            braket.jog_x(0)
            braket.jog_y(0)
        time_offset = mmwave.sync_time(st)
        time_list.append([time_offset, 0])
        np.savetxt(timestamps_path, np.array(time_list))

In [None]:
import subprocess


@widgets.interact_manual()
def ls_file():
    now_date = mmw_cfg.bracket.profile.run_time
    mmwave_dir = f"outdoor_{now_date}"

    res = subprocess.run(["ssh", "root@192.168.33.180", f"ls /mnt/ssd/{mmwave_dir}"], capture_output=True)
    res = res.stdout.decode()
    files = [i for i in res.split()]
    mmw_cfg.bracket.profile.files = files
    print(files)

In [None]:
from pathlib import Path
import shutil
import subprocess


@widgets.interact_manual()
def copy_file():
    now_date = mmw_cfg.bracket.profile.run_time

    mmwave_dir = f"outdoor_{now_date}"
    config_path = f"./config_{now_date}.toml"
    timestamps_path = mmwave_dir + ".txt"
    cmd = ["ssh", "root@192.168.33.180", "-C", "killall apps.out"]
    subprocess.run(cmd)
    # change there!!!!!!!!
    tartgetdir_path = f"/media/ubuntu/One Touch/{mmwave_dir}"
    files = mmw_cfg.bracket.profile.files
    Path(tartgetdir_path).mkdir(exist_ok=True)
    for f in tqdm(files):
        cmd = [
            "rsync",
            "-av",
            "--progress",
            "--whole-file",
            f"root@192.168.33.180:/mnt/ssd/{mmwave_dir}/{f}",
            f"{tartgetdir_path}/{f}",
        ]
        subprocess.run(cmd)
    shutil.copy(config_path, tartgetdir_path + "/config.toml")
    shutil.copy(timestamps_path, tartgetdir_path + "/timestamps.txt")