Skip to content

Commit

Permalink
various fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjingwang committed Jun 18, 2015
1 parent 925f4ff commit a986c9f
Show file tree
Hide file tree
Showing 14 changed files with 42 additions and 40 deletions.
2 changes: 1 addition & 1 deletion myriadeploy/create_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _get_runtime(heap=None):
""" Generates the runtime section of a Myria deployment file """
runtime = '[runtime]\n'
if heap:
runtime += 'max_heap_size_gb = %s\n' % heap
runtime += 'max_heap_size.gb = %s\n' % heap
else:
runtime += '# No runtime options specified\n'
return runtime + '\n'
Expand Down
4 changes: 2 additions & 2 deletions myriadeploy/deployment.cfg.local
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ rest_port = 8753

[runtime]
# Uncomment to set the minimum heap size of the worker processes
#min_heap_size_gb = 1
#min_heap_size.gb = 1
# Uncomment to set the maximum heap size of the worker processes
#max_heap_size_gb = 2
#max_heap_size.gb = 2

4 changes: 2 additions & 2 deletions myriadeploy/deployment.cfg.postgres
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ rest_port = 8753

[runtime]
# Uncomment to set the minimum heap size of the worker processes
#min_heap_size_gb = 1
#min_heap_size.gb = 1
# Uncomment to set the maximum heap size of the worker processes
#max_heap_size_gb = 2
#max_heap_size.gb = 2

4 changes: 2 additions & 2 deletions myriadeploy/deployment.cfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ rest_port = 8753

[runtime]
# Uncomment to set the minimum heap size of the worker processes
#min_heap_size_gb = 1
#min_heap_size.gb = 1
# Uncomment to set the maximum heap size of the worker processes
#max_heap_size_gb = 2
#max_heap_size.gb = 2
8 changes: 4 additions & 4 deletions myriadeploy/myriadeploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ def split_hostportpathdbname_key_append_id(hostport):
ret['nodes'] = [ret['master']] + ret['workers']
# .. max_heap_size is the Java maximum heap size
try:
ret['max_heap_size_gb'] = config.get('runtime', 'max_heap_size_gb')
ret['max_heap_size.gb'] = config.get('runtime', 'max_heap_size.gb')
except ConfigParser.NoOptionError:
ret['max_heap_size_gb'] = ''
ret['max_heap_size.gb'] = ''
# .. min_heap_size is the Java minimum heap size
try:
ret['min_heap_size_gb'] = config.get('runtime', 'min_heap_size_gb')
ret['min_heap_size.gb'] = config.get('runtime', 'min_heap_size.gb')
except ConfigParser.NoOptionError:
ret['min_heap_size_gb'] = ''
ret['min_heap_size.gb'] = ''
try:
ret['admin_password'] = config.get('deployment', 'admin_password')
except ConfigParser.NoOptionError:
Expand Down
4 changes: 2 additions & 2 deletions myriadeploy/setup_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def make_deployment(input_args):
"Copy the distribution (jar and libs and conf) to compute nodes."
args = ["./using_deployment_utils.sh", input_args.config_file, "--deploy"]
if input_args.clean_catalog:
args.append("--clean_catalog")
args.append("--clean-catalog")
if subprocess.call(args):
logging.error("Error copying distribution")

