<a href="https://colab.research.google.com/github/siyonabehera/DS-340W/blob/main/340w.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Data Generation
##Details:
One row ~ one project

Features:

- `n_tasks` -> number of activities (project size)
- `n_edges` -> number of precedence links
- `network_density` -> how interconnected the tasks are {`n_edges`/ `n_tasks`^2}
- `T_baseline` -> total duration from CPM using most-likely (m)
- `cp_len` -> number of tasks on the critical path
- `pct_critical_tasks` -> proportion of critical tasks {`cp_len`/`n_tasks`}
- `avg_task_duration` -> mean(m) across all tasks
- `variability` -> average uncertainty width {mean(p-o)}
- `spi_early` -> {EV/PV} at 20% progress
- `cpi_early` -> {EV/AC} at 20% progress
- `instability` -> {std(T)} or variance of project durations across MC runs
- `risk_exposure` -> fraction of MC runs exceeding {baseline*1.025}
- `buffer_factor` -> buffer applied to baseline duration for 'on-time' threshold
- `p_late_diag` -> sanity check

Label:
`label_delay` = 1 if p_late > 1.025, 0 otherwise

##Steps:
1. Generate DAGs randomly (at least for now, later can use)
2. Generate PERT triplets
3. Run CPM logic (forward/backward passes)
4. Baseline definition
5. EVM snapshot
6. MC labels
7. Feature engineering
8. Write the csv


In [None]:
# front matter
import math, random, statistics
from dataclasses import dataclass
from typing import List, Tuple, Dict
import csv

RNG = random.Random(42)          # answer to ultimate question of life, the universe, and everything :)


In [None]:
# step 1: generate the DAGs-> no need for it to be connected (having dependencies per task is not necessary)
@dataclass
class ProjectSpec:
    n_tasks: int
    p_edge: float                # edge probability-> how likely is it that an edge exists from i to j
    buffer_factor: float = 1.15  # can vary as needed -- why 1.15 tho-- explicitly mention in report/writeup

def generate_dag(n: int, p_edge: float):
  preds = [[] for i in range(n)]        # empty list for storing predecessors for each n
  edges = []                            # stores (i,j) pairs

  # build edges with p_edge from i to j (ensure acyclic nature)
  for i in range(n):
    for j in range(i+1, n):
      if RNG.random() < p_edge:
        edges.append((i, j))
        preds[j].append(i)
  return preds, edges

# sanity check
def is_acyclic(edges):
    return all(i < j for i, j in edges)

# topological sort
def topo_sort(n: int, preds: List[List[int]]):
    # by construction, 0,...,n-1 is already topologically sorted
    return list(range(n))

In [None]:
# mini test for checking DAG generation
preds, edges = generate_dag(8, 0.2)
print(len(edges), "edges; acyclic:", is_acyclic(edges))
print("example", preds[:])

9 edges; acyclic: True
example [[], [], [0, 1], [2], [1], [3], [4], [1, 5, 6]]


In [None]:
# step 2: assign pert triplets with rules: m ~ Uniform(3,10); w ~ Uniform(1,5); o = max(0.5, m - 0.6*w); p = m + 0.4*w; o < m < p
@dataclass
class Pert:
  o: float
  m: float
  p: float
def assign_triplets(n_tasks: int):
  triplet = []
  for i in range(n_tasks):
    w = RNG.uniform(3,9)                    # uncertainty width for a task
    m = RNG.uniform(3,10)                   # most like duration: 3-10 is plausible range for moderate tasks
    o = max(0.5, m - 0.4*w)
    p = m + 0.6*w                           # pessimistic-leaning
    assert o <= m <= p and (p - o) > 1e-9   # sanity check
    triplet.append(Pert(o,m,p))
  return triplet

In [None]:
# mini test for checking triplets
pert = assign_triplets(8)
print(pert[:3])
# spread should roughly scale with w → (p - o) ≈ (0.4+0.6)*w = 1.0*w (except clamped at 0.5)
avg_range = sum(t.p - t.o for t in pert) / len(pert)
print("avg (p-o):", round(avg_range, 3))

[Pert(o=3.9920957403343333, m=7.226082219568237, p=12.077061938419092), Pert(o=4.971014650998212, m=8.108122506856725, p=12.813784290644495), Pert(o=7.324862928364313, m=9.811810347855594, p=13.542231477092518)]
avg (p-o): 6.886


