# Import libs

In [None]:
from typing import Dict
import numpy as np
import pandas as pd

import plotly.graph_objects as go
import plotly.express as px

# Create simulation engine

In [None]:
# Variable class

class SimVariable:
    def __init__(self, name, initial_value=0.0, accept_negative: bool=True):
        self.name = name
        self.value = initial_value
        self.history = [initial_value]
        self.influences = []  # list of (other_variable, weight, type)
        self.accept_negative = accept_negative

    def add_influence(self, source_var, weight=1.0, mode="linear"):
        self.influences.append((source_var, weight, mode))
    
    def update(self, dt):
        delta = 0.0
        for source, weight, mode in self.influences:
            if mode == "linear":
                delta += weight * source.value
            # Other modes: logistic, saturation, etc.
        self.value += delta * dt
        
        if not self.accept_negative:
            self.value = max(0, self.value) 
        
        self.history.append(self.value)

# Simulation class

class Simulation:
    def __init__(self, variables: Dict[str, SimVariable], time_steps=50, dt=1.0):
        self.variables = variables
        self.time_steps = time_steps
        self.dt = dt

    def run(self):
        for _ in range(self.time_steps - 1):
            for var in self.variables.values():
                var.update(self.dt)

    def get_results(self):
        return {name: var.history for name, var in self.variables.items()}
    
    



# Create and run simulation

In [None]:
# Initial values (simplified)
fomento = SimVariable("Fomento PFP", 0.1)
politicas = SimVariable("Políticas EE", 0.1)
adocao = SimVariable("Adoção AEE", 0.1)
consumo = SimVariable("Consumo de Energia", 1.0, accept_negative=False)
custos = SimVariable("Custos Operacionais", 0.5, accept_negative=False)
investimento = SimVariable("Investimento no Negócio", 0.2, accept_negative=False)
emprego = SimVariable("Geração de Emprego", 0.3, accept_negative=False)
atividade = SimVariable("Atividade Econômica", 0.3, accept_negative=False)
resultado = SimVariable("Resultado Operacional", 0.1)
faturamento = SimVariable("Faturamento", 0.3, accept_negative=False)
receita = SimVariable("Receita Tributária", 0.2, accept_negative=False)

# === Define Influences (based on the diagram) ===

politicas.add_influence(fomento, weight=0.1)

adocao.add_influence(politicas, weight=0.1)

consumo.add_influence(adocao, weight=-0.1)  

custos.add_influence(consumo, weight=0.1)

investimento.add_influence(custos, weight=0.1)

emprego.add_influence(investimento, weight=0.1)

atividade.add_influence(investimento, weight=0.1)

resultado.add_influence(investimento, weight=0.1)
resultado.add_influence(custos, weight=-0.1)

faturamento.add_influence(resultado, weight=0.1)
faturamento.add_influence(atividade, weight=0.1)

receita.add_influence(emprego, weight=0.1)
receita.add_influence(faturamento, weight=0.1)

fomento.add_influence(receita, weight=0.1)

# === Run Simulation ===

variables = {
    fomento.name: fomento,
    politicas.name: politicas,
    adocao.name: adocao,
    consumo.name: consumo,
    custos.name: custos,
    resultado.name: resultado,
    investimento.name: investimento,
    atividade.name: atividade,
    faturamento.name: faturamento,
    emprego.name: emprego,
    receita.name: receita,
}

sim = Simulation(variables, time_steps=360, dt=1.0)
sim.run()
results = sim.get_results()

# === Plotting ===

fig = go.Figure()
time = list(range(50))

for name, values in results.items():
    fig.add_trace(go.Scatter(x=time, y=values, mode='lines', name=name))

fig.update_layout(
    title='Simulação Baseada em Diagrama de Causalidade',
    xaxis_title='Tempo',
    yaxis_title='Valor',
    hovermode='x unified',
    template='plotly_white',
    legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1)
)

fig.show()

In [None]:
df = pd.DataFrame(results)
df.head(5)

# Improved Version

## Imports

In [None]:
from typing import Dict, List, Tuple, Optional, Callable
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
from dataclasses import dataclass
from enum import Enum
import math

## Engine

In [None]:
class VariableType(Enum):
    STOCK = "stock"        # Accumulates over time (integrals)
    FLOW = "flow"          # Rates of change
    AUXILIARY = "auxiliary" # Computed variables
    CONSTANT = "constant"   # Fixed parameters


