Fix Spark worker not using spark-defaults.conf file configs
marcelomendoncasoares committed Nov 17, 2023
Commit 7623acd (parent: ba13d19)
scripts/start_cluster.py (32 additions, 18 deletions)
@@ -95,6 +95,15 @@ def _get_mem_from_cmd(cmd: str, divide_by: int) -> float:
     return min(host_mem_gb, container_mem_gb)
 
 
+def mem_to_str(mem_gb: float) -> str:
+    """
+    Convert memory in GB to a string in GB or MB with the appropriate symbol.
+    """
+    if round(mem_gb) > 0 and mem_gb % round(mem_gb) <= 0.01:
+        return f"{int(mem_gb)}GB"
+    return f"{int(mem_gb * 1024)}MB"
+
+
 @dataclass
 class MemoryConfig:
     """
@@ -155,6 +164,13 @@ def as_gb(self) -> float:
             return self.min
         return max(self.pct * total_mem_gb(), self.min)
 
+    def to_str(self) -> str:
+        """
+        The memory configuration parsed as a string with the "GB" symbol, if
+        the memory is greater than 1GB, or "MB" otherwise.
+        """
+        return mem_to_str(self.as_gb())
+
 
 class ClusterSpec(TypedDict):
     """
@@ -222,21 +238,19 @@ def calculate(self) -> ClusterSpec:
 
         return ClusterSpec(
             driver_cores=self.driver_cores,
-            driver_memory=f"{round(self.driver_mem.as_gb(), 1)}G",
+            driver_memory=self.driver_mem.to_str(),
             worker_cores=max(remaining_cores, executor_cores),
-            worker_memory=f"{round(remaining_mem, 1)}G",
+            worker_memory=mem_to_str(remaining_mem),
             executor_instances=num_executors,
             executor_cores=executor_cores,
-            executor_memory=f"{round(executor_mem, 1)}G",
+            executor_memory=mem_to_str(executor_mem),
         )
 
-    def _apply_to_conf(self) -> None:
+    def _apply_to_conf(self, spark_conf: ClusterSpec) -> None:
         """
         Idempotently apply the cluster specifications to the conf file.
         """
 
-        # Calculated configs plus the default auto scale configs, if set.
-        spark_conf = self.calculate()
         if self.auto_scale:
             max_executors = spark_conf["executor_instances"]
             spark_conf = {
@@ -264,31 +278,31 @@ def start(self) -> None:
         """
        Start the calculated cluster.
         """
-        self._apply_to_conf()
+        conf = self.calculate()
+        self._apply_to_conf(conf)
 
         start_prefix = "/opt/spark/sbin/start"
         print(f"Starting master...")
         subprocess.run([f"{start_prefix}-master.sh"], check=True)
-        print(f"Starting worker...")
-        subprocess.run([f"{start_prefix}-worker.sh", "spark://0.0.0.0:7077"], check=True)
-        conf = self.calculate()
-        def fmt_mem(
-            name: Literal['driver_memory', 'worker_memory', 'executor_memory'],
-        ) -> str:
-            return f"{conf[name].replace('G', '')} GB RAM"
+
+        print(f"Starting worker...")
+        worker_command = [f"{start_prefix}-worker.sh", "spark://0.0.0.0:7077"]
+        worker_mem = ["--memory", conf["worker_memory"]]
+        worker_cores = ["--cores", str(conf["worker_cores"])]
+        subprocess.run([*worker_command, *worker_mem, *worker_cores], check=True)
 
         print(
             f"# \n"
             f"# Spark standalone cluster started with success.\n"
             f"# The cluster have been configured to preserve {self.free_cores} core(s) "
-            f"and {round(self.free_mem.as_gb(), 1)}GB of memory for \n"
+            f"and {self.free_mem.to_str()} of memory for \n"
             f"# the OS and other apps, using the following resources:\n"
             f"# \n"
-            f"# * Driver: {conf['driver_cores']} cores / {fmt_mem('driver_memory')}\n"
-            f"# * Worker: {conf['worker_cores']} cores / {fmt_mem('worker_memory')}\n"
+            f"# * Driver: {conf['driver_cores']} cores / {conf['driver_memory']} RAM\n"
+            f"# * Worker: {conf['worker_cores']} cores / {conf['worker_memory']} RAM\n"
             f"# \n"
             f"# The worker will spawn {conf['executor_instances']} executor(s) with "
-            f"{conf['executor_cores']} cores and {fmt_mem('executor_memory')}. "
+            f"{conf['executor_cores']} cores and {conf['executor_memory']} RAM. "
             f"{'Auto scale is enabled.' if self.auto_scale else ''}\n"
             f"# "
         )
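
For context on the change to start(): the standalone worker launched by start-worker.sh was not picking up its memory and cores from spark-defaults.conf, which is why the commit now passes them explicitly as --memory and --cores. Below is a minimal, self-contained sketch (not part of the commit) of that command assembly; the resource values are hypothetical, the helper is copied from the hunk above, and the command is only printed rather than executed.

# Sketch: build the worker launch command the way the fixed start() does.
# The resource values below are hypothetical; the command is printed, not run.

def mem_to_str(mem_gb: float) -> str:
    """Render memory in GB as a "GB" or "MB" string (copied from the commit)."""
    if round(mem_gb) > 0 and mem_gb % round(mem_gb) <= 0.01:
        return f"{int(mem_gb)}GB"
    return f"{int(mem_gb * 1024)}MB"

# Hypothetical resources that calculate() might have produced for the worker.
worker_memory = mem_to_str(6.5)  # -> "6656MB"
worker_cores = 4

start_prefix = "/opt/spark/sbin/start"
worker_command = [f"{start_prefix}-worker.sh", "spark://0.0.0.0:7077"]
# Passed explicitly because the worker daemon was not reading these values
# from spark-defaults.conf (the problem this commit fixes).
worker_mem_args = ["--memory", worker_memory]
worker_core_args = ["--cores", str(worker_cores)]

print(" ".join([*worker_command, *worker_mem_args, *worker_core_args]))
# /opt/spark/sbin/start-worker.sh spark://0.0.0.0:7077 --memory 6656MB --cores 4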