In [None]:
# cpm: forward pass + backtrack; (ES, EF, T, cp) -> (earliest start, earliest finish, total duration, critical path)
def cpm_early(preds: List[List[int]], durations: List[float], eps = 1e-9):
  n = len(durations)
  es = [0.0]*n
  ef = [0.0]*n
  order = topo_sort(n, preds)
  for i in order:
    if preds[i]:
      es[i] =  max(ef[j] for j in preds[i]) if preds else 0.0
    ef[i] = es[i] + durations[i]
  T = max(ef)
  # backtrack to get critical path
  cp = []
  i = ef.index(max(ef))                                      # position of largest EF (sink)
  while True:
    cp.append(i)                                             # since it is the last task, it definitely on the critical path
    if not preds[i]:
      break
    options = [j for j in preds[i] if abs(ef[i]-ef[j]) < eps]
    if options:
      best_j = max(options, key= lambda j: ef[j])            # for multiple options, pick the one with the longest ef
    else:
      best_j = max(preds[i], key=lambda j: ef[j])            # fallback: pick pred with latest EF
    i = best_j
  cp.reverse()
  return es, ef, T, cp

In [None]:
# mini test for checking cp
# A simple linear chain: 0 → 1 → 2
preds = [[], [0], [1]]
durations = [3, 5, 2]
es, ef, T, cp = cpm_early(preds, durations)
print("ES:", es, "\nEF:", ef, "\nT:", T, "\nCP:", cp)  # expect [0,3,8], [3,8,10], 10, [0,1,2]

# diamond 0→1→3 and 0→2→3 (1-branch longer)
preds = [[], [0], [0], [1,2]]
d = [2,5,4,3]
es, ef, T, cp = cpm_early(preds, d)
print("\nES:", es, "\nEF:", ef, "\nT:", T, "\nCP:", cp)  # expect ~[0,2,2,7], [2,7,6,10], 10, [0,1,3]

ES: [0.0, 3.0, 8.0] 
EF: [3.0, 8.0, 10.0] 
T: 10.0 
CP: [0, 1, 2]

ES: [0.0, 2.0, 2.0, 7.0] 
EF: [2.0, 7.0, 6.0, 10.0] 
T: 10.0 
CP: [0, 1, 3]


In [None]:
# getting baseline ES, EF, T, and CP by running CPM on the most likely values (from the triplets)
def baseline_schedule(preds: List[List[int]], pert_triplets: List[Pert], buffer_factor= 1.03, eps = 1e-9):       # buffer factor of 2.5%-> strict, gives more variations in data
  # extract most likely durations from Pert objects
  m = [t.m for t in pert_triplets]
  # run CPM on m_i
  es_b, ef_b, T_b, cp = cpm_early(preds, m, eps)
  # get deadline = buffer * baseline total duration
  deadline = buffer_factor*T_b
  # building output dictionary
  out ={
      "baseline_T": T_b,
      "baseline_es": es_b,
      "baseline_ef": ef_b,
      "baseline_cp": cp,
      "deadline": deadline,
      "buffer_factor": buffer_factor
  }
  return out


In [None]:
# mini test for generating baseline schedule
preds, edges = generate_dag(8, 0.2)
triplets = assign_triplets(8)
base = baseline_schedule(preds, triplets, 1.15)
base

{'baseline_T': 17.538852528749246,
 'baseline_es': [0.0,
  5.202745165589345,
  0.0,
  5.202745165589345,
  6.211962968111791,
  0.0,
  11.051154227772338,
  10.972168473013994],
 'baseline_ef': [5.202745165589345,
  10.972168473013994,
  6.211962968111791,
  9.92913771944723,
  11.051154227772338,
  9.284760185217339,
  15.586399541873321,
  17.538852528749246],
 'baseline_cp': [0, 1, 7],
 'deadline': 20.16968040806163,
 'buffer_factor': 1.15}

In [None]:
# helper-> sample duration d ~ Triangular(o,m,p)
def sample_triangular(pert):
  return [RNG.triangular(x.o, x.m, x.p) for x in pert]

