In [1]:
#   Set the variables in the VARIABLE block to the values for your environment
#   TODO: Need to add graphbase as a mod to insert into the database

In [2]:
################   VARIABLES   #############
# set this to your username for the script_output_folder path
user = "YOUR_USERNAME"
script_output_folder = f"C:/Users/{user}/AppData/Roaming/Factorio/script-output"

# DataFactorio currently creates files for the game tick the export was created.
# Set this to which set of files you want to read from
# Leave 0 to get the maxium game tick available
read_timestamp = 70682288    


################   IMPORTS   #############
import networkx as nx
import json
import re
import os
from collections import Counter


In [None]:
################   GET THE DATA   ##############
def extract_repeated_timestamps(filenames) -> list[int]:
    """
    Extracts and returns a list of unique timestamps that are repeated in the filenames,
    assuming the timestamp format is an integer sequence before the '-' character.
    
    Args:
    - filenames (list of str): The list of filenames in the format 'timestamp-suffix.ext'.
    
    Returns:
    - list of int: A list of unique, repeated timestamps.
    """
    # Regular expression to match the timestamp pattern
    timestamp_pattern = re.compile(r"(\d+)-")
    timestamps = []
    for filename in filenames:
        match = timestamp_pattern.match(filename)
        if match:
            timestamps.append(int(match.group(1)))
    
    # Count occurrences of each timestamp
    timestamp_counts = Counter(timestamps)
    
    # Filter timestamps that are repeated, i.e., count > 1
    repeated_timestamps = [timestamp for timestamp, count in timestamp_counts.items() if count > 1]
    
    return repeated_timestamps

def get_data_files(read_timestamp:int = 0) -> tuple[str, str, str, str]:
    """
    Get the latest data files from the Factorio script-output folder.
    
    Returns:
    - tuple of str: The filenames of the latest recipe, entity, tech, and inventory data files.
    """
    # Get the latest timestamp
    script_output_folder = f"C:/Users/{user}/AppData/Roaming/Factorio/script-output"
    timestamps = extract_repeated_timestamps(os.listdir(script_output_folder))
    if len(timestamps) == 0:
        return "", "", "", "", 0
    if read_timestamp == 0:
        read_timestamp = max(timestamps)
    
    # Get all the files from the script output folder with the max timestamp
    files = os.listdir(script_output_folder)
    files = [f for f in files if f.startswith(str(read_timestamp))]
    
    # Get the files that are json
    files = [f for f in files if f.endswith(".json")]
    
    # Check if the file exists otherwise set file to ""
    recipe_file = next((f for f in files if "recipe" in f), "")
    entity_file = next((f for f in files if "entities" in f), "")
    tech_file = next((f for f in files if "tech" in f), "")
    inventory_file = next((f for f in files if "inventory" in f), "")

    return recipe_file, entity_file, tech_file, inventory_file, read_timestamp

def load_data_files(recipe_file, entity_file, tech_file, inventory_file) -> tuple[dict, dict, dict, dict]:
    """
    Load the data from the given files and return them as dictionaries.
    
    Args:
    - recipe_file (str): The filename of the recipe data file.
    - entity_file (str): The filename of the entity data file.
    - tech_file (str): The filename of the tech data file.
    - inventory_file (str): The filename of the inventory data file.
    
    Returns:
    - tuple of dict: The recipe, entity, tech, and inventory data as dictionaries.
    """
    # Check if file is not empty
    if recipe_file == "":
        recipe_data = {}
    else:
        with open(f"{script_output_folder}/{recipe_file}", "r") as f:
            recipe_data = json.load(f)
    if entity_file == "":
        entity_data = {}
    else:
        with open(f"{script_output_folder}/{entity_file}", "r") as f:
            entity_data = json.load(f)
    if tech_file == "":
        tech_data = {}
    else:
        with open(f"{script_output_folder}/{tech_file}", "r") as f:
            tech_data = json.load(f)
    if inventory_file == "":
        inventory_data = {}
    else:
        with open(f"{script_output_folder}/{inventory_file}", "r") as f:
            inventory_data = json.load(f)
    
    return recipe_data, entity_data, tech_data, inventory_data

