In [1]:
import pandas as pd
import numpy as np
import matplotlib as mlp
import matplotlib.pyplot as plt
import pickle
from scipy import stats

# Festlegen globaler Parameter der Simulation

In [2]:
# Breite der Zeitschritte
T_STEP = 15 
# Anzahl zu simulierender Fahrzeuge
EV_TOTAL = 250000
# Anzahl an Tagen die simuliert werden sollen 
# Noch nicht in Simulation eingebunden -> äußerer Loop
D_TOTAL = 1
# Tagtyp: 1 = Werktag, 2 = Samstag, 3 = Sonntag
TYPE_D = 1 
# Ladeszenario: 1 = nur Zuhause, 2 = Zuhause und Arbeit, 3 = Überall
CHARGE_SCEN = 3
# Ladeleistung 
PCHARGE_SLOW = 3.7 
# durchschnittliche Fahrgeschwindigkeit
# Dummy Wert aus anderem Modell -> anpassen zu späterem Zeitpunkt
EV_SPEED = 19

# Abspeichern der simulierten Fahrzeuge
simulated_evs = []

# Laden der Simulationsdaten

In [3]:
import pickle
import os
root = os.getcwd()+"\\Datenauswertung\\Werktag\\Simulationsdaten"

# Verteilungsfunktion für initiale Abreise
initial_departure_model = pickle.load(open(root+"\\Modell_Initiale_Abfahrtszeit_Werktag.pickle", "rb"))

"""
Übergangswahrscheinlichkeiten:
- Liste von Listen mit einer Liste pro Ausgangszustand mit einem Eintrag pro Zeitschritt (96 Einträge) 
- jeder der 96 Einträge enthält 4 Werte die der jeweiligen relativen Übergangswahrscheinlichkeit entsprechen
    -> sortiert nach numerischer Repräsentation der Zustände
- Beispiel: transition_prob[0] = Liste der Übergangswahrscheinlichkeiten des Zustands Zuhause = 1
            transition_prob[0][10] = [0.2, 0.4, 0.1, 0.3] = [p1_2, p1_3, p1_4, p1_5] im Zeitintervall 9-10(dummy Werte)
"""
transition_prob = pickle.load(open(root+"\\Übergangswahrscheinlichkeiten.pickle", "rb"))


"""
Parameter Verteilungsfunktion Wegstrecken: 
- jeweils m*m Liste wobei Parameter der Verteilungsfunktion der Weglänge von i nach j in Reihe i-1 und Spalte j-1 zu finden sind
- Beispiel: loc[0][1] = Mittelwert der Weglängenverteilung Zustand Zuhause(1) nach Zustand Arbeit(2)
"""
# shape = Formparameter der Verteilung
dist_wd_shape_ij = pickle.load(open(root+"\\Verteilung_Wegstrecken_Werktage_Shape.pickle", "rb"))
# scale = Standardabweichung der Verteilung
dist_wd_scale_ij = pickle.load(open(root+"\\Verteilung_Wegstrecken_Werktage_Scale.pickle", "rb"))
# loc = Mittelwert der Verteilung
dist_wd_loc_ij = pickle.load(open(root+"\\Verteilung_Wegstrecken_Werktage_Loc.pickle", "rb"))

"""
Verteilungsfunktion der Aufenthaltsdauern in den einzelnen Zuständen:
- Liste mit dem jeweiligen Density Estimation Modell des Zustands in sortierter Reihenfolge
- Modell des Zustands i in Index i-1 
- Beispiel: Modell Aufenthaltsdauer Zustand 2(= Arbeit) in stay_duration_model[1]
"""
stay_duration_model = pickle.load(open(root+"\\Modelle_Aufenthaltsdauer_Werktag.pickle", "rb"))



# EV Klasse und Initialisierungslogik

