Train an XGBoost model that predicts the GPU scan time of a Parquet scan from the metrics of the corresponding CPU scan.

In [None]:
import csv
import json
import os

import numpy as np
import pandas as pd
import xgboost as xgb

# Input locations, one entry per benchmark scale factor. The processing loop
# indexes all four directory lists with the same position, so entry k of each
# list must describe the same scale factor as scale_factors[k].

# Directories that are expected to hold one profile sub-directory per CPU run.
cpu_profile_dirs = [
    "/opt/data/profiles/dataproc-cpu-sf100",
    "/opt/data/profiles/dataproc-cpu-sf3k",
    "/opt/data/profiles/dataproc-cpu-sf5k",
    "/opt/data/profiles/dataproc-cpu-sf10k",
]

# Same layout as above, for the GPU runs.
gpu_profile_dirs = [
    "/opt/data/profiles/dataproc-gpu-sf100",
    "/opt/data/profiles/dataproc-gpu-sf3k",
    "/opt/data/profiles/dataproc-gpu-sf5k",
    "/opt/data/profiles/dataproc-gpu-sf10k",
]

# Directories holding the event log files of the CPU runs.
cpu_event_dirs = [
    "/opt/data/events/dataproc-cpu-sf100",
    "/opt/data/events/dataproc-cpu-sf3k",
    "/opt/data/events/dataproc-cpu-sf5k",
    "/opt/data/events/dataproc-cpu-sf10k",
]

# Directories holding the event log files of the GPU runs.
gpu_event_dirs = [
    "/opt/data/events/dataproc-gpu-sf100",
    "/opt/data/events/dataproc-gpu-sf3k",
    "/opt/data/events/dataproc-gpu-sf5k",
    "/opt/data/events/dataproc-gpu-sf10k",
]

# Only the length of this list is used by the processing loop.
scale_factors = ["100", "3K", "5K", "10K"]

In [None]:
class ScanParquet:
    """Metrics recorded for a single Parquet scan plan node.

    Every metric defaults to 0, so a 0 may also mean "metric was not
    reported"; ``__eq__`` relies on that and treats 0 as a wildcard.
    """

    # Metric names whose ``total`` value is stored unchanged on the named
    # attribute. "scan time" is handled separately because it feeds several
    # attributes at once.
    _TOTAL_METRICS = {
        "number of output rows": "num_output_rows",
        "output rows": "num_output_rows",
        "number of files read": "num_files_read",
        "metadata time": "metadata_time",
        "size of files read": "size_files_read",
        "number of partitions read": "num_partitions_read",
        "partitions": "num_partitions_read",
        "dynamic partition pruning time": "dynamic_partition_pruning_time",
        "static number of files read": "static_num_files_read",
        "static size of files read": "static_size_files_read",
    }

    def __init__(self, group=None):
        """Populate the metrics from ``group``, or leave them all at 0.

        Args:
            group: object providing ``itertuples()``; each row must expose
                ``name`` and ``total``, and the "scan time" row additionally
                ``median``, ``max`` and ``accumulatorId``. ``None`` yields an
                all-zero instance.
        """
        for attr in ("num_output_rows", "num_files_read", "metadata_time",
                     "size_files_read", "scan_time_median", "scan_time_max",
                     "scan_time_total", "num_partitions_read",
                     "dynamic_partition_pruning_time", "static_num_files_read",
                     "static_size_files_read", "accumulator_id"):
            setattr(self, attr, 0)
        if group is None:
            return

        for row in group.itertuples():
            metric = row.name
            if metric == "scan time":
                self.scan_time_median = row.median
                self.scan_time_max = row.max
                self.scan_time_total = row.total
                # The accumulator id of the scan-time metric identifies this scan.
                self.accumulator_id = row.accumulatorId
                continue
            attr = self._TOTAL_METRICS.get(metric)
            if attr is not None:
                setattr(self, attr, row.total)
            # Rows with any other metric name are ignored.

    def __eq__(self, other):
        """Loosely compare two scans on rows, files, bytes and partitions.

        A metric only causes a mismatch when both sides carry a non-zero
        value and those values differ. Anything that is not a ``ScanParquet``
        compares unequal.
        """
        if not isinstance(other, ScanParquet):
            return False

        def agree(mine, theirs):
            # 0 on either side acts as a wildcard.
            return mine == 0 or theirs == 0 or mine == theirs

        return (agree(self.num_output_rows, other.num_output_rows)
                and agree(self.num_files_read, other.num_files_read)
                and agree(self.size_files_read, other.size_files_read)
                and agree(self.num_partitions_read, other.num_partitions_read))

    def __str__(self):
        """Return the four comparison metrics plus the total scan time, comma separated."""
        shown = (self.num_output_rows, self.num_files_read, self.size_files_read,
                 self.num_partitions_read, self.scan_time_total)
        return ", ".join(f"{value}" for value in shown)


