In [86]:
# GraphQL schema preamble shared by the schema text below: it names the root query type
# (RootSchemaQuery) and declares the directives that queries may use
# (@filter, @tag, @output, @output_source, @optional, @recurse, @fold, @macro_edge, @stitch).
SCHEMA_BASE = """
schema {
    query: RootSchemaQuery
}

directive @filter(
    \"\"\"Name of the filter operation to perform.\"\"\"
    op_name: String!
    \"\"\"List of string operands for the operator.\"\"\"
    value: [String!]
) repeatable on FIELD | INLINE_FRAGMENT

directive @tag(
    \"\"\"Name to apply to the given property field.\"\"\"
    tag_name: String!
) on FIELD

directive @output(
    \"\"\"What to designate the output field generated from this property field.\"\"\"
    out_name: String!
) on FIELD

directive @output_source on FIELD

directive @optional on FIELD

directive @recurse(
    \"\"\"
    Recurse up to this many times on this edge. A depth of 1 produces the current
    vertex and its immediate neighbors along the given edge.
    \"\"\"
    depth: Int!
) on FIELD

directive @fold on FIELD

directive @macro_edge on FIELD_DEFINITION

directive @stitch(source_field: String!, sink_field: String!) on FIELD_DEFINITION
"""

# Complete schema text: the shared preamble (SCHEMA_BASE) followed by the Part and
# EngineModule vertex types and the root query type, whose only entry point is Part.
KSP_PART_SCHEMA = SCHEMA_BASE + """

type Part {
    cfg_file_path: String
    internal_name: String
    name: String
    manufacturer: String
    description: String
    cost: Int
    dry_mass: Float
    crash_tolerance: Float  # expressed in m/s
    max_temp_tolerance: Float  # expressed in Kelvin, part explodes if above this temp
    
    out_Part_EngineModule: [EngineModule]
}

type EngineModule {    
    min_thrust: Float
    max_thrust: Float
    throttleable: Boolean  # e.g., solid boosters cannot be throttled down
    isp_vacuum: Float
    isp_at_1atm: Float
}

type RootSchemaQuery {
    Part: [Part]
}
"""

In [17]:
# Directory that is searched (recursively) for part .cfg files.
# You may need to change this to point to the "GameData" directory of your own KSP install.
# The value below is the author's Steam install location, as seen from WSL.
default_ksp_install_path = "/mnt/c/Program Files (x86)/Steam/steamapps/common/Kerbal Space Program/GameData/"

In [154]:
from dataclasses import dataclass
from glob import iglob
from itertools import islice
import os
import re
import string
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

from graphql import parse, build_ast_schema
from graphql_compiler.compiler.compiler_frontend import graphql_to_ir
from graphql_compiler.interpreter import InterpreterAdapter, DataContext, DataToken, interpret_ir

In [172]:
@dataclass
class KerbalToken:
    """A single vertex of data handed to the query interpreter.

    Instances are produced by make_part_token() and consumed by KerbalDataAdapter,
    which reads property values out of `content` by field name.
    """

    # Name of the schema type this token represents (make_part_token uses "Part").
    type_name: str
    # Property values of the vertex, keyed by field name.
    content: Dict[str, Any]

In [121]:
def get_ksp_part_cfg_files(ksp_install_path: str) -> Iterable[str]:
    """Lazily find every .cfg file that lives somewhere below a "Parts" directory.

    Args:
        ksp_install_path: directory to search; it is treated as a literal path, so
            glob metacharacters in it ("[", "]", "*", "?") are matched verbatim.

    Returns:
        An iterator over the paths matching "<ksp_install_path>/**/Parts/**/*.cfg",
        where "**" matches any number of nested directories (including none).
    """
    # Imported locally so this function does not depend on a change to the notebook's import cell.
    from glob import escape as escape_glob_chars

    # Only the user-supplied prefix is escaped; the wildcard components stay active.
    glob_search_path = os.path.join(
        escape_glob_chars(ksp_install_path), "**", "Parts", "**", "*.cfg"
    )
    return iglob(glob_search_path, recursive=True)

In [150]:
# Matches a localized value of the form
#     #autoLOC_<digits> //<English text>
# where the comment may optionally start by repeating the tag, i.e.
#     #autoLOC_<digits> //#autoLOC_<digits> = <English text>
# The named group "english" captures everything after the comment marker
# (and after the repeated "tag =" prefix when it is present).
_loc_pattern = re.compile(r"(#autoLOC_\d+)\s+//(?:\s*\1\s*=)?(?P<english>.*)")
# Character sequence that starts a comment in a .cfg file line.
_comment_sequence = "//"