class InfluenceMode(Enum):
    LINEAR = "linear"
    LOGISTIC = "logistic"
    SATURATION = "saturation"
    EXPONENTIAL = "exponential"
    THRESHOLD = "threshold"
    DELAY = "delay"


@dataclass
class Influence:
    """Represents an influence from one variable to another."""
    source_var: 'SimVariable'
    weight: float = 1.0
    mode: InfluenceMode = InfluenceMode.LINEAR
    delay: int = 0  # Time delay in steps
    threshold: float = 0.0  # For threshold mode
    
    def __post_init__(self):
        self.delay_buffer: List[float] = []
    
    def calculate_delta(self, source_value: float) -> float:
        """Calculate the delta contribution from this influence."""
        # Handle delay
        if self.delay > 0:
            self.delay_buffer.append(source_value)
            if len(self.delay_buffer) > self.delay:
                delayed_value = self.delay_buffer.pop(0)
            else:
                delayed_value = self.delay_buffer[0] if self.delay_buffer else 0.0
        else:
            delayed_value = source_value
        
        # Apply influence function
        if self.mode == InfluenceMode.LINEAR:
            return self.weight * delayed_value
        elif self.mode == InfluenceMode.LOGISTIC:
            # S-curve: weight * x / (1 + x)
            return self.weight * delayed_value / (1 + abs(delayed_value))
        elif self.mode == InfluenceMode.SATURATION:
            # Diminishing returns: weight * (1 - exp(-x))
            return self.weight * (1 - math.exp(-abs(delayed_value)))
        elif self.mode == InfluenceMode.EXPONENTIAL:
            # Exponential growth/decay
            return self.weight * delayed_value * abs(delayed_value)
        elif self.mode == InfluenceMode.THRESHOLD:
            # Step function at threshold
            return self.weight * delayed_value if delayed_value > self.threshold else 0.0
        return 0.0


class SimVariable:
    """Represents a variable in the system dynamics model."""
    
    def __init__(self, name: str, initial_value: float = 0.0, 
                 var_type: VariableType = VariableType.STOCK,
                 accept_negative: bool = True, min_value: Optional[float] = None,
                 max_value: Optional[float] = None,
                 auxiliary_func: Optional[Callable] = None):
        self.name = name
        self.initial_value = initial_value
        self.value = initial_value
        self.var_type = var_type
        self.history: List[float] = [initial_value]
        self.influences: List[Influence] = []
        self.accept_negative = accept_negative
        self.min_value = min_value
        self.max_value = max_value
        self.auxiliary_func = auxiliary_func  # For computed variables
        
        # System dynamics specific
        self.inflows: List['SimVariable'] = []
        self.outflows: List['SimVariable'] = []
        
        # Validate constraints
        if not accept_negative and min_value is None:
            self.min_value = 0.0

    def add_influence(self, source_var: 'SimVariable', weight: float = 1.0, 
                     mode: InfluenceMode = InfluenceMode.LINEAR,
                     delay: int = 0, threshold: float = 0.0) -> None:
        """Add an influence from another variable."""
        influence = Influence(source_var, weight, mode, delay, threshold)
        self.influences.append(influence)
    
    def add_inflow(self, flow_var: 'SimVariable') -> None:
        """Add an inflow (for stock variables)."""
        self.inflows.append(flow_var)
    
    def add_outflow(self, flow_var: 'SimVariable') -> None:
        """Add an outflow (for stock variables)."""
        self.outflows.append(flow_var)
    
    def calculate_delta(self) -> float:
        """Calculate the total change for this time step."""
        if self.var_type == VariableType.CONSTANT:
            return 0.0
        
        if self.var_type == VariableType.AUXILIARY and self.auxiliary_func:
            # Auxiliary variables are computed, not integrated
            return 0.0
        
        total_delta = 0.0
        
        # Stock-Flow dynamics
        if self.var_type == VariableType.STOCK:
            for inflow in self.inflows:
                total_delta += inflow.value
            for outflow in self.outflows:
                total_delta -= outflow.value
        
        # Regular influences
        for influence in self.influences:
            delta = influence.calculate_delta(influence.source_var.value)
            total_delta += delta
            
        return total_delta
    
    def update(self, dt: float) -> None:
        """Update the variable's value based on influences."""
        if self.var_type == VariableType.CONSTANT:
            return
        
        if self.var_type == VariableType.AUXILIARY and self.auxiliary_func:
            # Compute auxiliary variable
            self.value = self.auxiliary_func(self)
        else:
            # Integrate using Euler method
            delta = self.calculate_delta()
            new_value = self.value + delta * dt
            
            # Apply constraints
            if self.min_value is not None:
                new_value = max(self.min_value, new_value)
            if self.max_value is not None:
                new_value = min(self.max_value, new_value)
                
            self.value = new_value
            
        self.history.append(self.value)
    
    def get_rate_of_change(self) -> float:
        """Get current rate of change (derivative)."""
        if len(self.history) < 2:
            return 0.0
        return self.history[-1] - self.history[-2]
    
    def reset(self) -> None:
        """Reset variable to initial state."""
        self.value = self.initial_value
        self.history = [self.initial_value]
        # Reset delay buffers
        for influence in self.influences:
            influence.delay_buffer = []


