In [5]:
import re
import ast

def extract_snapshot_data(log_file):
    """Parse snapshot and throughput records from one training log file.

    Two kinds of records are recognised anywhere in the file text:

    * ``Iteration <i> dp_<d>_pp_<p>_tp_<t> snapshot time: <s>, snapshot_size: <x> MB,
      snapshot_speed: <y> MB/s``
    * ``dp_<d>_pp_<p>_tp_<t> throughput_metrics: {...}``, where ``{...}`` is a Python
      dict literal that must contain ``'throughput/iteration'``,
      ``'throughput/iteration-time'``, ``'throughput/samples_per_sec'`` and
      ``'throughput/tflops'``.

    Args:
        log_file: Path of the log file to read.

    Returns:
        dict mapping ``'dp_<d>_pp_<p>_tp_<t>'`` -> {iteration -> metrics dict}.
        A metrics dict may hold ``snapshot_time``, ``snapshot_size`` and
        ``snapshot_speed`` (floats, from snapshot lines) and/or
        ``iteration_time``, ``samples_per_sec`` and ``tflops`` (values taken
        as-is from the throughput dict). Throughput records that cannot be
        parsed are reported on stdout and skipped without leaving any entry.
    """
    # Undecodable bytes in the log must not abort the whole parse.
    with open(log_file, 'r', encoding='utf-8', errors='replace') as file:
        log_data = file.read()

    # Python's float repr switches to scientific notation for very small or
    # very large values (e.g. 5e-05), so allow an optional exponent.
    number = r'[0-9.]+(?:[eE][-+]?[0-9]+)?'
    pattern_snapshot = re.compile(
        r'Iteration (?P<iteration>\d+) '
        r'dp_(?P<dp>\d+)_pp_(?P<pp>\d+)_tp_(?P<tp>\d+) '
        rf'snapshot time: (?P<snapshot_time>{number}), '
        rf'snapshot_size: (?P<snapshot_size>{number}) MB, '
        rf'snapshot_speed: (?P<snapshot_speed>{number}) MB/s'
    )

    # Non-greedy: assumes the metrics dict contains no nested braces.
    pattern_throughput = re.compile(
        r'dp_(?P<dp>\d+)_pp_(?P<pp>\d+)_tp_(?P<tp>\d+) '
        r'throughput_metrics: (?P<throughput_metrics>\{.*?\})'
    )

    snapshot_data = {}

    # Snapshot records: one entry per (parallel config, iteration).
    for match in pattern_snapshot.finditer(log_data):
        key = f"dp_{match.group('dp')}_pp_{match.group('pp')}_tp_{match.group('tp')}"
        iteration = int(match.group('iteration'))
        snapshot_data.setdefault(key, {})[iteration] = {
            'snapshot_time': float(match.group('snapshot_time')),
            'snapshot_size': float(match.group('snapshot_size')),
            'snapshot_speed': float(match.group('snapshot_speed')),
        }

    # Throughput records: merged into the same (config, iteration) entries.
    for match in pattern_throughput.finditer(log_data):
        throughput_metrics_str = match.group('throughput_metrics')
        try:
            # literal_eval only evaluates Python literals, so it is safe on log text.
            throughput_metrics = ast.literal_eval(throughput_metrics_str)
            iteration = throughput_metrics['throughput/iteration']
            # Resolve every required field before touching snapshot_data so a
            # malformed record cannot leave an empty entry behind.
            metrics = {
                'iteration_time': throughput_metrics['throughput/iteration-time'],
                'samples_per_sec': throughput_metrics['throughput/samples_per_sec'],
                'tflops': throughput_metrics['throughput/tflops'],
            }
        except (ValueError, SyntaxError, KeyError, TypeError) as e:
            # TypeError covers literals that are not dicts (e.g. a set).
            print(f"Error parsing throughput metrics: {e}")
            print(f"Throughput metrics string: {throughput_metrics_str}")
            continue

        key = f"dp_{match.group('dp')}_pp_{match.group('pp')}_tp_{match.group('tp')}"
        snapshot_data.setdefault(key, {}).setdefault(iteration, {}).update(metrics)

    return snapshot_data


In [2]:
import os
import pandas as pd

def _default_iteration_filter(iteration):
    """Keep iterations 6-14 and 23 onward (the windows the original analysis averaged).

    NOTE(review): presumably chosen to skip warm-up / transient iterations — confirm.
    """
    return 5 < iteration < 15 or iteration > 22