recipe_file, entity_file, tech_file, inventory_file, read_timestamp = get_data_files(read_timestamp)
recipe_data, entity_data, tech_data, inventory_data = load_data_files(recipe_file, entity_file, tech_file, inventory_file)
if recipe_file == "":
    print(f"No data files found for timestamp {read_timestamp}.")

In [None]:
################   RECIPE DATA   ################

### What data facotrio outputs
# recipe_data[recipe.name] = {
#     "se-arcosphere-fold-a": {
    #   "ingredients": [
    #     { "type": "item", "name": "se-arcosphere-a", "amount": 1 },
    #     { "type": "item", "name": "se-arcosphere-h", "amount": 1 }
    #   ],
    #   "products": [
    #     {
    #       "type": "item",
    #       "name": "se-arcosphere-b",
    #       "probability": 1,
    #       "amount": 1
    #     },
    #     {
    #       "type": "item",
    #       "name": "se-arcosphere-d",
    #       "probability": 1,
    #       "amount": 1
    #     }
    #   ],
    #   "energy": 10
# }

### what we need it to look like to put into graphbase
# recipe edges will point to other receipe data pieces 
# so recipe will look something like this 
# {
#     name: "recipe",
#     nodes: [
#         {
#             name: "rep-item3",
#             nodetype: "recipe",
#             ingredients: ["item1"],
#             products: ["item2"],
#             energy: 100
#             edges: [
#                 {
#                     source: "item1", # where source is from ingredient to target recipe
#                     target: "rep-item1"
#                 },
#                 {
#                     source: "item2", # where source is from product to target recipe
#                     target: "rep-item3"
#                 }
#             ]
#         },
#     ],
# }

# Parse the recipe data into a NetworkX graph
def parse_recipe_json_to_networkx(file_data):
    """
    Parse the recipe data into a NetworkX graph.
    
    Parameters:
    - recipe_data: A dictionary representing the recipe data.
    
    Returns:
    - A NetworkX directed graph representing the recipes, ingredients, and products.
    """
    # Create a directed graph using NetworkX
    recipe_graph = nx.DiGraph(name="recipes")
    

    # First pass to create nodes for all recipes
    for item_name, item in file_data.items():
        if item_name == "recipes" : 
            for recipe_name, recipe_attrs in item.items():
                recipe_graph.add_node(
                    f"rec-{recipe_name}",
                    label=recipe_name,
                    type="recipe",
                    energy=recipe_attrs.get("energy", 0),
                    ingredients=recipe_attrs.get("ingredients", []),
                    products=recipe_attrs.get("products", [])
                )
    
    # Second pass to create edges for ingredients and products
    for item_name, item in file_data.items():
        if item_name == "recipes" : 
            for recipe_name, recipe_attrs in item.items():
                for ingredient in recipe_attrs.get("ingredients", []):
                    # Ensure ingredient nodes exist (as recipes)
                    ingredient_node_id = f"rec-{ingredient['name']}"
                    if ingredient_node_id not in recipe_graph:
                        recipe_graph.add_node(ingredient_node_id, label=ingredient["name"], type="recipe", enerty=0, ingredients=[], products=[])
                    
                    # Add edge from ingredient to recipe
                    recipe_graph.add_edge(
                        ingredient_node_id, f"rec-{recipe_name}",
                        weight=ingredient.get("amount", 1),
                        relationship="ingredient"
                    )
                
                for product in recipe_attrs.get("products", []):
                    product_node_id = f"rec-{product['name']}"
                    # Ensure product nodes exist (as recipes) and are not the same as the recipe itself
                    if product_node_id not in recipe_graph:
                        recipe_graph.add_node(product_node_id, label=product["name"], type="recipe", energy=0, ingredients=[], products=[])
                    
                    # Add edge from recipe to product, avoiding self-referencing
                    if product_node_id != f"rec-{recipe_name}":
                        recipe_graph.add_edge(
                            f"rec-{recipe_name}", product_node_id,
                            weight=product.get("amount", 1),
                            relationship="product"
                        )
        
    return recipe_graph

# Load the recipe data from the JSON file and parse it into a NetworkX graph
recipe_graph = None
if recipe_data:
    recipe_graph = parse_recipe_json_to_networkx(recipe_data)
print(recipe_graph)

In [None]:
################   ENTITY DATA   ################

### What data facotrio outputs
# entity_data[entity.name] = {
#     position = {x = entity.position.x, y = entity.position.y},
#     nodetype = "entity"
# }