def load_part_config_from_cfg_file(file_path: str) -> Optional[Dict[Tuple[Tuple[str, int], ...], Any]]:
    """Parse a part .cfg file into a flat dict keyed by fully-qualified field paths.

    Every key is a tuple of (name, index) pairs: first the enclosing sections from the
    outermost inwards, then the field itself. The index disambiguates repeated names at
    the same position, counting from 0 in order of appearance. For example the second
    "key" field inside the first MODULE section of a PART is stored under
    (("PART", 0), ("MODULE", 0), ("key", 1)).

    Values are the raw text to the right of the first "=" with surrounding whitespace
    removed; trailing "//" comments are intentionally kept, since localized strings
    carry their English text there.

    Args:
        file_path: path of the .cfg file to read (decoded as UTF-8).

    Returns:
        The parsed mapping, or None if the file is empty or does not start with the
        lines "PART" and "{".

    Raises:
        AssertionError: if a line is neither a comment, a "key = value" field, a brace,
            a known section-like exception nor a valid section header, or if closing
            braces are unbalanced or followed by non-comment text.
    """
    # Explicit encoding so that decoding (and the byte-order-mark check below) does not
    # depend on the platform's default encoding.
    with open(file_path, "r", encoding="utf-8") as f:
        lines: List[str] = [raw_line.strip() for raw_line in f]

    if not lines:
        return None

    # startswith() rather than indexing: the first line may be blank.
    if lines[0].startswith("\ufeff"):
        # Remove byte-order marks at the start of the file
        lines[0] = lines[0][1:]

    expected_part_file_start = ["PART", "{"]
    if lines[:2] != expected_part_file_start:
        # Not a part file, ignore.
        return None

    # Fully-qualified section paths seen so far, used to number repeated sibling sections.
    visited_sections: Set[Tuple[Tuple[str, int], ...]] = set()
    # Stack of the sections enclosing the current line, outermost first.
    current_section: List[Tuple[str, int]] = []
    data: Dict[Tuple[Tuple[str, int], ...], Any] = {}

    expected_section_name_chars = set(string.ascii_letters) | set(string.digits) | {"_"}
    section_like_exceptions = {
        "fxOriginalOffset",
    }

    for line_index, line in enumerate(lines):
        if line == "" or line.startswith(_comment_sequence):
            # Blank or comment line, ignore.
            continue

        if line.startswith("}"):
            # One line may close several sections ("}}" or "} }") and may end in a comment.
            remainder = line
            while remainder.startswith("}"):
                if not current_section:
                    raise AssertionError(
                        f"Unbalanced closing brace at line {line_index} in file {file_path}: {line}"
                    )
                current_section.pop()
                remainder = remainder[1:].strip()
            if remainder and not remainder.startswith(_comment_sequence):
                raise AssertionError(
                    f"Unexpected content after closing brace at line {line_index} "
                    f"in file {file_path}: {line}"
                )
            continue

        section_path = tuple(current_section)

        if "=" in line:
            key, value = (component.strip() for component in line.split("=", 1))

            # Find the first unused index for this field name within the current section.
            counter = 0
            full_key = section_path + ((key, counter),)
            while full_key in data:
                counter += 1
                full_key = section_path + ((key, counter),)

            data[full_key] = value
            continue

        if line == "{":
            # The opening brace belongs to the section header on a previous line.
            continue

        if line in section_like_exceptions:
            # Certain strings appear in section-like form (unary, not key = value),
            # and we know to ignore them.
            continue

        # Anything else must be a section header, with its "{" either at the end of this
        # line or on the next non-empty line.
        if line.endswith("{"):
            section_name = line.strip("{").strip()
        else:
            next_nonempty_line = next(
                (candidate for candidate in lines[line_index + 1:] if candidate != ""),
                "",
            )
            if next_nonempty_line != "{":
                raise AssertionError(f"Unexpected line {line_index} in file {file_path}: {line}")
            section_name = line

        unexpected_chars = set(section_name) - expected_section_name_chars
        if unexpected_chars:
            raise AssertionError(
                f"Unexpected section name at line {line_index} in file {file_path}: {section_name}"
            )

        # Number this section among same-named siblings under the same parent path.
        counter = 0
        section_key = (section_name, counter)
        fully_qualified_section_name = section_path + (section_key,)
        while fully_qualified_section_name in visited_sections:
            counter += 1
            section_key = (section_name, counter)
            fully_qualified_section_name = section_path + (section_key,)

        visited_sections.add(fully_qualified_section_name)
        current_section.append(section_key)

    return data


