In [None]:
import numpy as np
import pandas as pd
from causallearn.search.ConstraintBased.PC import pc
from causallearn.utils.GraphUtils import GraphUtils
from causallearn.search.ConstraintBased.FCI import fci
from causallearn.utils.cit import fisherz
from causallearn.search.ScoreBased.GES import ges
from causallearn.search.FCMBased import lingam
from causallearn.search.FCMBased.lingam.utils import make_dot
from dowhy import CausalModel
import matplotlib.pyplot as plt
import io
import matplotlib.image as mpimg
import pydot
import networkx as nx

In [None]:
file_path ='data/student-por_raw.csv'
df = pd.read_csv(file_path)

In [None]:
df

In [None]:
df['G_avg'] = df[['G1', 'G2', 'G3']].mean(axis=1)

In [None]:
df_filtered = df.copy()
columns_to_keep = ['absences', 'failures', 'internet', 'higher', 'Medu', 'health', 'famsup', 'Pstatus', 'famrel', 'schoolsup', 'G_avg', 'paid', 'studytime']
df_filtered = df_filtered[columns_to_keep]
df_encoded = pd.get_dummies(df_filtered, columns=['internet', 'higher', 'famsup', 'paid'], drop_first=True)

In [None]:
ordinal_map = {'Pstatus': {'A': 1, 'T': 2},
               'famrel': {1: 1, 2: 2, 3: 3, 4: 4, 5: 5},  # Assuming 1-5 scale
               'schoolsup': {'no': 0, 'yes': 1},
               'health': {1: 1, 2: 2, 3: 3, 4: 4, 5: 5}}  # Assuming 1-5 scale

for col, mapping in ordinal_map.items():
    df_encoded[col] = df_encoded[col].map(mapping)
    
df_encoded = df_encoded * 1

In [None]:
labels = [f'{col}' for col in df_encoded.columns]
encoded_feature_names = df_encoded.columns.tolist()
data = df_encoded.to_numpy()

In [None]:
def plot_and_save_graph(graph, labels, filename):
    pyd = GraphUtils.to_pydot(graph, labels=labels)
    img = mpimg.imread(io.BytesIO(pyd.create_png(f="png")), format='png')
    plt.axis('off')
    plt.imshow(img)
    plt.savefig(filename, format='png')
    plt.show()

In [None]:
cg_pc = pc(data, alpha=0.1, labels=df_encoded.columns.tolist())
plot_and_save_graph(cg_pc.G, labels, 'pc_graph.png')

In [None]:
PC_G = nx.DiGraph()
for node in cg_pc.G.nodes:
    PC_G.add_node(node.get_name())

for i in range(len(cg_pc.G.graph)):
    for j in range(len(cg_pc.G.graph)):
        if cg_pc.G.graph[i, j] > 0:  # Check for a directed edge
            PC_G.add_edge(cg_pc.G.nodes[i].get_name(), cg_pc.G.nodes[j].get_name())

In [None]:
label_to_feature_map = dict(zip(labels, df_encoded.columns))
nx.set_node_attributes(PC_G, label_to_feature_map, 'feature_name')

In [None]:
PC_G = nx.relabel_nodes(PC_G, label_to_feature_map)
print("Node labels in the graph:", PC_G.nodes())

