Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions cacheflow/core/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
import enum
import os
import pickle
import time
from typing import Any, Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple

from cacheflow.core.block_manager import BlockSpaceManager
from cacheflow.logger import init_logger
from cacheflow.core.policy import PolicyFactory
from cacheflow.sampling_params import SamplingParams
from cacheflow.sequence import Sequence
from cacheflow.sequence import SequenceGroup
from cacheflow.sequence import SequenceGroupMetadata
from cacheflow.sequence import SequenceOutputs
from cacheflow.sequence import SequenceStatus

from cacheflow.sequence import (Sequence, SequenceGroup, SequenceGroupMetadata,
SequenceOutputs, SequenceStatus)

logger = init_logger(__name__)

_LOGGING_INTERVAL_SEC = 10


Expand Down Expand Up @@ -129,7 +124,6 @@ def _schedule(

# Swap in the sequence groups in the SWAPPED state if possible.
self.swapped = self.policy.sort_by_priority(now, self.swapped)
# FCFS
while self.swapped and not blocks_to_swap_out:
seq_group = self.swapped[0]
# If the sequence group has been preempted in this step, stop.
Expand Down Expand Up @@ -162,7 +156,9 @@ def _schedule(
# This is because we want to bound the amount of CPU memory taken by
# the swapped sequence groups.
if not self.swapped:
self.waiting = self.policy.sort_by_priority(now, self.waiting)
# Optimization: We do not sort the waiting queue since the preempted
# sequence groups are added to the front and the new sequence groups
# are added to the back.
while self.waiting:
seq_group = self.waiting[0]
# If the sequence group has been preempted in this step, stop.
Expand Down Expand Up @@ -347,7 +343,6 @@ def _allocate(self, seq_group: SequenceGroup) -> None:
self.block_manager.allocate(seq_group)
for seq in seq_group.seqs:
seq.status = SequenceStatus.RUNNING
# FIXME(woosuk): Support interactive generation.
if seq_group.group_id not in self.num_steps:
self.num_steps[seq_group.group_id] = 0

Expand Down Expand Up @@ -404,7 +399,9 @@ def _preempt_by_recompute(
for seq in seqs:
seq.status = SequenceStatus.WAITING
self.block_manager.free(seq)
self.waiting.append(seq_group)
# NOTE: For FCFS, we insert the preempted sequence group to the front
# of the waiting queue.
self.waiting.insert(0, seq_group)

def _preempt_by_swap(
self,
Expand Down
1 change: 0 additions & 1 deletion cacheflow/core/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from cacheflow.utils import get_gpu_memory, get_cpu_memory
from cacheflow.worker.controller import Controller, DeviceID


logger = init_logger(__name__)


Expand Down
1 change: 0 additions & 1 deletion cacheflow/frontend/simple_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from cacheflow.sequence import Sequence, SequenceGroup
from cacheflow.utils import Counter


logger = init_logger(__name__)


Expand Down
1 change: 0 additions & 1 deletion cacheflow/model_executor/memory_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from cacheflow.logger import init_logger
from cacheflow.model_executor.utils import get_dtype_size


logger = init_logger(__name__)

_GiB = 1 << 30
Expand Down