def read_raw(
    config_data: Dict[Tuple[Tuple[str, int], ...], Any], 
    path: Tuple[Tuple[str, int], ...],
) -> Optional[str]:
    """Look up the raw text stored at `path`, with any trailing comment removed.

    Args:
        config_data: mapping of fully-qualified key paths to raw string values.
        path: the fully-qualified key to look up.

    Returns:
        The stored text up to (not including) the first comment marker, with
        surrounding whitespace removed, or None if there is no value at `path`.
    """
    raw_value = config_data.get(path, None)
    if raw_value is None:
        return None

    # Cut at the comment marker, then strip: otherwise the whitespace that separated the
    # value from its comment ("True // note" -> "True ") leaks into the result and breaks
    # exact comparisons in callers.
    return raw_value.split(_comment_sequence, 1)[0].strip()


def read_float(
    config_data: Dict[Tuple[Tuple[str, int], ...], Any], 
    path: Tuple[Tuple[str, int], ...],
    *,
    default: Optional[float] = None,
) -> float:
    """Read the value at `path` (via read_raw) and convert it to a float.

    Args:
        config_data: mapping of fully-qualified key paths to raw string values.
        path: the fully-qualified key to look up.
        default: value returned when nothing is stored at `path`.

    Returns:
        The parsed float, or `default` when the value is missing and a default was given.

    Raises:
        KeyError: if the value is missing and `default` is None.
        ValueError: if the stored text is not a valid float.
    """
    text = read_raw(config_data, path)
    if text is not None:
        return float(text)

    # Missing value: fall back to the default when one was supplied.
    if default is not None:
        return default
    raise KeyError(path)


def read_int(
    config_data: Dict[Tuple[Tuple[str, int], ...], Any], 
    path: Tuple[Tuple[str, int], ...],
    *,
    default: Optional[int] = None,
) -> int:
    """Read the value at `path` (via read_raw) and convert it to an int.

    Args:
        config_data: mapping of fully-qualified key paths to raw string values.
        path: the fully-qualified key to look up.
        default: value returned when nothing is stored at `path`.

    Returns:
        The parsed int, or `default` when the value is missing and a default was given.

    Raises:
        KeyError: if the value is missing and `default` is None.
        ValueError: if the stored text is not a valid integer literal.
    """
    text = read_raw(config_data, path)
    if text is not None:
        return int(text)

    # Missing value: fall back to the default when one was supplied.
    if default is not None:
        return default
    raise KeyError(path)


def read_bool(
    config_data: Dict[Tuple[Tuple[str, int], ...], Any], 
    path: Tuple[Tuple[str, int], ...],
    *,
    default: Optional[bool] = None,
) -> bool:
    """Read the value at `path` (via read_raw) and interpret it as a boolean.

    The text is compared case-insensitively and ignoring surrounding whitespace, so
    "True", "true" and "True " (e.g. left over after a trailing comment was removed)
    are all accepted.

    Args:
        config_data: mapping of fully-qualified key paths to raw string values.
        path: the fully-qualified key to look up.
        default: value returned when nothing is stored at `path`.

    Returns:
        The parsed boolean, or `default` when the value is missing and a default was given.

    Raises:
        KeyError: if the value is missing and `default` is None.
        AssertionError: if the stored text is neither "true" nor "false".
    """
    raw_value = read_raw(config_data, path)
    if raw_value is None:
        if default is None:
            raise KeyError(path)
        return default

    normalized_value = raw_value.strip().lower()
    if normalized_value == "true":
        return True
    if normalized_value == "false":
        return False

    raise AssertionError(f"Unexpected value '{raw_value}' for expected boolean at path {path}")
    
    
