In [None]:
import os
import ipywidgets as widgets
from IPython.display import display

# Folder where uploads will be saved
UPLOAD_DIR = "uploaded_files"
os.makedirs(UPLOAD_DIR, exist_ok=True)

upload_btn = widgets.FileUpload(accept='.txt', multiple=False)
output = widgets.Output()

def on_upload_change(change):
    with output:
        output.clear_output()
        if upload_btn.value:
            file_info = upload_btn.value[0]
            filename = file_info['name']
            content = file_info['content']
            file_path = os.path.join(UPLOAD_DIR, filename)
            with open(file_path, 'wb') as f:
                f.write(content)
            global uploaded_file_path
            uploaded_file_path = file_path
            print(f" File uploaded: {filename}")
            print(f" Saved at: {uploaded_file_path}")

upload_btn.observe(on_upload_change, names='value')
display(widgets.VBox([upload_btn, output]))


VBox(children=(FileUpload(value=(), accept='.txt', description='Upload'), Output()))

In [52]:
import hashlib

def verify_uploaded_file(file_path, uploaded_widget):
    uploaded_bytes = uploaded_widget.value[0]['content']
    with open(file_path, 'rb') as f:
        saved_bytes = f.read()
    if hashlib.sha256(uploaded_bytes).hexdigest() == hashlib.sha256(saved_bytes).hexdigest():
        print(f" Verified OK: {file_path}")
    else:
        print(" File mismatch!")

verify_uploaded_file(uploaded_file_path, upload_btn)


 Verified OK: uploaded_files\C432.txt


In [53]:
# D-Algebra PODEM 
import os, time
from collections import deque

VERBOSE = True   # Set False to reduce printouts

#locate netlist -> txt file needed.
if 'uploaded_file_path' in globals():
    NETLIST_PATH = uploaded_file_path
else:
    fallback = "uploaded_files/C17.txt" #-> default netlist
    if os.path.exists(fallback):
        NETLIST_PATH = fallback
    else:
        NETLIST_PATH = None

if NETLIST_PATH is None:
    raise FileNotFoundError("Netlist file not found....")


def parse_netlist_from_file(path):
    inputs, outputs, gates = [], [], {}
    with open(path, 'r') as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith('#'): 
                continue
            if line.startswith("INPUT("):
                inputs.append(line.split("(",1)[1].split(")")[0].strip())
            elif line.startswith("OUTPUT("):
                outputs.append(line.split("(",1)[1].split(")")[0].strip())
            elif "=" in line:
                left, right = line.split("=",1)
                gate_name = left.strip()
                typ = right.strip().split("(",1)[0].strip().lower()
                args = right.strip().split("(",1)[1].rsplit(")",1)[0]
                ins = [a.strip() for a in args.split(",") if a.strip()!=""]
                gates[gate_name] = {"type": typ, "inputs": ins}
    return inputs, outputs, gates

# 5-D-algebra helpers (0,1,X,D,D_BAR)
## NOT gate
def logic_not(a):
    return {'0':'1','1':'0','X':'X','D':'D_BAR','D_BAR':'D'}.get(a,'X')

## AND gate
def logic_and_pair(a,b):
    if a=='0' or b=='0': return '0'
    if a=='1': return b
    if b=='1': return a
    if a=='X' or b=='X':
        # if one is X and other is D or D_BAR -> X 
        return 'X'
    if a==b: return a
    # D & D_BAR = 0
    if set([a,b])==set(['D','D_BAR']): return '0'
    return 'X'

## OR gate
def logic_or_pair(a,b):
    if a=='1' or b=='1': return '1'
    if a=='0': return b
    if b=='0': return a
    if a=='X' or b=='X': return 'X'
    if a==b: return a
    if set([a,b])==set(['D','D_BAR']): return '1'
    return 'X'