class Simulation:
    """System dynamics simulation engine."""
    
    def __init__(self, variables: Dict[str, SimVariable], 
                 time_steps: int = 50, dt: float = 1.0,
                 integration_method: str = "euler"):
        self.variables = variables
        self.time_steps = time_steps
        self.dt = dt
        self.current_step = 0
        self.integration_method = integration_method
        
        # Analysis storage
        self.equilibrium_detection = True
        self.equilibrium_threshold = 1e-6
        self.equilibrium_window = 10
        
    def step(self) -> None:
        """Execute one simulation step."""
        if self.current_step >= self.time_steps - 1:
            return
        
        if self.integration_method == "euler":
            self._euler_step()
        elif self.integration_method == "rk4":
            self._rk4_step()
        else:
            self._euler_step()
            
        self.current_step += 1
    
    def _euler_step(self) -> None:
        """Euler integration step."""
        for var in self.variables.values():
            var.update(self.dt)
    
    def _rk4_step(self) -> None:
        """Runge-Kutta 4th order integration (more accurate)."""
        # Store original values
        original_values = {name: var.value for name, var in self.variables.items()}
        
        # Calculate k1
        k1 = {}
        for name, var in self.variables.items():
            k1[name] = var.calculate_delta() * self.dt
        
        # Calculate k2
        for name, var in self.variables.items():
            var.value = original_values[name] + k1[name] / 2
        k2 = {}
        for name, var in self.variables.items():
            k2[name] = var.calculate_delta() * self.dt
        
        # Calculate k3
        for name, var in self.variables.items():
            var.value = original_values[name] + k2[name] / 2
        k3 = {}
        for name, var in self.variables.items():
            k3[name] = var.calculate_delta() * self.dt
        
        # Calculate k4
        for name, var in self.variables.items():
            var.value = original_values[name] + k3[name]
        k4 = {}
        for name, var in self.variables.items():
            k4[name] = var.calculate_delta() * self.dt
        
        # Final update
        for name, var in self.variables.items():
            if var.var_type != VariableType.CONSTANT:
                new_value = original_values[name] + (k1[name] + 2*k2[name] + 2*k3[name] + k4[name]) / 6
                
                # Apply constraints
                if var.min_value is not None:
                    new_value = max(var.min_value, new_value)
                if var.max_value is not None:
                    new_value = min(var.max_value, new_value)
                
                var.value = new_value
            var.history.append(var.value)
    
    def run(self) -> None:
        """Run the complete simulation."""
        self.reset()
        for step in range(self.time_steps - 1):
            self.step()
            
            # Check for equilibrium
            if self.equilibrium_detection and self._check_equilibrium():
                print(f"Equilibrium reached at step {step + 1}")
                break
    
    def _check_equilibrium(self) -> bool:
        """Check if system has reached equilibrium."""
        if self.current_step < self.equilibrium_window:
            return False
        
        for var in self.variables.values():
            if var.var_type == VariableType.CONSTANT:
                continue
            
            recent_values = var.history[-self.equilibrium_window:]
            if len(recent_values) < self.equilibrium_window:
                return False
            
            # Check if recent changes are below threshold
            max_change = max(abs(recent_values[i] - recent_values[i-1]) 
                           for i in range(1, len(recent_values)))
            if max_change > self.equilibrium_threshold:
                return False
        
        return True
    
    def sensitivity_analysis(self, var_name: str, parameter_range: Tuple[float, float], 
                           steps: int = 10) -> Dict[float, Dict[str, List[float]]]:
        """Perform sensitivity analysis on a variable."""
        results = {}
        original_value = self.variables[var_name].initial_value
        
        param_values = np.linspace(parameter_range[0], parameter_range[1], steps)
        
        for param_value in param_values:
            self.variables[var_name].initial_value = param_value
            self.run()
            results[param_value] = self.get_results()
        
        # Restore original value
        self.variables[var_name].initial_value = original_value
        return results
    
    def reset(self) -> None:
        """Reset simulation to initial state."""
        self.current_step = 0
        for var in self.variables.values():
            var.reset()
    
    def get_results(self) -> Dict[str, List[float]]:
        """Get simulation results as dictionary of histories."""
        return {name: var.history.copy() for name, var in self.variables.items()}
    
    def get_time_series(self) -> List[float]:
        """Get time series for plotting."""
        return [i * self.dt for i in range(len(next(iter(self.variables.values())).history))]
    
    def get_phase_space(self, var1_name: str, var2_name: str) -> Tuple[List[float], List[float]]:
        """Get phase space trajectory for two variables."""
        var1_history = self.variables[var1_name].history
        var2_history = self.variables[var2_name].history
        return var1_history, var2_history



