In [2]:
# Set up some convenient functions.

# Print the PID, %MEM and %CPU columns that `ps` reports for every process
# named `julia`.
# `ps` writes its table to standard output by itself, so the result of `run`
# is no longer passed to `println` (that only appended a stray `nothing`
# after each table).
function memstats()
  run(`ps -C julia -o pid,%mem,%cpu`)
  nothing
end


# See what is defined on each worker by running `whos` there, one worker at a
# time.
# `whos` does its own printing on the worker, so its return value is not
# printed here (doing so only added a spurious `nothing` line per worker).
function remote_vars()
  for w in workers()
    remotecall_fetch(w, whos)
  end
end

# This is like @everywhere but only runs on a particular process.
#   p  -- expression giving the id of the process to run on
#   ex -- expression to evaluate in `Main` on that process
# Blocks until the remote evaluation has finished; the remote thunk returns
# `nothing`, so the value of `ex` is never shipped back to the caller.
macro runat(p, ex)
  quote
    # `p` is escaped so that it is looked up in the caller's scope; without
    # this a process id held in a local variable could not be used.
    remotecall_fetch($(esc(p)), ()->(eval(Main,$(Expr(:quote,ex))); nothing))
  end
end

# Counterpart of @everywhere restricted to the processes listed by `workers()`:
# `ex` is evaluated in `Main` on each of them, all requests are issued
# concurrently, and the macro blocks until every one has completed.
macro everywhereelse(ex)
  quoted_ex = Expr(:quote, ex)
  # Zero-argument function shipped to each worker; it discards the value of `ex`.
  thunk = :(() -> (eval(Main, $quoted_ex); nothing))
  quote
    @sync for pid in workers()
      @async remotecall_fetch(pid, $thunk)
    end
  end
end


# Two large random matrices to pass around; each is big enough to show up in memstats().
mat = rand(int(1e4), int(1e4));
mat2 = rand(size(mat)...);

# Drop every process other than the current one.
for pid in filter(p -> p != myid(), workers())
    rmprocs(pid)
end

memstats()

  PID %MEM %CPU
16809  1.1  0.4
20861 20.2  105
nothing


In [3]:
# Counterintuitively, `workers()` at first contains `[1]` (the current process),
# but after adding workers it lists only the added ones and no longer includes 1.
println(workers())
addprocs(2)
println(workers())
memstats()

# Also: workers() does not necessarily return the same order everywhere, so
# print it on every process for comparison.
@everywhere println(workers())

[1]
[2,3]
  PID %MEM %CPU
16809  1.1  0.4
20861 20.2 92.7
20874  0.4  183
20875  0.5  165
nothing


In [4]:
# Here's one way to communicate `mat` to the workers.

# First, set up the remote references.
# (Record memory usage before anything is transferred, as a baseline.)
memstats()

# The RemoteRefs will be used to communicate data from process 1 to the other workers.
# Every process defines its own global `rr`; the master then collects each
# worker's `rr` handle into `rr_array`.
@everywhere rr = RemoteRef(1)
rr_array = [ remotecall_fetch(worker, () -> rr) for worker in workers() ]

# Next, put `mat` into each remote reference.
for worker_rr in rr_array
  put!(worker_rr, mat)
end

# Remote memory for `mat` has not been allocated yet: the workers' %MEM is
# unchanged in the second table.
memstats()


  PID %MEM %CPU
16809  1.1  0.4
20861 20.2 81.8
20874  0.5 61.6
20875  0.5 55.6
nothing
  PID %MEM %CPU
16809  1.1  0.4
20861 20.2 79.0
20874  0.5 55.2
20875  0.5 50.2
nothing


In [5]:
# We can also `fetch()` without allocating memory:
# each line shows the process id stored in the reference (`.where`) followed by
# the top-left 5x5 corner of the fetched matrix.
for worker_rr in rr_array
  println(worker_rr.where, ": ", fetch(worker_rr)[1:5, 1:5])
