Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python pass #120

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions python/frontend/cirrus/cirrus/CostModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@ def get_cost_per_second(self):
* self.num_workers
total_lambda_cost = total_lambda_cost_h / (60 * 60)

# vm_cost
# vm_cost (demand price per hour)
vm_to_cost = {
'm5.large' : 0.096 # demand price per hour
'm5.large' : 0.096
'm4.large' : 0.1
'm4.2xlarge' : 0.4
'm4.4xlarge' : 0.8
'm4.10xlarge' : 2.0
'm4.16xlarge' : 3.2
}

if self.vm_type not in vm_to_cost:
Expand Down
43 changes: 30 additions & 13 deletions python/frontend/cirrus/cirrus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ def __init__(self,
self.kill_signal = threading.Event()
self.num_lambdas = 0
self.cost_model = CostModel(
'm5.large',
ps.instance.type,
self.n_ps,
0,
0, # FIX
self.n_workers,
self.lambda_size)

Expand All @@ -96,17 +96,27 @@ def __init__(self,
}

# Stored values
self.last_num_lambdas = 0
self.last_num_lambdas = 0 # not clear what this is. Need comment or better name

def get_name(self):
string = "Rate %f" % self.learning_rate
return string


def get_cost_per_second(self):
"""Returns cost

Returns the cost ($) of running this job on the cloud

"""
return self.time_cps_lst

def get_num_lambdas(self, fetch=True):
"""Returns number of active lambdas

Returns the number of active lambdas.
We contact the parameter server to get an accurate value

"""
if self.is_dead():
return 0
if fetch:
Expand All @@ -118,6 +128,11 @@ def get_num_lambdas(self, fetch=True):
return self.last_num_lambdas

def get_updates_per_second(self, fetch=True):
"""Returns number of updates per second

Returns the number of sgd updates per second

"""
if self.is_dead():
return self.time_ups_lst
if fetch:
Expand All @@ -135,14 +150,17 @@ def get_updates_per_second(self, fetch=True):

num_lambdas = self.get_num_lambdas()
self.get_updates_per_second()
num_task = 3

if num_lambdas == None:
if num_lambdas == None: # not clear when this can happen. It seems to me self.get_num_lambdas() never returns None
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yangryan0 Can you please check this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.get_num_lambdas returns none when there is no response from the parameter server or the connection times out.

return


# it doesn't make sense to have this code here
# why are we launching lambdas in a get_update_per_seconds() method?
# isn't that what the wait_then_maintain_workers code does?
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yangryan0 Can you please comment?

if num_lambdas < self.n_workers:
shortage = self.n_workers - num_lambdas

num_task = 3 # we should use constants like CIRRUS_WORKER, CIRRUS_PS, etc..
payload = '{"num_task": %d, "num_workers": %d, "ps_ip": \"%s\", "ps_port": %d}' \
% (num_task, self.n_workers, self.ps_ip_private, self.ps_ip_port)
for i in range(shortage):
Expand Down Expand Up @@ -236,8 +254,7 @@ def kill(self):


def is_dead(self):
return self.dead

return self.dead # when does this dead turns True?
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


@abstractmethod
def define_config(self, fetch=False):
Expand Down Expand Up @@ -271,7 +288,7 @@ def get_updates_per_second(self, fetch=True):
return self.fetch_metric(self.UPDATES_PER_SECOND)

def get_time_loss(self, rtl=False):
self.maintain_error()
self.update_loss_metrics()
if rtl:
return self.fetch_metric(self.REAL_TIME_LOSS_VS_TIME)
else:
Expand All @@ -282,20 +299,20 @@ def fetch_metric(self, key):
return self.metrics[key]

# This will grab the Loss v. Time by communicating with the parameter server
def maintain_error(self):
def update_loss_metrics(self):
if self.is_dead():
return

out = messenger.get_last_time_error(self.ps)
if out is None:
return


t, loss, real_time_loss, total_loss = out
if t == 0:
return

if len(self.metrics[self.LOSS_VS_TIME]) == 0 or not ((t, loss) == self.metrics[self.LOSS_VS_TIME][-1]):
if len(self.metrics[self.LOSS_VS_TIME]) == 0 or
not ((t, loss) == self.metrics[self.LOSS_VS_TIME][-1]):
self.metrics[self.LOSS_VS_TIME].append((t, loss))

elapsed_time = time.time() - self.start_time
Expand Down
7 changes: 5 additions & 2 deletions src/InputReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ SparseDataset InputReader::read_jester_ratings(const std::string& input_file,
return SparseDataset(sparse_ds);
}

double InputReader::compute_mean(std::vector<std::pair<int, FEATURE_TYPE>>& user_ratings) {
double InputReader::compute_mean(
const std::vector<std::pair<int, FEATURE_TYPE>>& user_ratings) const {
double mean = 0;

for (auto& r : user_ratings) {
Expand All @@ -496,7 +497,9 @@ double InputReader::compute_mean(std::vector<std::pair<int, FEATURE_TYPE>>& user
return mean;
}

double InputReader::compute_stddev(double mean, std::vector<std::pair<int, FEATURE_TYPE>>& user_ratings) {
double InputReader::compute_stddev(
double mean,
const std::vector<std::pair<int, FEATURE_TYPE>>& user_ratings) const {
double stddev = 0;

for (const auto& r : user_ratings) {
Expand Down
9 changes: 5 additions & 4 deletions src/InputReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,13 @@ class InputReader {

/* Computes mean of a sparse list of features
*/
double compute_mean(std::vector<std::pair<int, FEATURE_TYPE>>&);
double compute_mean(const std::vector<std::pair<int, FEATURE_TYPE>>&) const;

/* Computes stddev of a sparse list of features
*/
double compute_stddev(double, std::vector<std::pair<int, FEATURE_TYPE>>&);

double compute_stddev(double,
const std::vector<std::pair<int, FEATURE_TYPE>>&) const;

/* Computes standardizes sparse dataset
*/
void standardize_sparse_dataset(std::vector<std::vector<std::pair<int, FEATURE_TYPE>>>&);
Expand Down