In [3]:
import os
from pathlib import Path
import pandas as pd
import numpy as np
import pickle
import zipfile
import shutil

In [4]:
# Resolve the dataset directory and make it the working directory, because
# later cells read and write files relative to it.
path = Path("dataset").absolute()

# Re-running this cell after the chdir below would otherwise resolve to
# "<...>/dataset/dataset" and fail; in that case we are already inside the
# dataset directory, so keep it.
if not path.is_dir() and Path.cwd().name == "dataset":
    path = Path.cwd()

os.chdir(path)
print(os.getcwd())

/workspace/SAFETY/dataset


In [3]:
def init_data_dict(data_dict, tactic_id, y_int, num_squads=12, num_entities=54):
    """Create the empty nested containers for one tactic inside ``data_dict``.

    After the call ``data_dict[tactic_id]`` has the layout::

        {combat_id: {'squad_id':  {0: {}, ..., num_squads - 1: {}},
                     'entity_id': {0: {}, ..., num_entities - 1: {}}}}

    Any previous content stored under ``tactic_id`` is replaced.

    Parameters
    ----------
    data_dict : dict
        Dictionary to populate; it is modified in place and also returned.
    tactic_id : hashable
        Key under which the tactic's containers are stored.
    y_int : pandas.DataFrame
        Frame with a ``combat_id`` column; one container is created per
        distinct (non-null) combat id.
    num_squads : int, optional
        Number of squad slots created per combat (default 12).
    num_entities : int, optional
        Number of entity slots created per combat (default 54).

    Returns
    -------
    dict
        The same ``data_dict`` object.
    """
    # Key on the combat ids that actually occur instead of range(nunique()):
    # the two only coincide when the ids are exactly 0..n-1, and any other
    # numbering would leave the real ids without a slot.
    combat_ids = sorted(int(c) for c in y_int['combat_id'].dropna().unique())

    data_dict[tactic_id] = {
        combat_id: {
            'squad_id': {squad: {} for squad in range(num_squads)},
            'entity_id': {entity: {} for entity in range(num_entities)},
        }
        for combat_id in combat_ids
    }

    return data_dict


In [4]:
def generate_feat(xs, data_dict, tactic_id, window_size=1):
    """Store each entity's time-binned feature matrix in ``data_dict``.

    ``SimulationTime(sec)`` is floor-divided by ``window_size`` and the
    feature columns are averaged per
    (tactic_id, combat_id, entity_id, time bin). The resulting rows of one
    entity in one combat are written, ordered by time bin, to
    ``data_dict[tactic_id][combat_id]['entity_id'][entity_id]['x']`` as a
    2-D array with one column per feature in the order of ``feature_cols``.

    Parameters
    ----------
    xs : pandas.DataFrame
        Entity attribute rows. Must contain ``tactic_id``, ``combat_id``,
        ``entity_id``, ``SimulationTime(sec)`` and the feature columns listed
        below. The frame is not modified.
    data_dict : dict
        Pre-initialised nested dictionary; modified in place and returned.
    tactic_id : hashable
        Key of the tactic inside ``data_dict``.
    window_size : int or float, optional
        Width of a time bin in the unit of ``SimulationTime(sec)``
        (default 1).

    Returns
    -------
    dict
        The same ``data_dict`` object.

    Raises
    ------
    KeyError
        If a combat or entity id found in ``xs`` has no slot in ``data_dict``.
    """
    # NOTE(review): entity_id is used as the dictionary key without any
    # offset here - presumably xs['entity_id'] is already zero-based; confirm.
    time_col = 'SimulationTime(sec)'
    key_cols = ['tactic_id', 'combat_id', 'entity_id', time_col]
    feature_cols = ['PositionLat(deg)', 'PositionLon(deg)', 'PositionAlt(m)',
                    'AttitudeYaw(deg)', 'Speed(km/h)', 'ForceIdentifier',
                    'T_Road', 'T_Forest', 'T_OpenLane', 'T_HidingPlace',
                    'T_Building']

    # Work on a narrowed copy: the caller's frame keeps its original time
    # stamps, and columns that are never used are not averaged.
    binned = xs[key_cols + feature_cols].copy()
    binned[time_col] = binned[time_col] // window_size
    averaged = binned.groupby(key_cols).mean().reset_index()

    # One pass over the (combat, entity) groups instead of re-scanning the
    # whole frame with a boolean mask for every pair.
    for (combat_id, entity_id), rows in averaged.groupby(['combat_id', 'entity_id']):
        entity_slot = data_dict[tactic_id][int(combat_id)]['entity_id'][int(entity_id)]
        entity_slot['x'] = rows[feature_cols].values

    return data_dict