end
memstats()

1: [0.6071000616729052 0.7430958292688057 0.6778330076122223 0.8839054732433813 0.6121863724628822
 0.8828773574555082 0.4229148012881818 0.8639985321856845 0.0011281843066930364 0.5862599005689115
 0.10347513414522802 0.08126627600731551 0.7336119446745044 0.598626275685985 0.323083707220027
 0.4536449997364247 0.7080607729426764 0.7403655790387769 0.9829884215594171 0.242662917872968
 0.4845115893603835 0.46521813389559097 0.4791944656582876 0.12093660795342465 0.7385056336446294]
1: [0.6071000616729052 0.7430958292688057 0.6778330076122223 0.8839054732433813 0.6121863724628822
 0.8828773574555082 0.4229148012881818 0.8639985321856845 0.0011281843066930364 0.5862599005689115
 0.10347513414522802 0.08126627600731551 0.7336119446745044 0.598626275685985 0.323083707220027
 0.4536449997364247 0.7080607729426764 0.7403655790387769 0.9829884215594171 0.242662917872968
 0.4845115893603835 0.46521813389559097 0.4791944656582876 0.12093660795342465 0.7385056336446294]
  PID %MEM %CPU
16809  1

In [6]:
# However, assigning actually allocates memory.
# Note that extra memory is also allocated on the master during the copy but is freed by gc().
# The duration is measured with `@elapsed` rather than by storing a wall-clock
# start time in `fetch_time1` and overwriting it with the difference.
fetch_time1 = @elapsed begin
  @everywhereelse mat11 = fetch(rr)[1,1]
end
memstats()
gc()
memstats()


  PID %MEM %CPU
16809  1.1  0.4
20861 22.2 60.7
20874  5.3 22.4
20875  5.3 21.0
nothing
  PID %MEM %CPU
16809  1.1  0.4
20861 10.6 60.8
20874  5.3 22.4
20875  5.3 21.0
nothing


In [7]:
# Subsequent fetches take just as much time and allocate just as much memory
# before gc().  It is evidently re-fetching each time.
# The duration is measured with `@elapsed` rather than by storing a wall-clock
# start time in `fetch_time2` and overwriting it with the difference.
fetch_time2 = @elapsed begin
  @everywhereelse mat12 = fetch(rr)[1,2]
end
memstats()

println("Fetch time 1: ", fetch_time1)
println("Fetch time 2: ", fetch_time2)

  PID %MEM %CPU
16809  1.1  0.4
20861 22.2 54.2
20874 10.1 20.8
20875 10.1 20.0
nothing
Fetch time 1: 1.8530800342559814
Fetch time 2: 1.9487011432647705


In [11]:
# Make a local copy of the whole array.  The fetch time is again the same.
# The duration is measured with `@elapsed` rather than by storing a wall-clock
# start time in `fetch_time3` and overwriting it with the difference.
fetch_time3 = @elapsed begin
  @everywhereelse mat_worker = fetch(rr)
end
memstats()

# Again, the extra memory is freed with garbage collection.
@everywhere gc()
memstats()
println("Fetch time 3: ", fetch_time3)

  PID %MEM %CPU
16809  1.1  0.4
20861 22.2 20.9
20874 10.1  6.9
20875 10.1  6.7
nothing
  PID %MEM %CPU
16809  1.1  0.4
20861 10.6 20.9
20874  5.3  7.0
20875  5.3  6.7
nothing
Fetch time 3: 1.8661770820617676


