In [24]:
import torch
import numpy as np
from scenarionet import read_dataset_summary, read_scenario
from metadrive.engine.asset_loader import AssetLoader

In [21]:
class AdaptiveTokenizer:
    """Convert per-agent trajectories into fixed-length tokens of varying detail.

    Every trajectory receives an importance score (see ``score_importance``).
    Trajectories scoring at or above ``importance_threshold`` are kept at full
    temporal resolution ('high-detail'); the rest are subsampled to every second
    step ('low-detail'). All tokens are then padded or truncated to
    ``token_length`` rows so they can be stacked into one tensor.
    """

    def __init__(self, importance_threshold=0.5, max_tokens=512, token_length=10):
        """Store the tokenizer configuration.

        Args:
            importance_threshold: Minimum normalized score for a trajectory to
                be tokenized as 'high-detail'.
            max_tokens: Upper bound on the number of tokens returned by
                ``tokenize``; trajectories past this count (in input order) are
                dropped.
            token_length: Number of rows (time steps) every token is padded or
                truncated to.
        """
        self.importance_threshold = importance_threshold
        self.max_tokens = max_tokens
        self.token_length = token_length

    def score_importance(self, trajectories, road_vectors):
        """Score each trajectory by activity relative to its distance from the road.

        The raw score is ``activity / (road_proximity + 1e-5)`` where *activity*
        is the sum over time steps of the norm of the first two features and
        *road_proximity* is the smallest distance between any road vector and
        any trajectory step. Raw scores are divided by ``max + 1e-5``, so for
        finite inputs the result is non-negative and strictly below 1.

        Args:
            trajectories: Array indexed as ``[agent, step, feature]``; only the
                first two features are used (assumed to be planar x/y
                coordinates -- TODO confirm against the data source).
            road_vectors: Array of shape ``(num_points, 2)`` compared against
                the trajectory positions.

        Returns:
            1-D float array with one normalized score per trajectory. Empty
            when ``trajectories`` has no agents. When there are no road vectors
            (or no time steps) the proximity term is skipped and trajectories
            are ranked by activity alone.
        """
        trajectories = np.asarray(trajectories)
        road_vectors = np.asarray(road_vectors)

        num_agents = trajectories.shape[0]
        importance_map = np.zeros(num_agents)
        if num_agents == 0:
            # Nothing to score; .max() below would raise on a zero-size array.
            return importance_map

        # np.min over an empty distance matrix raises, so only use the road
        # term when both sides actually contain points. The condition is the
        # same for every agent, keeping the scores on one consistent scale.
        use_roads = road_vectors.size > 0 and trajectories.size > 0

        # Kept as a per-agent loop: a fully vectorized distance tensor would be
        # (agents x road points x steps) and can get large.
        for i in range(num_agents):
            positions = trajectories[i, :, :2]
            activity_score = np.linalg.norm(positions, axis=1).sum()
            if not use_roads:
                importance_map[i] = activity_score
                continue

            distances = np.linalg.norm(
                road_vectors[:, None, :] - positions[None, :, :], axis=2
            )
            road_proximity = np.min(distances)
            # The epsilon guards against division by zero when a step lies
            # exactly on a road point.
            importance_map[i] = activity_score / (road_proximity + 1e-5)

        # The epsilon keeps the normalization finite when every score is zero.
        importance_map = importance_map / (importance_map.max() + 1e-5)
        return importance_map

    def pad_or_truncate(self, token):
        """Return ``token`` with exactly ``token_length`` rows.

        Longer tokens are cut to their first ``token_length`` rows; shorter
        ones are extended with rows of float32 zeros.

        Args:
            token: 2-D array of shape ``(steps, features)``.

        Returns:
            Array of shape ``(token_length, features)``.
        """
        if token.shape[0] > self.token_length:
            return token[:self.token_length]
        pad_length = self.token_length - token.shape[0]
        pad = np.zeros((pad_length, token.shape[1]), dtype=np.float32)
        return np.vstack((token, pad))

    def tokenize(self, trajectories, road_vectors, metadata):
        """Tokenize trajectories, allotting more detail to important ones.

        Args:
            trajectories: Array indexed as ``[agent, step, feature]``.
            road_vectors: Array of shape ``(num_points, 2)`` used for scoring.
            metadata: Arbitrary object passed through unchanged.

        Returns:
            Dict with:
              * ``'token_regions'``: float32 tensor of shape
                ``(num_tokens, token_length, features)``; ``num_tokens`` is 0
                when there is nothing to tokenize.
              * ``'token_types'``: list of ``'high-detail'`` / ``'low-detail'``
                labels, aligned with the first axis of ``'token_regions'``.
              * ``'metadata'``: the ``metadata`` argument.

            Token ``i`` always corresponds to ``trajectories[i]``; when there
            are more trajectories than ``max_tokens`` the trailing ones are
            dropped.

        Raises:
            ValueError: If the tokens cannot be stacked into a single array.
        """
        trajectories = np.asarray(trajectories)
        importance_scores = self.score_importance(trajectories, road_vectors)
        token_regions = []
        token_types = []

        # Scores are computed over all trajectories (the normalization depends
        # on the global maximum), but tokens past max_tokens would be discarded
        # anyway, so they are never built.
        for i, score in enumerate(importance_scores[:self.max_tokens]):
            if score >= self.importance_threshold:
                token_regions.append(self.pad_or_truncate(trajectories[i]))
                token_types.append('high-detail')
            else:
                # Halve the temporal resolution for less important agents.
                token_regions.append(self.pad_or_truncate(trajectories[i][::2]))
                token_types.append('low-detail')

        if token_regions:
            try:
                token_tensor = torch.tensor(np.stack(token_regions), dtype=torch.float32)
            except ValueError as e:
                raise ValueError("Failed to stack token regions into a tensor. Ensure all tokens have the same dimensions.") from e
        else:
            # np.stack([]) raises; return a well-formed empty batch instead.
            feature_dim = trajectories.shape[2] if trajectories.ndim >= 3 else 0
            token_tensor = torch.zeros(
                (0, self.token_length, feature_dim), dtype=torch.float32
            )

        tokens = {
            'token_regions': token_tensor,
            'token_types': token_types,
            'metadata': metadata
        }
        return tokens