In [None]:
# Early EVM snapshot: getting performance indices CPI, SPI
def evm_snapshot(preds: List[List[int]], pert_triplets: List[Pert], ef_baseline: List[float], T_baseline: float, frac= 0.20, eps=1e-9):
  # early snapshot time
  t_early = T_baseline * frac
  t_early = max(t_early, min(ef_baseline) + 1e-6)  # ensure ≥ first planned finish
  # sample duration (single draw)
  d = sample_triangular(pert_triplets)
  # running cpm to get ef_real
  es_real, ef_real, T_real, cp_real = cpm_early(preds, d, eps)
  # defining aggregates pv, ev, ac-> planned value, earned value, actual cost
  pv = sum(t.m  for i, t in enumerate(pert_triplets) if ef_baseline[i] <= t_early)
  ev = sum(t.m  for i, t in enumerate(pert_triplets) if ef_real[i]     <= t_early)
  ac = sum(d[i] for i, _ in enumerate(pert_triplets) if ef_real[i]     <= t_early)
  # calculating performance indices spi, cpi-> schedule performance index, cost performance index
  spi = ev/max(pv, eps)
  cpi= ev/max(ac, eps)
  # building output dictionary
  out = {
        "t_early": t_early,
        "PV": pv, "EV": ev, "AC": ac,
        "SPI_early": spi, "CPI_early": cpi
        }
  return out


In [None]:
preds, edges = generate_dag(10, 0.18)
trip = assign_triplets(10)
base = baseline_schedule(preds, trip, 1.15)
snap = evm_snapshot(preds, trip, base["baseline_ef"], base["baseline_T"])
print("SPI: ", round(snap["SPI_early"],3), "\nCPI: ", round(snap["CPI_early"],3))

SPI:  1.0 
CPI:  0.867


In [None]:
# monte carlo sims
def monte_carlo_label(preds: List[List[int]], pert_triplets: List[Pert], deadline: float, K, eps=1e-9):
  # init counter
  count = 0
  # run simulation K times-> sample duration, run cpm to get total duration for each run
  for k in range(K):
    d = sample_triangular(pert_triplets)
    es_k, ef_k, T_k, cp_k = cpm_early(preds, d, eps)
    # if total time is greater than the deadline, increment counter
    if T_k > deadline:
      count += 1
  # p_late-> fraction of runs that finish late (empirical probability)
  p_late = count/K
  label = 1 if p_late >= 0.5 else 0      # label as delayed if p_late is greater than 0.5

  return p_late, label

In [None]:
preds, edges = generate_dag(10, 0.26)
trip = assign_triplets(10)
base = baseline_schedule(preds, trip, buffer_factor=1.03)


p_late, label = monte_carlo_label(preds, trip, base["deadline"], K=200)
print("p_late:", round(p_late,3), "label:", label)

p_late: 0.52 label: 1


In [None]:
# putting it all together-> features for one row of the file
def project_features(preds, edges, pert_triplets, base, snap, p_late, label):
  """
  Inputs:
    preds, edges            # structure
    pert_triplets           # PERT triplets
    base                    # dict from baseline_schedule
    snap                    # dict from evm_snapshot
    p_late, label           # from monte_carlo_label

  """

  # structure:
  n_tasks = len(preds)
  n_edges = len(edges)
  max_edges = n_tasks * (n_tasks - 1) / 2
  density   = n_edges / max(max_edges, 1)
  cp = base["baseline_cp"]
  cp_len = len(cp)
  pct_critical_tasks = cp_len/max(n_tasks, 1)
  T_baseline = base["baseline_T"]

  # uncertainty:
  m_list = [t.m for t in pert_triplets]
  ranges = [t.p - t.o for t in pert_triplets]
  avg_task_duration = statistics.fmean(m_list)
  variability = statistics.fmean(ranges)

  # performance/early health:
  spi_early = snap["SPI_early"]
  cpi_early = snap["CPI_early"]
  instability = statistics.stdev(m_list)
  risk_exposure = p_late

  # policy knobs:
  buffer_factor = base["buffer_factor"]
  p_late_diag = p_late

  # building dict:
  row = {
        # structure
        "n_tasks": n_tasks,
        "n_edges": n_edges,
        "density": density,
        "critical_path_len": cp_len,
        "pct_critical_tasks": pct_critical_tasks,
        "T_baseline": T_baseline,

        # uncertainty
        "mean_m": avg_task_duration,
        "mean_range_po": variability,
        "instability_m": instability,   # optional

        # early health
        "spi_early": spi_early,
        "cpi_early": cpi_early,

        # policy
        "buffer_factor": buffer_factor,

        # labels/diagnostics
        "label_delay": label,
        "p_late_diag": p_late,
    }
  return row

In [None]:
# building full dataset
DEFAULT_P_EDGE_RANGE = (0.24, 0.26)
DEFAULT_BUFFER = 1.025
DEFAULT_K = 200
def synth_data():

SyntaxError: expected ':' (ipython-input-34407506.py, line 5)