<a href="https://colab.research.google.com/github/jeancosilva-ops/10-DLA/blob/main/Motor_Diagnostico_Cronograma.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [25]:
import io

from google.colab import files

import pandas as pd
import networkx as nx

# Ask the user for the schedule export. Colab stores a re-uploaded file under
# a new name ("Teste 1 (1).csv", "Teste 1 (2).csv", ...), so reading a
# hard-coded "Teste 1.csv" from disk would silently analyse a stale copy.
# Parse the bytes returned by the upload widget instead.
uploaded = files.upload()
if not uploaded:
    raise ValueError("Nenhum arquivo foi enviado.")

# Only the first uploaded file is analysed.
upload_name, upload_bytes = next(iter(uploaded.items()))
df = pd.read_csv(io.BytesIO(upload_bytes), encoding='latin-1', sep=';')

# Header cells may carry stray whitespace; normalise before renaming.
df.columns = df.columns.str.strip()

# Portuguese export headers -> internal column names used by the analysis.
column_map = {
    'Id': 'TaskID',
    'Nome': 'TaskName',
    'Início': 'Start',
    'Término': 'Finish',
    'Duração': 'Duration',
    'Predecessoras': 'Predecessors',
}
if 'Término LB' in df.columns:
    # NOTE(review): assumes the baseline finish column, when exported, is
    # titled 'Término LB' by analogy with 'Início LB' — confirm against the
    # real export.
    column_map['Término LB'] = 'BaselineFinish'
else:
    # 'Início' is mapped to 'Start' above, so 'Início LB' is a baseline
    # *start*. It is kept as the fallback source of 'BaselineFinish' so
    # existing exports still load, but the delay figures derived from it
    # are then measured against the baseline start.
    print("Warning: column 'Término LB' not found; using 'Início LB' as BaselineFinish.")
    column_map['Início LB'] = 'BaselineFinish'

df.rename(columns=column_map, inplace=True)

required = [
    "TaskID", "TaskName", "Start", "Finish",
    "Duration", "BaselineFinish", "Predecessors",
]

# Fail early, naming every absent column, rather than with a KeyError later.
missing = [c for c in required if c not in df.columns]
if missing:
    raise ValueError(f"Colunas faltando: {missing}")

import re

# Leading number, optional unit text, optional trailing "?" (as in "1 dia?").
_DURATION_PATTERN = re.compile(r"\s*([-+]?(?:\d+\.?\d*|\.\d+))\s*(.*?)\s*\??\s*")


def parse_duration_to_days(duration_str, *, days_per_week=7, hours_per_day=24.0):
    """Convert a duration cell such as "5 dias" or "1,5 sem" to a number of days.

    Args:
        duration_str: Raw cell value. Missing values (None/NaN) count as 0.
            A decimal comma is accepted. Recognised units are matched by
            substring: "dia" (days), "sem" (weeks), "hr" (hours). A bare
            number is taken to be days already.
        days_per_week: Days counted for one "sem" (default 7).
        hours_per_day: Hours counted for one day (default 24).

    Returns:
        The duration in days as a float. Values that cannot be parsed, or
        that carry an unrecognised unit, produce a printed warning and 0.0
        instead of raising, so one bad cell does not abort the whole run.
    """
    if pd.isna(duration_str):
        return 0.0

    text = str(duration_str).lower().replace(',', '.')

    match = _DURATION_PATTERN.fullmatch(text)
    if match is not None:
        amount = float(match.group(1))
        unit = match.group(2)
        if not unit or 'dia' in unit:
            return amount
        if 'sem' in unit:
            return amount * days_per_week
        if 'hr' in unit:
            return amount / hours_per_day

    # No number found, or a unit this parser does not know.
    print(f"Warning: Could not parse duration string '{text}'. Returning 0.")
    return 0.0

df['Duration_Days'] = df['Duration'].apply(parse_duration_to_days)

G = nx.DiGraph()

# First pass: register every task as a node carrying its duration. Doing this
# before any edge is added lets the second pass tell real tasks apart from
# predecessor IDs that do not exist in the table.
for _, row in df.iterrows():
    G.add_node(row["TaskID"], duration=row["Duration_Days"])


def _to_task_id(text):
    """Return *text* as an int task ID, or None when it is not a whole number."""
    try:
        return int(text)
    except ValueError:
        pass
    # When the column holds at most one numeric predecessor per row, pandas
    # infers float and the value arrives here as "5.0".
    try:
        as_float = float(text)
    except ValueError:
        return None
    return int(as_float) if as_float.is_integer() else None


# Second pass: one edge per predecessor -> task dependency.
for _, row in df.iterrows():
    if pd.isna(row["Predecessors"]):
        continue

    # Accept both "," and ";" as list separators.
    for p in str(row["Predecessors"]).replace(";", ",").split(","):
        cleaned_p = p.replace("FS", "").strip()
        pred_id = _to_task_id(cleaned_p)
        if pred_id is None:
            print(f"Warning: Could not convert predecessor ID '{cleaned_p}' to int for TaskID '{row['TaskID']}'. Skipping this predecessor.")
            continue
        if pred_id not in G:
            # add_edge would create a node without a "duration" attribute,
            # which the schedule calculation cannot handle.
            print(f"Warning: Predecessor ID '{pred_id}' for TaskID '{row['TaskID']}' is not a task in the schedule. Skipping this predecessor.")
            continue
        G.add_edge(pred_id, row["TaskID"])