def eval_gate_D(gtype, ins_values):
    g = gtype.lower()
    if g in ('and','nand'):
        val = ins_values[0]
        for v in ins_values[1:]:
            val = logic_and_pair(val, v)
        return logic_not(val) if g=='nand' else val #if nand then invert final output
    if g in ('or','nor'):
        val = ins_values[0]
        for v in ins_values[1:]:
            val = logic_or_pair(val, v)
        return logic_not(val) if g=='nor' else val
    if g == 'not':
        return logic_not(ins_values[0])
    if g == 'buf':
        return ins_values[0]
    if g in ('xor'):
        # support 2-input xor well with D algebra
        if len(ins_values) != 2:
            return 'X'
        a,b = ins_values
        if 'X' in (a,b):
            return 'X'
        def map_pair(x):
            if x=='0': return (0,0)
            if x=='1': return (1,1)
            if x=='D': return (1,0)
            if x=='D_BAR': return (0,1)
            return (None,None)
        ag,af = map_pair(a); bg,bf = map_pair(b)
        if ag is None or bg is None: return 'X'
        good = ag ^ bg
        bad  = af ^ bf
        if good==bad:
            return '1' if good==1 else '0'
        if good==1 and bad==0: return 'D'
        if good==0 and bad==1: return 'D_BAR'
        return 'X'
    if g in ('xnor'):
        # support 2-input xor well with D algebra
        if len(ins_values) != 2:
            return 'X'
        a,b = ins_values
        if 'X' in (a,b):
            return 'X'
        def map_pair(x):
            if x=='0': return (0,0)
            if x=='1': return (1,1)
            if x=='D': return (1,0)
            if x=='D_BAR': return (0,1)
            return (None,None)
        ag,af = map_pair(a); bg,bf = map_pair(b)
        if ag is None or bg is None: return 'X'
        good = 1-(ag ^ bg)
        bad  = 1-(af ^ bf)
        if good==bad:
            return '1' if good==1 else '0'
        if good==1 and bad==0: return 'D'
        if good==0 and bad==1: return 'D_BAR'
        return 'X'
    return 'X'

# --- forward simulation with immediate fault injection and propagation ---
def simulate_with_fault(PIs, gates, assign, fault_node=None, fault_type=None):
    vals = {pi: assign.get(pi,'X') for pi in PIs}

    # inject PI fault
    if fault_node in PIs:
        good_val = vals[fault_node]
        if fault_type == 'sa0' and good_val == '1':
            vals[fault_node] = 'D'
        elif fault_type == 'sa1' and good_val == '0':
            vals[fault_node] = 'D_BAR'

    changed = True
    while changed:
        changed = False
        for g, info in gates.items():
            ins = [vals.get(inp, 'X') for inp in info['inputs']]
            outv = eval_gate_D(info['type'], ins)

            # inject fault for gate outputs (existing logic)
            if g == fault_node and fault_type is not None:
                good_val = outv
                if fault_type == 'sa0' and good_val == '1':
                    outv = 'D'
                elif fault_type == 'sa1' and good_val == '0':
                    outv = 'D_BAR'

            prev = vals.get(g, 'X')
            if prev != outv:
                vals[g] = outv
                changed = True

    return vals


# --- helpers to choose non-controlling input when backtracing ---
def non_controlling_value_for_gate(gtype):
    g = gtype.lower()
    # For AND/NAND non-controlling is '1' (i.e., to avoid driving output to 0)
    # For OR/NOR non-controlling is '0'
    if g in ('and','nand'): return '1'
    if g in ('or','nor'):   return '0'
    if g in ('not','buf','xor','xnor'): return None
    return None

# Backtrace: 
    '''
        - find primary input nodes with X values.
        - find primary input nodes with non controling values.
        - pick the first input node.
    '''