## Create simulation model

In [None]:

def create_model() -> Dict[str, SimVariable]:
    """Create an system dynamics model."""
    
    # Define variables with proper types
    variables = {
        # Policy variables (auxiliary/constants)
        "Fomento PFP": SimVariable("Fomento PFP", 0.1, VariableType.STOCK),
        "Políticas EE": SimVariable("Políticas EE", 0.1, VariableType.AUXILIARY),
        
        # Adoption and consumption (stocks)
        "Adoção AEE": SimVariable("Adoção AEE", 0.1, VariableType.STOCK, accept_negative=False),
        "Consumo de Energia": SimVariable("Consumo de Energia", 1.0, VariableType.STOCK, accept_negative=False),
        
        # Economic variables (stocks)
        "Custos Operacionais": SimVariable("Custos Operacionais", 0.5, VariableType.STOCK, accept_negative=False),
        "Investimento no Negócio": SimVariable("Investimento no Negócio", 0.2, VariableType.STOCK, accept_negative=False),
        "Geração de Emprego": SimVariable("Geração de Emprego", 0.3, VariableType.STOCK, accept_negative=False),
        "Atividade Econômica": SimVariable("Atividade Econômica", 0.3, VariableType.STOCK, accept_negative=False),
        
        # Financial results (stocks)
        "Resultado Operacional": SimVariable("Resultado Operacional", 0.1, VariableType.STOCK),
        "Faturamento": SimVariable("Faturamento", 0.3, VariableType.STOCK, accept_negative=False),
        "Receita Tributária": SimVariable("Receita Tributária", 0.2, VariableType.STOCK, accept_negative=False),
        
        # Flow variables
        "Taxa Adoção": SimVariable("Taxa Adoção", 0.05, VariableType.FLOW, accept_negative=False),
        "Taxa Investimento": SimVariable("Taxa Investimento", 0.1, VariableType.FLOW, accept_negative=False),
    }
    
    # Influence relationships with different modes and delays
    variables["Políticas EE"].add_influence(variables["Fomento PFP"], weight=0.1, mode=InfluenceMode.LOGISTIC)
    variables["Taxa Adoção"].add_influence(variables["Políticas EE"], weight=0.1, delay=2)  # Policy delay
    variables["Adoção AEE"].add_inflow(variables["Taxa Adoção"])
    
    variables["Consumo de Energia"].add_influence(variables["Adoção AEE"], weight=-0.1, mode=InfluenceMode.SATURATION)
    variables["Custos Operacionais"].add_influence(variables["Consumo de Energia"], weight=0.08)
    
    variables["Taxa Investimento"].add_influence(variables["Custos Operacionais"], weight=-0.05)
    variables["Taxa Investimento"].add_influence(variables["Resultado Operacional"], weight=0.15)
    variables["Investimento no Negócio"].add_inflow(variables["Taxa Investimento"])
    
    variables["Geração de Emprego"].add_influence(variables["Investimento no Negócio"], weight=0.1, mode=InfluenceMode.THRESHOLD, threshold=0.1)
    variables["Atividade Econômica"].add_influence(variables["Investimento no Negócio"], weight=0.12)
    variables["Atividade Econômica"].add_influence(variables["Geração de Emprego"], weight=0.08)
    
    variables["Resultado Operacional"].add_influence(variables["Faturamento"], weight=0.1)
    variables["Resultado Operacional"].add_influence(variables["Custos Operacionais"], weight=-0.15)
    
    variables["Faturamento"].add_influence(variables["Atividade Econômica"], weight=0.12)
    variables["Faturamento"].add_influence(variables["Adoção AEE"], weight=0.08, delay=1)
    
    variables["Receita Tributária"].add_influence(variables["Geração de Emprego"], weight=0.1)
    variables["Receita Tributária"].add_influence(variables["Faturamento"], weight=0.12)
    
    # Feedback loops
    variables["Fomento PFP"].add_influence(variables["Receita Tributária"], weight=0.1, delay=3)
    variables["Políticas EE"].add_influence(variables["Atividade Econômica"], weight=0.05, delay=2)
    
    return variables