### what we need it to look like to put into graphbase
# entity edges will point to receipe nodes
# so entity will look something like this 
# {
#     name: "eneity",
#     nodes: [
#         {
#             name: "ent-item5",
#             nodetype: "entity",
#             position: {x: 100, y: 100},
#             edges: [
#                 {
#                     source: "rep-item5", # where source is from entity to target recipe
#                     target: "ent-item5"
#                 }
#         },
#     ],
# }

def parse_entity_json_to_networkx(data):
    # If data is a string, parse it as JSON
    if isinstance(data, str):
        data = json.loads(data)
    
    # Create a networkx graph
    G = nx.DiGraph()

    # Check if data is a dict
    if not isinstance(data, dict):
        raise ValueError("Data is not a dict")

    # Add nodes with positions 
    # loop through items in diction data
    for key, value in data.items():
        name = key
        position = None
        nodetype = 'entity'
        x = None
        y = None 
        for k, v in value.items(): 
            if k == "position":
                if not isinstance(v, dict):
                    raise ValueError("Position is not a dictionary")
                position = v
                x = position['x'] * 5
                y = position['y'] * 5
            elif k == "nodetype":
                nodetype = v
                
        G.add_node(name, type=nodetype, x=x, y=y, size='', color='', shape='')

    return G

# Load the entity data from the JSON file and parse it into a NetworkX graph
entity_graph = None
if entity_data:
    entity_graph = parse_entity_json_to_networkx(entity_data)

In [None]:
################   TECH DATA   ################

### What data facotrio outputs
# tech_data[tech.name] = { researched = tech.researched or false }

### what we need it to look like to put into graphbase
# tech edges will point to receipe nodes
# This will require a bit more information from tech data
# like which recipe it unlocks
# {
#     name: "tech",
#     nodes: [
#         {
#             name: "tec-item5",
#             nodetype: "tech",
#             researched: true
#         },
#     ],
# }

# Parse the tech data into a NetworkX graph
def parse_tech_json_to_networkx(data):
    # If data is a string, parse it as JSON
    if isinstance(data, str):
        data = json.loads(data)
    
    # Create a networkx graph
    G = nx.DiGraph()

    # Check if data is a dict
    if not isinstance(data, dict):
        raise ValueError("Data is not a dict")

    # Add nodes with positions 
    # loop through items in diction data
    for key, value in data.items():
        name = key
        researched = False
        nodetype = 'tech'
        for k, v in value.items(): 
            if k == "researched":
                researched = v
            elif k == "nodetype":
                nodetype = v
                
        G.add_node(name, type=nodetype, researched=researched)

    return G

# Load the tech data from the JSON file and parse it into a NetworkX graph
tech_graph = None
if tech_data:
    tech_graph = parse_tech_json_to_networkx(tech_data)

In [None]:
################   INVENTORY DATA   ################

### What data facotrio outputs
# inventory_contents[name] = count

### what we need it to look like to put into graphbase
# inventory edges will point to receipe nodes
# {
#     name: "inventory",
#     nodes: [
#         {
#             name: "inv-item5",
#             nodetype: "inventory",
#             count: 100
#             edges: [
#                 {
#                     source: "rep-item5", # where source is from inventory to target recipe
#                     target: "inv-item5"
#                 }
#             ]
#         },
#     ],
# }

# Function to create a networkx graph from the given JSON data
def parse_inventory_json_to_networkx(data): 
    # Create a networkx graph
    G = nx.DiGraph()

    # Add nodes
    for item_name, item in data.items():
        if item_name != "inventory" : continue
        for inv_item, count in item.items():
            G.add_node(f"inv-{inv_item}", label=inv_item, count=count, type='inventory')

    return G

# Load the inventory data from the JSON file and parse it into a NetworkX graph
inventory_graph = None
if inventory_data:
    inventory_graph = parse_inventory_json_to_networkx(inventory_data)