def _mean_metrics(records):
    """Column-wise mean of per-iteration metric dicts, with snapshot_speed re-derived.

    Metrics missing from a record become NaN and are ignored by the mean.
    ``snapshot_speed`` is replaced by mean(snapshot_size) / mean(snapshot_time)
    (a ratio of means) whenever both are present and the mean time is non-zero;
    otherwise the plain mean of per-iteration speeds is kept.
    """
    mean_dict = pd.DataFrame(records).mean(numeric_only=True).to_dict()
    if ('snapshot_size' in mean_dict and 'snapshot_time' in mean_dict
            and mean_dict['snapshot_time'] != 0):
        mean_dict['snapshot_speed'] = mean_dict['snapshot_size'] / mean_dict['snapshot_time']
    return mean_dict


def process_experiment_logs(log_folder, experiment_mapping, iteration_filter=None, per_config=False):
    """Average the metrics of every mapped experiment found under ``log_folder``.

    Each immediate subdirectory of ``log_folder`` is one run; its first ``.log``
    file (alphabetical order) is parsed with ``extract_snapshot_data``. The run is
    labelled with the value of the first ``experiment_mapping`` key that occurs as
    a substring of the log's path relative to ``log_folder``. Each key labels at
    most one run. Runs that match no key are ignored.

    Args:
        log_folder: Directory containing one subdirectory per run.
        experiment_mapping: {path substring -> experiment name}. Not modified.
        iteration_filter: Optional predicate ``f(iteration) -> bool`` selecting
            which iterations are averaged. Defaults to iterations 6-14 and 23+.
        per_config: If True, return {experiment: {'dp_X_pp_Y_tp_Z': means}}
            (the shape ``get_experiment_values`` reads) instead of the
            per-experiment overall means.

    Returns:
        By default, {experiment name: {metric: mean value}} averaged over all
        kept iterations of all parallel configs of that experiment.
    """
    keep_iteration = _default_iteration_filter if iteration_filter is None else iteration_filter

    # Work on a copy: keys are consumed as they are matched, but the caller's
    # dict must survive so re-running the cell gives the same result.
    unclaimed = dict(experiment_mapping)
    experiment_data = {}

    # Sorted so which log gets which label does not depend on directory order.
    for subfolder in sorted(os.listdir(log_folder)):
        subfolder_path = os.path.join(log_folder, subfolder)
        if not os.path.isdir(subfolder_path):
            continue

        log_name = next(
            (name for name in sorted(os.listdir(subfolder_path)) if name.endswith('.log')),
            None,
        )
        if log_name is None:
            continue
        log_file = os.path.join(subfolder_path, log_name)

        # Match only the part below log_folder so the folder path itself can
        # never satisfy a key.
        relative_path = os.path.join(subfolder, log_name)
        experiment_name = None
        for key, name in unclaimed.items():
            if key in relative_path:
                experiment_name = name
                del unclaimed[key]  # safe: we break right after mutating
                break

        if not experiment_name:
            continue

        per_config_records = experiment_data.setdefault(experiment_name, {})
        for config_key, iterations in extract_snapshot_data(log_file).items():
            per_config_records.setdefault(config_key, []).extend(
                data for iteration, data in iterations.items() if keep_iteration(iteration)
            )

    mean_values = {}
    overall_means = {}
    for experiment, data_dict in experiment_data.items():
        mean_values[experiment] = {
            config_key: _mean_metrics(records) for config_key, records in data_dict.items()
        }
        combined = [record for records in data_dict.values() for record in records]
        overall_means[experiment] = _mean_metrics(combined)

    return mean_values if per_config else overall_means


def get_experiment_values(experiment_name, dp_pp_tp, processed_values):
    """Look up the values stored for one parallel config of one experiment.

    Args:
        experiment_name: Top-level key of ``processed_values``.
        dp_pp_tp: Second-level key, e.g. ``'dp_0_pp_0_tp_0'``.
        processed_values: Nested mapping ``{experiment: {dp_pp_tp: values}}``.

    Returns:
        The stored values, or ``None`` when the experiment is absent or its
        entry is empty/falsy, or when ``dp_pp_tp`` is absent.
    """
    per_config = processed_values.get(experiment_name)
    if not per_config:
        return None
    return per_config.get(dp_pp_tp)


In [3]:
def save_processed_data_to_csv(mean_values, csv_file_name, dp_pp_tp=None):
    """Write one CSV row per experiment: an ``experiment`` column plus its metrics.

    Args:
        mean_values: {experiment name: {metric name: value}}.
        csv_file_name: Destination path; written without the DataFrame index.
        dp_pp_tp: Unused; kept so existing call sites remain valid.
    """
    # Columns are the union of all metric names in first-seen order; a metric
    # missing for some experiment becomes an empty cell.
    records = [
        {'experiment': experiment, **metrics}
        for experiment, metrics in mean_values.items()
    ]
    pd.DataFrame(records).to_csv(csv_file_name, index=False)