In [14]:
# Confirm that the copy on worker two is in fact a local copy by changing some values.
# Overwrite one entry of worker 2's copy ...
@runat 2 mat_worker[1,1]= -20.
# ... publish that copy through a new reference created on worker 2 ...
@runat 2 rr2 = RemoteRef(2)
@runat 2 put!(rr2, mat_worker)
# ... then bring the reference handle, and through it the matrix, back to the
# master. Note that this assigns the master's own `rr2` and `mat_worker` globals.
rr2 = remotecall_fetch(2, () -> rr2)
mat_worker = fetch(rr2);
# The master's original `mat` must be unaffected by the change made on worker 2.
println("Value from remote worker: ", mat_worker[1, 1])
println("Original value: ", mat[1, 1])

Value from remote worker: -20.0
Original value: 0.6071000616729052


In [15]:
# Allocating on process 2 should raise the memory usage of process 2 alone;
# the memstats() calls before and after bracket the allocation.
memstats()
@runat 2 num_big_mats = 5
# Build the `Any`-typed vector of large random matrices in a single expression.
@runat 2 big_mats = Any[ rand(int(1e4), int(1e4)) for i = 1:num_big_mats ]
memstats()

  PID %MEM %CPU
16809  1.1  0.4
20861 20.2  9.5
20874 11.1  3.6
20875  5.3  2.7
nothing
  PID %MEM %CPU
16809  1.1  0.4
20861 20.2  9.3
20874 29.2  5.9
20875  5.3  2.6
nothing


In [16]:
# Subsequent fetches don't pick up changes to immutable values:
# `myx += 1.45` rebinds `myx` on the master, while the reference still holds
# the 1.23 that was put into it (both fetches on worker 2 print 1.23).
@runat 2 rr = RemoteRef(1)
rr = remotecall_fetch(2, () -> rr)
myx = 1.23
put!(rr, myx)
@runat 2 println(fetch(rr))
myx += 1.45
@runat 2 println(fetch(rr))

# But they do pick up in-place changes to mutable objects: after the array's
# element is modified on the master, the second fetch prints the updated value.
@runat 2 rr = RemoteRef(1)
rr = remotecall_fetch(2, () -> rr)
myxvec = [ 1.23 ]
put!(rr, myxvec)
@runat 2 println(fetch(rr))
myxvec[1] += 1.45
@runat 2 println(fetch(rr))

	From worker 2:	1.23
	From worker 2:	1.23
	From worker 2:	[1.23]
	From worker 2:	[2.6799999999999997]


In [23]:
# Note that @spawnat doesn't work for assignment since it resolves variables on the
# calling process.
for w in workers()
  @spawnat w y = 1.234
end
# List what each worker has defined after the @spawnat attempt.
remote_vars()

# This works though: @everywhere performs the assignment on every process.
@everywhere y = 1.345
# List the workers' variables again, now that `y` has been assigned everywhere.
remote_vars()

	From worker 2:	Base                          Module
	From worker 2:	Core                          Module
	From worker 2:	Main                          Module
	From worker 2:	big_mats                      5-element Array{Any,1}
	From worker 2:	mat11                         Float64
	From worker 2:	mat12                         Float64
	From worker 2:	mat_worker                    10000x10000 Array{Float64,2}
	From worker 2:	num_big_mats                  Int64
	From worker 2:	rr                            RemoteRef
	From worker 2:	rr2                           RemoteRef
	From worker 2:	y                             Float64
nothing
	From worker 3:	Base                          Module
	From worker 3:	Core                          Module
	From worker 3:	Main                          Module
	From worker 3:	mat11                         Float64
	From worker 3:	mat12                         Float64
	From worker 3:	mat_worker                    10000x10000 Array{Float64,2}
	From worker 3:	rr   

In [25]:
# Note that once you have run put! on a RemoteRef, you cannot run it again.
# Fresh references are therefore created on every process and collected from
# the workers before new values are put.
@everywhere rr = RemoteRef(1)
rr_array = [ remotecall_fetch(worker, () -> rr) for worker in workers() ]

# Give each worker's reference a distinct value: 10.0, 11.0, ...
i = 10.0
for worker_rr in rr_array
  put!(worker_rr, i)
  i += 1
end