Expand All @@ -17,7 +17,7 @@ def main(argv):
parser = argparse.ArgumentParser(description='Setup a Myria cluster')
parser.add_argument('config_file',
help='The deployment config file')
parser.add_argument('--clean_catalog', action='store_true',
parser.add_argument('--clean-catalog', dest='clean_catalog', action='store_true',
help='If deploying with a new master catalog')

make_deployment(parser.parse_args())
Expand Down
2 changes: 1 addition & 1 deletion myriadeploy/start_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


def start_master(config_file):
args = ["./using_deployment_utils.sh", config_file, "--start_master"]
args = ["./using_deployment_utils.sh", config_file, "--start-master"]
if subprocess.call(args):
sys.exit(1)

Expand Down
2 changes: 1 addition & 1 deletion myriadeploy/start_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


def start_workers(config_file):
args = ["./using_deployment_utils.sh", config_file, "--start_workers"]
args = ["./using_deployment_utils.sh", config_file, "--start-workers"]
if subprocess.call(args):
sys.exit(1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,26 +123,26 @@ public void init() {
String workingDir = Paths.get(tempFilePath, BENCHMARKTEST_NAME).toString();
/* The SQLite connection */
String jsonConnInfo =
ConnectionInfo.toJson(MyriaConstants.STORAGE_SYSTEM_SQLITE, BENCHMARKTEST_HOSTNAME, workingDir, "0", null,
ConnectionInfo.toJson(MyriaConstants.STORAGE_SYSTEM_SQLITE, BENCHMARKTEST_HOSTNAME, workingDir, 0, null,
null, null);
connections.add(ConnectionInfo.of("sqlite", jsonConnInfo));

/* The MySQL connection */
jsonConnInfo =
ConnectionInfo.toJson(MyriaConstants.STORAGE_SYSTEM_MYSQL, BENCHMARKTEST_HOSTNAME, workingDir, "0", "myria1",
ConnectionInfo.toJson(MyriaConstants.STORAGE_SYSTEM_MYSQL, BENCHMARKTEST_HOSTNAME, workingDir, 0, "myria1",
jdbcPassword, null);
// Uncomment the next line to add tests for MySQL. However, be sure that the MySQL service is up and running.
// connections.add(ConnectionInfo.of(MyriaConstants.STORAGE_SYSTEM_MYSQL, jsonConnInfo));

/* The PostgreSQL connection */
jsonConnInfo =
ConnectionInfo.toJson(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL, BENCHMARKTEST_HOSTNAME, workingDir, "0",
ConnectionInfo.toJson(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL, BENCHMARKTEST_HOSTNAME, workingDir, 0,
"myria1", jdbcPassword, null);
connections.add(ConnectionInfo.of(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL, jsonConnInfo));

/* The MonetDB connection */
jsonConnInfo =
ConnectionInfo.toJson(MyriaConstants.STORAGE_SYSTEM_MONETDB, BENCHMARKTEST_HOSTNAME, workingDir, "0", null,
ConnectionInfo.toJson(MyriaConstants.STORAGE_SYSTEM_MONETDB, BENCHMARKTEST_HOSTNAME, workingDir, 0, null,
null, null);
// Uncomment the next line to add tests for MonetDB. However, be sure that the MonetDB service is up and running.
// connections.add(ConnectionInfo.of(MyriaConstants.STORAGE_SYSTEM_MONETDB, jsonConnInfo));
Expand Down
4 changes: 3 additions & 1 deletion speedtest/edu/washington/escience/myria/sp2bench/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import org.apache.commons.io.FilenameUtils;

import com.google.common.primitives.Ints;

import edu.washington.escience.myria.MyriaConstants;
import edu.washington.escience.myria.operator.RootOperator;
import edu.washington.escience.myria.parallel.QueryFuture;
Expand Down Expand Up @@ -87,7 +89,7 @@ public static void main(final String[] args) throws Exception {
MyriaConfiguration.loadWithDefaultValues(FilenameUtils.concat(workingDir, MyriaConstants.DEPLOYMENT_CONF_FILE));
int[] allWorkers = new int[config.getWorkerIds().size()];
int idx = 0;
for (int id : config.getWorkerIds()) {
for (int id : Ints.toArray(config.getWorkerIds())) {

This comment has been minimized.

Copy link
@senderista

senderista Jun 18, 2015

Contributor

What I meant was int[] allWorkers = Ints.toArray(config.getWorkerIds()). Not sure if I'm missing something.

This comment has been minimized.

Copy link
@jingjingwang

jingjingwang Jun 18, 2015

Author Contributor

Oh! Right, I was not careful enough. Thanks!

allWorkers[idx++] = id;
}

Expand Down
18 changes: 9 additions & 9 deletions src/edu/washington/escience/myria/MyriaSystemConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,38 @@ private MyriaSystemConfigKeys() {
* It's not a restrict upper bound. Different implementations of {@link StreamInputBuffer} may restrict the size
* differently. For example, a {@link FlowControlBagInputBuffer} use the upper bound as a soft restriction.
* */
public static final String OPERATOR_INPUT_BUFFER_CAPACITY = "operator_consumer_inputbuffer_capacity";
public static final String OPERATOR_INPUT_BUFFER_CAPACITY = "operator.consumer.inputbuffer.capacity";

/**
* After an input buffer full event, if the size of the input buffer reduced to the recover_trigger, the input buffer
* recover event should be issued.
* */
public static final String OPERATOR_INPUT_BUFFER_RECOVER_TRIGGER = "operator_consumer_inputbuffer_recover_trigger";
public static final String OPERATOR_INPUT_BUFFER_RECOVER_TRIGGER = "operator.consumer.inputbuffer.recover.trigger";

/**
* .
* */
public static final String TCP_SEND_BUFFER_SIZE_BYTES = "tcp_sendbuffer_size_bytes";
public static final String TCP_SEND_BUFFER_SIZE_BYTES = "tcp.sendbuffer.size.bytes";

/**
* .
* */
public static final String TCP_RECEIVE_BUFFER_SIZE_BYTES = "tcp_receivebuffer_size_bytes";
public static final String TCP_RECEIVE_BUFFER_SIZE_BYTES = "tcp.receivebuffer.size.bytes";

/**
* See {@link NioSocketChannelConfig#setWriteBufferLowWaterMark}.
* */
public static final String FLOW_CONTROL_WRITE_BUFFER_LOW_MARK_BYTES = "flowcontrol_writebuffer_watermark_low";
public static final String FLOW_CONTROL_WRITE_BUFFER_LOW_MARK_BYTES = "flowcontrol.writebuffer.watermark.low";

/**
* See {@link NioSocketChannelConfig#setWriteBufferHighWaterMark}.
* */
public static final String FLOW_CONTROL_WRITE_BUFFER_HIGH_MARK_BYTES = "flowcontrol_writebuffer_watermark_high";
public static final String FLOW_CONTROL_WRITE_BUFFER_HIGH_MARK_BYTES = "flowcontrol.writebuffer.watermark.high";

/**
* TCP timeout.
* */
public static final String TCP_CONNECTION_TIMEOUT_MILLIS = "tcp_connection_timeout_milliseconds";
public static final String TCP_CONNECTION_TIMEOUT_MILLIS = "tcp.connection.timeout.milliseconds";

/** */
public static final String WORKER_STORAGE_DATABASE_SYSTEM = "dbms";
Expand All @@ -77,9 +77,9 @@ private MyriaSystemConfigKeys() {
/** */
public static final String USERNAME = "username";
/** */
public static final String MAX_HEAP_SIZE_GB = "max_heap_size_gb";
public static final String MAX_HEAP_SIZE_GB = "max_heap_size.gb";
/** */
public static final String MIN_HEAP_SIZE_GB = "min_heap_size_gb";
public static final String MIN_HEAP_SIZE_GB = "min_heap_size.gb";
/** */
public static final String DEPLOYMENT_CONF_FILE = "deployment.cfg";
/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public String toJson() {
* @return the JSON string representation of the connection information.
*/
public static String toJson(@Nonnull final String dbms, final String hostName, final String dirName,
final String workerId, final String databaseName, final String databasePassword, final String databasePort) {
final int workerId, final String databaseName, final String databasePassword, final String databasePort) {
String result = "";
String host;
String user;
Expand All @@ -88,7 +88,7 @@ public static String toJson(@Nonnull final String dbms, final String hostName, f
switch (dbms) {
case MyriaConstants.STORAGE_SYSTEM_SQLITE:
Objects.requireNonNull(workerId);
SQLiteInfo sqliteInfo = SQLiteInfo.of(Paths.get(dirName, "workers", workerId, "data.db").toString());
SQLiteInfo sqliteInfo = SQLiteInfo.of(Paths.get(dirName, "workers", workerId + "", "data.db").toString());
result = sqliteInfo.toJson();
break;
case MyriaConstants.STORAGE_SYSTEM_MONETDB:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public String getMasterCatalogFile() throws ConfigFileException {
public String getSelfJsonConnInfo() throws ConfigFileException {
int id = Integer.parseInt(getRequired("runtime", MyriaSystemConfigKeys.WORKER_IDENTIFIER));
return ConnectionInfo.toJson(getRequired("deployment", MyriaSystemConfigKeys.WORKER_STORAGE_DATABASE_SYSTEM),
getHostname(id), getWorkingDirectory(id), id + "", getWorkerDatabaseName(id), getOptional("deployment",
getHostname(id), getWorkingDirectory(id), id, getWorkerDatabaseName(id), getOptional("deployment",
MyriaSystemConfigKeys.WORKER_STORAGE_DATABASE_PASSWORD), getOptional("deployment",
MyriaSystemConfigKeys.WORKER_STORAGE_DATABASE_PORT));
}
Expand Down
16 changes: 8 additions & 8 deletions src/edu/washington/escience/myria/util/DeploymentUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class DeploymentUtils {

/** usage. */
public static final String USAGE =
"java DeploymentUtils <config_file> <--deploy <optional: --clean_catalog> | --start_master | --start_workers>";
"java DeploymentUtils <config_file> <--deploy <optional: --clean-catalog> | --start-master | --start-workers>";
/** The logger. */
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(DeploymentUtils.class);

Expand All @@ -62,17 +62,17 @@ public static void main(final String[] args) throws ConfigFileException, Catalog
File tempDeploy = createTempDeployment(configFileName);

MasterCatalog.create(FilenameUtils.concat(tempDeploy.getAbsolutePath(), "master"));
boolean cleanCatalog = args.length > 2 && args[2].equals("--clean_catalog");
boolean cleanCatalog = args.length > 2 && args[2].equals("--clean-catalog");
deployMaster(tempDeploy.getAbsolutePath(), config, cleanCatalog);

for (int workerId : config.getWorkerIds()) {
deployWorker(tempDeploy.getAbsolutePath(), config, workerId);
}

FileUtils.deleteDirectory(tempDeploy);
} else if (action.equals("--start_master")) {
} else if (action.equals("--start-master")) {
startMaster(config);
} else if (action.equals("--start_workers")) {
} else if (action.equals("--start-workers")) {
for (int workerId : config.getWorkerIds()) {
startWorker(config, workerId);
}
Expand Down Expand Up @@ -183,8 +183,8 @@ public static void deployMaster(final String localDeployPath, final MyriaConfigu
*
* @param address e.g. beijing.cs.washington.edu
* @param workingDir the working directory, path/name in deployment.cfg
* @param maxHeapSize the same meaning as max_heap_size_gb in deployment.cfg
* @param minHeapSize the same meaning as min_heap_size_gb in deployment.cfg
* @param maxHeapSize the same meaning as max_heap_size.gb in deployment.cfg
* @param minHeapSize the same meaning as min_heap_size.gb in deployment.cfg
* @param workerId the worker id.
* @param port the worker port number, need it to infer the port number used in debug mode.
* @param debug if launch the worker in debug mode.
Expand Down Expand Up @@ -234,8 +234,8 @@ private static void startWorker(final String address, final String workingDir, f
*
* @param address e.g. beijing.cs.washington.edu
* @param workingDir the working directory, path/name in deployment.cfg
* @param maxHeapSize the same meaning as max_heap_size_gb in deployment.cfg
* @param minHeapSize the same meaning as min_heap_size_gb in deployment.cfg
* @param maxHeapSize the same meaning as max_heap_size.gb in deployment.cfg
* @param minHeapSize the same meaning as min_heap_size.gb in deployment.cfg
* @param restPort the port number for restlet.
* @param ssl whether the master uses SSL for the rest server.
*/
Expand Down

1 comment on commit a986c9f

@senderista
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Please sign in to comment.