In [22]:
# Smoke-test the tokenizer on synthetic data. A seeded generator is used so the
# cell produces the same trajectories and road points on every
# Restart & Run All.
rng = np.random.default_rng(42)
sample_trajectories = rng.random((100, 10, 3))  # 100 agents, 10 steps, 3 features
sample_road_vectors = rng.random((50, 2))       # 50 road points in the plane
sample_metadata = {'scenario_id': 'sample_001', 'map': 'city_map_1'}

tokenizer = AdaptiveTokenizer()
tokens = tokenizer.tokenize(sample_trajectories, sample_road_vectors, sample_metadata)

print("Tokenization Complete!")
print(f"Number of Tokens: {len(tokens['token_regions'])}")
print(f"Token Types: {set(tokens['token_types'])}")
print(f"Metadata: {tokens['metadata']}")

Tokenization Complete!
Number of Tokens: 100
Token Types: {'low-detail', 'high-detail'}
Metadata: {'scenario_id': 'sample_001', 'map': 'city_map_1'}


In [28]:
# Resolve the preprocessed dataset directory and read its summary.
# NOTE(review): this is a machine-specific absolute path -- consider moving it
# to a configurable constant before sharing the notebook.
av2_data = AssetLoader.file_path(
    "/home/light/Documents/Thesis/preprocessed_dataset",
    unix_style=False,
)
dataset_summary, scenario_ids, mapping = read_dataset_summary(dataset_path=av2_data)

# Load the first scenario listed in the summary as a sample to inspect.
scenario_file_name = scenario_ids[0]
scenario = read_scenario(
    dataset_path=av2_data,
    mapping=mapping,
    scenario_file_name=scenario_file_name,
)

In [36]:
# Show which fields one track record exposes. The track id "73980" is
# hard-coded and presumably only exists in the scenario loaded above.
scenario["tracks"]["73980"].keys()

dict_keys(['type', 'state', 'metadata'])