In [None]:
from matplotlib import pyplot as plt
import numpy as np
from scipy.ndimage import interpolation
import pandas as pd
%matplotlib widget

time_steps = 6000

# file has power data for 6000 seconds in 1s interval
raw_possible_power = pd.read_csv(r"production.csv").me_active_power
possible_power = interpolation.zoom(raw_possible_power, time_steps / len(raw_possible_power))

fig, ax = plt.subplots()
ax.plot(possible_power)
ax.set_title('Irradiated Power')
ax.set(xlabel="Time [s]", ylabel="Power [%]")

def create_setpoints(steps):
    setpoint = []
    for step in steps:
        for i in range(0, int(time_steps / len(steps) + 0.5)):
            setpoint.append(step)
    return setpoint

setpoint = create_setpoints([0, 0, 100, 30, 70])

pd.DataFrame(setpoint, columns=["value"]).to_csv("setpoint.csv")

fig, ax = plt.subplots()
ax.plot(setpoint)
ax.set_title('Setpoint')
ax.set(xlabel="Time [s]", ylabel="Power [%]")

def create_load(load):
    np.random.seed(423)
    draws = np.random.randint(0, 2, size=time_steps)
    steps = np.where(draws > 0, 0.05, -0.05)
    walk = steps.cumsum()
    # clamp to 0 ... 100
    walk = np.clip(walk, -100, 0)
    # scale random load 
    walk = walk * load / 100
    return walk

load = create_load(1000)

pd.DataFrame(load, columns=["value"]).to_csv("load.csv")

fig, ax = plt.subplots()
ax.plot(load)
ax.set_title('Load')
ax.set(xlabel="Time [s]", ylabel="Power [%]")

In [None]:
def clamp_value(value, limits):
    lower, upper = limits
    if value is None:
        return None
    elif (upper is not None) and (value > upper):
        return upper
    elif (lower is not None) and (value < lower):
        return lower
    return value

class PID(object):
    """
    PID controller
    """
    def __init__(self, kP=1.0, kI=1.0, kD=0.0, beta=1, gamma=1, output_limits=(None, None)):
        self.kP, self.kI, self.kD = kP, kI, kD
        self.beta, self.gamma = beta, gamma
        self.output_limits = output_limits

        self._proportional = 0
        self._integral = 0
        self._derivative = 0

        self._last_co = None
        self._last_pv = 0
        self._last_error = 0


        self.reset()

    def __call__(self, pv, sp, dt):
        if dt <= 0:
            raise ValueError('dt has negative value {}, must be positive'.format(dt))

        error = sp - pv
        error_p = self.beta * error
        error_i = error

        self._proportional = self.kP * error_p
        self._integral = self._integral + self.kI * error_i * dt
        self._integral = clamp_value(self._integral, self.output_limits)  # Avoid integral windup

        # D on error
        #self._derivative = self.kD * self.gamma * (error - self._last_error) / dt

        # D on PV
        self._derivative = self.kD * self.gamma * (pv - self._last_pv) / dt

        co = self._proportional + self._integral + self._derivative
        co = clamp_value(co, self.output_limits)

        self._last_co = co
        self._last_pv = pv
        self._last_error = error

        return co

    def reset(self):
        """
        Reset the PID controller internals.
        """
        self._proportional = 0
        self._integral = 0
        self._derivative = 0

        self._last_co = None
        self._last_pv = 0
        self._last_error = 0


In [None]:


# create 1s of dead time
delay_buffer = np.zeros(time_steps // 6000)

last_co = 0

def model_function(co, time):
    global last_co
    result = delay_buffer[0]
    np.roll(delay_buffer, 1)

    # limit raise time to 100% / 60s
    if co > last_co + 100 / 60 * (6000 / time_steps):
        co = last_co + 100 / 60 * (6000 / time_steps)

    # limit fall time to 6000% / 60s
    if co < last_co - 6000 / 60 * (6000 / time_steps):
        co = last_co - 6000 / 60 * (6000 / time_steps)

    last_co = co

    delay_buffer[0] = min(co, possible_power[time]) + load[time]
    return result

def get_pid_results(kP, kI, kD):
    pid_outputs = np.empty(time_steps)
    model_outputs = np.empty(time_steps)
    pid_internal_p = np.empty(time_steps)
    pid_internal_i = np.empty(time_steps)
    pid_internal_d = np.empty(time_steps)
    pid_internal_error = np.empty(time_steps)
    pid = PID(kP, kI, kD, beta=1, gamma=1, output_limits=(0, 100))
    last_process_value = 0
    for time in range(0, time_steps):
        pid_out = pid(pv=last_process_value, sp=setpoint[time], dt=1)
        model_out = model_function(pid_out, time)
        pid_outputs[time] = pid_out
        model_outputs[time] = model_out
        last_process_value = model_out
        pid_internal_p[time] = pid._proportional
        pid_internal_i[time] = pid._integral
        pid_internal_d[time] = pid._derivative
        pid_internal_error[time] = pid._last_error

    return pid_outputs, model_outputs, pid_internal_p, pid_internal_i, pid_internal_d, pid_internal_error


In [None]:
from ipywidgets import interact

fig, ax = plt.subplots(1, figsize=(16, 6), sharex=True)

line_pv, = ax.plot(np.empty(time_steps), label="PV")
line_co, = ax.plot(np.empty(time_steps), label="CO")
line_co.set_dashes([2, 2])
line_sp, = ax.plot(setpoint, label="SP")
line_sp.set_dashes([5, 10])
ax.set_ylim(-50, 110)
ax.set_title('Controlled Production')
ax.set_xlabel('Time [s]')
ax.set_ylabel('Power [%]')
ax.grid(True)
ax.legend()

fig2, ax2 = plt.subplots(4, figsize=(16, 6), sharex=True)

line_internal_p, = ax2[0].plot(np.empty(time_steps), label="P")
ax2[0].set_ylim(-50, 110)
ax2[0].set_title('Control Internals')
ax2[0].set_ylabel('Power [%]')
ax2[0].grid(True)
ax2[0].legend()

line_internal_i, = ax2[1].plot(np.empty(time_steps), label="I")
ax2[1].set_ylim(-50, 110)
ax2[1].set_ylabel('Power [%]')
ax2[1].grid(True)
ax2[1].legend()

line_internal_d, = ax2[2].plot(np.empty(time_steps), label="D")
ax2[2].set_ylim(-50, 110)
ax2[2].set_ylabel('Power [%]')
ax2[2].grid(True)
ax2[2].legend()

line_internal_error, = ax2[3].plot(np.empty(time_steps), label="Error")
ax2[3].set_ylim(-50, 110)
ax2[3].set_xlabel('Time [s]')
ax2[3].set_ylabel('Power [%]')
ax2[3].grid(True)
ax2[3].legend()

def draw_plot(kP=0.2, kI=0.57, kD=0.1):
    co, pv, pid_internal_p, pid_internal_i, pid_internal_d, pid_internal_error = get_pid_results(kP, kI, kD)

    line_pv.set_ydata(pv)
    line_co.set_ydata(co)

    line_internal_p.set_ydata(pid_internal_p)
    line_internal_i.set_ydata(pid_internal_i)
    line_internal_d.set_ydata(pid_internal_d)
    line_internal_error.set_ydata(pid_internal_error)

interact(draw_plot, kP=(0.0, 2.0, 0.01), kI=(0.0, 2.0, 0.01), kD=(0.0, 2.0, 0.01))
