#### This notebook is to combined the annotated pickles together. Will do the following:

- Exception handling
- Mapping GT actions to meta actions by design
- Mapping further to defined categories (if necessary)
- One hot encoder the meta action (for VAD)

The meta actions by design:

| **Category**          | **Meta-actions**                                                                                     |
|-----------------------|-----------------------------------------------------------------------------------------------------|
| **Speed-control actions** | Speed up, Slow down, Slow down rapidly, Go straight slowly, Go straight at a constant speed, Stop, Wait, Reverse |
| **Turning actions**    | Turn left, Turn right, Turn around                                                                  |
| **Lane-control actions** | Change lane to the left, Change lane to the right, Shift slightly to the left, Shift slightly to the right      |




notice: `numpy` should be the same version with VAD, i.e. `1.26.3`

In [1]:
import pandas as pd
import os
import pickle
import sys
import numpy as np
cur_dir = os.getcwd()
sys.path.append(cur_dir + "/../../")

print(f"Current NumPy version: {np.__version__}")
# Assert specific version, for VAD model
assert np.__version__ == '1.26.3', f"NumPy version is {np.__version__}, but expected 1.26.3"

In [None]:
root_path_to_mapped_pickle = "/data/ceph/data/nuplan/vlm_mapped"
path_to_save_pickle = "/data/ceph/data/nuplan/ann_files/trainval_meta_action_mapped.pkl"

In [2]:
# Meta actions defined in VLM
meta_actions = {
    # speed control
    "speed up", 
    "slow down", 
    "slow down rapidly", 
    "go straight slowly", 
    "go straight at a constant speed", 
    "stop", 
    "wait", 
    "reverse", 
    # turn action
    "turn left", 
    "turn right", 
    "turn around", 
    # lane change
    "change lane to the left", 
    "change lane to the right", 
    "shift slightly to the left", 
    "shift slightly to the right", 
}


# Handle exceptions in LLM annotation
action_category_mapping = {
    # Speed Control
    "accelerate": "speed up",
    "accelerate and continue left turn": "speed up",
    "accelerate slightly": "speed up",
    "adjust speed": "speed up",  # General speed control
    "continue at constant speed": "go straight at a constant speed",
    "continue at current speed": "go straight at a constant speed",
    "maintain constant speed": "go straight at a constant speed",
    "maintain speed": "go straight at a constant speed",
    "maintain speed and lane": "go straight at a constant speed",
    "maintain speed and shift slightly to the left": "shift slightly to the left",
    "maintain speed and shift slightly to the right": "shift slightly to the right",
    "slightly slow down": "slow down",
    "speed up and shift slightly to the left": "shift slightly to the left",
    "speed up and shift slightly to the right": "shift slightly to the right",
    "speed up slightly": "speed up", 
    "maintain current speed": "go straight at a constant speed",

    # Turn Action
    "continue turning left": "turn left",
    "slight turn to the right": "turn right",
    "slightly turn left": "turn left",
    "slightly turn right": "turn right",
    "turn more left": "turn left",
    "turn more sharply right": "turn right",
    "turn right slightly": "turn right",
    "turn sharp right": "turn right",
    "turn sharply left": "turn left",
    "turn sharply right": "turn right",
    "turn sharply to the right": "turn right",
    "turn slight left": "turn left",
    "turn slight right": "turn right",
    "turn slightly left": "turn left",
    "turn slightly right": "turn right",
    "turn slightly to the left": "turn left",
    "turn slightly to the right": "turn right",
    "turn to the right": "turn right",

    # Lane Change
    "adjust to the center of the lane": "go straight at a constant speed",
    # "change lane": "change lane to the right",  # need extra handling
    "change lane slightly to the right": "change lane to the right",
    "maintain lane": "go straight at a constant speed",
    "maintain lane and speed": "go straight at a constant speed",
    "maintain lane position": "go straight at a constant speed",
    "maintain lane with slight adjustments": "go straight at a constant speed",
    "shift more to the left": "shift slightly to the left",
    "shift significantly to the left": "shift slightly to the left",
    "shift significantly to the right": "shift slightly to the right",
    "shift slightly left": "shift slightly to the left",
    "shift slightly right": "shift slightly to the right",
    "shift to the left": "shift slightly to the left",
    "shift to the right": "shift slightly to the right",
    "shift to the right lane": "change lane to the right",
    "slight left shift": "shift slightly to the left",
    "slight right shift": "shift slightly to the right",
    "slight shift right": "shift slightly to the right",
    "slight shift to the right": "shift slightly to the right",
    "slightly adjust to the left": "shift slightly to the left",
    "slightly shift left": "shift slightly to the left",
    "slightly shift right": "shift slightly to the right",
    "slightly shift to the left": "shift slightly to the left",
    "slightly shift to the right": "shift slightly to the right",

    # Continue/Position (Closest Meta Actions)
    "adjust course": "go straight at a constant speed",
    "continue forward": "go straight at a constant speed",
    "continue straight": "go straight at a constant speed",
    "go straight": "go straight at a constant speed",
    "maintain current lane": "go straight at a constant speed",
    "maintain current position": "wait",
    "maintain position": "wait",
    "maintain straight": "go straight at a constant speed",
    "move forward": "go straight at a constant speed",
    "move forward slightly to the right": "shift slightly to the right",
    "move straight": "go straight at a constant speed",
    "stabilize": "go straight at a constant speed",
    "stay in lane": "go straight at a constant speed"
}