In [18]:
# Strong scaling: average metrics per experiment and export them to CSV.
# Experiment names follow strong_scaling-<method>-<sharding>-dp_<N>-<record mode>.

import pprint
log_folder = "/hpc2hdd/home/zli755/xueze/reft_ds/Megatron-DeepSpeed/examples_deepspeed/data_efficiency/gpt/output/log"

# Keys are substrings searched for in each run's log path (they look like
# HH.MM launch times — TODO confirm); values become the CSV row labels.
experiment_mapping = {
    "23.38": "strong_scaling-no_snapshot-no_sharding-dp_1-sync_record",
    "23.58": "strong_scaling-sync_snapshot-no_sharding-dp_1-sync_record",
    "00.13": "strong_scaling-pure_thread-no_sharding-dp_1-sync_record",
    "00.20": "strong_scaling-thread-no_sharding-dp_1-sync_record",
    "00.26": "strong_scaling-thread_stream-no_sharding-dp_1-sync_record",
    "23.49": "strong_scaling-bubble-no_sharding-dp_1-sync_record",
    "09.24": "strong_scaling-no_snapshot-no_sharding-dp_2-sync_record",
    "09.34": "strong_scaling-sync_snapshot-no_sharding-dp_2-sync_record",
    "09.47": "strong_scaling-pure_thread-no_sharding-dp_2-sync_record",
    "18.31": "strong_scaling-thread-no_sharding-dp_2-sync_record",
    "10.09": "strong_scaling-thread_stream-no_sharding-dp_2-sync_record",
    "10.13": "strong_scaling-bubble-no_sharding-dp_2-sync_record",
    "12.19": "strong_scaling-no_snapshot-no_sharding-dp_4-sync_record",
    "12.24": "strong_scaling-sync_snapshot-no_sharding-dp_4-sync_record",
    "12.45": "strong_scaling-pure_thread-no_sharding-dp_4-sync_record",
    "12.56": "strong_scaling-thread-no_sharding-dp_4-sync_record",
    "13.03": "strong_scaling-thread_stream-no_sharding-dp_4-sync_record",
    "13.08": "strong_scaling-bubble-no_sharding-dp_4-sync_record",
    "18.49": "strong_scaling-pure_thread-sharding-dp_2-sync_record",
    "10.42": "strong_scaling-thread-sharding-dp_2-sync_record",
    "10.49": "strong_scaling-thread_stream-sharding-dp_2-sync_record",
    "11.25": "strong_scaling-bubble-sharding-dp_2-sync_record",
    "13.33": "strong_scaling-pure_thread-sharding-dp_4-sync_record",
    "13.42": "strong_scaling-thread-sharding-dp_4-sync_record",
    "13.48": "strong_scaling-thread_stream-sharding-dp_4-sync_record",
    "13.55": "strong_scaling-bubble-sharding-dp_4-sync_record",
    "00.37": "strong_scaling-thread_stream-no_sharding-dp_1-no_sync_record",
    "04.02": "strong_scaling-bubble-no_sharding-dp_1-no_sync_record",
    "10.20": "strong_scaling-thread_stream-no_sharding-dp_2-no_sync_record",
    "10.25": "strong_scaling-bubble-no_sharding-dp_2-no_sync_record",
    "13.18": "strong_scaling-thread_stream-no_sharding-dp_4-no_sync_record",
    "13.27": "strong_scaling-bubble-no_sharding-dp_4-no_sync_record",
    "11.29": "strong_scaling-thread_stream-sharding-dp_2-no_sync_record",
    "11.32": "strong_scaling-bubble-sharding-dp_2-no_sync_record",
    "13.59": "strong_scaling-thread_stream-sharding-dp_4-no_sync_record",
    "14.03": "strong_scaling-bubble-sharding-dp_4-no_sync_record",
}
processed_data_strong_scaling = process_experiment_logs(log_folder, experiment_mapping)

# Experiments whose key matched no log file would otherwise vanish from the CSV silently.
missing_experiments = sorted(set(experiment_mapping.values()) - set(processed_data_strong_scaling))
if missing_experiments:
    print("No log file matched for:", *missing_experiments, sep="\n  ")

