In [None]:
!pip install ray

Collecting ray
[?25l  Downloading https://files.pythonhosted.org/packages/97/52/0a1894137a0592787f9f5f8a2bee271ee9eb17a3d88baa036dc526165371/ray-1.0.1-cp36-cp36m-manylinux1_x86_64.whl (23.1MB)
[K     |████████████████████████████████| 23.1MB 1.7MB/s 
Collecting aiohttp-cors
  Downloading https://files.pythonhosted.org/packages/13/e7/e436a0c0eb5127d8b491a9b83ecd2391c6ff7dcd5548dfaec2080a2340fd/aiohttp_cors-0.7.0-py3-none-any.whl
Collecting gpustat
[?25l  Downloading https://files.pythonhosted.org/packages/b4/69/d8c849715171aeabd61af7da080fdc60948b5a396d2422f1f4672e43d008/gpustat-0.6.0.tar.gz (78kB)
[K     |████████████████████████████████| 81kB 12.2MB/s 
Collecting aioredis
[?25l  Downloading https://files.pythonhosted.org/packages/b0/64/1b1612d0a104f21f80eb4c6e1b6075f2e6aba8e228f46f229cfd3fdac859/aioredis-1.3.1-py3-none-any.whl (65kB)
[K     |████████████████████████████████| 71kB 11.7MB/s 
Collecting opencensus
[?25l  Downloading https://files.pythonhosted.org/packages/50/68/4f

In [None]:
import numpy as np
def funPrint(x):
  """Print the length of ``x`` followed by each of its items on its own line.

  ``x`` must support ``len()`` and integer indexing from ``0`` to
  ``len(x) - 1``. Nothing is returned.
  """
  count = len(x)
  print(count)
  position = 0
  while position < count:
    print(x[position])
    position += 1



class RustModelSim(object):
  """Value-function solver for a replacement model on a fixed 91-state grid.

  Parameters
  ----------
  reward : sequence
      ``reward[0]`` is the replacement cost and ``reward[1]`` the per-state
      maintenance cost coefficient.
  p : array-like
      ``p[j]`` is the probability that the state increases by ``j`` in one
      period.
  beta : float
      Discount factor used by :meth:`fl_costs`.
  """

  def __init__(self,reward,p,beta=0.9999):
    self.S = int(91)
    self.p = np.array(p)
    self.reward = reward
    self.beta = beta
    # A (SxS) matrix indicating the probability of a bus transitioning
    # from a state s to a state s' (used to compute maintenance utility)
    self.trans_mat = np.zeros((self.S, self.S))
    last = self.S - 1
    for i in range(self.S):
      for j, _p in enumerate(self.p):
        if i + j < last:
          self.trans_mat[i][i+j] = _p
        elif i + j == last:
          # The last state absorbs every increment that would leave the
          # grid, so each row still sums to one.
          self.trans_mat[i][last] = self.p[j:].sum()

  def myopic_costs(self): # - reward function
    """Return the (S, 2) array of one-period payoffs.

    Column 0 is the maintenance payoff ``-s * reward[1]`` for every state
    ``s``; column 1 is the replacement payoff ``-reward[0]``, identical in
    every state.
    """
    rc = self.reward[0]          #F : action 1
    thetas = self.reward[1]     #c : action 0
    maint_cost = (-np.arange(self.S) * thetas).reshape(1, -1)
    repl_cost = np.full((1, self.S), -rc)  #action 1
    return np.vstack((maint_cost, repl_cost)).T

  def fl_costs(self, threshold=1e-4, suppr_output=False, max_iter=1000): #compute V^n
    """Iterate the log-sum-exp Bellman operator until it stabilises.

    Parameters
    ----------
    threshold : float
        Iteration stops once the largest absolute change between two
        successive value vectors is at most this value.
    suppr_output : bool
        When true, the convergence message is not printed.
    max_iter : int
        Upper bound on the number of sweeps (default 1000, the previously
        hard-coded cap). With ``beta`` close to 1 the iteration contracts
        slowly, so a larger cap may be needed to reach ``threshold``.

    Returns
    -------
    (EV_new, Q) : tuple of ndarray
        ``EV_new`` has shape (S, 1) and holds the log-sum-exp of the rows of
        ``Q``; ``Q`` has shape (S, 2) with the maintenance values in column 0
        and the replacement values in column 1, both from the last sweep.

    At least one sweep is always performed so that ``Q`` is defined even
    for a very loose ``threshold``.
    """
    EV_myopic = self.myopic_costs()
    EV_new = np.zeros((self.S, 1))
    achieved = False
    k = 0

    # Contraction mapping Loop
    while True:
      EV = EV_new
      Q0 = EV_myopic[:,0] + self.beta * self.trans_mat.dot(EV).reshape(-1)
      # Replacement restarts from state 0, so every state uses EV[0].
      Q1 = EV_myopic[:,1] + self.beta * EV[0]
      Q = np.vstack((Q0,Q1)).T

      # Subtract the row maximum before exponentiating to avoid overflow.
      row_max = Q.max(1).reshape(-1,1)
      EV_new = row_max + np.log(np.exp(Q - row_max).sum(1).reshape(-1,1))

      k += 1
      if abs(EV_new-EV).max() <= threshold:
        achieved = True
        break
      if k >= max_iter:
        break

    # Output:
    if not suppr_output:
      if achieved:
        print("Convergence achieved in {} iterations".format(k))
      else:
        print("CM could not converge! Mean difference = {:.6f}".format(
            (EV_new-EV).mean()))
    return EV_new,Q

In [None]:
# Simulation settings and the reward / transition parameters of the model.
max_iteration = 90
sample_size = 3000
Rc = 20
r = 0.4
theta = np.array([0.3, 0.4, 0.3])
beta=0.9999
rm = RustModelSim([Rc,r],theta,beta = beta)
EV,_ = rm.fl_costs()

print(EV.shape)
print(EV)

# states     : cumulative state of every sample path (row 0 is the start state)
# states_tep : state measured since the most recent replacement
# action     : 0 = keep maintaining, 1 = replace
states = np.zeros((max_iteration + 1, sample_size), dtype=int)
states_tep = np.zeros((max_iteration + 1, sample_size), dtype=int)
action = np.zeros((max_iteration, sample_size), dtype=int)

# The value of replacing depends only on EV[0:3], so it is the same for every
# path and period and is evaluated once.
repl_cost = -Rc + beta * (theta.dot(np.reshape(EV[0:3], (3, 1)))[0])  #action 1

for path in range(sample_size):
  # Row of `states` recorded at the latest replacement. Row 0 holds zeros, so
  # before any replacement the subtraction below leaves the state unchanged.
  last_replacement = 0
  for t in range(max_iteration):
    increment = np.random.choice([0, 1, 2], p=theta)
    states[t + 1, path] = states[t, path] + increment
    states_tep[t + 1, path] = states[t + 1, path] - states[last_replacement, path]
    current = states_tep[t + 1, path]
    maint_cost = -r * current + beta * (theta.dot(np.reshape(EV[current:current + 3], (3, 1)))[0])
    # Keep maintaining only while that is strictly better than replacing.
    replace_now = not (repl_cost < maint_cost)
    action[t, path] = 1 if replace_now else 0
    if replace_now:
      last_replacement = t + 1

# Split the sample paths column-wise into three groups (500 / 1000 / 1500).
states = np.split(states, [500, 1500], axis=1)
states_tep = np.split(states_tep, [500, 1500], axis=1)
action = np.split(action, [500, 1500], axis=1)
funPrint(states_tep[0])
funPrint(action[0])

CM could not converge! Mean difference = -2.905047
(91, 1)
[[-3041.46425247]
 [-3044.7933847 ]
 [-3047.72282179]
 [-3050.2525286 ]
 [-3052.38253341]
 [-3054.11357935]
 [-3055.45105466]
 [-3056.41798773]
 [-3057.07130646]
 [-3057.49323568]
 [-3057.76153509]
 [-3057.93270622]
 [-3058.04300726]
 [-3058.11482054]
 [-3058.16197307]
 [-3058.19312992]
 [-3058.21381061]
 [-3058.22758095]
 [-3058.23676985]
 [-3058.2429106 ]
 [-3058.24701843]
 [-3058.2497682 ]
 [-3058.25160971]
 [-3058.25284335]
 [-3058.25366994]
 [-3058.25422387]
 [-3058.2545951 ]
 [-3058.25484392]
 [-3058.25501069]
 [-3058.25512248]
 [-3058.25519741]
 [-3058.25524763]
 [-3058.2552813 ]
 [-3058.25530387]
 [-3058.25531899]
 [-3058.25532913]
 [-3058.25533593]
 [-3058.25534048]
 [-3058.25534354]
 [-3058.25534559]
 [-3058.25534696]
 [-3058.25534788]
 [-3058.25534849]
 [-3058.25534891]
 [-3058.25534919]
 [-3058.25534937]
 [-3058.2553495 ]
 [-3058.25534958]
 [-3058.25534963]
 [-3058.25534967]
 [-3058.2553497 ]
 [-3058.25534971]
 [-30

In [None]:
#import argparse
#import time
#import numpy as np
#import os
#import ray
#import networkx as nx


#@ray.remote
#def worker_task(current_worker_index,ac= action, st = states):
#    return np.shape(ac[current_worker_index])
    
#if __name__ == "__main__":
#    ray.shutdown()
#    ray.init(num_cpus = 2, num_gpus=2)
#    worker_tasks = [worker_task.remote(i) for i in range(3)]
#    print(ray.get(worker_tasks))
##    ray.wait(worker_tasks, num_returns=3)

In [None]:
import numpy as np
import scipy.optimize as opt

class RustModel(object):
  """Likelihood-based estimator of the replacement model on a 91-state grid.

  Parameters
  ----------
  ac_worker : array-like
      Observed actions (0 = maintain, 1 = replace); flattened to a column.
  st_worker : array-like
      Observed state indices; flattened to a column.
  p : array-like
      ``p[j]`` is the probability that the state increases by ``j`` in one
      period.
  npars : int
      Number of parameters to estimate (replacement cost first, then the
      maintenance coefficient).

  NOTE(review): observations are paired by flat index and the two arrays are
  not checked for equal length; only the first ``N = len(actions)`` states
  are used by :meth:`loglike`.
  """

  def __init__(self,ac_worker,st_worker,p,npars):
    self.endog = np.reshape(ac_worker,(-1,1))
    self.exog = np.reshape(st_worker,(-1,1))
    self.npars = npars
    self.N = self.endog.shape[0]
    self.S = int(91)
    self.p = np.array(p)
    # A (SxS) matrix indicating the probability of a bus transitioning
    # from a state s to a state s' (used to compute maintenance utility)
    self.trans_mat = np.zeros((self.S, self.S))
    last = self.S - 1
    for i in range(self.S):
      for j, _p in enumerate(self.p):
        if i + j < last:
          self.trans_mat[i][i+j] = _p
        elif i + j == last:
          # The last state absorbs every increment that would leave the grid.
          self.trans_mat[i][last] = self.p[j:].sum()
    # A second (SxS) matrix which regenerates the bus' state to 0 with
    # certainty (used to compute the replacement utility)
    self.regen_mat = np.vstack((np.ones((1, self.S)),np.zeros((self.S-1, self.S)))).T

  def myopic_costs(self,params): # - reward function
    """Return the (S, 2) array of one-period payoffs for ``params``.

    ``params[0]`` is the replacement cost and ``params[1:]`` the maintenance
    coefficient. Column 0 is ``-s * params[1]`` for each state ``s``, column 1
    is ``-params[0]``. Lists, tuples and arrays are all accepted.

    Only a single maintenance coefficient is supported: with more than one
    the two rows have different lengths and the stacking raises.
    """
    params = np.asarray(params, dtype=float)
    rc = params[0]          #F : action 1
    thetas = params[1:]     #c : action 0
    maint_cost = np.outer(-np.arange(self.S), thetas).reshape(1, -1)
    repl_cost = np.full((1, self.S), -rc)  #action 1
    return np.vstack((maint_cost, repl_cost)).T

  def fl_costs(self,params, beta=0.9999, threshold=1e-4, suppr_output=False, max_iter=1000): #compute V^n
    """Iterate the log-sum-exp Bellman operator until it stabilises.

    Parameters
    ----------
    params : array-like
        Cost parameters forwarded to :meth:`myopic_costs`.
    beta : float
        Discount factor.
    threshold : float
        Iteration stops once the largest absolute change between two
        successive value vectors is at most this value.
    suppr_output : bool
        When true, the convergence message is not printed.
    max_iter : int
        Upper bound on the number of sweeps (default 1000, the previously
        hard-coded cap).

    Returns
    -------
    (EV_new, Q) : tuple of ndarray
        ``EV_new`` is the (S, 1) log-sum-exp of the rows of ``Q``; ``Q`` is
        the (S, 2) array of maintenance / replacement values from the last
        sweep. At least one sweep is always performed so ``Q`` is defined.
    """
    EV_myopic = self.myopic_costs(params)
    EV_new = np.zeros((self.S, 1))
    achieved = False
    k = 0

    # Contraction mapping Loop
    while True:
      EV = EV_new
      Q0 = EV_myopic[:,0] + beta * self.trans_mat.dot(EV).reshape(-1)
      # Replacement restarts from state 0, so every state uses EV[0].
      Q1 = EV_myopic[:,1] + beta * EV[0]
      Q = np.vstack((Q0,Q1)).T

      # Subtract the row maximum before exponentiating to avoid overflow.
      row_max = Q.max(1).reshape(-1,1)
      EV_new = row_max + np.log(np.exp(Q - row_max).sum(1).reshape(-1,1))

      k += 1
      if abs(EV_new-EV).max() <= threshold:
        achieved = True
        break
      if k >= max_iter:
        break

    # Output:
    if not suppr_output:
      if achieved:
        print("Convergence achieved in {} iterations".format(k))
      else:
        print("CM could not converge! Mean difference = {:.6f}".format(
            (EV_new-EV).mean()))
    return EV_new,Q

  def choice_prob(self,cost_array):  #\pi_theta(s,a)
    """Row-wise softmax of ``cost_array``; each returned row sums to one."""
    shifted = cost_array - cost_array.max(1).reshape(-1,1)
    util = np.exp(shifted)
    return util/(np.sum(util, 1).reshape(-1,1))

  def loglike(self,params):
    """Negative log-likelihood of the observed (state, action) pairs."""
    _, utilQ = self.fl_costs(params, suppr_output=True)
    pchoice = self.choice_prob(utilQ)
    # Look up the probability of every observed pair in one indexing step
    # instead of looping over the samples in Python.
    ac = self.endog[:, 0].astype(int)
    st = self.exog[:self.N, 0].astype(int)
    return -np.log(pchoice[st, ac]).sum()
##################################Rust################################
  def fit_likelihood(self,x0=None,bounds=None):
    """Run one L-BFGS-B iteration on :meth:`loglike` and return the parameters.

    Parameters
    ----------
    x0 : sequence or ndarray, optional
        Starting point; defaults to 0.1 for every parameter.
    bounds : list of (low, high), optional
        Box constraints; defaults to ``(1e-6, None)`` for every parameter.

    Returns
    -------
    ndarray
        Parameter vector after the single iteration (``maxiter=1``), with the
        gradient approximated numerically.
    """
    # `is None` instead of `== None`: the latter compares element-wise and
    # raises when x0 is a NumPy array.
    if bounds is None:
      bounds = [(1e-6, None) for _ in range(self.npars)]
    if x0 is None:
      x0 = [0.1 for _ in range(self.npars)]
    fitted = opt.fmin_l_bfgs_b(self.loglike,
                               x0=x0,
                               approx_grad=True,
                               bounds=bounds,
                               maxiter=1)
    return fitted[0]
    
#rm = RustModel(action[0],states_tep[0],theta,2)
#rm.fit_likelihood(x0=[0.1,0.2])

In [None]:
import ray
import numpy as np
@ray.remote
class ParameterServer:
    """Ray actor holding the worker count and one weights reference per worker."""

    def __init__(self, num_workers, weights_ids):
        # Number of workers taking part in the run.
        self.num_workers = num_workers
        # Indexable container with one entry per worker.
        self.weights_ids = weights_ids

    def get_weights_ids(self):
        """Return the container of per-worker weight entries."""
        return self.weights_ids

    def get_num_workers(self):
        """Return the number of workers given at construction."""
        return self.num_workers

    def set_weights_ids(self, worker_index, id):
        """Store ``id[0]`` as the entry of worker ``worker_index``.

        ``id`` is a sequence whose first element is the new entry; only that
        first element is kept.
        """
        new_entry = id[0]
        self.weights_ids[worker_index] = new_entry

@ray.remote
def worker_task(current_worker_index,ps,ac=action,st = states_tep,p=theta,npars=2,num_steps=1000,log_every=10):
  """Repeatedly refit one worker's model, starting each step from the mean of all workers.

  Parameters
  ----------
  current_worker_index : int
      Index of this worker; selects its slice of ``ac`` / ``st`` and its slot
      on the parameter server.
  ps : actor handle
      Parameter server exposing ``get_weights_ids``, ``get_num_workers`` and
      ``set_weights_ids``.
  ac, st : sequences of arrays
      Per-worker actions and states.
  p : array-like
      State-increment probabilities passed to ``RustModel``.
  npars : int
      Number of parameters passed to ``RustModel``.
  num_steps : int
      Number of fit / publish rounds (default 1000, the previous fixed value).
  log_every : int
      Worker 0 prints every worker's weights each ``log_every`` rounds
      (default 10, the previous fixed value); 0 disables the printing.

  Returns ``None``; results are published through ``ps``.
  """
  actions = np.asarray(ac[current_worker_index])
  observed_states = np.asarray(st[current_worker_index])
  # When the state array carries one extra leading row (the initial state),
  # row t+1 of the states is the state in which action t was taken. Drop that
  # first row so that, once both arrays are flattened, each action is paired
  # with the state it was chosen in rather than with the previous period's.
  if (observed_states.shape[0] == actions.shape[0] + 1
      and observed_states.shape[1:] == actions.shape[1:]):
    observed_states = observed_states[1:]
  rm = RustModel(actions, observed_states, p, npars)

  def get_flocking_potential():
    """Return the element-wise mean of the weights currently stored for all workers."""
    # Issue both actor calls before waiting so they are fetched together.
    all_weights_ids, nw = ray.get(
        [ps.get_weights_ids.remote(), ps.get_num_workers.remote()])
    # Resolve every stored reference with a single ray.get call.
    flocking_dis = ray.get(list(all_weights_ids[:nw]))
    return np.mean(np.array(flocking_dis), axis=0)

  for step in range(1, num_steps + 1):
    f_p = get_flocking_potential()
    new_weights = rm.fit_likelihood(x0=list(f_p[0]))
    # Stored as a one-row nested list, the same layout as before.
    weights_id = ray.put([list(new_weights)])
    # The reference is wrapped in a list so the actor receives the reference
    # itself; the actor keeps element 0 of that list.
    ps.set_weights_ids.remote(current_worker_index, [weights_id])

    if log_every and step % log_every == 0 and current_worker_index == 0:
      latest = ray.get(ray.get(ps.get_weights_ids.remote()))
      print('step', step, 'weights', *latest)

if __name__ == "__main__":
    # Shut down any Ray instance already running in this process, then start
    # a new one that declares two GPUs.
    ray.shutdown()
    ray.init(num_gpus=2)

    # Both workers start from the same (1, 2) row of 0.1 values; each copy is
    # placed in the object store under its own reference.
    init_weight = np.full((1, 2), 0.1)
    weights = [init_weight, init_weight]
    weights_ids = list(map(ray.put, weights))

    ps = ParameterServer.remote(num_workers=2, weights_ids=weights_ids)

    # Launch one task per worker and block until both have finished.
    worker_tasks = [worker_task.remote(rank, ps) for rank in (0, 1)]
    ray.wait(worker_tasks, num_returns=2)

2020-11-12 03:13:49,632	INFO services.py:1092 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


[2m[36m(pid=200)[0m step 10 weights [[5.812890624185292, 0.09992246931704779]] [[5.295312499711992, 0.09995155571439436]]
[2m[36m(pid=200)[0m step 20 weights [[11.76102407939075, 0.09957952719877261]] [[11.576093283821292, 0.09965505269614773]]
[2m[36m(pid=200)[0m step 30 weights [[17.221700764275912, 0.09764731454598236]] [[16.762507595071877, 0.09858430734693022]]
[2m[36m(pid=200)[0m step 40 weights [[17.558134464156925, 0.10227063340025933]] [[17.813920621398353, 0.1013397965559642]]
