In [1]:
import pandas as pd
import xml.etree.ElementTree as ET
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import zipfile
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score

### Data Preprocessing

In [4]:
# load the tournament summary into a dataframe
tournament_name = "tournament_5"
df_games = pd.read_csv('C:/source/MicroRTS/' + tournament_name + '/tournament.csv', skiprows=21, skipfooter=63)

  df_games = pd.read_csv('C:/source/MicroRTS/' + tournament_name + '/tournament.csv', skiprows=21, skipfooter=63)


In [6]:
# read in the model names
ai_models = pd.read_csv('C:/source/MicroRTS/' + tournament_name + '/tournament.csv', usecols=[1], skiprows=1, nrows=8)
ai_models_list = ai_models.iloc[:,0].tolist()
# create a dict so we can use map()
model_names_dict = {i: name for i, name in enumerate(ai_models_list)}

In [8]:
# add the model names to the dataframe
df_games['ai1_name'] = df_games['ai1'].map(model_names_dict)
df_games['ai2_name'] = df_games['ai2'].map(model_names_dict)

# winner columns, so we know which column won
df_games['winning_model'] = np.where(df_games['winner'] == 0, df_games['ai1'], np.where(df_games['winner'] == 1, df_games['ai2'], -1))
df_games['winning_model_name'] = df_games['winning_model'].map(model_names_dict)
# loser columns, so we know which model was beaten
df_games['losing_model'] = np.where(df_games['winner'] == 0, df_games['ai2'], np.where(df_games['winner'] == 1, df_games['ai2'], -1))
df_games['losing_model_name'] = df_games['losing_model'].map(model_names_dict)

In [10]:
# Function to extract and parse XML from a zip file
def extract_and_parse_xml(zip_path, file_name):
    # use the xipfile library to get the content => https://docs.python.org/3/library/zipfile.html
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        with zip_ref.open(file_name) as xml_file:
            # Read the content of the file
            xml_content = xml_file.read()
            # Parse the XML to a python object => https://docs.python.org/3/library/xml.etree.elementtree.html
            root = ET.fromstring(xml_content)
    return root

In [12]:
# Initialize list to hold trace data
trace_data = []
# the path where traces are stored
traces_path = 'C:/source/MicroRTS/' + tournament_name + '/traces/'

# loop through all games in the tournament summary
for index, row in df_games.iterrows():

    # compose the filename where the traces for this game will be stored
    filename = "%d-vs-%d-%d-%d.zip" % (row['ai1'], row['ai2'], row['map'], row['iteration']) 
    # Load the XML file
    root = extract_and_parse_xml(traces_path + filename, 'game.xml')
    
    # Iterate through each TraceEntry to extract timestep data
    for entry in root.findall('.//rts.TraceEntry'):
        # timestep ID if this TraceEntry in the game
        time = int(entry.get('time'))
        
        # Extract players' data
        players = entry.findall('.//rts.Player')
        player_data = {f"player_{player.get('ID')}_resources": int(player.get('resources')) for player in players}
        
        # Initialize counters for unit types
        unit_counts = defaultdict(int)
        
        # Extract units - count unit types for each player
        units = entry.findall('.//rts.units.Unit')
        for unit in units:
            unit_type = unit.get('type')
            player_id = unit.get('player')
            if player_id != '-1':  # Exclude neutral units like resources
                unit_counts[f"player_{player_id}_{unit_type}_units"] += 1
        
        # Extract actions' data
        action_counts = defaultdict(int)
        # get an array of action tags
        actions = entry.findall('.//action')
        # loop through each action
        for action in actions:
            # the unit that the action was performed on
            unit_id = action.get('unitID')
            # the action that was performed
            unit_action = action.find('UnitAction')
            # if an action was defined
            if unit_action is not None:
                # action details
                action_type = unit_action.get('type')
                parameter = unit_action.get('parameter')
                for unit in units:
                    if unit.get('ID') == unit_id:
                        player_id = unit.get('player')
                        break
                # create columns for player and action type, or player and parameter
                action_counts[f"player_{player_id}_action_type_{action_type}_count"] += 1
                action_counts[f"player_{player_id}_action_parameter_{parameter}_count"] += 1
        
        # Combine data for this timestep
        timestep_data = {'time': time}
        timestep_data.update(player_data)
        timestep_data.update(unit_counts)
        timestep_data.update(action_counts)

        # add label columns
        timestep_data['opponent'] = row['ai2_name']
        timestep_data['game_id'] = filename
        
        # Add to data list of traces
        trace_data.append(timestep_data)

    # TEST : limit data for testing
    if index > 100:
        break;