def backtrace_to_pi(node, desired_val, PIs, gates, current_vals):
    # If node is PI, done
    if node in PIs:
        return node, desired_val
    gate = gates[node]
    gtype = gate['type']
    inv = gtype in ('nand','nor','not')
    # choose an input to backtrace:
    # prefer inputs that are X or those that won't block propagation (non-controlling)
    pref = None
    # choose inputs that are X first
    for inp in gate['inputs']:
        if current_vals.get(inp,'X') == 'X':
            pref = inp; break
    if pref is None:
        # choose non-controlling input if any
        ncv = non_controlling_value_for_gate(gtype)
        if ncv is not None:
            for inp in gate['inputs']:
                if current_vals.get(inp,'X') != ncv:
                    pref = inp
                    break
    if pref is None:
        pref = gate['inputs'][0]
    next_val = logic_not(desired_val) if inv else desired_val
    return backtrace_to_pi(pref, next_val, PIs, gates, current_vals)

# --- Objective function: activation or propagation target ---
def objective(fault_node, fault_type, PIs, POs, gates, current_vals):
    # activation objective
    v_fault = current_vals.get(fault_node, 'X')
    if v_fault in ('X',) or v_fault not in ('D','D_BAR'):
        desired = '1' if fault_type == 'sa0' else '0'
        return fault_node, desired

    # propagation objective: choose a gate output (candidate) that is X and has D on some input
    for g, info in gates.items():
        outv = current_vals.get(g, 'X')
        if outv == 'X':
            in_vals = [ current_vals.get(inp,'X') for inp in info['inputs'] ]
            if any(iv in ('D','D_BAR') for iv in in_vals):
                # prefer gating where other inputs can be set to non-controlling
                return g, '1'  # desired value heuristic (chosen to allow backtrace)
    # as fallback, if a PO already shows D/D_BAR then objective satisfied
    for po in POs:
        if current_vals.get(po) in ('D','D_BAR'):
            return None, None
    return None, None

# --- implication helper (apply assignment + simulate) ---
def imply(PIs, gates, assign, fault_node, fault_type):
    return simulate_with_fault(PIs, gates, assign, fault_node, fault_type)

# --- PODEM recursive search ---
def podem(PIs, gates, POs, fault_node, fault_type, verbose=False, max_depth=200):
    # initial assign: all X
    init_assign = {pi:'X' for pi in PIs}
    visited = set()

    def assign_key(assign):
        return tuple((pi, assign[pi]) for pi in sorted(assign.keys()))

    def search(assign, depth=0):
        if depth > max_depth:
            return None
        vals = imply(PIs, gates, assign, fault_node, fault_type)
        # if verbose:
        #     print("  " * depth + "Sim vals:", vals)
        # check if fault propagated to any PO
        if any(vals.get(po) in ('D','D_BAR') for po in POs):
            return assign
        # if all PI assigned and no propagation -> fail
        if all(assign[pi] in ('0','1') for pi in PIs):
            return None

        key = assign_key(assign)
        if key in visited:
            return None
        visited.add(key)

        obj_node, obj_val = objective(fault_node, fault_type, PIs, POs, gates, vals)
        if obj_node is None:
            return None

        pi, pi_val = backtrace_to_pi(obj_node, obj_val, PIs, gates, vals)
        # if verbose:
        #     print("  " * depth + f"Objective: {obj_node}->{obj_val}, Backtrace: {pi}->{pi_val}")

        # If PI already assigned conflicting value, we should try the other value or backtrack
        if assign[pi] in ('0','1'):
            # nothing to try here - continue search deeper
            return search(assign, depth+1)

        # Try desired value first (heuristic), then opposite
        trial_order = [pi_val, '0' if pi_val=='1' else '1']
        for tv in trial_order:
            new_assign = assign.copy()
            new_assign[pi] = tv
            # if verbose:
            #     print("  " * depth + f"Try {pi}={tv}")
            res = search(new_assign, depth+1)
            if res is not None:
                return res
        return None

    return search(init_assign, depth=0)

