daemon seems to be working as intended
louisponet committed Jul 28, 2021
1 parent c4ce90b commit 9ce9715
Showing 5 changed files with 125 additions and 80 deletions.
8 changes: 4 additions & 4 deletions Project.toml
@@ -13,6 +13,7 @@ Crayons = "a8cc5b0e-0ffa-5ad4-8c14-923d3ee1735f"
DaemonMode = "d749ddd5-2b29-4920-8305-6ff5a704e36e"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
DelimitedFiles = "8bb1440f-4735-579b-a4ab-409b98df4dab"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36"
@@ -30,10 +31,13 @@ spglib_jll = "ac4a9f1e-bdb2-5204-990c-47c8b2f70d4e"

[compat]
BinDeps = "^1"
CodeTracking = "^1"
ColorTypes = "^0.11"
Colors = "^0.12"
Crayons = "^4"
DaemonMode = "^0.1"
JLD2 = "^0.4"
LoggingExtras = "^0.4"
Media = "^0.5"
Parameters = "^0.12"
RecipesBase = "^1"
@@ -42,10 +46,6 @@ Requires = "^1"
StaticArrays = "^1"
Unitful = "^1"
julia = "^1"
LoggingExtras = "^0.4"
CodeTracking = "^1"
DaemonMode = "^0.1"


[extras]
BinDeps = "9e28174c-4ba2-5203-b857-d8d62c4213ee"
3 changes: 1 addition & 2 deletions src/DFControl.jl
@@ -83,10 +83,9 @@ const dfprintln = println
const dfprint = print
include("display/overloads.jl")

using DaemonMode, Pkg, LoggingExtras
using DaemonMode, Pkg, LoggingExtras, Distributed
include("daemon.jl")


using Requires

function __init__()
191 changes: 119 additions & 72 deletions src/daemon.jl
@@ -1,11 +1,10 @@
const DAEMON_CONFIG_PATH = config_path("daemon.jld2")
const DAEMON_CONFIG_PATH = config_path("daemon.jld2")
delete_daemon_config!() = rm(DAEMON_CONFIG_PATH)

@with_kw mutable struct Daemon
port::Int = 9584
pid::Int = -1
job_dirs::Vector{String} = String[]
tasks::Vector{Task} = Task[]
pid::Int = -1
job_dirs_procs::Dict = Dict{String,Tuple{Int,Future}}()
query_time::Int = 10
started::Bool = false
main_loop = nothing
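
Note the bookkeeping change in the struct above: the job_dirs/tasks pair is replaced by job_dirs_procs, which maps each job directory to the Distributed worker id running its queue and the Future returned by remotecall. A self-contained sketch of that pattern follows; the job directory is hypothetical and a stub closure stands in for run_queue.

using Distributed

procs_map = Dict{String,Tuple{Int,Future}}()
pid = addprocs(1)[1]                           # one dedicated worker per workflow
fut = remotecall(() -> :done, pid)             # stand-in for remotecall(run_queue, ...)
procs_map["/home/user/jobs/NiO"] = (pid, fut)  # keyed by the job directory (hypothetical path)
isready(fut) && fetch(fut)                     # how main_loop below polls for completion
rmprocs(pid)                                   # the worker is released once the workflow ends
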
@@ -14,79 +13,94 @@ end
function isalive(d::Daemon)
d.pid == -1 && return false
try
run(pipeline(`ps -p $(d.pid)`, devnull));
run(pipeline(`ps -p $(d.pid)`, devnull))
return true
catch
return false
end
end

save(d::Daemon) =
JLD2.save(DAEMON_CONFIG_PATH, "port", d.port, "pid", d.pid, "query_time", d.query_time, "job_dirs", d.job_dirs, "started", d.started)
function save(d::Daemon)
return JLD2.save(DAEMON_CONFIG_PATH, "port", d.port, "pid", d.pid, "query_time",
d.query_time, "job_dirs_procs", d.job_dirs_procs, "started", d.started)
end

function Daemon(config_dir::String)
data = JLD2.load(DAEMON_CONFIG_PATH)
return Daemon(port = data["port"], pid=data["pid"], query_time = data["query_time"], job_dirs=data["job_dirs"], started=data["started"])
return Daemon(; port = data["port"], pid = data["pid"], query_time = data["query_time"],
job_dirs_procs = data["job_dirs_procs"], started = data["started"])
end

function start(d::Daemon)
julia_exec = joinpath(Sys.BINDIR, "julia")
project_path = Pkg.project().path
d.pid = getpid(run(`$julia_exec -t auto -e "using DFControl; DAEMON = DFControl.server_start($(d.port)); using DaemonMode; serve($(d.port), true)"`, wait=false))
p = run(Cmd(`$julia_exec -t auto -e "using DFControl; DAEMON = DFControl.server_start($(d.port)); using DaemonMode; serve($(d.port), true)"`; detach = true); wait = false)
d.pid = getpid(p)
@info "Daemon started, listening on port $(d.port), with PID $(d.pid)."
d.started = false
save(d)
return d
end

is_started(d::Daemon) =
JLD2.load(DAEMON_CONFIG_PATH)["started"]
is_started(d::Daemon) = JLD2.load(DAEMON_CONFIG_PATH)["started"]

function server_start(port::Int)
with_logger(daemon_logger()) do
d = Daemon(port = port, pid = getpid())
if ispath(DAEMON_CONFIG_PATH)
prev_daemon = Daemon(DAEMON_CONFIG_PATH)
d.query_time = prev_daemon.query_time
for j in prev_daemon.job_dirs
if exists_job(j)
tjob = DFJob(j)
push!(d.tasks, run_queue(tjob; sleep_time = d.query_time))
push!(d.job_dirs, j)
d = Daemon(; port = port, pid = getpid())
if ispath(DAEMON_CONFIG_PATH)
prev_daemon = Daemon(DAEMON_CONFIG_PATH)
d.query_time = prev_daemon.query_time
for j in keys(prev_daemon.job_dirs_procs)
if exists_job(j)
tjob = DFJob(j)
spawn_worker(d, tjob)
end
end
end
end
d.main_loop = @async main_loop(d)
d.started = true
save(d)
return d
d.main_loop = @async main_loop(d)
d.started = true
save(d)
return d
end
end

daemon_logger() = FileLogger(config_path("daemon.log"), append=true)
daemon_logger() = FileLogger(config_path("daemon.log"); append = true)
function workflow_logger(job::DFJob)
return TeeLogger(MinLevelLogger(FileLogger(joinpath(job, ".workflow", "info.log");
append = true), Logging.Info),
MinLevelLogger(FileLogger(joinpath(job, ".workflow", "info.log");
append = true), Logging.Warn),
MinLevelLogger(FileLogger(joinpath(job, ".workflow", "error.log");
append = true), Logging.Error))
end

function main_loop(d::Daemon)
while true
to_rm = Int[]
for (i, (k, t)) in enumerate(zip(d.job_dirs, d.tasks))
if istaskfailed(t)
@warn """Workflow in job directory $(k) failed.
See $(joinpath(k, ".workflow/error.log")) for more info."""
push!(to_rm, i)
elseif istaskdone(t)
@info """Workflow for job directory $(k) done."""
push!(to_rm, i)
to_rm = String[]
for (k, t) in d.job_dirs_procs
if isready(t[2])
try
t_ = fetch(t[2])
@info """Workflow for job directory $(k) done."""
push!(to_rm, k)
catch e
@warn """Workflow in job directory $(k) failed.
See $(joinpath(k, ".workflow/error.log")) for more info."""
push!(to_rm, k)
end
end
end
if !isempty(to_rm)
deleteat!(d.job_dirs, to_rm)
deleteat!(d.tasks, to_rm)
for r in to_rm
rmprocs(d.job_dirs_procs[r][1])
end
pop!.((d.job_dirs_procs,), to_rm)
save(d)
end
sleep(d.query_time)
end
end

function init_daemon()
if ispath(DAEMON_CONFIG_PATH)
d = Daemon(DAEMON_CONFIG_PATH)
@@ -100,36 +114,38 @@ function init_daemon()
end
end

kill_daemon(d::Daemon) =
run(`kill $(d.pid)`)

function run_queue(job::DFJob; sleep_time = 10)
Threads.@spawn begin
logger = TeeLogger(MinLevelLogger(FileLogger(joinpath(job, ".workflow", "info.log"), append=true), Logging.Info),
MinLevelLogger(FileLogger(joinpath(job, ".workflow", "info.log"), append=true), Logging.Warn),
MinLevelLogger(FileLogger(joinpath(job, ".workflow", "error.log"), append=true), Logging.Error))
with_logger(logger) do
qd = queued_dir(job)
queued_steps = readdir(qd)
fd = finished_dir(job)
if !ispath(qd) || isempty(queued_steps)
return
kill_daemon(d::Daemon) = run(`kill $(d.pid)`)

function run_queue(job::DFJob, ctx::Dict; sleep_time = 10)
logger = workflow_logger(job)
with_logger(logger) do
qd = queued_dir(job)
queued_steps = readdir(qd)
order = sortperm(parse.(Int, getindex.(splitext.(queued_steps), 1)))
fd = finished_dir(job)
if !ispath(qd) || isempty(queued_steps)
return
end
if !ispath(fd)
mkpath(fd)
else
for f in readdir(fd)
rm(joinpath(fd, f))
end
for f in queued_steps
step_file = joinpath(qd, f)
t = include(step_file)
@eval $(t)($(job))
while isrunning(job)
sleep(sleep_time)
yield()
end
if !ispath(fd)
mkpath(fd)
end
mv(step_file, joinpath(fd, f), force=true)
end

for f in queued_steps[order]
step_file = joinpath(qd, f)
t = include(step_file)
@eval $(t)($(job), $(ctx))
JLD2.save(joinpath(job, ".workflow/ctx.jld2"), "ctx", ctx)
while isrunning(job)
sleep(sleep_time)
end
mv(step_file, joinpath(fd, f); force = true)
end
end
return true
end

queued_dir(job::DFJob, args...) = joinpath(job, ".workflow/queued", args...)
@@ -143,10 +159,37 @@ function queue_steps(job::DFJob, funcs)
prev_steps = readdir(qd)
last_step = !isempty(prev_steps) ? parse(Int, splitext(prev_steps[end])[1]) : 0
for (i, f) in enumerate(funcs)
write(joinpath(qd, "$(i + last_step).jl"), @code_string f(job))
write(joinpath(qd, "$(i + last_step).jl"), @code_string f(job, Dict{Symbol,Any}()))
end
end

function write_workflow_files(job::DFJob)
all = string.(values(Base.loaded_modules))
valid = String[]
ks = keys(Pkg.project().dependencies)
for p in all
        if p ∈ ks
push!(valid, p)
end
end
JLD2.save(joinpath(job, ".workflow/environment.jld2"), "modules", valid, "project",
Base.current_project())
return JLD2.save(joinpath(job, ".workflow/ctx.jld2"), "ctx", Dict{Symbol,Any}())
end

function spawn_worker(d::Daemon, job::DFJob)
env_dat = JLD2.load(joinpath(job, ".workflow/environment.jld2"))
proc = addprocs(1; exeflags = "--project=$(env_dat["project"])")[1]
using_expr = Base.Meta.parse("""using $(join(env_dat["modules"], ", "))""")
ctx_path = joinpath(job, ".workflow/ctx.jld2")

Distributed.remotecall_eval(Distributed.Main, proc, using_expr)
f = remotecall(DFControl.run_queue, proc, job, DFControl.JLD2.load(ctx_path)["ctx"];
sleep_time = d.query_time)
d.job_dirs_procs[job.local_dir] = (proc, f)
return save(d)
end

function clear_queue!(job::DFJob)
qd = queued_dir(job)
if !ispath(qd)
@@ -160,12 +203,16 @@ end
queued(job::DFJob) = readdir(queued_dir(job))
finished(job::DFJob) = readdir(finished_dir(job))

function submit_queue(job::DFJob, d::Daemon)
runexpr("""
DAEMON.jobs_tasks["$(job.local_dir)"] = DFControl.run_queue(DFJob("$(job.local_dir)"))
DFControl.save(DAEMON)
""", port=d.port)
function submit_workflow(job::DFJob, funcs, d::Daemon = init_daemon())
queue_steps(job, funcs)
write_workflow_files(job)
while !is_started(d)
sleep(1)
end
return runexpr("""
DFControl.spawn_worker(DAEMON, DFJob("$(job.local_dir)"))
DFControl.save(DAEMON)
"""; port = d.port)
end



mods_test() = Base.loaded_modules
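
For orientation, here is a minimal sketch of how the workflow API added in this diff might be driven from a client session. The job directory and the step functions are hypothetical; DFJob, init_daemon and submit_workflow are the calls introduced or used above. Each queued step is stored via @code_string f(job, Dict{Symbol,Any}()), so a step must be a function of the job and a context Dict that run_queue persists to .workflow/ctx.jld2 after every step.

using DFControl

# Hypothetical step functions: each receives the job and the shared context Dict.
function scf_step(job, ctx)
    ctx[:scf_done] = true      # stash state for later steps
    return nothing
end

function bands_step(job, ctx)
    ctx[:bands_done] = true
    return nothing
end

job = DFJob("/home/user/jobs/NiO")            # hypothetical job directory
d = init_daemon()                             # load (or create) the daemon configuration
# queue the steps, write the .workflow files, wait for the daemon, then ask it
# to spawn a worker for this job
submit_workflow(job, [scf_step, bands_step], d)

On the daemon side, spawn_worker then adds a dedicated worker with the project recorded in .workflow/environment.jld2, remotecalls run_queue with the saved ctx, and main_loop polls the returned Future until the workflow finishes or fails.
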
1 change: 0 additions & 1 deletion src/job.jl
@@ -312,4 +312,3 @@ isarchived(job::DFJob) = occursin(".archived", job.local_dir)


exists_job(d::AbstractString) = ispath(d) && ispath(joinpath(d, "job.tt"))

2 changes: 1 addition & 1 deletion src/qe/constants.jl
@@ -65,7 +65,7 @@ push!(QECalculationInfos,
QEFlagInfo{String}(:spin_component,
"'none', 'up' or 'down'"),
QEFlagInfo{Bool}(:write_spn,
"Write .spn matrix elements."),
"Write .spn matrix elements."),
QEFlagInfo{Bool}(:write_mmn,
"compute M_mn matrix"),
QEFlagInfo{Bool}(:write_amn,