## Create Plots

In [None]:

def create_plots(results: Dict[str, List[float]], time_series: List[float], 
                         sim: Simulation) -> go.Figure:
    """Create system dynamics plots."""
    
    # Create subplots
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Main Variables', 'Economic Indicators', 'Policy Variables', 'Phase Space'),
        specs=[[{"secondary_y": True}, {"secondary_y": True}],
               [{"secondary_y": False}, {"secondary_y": False}]]
    )
    
    # Main variables plot
    main_vars = ["Adoção AEE", "Consumo de Energia", "Investimento no Negócio"]
    for var_name in main_vars:
        if var_name in results:
            fig.add_trace(
                go.Scatter(x=time_series, y=results[var_name], name=var_name, line=dict(width=3)),
                row=1, col=1
            )
    
    # Economic indicators
    econ_vars = ["Faturamento", "Geração de Emprego", "Atividade Econômica"]
    for var_name in econ_vars:
        if var_name in results:
            fig.add_trace(
                go.Scatter(x=time_series, y=results[var_name], name=var_name, line=dict(width=2)),
                row=1, col=2
            )
    
    # Policy variables
    policy_vars = ["Fomento PFP", "Políticas EE", "Receita Tributária"]
    for var_name in policy_vars:
        if var_name in results:
            fig.add_trace(
                go.Scatter(x=time_series, y=results[var_name], name=var_name, line=dict(width=2)),
                row=2, col=1
            )
    
    # Phase space (Investment vs Activity)
    if "Investimento no Negócio" in results and "Atividade Econômica" in results:
        fig.add_trace(
            go.Scatter(
                x=results["Investimento no Negócio"], 
                y=results["Atividade Econômica"],
                mode='lines+markers',
                name='Phase Space',
                line=dict(width=2),
                marker=dict(size=4)
            ),
            row=2, col=2
        )
    
    fig.update_layout(
        title='System Dynamics Analysis',
        height=800,
        showlegend=True,
        template='plotly_white'
    )
    
    return fig


# Running

In [None]:

TIME_STEPS = 36
DT = 1

# Create and run simulation
variables = create_model()
sim = Simulation(variables, time_steps=TIME_STEPS, dt=DT, integration_method="rk4")

print("Running system dynamics simulation...")
sim.run()

# Get results and create plots
results = sim.get_results()
time_series = sim.get_time_series()

fig = create_plots(results, time_series, sim)
fig.show()

# Print system analysis
print("\n=== System Analysis ===")
print(f"Simulation completed in {sim.current_step + 1} steps")

print("\nFinal Values:")
for name, var in variables.items():
    if var.var_type != VariableType.CONSTANT:
        rate = var.get_rate_of_change()
        print(f"{name}: {var.value:.3f} (rate: {rate:.4f})")


In [None]:
# Example sensitivity analysis
print("\n=== Sensitivity Analysis ===")
print("Analyzing sensitivity to 'Fomento PFP' initial value...")
sensitivity_results = sim.sensitivity_analysis("Fomento PFP", (0.5, 0.9), steps=20)

for param_value, scenario_results in sensitivity_results.items():
    final_activity = scenario_results["Atividade Econômica"][-1]
    print(f"Fomento PFP = {param_value:.3f} → Final Economic Activity = {final_activity:.3f}")