def read_str(
    config_data: Dict[Tuple[Tuple[str, int], ...], Any], 
    path: Tuple[Tuple[str, int], ...],
    *,
    default: Optional[str] = None,
) -> str:
    """Read the string at `path`, resolving localization tags to their English text.

    Values that start with "#autoLOC" are expected to carry the English text in a
    trailing comment; that text is returned with literal "\\n" sequences turned into
    newlines. Any other value is returned exactly as stored, comment included.

    Args:
        config_data: mapping of fully-qualified key paths to raw string values.
        path: the fully-qualified key to look up.
        default: value used when nothing is stored at `path`.

    Raises:
        KeyError: if the value is missing and `default` is None.
        AssertionError: if a "#autoLOC" value does not match the localization pattern.
    """
    # N.B.: read_raw() is deliberately bypassed here. It strips comments, and the comment
    #       is exactly where the localization data keeps the English string.
    stored_text = config_data.get(path, default)
    if stored_text is None:
        if default is not None:
            return default
        raise KeyError(path)

    if not stored_text.startswith("#autoLOC"):
        return stored_text

    localized = _loc_pattern.match(stored_text)
    if localized is None:
        raise AssertionError(
            f"Found a localization-like string at path {path} that did not match "
            f"the expected localization pattern: {stored_text}"
        )

    english_text = localized.group("english")
    return english_text.replace("\\n", "\n").strip()
        

In [144]:
def make_part_token(file_path: str) -> Optional[KerbalToken]:
    """Build a "Part" token from a part .cfg file.

    Args:
        file_path: path of the .cfg file to load.

    Returns:
        A KerbalToken of type "Part" whose content holds the part's properties, or None
        if the file is not a part file or describes one of the known non-part entries
        (flag and kerbal EVA definitions).

    Raises:
        KeyError: if a required field (name, title, cost, mass, crashTolerance) is missing.
    """
    part_config = load_part_config_from_cfg_file(file_path)
    if part_config is None:
        return None

    type_name = "Part"

    # All part properties are top-level fields of the first (only) PART section.
    base_key = (("PART", 0),)

    internal_name = read_str(part_config, base_key + (("name", 0),))

    # Entries that use the PART file layout but are not parts and are skipped.
    non_part_blacklist: Set[str] = {
        "flag",
        "kerbalEVA",
        "kerbalEVAFuture",
        "kerbalEVAVintage",
        "kerbalEVAfemale",
        "kerbalEVAfemaleFuture",
        "kerbalEVAfemaleVintage",
    }
    if internal_name in non_part_blacklist:
        return None

    # NOTE(review): the schema also declares Part.description and
    # Part.out_Part_EngineModule, but neither is populated here, so a query selecting
    # "description" fails with a KeyError when the property is looked up in `content`.
    content: Dict[str, Any] = {
        "cfg_file_path": file_path,

        "internal_name": internal_name,
        "name": read_str(part_config, base_key + (("title", 0),)),
        "manufacturer": read_str(part_config, base_key + (("manufacturer", 0),), default="N/A"),
        "cost": read_int(part_config, base_key + (("cost", 0),)),
        "dry_mass": read_float(part_config, base_key + (("mass", 0),)),
        "crash_tolerance": read_float(part_config, base_key + (("crashTolerance", 0),)),
        # 1200.0 is used when the file does not specify maxTemp.
        "max_temp_tolerance": read_float(part_config, base_key + (("maxTemp", 0),), default=1200.0),
    }

    return KerbalToken(type_name, content)

In [152]:
# Sanity check: walk every candidate .cfg file and report the ones that do not
# produce a part token, together with a final count.
non_part_files = 0
for part_file in get_ksp_part_cfg_files(default_ksp_install_path):
    part_token = make_part_token(part_file)
    if part_token is not None:
        continue

    non_part_files += 1
    print(f"No part could be read from file {part_file}")

print(f"Non-part files: {non_part_files}")