csv_file_name = "/hpc2hdd/home/zli755/xueze/reft_ds/Megatron-DeepSpeed/examples_deepspeed/data_efficiency/gpt/data_process/processed_data/strong_scaling.csv"
save_processed_data_to_csv(processed_data_strong_scaling, csv_file_name)

In [9]:
import pprint
# Weak scaling of model size (7B / 13B / 34B at pp 8 / 16 / 32, bs 2):
# average metrics per experiment and export them to CSV.

# NOTE(review): logs are read from the 'yuhan' tree while the CSV is written
# under 'xueze' — confirm this is intended.
log_folder = "/hpc2hdd/home/zli755/yuhan/reft_ds/Megatron-DeepSpeed/examples_deepspeed/data_efficiency/gpt/output/log"

# Keys are substrings searched for in each run's log path (they look like
# MM.DD_HH.MM launch times — TODO confirm); values become the CSV row labels.
experiment_mapping = {
    "05.20_19.18": "weak_scaling_bs-7B-bubble-no_sharding-pp_8-bs_2",
    "05.21_20.39": "weak_scaling_bs-7B-no_snapshot-no_sharding-pp_8-bs_2",
    "05.21_20.43": "weak_scaling_bs-7B-sync_snapshot-no_sharding-pp_8-bs_2",
    "05.21_21.01": "weak_scaling_bs-7B-thread-no_sharding-pp_8-bs_2",
    "05.21_20.55": "weak_scaling_bs-7B-thread_stream-no_sharding-pp_8-bs_2",
    "05.21_21.28": "weak_scaling_bs-7B-pure_thread-no_sharding-pp_8-bs_2",
    "05.20_23.25": "weak_scaling_bs-13B-bubble-no_sharding-pp_16-bs_2",
    "05.21_20.23": "weak_scaling_bs-13B-no_snapshot-no_sharding-pp_16-bs_2",
    "05.21_20.08": "weak_scaling_bs-13B-sync_snapshot-no_sharding-pp_16-bs_2",
    "05.21_19.55": "weak_scaling_bs-13B-thread-no_sharding-pp_16-bs_2",
    "05.21_20.02": "weak_scaling_bs-13B-thread_stream-no_sharding-pp_16-bs_2",
    "05.21_22.05": "weak_scaling_bs-13B-pure_thread-no_sharding-pp_16-bs_2",
    "05.21_18.29": "weak_scaling_bs-34B-bubble-no_sharding-pp_32-bs_2",
    "05.21_18.41": "weak_scaling_bs-34B-no_snapshot-no_sharding-pp_32-bs_2",
    "05.21_18.49": "weak_scaling_bs-34B-sync_snapshot-no_sharding-pp_32-bs_2",
    "05.21_19.44": "weak_scaling_bs-34B-thread-no_sharding-pp_32-bs_2",
    "05.21_19.38": "weak_scaling_bs-34B-thread_stream-no_sharding-pp_32-bs_2",
    "05.21_21.52": "weak_scaling_bs-34B-pure_thread-no_sharding-pp_32-bs_2",
}
processed_data_weak_scaling_bs = process_experiment_logs(log_folder, experiment_mapping)

# Experiments whose key matched no log file would otherwise vanish from the CSV silently.
missing_experiments = sorted(set(experiment_mapping.values()) - set(processed_data_weak_scaling_bs))
if missing_experiments:
    print("No log file matched for:", *missing_experiments, sep="\n  ")

pprint.pprint(processed_data_weak_scaling_bs)

csv_file_name = "/hpc2hdd/home/zli755/xueze/reft_ds/Megatron-DeepSpeed/examples_deepspeed/data_efficiency/gpt/data_process/processed_data/weak_scaling_bs.csv"
save_processed_data_to_csv(processed_data_weak_scaling_bs, csv_file_name)

{'weak_scaling_bs-13B-bubble-no_sharding-pp_16-bs_2': {'iteration_time': 3.382360321633956,
                                                       'samples_per_sec': 4.730697916441348,
                                                       'tflops': 49.46547382839215},
 'weak_scaling_bs-13B-no_snapshot-no_sharding-pp_16-bs_2': {'iteration_time': 3.5804108048186585,
                                                            'samples_per_sec': 4.539332446166725,
                                                            'tflops': 47.46450402885609},
 'weak_scaling_bs-13B-pure_thread-no_sharding-pp_16-bs_2': {'iteration_time': 10.170005672118243,
                                                            'samples_per_sec': 1.5983936179321443,
                                                            'tflops': 16.713241697488932},
 'weak_scaling_bs-13B-sync_snapshot-no_sharding-pp_16-bs_2': {'iteration_time': 23.146857473780127,
                                                        