In [3]:
root_path = root_path_to_mapped_pickle
all_data = []
for file in os.listdir(root_path):
    if file.endswith(".pkl"):
        file_path = os.path.join(root_path, file)
        with open(file_path, 'rb') as f:
            data = pickle.load(f)
            assert type(data) == list
            all_data += data

## Handle Exception Values in annotation

In [4]:
# Assert
actions = set()
for data in all_data:
    data['meta_action']['Action'] = data['meta_action']['Action'].lower()
    if data['meta_action']['Action'] not in meta_actions:
        if data['meta_action']['Action'] in action_category_mapping.keys():
            data['meta_action']['Action'] = action_category_mapping[data['meta_action']['Action']]
        elif data['meta_action']['Action'] == "change lane":
            subject = data['meta_action']['Subject'].lower()
            if "left" in subject:
                data['meta_action']['Action'] = "change lane to the left"
            elif "right" in subject:
                data['meta_action']['Action'] = "change lane to the right"
            else:
                print("Failed to map action: ", data['meta_action'], " Use default action: go straight at a constant speed")
                data['meta_action']['Action'] = "go straight at a constant speed"
        else:
            print(data['meta_action'])
            raise ValueError(f"Action {data['meta_action']['Action']} not in meta_actions or action_category_mapping")
        

Failed to map action:  {'Action': 'change lane', 'Subject': 'lane', 'Duration': '2s'}  Use default action: go straight at a constant speed
Failed to map action:  {'Action': 'change lane', 'Subject': 'lane', 'Duration': '2s'}  Use default action: go straight at a constant speed
Failed to map action:  {'Action': 'change lane', 'Subject': 'lane', 'Duration': '2s'}  Use default action: go straight at a constant speed
Failed to map action:  {'Action': 'change lane', 'Subject': 'lane', 'Duration': '2s'}  Use default action: go straight at a constant speed
Failed to map action:  {'Action': 'change lane', 'Subject': 'lane', 'Duration': '1s'}  Use default action: go straight at a constant speed
Failed to map action:  {'Action': 'change lane', 'Subject': 'lane', 'Duration': '6s'}  Use default action: go straight at a constant speed
Failed to map action:  {'Action': 'change lane', 'Subject': 'lane', 'Duration': '2s'}  Use default action: go straight at a constant speed
Failed to map action:  {'Ac

## Mapping to target categories

In [5]:
# mapping to target categories
mapping_to_target_action = {
    "speed up": "FORWARD",
    "slow down": "FORWARD",
    "slow down rapidly": "FORWARD",
    "go straight slowly": "FORWARD",
    "go straight at a constant speed": "FORWARD",
    "stop": "FORWARD",
    "wait": "FORWARD",
    "reverse": "FORWARD", 
    "turn left": "LEFT",
    "turn right": "RIGHT",
    "turn around": "LEFT",
    "change lane to the left": "CHANGE_LANE_LEFT",
    "change lane to the right": "CHANGE_LANE_RIGHT",
    "shift slightly to the left": "CHANGE_LANE_LEFT",
    "shift slightly to the right": "CHANGE_LANE_RIGHT",
}

target_actions = [
"FORWARD", #: np.array([1, 0, 0, 0, 0]),
"LEFT", #: np.array([0, 1, 0, 0, 0]),
"RIGHT", #: np.array([0, 0, 1, 0, 0]),
"CHANGE_LANE_LEFT", #: np.array([0, 0, 0, 1, 0]),
"CHANGE_LANE_RIGHT", #: np.array([0, 0, 0, 0, 1]),
]

num_actions = len(target_actions)
actions2ohe = {action: np.zeros(num_actions, dtype=np.float32) for action in target_actions}

for mapping in actions2ohe.keys():
    actions2ohe[mapping][target_actions.index(mapping)] = 1

print(actions2ohe)

for data in all_data:
    if "gt_ego_fut_cmd_old" not in data.keys():
        data["gt_ego_fut_cmd_old"] = data['gt_ego_fut_cmd']
    data['gt_ego_fut_cmd'] = actions2ohe[mapping_to_target_action[data['meta_action']['Action']]]

{'FORWARD': array([1., 0., 0., 0., 0.], dtype=float32), 'LEFT': array([0., 1., 0., 0., 0.], dtype=float32), 'RIGHT': array([0., 0., 1., 0., 0.], dtype=float32), 'CHANGE_LANE_LEFT': array([0., 0., 0., 1., 0.], dtype=float32), 'CHANGE_LANE_RIGHT': array([0., 0., 0., 0., 1.], dtype=float32)}


## Saving to Modified Pickle

In [6]:
# Create the mmcv_data dictionary
mmcv_data = {
    "infos": all_data,
    'metadata': {'version': '1.0-trainval'},
}

# Save the mmcv_data to the specified output file
with open(path_to_save_pickle, "wb") as f:
    pickle.dump(mmcv_data, f)