# Create DataFrame from the traces
df = pd.DataFrame(trace_data)

# Fill NaN values with 0 (in case there are no actions of certain types in some timesteps)
df = df.fillna(0)

# Write to a file
df.to_csv('C:/source/MicroRTS/' + tournament_name + '/traces.csv')  
# Display a preview of the DataFrame
df.head()

Unnamed: 0,time,player_0_resources,player_1_resources,player_0_Base_units,player_1_Base_units,player_0_Worker_units,player_1_Worker_units,opponent,game_id,player_0_action_type_4_count,...,player_1_action_type_0_count,player_1_action_parameter_10_count,player_1_Barracks_units,player_1_Light_units,player_0_Barracks_units,player_0_Light_units,player_1_Heavy_units,player_1_Ranged_units,player_0_Heavy_units,player_0_Ranged_units
0,0,5,5,1.0,1.0,1.0,1.0,WorkerRush(AStarPathFinding),0-vs-0-0-0.zip,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,0,5,5,1.0,1.0,1.0,1.0,WorkerRush(AStarPathFinding),0-vs-0-0-0.zip,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,20,5,5,1.0,1.0,1.0,1.0,WorkerRush(AStarPathFinding),0-vs-0-0-0.zip,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,30,5,5,1.0,1.0,1.0,1.0,WorkerRush(AStarPathFinding),0-vs-0-0-0.zip,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,40,6,6,1.0,1.0,1.0,1.0,WorkerRush(AStarPathFinding),0-vs-0-0-0.zip,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [14]:
df.columns