# --- run PODEM for a single fault nicely ---
def run_podem_for_fault_true(netlist_path, fault_node, fault_type, verbose=VERBOSE):
    PIs, POs, GATES = parse_netlist_from_file(netlist_path)
    # if verbose:
        # print("PIs:", PIs)
        # print("POs:", POs)
        # print("Gates:", [(g,GATES[g]['type'],GATES[g]['inputs']) for g in GATES])
        # print(f"\nRunning PODEM for {fault_node} {fault_type}...\n")
    vec = podem(PIs, GATES, POs, fault_node, fault_type, verbose=verbose)
    if vec:
        # keep Xs as don't care in output
        print(f"\n Test cube for {fault_node} {fault_type}:")
        for pi in PIs:
            print(f"  {pi}: {vec.get(pi,'X')}")
        return True, vec
    else:
        print(f"\n Fault {fault_node} {fault_type} is untestable (or not found).")
        return False, None

# --- run all faults and produce coverage table ---
import pandas as pd
def run_all_faults_true(netlist_path, verbose=False, export_csv=True):
    PIs, POs, GATES = parse_netlist_from_file(netlist_path)
    results = []
    total = len(GATES) * 2
    detected = 0
    start = time.time()
    for g in GATES:
        for t in ('sa0','sa1'):
            if verbose:
                print(f"\n--- Fault: {g} {t} ---")
            ok, vec = run_podem_for_fault_true(netlist_path, g, t, verbose=False)
            if ok:
                detected += 1
                vec_str = " ".join(f"{pi}={vec.get(pi,'X')}" for pi in PIs)
            else:
                vec_str = "-"
            results.append({"Gate":g, "Fault_Type":t, "Detected":"Yes" if ok else "No", "Test_Cube":vec_str})
    elapsed = time.time() - start
    df = pd.DataFrame(results)
    coverage = (detected / total) * 100
    print(f"\nFault Coverage: {detected}/{total} = {coverage:.2f}%, Time: {elapsed:.2f}s")
    if export_csv:
        out = "podem_true_fault_coverage.csv"
        df.to_csv(out, index=False)
        print("Exported:", os.path.abspath(out))
    return df


print("Using netlist:", NETLIST_PATH)
PIs, POs, GATES = parse_netlist_from_file(NETLIST_PATH)
# print("PIs:", PIs)
# print("POs:", POs)
# print("Gates:", list(GATES.keys()))
# print("\nPODEM on a sample fault)
# _ok, _vec = run_podem_for_fault_true(NETLIST_PATH, "B", "sa1", verbose=VERBOSE)



Using netlist: uploaded_files\C432.txt


In [54]:
# for i in GATES.keys():
#     _ok, _vec = run_podem_for_fault_true(NETLIST_PATH, i, "sa0", verbose=VERBOSE)
#     _ok, _vec = run_podem_for_fault_true(NETLIST_PATH, i, "sa1", verbose=VERBOSE)

In [55]:
# GATES.keys()

In [56]:
import time
import pandas as pd

fault_results = []
total_start = time.time() 
PIs, POs, GATES = parse_netlist_from_file(uploaded_file_path)
fault_list = list(GATES.keys()) + list(PIs)

print("\n========== Running PODEM for All Faults ==========\n")

for gate_name in fault_list:

    start = time.time()
    ok0, vec0 = run_podem_for_fault_true(NETLIST_PATH, gate_name, "sa0", verbose=VERBOSE)
    end = time.time()
    t0 = round(end - start, 6)

    if ok0 and vec0:
        vec_str0 = " ".join([f"{pi}={vec0.get(pi, 'X')}" for pi in PIs])
    else:
        vec_str0 = "-"

    fault_results.append({
        "Fault_Node": gate_name,
        "Fault_Type": "sa0",
        "Detected": "YES" if ok0 else "NO",
        "Test_Cube": vec_str0,
        "Solve_Time_s": t0
    })

    start = time.time()
    ok1, vec1 = run_podem_for_fault_true(NETLIST_PATH, gate_name, "sa1", verbose=VERBOSE)
    end = time.time()
    t1 = round(end - start, 6)

    if ok1 and vec1:
        vec_str1 = " ".join([f"{pi}={vec1.get(pi, 'X')}" for pi in PIs])
    else:
        vec_str1 = "-"

    fault_results.append({
        "Fault_Node": gate_name,
        "Fault_Type": "sa1",
        "Detected": "YES" if ok1 else "NO",
        "Test_Cube": vec_str1,
        "Solve_Time_s": t1
    })