In [None]:
################  COMBINE GRAPHS   (Currently only combines Recipe and Inventory) ################
# TODO: For entity, we'll need the item the entity is as well as the recipe it is using (if it's using a recipe)
# TODO: For tech, we'll need the recipe it unlocks (if it unlocks a recipe
def combine_graph(recipe_graph, entity_graph, tech_graph, inventory_graph):
    """
    Combine the recipe, entity, tech, and inventory graphs into a single graph.

    Args:
    - recipe_graph (NetworkX graph): The graph representing the recipes.
    - entity_graph (NetworkX graph): The graph representing the entities.
    - tech_graph (NetworkX graph): The graph representing the technologies.
    - inventory_graph (NetworkX graph): The graph representing the inventory contents.

    Returns:
    - NetworkX graph: The combined graph.
    """
    # Combine the graphs into a single graph
    graph = nx.DiGraph()
    graph.name = f"factorio-{read_timestamp}"
    
    # Add nodes from recipe_graph and inventory_graph to the meta_graph, prefixing IDs to maintain uniqueness
    for node, data in recipe_graph.nodes(data=True):
        graph.add_node(node, **data)
    for u, v, data in recipe_graph.edges(data=True):
        graph.add_edge(u, v, **data)  # Add edges representing ingredient/product relationships
    for node, data in inventory_graph.nodes(data=True):
        graph.add_node(node, **data)

    # Iterate over nodes to connect matching recipe and inventory nodes
    for recipe_node, recipe_data in recipe_graph.nodes(data=True):
        # Extract NAME part, assuming node ID format "rec-NAME"
        recipe_name = recipe_node.replace("rec-", "")
        for inventory_node, inventory_data in inventory_graph.nodes(data=True):
            # Extract NAME part, assuming node ID format "inv-NAME"
            inventory_name = inventory_node.replace("inv-", "")
            if recipe_name == inventory_name:
                # Add edge between matching nodes in the meta_graph
                graph.add_edge(recipe_node, inventory_node, weight=1, relationship="inventory")

    return graph
combined_graph = combine_graph(recipe_graph, None, None, inventory_graph)
print(combined_graph)

In [None]:
################  SAVE TO DATABASE   ################
# TODO: Need to have graphbase module to save graph to database
# import graphbase as gb 
# gb.insert_graph(combined_graph)

In [None]:
################  PREP GRAPH   ################
default_edge_color = 'gray'
ingredient_edge_color = 'red'
product_edge_color = 'green'
inventory_edge_color = 'blue'
entity_edge_color = 'yellow'
tech_edge_color = 'purple'

default_node_color = 'gray'
recipe_node_color = 'black'
inventory_node_color = 'blue'
entity_node_color = 'yellow'
tech_node_color = 'purple'

def prep_graph(graph):
    """
    Prepare the graph for visualization by setting edge colors.
    - Ingredient edges are colored red.
    - Product edges are colored green.
    - Inventory edges are colored blue.
    
    Parameters:
    - graph (networkx.Graph): The graph to prepare.
    
    Returns:
    - networkx.Graph: The graph with colored edges.
    """
    # Copy the graph to avoid mutating the original
    prepped_graph = graph.copy()
    
    # Iterate over the nodes to assign colors based on the node type
    for node, data in prepped_graph.nodes(data=True):
        nodetype = data.get('type', '')
        if nodetype == 'recipe':
            color = recipe_node_color
        elif nodetype == 'entity':
            color = entity_node_color
        elif nodetype == 'tech':
            color = tech_node_color
        elif nodetype == 'inventory':
            color = inventory_node_color
        else:
            color = default_node_color  # Default color for unspecified or unrecognized node types
        
        # Set the color attribute for the node
        prepped_graph.nodes[node]['color'] = color

    # Iterate over the edges to assign colors based on the relationship
    for u, v, data in prepped_graph.edges(data=True):
        relationship = data.get('relationship', '')
        if relationship == 'ingredient':
            color = ingredient_edge_color
        elif relationship == 'product':
            color = product_edge_color
        elif relationship == 'inventory':
            color = inventory_edge_color
        else:
            color = default_edge_color  # Default color for unspecified or unrecognized relationships
        
        # Set the color attribute for the edge
        prepped_graph[u][v]['color'] = color
    
    return prepped_graph

factorio_graph = prep_graph(combined_graph)

In [None]:
#   Choose the layout you want to visualize the graph

In [None]:
################  HIERARCHY LAYOUT   ################
from collections import deque