Index(['time', 'player_0_resources', 'player_1_resources',
       'player_0_Base_units', 'player_1_Base_units', 'player_0_Worker_units',
       'player_1_Worker_units', 'opponent', 'game_id',
       'player_0_action_type_4_count', 'player_0_action_parameter_0_count',
       'player_0_action_type_2_count', 'player_0_action_parameter_3_count',
       'player_1_action_type_4_count', 'player_1_action_parameter_1_count',
       'player_1_action_type_2_count', 'player_0_action_type_1_count',
       'player_0_action_parameter_2_count', 'player_1_action_type_1_count',
       'player_1_action_parameter_3_count', 'player_0_action_type_3_count',
       'player_0_action_parameter_1_count', 'player_1_action_type_3_count',
       'player_1_action_parameter_0_count',
       'player_1_action_parameter_2_count', 'player_0_action_type_5_count',
       'player_0_action_parameter_None_count', 'player_1_action_type_5_count',
       'player_1_action_parameter_None_count', 'player_0_action_type_0_count',
   

### Create Training Data of sliding windows

In [17]:
window_size = 5
X, y = [], []
i = 0
# loop through the timestep data
while i < len(df) - window_size:
    # make sure the next 5 traces are within the same game (don't want to mix games in a window)
    if df.iloc[i]['game_id'] == df.iloc[i + window_size]['game_id']:
        # Take take 5 traces at a time
        window = df.iloc[i:i + window_size]
        X.append(window.drop(columns=['time', 'game_id', 'opponent']).values)
        # Use the opponent label at the end of the window
        y.append(window['opponent'].iloc[-1])
        # increment i to make a sliding window starting with the next
        i += 1
    else:
        # we need to skip to start a new window at the next different value
        i = i + window_size - 1

X = np.array(X)
y = np.array(y)
# preview the data
X

array([[[5., 5., 1., ..., 0., 0., 0.],
        [5., 5., 1., ..., 0., 0., 0.],
        [5., 5., 1., ..., 0., 0., 0.],
        [5., 5., 1., ..., 0., 0., 0.],
        [6., 6., 1., ..., 0., 0., 0.]],

       [[5., 5., 1., ..., 0., 0., 0.],
        [5., 5., 1., ..., 0., 0., 0.],
        [5., 5., 1., ..., 0., 0., 0.],
        [6., 6., 1., ..., 0., 0., 0.],
        [5., 5., 1., ..., 0., 0., 0.]],

       [[5., 5., 1., ..., 0., 0., 0.],
        [5., 5., 1., ..., 0., 0., 0.],
        [6., 6., 1., ..., 0., 0., 0.],
        [5., 5., 1., ..., 0., 0., 0.],
        [5., 5., 1., ..., 0., 0., 0.]],

       ...,

       [[1., 2., 0., ..., 0., 0., 0.],
        [1., 2., 0., ..., 0., 0., 0.],
        [1., 2., 0., ..., 0., 0., 0.],
        [1., 2., 0., ..., 0., 0., 0.],
        [1., 2., 0., ..., 0., 0., 0.]],

       [[1., 2., 0., ..., 0., 0., 0.],
        [1., 2., 0., ..., 0., 0., 0.],
        [1., 2., 0., ..., 0., 0., 0.],
        [1., 2., 0., ..., 0., 0., 0.],
        [1., 2., 0., ..., 0., 0., 0.]],

  

In [19]:

# Check the shape of the generated windows
print("Shape of X:", X.shape)
print("Shape of y:", y.shape)

Shape of X: (48635, 5, 38)
Shape of y: (48635,)


In [21]:
# Encode the opponent labels
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

print("Classes:", label_encoder.classes_)

Classes: ['HeavyDefense(AStarPathFinding)' 'HeavyRush(AStarPathFinding)'
 'LightDefense(AStarPathFinding)' 'LightRush(AStarPathFinding)'
 'RangedDefense(AStarPathFinding)' 'RangedRush(AStarPathFinding)'
 'WorkerDefense(AStarPathFinding)' 'WorkerRush(AStarPathFinding)']


In [23]:
y_encoded.shape

(48635,)

### Create the Train-Test split

In [26]:
# We need to flatten the X array for Scikit-Learn
num_samples, window_size, num_features = X.shape# Flatten each window
X_flat = np.array([window.flatten() for window in X])

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_flat, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded)

### Train the model

In [29]:
# Initialize the classifier
classifier = RandomForestClassifier(n_estimators=100, random_state=42)

# Train the classifier
classifier.fit(X_train, y_train)

### Evaluate the model

In [32]:
# Make predictions on the test set
y_pred = classifier.predict(X_test)
# Evaluate performance
print("Overall accuracy:", accuracy_score(y_test, y_pred))
print("Report:")
print(classification_report(y_test, y_pred, target_names=label_encoder.classes_))

Overall accuracy: 0.7401048627531613
Report:
                                 precision    recall  f1-score   support

 HeavyDefense(AStarPathFinding)       0.72      0.68      0.70      1470
    HeavyRush(AStarPathFinding)       0.73      0.76      0.74      1700
 LightDefense(AStarPathFinding)       0.77      0.84      0.80      1241
    LightRush(AStarPathFinding)       0.82      0.79      0.80      1141
RangedDefense(AStarPathFinding)       0.54      0.54      0.54      1144
   RangedRush(AStarPathFinding)       0.57      0.54      0.56      1249
WorkerDefense(AStarPathFinding)       0.96      0.96      0.96      1263
   WorkerRush(AStarPathFinding)       0.89      0.90      0.90       519

                       accuracy                           0.74      9727
                      macro avg       0.75      0.75      0.75      9727
                   weighted avg       0.74      0.74      0.74      9727