In [None]:
node_names = ['X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'X9', 'X10', 'X11', 'X12', 'X13']
feature_names = ['absences', 'failures', 'Medu', 'health', 'Pstatus', 'famrel',
       'schoolsup', 'G_avg', 'studytime', 'internet_yes', 'higher_yes',
       'famsup_yes', 'paid_yes']

node_to_feature_mapping = {node: feature for node, feature in zip(node_names, feature_names)}

In [None]:
model = CausalModel(data=df_encoded, treatment='X1', outcome='X9', graph=PC_G)

identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
print(identified_estimand)

estimate = model.estimate_effect(
        identified_estimand,
        method_name="backdoor.linear_regression",
        control_value=0,
        treatment_value=1,
        confidence_intervals=True,
        test_significance=True
    )

print("Causal Estimate is " + str(estimate.value))

In [None]:
cg_fci = fci(data, alpha=0.1, labels=df_encoded.columns.tolist(), indep_test=fisherz, stable=True, uc_rule=0)
plot_and_save_graph(cg_fci[0], labels, 'fci_graph.png')

In [None]:
fci_graph = cg_fci[0] 

# Convert to networkx graph 
G_fci = nx.DiGraph()
# Add all nodes from the causallearn graph
for node in fci_graph.nodes:
    G_fci.add_node(node.get_name())

# Add edges from the causallearn graph
for i in range(len(fci_graph.graph)):
    for j in range(len(fci_graph.graph)):
        if fci_graph.graph[i, j] > 0:  # Check for a directed edge
            G_fci.add_edge(fci_graph.nodes[i].get_name(), fci_graph.nodes[j].get_name())

In [None]:
model = CausalModel(data=df_encoded, treatment='X1', outcome='X9', graph=G_fci)

identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
print(identified_estimand)

estimate = model.estimate_effect(
        identified_estimand,
        method_name="backdoor.linear_regression",
        control_value=0,
        treatment_value=1,
        confidence_intervals=True,
        test_significance=True
    )

print("Causal Estimate is " + str(estimate.value))

In [None]:
record_ges = ges(data)
plot_and_save_graph(record_ges['G'], labels, 'ges_graph.png')

In [None]:
ges_graph = record_ges['G'] 

# Convert to networkx graph 
G_ges = nx.DiGraph()
# Add all nodes from the causallearn graph
for node in ges_graph.nodes:
    G_ges.add_node(node.get_name())

# Add edges from the causallearn graph
for i in range(len(ges_graph.graph)):
    for j in range(len(ges_graph.graph)):
        if ges_graph.graph[i, j] > 0:  # Check for a directed edge
            G_ges.add_edge(ges_graph.nodes[i].get_name(), ges_graph.nodes[j].get_name())

In [None]:
model = CausalModel(data=df_data, treatment='X1', outcome='X9', graph=G_ges)

identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
print(identified_estimand)

estimate = model.estimate_effect(
        identified_estimand,
        method_name="backdoor.linear_regression",
        control_value=0,
        treatment_value=1,
        confidence_intervals=True,
        test_significance=True
    )

print("Causal Estimate is " + str(estimate.value))

In [None]:
model_lingam = lingam.ICALiNGAM()
model_lingam.fit(data)
digraph = make_dot(model_lingam.adjacency_matrix_, labels=labels)
graph_dot_string = digraph.source
digraph.render('lingam_graph', format='png')

In [None]:
# Create pydot graph object (This is now a Digraph object)
digraph = make_dot(model_lingam.adjacency_matrix_, labels=labels)

# Get DOT string representation from Digraph object
graph_dot_string = digraph.source 

In [None]:
df_data = pd.DataFrame(data, columns=labels)
model = CausalModel(data=df_data, treatment='absences', outcome='G_avg', graph=graph_dot_string)

identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
print(identified_estimand)

estimate = model.estimate_effect(identified_estimand,
                                 method_name="backdoor.linear_regression",
                                 control_value=0,
                                 treatment_value=1,
                                 confidence_intervals=True,
                                 test_significance=True)
print("Causal Estimate is " + str(estimate.value))

Example of an Evaluation Function
The evaluation function should be tailored to your specific needs, but a generic approach could involve comparing the inferred graph against a known ground truth or evaluating how well the causal model explains the data.

In [None]:
# def some_evaluation_function(graph):
    # Placeholder for an actual evaluation function
    # Example: compare against ground truth or use a scoring metric
    # return np.random.rand()  # Replace with actual evaluation logic

In [None]:
G_true = nx.DiGraph()

nodes = {
    "G_avg": {"pos": "0.176, -0.450"},
    "Medu": {"pos": "-0.630,0.589"},
    "Pstatus": {"pos": "-1.493,-0.841"},
    "absences": {"pos": "-1.468,0.878"},
    "address": {"pos": "-1.971,1.405"},
    "failures": {"pos": "1.150,1.680"},
    "famrel": {"pos": "-2.149,-1.148"},
    "famsup": {"pos": "-1.049,-0.302"},
    "health": {"pos": "-0.688,0.017"},
    "higher": {"pos": "0.155,0.551"},
    "internet": {"pos": "0.647,1.138"},
    "paid": {"pos": "-0.463,-1.386"},
    "schoolsup": {"pos": "1.035,0.158"},
    "studytime": {"pos": "0.302,-1.504"}
}

for node, pos in nodes.items():
    G_true.add_node(node, pos=pos)

G_true.add_edges_from([
    ("Medu", "G_avg"),
    ("Medu", "absences"),
    ("Medu", "higher"),
    ("Pstatus", "G_avg"),
    ("Pstatus", "absences"),
    ("Pstatus", "famrel"),
    ("address", "absences"),
    ("failures", "G_avg"),
    ("failures", "absences"),
    ("famsup", "G_avg"),
    ("famsup", "absences"),
    ("health", "G_avg"),
    ("health", "absences"),
    ("higher", "G_avg"),
    ("internet", "G_avg"),
    ("internet", "absences"),
    ("paid", "G_avg"),
    ("schoolsup", "G_avg"),
    ("studytime", "G_avg")
])

In [None]:
pos = nx.spring_layout(G_true)
nx.draw(G_true, pos, with_labels=True, node_size=2000, node_color='skyblue', font_size=10, font_color='black', font_weight='bold')
plt.show()

def evaluate_graph(inferred_graph, ground_truth_graph):
    # Convert graphs to adjacency matrices
    inferred_adj = nx.adjacency_matrix(inferred_graph).todense()
    ground_truth_adj = nx.adjacency_matrix(ground_truth_graph).todense()
    
    # Calculate evaluation metrics (precision, recall, F1-score)
    tp = np.sum(np.logical_and(inferred_adj == 1, ground_truth_adj == 1))
    fp = np.sum(np.logical_and(inferred_adj == 1, ground_truth_adj == 0))
    fn = np.sum(np.logical_and(inferred_adj == 0, ground_truth_adj == 1))
    
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision * recall) if (precision + recall) > 0 else 0
    
    return precision, recall, f1_score

# Example usage
precision, recall, f1_score = evaluate_graph(inferred_graph, G)
print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1_score}")

alphas = [0.01, 0.05, 0.1]
stables = [True, False]
uc_rules = [0, 1]

best_cg_fci = None
best_score = float('inf')

for alpha in alphas:
    for stable in stables:
        for uc_rule in uc_rules:
            cg_fci = fci(data, alpha=alpha, indep_test=fisherz, stable=stable, uc_rule=uc_rule)
            score = some_evaluation_function(cg_fci[0])  # Define your evaluation function
            if score < best_score:
                best_score = score
                best_cg_fci = cg_fci

# Use best_cg_fci
plot_and_save_graph(best_cg_fci[0], labels, 'fci_graph_best.png')

best_cg_pc = None
best_f1_score = float('-inf')

for alpha in alphas:
    cg_pc = pc(data, alpha=alpha)
    precision, recall, f1_score = evaluate_graph(cg_pc.G, G)
    if f1_score > best_f1_score:
        best_f1_score = f1_score
        best_cg_pc = cg_pc

# Use best_cg_pc
plot_and_save_graph(best_cg_pc.G, labels, 'pc_graph_best.png')