Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'develop' of github.com:saga-project/BigJob into develop

Conflicts:
	pilot/iterative/iterative_hdfs.py
  • Loading branch information...
commit 6d4074c3122319c1e69df338cde0d6d2ad5f35d6 2 parents 65c1a38 + 9fe8dda
@drelu drelu authored
Showing with 32 additions and 26 deletions.
  1. +32 −26 pilot/iterative/iterative_hdfs.py
View
58 pilot/iterative/iterative_hdfs.py
@@ -14,16 +14,18 @@
import math
import pdb
import shutil
+import socket
from webhdfs.webhdfs import WebHDFS
#HDFS_URL="hdfs://localhost:50070"
-HDFS_URL="http://c477-101:50070"
+HDFS_URL="http://c530-203:50070"
RESULT_FILE_PREFIX="hdfs-inmem"
RESULT_DIR="results"
-MIN_SIZE=28 # 2**28 bytes
+#MIN_SIZE=28 # 2**28 bytes
+MIN_SIZE=30 # 2**30 bytes
MAX_SIZE=36 # 2**36 bytes
NUMBER_REPEATS=3
@@ -108,7 +110,7 @@ def test_without_caching(number_of_nodes, number_replicas, f, client):
print "\n*********************************\nResults\n******************************"
print "Size, Time, Backend, NumNodes, NumInstances, Type, NumReplicas "
for key, value in runtimes.iteritems():
- result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + ",write," + str(number_replicas)
+ result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + ",write," + str(number_replicas)+","+socket.gethostname()
print result
f.write(result + "\n")
@@ -124,7 +126,7 @@ def test_without_caching(number_of_nodes, number_replicas, f, client):
runtimes[num_bytes] = runtime
for key, value in runtimes.iteritems():
- result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + ",read," + str(number_replicas) + "," + str(repeat)
+ result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + ",read," + str(number_replicas) + "," + str(repeat)+","+socket.gethostname()
print result
f.write(result + "\n")
@@ -150,7 +152,7 @@ def test_with_caching(number_of_nodes, number_replicas, f, client):
print "\n*********************************\nResults\n******************************"
print "Size, Time, Backend, NumNodes, NumInstances, Type, NumReplicas "
for key, value in runtimes.iteritems():
- result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + ",write," + str(number_replicas)
+ result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + ",write," + str(number_replicas)+","+socket.gethostname()
print result
f.write(result + "\n")
@@ -166,7 +168,7 @@ def test_with_caching(number_of_nodes, number_replicas, f, client):
runtimes[num_bytes] = runtime
for key, value in runtimes.iteritems():
- result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + ",read_cache," + str(number_replicas) + "," + str(repeat)
+ result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + ",read_cache," + str(number_replicas) + "," + str(repeat)+","+socket.gethostname()
print result
f.write(result + "\n")
@@ -180,13 +182,14 @@ def test_with_inmem(number_of_nodes, number_replicas, f, client, cache=True):
client.mkdir("/tmp/test/")
print "\n*********************************\nResults\n******************************"
- print "Size, Time, Backend, NumNodes, NumInstances, Type, NumReplicas "
+ print "Size, Time, Backend, NumNodes, NumInstances, Type, NumReplicas,Hostname "
for repeat in range(0, NUMBER_REPEATS):
runtimes = {}
runtimes_write = {}
for i in range(MIN_SIZE, MAX_SIZE):
num_bytes_scenario = 2 ** i
- filename = "/tmp/test/test_" + str(num_bytes_scenario)
+ hdfs_filename = "/tmp/test/test_" + str(num_bytes_scenario)
+ filename = os.path.join(TMP_DIR, "test/test_" + str(num_bytes_scenario))
try:
os.mkdir(os.path.dirname(filename))
except:
@@ -197,10 +200,10 @@ def test_with_inmem(number_of_nodes, number_replicas, f, client, cache=True):
num_bytes=os.path.getsize(filename)
scenario="write"
if cache:
- command = "hdfs dfs -put -l %s %s"%(filename,filename)
+ command = "hdfs dfs -put -l %s %s"%(filename,hdfs_filename)
scenario="write_memory"
else:
- command = "hdfs dfs -put %s %s"%(filename,filename)
+ command = "hdfs dfs -put %s %s"%(filename,hdfs_filename)
print "PUT FILE TO HDFS: %s"%command
start = time.time()
os.system(command)
@@ -208,7 +211,7 @@ def test_with_inmem(number_of_nodes, number_replicas, f, client, cache=True):
runtimes[num_bytes_scenario] = runtime
print "GET File Size: %s MB" % str(num_bytes / 1024 / 1024)
- command="hadoop fs -text %s > /dev/null"%(filename)
+ command="hadoop fs -text %s > /dev/null"%(hdfs_filename)
print command
start = time.time()
#s = client.get("/tmp/test/test_" + str(num_bytes))
@@ -216,23 +219,24 @@ def test_with_inmem(number_of_nodes, number_replicas, f, client, cache=True):
runtime = time.time() - start
runtimes_write[num_bytes_scenario] = runtime
- os.system("hadoop fs -rm -r %s"%(filename))
+ os.system("hadoop fs -rm -r %s"%(hdfs_filename))
os.remove(filename)
time.sleep(1)
for key, value in runtimes.iteritems():
- result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario + "," + str(number_replicas)+ "," + str(repeat)
+ result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario + "," + str(number_replicas)+ "," + str(repeat)+","+socket.gethostname()
print result
f.write(result + "\n")
f.flush()
scenario_write = scenario.replace("write", "read")
for key, value in runtimes_write.iteritems():
- result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario_write + "," + str(number_replicas) + "," + str(repeat)
+ result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario_write + "," + str(number_replicas) + "," + str(repeat)+","+socket.gethostname()
print result
f.write(result + "\n")
f.flush()
+
def test_with_inmem_mr(number_of_nodes, number_replicas, f, client, cache=True):
""" Test Hadoop 2.6 Memory capabilities:
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
@@ -248,7 +252,8 @@ def test_with_inmem_mr(number_of_nodes, number_replicas, f, client, cache=True):
runtimes_write = {}
for i in range(MIN_SIZE, MAX_SIZE):
num_bytes_scenario = 2 ** i
- filename = "/tmp/test/test_" + str(num_bytes_scenario)
+ hdfs_filename = "/tmp/test/test_" + str(num_bytes_scenario)
+ filename = os.path.join(TMP_DIR, "test/test_" + str(num_bytes_scenario))
try:
os.mkdir(os.path.dirname(filename))
except:
@@ -259,10 +264,10 @@ def test_with_inmem_mr(number_of_nodes, number_replicas, f, client, cache=True):
num_bytes=os.path.getsize(filename)
scenario="write"
if cache:
- command = "hdfs dfs -put -l %s %s"%(filename,filename)
+ command = "hdfs dfs -put -l %s %s"%(filename,hdfs_filename)
scenario="write_memory"
else:
- command = "hdfs dfs -put %s %s"%(filename,filename)
+ command = "hdfs dfs -put %s %s"%(filename,hdfs_filename)
print "PUT FILE TO HDFS: %s"%command
start = time.time()
os.system(command)
@@ -271,7 +276,7 @@ def test_with_inmem_mr(number_of_nodes, number_replicas, f, client, cache=True):
print "GET File Size: %s MB with" % str(num_bytes / 1024 / 1024)
#command="hadoop jar /home1/01131/tg804093/work/hadoop-2.6.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar grep %s %s %s"%(filename, "/tmp/out", "testtest")
- command="hadoop jar " + HADOOP_STREAMING_JAR + " -input %s -output %s -numReduceTasks 0"%(filename, "/tmp/out")
+ command="hadoop jar " + HADOOP_STREAMING_JAR + " -input %s -output %s -numReduceTasks 0"%(hdfs_filename, "/tmp/out")
print command
start = time.time()
#s = client.get("/tmp/test/test_" + str(num_bytes))
@@ -279,21 +284,21 @@ def test_with_inmem_mr(number_of_nodes, number_replicas, f, client, cache=True):
runtime = time.time() - start
runtimes_write[num_bytes_scenario] = runtime
- os.system("hadoop fs -rm -r %s"%(filename))
+ os.system("hadoop fs -rm -r %s"%(hdfs_filename))
os.system("hadoop fs -rm -r /tmp/out")
os.system("hadoop fs -expunge")
os.remove(filename)
time.sleep(1)
for key, value in runtimes.iteritems():
- result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario + "," + str(number_replicas)+ "," + str(repeat)
+ result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario + "," + str(number_replicas)+ "," + str(repeat)+","+socket.gethostname()
print result
f.write(result + "\n")
f.flush()
scenario_write = scenario.replace("write", "read")
scenario_write = scenario_write + "_mr"
for key, value in runtimes_write.iteritems():
- result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario_write + "," + str(number_replicas) + "," + str(repeat)
+ result = str(key) + "," + str(value) + ",HDFS," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario_write + "," + str(number_replicas) + "," + str(repeat)+","+socket.gethostname()
print result
f.write(result + "\n")
@@ -345,18 +350,17 @@ def test_with_filesystem(number_of_nodes, number_replicas, f, client, target_dir
time.sleep(1)
for key, value in runtimes.iteritems():
- result = str(key) + "," + str(value) + ","+type+"," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario + "," + str(number_replicas)+ "," + str(repeat)
+ result = str(key) + "," + str(value) + ","+type+"," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario + "," + str(number_replicas)+ "," + str(repeat)+","+socket.gethostname()
print result
f.write(result + "\n")
- f.flush()
scenario_write = scenario.replace("write", "read")
scenario_write = scenario_write + "_mr"
for key, value in runtimes_write.iteritems():
- result = str(key) + "," + str(value) + ","+type+"," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario_write + "," + str(number_replicas) + "," + str(repeat)
+ result = str(key) + "," + str(value) + ","+type+"," + str(number_of_nodes) + "," + str(number_of_nodes) + "," + scenario_write + "," + str(number_replicas) + "," + str(repeat)+","+socket.gethostname()
print result
+ f.write(result + "\n")
f.flush()
-
if __name__ == '__main__':
# Preparation and configuration
@@ -385,4 +389,6 @@ def test_with_filesystem(number_of_nodes, number_replicas, f, client, target_dir
#test_with_caching(number_of_nodes, number_replicas, f, client)
f.close()
- os.system("cd /home1/01131/tg804093; /home1/01131/tg804093/clean.sh")
+ clean_command = "cd $HOME; $HOME/clean.sh"
+ clean_command = os.path.expandvars(clean_command)
+ os.system(clean_command)
Please sign in to comment.
Something went wrong with that request. Please try again.