In [5]:
def entity_squad_mapping(x_squad, data_dict, tactic_id):
    """Record, for every entity, the zero-based id of the squad it belongs to.

    Only rows with ``SimulationTime(sec) < 10`` are considered. For each such
    row the ``SubordinateID`` string (entity ids separated by ``;`` and
    wrapped in square brackets, e.g. ``"[2;3;4]"``) is parsed, both ids are
    shifted to start at zero, and the squad id is written to
    ``data_dict[tactic_id][combat_id]['entity_id'][entity_id]['squad']``.
    When an entity is listed by several rows of one combat, the last row
    wins.

    Parameters
    ----------
    x_squad : pandas.DataFrame
        Squad attribute rows with the columns ``combat_id``, ``squad_id``,
        ``SubordinateID`` and ``SimulationTime(sec)``. The frame is left
        unchanged, so the function can be called repeatedly on it.
    data_dict : dict
        Pre-initialised nested dictionary; modified in place and returned.
    tactic_id : hashable
        Key of the tactic inside ``data_dict``.

    Returns
    -------
    dict
        The same ``data_dict`` object.

    Raises
    ------
    KeyError
        If a combat or entity id has no slot in ``data_dict``.
    ValueError
        If a ``SubordinateID`` token is not an integer.
    """
    squad_id_mapper = -56  # squad_id starts from 56
    entity_id_mapper = -2  # entity_id starts from 2
    max_time = 10  # only rows earlier than this simulation time are used

    def parse_subordinates(raw):
        # "[2;3;4]" -> [0, 1, 2]; blank tokens (e.g. from "[]") are skipped
        # rather than passed to int().
        tokens = raw.replace('[', '').replace(']', '').split(';')
        return [int(token) + entity_id_mapper for token in tokens if token.strip()]

    # Select instead of assigning back: writing into x_squad (or into the
    # array behind x_squad[...].values) would alter the caller's frame and
    # shift the ids a second time on a re-run.
    early = x_squad.loc[x_squad['SimulationTime(sec)'] < max_time,
                        ['combat_id', 'squad_id', 'SubordinateID']]

    # Pair every row's squad with that same row's subordinates directly;
    # looking the list up by value returns the first equal list, not
    # necessarily the current row.
    rows = zip(early['combat_id'].to_numpy(),
               early['squad_id'].to_numpy(),
               early['SubordinateID'].to_numpy())

    for combat_id, raw_squad_id, raw_subordinates in rows:
        squad_id = raw_squad_id + squad_id_mapper
        entity_slots = data_dict[tactic_id][int(combat_id)]['entity_id']
        for entity_id in parse_subordinates(raw_subordinates):
            entity_slots[entity_id]['squad'] = squad_id

    return data_dict


In [6]:
def generate_intention_label(y_int, data_dict, tactic_id):
    """Attach a one-hot intention vector to every squad listed in ``y_int``.

    For each row, the ``Intention`` string is turned into a length-6 one-hot
    vector and stored in
    ``data_dict[tactic_id][combat_id]['squad_id'][ID - 56]['y_int']``.
    A label that is not one of the six known strings yields an all-zero
    vector. If a squad occurs more than once within a combat, its first row
    is used.

    Parameters
    ----------
    y_int : pandas.DataFrame
        Frame with the columns ``combat_id``, ``ID`` (raw squad id) and
        ``Intention``. The frame is not modified.
    data_dict : dict
        Pre-initialised nested dictionary; modified in place and returned.
    tactic_id : hashable
        Key of the tactic inside ``data_dict``.

    Returns
    -------
    dict
        The same ``data_dict`` object.

    Raises
    ------
    KeyError
        If a combat or squad id has no slot in ``data_dict``.
    """
    squad_id_mapper = -56  # squad_id starts from 56

    # Position of the 1 in the one-hot vector for each intention string.
    # NOTE(review): 'Stategic' is compared verbatim against the data -
    # presumably the dataset uses this spelling; verify before changing it.
    label_index = {
        'Tactical Engagement': 0,
        'Maneuvering Techniques': 1,
        'Stategic Positioning': 2,
        'Coordinated Rendezvous': 3,
        'Stategic Surprise': 4,
        'Forceful Engagement': 5,
    }
    num_squad_labels = len(label_index)

    # Take id and label from the SAME row. Sorting/de-duplicating the ids
    # separately (np.unique) and zipping them with the labels in row order
    # mis-assigns labels whenever the rows are not already sorted by ID or a
    # squad is repeated.
    per_squad = y_int.drop_duplicates(subset=['combat_id', 'ID'], keep='first')
    rows = zip(per_squad['combat_id'], per_squad['ID'], per_squad['Intention'])

    for combat_id, raw_squad_id, intention in rows:
        squad_label = np.zeros(num_squad_labels)
        hot_position = label_index.get(intention)
        if hot_position is not None:
            squad_label[hot_position] = 1

        squad_id = int(raw_squad_id) + squad_id_mapper
        data_dict[tactic_id][int(combat_id)]['squad_id'][squad_id]['y_int'] = squad_label

    return data_dict


