In [None]:
# Notebook parameters.
#
# repository_filter: terms used to restrict the analysis to matching
# repositories. The terms are joined with "|" and matched case-insensitively
# against "<repositoryPath>/<repositoryBranch>"; an empty list disables the
# filter.
repository_filter: list[str] = []
# count_threshold: held as a string and converted with int() before use. When
# the converted value is greater than 0, only parent/child recipe pairs whose
# count is strictly greater than it are kept.
count_threshold = "0"

In [None]:
from code_data_science import data_table as dt
import warnings

# Suppress all warnings from this point on.
warnings.simplefilter("ignore")

df = dt.read_csv("../samples/composite_recipe_results_sankey.csv")

# Combined "<path>/<branch>" key that the repository filter is matched against.
df["repositoryWithBranch"] = df["repositoryPath"] + "/" + df["repositoryBranch"]

# Filter the data frame to only include rows where repositoryWithBranch contains
# a term in the repository_filter (case insensitive). The terms are joined with
# "|" and passed to str.contains as a single pattern, so regular-expression
# syntax inside a term is honoured.
if repository_filter:
    # na=False: a row with a missing path or branch has a missing combined key.
    # Treat it as "no match" so the mask stays purely boolean; a mask holding
    # missing values cannot be used for row selection.
    # NOTE(review): assumes dt.read_csv returns a pandas DataFrame — confirm.
    df = df[
        df["repositoryWithBranch"].str.contains(
            "|".join(repository_filter), case=False, na=False
        )
    ]

In [None]:
# Keep only the parent/child recipe columns and tag every row with a constant 1
# so the occurrences of each pair can be counted.
df = df.loc[:, ["parentRecipe", "recipe"]].assign(count=1)

# Number of rows per (parentRecipe, recipe) pair, most frequent first.
pair_counts = df.groupby(by=["parentRecipe", "recipe"])["count"].count()
recipe_counts = pair_counts.sort_values(ascending=False).reset_index(name="count")

# Reduce each dotted recipe identifier to the segment after its last ".".
for name_column in ("parentRecipe", "recipe"):
    recipe_counts[name_column] = recipe_counts[name_column].apply(
        lambda dotted: dotted.rsplit(".", 1)[-1]
    )

# Order the pairs by count, highest first.
recipe_counts = recipe_counts.sort_values(by=["count"], ascending=False)

# The threshold is supplied as text; a value of 0 or less disables the cut-off.
count_threshold_int = int(count_threshold)

if count_threshold_int > 0:
    # Strictly greater than: pairs sitting exactly on the threshold are dropped.
    above_threshold = recipe_counts["count"] > count_threshold_int
    recipe_counts = recipe_counts[above_threshold]

In [None]:
import plotly.graph_objects as go

# Extract all unique nodes (parent and child recipes). The names are sorted so
# that every node gets the same index — and therefore the same palette color —
# on every run; iterating a set of strings directly can produce a different
# order in each Python process.
all_nodes = sorted(set(recipe_counts["parentRecipe"]) | set(recipe_counts["recipe"]))

# Map each node name to its position in all_nodes; the Sankey links below refer
# to nodes by these integer positions.
node_indices = {node: position for position, node in enumerate(all_nodes)}

# One entry per row of recipe_counts: the position of the row's parentRecipe
# (link start) and of its recipe (link end).
source = recipe_counts["parentRecipe"].map(node_indices).tolist()
target = recipe_counts["recipe"].map(node_indices).tolist()

# Link weights, aligned row-for-row with source and target.
value = recipe_counts["count"].tolist()

# Format labels with node names and counts


def getCount(node):
    """Return the count associated with ``node`` in ``recipe_counts``.

    The result is the sum of ``count`` over the rows where ``node`` appears in
    the ``recipe`` column. When that sum is 0, the sum over the rows where
    ``node`` appears in the ``parentRecipe`` column is returned instead.
    """
    incoming = recipe_counts.loc[recipe_counts["recipe"] == node, "count"].sum()
    if incoming != 0:
        return incoming
    return recipe_counts.loc[recipe_counts["parentRecipe"] == node, "count"].sum()


# Node labels of the form "<name> - <count>", in the same order as all_nodes.
formatted_labels = [f"{node} - {getCount(node)}" for node in all_nodes]

base_colors = [
    "#2f42ff",
    "#7e9bd2",
    "#27aa88",
    "#dcefde",
    "#84C9AA",
    "#ECB81F",
    "#F9A91B",
    "#DB4197",
    "#992FB9",
    "#D9D8E8",
    "#FF3232",
]

# Build the node palette. It must contain exactly one color per node, so the
# base colors are reused cyclically once the nodes outnumber them.
palette_size = len(base_colors)
colors = [base_colors[position % palette_size] for position in range(len(all_nodes))]


# Node appearance; labels and colors are listed in the same order as all_nodes.
sankey_nodes = dict(
    pad=15,
    thickness=15,
    line=dict(color="black", width=0.5),
    label=formatted_labels,
    color=colors,
)

# Links: source/target hold node positions, value holds the link weight.
sankey_links = dict(source=source, target=target, value=value)

# Create the Sankey diagram
fig = go.Figure(data=[go.Sankey(node=sankey_nodes, link=sankey_links)])

# Grow the figure with the number of nodes, but never below a height of 400.
figure_height = max(len(all_nodes) * 15, 400)

# Update layout
fig.update_layout(
    title_text="Recipes that made changes",
    font_size=10,
    height=figure_height,
)

fig.show()