total_end = time.time()
total_time = round(total_end - total_start, 4)

df = pd.DataFrame(fault_results)

print("\n========== PODEM FAULT SUMMARY ==========")
print(df)

total_faults = len(df)
detected = df["Detected"].value_counts().get("YES", 0)
coverage = (detected / total_faults) * 100

print("\n------------ SUMMARY -------------")
print(f"Total Faults     : {total_faults}")
print(f"Detected Faults  : {detected}")
print(f"Undetected Faults: {total_faults - detected}")
print(f"Fault Coverage   : {coverage:.2f}%")
print(f"Total ATPG Time  : {total_time} seconds")
print("-----------------------------------")

net_name = os.path.splitext(os.path.basename(NETLIST_PATH))[0]
csv_file = f"{net_name}_podem_results.csv"
df.to_csv(csv_file, index=False)

print(f"\nResults saved to: {csv_file}")





 Test cube for G118gat sa0:
  G1gat: 0
  G4gat: 1
  G8gat: X
  G11gat: 1
  G14gat: X
  G17gat: X
  G21gat: X
  G24gat: 1
  G27gat: X
  G30gat: X
  G34gat: X
  G37gat: 1
  G40gat: X
  G43gat: X
  G47gat: X
  G50gat: 1
  G53gat: X
  G56gat: X
  G60gat: X
  G63gat: 1
  G66gat: X
  G69gat: X
  G73gat: X
  G76gat: 1
  G79gat: X
  G82gat: X
  G86gat: X
  G89gat: 1
  G92gat: X
  G95gat: X
  G99gat: X
  G102gat: 1
  G105gat: X
  G108gat: X
  G112gat: X
  G115gat: X

 Test cube for G118gat sa1:
  G1gat: 1
  G4gat: 1
  G8gat: X
  G11gat: 1
  G14gat: X
  G17gat: X
  G21gat: X
  G24gat: 1
  G27gat: X
  G30gat: X
  G34gat: X
  G37gat: 1
  G40gat: X
  G43gat: X
  G47gat: X
  G50gat: 1
  G53gat: X
  G56gat: X
  G60gat: X
  G63gat: 1
  G66gat: X
  G69gat: X
  G73gat: X
  G76gat: 1
  G79gat: X
  G82gat: X
  G86gat: X
  G89gat: 1
  G92gat: X
  G95gat: X
  G99gat: X
  G102gat: 1
  G105gat: X
  G108gat: X
  G112gat: X
  G115gat: X

 Test cube for G119gat sa0:
  G1gat: X
  G4gat: 0
  G8gat: 0
  G11gat: 

In [24]:
df = pd.read_csv("ex1_podem_results.csv")

In [25]:
df.head(100)

Unnamed: 0,Fault_Node,Fault_Type,Detected,Test_Cube,Solve_Time_s
0,e,sa0,YES,A=1 B=1 C=0 D=1,0.001
1,e,sa1,YES,A=1 B=0 C=0 D=1,0.000514
2,f,sa0,YES,A=1 B=X C=1 D=1,0.00051
3,f,sa1,YES,A=1 B=0 C=0 D=1,0.0
4,g,sa0,NO,-,0.00051
5,g,sa1,YES,A=1 B=X C=1 D=0,0.0
6,h,sa0,YES,A=1 B=X C=1 D=1,0.0
7,h,sa1,YES,A=0 B=X C=0 D=X,0.001012
8,k,sa0,YES,A=1 B=1 C=0 D=1,0.0
9,k,sa1,YES,A=1 B=X C=1 D=0,0.001037
