-
Notifications
You must be signed in to change notification settings - Fork 64
/
algorithm.py
1241 lines (1181 loc) · 72.1 KB
/
algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
""" Algorithmus zur Berechnung der Ladeströme
"""
import copy
import logging
from typing import Dict, List, Optional, Tuple
from control import data
from control import loadmanagement
from control.chargepoint import Chargepoint
from helpermodules.pub import Pub
from control.ev import Ev
log = logging.getLogger(__name__)
class Algorithm:
"""Verteilung des Stroms auf die Ladepunkte
"""
def __init__(self):
# Lademodi in absteigender Priorität; Tupel-Inhalt:(eingestellter Modus, tatsächlich genutzter Modus, Priorität)
self.chargemodes = (("scheduled_charging", "instant_charging", True),
("scheduled_charging", "instant_charging", False),
(None, "time_charging", True),
(None, "time_charging", False),
("instant_charging", "instant_charging", True),
("instant_charging", "instant_charging", False),
("pv_charging", "instant_charging", True),
("pv_charging", "instant_charging", False),
("scheduled_charging", "pv_charging", True),
("scheduled_charging", "pv_charging", False),
("pv_charging", "pv_charging", True),
("pv_charging", "pv_charging", False),
(None, "standby", True),
(None, "standby", False),
(None, "stop", True),
(None, "stop", False))
def calc_current(self) -> None:
""" Einstiegspunkt in den Regel-Algorithmus
"""
try:
log.debug("# Algorithmus-Start")
evu_counter = data.data.counter_data["all"].get_evu_counter()
log.info(
f'EVU-Punkt: Leistung[W] {data.data.counter_data[evu_counter].data["get"]["power"]}, Ströme[A] '
f'{data.data.counter_data[evu_counter].data["get"].get("currents")}')
# zuerst die PV-Überschuss-Ladung zurück nehmen
self._reduce_used_evu_overhang()
# Lastmanagement für alle Zähler
self._check_loadmanagement()
# Phasenumschaltung
self._check_auto_phase_switch_delay()
# Abschaltschwelle prüfen und ggf. Abschaltverzögerung starten
for mode in reversed(self.chargemodes[8:-4]):
self._switch_off_threshold(mode)
# Überschuss auf Ladepunkte verteilen
self._manage_distribution()
# Übrigen Überschuss auf Ladepunkte im PV-Modus verteilen
self._distribute_unused_evu_overhang()
# LP stoppen, bei denen die Abschaltverzögerung abgelaufen ist (die frei werdende Leistung soll erst
# im nächsten Zyklus verteilt werden)
self._stop_after_switch_off_delay()
except Exception:
log.exception("Fehler im Algorithmus-Modul")
def _reduce_used_evu_overhang(self) -> None:
""" nimmt den Ladestrom, der über der eingestellten Stromstärke liegt, zurück, um zu schauen, ob er im
Algorithmus anderweitig verteilt wird.
Wenn nein, wird er am Ende wieder zugeteilt.
Alle EV im PV-Modus sollen laden, bevor eine Phasenumschaltung stattfindet. Deshalb wird zu Beginn des
Algorithmus der übrige Überschuss zurückgenommen und steht dann im weiteren Algorithmus, z.B. zum Erreichen
der Einschaltschwelle für ein weiteres EV, zur Verfügung.
"""
log.info("## Überschuss-Ladung über Mindeststrom bei PV-Laden zurücknehmen.")
for cp in data.data.cp_data.values():
try:
if cp.data.set.current == 0:
continue
if cp.data.set.charging_ev != -1:
charging_ev = cp.data.set.charging_ev_data
# Wenn beim PV-Laden über der eingestellten Stromstärke geladen wird, zuerst zurücknehmen.
if ((charging_ev.charge_template.data.chargemode.selected == "pv_charging" or
charging_ev.data.control_parameter.submode == "pv_charging") and
cp.data.set.current != 0):
if max(cp.data.get.currents) != 0:
# Strom, mit dem tatsächlich geladen wird, verwenden, da man sonst mehr Strom
# freigibt, als zur Verfügung steht.
released_current = charging_ev.data.control_parameter.required_current - max(
cp.data.get.currents)
released_power = released_current * 230 * cp.data.get.phases_in_use
# Nur wenn mit mehr als der benötigten Stromstärke geladen wird, kann der
# Überschuss ggf anderweitig in der Regelung verwendet werden.
if released_current < 0:
data.data.pv_data["all"].allocate_evu_power(released_power)
self._process_data(
cp, charging_ev.data.control_parameter.required_current)
loadmanagement.loadmanagement_for_cp(
cp,
released_current,
cp.data.get.phases_in_use)
log.debug(
"Ladung an LP" + str(cp.num) + " um " + str(released_current) +
"A auf " + str(charging_ev.data.control_parameter.required_current) +
"A angepasst.")
else:
# Wenn das EV nicht laden will, trotztdem zugeteilten Überschuss zurücknehmen,
# da es sonst ggf mit einer zu hohen Soll-Stromstärke anfängt zu laden, wenn es
# dann doch noch anfängt. Dieser Strom darf aber nicht freigegeben werden, da er
# keine Auswirkungen auf den EVU-Überschuss hat.
self._process_data(
cp, charging_ev.data.control_parameter.required_current)
log.debug(
"Ladung an LP" + str(cp.num) + " auf " +
str(charging_ev.data.control_parameter.required_current) +
"A angepasst.")
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt {cp.num}")
def _check_loadmanagement(self) -> None:
""" prüft, ob an einem der Zähler das Lastmanagement aktiv ist.
"""
try:
loadmanagement_state, overloaded_counters = loadmanagement.loadmanagement_for_counters()
if loadmanagement_state:
log.warning("## Ladung wegen aktiven Lastmanagements stoppen.")
# Zähler mit der größten Überlastung ermitteln
sorted_overloaded_counters = loadmanagement.sort_overloaded_counter(overloaded_counters)
n = 0 # Zähler, der betrachtet werden soll
# set current auf den maximalen get current stellen, damit der tatsächlich genutzte Strom reduziert
# wird und nicht der maximal nutzbare, der ja eventuell gar nicht voll ausgenutzt wird, sodass die
# Reduzierung wirkungslos wäre. Wenn set current bereits reduziert wurde, darf es nicht wieder
# hochgesetzt werden. EVs, die nicht laden, sollen nicht berücksichtigt werden.
for cp in data.data.cp_data.values():
try:
max_get_current = max(cp.data.get.currents)
if (cp.data.set.current > max_get_current
> cp.data.set.charging_ev_data.ev_template.data.nominal_difference):
# Manche EVs laden mit weniger Strom als der Min-Strom.
if (max_get_current > cp.data.set.charging_ev_data.ev_template.data.min_current):
cp.data.set.current = max_get_current
else:
cp.data.set.current = cp.data.set.charging_ev_data.ev_template.data.min_current
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt {cp.num}")
# Begrenzung der Schleifendurchläufe: Im ersten Durchlauf wird versucht, die Überlast durch Reduktion
# zu eliminieren, im zweiten durch Abschalten. Daher die zweifache Anzahl von Zählern als Durchläufe.
for b in range(0, len(data.data.counter_data)*2):
chargepoints = data.data.counter_data["all"].get_chargepoints_of_counter(
sorted_overloaded_counters[n][0])
overshoot = sorted_overloaded_counters[n][1][0]
# Das Lademodi-Tupel rückwärts durchgehen und LP mit niedrig priorisiertem Lademodus zuerst
# reduzieren/stoppen.
for mode in reversed(self.chargemodes[:-4]):
overshoot = self._down_regulation(
mode, chargepoints, overshoot, sorted_overloaded_counters[n][1][1])
if overshoot == 0:
break
# Wenn kein Ladepunkt lädt, kann die Wallbox nichts am Lastmanagement ausrichten. Die Überlastung
# kommt ausschließlich vom Hausverbrauch.
for cp in data.data.cp_data.values():
try:
if cp.data.set.current != 0:
break
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt {cp.num}")
else:
log.warning("Überlastung wird durch Hausverbrauch verursacht.")
break
if overshoot == 0:
# Nach dem Aktualisieren der Werte sollte der Zähler verschwunden sein, weil man genügend LP
# abschalten konnte
n = 0
else:
# Wenn die Überlastung nicht komplett beseitigt werden konnte, Zahlen aktualisieren und mit
# nächstem Zähler weitermachen.
n += 1
if n > len(overloaded_counters)-1:
# keine Zähler mehr, an denen noch reduziert werden kann. Wieder beim ersten Zähler anfangen,
# diesemal werden die
# LP dann abgeschaltet.
n = 0
# Werte aktualisieren
loadmanagement_state, overloaded_counters = loadmanagement.loadmanagement_for_counters()
if not loadmanagement_state:
# Lastmanagement ist nicht mehr aktiv
break
sorted_overloaded_counters = loadmanagement.sort_overloaded_counter(overloaded_counters)
else:
log.debug(
"## Ladung muss nicht wegen aktiven Lastmanagements gestoppt werden.")
# Ladepunkte, die nicht mit Maximalstromstärke laden, können wieder hochgeregelt werden.
for mode in self.chargemodes[:-4]:
self._adjust_chargepoints(mode)
except Exception:
log.exception("Fehler im Algorithmus-Modul")
def _down_regulation(self,
mode_tuple: Tuple[Optional[str], str, bool],
cps_to_reduce: List[str],
max_current_overshoot: float,
max_overshoot_phase: int,
prevent_stop: bool = False) -> float:
""" ermittelt die Ladepunkte und in welcher Reihenfolge sie reduziert/abgeschaltet werden sollen und ruft dann
die Funktion zum Runterregeln auf.
"""
mode = mode_tuple[0]
submode = mode_tuple[1]
prio = mode_tuple[2]
# LP, die abgeschaltet werden sollen
preferenced_chargepoints = []
# enthält alle LP, auf die das Tupel zutrifft
valid_chargepoints = {}
for cp in cps_to_reduce:
try:
chargepoint = data.data.cp_data[cp]
if chargepoint.data.set.current == 0:
continue
if chargepoint.data.set.charging_ev != -1:
charging_ev = chargepoint.data.set.charging_ev_data
# Wenn der LP erst in diesem Zyklus eingeschaltet wird, sind noch keine phases_in_use
# hinterlegt.
if not chargepoint.data.get.charge_state:
phases = charging_ev.data.control_parameter.phases
else:
phases = chargepoint.data.get.phases_in_use
if ((charging_ev.charge_template.data.prio == prio) and
(charging_ev.charge_template.data.chargemode.selected == mode or
mode is None) and
(charging_ev.data.control_parameter.submode == submode) and
# LP muss auf der Phase laden, die überlastet ist.
(phases == 3 or max_overshoot_phase == chargepoint.data.config.phase_1 or
chargepoint.data.config.phase_1 == 0)):
valid_chargepoints[chargepoint] = None
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt{cp}")
preferenced_chargepoints = self._get_preferenced_chargepoint(
valid_chargepoints, False)
if len(preferenced_chargepoints) != 0:
log.debug("Zu reduzierende Ladepunkte "+str([cp.num for cp in preferenced_chargepoints]) +
" in Lademodus "+str(mode)+" Submodus "+str(submode)+" Prio "+str(prio))
return self._perform_down_regulation(
preferenced_chargepoints, max_current_overshoot, max_overshoot_phase, prevent_stop)
def _perform_down_regulation(
self,
preferenced_chargepoints: List[Chargepoint],
max_current_overshoot: float,
max_overshoot_phase: int,
prevent_stop: bool = False) -> float:
""" prüft, ob der Ladepunkt reduziert werden kann. Wenn Nein und das Abschalten nicht verboten ist, wird der
Ladepunkt abgeschaltet. Ist noch nicht die Minimalstromstärke erreicht, wird der Ladestrom reduziert.
max_overshoot_phase: Phase, in der die maximale Stromstärke erreicht wird
(0, wenn alle Phasen berücksichtigt werden sollen.)
"""
message = None
remaining_current_overshoot = 0
try:
if len(preferenced_chargepoints) == 0:
# Es gibt keine Ladepunkte in diesem Lademodus, die noch nicht laden oder die noch gestoppt werden
# können.
return max_current_overshoot
else:
for cp in preferenced_chargepoints:
try:
# Wenn der LP erst in diesem Zyklus eingeschaltet wird, sind noch keine phases_in_use
# hinterlegt.
ev_data = cp.data.set.charging_ev_data
if not cp.data.get.charge_state:
phases = ev_data.data.control_parameter.phases
else:
phases = cp.data.get.phases_in_use
# Wenn max_overshoot_phase -1 ist, wurde die maximale Gesamtleistung überschritten und
# max_current_overshoot muss, wenn weniger als 3 Phasen genutzt werden, entsprechend
# multipliziert werden.
if max_overshoot_phase == -1 and phases < 3:
remaining_current_overshoot = max_current_overshoot * \
(3 - phases + 1)
else:
remaining_current_overshoot = max_current_overshoot
if cp.data.set.current != 0:
considered_current = cp.data.set.current
else:
# Dies ist der aktuell betrachtete Ladepunkt. Es wurde noch kein Strom gesetzt.
considered_current = ev_data.data.control_parameter.required_current
adaptable_current = considered_current - \
ev_data.ev_template.data.min_current
# Der Strom kann nicht weiter reduziert werden.
if adaptable_current <= 0:
# Ladung darf gestoppt werden.
if not prevent_stop:
taken_current = cp.data.set.current*-1
remaining_current_overshoot += taken_current
# In diesem Zyklus darf nicht mehr geladen werden.
ev_data.data.control_parameter.required_current = 0
self._process_data(cp, 0)
message = "Das Lastmanagement hat den Ladevorgang gestoppt."
log.debug(f"{cp.num}: Das Lastmanagement hat den Ladevorgang gestoppt.")
else:
continue
else:
if adaptable_current < remaining_current_overshoot:
remaining_current_overshoot -= adaptable_current
taken_current = adaptable_current * -1
else:
taken_current = remaining_current_overshoot * -1
remaining_current_overshoot = 0
required_current = considered_current+taken_current
self._process_data(cp, required_current)
log.debug("LP "+str(cp.num)+": Das Lastmanagement hat den Ladestrom um " +
str(round(taken_current, 2))+"A auf " +
str(round(required_current, 2))+"A angepasst.")
current_diff = round(
ev_data.data.control_parameter.required_current - required_current,
2)
message = f"Das Lastmanagement hat die Sollstromstärke um {current_diff}A runtergeregelt."
# Werte aktualisieren
loadmanagement.loadmanagement_for_cp(cp, taken_current, phases)
data.data.counter_data[data.data.counter_data["all"].get_evu_counter(
)].print_stats()
# Wenn max_overshoot_phase -1 ist, wurde die maximale Gesamtleistung überschritten und
# max_current_overshoot muss, wenn weniger als 3 Phasen genutzt werden, entsprechend dividiert
# werden.
if max_overshoot_phase == -1 and phases < 3:
remaining_current_overshoot = remaining_current_overshoot / \
(3 - phases + 1)
if remaining_current_overshoot < 0.01:
if message is not None:
cp.data.get.state_str = message
break
else:
max_current_overshoot = remaining_current_overshoot
if message is not None:
cp.data.get.state_str = message
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt {cp.num}")
return remaining_current_overshoot
except Exception:
log.exception("Fehler im Algorithmus-Modul")
return 0
def _adjust_chargepoints(self, mode_tuple: Tuple[Optional[str], str, bool]) -> None:
""" ermittelt die Ladepunkte, die nicht mit der benötigten Stromstärke laden und prüft, ob diese hochgeregelt
werden können und regelt diese, falls möglich, hoch.
"""
mode = mode_tuple[0]
submode = mode_tuple[1]
prio = mode_tuple[2]
message = None
try:
# LP, der abgeschaltet werden soll
preferenced_chargepoints = []
# enthält alle LP, auf die das Tupel zutrifft
valid_chargepoints = {}
for cp in data.data.cp_data.values():
try:
if cp.data.set.current == 0:
continue
if cp.data.set.charging_ev != -1:
charging_ev = cp.data.set.charging_ev_data
if ((charging_ev.charge_template.data.prio == prio) and
(charging_ev.charge_template.data.chargemode.selected == mode or
mode is None) and
(charging_ev.data.control_parameter.submode == submode) and
(charging_ev.data.control_parameter.required_current > cp.data.set.current)):
if (max(cp.data.get.currents) > cp.data.set.current
- charging_ev.ev_template.data.nominal_difference):
valid_chargepoints[cp] = None
else:
cp.set_state_and_log(f"LP{cp.num} wird nicht hochgeregelt, da das EV nicht mit der "
"Sollstromstärke (abzüglich der erlaubten Stromabweichung) lädt.")
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt {cp.num}")
preferenced_chargepoints = self._get_preferenced_chargepoint(
valid_chargepoints, False)
if len(preferenced_chargepoints) != 0:
log.info(
"## Ladepunkte, die nicht mit Maximalstromstärke laden, wieder hochregeln.")
log.debug("Hochzuregelnde Ladepunkte "+str([cp.num for cp in preferenced_chargepoints]) +
" in Lademodus "+str(mode)+" Submodus "+str(submode)+" Prio "+str(prio))
for cp in preferenced_chargepoints:
try:
# aktuelle Werte speichern (werden wieder hergestellt, wenn das Lastmanagement die Anpassung
# verhindert)
counter_data_old = copy.deepcopy(
data.data.counter_data)
pv_data_old = copy.deepcopy(data.data.pv_data)
bat_data = copy.deepcopy(data.data.bat_data)
cp_data_old = copy.deepcopy(data.data.cp_data)
# Fehlenden Ladestrom ermitteln
missing_current = (cp.data.set.charging_ev_data.data.control_parameter.required_current
- cp.data.set.current)
# Wenn der LP erst in diesem Zyklus eingeschaltet wird, sind noch keine phases_in_use
# hinterlegt.
if cp.data.get.phases_in_use == 0:
phases = cp.data.set.charging_ev_data.data.control_parameter.phases
else:
phases = cp.data.get.phases_in_use
# Lastmanagement für den fehlenden Ladestrom durchführen
log.debug(
(f"LP {cp.num}: Fehlenden Ladestrom anpassen: {missing_current}"))
loadmanagement_state, overloaded_counters = loadmanagement.loadmanagement_for_cp(
cp, missing_current, phases)
log.debug("Lastmanagement aktiv: "+str(loadmanagement_state))
if loadmanagement_state:
sorted_overloaded_counters = loadmanagement.sort_overloaded_counter(overloaded_counters)
# Wenn max_overshoot_phase -1 ist, wurde die maximale Gesamtleistung überschritten und
# max_current_overshoot muss,
# wenn weniger als 3 Phasen genutzt werden, entsprechend multipliziert werden.
if sorted_overloaded_counters[0][1][1] == -1 and phases < 3:
undo_missing_current = (
sorted_overloaded_counters[0][1][0] * (3 - phases + 1)) * -1
else:
undo_missing_current = sorted_overloaded_counters[0][1][0] * -1
# Dies tritt nur ein, wenn Bezug möglich ist, dieser aber unter dem Offset liegt. Dann
# wird nämlich durch das Lastmanagement versucht, das Offset auszugleichen.
# In diesem Fall soll keine Anpassung erfolgen.
if undo_missing_current*-1 > missing_current:
# Zustand von vor dem Lastmanagement wieder herstellen
data.data.counter_data = counter_data_old
data.data.pv_data = pv_data_old
data.data.bat_data = bat_data
data.data.cp_data = cp_data_old
log.debug("Keine Hochregelung für Ladepunkt "+str(cp.num) +
", da nur noch das Offset zum Maximalstrom verfügbar ist.")
message = ("Das Lastmanagement konnte den Ladepunkt nicht auf die gewünschte "
"Stromstärke hochregeln.")
# Beim Wiederherstellen der Kopie wird die Adresse der Kopie zugewiesen, sodass die
# Adresse des LP aktualisiert werden muss,
# um Änderungen in der Klasse vorzunehmen, die das data-Modul referenziert.
cp = data.data.cp_data["cp" + str(cp.num)]
# Es kann nur ein Teil des fehlenden Ladestroms hochgeregelt werden.
else:
# Werte aktualisieren
loadmanagement.loadmanagement_for_cp(
cp, undo_missing_current, phases)
self._process_data(
cp, cp.data.set.current + missing_current + undo_missing_current)
message = ("Das Lastmanagement hat den Ladestrom um "
f"{round((missing_current + undo_missing_current), 2)}A auf "
f"{cp.data.set.current + missing_current + undo_missing_current}A "
f"angepasst.")
log.debug(message)
current_to_allocate = (cp.data.set.charging_ev_data.data.control_parameter.required_current
- cp.data.set.current)
self._reduce_others(cp, current_to_allocate, phases, stop_others=False)
# Zuvor fehlender Ladestrom kann nun genutzt werden
else:
self._process_data(
cp, cp.data.set.current + missing_current)
message = "Das Lastmanagement hat den Ladestrom um " + \
str(round(missing_current, 2))+"A angepasst."
log.debug(message)
ev_data = cp.data.set.charging_ev_data
current_diff = round(ev_data.data.control_parameter.required_current - cp.data.set.current, 2)
if current_diff > ev_data.ev_template.data.nominal_difference:
cp.data.get.state_str = (f"Das Lastmanagement hat die Sollstromstärke um "
f"{current_diff}A runtergeregelt.")
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt {cp.num}")
except Exception:
log.exception("Fehler im Algorithmus-Modul")
def _switch_off_threshold(self, mode_tuple: Tuple[Optional[str], str, bool]) -> None:
"""Gibt es mehrere Ladepunkte, auf die das Tupel zutrifft, wird die Reihenfolge durch
_get_preferenced_chargepoint festgelegt. Dann wird geprüft, ob die Abschaltschwelle erreicht wurde.
"""
mode = mode_tuple[0]
submode = mode_tuple[1]
prio = mode_tuple[2]
# LP, der abgeschaltet werden soll
preferenced_chargepoints = []
# enthält alle LP, auf die das Tupel zutrifft
valid_chargepoints = {}
for cp in data.data.cp_data.values():
try:
if cp.data.set.charging_ev != -1:
charging_ev = cp.data.set.charging_ev_data
if ((charging_ev.charge_template.data.prio == prio) and
(charging_ev.charge_template.data.chargemode.selected == mode or mode is None) and
(charging_ev.data.control_parameter.submode == submode)):
valid_chargepoints[cp] = None
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt{cp.num}")
preferenced_chargepoints = self._get_preferenced_chargepoint(
valid_chargepoints, False)
if len(preferenced_chargepoints) == 0:
# Es gibt keine Ladepunkte in diesem Lademodus, die noch nicht laden oder die noch gestoppt werden können.
return
else:
# Solange die Liste durchgehen, bis die Abschaltschwelle nicht mehr erreicht wird.
# Nicht den Überhang aus dem PV-Modul verwenden, da in diesem der Regelbereich berücksichtigt ist und die
# Abschaltschwelle um die Mitte des Regelbereichs verzögert greifen würde.
overhang = data.data.counter_data[data.data.counter_data["all"].get_evu_counter(
)].data["get"]["power"] + data.data.bat_data["all"].power_for_bat_charging()
for cp in preferenced_chargepoints:
try:
if cp.data.set.current != 0:
if data.data.pv_data["all"].switch_off_check_threshold(cp, overhang) is False:
cp.data.set.current = 0
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt{cp.num}")
def _check_auto_phase_switch_delay(self) -> None:
""" geht alle LP durch und prüft, ob eine Ladung aktiv ist, ob automatische Phasenumschaltung
möglich ist und ob ob ein Timer gestartet oder gestoppt werden muss oder ob ein Timer abgelaufen ist.
"""
for cp in data.data.cp_data.values():
try:
if cp.data.set.charging_ev != -1:
charging_ev = cp.data.set.charging_ev_data
control_parameter = charging_ev.data.control_parameter
pv_auto_switch = (control_parameter.chargemode == "pv_charging" and
data.data.general_data.get_phases_chargemode("pv_charging") == 0)
scheduled_auto_switch = (control_parameter.chargemode == "scheduled_charging" and
control_parameter.submode == "pv_charging" and
data.data.general_data.get_phases_chargemode("scheduled_charging") == 0)
if (cp.data.config.auto_phase_switch_hw and cp.data.get.charge_state and
(pv_auto_switch or scheduled_auto_switch) and
control_parameter.timestamp_perform_phase_switch is None):
# Gibt die Stromstärke und Phasen zurück, mit denen nach der Umschaltung geladen werden
# soll. Falls keine Umschaltung erforderlich ist, werden Strom und Phasen, die übergeben
# wurden, wieder zurückgegeben.
log.debug(f"Ladepunkt {cp.num}: Prüfen, ob Phasenumschaltung durchgeführt werden soll.")
phases, current, message = charging_ev.auto_phase_switch(
cp.num,
cp.data.get.currents,
cp.data.get.power)
if message is not None:
cp.data.get.state_str = message
# Nachdem im Automatikmodus die Anzahl Phasen bekannt ist, Einhaltung des Maximalstroms
# prüfen.
required_current = charging_ev.check_min_max_current(current, control_parameter.phases)
charging_ev.data.control_parameter.required_current = required_current
Pub().pub("openWB/set/vehicle/"+str(charging_ev.num) +
"/control_parameter/required_current", required_current)
charging_ev.data.control_parameter.phases = phases
Pub().pub("openWB/set/vehicle/"+str(charging_ev.num) +
"/control_parameter/phases", phases)
self._process_data(cp, current)
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt{cp.num}")
def _manage_distribution(self) -> None:
""" verteilt den EVU-Überschuss und den maximalen Bezug auf die Ladepunkte, die dem Modus im Tupel entsprechen.
Die Funktion endet, wenn das Lastmanagement eingereift oder keine Ladepunkte mehr in diesem Modus vorhanden
sind. Die Zuteilung erfolgt gemäß der Reihenfolge in _get_preferenced_chargepoint.
"""
try:
log.info("## Zuteilung des Überschusses")
for mode_tuple in self.chargemodes:
try:
mode = mode_tuple[0]
submode = mode_tuple[1]
prio = mode_tuple[2]
preferenced_chargepoints = []
# enthält alle LP, auf die das Tupel zutrifft
valid_chargepoints = {}
for cp in data.data.cp_data.values():
if cp.data.set.charging_ev != -1:
charging_ev = cp.data.set.charging_ev_data
# set-> current enthält einen Wert, wenn das EV in diesem Zyklus eingeschaltet werden
# soll, aktuell aber noch nicht lädt.
if (cp.data.set.current != 0 or
charging_ev.data.control_parameter.required_current == 0):
continue
if ((charging_ev.charge_template.data.prio == prio) and
(charging_ev.charge_template.data.chargemode.selected == mode or
mode is None) and
(charging_ev.data.control_parameter.submode == submode)):
valid_chargepoints[cp] = None
preferenced_chargepoints = self._get_preferenced_chargepoint(
valid_chargepoints, True)
if len(preferenced_chargepoints) != 0:
log.debug("Zuteilung für Ladepunkte " +
str([cp.num for cp in preferenced_chargepoints]) +
" in Lademodus "+str(mode)+" Submodus "+str(submode)+" Prio "+str(prio))
current_mode_index = self.chargemodes.index(mode_tuple)
self._distribute_power_to_cp(
preferenced_chargepoints, current_mode_index)
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Modus {mode_tuple}")
else:
# kein Ladepunkt, der noch auf Zuteilung wartet
log.info("## Zuteilung beendet, da kein Ladepunkt mehr auf Zuteilung wartet.")
except Exception:
log.exception("Fehler im Algorithmus-Modul")
def _distribute_power_to_cp(self,
preferenced_chargepoints: List[Chargepoint],
current_mode_index: int) -> None:
""" Ladepunkte, die eingeschaltet werden sollen, durchgehen.
"""
for chargepoint in preferenced_chargepoints:
try:
charging_ev = chargepoint.data.set.charging_ev_data
phases = charging_ev.data.control_parameter.phases
required_current = charging_ev.data.control_parameter.required_current
required_power = phases * 230 * \
charging_ev.data.control_parameter.required_current
Pub().pub("openWB/set/chargepoint/"+str(chargepoint.num) +
"/set/required_power", required_power)
chargepoint.data.set.required_power = required_power
if charging_ev.data.control_parameter.submode == "pv_charging":
self._calc_pv_charging(
chargepoint, required_current, phases, current_mode_index)
elif (charging_ev.data.control_parameter.submode == "stop" or
(charging_ev.data.control_parameter.submode == "standby")):
required_current = 0
self._process_data(chargepoint, required_current)
else:
self._calc_normal_load(chargepoint, required_current, phases)
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt {chargepoint.num}")
def _calc_normal_load(self,
chargepoint: Chargepoint,
required_current: float,
phases: int) -> None:
try:
# Wenn bereits geladen wird, nur die Änderung zuteilen
current_to_allocate = required_current
max_used_current = max(chargepoint.data.get.currents)
if max_used_current != 0:
current_to_allocate -= max_used_current
self._reduce_others(chargepoint, current_to_allocate, phases)
except Exception:
log.exception("Fehler im Algorithmus-Modul")
def _reduce_others(self,
chargepoint: Chargepoint,
current_to_allocate: float,
phases: int,
stop_others: bool = True) -> None:
"""Falls erforderlich LP mit niedrigerer
Ladepriorität reduzieren/abschalten, um LP laden. Ladepunkte mit gleichem Lademodus und gleicher Priorität
dürfen nur reduziert, aber nicht abgeschaltet werden.
"""
try:
evu_counter = data.data.counter_data["all"].get_evu_counter()
# aktuelle Werte speichern (werden wieder hergestellt, wenn das Lastmanagement die Ladung verhindert)
counter_data_old = copy.deepcopy(data.data.counter_data)
pv_data_old = copy.deepcopy(data.data.pv_data)
bat_data = copy.deepcopy(data.data.bat_data)
cp_data_old = copy.deepcopy(data.data.cp_data)
log.debug("Benötigter neuer/zusätzlicher Strom "+str(current_to_allocate)+"A")
_, overloaded_counters = allocate_power(chargepoint, current_to_allocate, phases)
self._process_data(chargepoint,
chargepoint.data.set.charging_ev_data.data.control_parameter.required_current)
if data.data.counter_data["all"].data["set"]["loadmanagement_active"] and len(overloaded_counters) != 0:
# Lastmanagement hat eingegriffen
log.debug("Aktuell kalkulierte Ströme am EVU-Punkt[A]: "+str(
data.data.counter_data[evu_counter].data["set"].get("currents_used")))
log.warning(
"Für die Ladung an LP"+str(chargepoint.num) +
" muss erst ein Ladepunkt mit gleicher/niedrigerer Priorität reduziert/gestoppt werden.")
data.data.counter_data[evu_counter].print_stats()
# Zähler mit der größten Überlastung ermitteln
sorted_overloaded_counters = loadmanagement.sort_overloaded_counter(overloaded_counters)
# Ergebnisse des Lastmanagements holen, das beim Einschalten durchgeführt worden ist. Es ist
# ausreichend, Zähler mit der größten Überlastung im Pfad zu betrachten. Kann diese nicht eliminiert
# werden, kann der Ladepunkt nicht laden.
chargepoints = data.data.counter_data["all"].get_chargepoints_of_counter(
sorted_overloaded_counters[0][0])
remaining_current_overshoot = sorted_overloaded_counters[0][1][0]
control_parameter = chargepoint.data.set.charging_ev_data.data.control_parameter
current_mode_index = self._get_mode_index(
control_parameter.chargemode, control_parameter.submode, control_parameter.prio)
# LP mit niedrigerer Priorität reduzieren und ggf. stoppen
for mode in reversed(self.chargemodes[(current_mode_index+1):-4]):
try:
# Runterregeln
remaining_current_overshoot = self._down_regulation(
mode,
chargepoints,
remaining_current_overshoot,
sorted_overloaded_counters[0][1][1],
prevent_stop=True)
if remaining_current_overshoot <= 0:
# LP kann nun wie gewünscht eingeschaltet werden
self._process_data(chargepoint, control_parameter.required_current)
break
else:
# Abschalten
if (not chargepoint.data.set.charging_ev_data.ev_template.data.prevent_charge_stop and
stop_others):
remaining_current_overshoot = self._down_regulation(
mode, chargepoints,
remaining_current_overshoot,
sorted_overloaded_counters[0][1][1],
prevent_stop=False)
if remaining_current_overshoot <= 0:
# LP kann nun wie gewünscht eingeschaltet werden
self._process_data(chargepoint, control_parameter.required_current)
break
except Exception:
log.exception("Fehler im Algorithmus-Modul für Modus "+str(mode))
else:
# Ladepunkt, der gestartet werden soll reduzieren
remaining_current_overshoot = self._perform_down_regulation(
[chargepoint],
remaining_current_overshoot,
sorted_overloaded_counters[0][1][1],
prevent_stop=True)
# Ladepunkte mit gleicher Priorität reduzieren. Diese dürfen nicht gestoppt werden.
if remaining_current_overshoot != 0:
remaining_current_overshoot = self._down_regulation(
self.chargemodes[current_mode_index],
chargepoints, remaining_current_overshoot, sorted_overloaded_counters[0][1][1],
prevent_stop=True)
if remaining_current_overshoot != 0:
# Ladepunkt darf nicht laden
# Zustand von vor dem Lastmanagement wieder herstellen
data.data.counter_data = counter_data_old
data.data.pv_data = pv_data_old
data.data.bat_data = bat_data
data.data.cp_data = cp_data_old
# Beim Wiederherstellen der Kopie wird die Adresse der Kopie zugewiesen, sodass die Adresse des
# LP aktualisiert werden muss,
# um Änderungen in der Klasse vorzunehmen, die das data-Modul referenziert.
chargepoint = data.data.cp_data[f"cp{chargepoint.num}"]
# keine weitere Zuteilung
message = "Keine Ladung, da das Reduzieren/Abschalten der anderen Ladepunkte nicht ausreicht."
log.info("LP "+str(chargepoint.num)+": "+message)
log.debug("Wiederherstellen des Zustands, bevor LP" +
str(chargepoint.num)+" betrachtet wurde.")
chargepoint.data.get.state_str = message
self._process_data(chargepoint, 0)
else:
(log.info(
"LP: " + str(chargepoint.num) + ", Ladestrom: " + str(chargepoint.data.set.current) +
"A, Phasen: " +
str(chargepoint.data.set.charging_ev_data.data.control_parameter.phases) +
", Ladeleistung: " +
str(
(chargepoint.data.set.charging_ev_data.data.control_parameter.phases *
chargepoint.data.set.current * 230)) + "W"))
except Exception:
log.exception("Fehler im Algorithmus-Modul")
def _calc_pv_charging(self,
chargepoint: Chargepoint,
required_current: float,
phases: int,
current_mode_index: int) -> None:
"""prüft, ob Speicher oder EV Vorrang hat und wie viel Strom/Leistung genutzt werden kann.
"""
try:
# Wenn bereits geladen wird, nur die Änderung zuteilen
current_to_allocate = required_current
max_used_current = max(chargepoint.data.get.currents)
if max_used_current != 0:
current_to_allocate -= max_used_current
if not self._check_cp_without_feed_in_is_prioritised(chargepoint):
self._process_data(chargepoint, 0)
return
if chargepoint.data.set.charging_ev_data.data.control_parameter.timestamp_perform_phase_switch:
return
allocated_current, threshold_reached = data.data.pv_data["all"].switch_on(
chargepoint,
current_to_allocate,
phases,
data.data.bat_data["all"].power_for_bat_charging())
if threshold_reached:
set_current = max_used_current + allocated_current
else:
set_current = 0
# Zustand merken
pv_data_old = copy.deepcopy(data.data.pv_data)
cp_data_old = copy.deepcopy(data.data.cp_data)
# LP mit niedrigerer Priorität abschalten (Reduktion ist bereits zu Beginn des Zyklus erfolgt)
for mode_tuple in reversed(self.chargemodes[current_mode_index+1:-4]):
mode = mode_tuple[0]
submode = mode_tuple[1]
prio = mode_tuple[2]
preferenced_chargepoints = []
# enthält alle LP, auf die das Tupel zutrifft
valid_chargepoints = {}
for item in data.data.cp_data:
try:
if "cp" in item:
cp = data.data.cp_data[item]
if cp.data.set.charging_ev != -1:
charging_ev = cp.data.set.charging_ev_data
# set-> current enthält einen Wert, wenn das EV in diesem Zyklus eingeschaltet
# werden soll, aktuell aber noch nicht lädt.
if cp.data.set.current == 0:
continue
if ((charging_ev.charge_template.data.prio == prio) and
(charging_ev.charge_template.data.chargemode.selected == mode or
mode is None) and
(charging_ev.data.control_parameter.submode == submode)):
valid_chargepoints[cp] = None
except Exception:
log.exception("Fehler im Algorithmus-Modul für Ladepunkt "+item)
preferenced_chargepoints = self._get_preferenced_chargepoint(
valid_chargepoints, False)
if preferenced_chargepoints:
for cp in preferenced_chargepoints:
try:
# abschalten
if not cp.data.set.charging_ev_data.ev_template.data.prevent_charge_stop:
data.data.pv_data["all"].allocate_evu_power(
-1 * cp.data.set.charging_ev_data.data["control_parameter"]
["required_current"] * 230 * chargepoint.data.get.phases_in_use)
self._process_data(cp, 0)
log.warning(
"LP" + str(cp.num) + " abgeschaltet, um die Einschaltverzögerung an LP" +
str(chargepoint.num) + " zu starten.")
# switch_on erneut durchführen
allocated_current, threshold_reached = data.data.pv_data["all"].switch_on(
chargepoint,
required_current,
phases,
data.data.bat_data["all"].power_for_bat_charging())
if threshold_reached:
set_current = max_used_current + allocated_current
break
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt{cp.num}")
if threshold_reached:
break
else:
# Es konnte nicht genug Leistung freigegeben werden.
data.data.pv_data = pv_data_old
data.data.cp_data = cp_data_old
log.info("Keine Ladung an LP"+str(chargepoint.num) +
", da das Reduzieren/Abschalten der anderen Ladepunkte nicht ausreicht.")
log.debug("Wiederherstellen des Zustands, bevor LP" +
str(chargepoint.num)+" betrachtet wurde.")
self._process_data(chargepoint, set_current)
except Exception:
log.exception("Fehler im Algorithmus-Modul")
def _distribute_unused_evu_overhang(self) -> None:
""" prüft die Lademodi, die mit EVU-Überschuss laden, in absteigender Reihenfolge, ob für LP ohne
Einspeisegrenze noch EVU-Überschuss übrig ist und dann für die LP mit Einspeisegrenze.
"""
try:
log.info("## Übrigen Überschuss verteilen.")
for mode in self.chargemodes[6:-4]:
overhang = self._get_bat_and_evu_overhang()
if overhang != 0:
if overhang > 0.01:
self._distribute_remaining_overhang(mode, False)
if (overhang
- data.data.general_data.data.chargemode_config.pv_charging.feed_in_yield) > 0.01:
self._distribute_remaining_overhang(mode, True)
data.data.pv_data["all"].put_stats()
except Exception:
log.exception("Fehler im Algorithmus-Modul")
def _distribute_remaining_overhang(self, mode_tuple: Tuple[Optional[str], str, bool], feed_in_limit: bool) -> None:
""" Verteilt den verbleibenden EVU-Überschuss gleichmäßig auf alle mit EVU-Überschuss ladenden EV. Dazu wird
zunächst die Anzahl der EV ermittelt. Danach wird der Überschuss pro Phase, über die das EV lädt, ermittelt
und auf die Phasen aufgeschlagen.
"""
def regarding_vehicle(charging_ev: Ev, cp: Chargepoint) -> bool:
return ((charging_ev.charge_template.data.prio == prio and
(charging_ev.charge_template.data.chargemode.selected == mode or
mode is None) and
charging_ev.data.control_parameter.submode == submode and
charging_ev.charge_template.data.chargemode.pv_charging.
feed_in_limit == feed_in_limit) and cp.data.set.current != 0)
try:
num_of_ev = 0
mode = mode_tuple[0]
submode = mode_tuple[1]
prio = mode_tuple[2]
# Anzahl aller genutzten Phasen ermitteln
for cp in data.data.cp_data.values():
try:
if cp.data.set.charging_ev != -1:
charging_ev = cp.data.set.charging_ev_data
if regarding_vehicle(charging_ev, cp):
# Erst hochregeln, wenn geladen wird.
if ((cp.data.set.current - charging_ev.ev_template.data.nominal_difference)
< max(cp.data.get.currents)):
# Ev dieser Prioritätsstufe zählen
num_of_ev += 1
else:
log.debug(f'LP {cp.num}: EV kann nicht weiter hochgeregelt '
'werden, da der zugeteilte Strom nicht genutzt wird.')
cp.data.get.state_str = (f'LP {cp.num}: EV kann nicht weiter hochgeregelt '
'werden, da der zugeteilte Strom nicht genutzt '
'wird.')
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt{cp.num}")
# Ladung aktiv?
if num_of_ev > 0:
diff_per_ev_power = 0
dif_per_ev_current = 0
bat_overhang = data.data.bat_data["all"].power_for_bat_charging()
evu_overhang = data.data.pv_data["all"].overhang_left()
if not feed_in_limit:
diff_per_ev_power = (evu_overhang + bat_overhang) / num_of_ev
else:
feed_in_yield = data.data.general_data.data.chargemode_config.pv_charging.feed_in_yield
if evu_overhang <= feed_in_yield:
diff_per_ev_power = (evu_overhang - feed_in_yield + bat_overhang) / num_of_ev
log.debug(f"Zuteilbare Leistung pro EV: {diff_per_ev_power}W")
else:
# Wenn die Einspeisegrenze erreicht wird, Strom schrittweise erhöhen (1A pro Phase), bis
# dies nicht mehr der Fall ist.
dif_per_ev_current = 1
log.debug(f"Zuteilbarer Strom pro Phase pro EV: {dif_per_ev_current}W")
if diff_per_ev_power > 0 or dif_per_ev_current > 0:
for cp in data.data.cp_data.values():
try:
if cp.data.set.charging_ev != -1:
charging_ev = cp.data.set.charging_ev_data
if regarding_vehicle(charging_ev, cp):
if ((cp.data.set.current
- charging_ev.ev_template.data.nominal_difference)
< max(cp.data.get.currents)):
self.__distribute_remaining_overhang_to_cp(cp,
dif_per_ev_current,
diff_per_ev_power)
except Exception:
log.exception(f"Fehler im Algorithmus-Modul für Ladepunkt{cp.num}")
except Exception:
log.exception("Fehler im Algorithmus-Modul")
def __distribute_remaining_overhang_to_cp(self,
cp: Chargepoint,
dif_per_ev_current: float,
diff_per_ev_power: float):
new_current = 0
charging_ev = cp.data.set.charging_ev_data
bat_overhang = data.data.bat_data["all"].power_for_bat_charging()
if cp.data.get.charge_state:
phases = cp.data.get.phases_in_use
else:
phases = cp.data.set.charging_ev_data.data.control_parameter.phases
if diff_per_ev_power != 0:
new_current = (diff_per_ev_power / 230 / phases) + cp.data.set.current
else:
new_current = dif_per_ev_current + cp.data.set.current
log.debug(f"LP{cp.num}: max Überschuss pro Phase {new_current}A")
# Um max. +/- 5A pro Zyklus regeln
if ((-5-charging_ev.ev_template.data.nominal_difference)
< (new_current - max(cp.data.get.currents))
< (5+charging_ev.ev_template.data.nominal_difference)):
current = new_current
else:
if new_current < max(cp.data.get.currents):
current = max(cp.data.get.currents) - 5
msg = "Es darf um max 5A unter den aktuell genutzten Strom geregelt werden."
else:
current = max(cp.data.get.currents) + 5
msg = "Es darf um max 5A über den aktuell genutzten Strom geregelt werden."
log.debug(f"LP{cp.num}: {msg} -> neuer Strom {current}A")
if cp.data.get.state_str is None:
cp.data.get.state_str = msg
# Einhalten des Mindeststroms des Lademodus und Maximalstroms des EV
current = charging_ev.check_min_max_current(current, phases, pv=True)
current_diff = current - cp.data.set.current
power_diff = phases * 230 * (current - cp.data.set.current)
log.debug(
f'LP{cp.num}: Leistungsänderung: {power_diff}, neuer Strom {current},'
f' max genutzter Strom {max(cp.data.get.currents)}A')
if power_diff != 0:
# Laden nur mit der Leistung, die vorher der Speicher bezogen hat
if bat_overhang > 0 and power_diff > 0:
if (bat_overhang - power_diff) > 0:
missing_current = use_evu_bat_power(cp, current_diff, phases, pv_mode=False)
# Laden mit EVU-Überschuss und der Leistung, die vorher der
# Speicher bezogen hat
else:
pv_current_diff = (power_diff - bat_overhang) / 230 / phases
missing_current = use_evu_bat_power(cp, pv_current_diff, phases, pv_mode=True)
missing_current = use_evu_bat_power(cp, bat_overhang / 230 / phases, phases, pv_mode=False)
# Laden nur mit EVU-Überschuss bzw. Reduktion des EVU-Bezugs
else:
missing_current = use_evu_bat_power(cp, current_diff, phases, pv_mode=True)
else:
missing_current = 0