In [4]:
class Electric_Vehicle(object):
    model_df = pd.read_excel(os.getcwd()+"\\Rohdaten\\EV_Modelle_Tabelle.xlsx", index_col='Modell')
    SEGMENTS = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, -9])
    # Dummy Wahrscheinlichkeiten aus MOP Studie
    PROB_SEGMENT = [0.0592, #1
        0.1958, #2
        0.2621, #3
        0.1597, #4
        0.029, #5
        0.0058, #6
        0.0354, #7
        0.0122, #8
        0.0637, #9
        0.0586, #10
        0.0386, #11
        0.0058, #12
        0.0528, #13
        0.0213 #-9
        ]

    
    def __init__(self):
        segment = self.choose_segment()
        model = self.choose_model(segment)
        self.MODEL = model
        
        # Bug da Modelle mehrfach in Liste keine eindeutige Zuordnung sondern Array als output -> fixed -> elegantere Lösung?
        # Segment des Fahrzeugs
        if np.isscalar(Electric_Vehicle.model_df.at[model, "Segment"]):
            self.SEGMENT = Electric_Vehicle.model_df.at[model, "Segment"]
        else:
            self.SEGMENT = Electric_Vehicle.model_df.at[model, "Segment"][0]
        
        # Batteriekapazität in kWh
        if np.isscalar(Electric_Vehicle.model_df.at[model, "Batterie"]):
            self.CAPACITY = Electric_Vehicle.model_df.at[model, "Batterie"]
        else:
            self.CAPACITY = Electric_Vehicle.model_df.at[model, "Batterie"][0]
        
        # Verbrauch in kWh/100km
        if np.isscalar(Electric_Vehicle.model_df.at[model, "Verbrauch"]):
            self.CONSUMPTION = Electric_Vehicle.model_df.at[model, "Verbrauch"]
        else:
            self.CONSUMPTION = Electric_Vehicle.model_df.at[model, "Verbrauch"][0]
        
        # Leistung -> aktuell nicht gebraucht
        if np.isscalar(Electric_Vehicle.model_df.at[model, "Leistung"]):
            self.POWER = Electric_Vehicle.model_df.at[model, "Leistung"]
        else:
            self.POWER = Electric_Vehicle.model_df.at[model, "Leistung"][0]
            
        self.SOC = 100
        self.trip_no = 0
        self.trips = []
    
    def choose_segment(self): 
        # wähle p zufällig auf Basis gegebener W'keiten
        choice = np.random.choice(Electric_Vehicle.SEGMENTS, p=Electric_Vehicle.PROB_SEGMENT)
        
        # Vorerst Wahl des häufigsten Semgents bei keiner Angabe -> später überarbeiten
        if choice == -9: 
            choice = 3
        
        return choice
    
    def choose_model(self, segment):
        models = Electric_Vehicle.model_df
        
        # filtern der infragekommenden Fahrzeuge über Segment
        filt = models["Segment"] == segment
        choices = models[filt]
        
        # Wahl einse zufälligen Fahrzeugs aus der Liste
        pick = np.random.randint(0, len(choices))
        model = choices.index[pick]
        return model
    
    # drive() und charge() testen
    
    def drive(self, distance):
        trip_consumption = distance * (self.CONSUMPTION / 100) 
        self.SOC = self.SOC - (trip_consumption / self.CAPACITY) * 100
                
    
    def check_distance(self, distance):
        # überprüfe SOC für gegebene Distanz ausreichend ist
        trip_consumption = distance * (self.CONSUMPTION / 100)
        if trip_consumption < self.SOC/100 * self.CAPACITY:
            return True 
        else:
            return False
        
    def max_distance(self):
        # maximale Distanz die mit aktuellem SOC zurückgelegt werden kann
        max_dist = round(((self.SOC/100)*self.CAPACITY / self.CONSUMPTION) * 100, 2)
        return max_dist
        
    def charge(self):       
        # Fahrzeug befindet sich im Zielzustand des letzten Trips
        trip = self.trips[len(self.trips)-1]
        # nötige Zeit zum vollständigen Aufladen in Minuten
        t_charge_full = (((100 - ev.SOC)/100 * ev.CAPACITY) / PCHARGE_SLOW) * 60
        # Ladezeit beschränkt durch Zeit zum vollständigen Aufladen und Länge des Aufenthalts
        t_charge = min(t_charge_full, trip.stay_duration)
        # update SOC über Ladedauer und Ladeleistung
        self.SOC = self.SOC + ((t_charge/60 * PCHARGE_SLOW) / self.CAPACITY) * 100
        trip.charge_start = int(trip.arrival)
        trip.charge_end = int(round(trip.charge_start + t_charge))

# Trip Klasse

In [5]:
class Trip(object):
    
    def __init__(self, whyfrom, departure):
        self.trip_id = None
        self.trip_no = None
        self.whyfrom = whyfrom
        self.whyto = None
        self.departure = departure
        self.departure_t = round(float(departure/T_STEP))
        self.arrival = None
        self.trip_duration = None
        self.distance = None
        self.stay_duration = None
        self.SOC_start = None
        self.SOC_end = None
        self.charge_start = None 
        self.charge_end = None

# Simulation 

