Various fixes to get planner to work
sarahwooders committed Feb 27, 2024
1 parent bfe6a81 commit c2197b2
Showing 7 changed files with 33 additions and 22 deletions.
8 changes: 4 additions & 4 deletions scripts/pack_docker.sh
@@ -12,15 +12,15 @@ set +e

>&2 echo -e "${BGreen}Building docker image${NC}"
set -e
- >&2 sudo DOCKER_BUILDKIT=1 docker build -t skyplane --platform linux/x86_64 .
+ >&2 DOCKER_BUILDKIT=1 docker build -t skyplane --platform linux/x86_64 .
set +e

DOCKER_URL="ghcr.io/$1/skyplane:local-$(openssl rand -hex 16)"
>&2 echo -e "${BGreen}Uploading docker image to $DOCKER_URL${NC}"
set -e
- >&2 sudo docker tag skyplane $DOCKER_URL
- >&2 sudo docker push $DOCKER_URL
- >&2 sudo docker system prune -f
+ >&2 docker tag skyplane $DOCKER_URL
+ >&2 docker push $DOCKER_URL
+ >&2 docker system prune -f
set +e

>&2 echo -e "${BGreen}SKYPLANE_DOCKER_IMAGE=$DOCKER_URL${NC}"
3 changes: 3 additions & 0 deletions skyplane/api/dataplane.py
@@ -112,6 +112,7 @@ def _start_gateway(

# write gateway programs
gateway_program_filename = Path(f"{gateway_log_dir}/gateway_program_{gateway_node.gateway_id}.json")
+ print(gateway_program_filename)
with open(gateway_program_filename, "w") as f:
f.write(gateway_node.gateway_program.to_json())
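
As an aside (not part of this diff), the per-gateway program file written above boils down to a small helper like the following sketch; the function name and arguments here are illustrative stand-ins, not Skyplane API.

from pathlib import Path

def write_gateway_program(gateway_log_dir: str, gateway_id: str, program_json: str) -> Path:
    # one JSON program file per gateway, named gateway_program_<gateway_id>.json
    path = Path(f"{gateway_log_dir}/gateway_program_{gateway_id}.json")
    with open(path, "w") as f:
        f.write(program_json)
    return path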

@@ -233,6 +234,8 @@ def provision(
def copy_gateway_logs(self):
# copy logs from all gateways in parallel
def copy_log(instance):
+ out_file = f"{self.transfer_dir}/gateway_{instance.uuid()}.stdout"
+ err_file = f"{self.transfer_dir}/gateway_{instance.uuid()}.stderr"
typer.secho(f"Downloading log: {self.transfer_dir}/gateway_{instance.uuid()}.stdout", fg="bright_black")
typer.secho(f"Downloading log: {self.transfer_dir}/gateway_{instance.uuid()}.stderr", fg="bright_black")

17 changes: 9 additions & 8 deletions skyplane/api/pipeline.py
@@ -53,7 +53,7 @@ def __init__(
# self.cloud_regions = cloud_regions
# TODO: set max instances with VM CPU limits and/or config
self.max_instances = max_instances
- self.n_connections = n_connections
+ self.n_connections = num_connections
self.provisioner = provisioner
self.transfer_config = transfer_config
self.http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3))
@@ -71,19 +71,20 @@ def __init__(
if self.planning_algorithm == "direct":
# TODO: should find some ways to merge direct / Ndirect
#self.planner = UnicastDirectPlanner(self.max_instances, num_connections)
- self.planner = MulticastDirectPlanner(self.max_instances, self.n_connections, self.transfer_config)
+ #self.planner = MulticastDirectPlanner(self.max_instances, self.n_connections, self.transfer_config)
+ self.planner = MulticastDirectPlanner(self.max_instances, self.n_connections)
#elif self.planning_algorithm == "Ndirect":
# self.planner = MulticastDirectPlanner(self.max_instances, num_connections)
elif self.planning_algorithm == "MDST":
self.planner = MulticastMDSTPlanner(self.max_instances, num_connections)
elif self.planning_algorithm == "ILP":
- self.planning_algorithm = MulticastILPPlanner(self.max_instances, num_connections)
+ self.planner = MulticastILPPlanner(self.max_instances, num_connections)
elif self.planning_algorithm == "UnicastILP":
- self.planning_algorithm = UnicastILPPlanner(self.max_instances, num_connections)
- elif self.planning_algorithm == "src_one_sided":
- self.planner = DirectPlannerSourceOneSided(self.max_instances, self.n_connections, self.transfer_config)
- elif self.planning_algorithm == "dst_one_sided":
- self.planner = DirectPlannerDestOneSided(self.max_instances, self.n_connections, self.transfer_config)
+ self.planner = UnicastILPPlanner(self.max_instances, num_connections)
+ #elif self.planning_algorithm == "src_one_sided":
+ # self.planner = DirectPlannerSourceOneSided(self.max_instances, self.n_connections, self.transfer_config)
+ #elif self.planning_algorithm == "dst_one_sided":
+ # self.planner = DirectPlannerDestOneSided(self.max_instances, self.n_connections, self.transfer_config)
else:
raise ValueError(f"No such planning algorithm {planning_algorithm}")
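
The hunk above also fixes branches that mistakenly assigned the planner object to self.planning_algorithm instead of self.planner. As an illustrative aside (not part of the commit), the same name-to-planner dispatch can be written as a lookup table; the import path and constructor signatures below are assumptions based on the calls shown in this diff, not the exact Skyplane API.

from skyplane.planner.planner import (  # import path assumed from this repo layout
    MulticastDirectPlanner,
    MulticastMDSTPlanner,
    MulticastILPPlanner,
    UnicastILPPlanner,
)

PLANNERS = {
    "direct": MulticastDirectPlanner,
    "MDST": MulticastMDSTPlanner,
    "ILP": MulticastILPPlanner,
    "UnicastILP": UnicastILPPlanner,
}

def make_planner(name: str, max_instances: int, num_connections: int):
    # every planner here is assumed to take (max_instances, num_connections)
    try:
        return PLANNERS[name](max_instances, num_connections)
    except KeyError:
        raise ValueError(f"No such planning algorithm {name}")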

11 changes: 8 additions & 3 deletions skyplane/api/transfer_job.py
@@ -133,6 +133,7 @@ def _run_multipart_chunk_thread(
for _ in range(num_chunks):
file_size_bytes = min(chunk_size_bytes, src_object.size - offset)
assert file_size_bytes > 0, f"file size <= 0 {file_size_bytes}"
+ print("partition", part_num, self.num_partitions)
chunk = Chunk(
src_key=src_object.key,
dest_key=dest_key, # dest_object.key, # TODO: upload basename (no prefix)
@@ -350,6 +351,7 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) ->
multipart_chunk_threads.append(t)

# begin chunking loop
+ part_num = 0
for transfer_pair in transfer_pair_generator:
# print("transfer_pair", transfer_pair.src_obj.key, transfer_pair.dst_objs)
src_obj = transfer_pair.src_obj
@@ -365,9 +367,11 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) ->
dest_key=transfer_pair.dst_key, # TODO: get rid of dest_key, and have write object have info on prefix (or have a map here)
chunk_id=uuid.uuid4().hex,
chunk_length_bytes=transfer_pair.src_obj.size,
- partition_id=str(0), # TODO: fix this to distribute across multiple partitions
+ #partition_id=str(0), # TODO: fix this to distribute across multiple partitions
+ partition_id=str(part_num % self.num_partitions),
)
)
+ part_num += 1

if self.transfer_config.multipart_enabled:
# drain multipart chunk queue and yield with updated chunk IDs
@@ -688,9 +692,10 @@ def chunk_request(server, chunk_batch, n_added):
# send chunk requests to source gateways
chunk_batch = [cr.chunk for cr in batch if cr.chunk is not None]
# TODO: allow multiple partition ids per chunk
- for chunk in chunk_batch: # assign job UUID as partition ID
- chunk.partition_id = self.uuid
+ #for chunk in chunk_batch: # assign job UUID as partition ID
+ # chunk.partition_id = self.uuid
min_idx = queue_size.index(min(queue_size))
+ print([b.chunk.partition_id for b in batch if b.chunk])
n_added = 0
while n_added < len(chunk_batch):
# TODO: should update every source instance queue size
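
The transfer_job.py changes above replace a hard-coded partition id (and the job-UUID assignment) with a round-robin over self.num_partitions. A self-contained sketch of that pattern, with a simplified stand-in Chunk rather than Skyplane's real class, follows.

from dataclasses import dataclass
from typing import List

@dataclass
class Chunk:  # simplified stand-in for Skyplane's Chunk
    src_key: str
    partition_id: str

def assign_partitions(keys: List[str], num_partitions: int) -> List[Chunk]:
    # part_num % num_partitions spreads chunks evenly across partitions
    return [
        Chunk(src_key=key, partition_id=str(part_num % num_partitions))
        for part_num, key in enumerate(keys)
    ]

# e.g. 5 objects over 2 partitions -> partition ids 0, 1, 0, 1, 0
print([c.partition_id for c in assign_partitions([f"obj{i}" for i in range(5)], 2)])
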
2 changes: 1 addition & 1 deletion skyplane/api/usage.py
@@ -11,7 +11,7 @@

import requests
from rich import print as rprint
- from typing import Optional, Dict
+ from typing import Optional, Dict, List

import skyplane
from skyplane.utils.definitions import tmp_log_dir
4 changes: 2 additions & 2 deletions skyplane/gateway/gateway_program.py
@@ -150,11 +150,11 @@ def to_dict(self):
for p in program_all:
if p["value"] == program: # equivalent partition exists
for pid in partition_id:
- p["partitions"].append(pid)
+ p["partitions"].append(str(pid))
exists = True
break
if not exists:
- program_all.append({"value": program, "partitions": partition_id})
+ program_all.append({"value": program, "partitions": str(partition_id)})

return program_all
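
To make the merge logic above concrete: partitions whose gateway programs are identical are folded into a single entry, with partition ids stored as strings. Below is a hypothetical standalone version that keeps "partitions" as a list of id strings (a simplification, not exactly what this commit stores); the program values are plain dictionaries instead of real gateway operators.

def group_programs(programs_by_partition: dict) -> list:
    program_all = []
    for pid, program in programs_by_partition.items():
        for p in program_all:
            if p["value"] == program:  # equivalent program already recorded
                p["partitions"].append(str(pid))
                break
        else:
            program_all.append({"value": program, "partitions": [str(pid)]})
    return program_all

# two partitions share the same program, one differs
print(group_programs({0: {"op": "relay"}, 1: {"op": "relay"}, 2: {"op": "write"}}))
# [{'value': {'op': 'relay'}, 'partitions': ['0', '1']}, {'value': {'op': 'write'}, 'partitions': ['2']}]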

10 changes: 6 additions & 4 deletions skyplane/planner/planner.py
@@ -64,7 +64,7 @@ def verify_job_src_dsts(self, jobs: List[TransferJob], multicast=False) -> Tuple
return src_region_tag, dst_region_tags

@functools.lru_cache(maxsize=None)
- def make_nx_graph(self, tp_grid_path: Optional[Path] = files("skyplane.data") / "throughput.csv") -> nx.DiGraph:
+ def make_nx_graph(self, tp_grid_path: Optional[Path] = files("data") / "throughput.csv") -> nx.DiGraph:
# create throughput / cost graph for all regions for planner
G = nx.DiGraph()
throughput = pd.read_csv(tp_grid_path)
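
make_nx_graph() above is cached with lru_cache and builds a directed throughput/cost graph from a CSV grid. A rough sketch of that construction follows; the column names are assumptions for illustration, not the actual throughput.csv schema.

import networkx as nx
import pandas as pd

def build_throughput_graph(csv_path: str) -> nx.DiGraph:
    G = nx.DiGraph()
    for _, row in pd.read_csv(csv_path).iterrows():
        # column names here are assumed, not taken from the real throughput.csv header
        G.add_edge(row["src_region"], row["dst_region"], throughput=float(row["throughput_gbps"]))
    return G
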
Expand Down Expand Up @@ -101,6 +101,7 @@ def add_src_or_overlay_operator(
:param bucket_info: tuple of (bucket_name, bucket_region) for object store
:param dst_op: if None, then this is either the source node or a overlay node; otherwise, this is the destination overlay node
"""
+ g = solution_graph
# partition_ids are set of ids that follow the same path from the out edges of the region
any_id = partition_ids[0] - partition_offset
next_regions = set([edge[1] for edge in g.out_edges(region, data=True) if str(any_id) in edge[-1]["partitions"]])
@@ -127,7 +128,7 @@ def add_src_or_overlay_operator(
receive_op = dst_op

# find set of regions to send to for all partitions in partition_ids
- g = solution_graph
+
region_to_id_map = {}
for next_region in next_regions:
region_to_id_map[next_region] = []
Expand Down Expand Up @@ -268,7 +269,7 @@ def logical_plan_to_topology_plan(self, jobs: List[TransferJob], solution_graph:
partitions,
partition_offset=i,
plan=plan,
- obj_store=(src_bucket, node),
+ bucket_info=(src_bucket, node),
)

# dst receive data, write to object store, forward data if needed
Expand All @@ -287,7 +288,7 @@ def logical_plan_to_topology_plan(self, jobs: List[TransferJob], solution_graph:
# overlay node only forward data
else:
self.add_src_or_overlay_operator(
- solution_graph, node_gateway_program, node, partitions, partition_offset=i, plan=plan, obj_store=None
+ solution_graph, node_gateway_program, node, partitions, partition_offset=i, plan=plan, bucket_info=None
)
region_to_gateway_program[node] = node_gateway_program
assert len(region_to_gateway_program) > 0, f"Empty gateway program {node}"
Expand Down Expand Up @@ -381,6 +382,7 @@ def logical_plan(
filter_edge: bool = False,
solver_verbose: bool = False,
save_lp_path: Optional[str] = None,
+ solver: Optional[str] = None,
) -> nx.DiGraph:
import cvxpy as cp
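
The new solver argument added to logical_plan() above suggests the solver choice is forwarded to cvxpy; the exact wiring inside the planner is not shown in this diff. A hedged sketch of passing an optional solver name through to cvxpy:

from typing import Optional
import cvxpy as cp

def solve_with_optional_solver(prob: cp.Problem, solver: Optional[str] = None, verbose: bool = False) -> float:
    # cvxpy accepts a solver name string (e.g. "ECOS", "GUROBI"); None lets cvxpy choose
    if solver is not None:
        return prob.solve(solver=solver, verbose=verbose)
    return prob.solve(verbose=verbose)

# tiny usage example: minimize x subject to x >= 3
x = cp.Variable()
print(solve_with_optional_solver(cp.Problem(cp.Minimize(x), [x >= 3])))  # ~3.0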