def assign_hierarchical_levels(graph):
    """
    Assigns a hierarchical level to each node in the graph based on its position in the production chain.
    """
    # Initialize all nodes with a high level; it will be adjusted to the correct level during processing
    level = {node: int('15') for node in graph.nodes()}
    for node in graph.nodes():
        if graph.in_degree(node) == 0:  # Find all nodes with no incoming edges (start of a chain)
            level[node] = 0  # Start of the hierarchy
            queue = deque([node])
            while queue:
                current_node = queue.popleft()
                for _, next_node in graph.out_edges(current_node):
                    if level[next_node] > level[current_node] + 1:
                        level[next_node] = level[current_node] + 1
                        queue.append(next_node)
    return level

def layout_graph_hierarchy_ordered(graph):
    """
    Layout the graph nodes based on their hierarchical level determined by their production chain.
    Nodes with more connections are placed near the top within each hierarchical column.
    """
    # First, determine the hierarchical levels of all nodes
    node_levels = assign_hierarchical_levels(graph)
    
    # Calculate the degree for each node (sum of in-degree and out-degree)
    node_degrees = {node: graph.in_degree(node) + graph.out_degree(node) for node in graph.nodes()}
    
    # Organize nodes by level, sorting by degree within each level
    levels = {}
    for node, level in node_levels.items():
        if level not in levels:
            levels[level] = []
        levels[level].append(node)
    
    # Sort nodes within each level by their degree, in descending order (more connections -> higher placement)
    for level in levels:
        levels[level].sort(key=lambda node: node_degrees[node], reverse=True)
    
    # Define starting positions and gaps
    x_start = -1000
    y_start = -1150
    x_gap = 200  # Horizontal gap between levels
    y_gap = 50   # Vertical gap within the same level
    
    # Position nodes based on their level and order within the level
    for level, nodes in levels.items():
        for i, node in enumerate(nodes):
            graph.nodes[node]['x'] = x_start + level * x_gap
            graph.nodes[node]['y'] = y_start + i * y_gap

    return graph

factorio_graph = layout_graph_hierarchy_ordered(factorio_graph)

In [None]:
################  SPLATTER CHART LAYOUT   ################ (not working quite right yet)
def layout_graph_new(graph):
    """
    Assigns x and y positions to graph nodes based on their roles and connectivity.
    Ingredients on the left, products on the right, and inventory items proportionally positioned in between.
    """
    x_offset_left = 50        # X position for ingredients (left)
    x_offset_right = 500      # X position for products (right)
    y_start = 50              # Starting Y position
    y_gap = 10                # Gap between nodes
    
    # Trackers for positioning
    y_positions_left = y_start
    y_positions_right = y_start
    inventory_positions = {}

    # Initialize positions for all nodes to center as a fallback
    for node in graph.nodes():
        graph.nodes[node]['x'] = (x_offset_left + x_offset_right) / 2
        graph.nodes[node]['y'] = y_start
        y_start += y_gap

    # Adjust positions based on edge relationships
    for u, v, data in graph.edges(data=True):
        relationship = data.get('relationship')

        if relationship == 'ingredient':
            # Position u as product (right), v as ingredient (left)
            graph.nodes[u]['x'] = x_offset_right
            graph.nodes[u]['y'] = y_positions_right
            y_positions_right += y_gap

            graph.nodes[v]['x'] = x_offset_left
            graph.nodes[v]['y'] = y_positions_left
            y_positions_left += y_gap
        elif relationship == 'product':
            # Position u as ingredient (left), v as product (right)
            graph.nodes[v]['x'] = x_offset_right
            graph.nodes[v]['y'] = y_positions_right
            y_positions_right += y_gap

            graph.nodes[u]['x'] = x_offset_left
            graph.nodes[u]['y'] = y_positions_left
            y_positions_left += y_gap

    # For inventory items, calculate the average position based on connections
    for node in graph.nodes():
        connected_nodes = list(graph.predecessors(node)) + list(graph.successors(node))
        if connected_nodes:  # If the node has connections, adjust its position
            avg_x = sum(graph.nodes[connected]['x'] for connected in connected_nodes) / len(connected_nodes)
            avg_y = sum(graph.nodes[connected]['y'] for connected in connected_nodes) / len(connected_nodes)
            graph.nodes[node]['x'] = avg_x
            graph.nodes[node]['y'] = avg_y
    
    return graph
factorio_graph = layout_graph_new(factorio_graph)