In [6]:
#  Simulieren der Gesamtheit an Fahrzeugen
for i in range(EV_TOTAL):
    # erzeugen Fahrzeug
    ev = Electric_Vehicle()
    time = 0 
    
    # Fahrten bis Tagesende (24:00)
    while (time < 1440):
        # Unterscheidung zwischen erster Fahrt und restlichen Fahrten
        if len(ev.trips) == 0:
            # Abfahrtszeit = sample der Verteilungsfunktion der initialen Abfahrtszeit
            departure = round(float(initial_departure_model.sample()))
            if departure > 1440:
                departure = departure - 1440
            # erster Trip startet zuhause mit Abfahrtszeit "departure"
            ev_trip = Trip(whyfrom=1, departure=departure)        

        else: 
            # Ursprungszustand des Trips = Zielzustand des letzten Trips
            whyfrom = ev_trip.whyto
            # neuer Trip mit Ausgangszustand = Zielzustand des letzten Trips 
            # Abfahrtszeit = Ankunftszeit des letzten Trips + Aufenthaltsdauer im Zielzustand
            ev_trip = Trip(whyfrom=whyfrom, departure=time)  

        # wähle nächsten Zustand in Abhängigkeit des aktuellen Zustands und des Abfahrtszeitpunkt des Trips
        if ev_trip.whyfrom == 1:
            ev_trip.whyto = np.random.choice([2, 3, 4, 5], p=transition_prob[0][ev_trip.departure_t])
        elif ev_trip.whyfrom == 2:
            ev_trip.whyto = np.random.choice([1, 3, 4, 5], p=transition_prob[1][ev_trip.departure_t])
        elif ev_trip.whyfrom == 3:
            ev_trip.whyto = np.random.choice([1, 2, 4, 5], p=transition_prob[2][ev_trip.departure_t])
        elif ev_trip.whyfrom == 4:
            ev_trip.whyto = np.random.choice([1, 2, 3, 5], p=transition_prob[3][ev_trip.departure_t])
        elif ev_trip.whyfrom == 5:
            ev_trip.whyto = np.random.choice([1, 2, 3, 4], p=transition_prob[4][ev_trip.departure_t])


        # Samplen der Aufenthaltsdauer
        ev_trip.stay_duration = round(float(stay_duration_model[ev_trip.whyto - 1].sample()))
        # Da in Density Estimation negative Werte möglich -> Modell verbessern?!
        while(ev_trip.stay_duration < 0):
            ev_trip.stay_duration = round(float(stay_duration_model[ev_trip.whyto - 1].sample()))

        # Parameter Verteilungsfunktion der Wegstrecke, in Abhängigkeit der Zustandskombination
        # Erläutering Datenstruktur siehe "Parameter der Verteilungsfunktion Wegstrecken:" in "Laden der Simulationsdaten"
        dist_shape = dist_wd_shape_ij[ev_trip.whyfrom - 1][ev_trip.whyto - 1]
        dist_scale = dist_wd_scale_ij[ev_trip.whyfrom - 1][ev_trip.whyto - 1]
        dist_loc = dist_wd_loc_ij[ev_trip.whyfrom - 1][ev_trip.whyto - 1]

        # Samplen der lognorm Verteilungsfunktion der Zustandskombination
        ev_trip.distance = round(stats.lognorm.rvs(s=dist_shape, loc=dist_loc, scale=dist_scale), 2)
        
        # Ladevorgang  
        if ev.SOC < 100:
            location = ev.trips[len(ev.trips)-1].whyto
            # obere Schranke der Ladezeit, tatsächliche wird in ev.charge() bestimmt
            duration = ev.trips[len(ev.trips)-1].stay_duration
            if CHARGE_SCEN == 1:
                # geladen wird nur wenn sich das Fahrzeug sich Zuhause befindet und sich dort länger als 15 Minuten aufhält
                if location == 1 and duration > 15:
                    ev.charge()

            elif CHARGE_SCEN == 2:
                # geladen wird nur wenn sich das Fahrzeug Zuhause oder auf der Arbeit befindet und sich dort länger als 15 Minuten aufhält
                if (location == 1 or location == 2) and duration > 15:
                    ev.charge()

            elif CHARGE_SCEN == 3:
                # geladen wird in jedem Zustand, sofern die Parkdauer 15 Minuten übersteigt
                if duration > 15:
                    ev.charge()
                    
        # Speichen des SOC zu Beginn des Trips
        ev_trip.SOC_start = round(ev.SOC, 1)

        # Was tun wenn Energie des Trips die Restenergie der Batterie übersteigt
        if not ev.check_distance(ev_trip.distance):
            ev_trip.distance = ev.max_distance()
                    
        # Fahrvorgang -> update SOC 
        ev.drive(ev_trip.distance)
        # Speichern des SOC des Fahrzeugs bei Ankunft
        ev_trip.SOC_end = round(ev.SOC, 1)
        # Fahrtdauer über Weglänge und durchschnittliche Geschwindigkeit berechnen
        ev_trip.trip_duration = round((ev_trip.distance / EV_SPEED) * 60)
        ev_trip.arrival = ev_trip.departure + ev_trip.trip_duration
        # Trip in Wegetagebuch des Fahrzeugs ablegen
        ev.trips.append(ev_trip)
        # inkrementieren des Wegezählers
        ev.trip_no += 1
        ev_trip.trip_no = ev.trip_no
        # Trip_id = Fahrzeugnummer(= i).Tripnummer 
        ev_trip.trip_id = float(f"{i}.{ev.trip_no}")
        # Update der aktuellen Zeit
        time = round(ev_trip.arrival + ev_trip.stay_duration)
    
    # Ladevorgang bei Ankunft vom letzten Trip
    # Code wiederholen suboptimal -> überarbeiten
    if ev.SOC < 100:
        location = ev.trips[len(ev.trips)-1].whyto
        # obere Schranke der Ladezeit, tatsächliche wird in ev.charge() bestimmt
        duration = ev.trips[len(ev.trips)-1].stay_duration
        if CHARGE_SCEN == 1:
            # geladen wird nur wenn sich das Fahrzeug sich Zuhause befindet und sich dort länger als 15 Minuten aufhält
            if location == 1 and duration > 15:
                ev.charge()

        elif CHARGE_SCEN == 2:
            # geladen wird nur wenn sich das Fahrzeug Zuhause oder auf der Arbeit befindet und sich dort länger als 15 Minuten aufhält
            if (location == 1 or location == 2) and duration > 15:
                ev.charge()

        elif CHARGE_SCEN == 3:
            # geladen wird in jedem Zustand, sofern die Parkdauer 15 Minuten übersteigt
            if duration > 15:
                ev.charge()

    # Speichern des Fahrzeugs
    simulated_evs.append(ev)
    
    

