diff --git a/cacheflow/core/scheduler.py b/cacheflow/core/scheduler.py index 7656ae978ec0..02e864e626c3 100644 --- a/cacheflow/core/scheduler.py +++ b/cacheflow/core/scheduler.py @@ -1,21 +1,16 @@ import enum -import os -import pickle import time -from typing import Any, Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from cacheflow.core.block_manager import BlockSpaceManager from cacheflow.logger import init_logger from cacheflow.core.policy import PolicyFactory from cacheflow.sampling_params import SamplingParams -from cacheflow.sequence import Sequence -from cacheflow.sequence import SequenceGroup -from cacheflow.sequence import SequenceGroupMetadata -from cacheflow.sequence import SequenceOutputs -from cacheflow.sequence import SequenceStatus - +from cacheflow.sequence import (Sequence, SequenceGroup, SequenceGroupMetadata, + SequenceOutputs, SequenceStatus) logger = init_logger(__name__) + _LOGGING_INTERVAL_SEC = 10 @@ -129,7 +124,6 @@ def _schedule( # Swap in the sequence groups in the SWAPPED state if possible. self.swapped = self.policy.sort_by_priority(now, self.swapped) - # FCFS while self.swapped and not blocks_to_swap_out: seq_group = self.swapped[0] # If the sequence group has been preempted in this step, stop. @@ -162,7 +156,9 @@ def _schedule( # This is because we want to bound the amount of CPU memory taken by # the swapped sequence groups. if not self.swapped: - self.waiting = self.policy.sort_by_priority(now, self.waiting) + # Optimization: We do not sort the waiting queue since the preempted + # sequence groups are added to the front and the new sequence groups + # are added to the back. while self.waiting: seq_group = self.waiting[0] # If the sequence group has been preempted in this step, stop. @@ -347,7 +343,6 @@ def _allocate(self, seq_group: SequenceGroup) -> None: self.block_manager.allocate(seq_group) for seq in seq_group.seqs: seq.status = SequenceStatus.RUNNING - # FIXME(woosuk): Support interactive generation. if seq_group.group_id not in self.num_steps: self.num_steps[seq_group.group_id] = 0 @@ -404,7 +399,9 @@ def _preempt_by_recompute( for seq in seqs: seq.status = SequenceStatus.WAITING self.block_manager.free(seq) - self.waiting.append(seq_group) + # NOTE: For FCFS, we insert the preempted sequence group to the front + # of the waiting queue. + self.waiting.insert(0, seq_group) def _preempt_by_swap( self, diff --git a/cacheflow/core/server.py b/cacheflow/core/server.py index 2f968f8b4e24..a35a27bcb129 100644 --- a/cacheflow/core/server.py +++ b/cacheflow/core/server.py @@ -17,7 +17,6 @@ from cacheflow.utils import get_gpu_memory, get_cpu_memory from cacheflow.worker.controller import Controller, DeviceID - logger = init_logger(__name__) diff --git a/cacheflow/frontend/simple_frontend.py b/cacheflow/frontend/simple_frontend.py index da3639530cd6..9d65e4f0de55 100644 --- a/cacheflow/frontend/simple_frontend.py +++ b/cacheflow/frontend/simple_frontend.py @@ -7,7 +7,6 @@ from cacheflow.sequence import Sequence, SequenceGroup from cacheflow.utils import Counter - logger = init_logger(__name__) diff --git a/cacheflow/model_executor/memory_analyzer.py b/cacheflow/model_executor/memory_analyzer.py index bc85d6586ae0..fb910e6403de 100644 --- a/cacheflow/model_executor/memory_analyzer.py +++ b/cacheflow/model_executor/memory_analyzer.py @@ -4,7 +4,6 @@ from cacheflow.logger import init_logger from cacheflow.model_executor.utils import get_dtype_size - logger = init_logger(__name__) _GiB = 1 << 30