In [None]:
################   CIRCLE LAYOUT   ################
def layout_graph_circular(graph):
    """
    Positions nodes in a circle, which can be useful for emphasizing cycles or groups.
    """
    pos = nx.circular_layout(graph)
    for node, (x, y) in pos.items():
        graph.nodes[node]['x'] = x*950
        graph.nodes[node]['y'] = y*950
    return graph
factorio_graph = layout_graph_circular(factorio_graph)

In [None]:
################  SHELL LAYOUT   ################
def layout_graph_shell(graph):
    """
    Uses a shell layout to place nodes in concentric circles, each representing a different hierarchy level.
    """
    # Determine hierarchy levels if not already done
    node_levels = assign_hierarchical_levels(graph)
    max_level = round(max(node_levels.values()))
    shells = [[] for _ in range(max_level + 1)]
    for node, level in node_levels.items():
        shells[level].append(node)
    pos = nx.shell_layout(graph, shells)
    for node, (x, y) in pos.items():
        graph.nodes[node]['x'] = x*1200
        graph.nodes[node]['y'] = y*1200
    return graph
factorio_graph = layout_graph_shell(factorio_graph)

In [None]:
################  FORCE DIRECTED LAYOUT   ################
def layout_graph_force_directed(graph):
    """
    Applies a force-directed layout to naturally cluster connected nodes and spread out the network.
    """
    pos = nx.spring_layout(graph)
    for node, (x, y) in pos.items():
        graph.nodes[node]['x'] = x*1000
        graph.nodes[node]['y'] = y*1000
    return graph

factorio_graph = layout_graph_force_directed(factorio_graph)

In [None]:
################  SPECTRAL LAYOUT   ################
def layout_graph_spectral(graph):
    """
    Uses spectral layout to position nodes based on the graph's eigenvectors, often revealing community structures.
    """
    pos = nx.spectral_layout(graph)
    for node, (x, y) in pos.items():
        graph.nodes[node]['x'] = x*5000
        graph.nodes[node]['y'] = y*5000
    return graph

factorio_graph = layout_graph_spectral(factorio_graph)

In [None]:
#   Will visualize the layout you chose

In [None]:
################  VISUALIZE   ################

import gravis as gv
gv.d3(
    # general
    data=factorio_graph,
    graph_height=1000,
    details_height=800,
    show_details=False,
    show_details_toggle_button=False,
    show_menu=True,
    show_menu_toggle_button=True,

    # nodes
    show_node=True,
    node_size_factor=2,
    node_size_data_source='size',
    use_node_size_normalization=True,
    node_size_normalization_min=10.0,
    node_size_normalization_max=77.0,
    node_drag_fix=False, 
    node_hover_neighborhood=True,
    node_hover_tooltip=True,
    show_node_image=True,
    node_image_size_factor=1.6,
    
    # node lables
    show_node_label=True,
    show_node_label_border=True,
    node_label_data_source='id',
    node_label_size_factor=0.8,
    node_label_rotation=0,
    node_label_font='Arial',

    # edges
    show_edge=True,
    edge_size_factor=1.0,
    edge_size_data_source='size',
    use_edge_size_normalization=True,
    edge_size_normalization_min=0.1,
    edge_size_normalization_max=5.0,
    edge_curvature=0.0,
    edge_hover_tooltip=True,
    
    # edge lables
    show_edge_label=False,
    show_edge_label_border=False,
    edge_label_data_source='id',
    edge_label_size_factor=1.0,
    edge_label_rotation=0,
    edge_label_font='Arial',
    zoom_factor=0.4,
    large_graph_threshold=500,
    layout_algorithm_active=True,

    # specific for d3
    use_many_body_force=True,
    many_body_force_strength=- 1200.0,
    many_body_force_theta=0.35,
    use_many_body_force_min_distance=True,
    many_body_force_min_distance=1450.0,
    use_many_body_force_max_distance=True,
    many_body_force_max_distance=4100.0,
    use_links_force=True,
    links_force_distance=210.0,
    links_force_strength=0.8,
    use_collision_force=True,
    collision_force_radius=25.0,
    collision_force_strength=0.9,
    use_x_positioning_force=False,
    x_positioning_force_strength=0.10,
    use_y_positioning_force=False,
    y_positioning_force_strength=0.10,
    use_centering_force=True,
)