No part could be read from file /mnt/c/Program Files (x86)/Steam/steamapps/common/Kerbal Space Program/GameData/Squad/Parts/VariantThemes.cfg
No part could be read from file /mnt/c/Program Files (x86)/Steam/steamapps/common/Kerbal Space Program/GameData/Squad/Parts/Prebuilt/flag.cfg
No part could be read from file /mnt/c/Program Files (x86)/Steam/steamapps/common/Kerbal Space Program/GameData/Squad/Parts/Prebuilt/kerbalEVA.cfg
No part could be read from file /mnt/c/Program Files (x86)/Steam/steamapps/common/Kerbal Space Program/GameData/Squad/Parts/Prebuilt/kerbalEVAfemale.cfg
No part could be read from file /mnt/c/Program Files (x86)/Steam/steamapps/common/Kerbal Space Program/GameData/SquadExpansion/MakingHistory/Parts/Prebuilt/kerbalEVAfemaleVintage.cfg
No part could be read from file /mnt/c/Program Files (x86)/Steam/steamapps/common/Kerbal Space Program/GameData/SquadExpansion/MakingHistory/Parts/Prebuilt/kerbalEVAVintage.cfg
No part could be read from file /mnt/c/Program Files (x8

In [167]:
class KerbalDataAdapter(InterpreterAdapter[KerbalToken]):
    """Interpreter adapter that serves KSP part data read from .cfg files on disk.

    Only the "Part" type and its property fields are supported; edge traversal and
    type coercion are not implemented yet.
    """

    # Directory that is searched for part .cfg files.
    ksp_install_path: str

    def __init__(self, ksp_install_path: str) -> None:
        """Remember the directory that part files are loaded from."""
        self.ksp_install_path = ksp_install_path

    def get_tokens_of_type(
        self,
        type_name: str,
        **hints: Dict[str, Any],
    ) -> Iterable[KerbalToken]:
        """Yield one token per part file found under this adapter's install path.

        Raises:
            NotImplementedError: for any type other than "Part".
        """
        if type_name != "Part":
            raise NotImplementedError()

        # Use the path this adapter was constructed with, not the notebook-level default.
        for cfg_file in get_ksp_part_cfg_files(self.ksp_install_path):
            part_token = make_part_token(cfg_file)
            if part_token is not None:
                yield part_token

    def project_property(
        self,
        data_contexts: Iterable[DataContext[KerbalToken]],
        current_type_name: str,
        field_name: str,
        **hints: Dict[str, Any],
    ) -> Iterable[Tuple[DataContext[KerbalToken], Any]]:
        """Pair every data context with the value of `field_name` on its current token.

        Contexts whose current token is None are paired with None.
        """
        for data_context in data_contexts:
            token = data_context.current_token
            current_value = None
            if token is not None:
                current_value = token.content[field_name]

            yield (data_context, current_value)

    def project_neighbors(
        self,
        data_contexts: Iterable[DataContext[KerbalToken]],
        current_type_name: str,
        direction: str,
        edge_name: str,
        **hints: Dict[str, Any],
    ) -> Iterable[Tuple[DataContext[KerbalToken], Iterable[KerbalToken]]]:
        """Not implemented: no edges can be traversed yet."""
        # Fail loudly instead of returning None, which the caller would try to iterate.
        raise NotImplementedError(
            f"Traversing edge {direction}_{edge_name} from type {current_type_name} is not supported."
        )

    def can_coerce_to_type(
        self,
        data_contexts: Iterable[DataContext[KerbalToken]],
        current_type_name: str,
        coerce_to_type_name: str,
        **hints: Dict[str, Any],
    ) -> Iterable[Tuple[DataContext[KerbalToken], bool]]:
        """Not implemented: type coercion is not supported yet."""
        # Fail loudly instead of returning None, which the caller would try to iterate.
        raise NotImplementedError(
            f"Coercing type {current_type_name} to {coerce_to_type_name} is not supported."
        )

In [156]:
# Schema object built from the schema text above; execute_query() compiles queries against it.
schema = build_ast_schema(parse(KSP_PART_SCHEMA))

def execute_query(query: str, args: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
    """Compile `query` against the notebook's schema and run it over the KSP part data.

    Args:
        query: the GraphQL query text.
        args: values for the query's runtime parameters, keyed by parameter name.

    Returns:
        An iterable of result rows, each a dict keyed by output name.
    """
    data_adapter = KerbalDataAdapter(default_ksp_install_path)
    compiled_query = graphql_to_ir(schema, query)

    return interpret_ir(data_adapter, compiled_query, args)


In [171]:
# Example: list the name, internal name and dry mass of every part whose dry_mass
# is at least $min_mass (20.0 here).
list(execute_query("""
{
    Part {
        name @output(out_name: "part_name")
        internal_name @output(out_name: "internal_name")
        dry_mass @filter(op_name: ">=", value: ["$min_mass"]) @output(out_name: "dry_mass")
    }
}
""", {
    "min_mass": 20.0,
}))

[{'part_name': 'S2-33 "Clydesdale" Solid Fuel Booster',
  'internal_name': 'Clydesdale',
  'dry_mass': 21.0},
 {'part_name': 'A potato like rock',
  'internal_name': 'PotatoRoid',
  'dry_mass': 150.0},
 {'part_name': 'Kerbodyne S4-512 Fuel Tank',
  'internal_name': 'Size4_Tank_04',
  'dry_mass': 32.0}]