In [7]:
from collections import defaultdict

# erzeugen Dictionary "Key : List"
total_trips_dict = defaultdict(list)

# speichern der Trips jedes einzelnen Fahrzeugs im dict
for ev in simulated_evs:
    for trip in ev.trips:
        # .__dict__.items() returns Dictionary mit allen Member Variablen und dazugehörigen Werten des Objekts
        for key, val in trip.__dict__.items():
            total_trips_dict[key].append(val)

# umwandeln in DataFrame
data = pd.DataFrame(total_trips_dict)

In [8]:
# Spaltennamen umformatieren
data = data.rename(str.capitalize, axis='columns')
data.head()

Unnamed: 0,Trip_id,Trip_no,Whyfrom,Whyto,Departure,Departure_t,Arrival,Trip_duration,Distance,Stay_duration,Soc_start,Soc_end,Charge_start,Charge_end
0,0.1,1,1,2,556.0,37,640.0,84.0,26.69,201,100.0,87.1,640.0,724.0
1,0.2,2,2,1,841.0,56,874.0,33.0,10.48,47,100.0,94.9,874.0,907.0
2,0.3,3,1,3,921.0,61,924.0,3.0,0.95,30,100.0,99.5,924.0,927.0
3,0.4,4,3,1,954.0,64,977.0,23.0,7.14,1379,100.0,96.5,977.0,999.0
4,1.1,1,1,2,420.0,28,474.0,54.0,17.01,232,100.0,95.8,474.0,525.0


# Überprüfe Ergebnisse

In [9]:
# ACHTUNG: Vorgehen ändern wenn Abfolge mehrerer Tage simuliert wird?
data["Arrival"] = data["Arrival"].apply(lambda x: x-1440 if x > 1440 else x)
data["Charge_start"] = data["Charge_start"].apply(lambda x: x-1440 if x > 1440 else x)
data["Charge_end"] = data["Charge_end"].apply(lambda x: x-1440 if x > 1440 else x)
data["Charge_end"] = data["Charge_end"].apply(lambda x: x-1440 if x > 1440 else x)

In [10]:
path = os.getcwd()+"\\Simulationsergebnisse"
if not os.path.exists(path):
    os.makedirs(path)

pickle.dump(data, open(path+"\\Results_5_WT_250k_CS3.pickle", "wb"))