## Take-home 1 Group 3
Julian Gootzen 1676512 <br>
Wout Beks <br>
Joep van den Hurk <br>

# Exercise 1
Consider the data (time, event, lotnr) as discussed during the lecture for an initially empty system. <br>

Implement the EPT algorithm as described in C.P.L. Veeger, L.F.P. Etman, E. Lefeber, I.J.B.F. Adan, J. van Herk, J.E. Rooda, "Predicting cycle time distributions for integrated processing workstations: an aggregate modeling approach", IEEE Transactions on Semiconductor Manufacturing 24(2), 223-236, 2011, and verify that it reproduces the realizations for EPTs and overtaking distributions as described during the lecture.
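
As a compact illustration of the bookkeeping involved, the sketch below runs the same kind of event loop on a small in-memory trace. The function name `ept_algorithm` and the four-event trace are made up for illustration and are not the lecture data; the sketch assumes events are sorted by time and that every departing lot has an earlier arrival in the trace.

```python
def ept_algorithm(events):
    """Sketch of an EPT event loop.

    events: list of (time, kind, lot) sorted by time, kind is 'A' or 'D'.
    Returns (epts, overtakes) where
      epts      = [(ept_duration, wip_at_ept_start), ...]
      overtakes = [(lots_overtaken, wip_seen_on_arrival, lot), ...]
    """
    xs = []            # lots in the system, in arrival order: (lot, wip_seen_on_arrival)
    s = sw = None      # start time of the running EPT and WIP level at that start
    epts, overtakes = [], []

    for tau, kind, lot in events:
        if kind == 'A':
            if not xs:                      # arrival to an empty system starts an EPT
                s, sw = tau, 1
            xs.append((lot, len(xs)))       # len(xs) = lots already present
        elif kind == 'D':
            epts.append((tau - s, sw))      # every departure ends the running EPT
            pos = next(p for p, (j, _) in enumerate(xs) if j == lot)
            # lots stored before pos arrived earlier and are still present,
            # so the departing lot overtakes exactly pos lots
            overtakes.append((pos, xs[pos][1], lot))
            del xs[pos]
            if xs:                          # remaining lots start the next EPT
                s, sw = tau, len(xs)
    return epts, overtakes


# Hypothetical trace: lot 2 arrives after lot 1 but departs first
trace = [(0, 'A', 1), (1, 'A', 2), (3, 'D', 2), (4, 'D', 1)]
epts, overtakes = ept_algorithm(trace)
print(epts)       # [(3, 1), (1, 1)]
print(overtakes)  # [(1, 1, 2), (0, 0, 1)]
```

Using the position in `xs` rather than a comparison of lot numbers means the count stays correct even when lot numbers are not in arrival order.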

In [None]:
# Read the file and parse the events
filename = "EPTdata-Lecture-Lefeber-1-empty-buffer.txt"

# Load data
events = []
with open(filename, 'r') as f:
    for line in f:
        parts = line.strip().split('\t')
        time = int(parts[0])
        lot = parts[1]
        event_type = parts[2]
        # Extract lot number from lot name
        lot_number = int(lot.replace('lot', ''))
        events.append((time, event_type, lot_number))

# EPT Algorithm

def detOvert(xs, i):
    """
    Remove the departing lot from the system and count the lots it overtakes.
    :param xs: List of tuples (lot_number, aw) for lots in the system, in arrival order
    :param i: Lot number of the departing lot
    :return: Tuple containing the updated list of lots, number of overtaken lots, and WIP level at arrival
    """
    for idx, (j, wip) in enumerate(xs):
        if j == i:
            # Lots stored before idx arrived earlier and are still in the system,
            # so the departing lot overtakes all of them.
            return xs[:idx] + xs[idx + 1:], idx, wip

    # Departing lot not found (e.g. xs is empty): leave the system unchanged
    return xs, 0, 0

xs = []  # list of (arrival_number, aw)
s = None  # EPT start time
sw = None  # WIP-level after EPT start

# Outputs
EPT_results = []  # (EPT_duration, WIP-level)
Overtaking_results = []  # (number_overtaken, aw, lot_number)

for (tau, ev, i) in events:
    if ev == 'A':
        if len(xs) == 0:
            s, sw = tau, 1
        xs.append((i, len(xs)))
    elif ev == 'D':
        # End current EPT
        EPT_duration = tau - s
        EPT_results.append((EPT_duration, sw))
        
        xs, k, aw = detOvert(xs, i)
        Overtaking_results.append((k, aw, i))

        if len(xs) > 0:
            s, sw = tau, len(xs)

# Display Results
print("EPT Durations and WIP-levels:")
for duration, wip in EPT_results:
    print(f"EPT Duration: {duration}, WIP-level: {wip}")

print("\nOvertaking Details:")
for overtaken, aw, i in Overtaking_results:
    print(f"Lot: {i} Number of overtaken lots: {overtaken}, WIP before arrival: {aw}")

# Calculate and display mean EPT duration
mean_EPT = sum(duration for duration, _ in EPT_results) / len(EPT_results)
print(f"\nMean EPT Duration: {mean_EPT:.2f}")


EPT Durations and WIP-levels:
EPT Duration: 4, WIP-level: 1
EPT Duration: 2, WIP-level: 3
EPT Duration: 1, WIP-level: 3
EPT Duration: 4, WIP-level: 2
EPT Duration: 1, WIP-level: 3
EPT Duration: 3, WIP-level: 2
EPT Duration: 0, WIP-level: 1

Overtaking Details:
Lot: 1 Number of overtaken lots: 0, WIP before arrival: 0
Lot: 2 Number of overtaken lots: 0, WIP before arrival: 1
Lot: 3 Number of overtaken lots: 0, WIP before arrival: 2
Lot: 5 Number of overtaken lots: 1, WIP before arrival: 3
Lot: 4 Number of overtaken lots: 0, WIP before arrival: 3
Lot: 6 Number of overtaken lots: 0, WIP before arrival: 2
Lot: 7 Number of overtaken lots: 0, WIP before arrival: 3

Mean EPT Duration: 2.14


# Exercise 2
Assume now a system which is not initially empty, that is, assume that the data is only available after time 5. <br>

Based on this data, determine which lots are initially in the workstation. Next, using this information, determine (by hand) which of the EPT-realizations and overtaking-realizations you can still determine correctly. Clearly explain this reasoning in your report. Keep in mind that you do not know the order of arrival of the lots that are initially in the system (i.e., lot 3 might have arrived earlier than lot 2). Also implement an algorithm that automatically derives these EPT-realizations and overtaking-realizations from the input data.
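
The first step, finding the lots that were already in the workstation, can be sketched as follows: a lot whose departure is observed without a preceding arrival must have been present at the start of the window. The helper name `initially_present` and the sample trace are hypothetical. Note the limitation that a lot which was present initially but does not depart within the observed window cannot be detected this way.

```python
def initially_present(events):
    """Return lots that depart without an observed arrival, in departure order.

    events: list of (time, kind, lot) sorted by time, kind is 'A' or 'D'.
    """
    arrived = set()
    initial = []
    for _, kind, lot in events:
        if kind == 'A':
            arrived.add(lot)
        elif kind == 'D' and lot not in arrived:
            initial.append(lot)   # departure seen, arrival never seen
    return initial


# Hypothetical window: lots 3 and 4 were already inside, lot 5 arrives later
window = [(6, 'D', 3), (7, 'A', 5), (8, 'D', 5), (9, 'D', 4)]
print(initially_present(window))  # [3, 4]
```

The returned order is the departure order only; as the exercise notes, the arrival order of these lots is unknown.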

In [4]:
filename = "EPTdata-Lecture-Lefeber-1-full-buffer.txt"

# Load data
events = []
arrival_set = set()
departure_set = set()

with open(filename, 'r') as f:
    for line in f:
        parts = line.strip().split('\t')
        time = int(parts[0])
        lot = parts[1]
        event_type = parts[2]
        lot_number = int(lot.replace('lot', ''))
        events.append((time, event_type, lot_number))
        if event_type == 'A':
            arrival_set.add(lot_number)
        elif event_type == 'D':
            departure_set.add(lot_number)

# Lots that depart without an observed arrival were already in the workstation
# at the start of the observation window
missing_arrivals = departure_set - arrival_set
for lot in missing_arrivals:
    events.insert(0, (0, 'A', lot))  # Add synthetic arrival at time 0

# Re-sort events to ensure proper order
events.sort()

# EPT Algorithm

def detOvert(xs, i, arrived_before_start):
    """
    Remove the departing lot from the system and count the lots it overtakes.
    :param xs: List of tuples (lot_number, aw) for lots in the system, in arrival order
    :param i: Lot number of the departing lot
    :param arrived_before_start: Lots that were present before the first observed event and are still in the system
    :return: Tuple containing the updated list of lots, number of overtaken lots, and WIP level at arrival
    """
    for idx, (j, wip) in enumerate(xs):
        if j == i:
            # Lots stored before idx arrived earlier and are still in the system,
            # so the departing lot overtakes all of them.
            remaining = xs[:idx] + xs[idx + 1:]
            return remaining, idx + len(arrived_before_start), wip

    # Departing lot not found (e.g. xs is empty): leave the system unchanged
    return xs, len(arrived_before_start), 0

xs = []  # list of (arrival_number, aw)
s = None  # EPT start time
sw = None  # WIP-level after EPT start

# Outputs
EPT_results = []  # (EPT_duration, WIP-level)
Overtaking_results = []  # (number_overtaken, aw, lot_number)

arrived_lots = []  # Lots with an arrival event (including the synthetic arrivals added above)
departed_lots = []  # Lots with a departure event

for (tau, ev, i) in events:
    if ev == 'A':
        arrived_lots.append(i)
    elif ev == 'D':
        departed_lots.append(i)

# Departed lots without any arrival event. Because a synthetic arrival was inserted
# above for every such lot, this set is empty here; the initially present lots are
# filtered out through missing_arrivals instead.
arrived_before_start = set(departed_lots) - set(arrived_lots)

for (tau, ev, i) in events:
    if ev == 'A':
        arrived_lots.append(i)  # Keep track of arrived lots
        if len(xs) == 0:
            s, sw = tau, 1
        xs.append((i, len(xs)))
    elif ev == 'D':
        if i in arrived_before_start:
            arrived_before_start.remove(i)
            continue

        EPT_duration = tau - s
        if i not in missing_arrivals:
            EPT_results.append((EPT_duration, sw))

        xs, k, aw = detOvert(xs, i, arrived_before_start)
        if i not in missing_arrivals:
            Overtaking_results.append((k, aw, i))  # Only lots whose arrival was observed

        if len(xs) > 0:
            s, sw = tau, len(xs)

# Display Results
print("EPT Durations and WIP-levels:")
for duration, wip in EPT_results:
    print(f"EPT Duration: {duration}, WIP-level: {wip}")

print("\nOvertaking Details:")
for overtaken, aw, i in Overtaking_results:
    print(f"Lot {i}: Number of overtaken lots: {overtaken}, WIP before arrival: {aw}")

# Calculate and display mean EPT duration
if EPT_results:
    mean_EPT = sum(duration for duration, _ in EPT_results) / len(EPT_results)
    print(f"\nMean EPT Duration: {mean_EPT:.2f}")
else:
    print("\nNo valid EPT records found.")

EPT Durations and WIP-levels:
EPT Duration: 4, WIP-level: 2
EPT Duration: 3, WIP-level: 2
EPT Duration: 0, WIP-level: 1

Overtaking Details:
Lot 5: Number of overtaken lots: 1, WIP before arrival: 3
Lot 6: Number of overtaken lots: 0, WIP before arrival: 2
Lot 7: Number of overtaken lots: 0, WIP before arrival: 3

Mean EPT Duration: 2.33


# Exercise 3
Implement an algorithm which extracts the arrival and departure data of lots for Workstation 1 from your file group??.txt, so that you can feed it to your algorithm of the previous exercise to determine the EPT-realizations and overtaking-realizations of lots at Workstation 1. <br>
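
A minimal sketch of this extraction is given below. It assumes the line layout that the parsing code in this notebook relies on: tab-separated fields `time`, `lotN`, and then either `Created` or a workstation name (`W1`, `W2`, ...) followed by an event code, with `D` marking a departure. It further assumes that a lot arrives at Workstation 1 at the moment it is created. The helper name `extract_w1` and the sample lines are hypothetical.

```python
def extract_w1(lines):
    """Turn raw log lines into (time, kind, lot) events for Workstation 1.

    Assumed line layout (tab-separated):
      time  lotN  Created
      time  lotN  Wk  <event code>
    """
    events = []
    for line in lines:
        parts = line.strip().split('\t')
        time = int(parts[0])
        lot = int(parts[1].replace('lot', ''))
        if parts[2] == 'Created':
            events.append((time, 'A', lot))          # creation = arrival at W1 (assumption)
        elif parts[2] == 'W1' and len(parts) > 3 and parts[3] == 'D':
            events.append((time, 'D', lot))          # departure from W1
    return events


# Hypothetical sample lines, not taken from the group file
sample = ["0\tlot1\tCreated", "5\tlot2\tCreated", "7\tlot1\tW1\tD", "9\tlot1\tW2\tD"]
print(extract_w1(sample))  # [(0, 'A', 1), (5, 'A', 2), (7, 'D', 1)]
```

The resulting list has the same `(time, event, lot)` shape as the data of Exercise 1 and 2, so it can be passed to the EPT loop unchanged.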

# Exercise 4
Implement an algorithm which extracts the arrival and departure data of batches for Workstation 2 from your file group??.txt, so that you can feed it to your algorithm of the previous exercise to determine the EPT-realizations and overtaking-realizations of batches at Workstation 2. Assume that the arrival of a batch to the workstation is determined by the arrival of the final lot of that batch to the workstation (as in the EPT-batch-example in Lecture Lefeber-1 about the control framework).
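
The batch rule above can be sketched as follows: lots that leave Workstation 2 at the same timestamp are treated as one batch, and the batch arrival time is the latest arrival time among its lots. Grouping by identical departure time is an assumption (it is also what the code in this notebook does), and the helper name `batch_events` and the sample numbers are hypothetical. Batches are numbered in order of arrival so that a smaller batch number means an earlier arrival.

```python
from itertools import groupby


def batch_events(arrivals, departures):
    """Build batch-level (time, kind, batch_id) events for a batching workstation.

    arrivals:   dict {lot: arrival time of that lot at the workstation}
    departures: list of (time, lot) sorted by time; lots with equal
                departure time are assumed to belong to the same batch
    """
    batches = []
    for t_dep, group in groupby(departures, key=lambda d: d[0]):
        lots = [lot for _, lot in group]
        t_arr = max(arrivals[lot] for lot in lots)   # arrival of the final lot
        batches.append((t_arr, t_dep))
    batches.sort(key=lambda b: b[0])                 # number batches by arrival order

    events = []
    for batch_id, (t_arr, t_dep) in enumerate(batches, start=1):
        events.append((t_arr, 'A', batch_id))
        events.append((t_dep, 'D', batch_id))
    events.sort()   # by time; at equal times 'A' sorts before 'D' (tie-break assumption)
    return events


# Hypothetical data: two batches of two lots each
arrivals = {1: 0, 2: 2, 3: 3, 4: 5}
departures = [(10, 1), (10, 2), (14, 3), (14, 4)]
print(batch_events(arrivals, departures))
# [(2, 'A', 1), (5, 'A', 2), (10, 'D', 1), (14, 'D', 2)]
```

Building batch-level events first keeps the EPT loop itself unchanged: it then sees batches exactly as it sees single lots.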

In [32]:
# EXERCISE 3 + 4
from typing import Optional

def next_wstation(workstation: str) -> Optional[str]:
    """
    Get the next workstation based on the current one.
    :param workstation: Current workstation, e.g. "W1"
    :return: Next workstation, or None if the current one is the last
    """
    current_id = int(workstation[-1])  # avoid shadowing the built-in id()
    next_id = current_id + 1
    if next_id > 3:
        return None  # No more workstations
    return f"W{next_id}"

# Read the file and parse the events
filename = "group03.txt"
filter = "W2"  # Workstation to analyse: "W1", "W2" or "W3"

# Load data
events = []
if filename.startswith("group03"):
    with open(filename, 'r') as f:
        for line in f:
            parts = line.strip().split('\t')
            time = int(parts[0])
            lot = parts[1]
            event_type = parts[2]
            if event_type == 'Created':
                workstation = 'W0'  # initial workstation
                event_type = 'D'
            elif event_type[0] == 'W':
                workstation = parts[2]
                event_type = parts[3]
            # Extract lot number from lot name
            lot_number = int(lot.replace('lot', ''))
            if workstation == filter:
                events.append((time, workstation, event_type, lot_number))
            # add corresponding arrival event at next workstation
            next_workstation = next_wstation(workstation)
            if next_workstation is not None and next_workstation == filter:
                events.append((time, next_workstation, 'A', lot_number))
else:
    with open(filename, 'r') as f:
        for line in f:
            parts = line.strip().split('\t')
            time = int(parts[0])
            lot = parts[1]
            event_type = parts[2]
            # Extract lot number from lot name
            lot_number = int(lot.replace('lot', ''))
            workstation = 'W1'
            events.append((time, workstation, event_type, lot_number))

# EPT Algorithm
def detOvert(xs, i, arrived_before_start):
    """
    Determine the overtaking status of lots in the system.
    :param xs: List of tuples (arrival_number, aw) for lots in the system
    :param i: Arrival number of the departing lot
    :param arrived_before_start: Lots that were present before the first observed event and are still in the system
    :return: Tuple containing the updated list of lots, number of overtaken lots, and WIP level at arrival
    """
    ys = []  # List to store lots that remain after processing
    overtaken_count = 0  # Number of overtaken lots
    aw = 0  # WIP level at the time of the departing lot's arrival
    idx = 0

    for idx, (j, wip) in enumerate(xs):
        if j < i:
            ys.append((j, wip))  # Keep lots that arrived before the departing lot
        elif j == i:
            aw = wip  # WIP level when the departing lot arrived
            overtaken_count = len(xs[:idx])  # Lots before this index are overtaken
            break

    ys.extend(xs[idx + 1:])
    return ys, overtaken_count + len(arrived_before_start), aw

xs = []
s = None
sw = None

# Outputs
EPT_results = []
Overtaking_results = []

arrived_lots = []
departed_lots = []

prev_w2_departure = None  # Time of the previous departure from W2
prev_lot = None  # Lot of the previous departure from W2
w2_arrival_times = []  # [lot, arrival time] pairs not yet assigned to a batch
batches = []
batch_arrived = []
current_batch = []

for (tau, wstation, ev, i) in events:
    if ev == 'A':
        arrived_lots.append(i)
    elif ev == 'D':
        departed_lots.append(i)

    if wstation == 'W2' and ev == 'D':
        # Lots departing at the same time belong to the same batch
        if prev_w2_departure == tau or prev_w2_departure is None:
            current_batch.append(i)
        else:
            batches.append(current_batch)
            for lot in w2_arrival_times.copy():
                if lot[0] == prev_lot:
                    batch_arrived.append(lot[1])
                    w2_arrival_times.remove(lot)
                if lot[0] in current_batch[0:-1]:
                    w2_arrival_times.remove(lot)
            current_batch = [i]
        prev_w2_departure = tau
        prev_lot = i

    if wstation == 'W2' and ev == 'A':
        w2_arrival_times.append([i, tau])

arrived_before_start = set(departed_lots) - set(arrived_lots)
if current_batch:
    batches.append(current_batch)

for (tau, wstation, ev, i) in events:
    if ev == 'A':
        arrived_lots.append(i)
        if len(xs) == 0:
            s, sw = tau, 1
        xs.append((i, len(xs)))
    elif ev == 'D':
        if i in arrived_before_start:
            arrived_before_start.remove(i)
            continue

        if wstation == 'W2':
            for (batch, arrival) in zip(batches, batch_arrived):
                if i in batch:
                    s = arrival
                    if i == batch[-1]:
                        batches.remove(batch)
                        batch_arrived.remove(arrival)
                    break

        EPT_duration = tau - s
        EPT_results.append((EPT_duration, sw))

        xs, k, aw = detOvert(xs, i, arrived_before_start)
        Overtaking_results.append((k, aw, i))

        if len(xs) > 0:
            s, sw = tau, len(xs)

# Display Results
print("EPT Durations and WIP-levels:")
for duration, wip in EPT_results:
    print(f"EPT Duration: {duration}, WIP-level: {wip}")

print("\nOvertaking Details:")
for overtaken, aw, i in Overtaking_results:
    print(f"Lot: {i} Number of overtaken lots: {overtaken}, WIP before arrival: {aw}")

mean_EPT = sum(duration for duration, _ in EPT_results) / len(EPT_results)
print(f"\nMean EPT Duration: {mean_EPT:.2f}")

from collections import defaultdict
import statistics

# Map lot number to workstation
lot_departures = {}
for (tau, wstation, ev, i) in events:
    if ev == 'D':
        lot_departures[i] = wstation

ept_per_ws = defaultdict(list)
overtaking_per_ws = defaultdict(list)

for (duration, wip), (overtaken, aw, i) in zip(EPT_results, Overtaking_results):
    wstation = lot_departures.get(i)
    if wstation:
        ept_per_ws[wstation].append((duration, wip))
        overtaking_per_ws[wstation].append((overtaken, aw))

print("\n--- WIP-Dependent EPT Distributions ---")
for ws in sorted(ept_per_ws):
    durations = [d for d, _ in ept_per_ws[ws]]
    wips = [w for _, w in ept_per_ws[ws]]
    print(f"\nWorkstation {ws}:")
    print(f"Total Observations: {len(durations)}")
    print("EPT Durations by WIP Level:")
    wip_groups = defaultdict(list)
    for duration, wip in ept_per_ws[ws]:
        wip_groups[wip].append(duration)
    for wip_level in sorted(wip_groups):
        vals = wip_groups[wip_level]
        print(f"  WIP {wip_level}: Count={len(vals)}, Mean={sum(vals)/len(vals):.2f}, StdDev={statistics.stdev(vals) if len(vals)>1 else 0:.2f}")

print("\n--- Overtaking Distributions ---")
for ws in sorted(overtaking_per_ws):
    overts = [o for o, _ in overtaking_per_ws[ws]]
    aws = [a for _, a in overtaking_per_ws[ws]]
    print(f"\nWorkstation {ws}:")
    print(f"Total Departures: {len(overts)}")
    print(f"Mean overtaken lots: {sum(overts)/len(overts):.2f}")
    print(f"Mean WIP at arrival: {sum(aws)/len(aws):.2f}")

print("\n--- Mean and Coefficient of Variation of EPT per Workstation ---")
for ws in sorted(ept_per_ws):
    durations = [d for d, _ in ept_per_ws[ws]]
    mean_ept = sum(durations) / len(durations)
    stdev_ept = statistics.stdev(durations) if len(durations) > 1 else 0
    cv_ept = stdev_ept / mean_ept if mean_ept != 0 else 0
    print(f"Workstation {ws}: Mean EPT = {mean_ept:.2f}, Coefficient of Variation = {cv_ept:.2f}")


EPT Durations and WIP-levels:
EPT Duration: 450, WIP-level: 1
EPT Duration: 536, WIP-level: 8
EPT Duration: 536, WIP-level: 16
EPT Duration: 536, WIP-level: 15
EPT Duration: 536, WIP-level: 14
EPT Duration: 536, WIP-level: 13
EPT Duration: 536, WIP-level: 12
EPT Duration: 536, WIP-level: 11
EPT Duration: 536, WIP-level: 10
EPT Duration: 536, WIP-level: 9
EPT Duration: 536, WIP-level: 8
EPT Duration: 593, WIP-level: 7
EPT Duration: 593, WIP-level: 15
EPT Duration: 593, WIP-level: 14
EPT Duration: 593, WIP-level: 13
EPT Duration: 593, WIP-level: 12
EPT Duration: 593, WIP-level: 11
EPT Duration: 593, WIP-level: 10
EPT Duration: 593, WIP-level: 9
EPT Duration: 593, WIP-level: 8
EPT Duration: 593, WIP-level: 7
EPT Duration: 627, WIP-level: 6
EPT Duration: 627, WIP-level: 17
EPT Duration: 627, WIP-level: 16
EPT Duration: 627, WIP-level: 15
EPT Duration: 627, WIP-level: 14
EPT Duration: 627, WIP-level: 13
EPT Duration: 627, WIP-level: 12
EPT Duration: 627, WIP-level: 11
EPT Duration: 627, WIP