In [7]:
def generate_attack_label(y_atk, data_dict, tactic_id):
    """Store, per squad, a multi-hot vector of the squads it fired upon.

    For every combat id present in ``y_atk`` the firing and target object ids
    are shifted by -2, translated to squad ids through the ``'squad'`` entry
    of the corresponding entity slot in ``data_dict``, and collected in a
    12 x 12 matrix whose cell ``[attacker, target]`` is 1 when at least one
    such detonation exists. Row ``j`` of that matrix is written to
    ``data_dict[tactic_id][combat_id]['squad_id'][j]['y_atk']``.

    Parameters
    ----------
    y_atk : pandas.DataFrame
        Frame with the columns ``combat_id``, ``FiringObjectID`` and
        ``TargetObjectID``. The frame is not modified.
    data_dict : dict
        Nested dictionary whose entity slots already carry ``'squad'``;
        modified in place and returned.
    tactic_id : hashable
        Key of the tactic inside ``data_dict``.

    Returns
    -------
    dict
        The same ``data_dict`` object.
    """
    # NOTE(review): combats that never appear in y_atk receive no 'y_atk'
    # entry at all - confirm that downstream code expects this.
    squad_count = 12
    entity_index_offset = 2  # object ids are shifted so that they start at 0

    for combat_key in (int(raw_id) for raw_id in y_atk['combat_id'].unique()):
        combat_store = data_dict[tactic_id][combat_key]
        entity_store = combat_store['entity_id']

        def squad_of(entity):
            # entity id -> id of the squad recorded for that entity
            return entity_store[entity]['squad']

        detonations = y_atk.loc[y_atk['combat_id'] == combat_key]
        shooter_squads = (detonations['FiringObjectID'] - entity_index_offset).apply(squad_of)
        victim_squads = (detonations['TargetObjectID'] - entity_index_offset).apply(squad_of)

        # keep each (attacker squad, target squad) combination only once
        squad_pairs = pd.DataFrame(
            {'FiringSquadID': shooter_squads, 'TargetSquadID': victim_squads}
        ).drop_duplicates(keep='first')

        # rows index the attacking squad, columns the squad that was hit
        attack_matrix = np.zeros((squad_count, squad_count))
        attack_matrix[squad_pairs['FiringSquadID'].values,
                      squad_pairs['TargetSquadID'].values] = 1

        for squad_index, target_row in enumerate(attack_matrix):
            combat_store['squad_id'][squad_index]['y_atk'] = target_row

    return data_dict


In [31]:
# Tactic folders are the sub-directories whose name is a number (the tactic
# id is derived from it below). Anything else - e.g. a hidden
# ".ipynb_checkpoints" folder - is skipped instead of crashing int().
# Sorting on the numeric value keeps "10" after "9".
tactic_paths = sorted(
    (x for x in path.iterdir() if x.is_dir() and x.name.isdecimal()),
    key=lambda x: int(x.name),
)

data_dict = {}

for tactic_path in tactic_paths:

    # folder names are one-based, dictionary keys zero-based
    tactic_id = int(tactic_path.name) - 1
    print('processing tactic {}'.format(tactic_id))

    entity_attributes_path = tactic_path / "EntityAttributes_All.csv"
    xs = pd.read_csv(entity_attributes_path, engine='c')

    squad_attributes_path = tactic_path / "SquadAttributes_All.csv"
    x_squad = pd.read_csv(squad_attributes_path, engine='c')

    y_int_path = tactic_path / "SquadIntention_All.csv"
    y_int = pd.read_csv(y_int_path, engine='c')

    y_attack_path = tactic_path / "MunitionDetonation_All.csv"
    y_attack = pd.read_csv(y_attack_path, engine='c')

    # Order matters: the containers must exist before anything is stored,
    # and the attack labels need the entity -> squad mapping.
    data_dict = init_data_dict(data_dict, tactic_id, y_int)
    data_dict = generate_feat(xs, data_dict, tactic_id)
    data_dict = generate_intention_label(y_int, data_dict, tactic_id)
    data_dict = entity_squad_mapping(x_squad, data_dict, tactic_id)
    data_dict = generate_attack_label(y_attack, data_dict, tactic_id)


processing tactic 0
processing tactic 1
processing tactic 2
processing tactic 3


In [32]:
# Persist the nested dictionary as a pickle in the current working directory.
pickle_name = 'data_dict.pkl'
with open(pickle_name, 'wb') as pickle_file:
    pickle.dump(data_dict, pickle_file)

# Additionally store a deflate-compressed archive containing that pickle.
archive_name = 'data_dict.zip'
with zipfile.ZipFile(archive_name, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
    archive.write(pickle_name)