# Critical-path calculation over the dependency graph G.
# ES/EF = earliest start/finish, LS/LF = latest start/finish, all in days
# measured from the start of the project.

# Sort once and reuse the order for both passes. A dependency cycle makes the
# sort impossible; re-raise with a message that says what is wrong.
try:
    topo_order = list(nx.topological_sort(G))
except nx.NetworkXUnfeasible as err:
    raise nx.NetworkXUnfeasible(
        "O cronograma contém dependências circulares; não é possível calcular o caminho crítico."
    ) from err

# Forward pass: a task starts when its last predecessor finishes.
ES, EF = {}, {}
for node in topo_order:
    ES[node] = max((EF[p] for p in G.predecessors(node)), default=0)
    # Nodes created implicitly by an edge have no "duration"; count them as 0.
    EF[node] = ES[node] + G.nodes[node].get("duration", 0)

# Calculate LF and LS (backward pass). default=0 covers an empty schedule.
project_finish_time = max(EF.values(), default=0)
LF, LS = {}, {}
for node in reversed(topo_order):
    # A task without successors may finish as late as the project itself.
    LF[node] = min((LS[s] for s in G.successors(node)), default=project_finish_time)
    LS[node] = LF[node] - G.nodes[node].get("duration", 0)

df["ES"] = df["TaskID"].map(ES)
df["EF"] = df["TaskID"].map(EF)
df["LS"] = df["TaskID"].map(LS)
df["LF"] = df["TaskID"].map(LF)

df["TotalFloat"] = df["LS"] - df["ES"]
# Fractional-day durations (hours) leave tiny positive round-off in the float,
# so compare against a small tolerance instead of exactly zero.
FLOAT_TOLERANCE = 1e-9
df["IsCritical"] = df["TotalFloat"] <= FLOAT_TOLERANCE

# Whole days between each task's Finish date and its BaselineFinish date.
# Both columns are parsed day-first (dd/mm/yyyy); a positive result means
# Finish falls after BaselineFinish.
finish_dates = pd.to_datetime(df["Finish"], dayfirst=True)
baseline_dates = pd.to_datetime(df["BaselineFinish"], dayfirst=True)
df["DelayDays"] = (finish_dates - baseline_dates).dt.days

# A task is late as soon as it slips by at least one full day.
df["IsLate"] = df["DelayDays"].gt(0)

def risk_score(row):
    """Return a 0-100 risk score for one task row.

    Points are added for each condition that holds and the total is capped
    at 100:
        TotalFloat <= 5      -> 40
        DelayDays > 0        -> 30
        Duration_Days > 20   -> 20
        IsCritical is truthy -> 30

    Args:
        row: Mapping-like row providing "TotalFloat", "DelayDays",
            "Duration_Days" and "IsCritical".

    Returns:
        The capped integer score.
    """
    weighted_checks = (
        (row["TotalFloat"] <= 5, 40),
        (row["DelayDays"] > 0, 30),
        (row["Duration_Days"] > 20, 20),
        (row["IsCritical"], 30),
    )
    total = sum(points for triggered, points in weighted_checks if triggered)
    return min(total, 100)

# Score every task, then export the diagnosis as a JSON report.
df["RiskScore"] = df.apply(risk_score, axis=1)

import json
from datetime import datetime

# Metadata identifying the report format and when it was produced.
schema_info = {
    "name": "schedule_analyzer_output",
    "version": "1.0",
    "generated_at": datetime.now().isoformat(),
}

# Headline counts.
kpi_summary = {
    "tasks_total": len(df),
    "critical_tasks_count": int(df["IsCritical"].sum()),
    "late_tasks_count": int(df["IsLate"].sum()),
}

# Detail lists: one record per task that is critical, late, or scored >= 60.
critical_mask = df["IsCritical"]
late_mask = df["IsLate"]
high_risk_mask = df["RiskScore"] >= 60
task_lists = {
    "critical_tasks": df.loc[
        critical_mask, ["TaskID", "TaskName", "TotalFloat", "Duration"]
    ].to_dict(orient="records"),
    "late_tasks": df.loc[
        late_mask, ["TaskID", "TaskName", "DelayDays"]
    ].to_dict(orient="records"),
    "risk_forecast": df.loc[
        high_risk_mask, ["TaskID", "TaskName", "RiskScore"]
    ].to_dict(orient="records"),
}

output = {
    "schema": schema_info,
    "kpis": kpi_summary,
    "lists": task_lists,
}

# Write the report to disk, then hand it to the browser as a download.
with open("diagnostico_cronograma.json", "w") as f:
    f.write(json.dumps(output, indent=2))
files.download("diagnostico_cronograma.json")


Saving Teste 1.csv to Teste 1 (20).csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>