def collect(node, scan_list, scan_map):
    """Append the Parquet scans found in a plan subtree to ``scan_list``.

    The tree is walked depth first, a node before its children. For every
    'scan time' metric on a CPU or GPU Parquet scan node, the ``ScanParquet``
    registered in ``scan_map`` under the metric's accumulator id is appended;
    an all-zero ``ScanParquet`` is appended when the id is unknown.

    Args:
        node: plan node dict with 'nodeName', 'metrics' and 'children' keys.
        scan_list: list that receives the scans (modified in place).
        scan_map: mapping from accumulator id to ``ScanParquet``.
    """
    if node['nodeName'] in ('Scan parquet ', 'GpuScan parquet '):
        scan_list.extend(
            scan_map.get(metric['accumulatorId'], ScanParquet())
            for metric in node['metrics']
            if metric['name'] == 'scan time'
        )
    for subtree in node['children']:
        collect(subtree, scan_list, scan_map)


def collect_scans(profile_dir, event_file):
    """Return the Parquet scans of one application in a canonical order.

    Scan metrics are read from ``sql_plan_metrics_for_application.csv`` in
    ``profile_dir`` and grouped per plan node. The event log is then read
    line by line; every event carrying a ``sparkPlanInfo`` replaces the
    scans gathered so far, so the result reflects the last such plan in the
    file.

    Args:
        profile_dir: directory containing ``sql_plan_metrics_for_application.csv``.
        event_file: path of the event log, one JSON document per line.

    Returns:
        List of ``ScanParquet`` sorted by (output rows, files read, bytes
        read, partitions read), so that the CPU and GPU results of the same
        application can be compared position by position.

    Raises:
        OSError: if either input file cannot be opened.
        json.JSONDecodeError: if a non-blank event log line is not valid JSON.
    """
    sql_info = pd.read_csv(f"{profile_dir}/sql_plan_metrics_for_application.csv")
    condition = (sql_info['nodeName'] == 'Scan parquet ') | (sql_info['nodeName'] == 'GpuScan parquet ')
    scan_dict = {}
    for _, group in sql_info[condition].groupby("nodeID"):
        scan = ScanParquet(group)
        # Keyed by the scan-time accumulator id, which is what collect() looks up.
        scan_dict[scan.accumulator_id] = scan

    scans = []
    # Decode explicitly as UTF-8 instead of relying on the locale's default
    # encoding, which differs between platforms.
    with open(event_file, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                # A blank (e.g. trailing) line is not an event; json.loads would raise on it.
                continue
            event = json.loads(line)
            if "sparkPlanInfo" in event:
                # Only the scans of the most recent plan are kept.
                scans.clear()
                collect(event["sparkPlanInfo"], scans, scan_dict)
    return sorted(scans, key=lambda s: (s.num_output_rows, s.num_files_read, s.size_files_read, s.num_partitions_read))


# Pair every CPU run with its GPU counterpart and gather the scans of both.
# Within a scale factor the sorted directory listings are matched by position,
# so the i-th CPU profile, CPU event log, GPU profile and GPU event log are
# taken to belong to the same application.
num_scale_factors = len(scale_factors)
cpu_times = []
gpu_times = []
for i in range(num_scale_factors):
    cpu_profiles = sorted(os.listdir(cpu_profile_dirs[i]))
    num_apps = len(cpu_profiles)
    gpu_profiles = sorted(os.listdir(gpu_profile_dirs[i]))
    cpu_events = sorted(os.listdir(cpu_event_dirs[i]))
    gpu_events = sorted(os.listdir(gpu_event_dirs[i]))
    assert num_apps == len(gpu_profiles) == len(cpu_events) == len(gpu_events)
    for j, cpu_profile in enumerate(cpu_profiles):
        cpu_event = cpu_events[j]
        print(f"Processing cpu profile {cpu_profile}, event file {cpu_event}")
        cpu_scans = collect_scans(f"{cpu_profile_dirs[i]}/{cpu_profile}",
                                  f"{cpu_event_dirs[i]}/{cpu_event}")

        gpu_profile = gpu_profiles[j]
        gpu_event = gpu_events[j]
        print(f"Processing gpu profile {gpu_profile}, event file {gpu_event}")
        gpu_scans = collect_scans(f"{gpu_profile_dirs[i]}/{gpu_profile}",
                                  f"{gpu_event_dirs[i]}/{gpu_event}")

        if cpu_scans == gpu_scans:
            # Both lists are sorted the same way, so they stay aligned
            # element by element after being appended.
            cpu_times.extend(cpu_scans)
            gpu_times.extend(gpu_scans)
        else:
            # The scans cannot be paired up; report the application and drop it.
            print("Warning: cpu and gpu scans are different:")
            print("; ".join(str(s) for s in cpu_scans))
            print("; ".join(str(s) for s in gpu_scans))
print(f"Total number of cpu scans: {len(cpu_times)}")
print(f"Total number of gpu scans: {len(gpu_times)}")

In [None]:
# Write one CSV row per matched scan: the GPU total scan time first (the value
# to predict), followed by the metrics of the corresponding CPU scan.
fields = [
    "num_output_rows",
    "num_files_read",
    "metadata_time",
    "size_files_read",
    "scan_time_median",
    "scan_time_max",
    "scan_time_total",
    "num_partitions_read",
    "dynamic_partition_pruning_time",
    "static_num_files_read",
    "static_size_files_read",
]
header = ["gpu_scan_time_total"] + fields

with open("scan_parquet.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=header)
    writer.writeheader()
    # The two lists are filled in lockstep, so equal positions describe the same scan.
    assert len(cpu_times) == len(gpu_times)
    for cpu_time, gpu_time in zip(cpu_times, gpu_times):
        row = {"gpu_scan_time_total": gpu_time.scan_time_total}
        row.update((field, getattr(cpu_time, field)) for field in fields)
        writer.writerow(row)

In [None]:
# Cross-validate an XGBoost regressor that predicts the GPU scan time from
# the CPU scan metrics stored in scan_parquet.csv.

# "0" is read back as NaN so that zero-valued metrics are handed to XGBoost
# as missing values (see missing=np.nan below).
# NOTE(review): this also turns a gpu_scan_time_total of 0 into a NaN label —
# confirm such rows cannot occur or should be dropped.
df = pd.read_csv('scan_parquet.csv', na_values=["0"])
df.drop_duplicates(inplace=True)
print(df.shape[0])
# Column 0 (gpu_scan_time_total) is the label; every column after it is a
# feature. The features must not include column 0, otherwise the model is
# trained on its own target and ignores the real metrics.
X = df.iloc[:, 1:]
y = df.iloc[:, 0]
dtrain = xgb.DMatrix(X, label=y, missing=np.nan)
param = {
    'validate_parameters': True,
    'eta': 0.1,
    # Tree size is bounded by max_leaves with leaf-wise growth; a max_depth
    # of 0 means "no depth limit" in XGBoost.
    'max_depth': 0,
    'tree_method': 'hist',
    'grow_policy': 'lossguide',
    'max_leaves': 255,
    'objective': 'reg:squarederror',
    'eval_metric': 'rmse',
}
num_round = 500
results = xgb.cv(
    params=param,
    dtrain=dtrain,
    num_boost_round=num_round,
    nfold=10,
    seed=42,
    callbacks=[
        xgb.callback.EvaluationMonitor(show_stdv=False),
        # Early stopping with a patience of 3 rounds.
        xgb.callback.EarlyStopping(3),
    ],
)
print(results)