
first commit

phact committed Feb 28, 2017
1 parent 13e3bea commit 6d008c86e98403c001c4164fcd8772ee9c2fa9bd
Showing with 361 additions and 0 deletions.
  1. +10 −0 analysis/charts
  2. +60 −0 analysis/consolidate-loader-log
  3. +76 −0 analysis/graph-loader-throughput
  4. +5 −0 analysis/measuregaps
  5. +148 −0 mapping/generic_mapper.groovy
  6. +62 −0 rundgl
analysis/charts
@@ -0,0 +1,10 @@
#!/bin/bash
#Consolidate the loader log into per-element CSVs, then render one chart per
#element type and a combined chart for all three.
cp ../loader.log .
./consolidate-loader-log loader.log
./graph-loader-throughput vertex-throughput.csv
mv throughput.png vertexthroughput.png
./graph-loader-throughput property-throughput.csv
mv throughput.png propertythroughput.png
./graph-loader-throughput edge-throughput.csv
mv throughput.png edgethroughput.png
./graph-loader-throughput vertex-throughput.csv edge-throughput.csv property-throughput.csv
mv throughput.png total-throughput.png
analysis/consolidate-loader-log
@@ -0,0 +1,60 @@
#!/usr/bin/perl
#2016-05-31 16:20:42 INFO Reporter:57 - ADD Request for 0 vertices 0 edges 175500 properties 0 anonymous
#2016-05-29 07:06:42 INFO Reporter:57 - ADD Request for 5300 vertices 0 edges 0 properties 0 anonymous
#2017-01-16 19:12:58 INFO Reporter:92 - ADD Request for 29600 vertices 0 edges 0 properties 0 anonymous
#2017-01-16 19:12:58 INFO Reporter:97 - Current total additions: 7597500 vertices 0 edges 0 properties 0 anonymous
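#A worked example of the parsing below, derived from the sample lines above: the ADD line
#"2017-01-16 19:12:58 INFO Reporter:92 - ADD Request for 29600 vertices 0 edges 0 properties 0 anonymous"
#yields one row in vertex-throughput.csv, "2017-01-16 19:12:58;29600"; the zero counts produce no rows.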
use strict;
use warnings;
open(my $vertex_throughput, '>', 'vertex-throughput.csv') or die "vertex-throughput.csv: $!";
open(my $edge_throughput, '>', 'edge-throughput.csv') or die "edge-throughput.csv: $!";
open(my $property_throughput, '>', 'property-throughput.csv') or die "property-throughput.csv: $!";
open(my $anonymous_throughput, '>', 'anonymous-throughput.csv') or die "anonymous-throughput.csv: $!";
my $current_file_handler;
my $avg_vertex_throughput = 0;
my $avg_edge_throughput = 0;
my $avg_property_throughput = 0;
my $avg_anonymous_throughput = 0;
my $number_of_vertex_data_points = 0;
my $number_of_edge_data_points = 0;
my $number_of_property_data_points = 0;
my $number_of_anonymous_data_points = 0;
while (<>) {
    if (/^(....-..-.. ..:..:..) .* - ADD Request for (.*) vertices (.*) edges (.*) properties (.*) anonymous$/) {
        if ($2 != 0) {
            print $vertex_throughput "$1;$2\n";
            $avg_vertex_throughput = ($avg_vertex_throughput * $number_of_vertex_data_points + $2) / ($number_of_vertex_data_points + 1);
            $number_of_vertex_data_points++;
        }
        if ($3 != 0) {
            print $edge_throughput "$1;$3\n";
            $avg_edge_throughput = ($avg_edge_throughput * $number_of_edge_data_points + $3) / ($number_of_edge_data_points + 1);
            $number_of_edge_data_points++;
        }
        if ($4 != 0) {
            print $property_throughput "$1;$4\n";
            $avg_property_throughput = ($avg_property_throughput * $number_of_property_data_points + $4) / ($number_of_property_data_points + 1);
            $number_of_property_data_points++;
        }
        if ($5 != 0) {
            print $anonymous_throughput "$1;$5\n";
            $avg_anonymous_throughput = ($avg_anonymous_throughput * $number_of_anonymous_data_points + $5) / ($number_of_anonymous_data_points + 1);
            $number_of_anonymous_data_points++;
        }
    }
}
close $vertex_throughput;
close $edge_throughput;
close $property_throughput;
close $anonymous_throughput;
sub format_number {
    my $text = reverse int($_[0]);
    $text =~ s/(\d\d\d)(?=\d)(?!\d*\.)/$1,/g;
    return scalar reverse $text;
}
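#For example, format_number(1234567.8) returns "1,234,567": the value is truncated to an
#integer, reversed, commas are inserted after every three digits, and the string is reversed back.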
printf "Element | Number of measurements | Average throughput (ops/s) | Total number of elements\n";
printf "------------- | ---------------------- | -------------------------- | ------------------------\n";
printf "Vertices | %22s | %26s | %24s\n", $number_of_vertex_data_points, format_number($avg_vertex_throughput), format_number($avg_vertex_throughput * $number_of_vertex_data_points);
printf "Edges | %22s | %26s | %24s\n", $number_of_edge_data_points, format_number($avg_edge_throughput), format_number($avg_edge_throughput * $number_of_edge_data_points);
printf "Properties | %22s | %26s | %24s\n", $number_of_property_data_points, format_number($avg_property_throughput), format_number($avg_property_throughput * $number_of_property_data_points);
printf "Anonymous | %22s | %26s | %24s\n", $number_of_anonymous_data_points, format_number($avg_anonymous_throughput), format_number($avg_anonymous_throughput * $number_of_anonymous_data_points);

analysis/graph-loader-throughput
@@ -0,0 +1,76 @@
#!/bin/bash
function help
{
    echo "Usage: $0 <file> [<file>...] [-v]" >&2
    echo "" >&2
    echo "Generates the throughput charts from DSE Graph Loader CSV files" >&2
    echo "" >&2
    echo " -v Print runtime information" >&2
}
#Note: -v is a flag, so the getopts spec must not give it a trailing colon.
while getopts "hv" opt; do
    case $opt in
        h)
            help
            exit
            ;;
        v)
            verbose="yes"
            ;;
    esac
done
shift $((OPTIND-1))
if [ $# -eq 0 ]
then
    echo "Missing parameter: <file>" >&2
    help
    exit 1
fi
(
    output_file="throughput.png"
    echo "set term png size 1600,800"
    echo "set output '$output_file'"
    echo "set datafile separator ';'"
    echo "set xdata time"
    echo "set timefmt '%Y-%m-%d %H:%M:%S'"
    echo "set format x '%Y-%m-%d %H:%M:%S'"
    echo "set bmargin 5"
    echo "set rmargin 6"
    echo "set xtics rotate by 30 right"
    echo "set yrange [0:]"
    echo "set grid"
    echo -n "plot 0/0 notitle"
    for file in "$@"
    do
        if [[ "$file" == *.xz ]]
        then
            command="< xzcat $file"
            avg_throughput=`xzcat $file | LC_ALL=en_US awk -F ';' '{ sum += $2; n++ } END { if (n > 0) printf("%'"'"'.0f", sum / n); }'`
        elif [[ "$file" == *.gz ]]
        then
            command="< gzcat $file"
            avg_throughput=`gzcat $file | LC_ALL=en_US awk -F ';' '{ sum += $2; n++ } END { if (n > 0) printf("%'"'"'.0f", sum / n); }'`
        else
            command="$file"
            avg_throughput=`cat $file | LC_ALL=en_US awk -F ';' '{ sum += $2; n++ } END { if (n > 0) printf("%'"'"'.0f", sum / n); }'`
        fi
        echo -n ", '$command' using 1:2 with lines linewidth 2 title '$file (avg: $avg_throughput)' smooth bezier"
    done
    echo
) | gnuplot
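#For reference, the subshell above emits a gnuplot script on stdout; for a plain CSV the
#generated plot line looks roughly like this (illustrative, with a hypothetical file name):
#  plot 0/0 notitle, 'vertex-throughput.csv' using 1:2 with lines linewidth 2 title 'vertex-throughput.csv (avg: 29,600)' smooth bezier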
(
    echo "File | Average throughput"
    echo "-- | --"
    for file in "$@"
    do
        if [[ "$file" == *.xz ]]
        then
            avg_throughput=`xzcat $file | LC_ALL=en_US awk -F ';' '{ sum += $2; n++ } END { if (n > 0) printf("%'"'"'.0f", sum / n); }'`
        elif [[ "$file" == *.gz ]]
        then
            avg_throughput=`gzcat $file | LC_ALL=en_US awk -F ';' '{ sum += $2; n++ } END { if (n > 0) printf("%'"'"'.0f", sum / n); }'`
        else
            avg_throughput=`cat $file | LC_ALL=en_US awk -F ';' '{ sum += $2; n++ } END { if (n > 0) printf("%'"'"'.0f", sum / n); }'`
        fi
        echo "$file | $avg_throughput"
    done
) | column -ts '|'

analysis/measuregaps
@@ -0,0 +1,5 @@
#!/bin/bash
#cat ../loader.log | grep queue | awk -F"DEBUG" '{print $1}' | sed "s/-//g" | sed "s/ //g" | sed "s/://g" | histogram.py
echo "Minutes in which the job was queueing"
cat ./loader.log | grep queue | awk -F"DEBUG" '{print $1}' | awk -F":" '{print $1 ":" $2}' | uniq -c
echo "Minutes in which the job was writing"
cat ./loader.log | grep ADD | awk -F"DEBUG" '{print $1}' | awk -F":" '{print $1 ":" $2}' | uniq -c
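#Illustrative reading of the output: a line such as "     12 2017-01-16 19:12" means 12 matching
#log lines fell within that minute; minutes missing from both lists are gaps where the loader was
#neither queueing nor writing.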
mapping/generic_mapper.groovy
@@ -0,0 +1,148 @@
//This lets you log to logger.log
log = org.slf4j.LoggerFactory.getLogger("Mapping Script");

//Some utility functions
//null checks
isBlank = {
    it == null || (it instanceof String && (it.isEmpty() || it.isAllWhitespace()))
}

noneBlank = { final String... args ->
    return { def m ->
        def keys = args != null && args.length > 0 ? args : m.keySet() as String[]
        return !keys.collect(m.&get).any(isBlank)
    }
}
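//For example, derived from the closure above: noneBlank("Key1")([Key1: "a", Key2: null])
//returns true, while noneBlank("Key1", "Key2")([Key1: "a", Key2: null]) returns false;
//with no arguments, every key in the map is checked.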

removeBlank = null // required to allow recursion
removeBlank = { m ->
    def result = [:]
    m.keySet().each {
        def val = m[it]
        if (!isBlank(val)) {
            result.put(it, val instanceof Map ? removeBlank(val) : val)
        }
    }
    result
}
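//For example: removeBlank([a: "x", b: "", c: [d: null, e: 1]]) returns [a: "x", c: [e: 1]];
//blank strings and nulls are stripped, recursing into nested maps.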

//Directory file traversal
//We are not traversing directories in this example, but if you need to, you must import
//java.io.File under an alias, since the loader's own File DSL shadows the plain name:
/*
import java.io.File as JavaFile
import groovy.io.FileType
*/
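//A minimal traversal sketch under those imports (the ./data directory and the
//file pattern are illustrative assumptions):
/*
def localFiles = []
new JavaFile('./data').eachFileRecurse(FileType.FILES) { f ->
    //collect gzipped JSON files; adjust the pattern to your layout
    if (f.name ==~ /.+\.json\.gz/) { localFiles << f.path }
}
*/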

//Utility for matching file names with load blocks
def getFiles(fileList, matchPattern) {
    def list = []
    def filearray = fileList.split(" ")
    for (String value : filearray) {
        if (value ==~ matchPattern) {
            log.info("value added to matched list: " + value)
            list << value
        }
    }
    list
}


//The S3 integration has performance issues; avoid S3 when loading large volumes.
//For authentication, install the awscli and authenticate at the OS level with an AWS key/secret.
//When loading from S3, give the DGL a custom log4j.properties file to avoid
//ultra-verbose output.
/*
def loadS3Files(fileInput, matchPattern) {
    def s3Files = []
    new java.io.File('./data').eachLine { line ->
        s3Files << line
    }
    def list = []
    s3Files.each { file ->
        if (file ==~ matchPattern) {
            list << fileInput + file
        }
    }
    list
}
*/
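//A hypothetical usage sketch for the helper above (the bucket URL and pattern are illustrative):
//loadS3Files("s3://my-bucket/prefix/", /.+\.json\.gz/).each { file -> /* define a load block */ }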

//Define Edges and Vertices
sampleVertex = {
    label "samplevertex"
    key "sampleid"
}

sampleEdge = {
    label "has_edge_name"

    outV "samplevertex", {
        label "samplevertex"
        key "sampleid"
    }

    inV "samplevertex1", {
        label "samplevertex1"
        key "sampleid"
    }
    inV "samplevertex2", {
        label "samplevertex2"
        key "sampleid"
    }
}

//These are the load blocks. Create one for each file type you want to process.
//Use the getFiles utility to run the right code block for the files being passed to the DGL.
//The files are passed via the `inputfilename` parameter, which is supplied by ./rundgl.
getFiles(inputfilename, /.+sample_file_name.+\.json\.gz/).each { file ->
    sampleInput = File.json(file).gzip()

    //Ensure that the primary keys of your vertices are present in the file before processing a record.
    sampleEdgeInput = sampleInput.filter(noneBlank("Key1", "Key2")).transform {
        //We use def to ensure that there is a copy of the map inside the script event loop.
        //This is required to keep things thread safe.
        //We use a map inside edge definitions to provide nesting:
        //the edge has properties and two vertices, and each vertex has its own properties.
        def sampleEdge = [:]

        sampleEdge["prop1"] = it["prop1"]
        //You can manipulate names and values in the mapping; just make sure the name inside the
        //`it` map matches the name of the property in your JSON file exactly (case sensitive).
        sampleEdge["prop2"] = it["Prop2"]
        //The vertices are defined as maps so they can hold their properties and keys.
        sampleEdge["samplevertex1"] = [:]
        sampleEdge["samplevertex2"] = [:]
        //The name of the key needs to match the name of the key in the vertex definition above.
        sampleEdge["samplevertex1"]["sampleid"] = it["sampleid"]
        sampleEdge["samplevertex1"]["prop1"] = it["prop1"]
        sampleEdge["samplevertex2"]["sampleid"] = it["sampleid"]
        //Notice here I am hardcoding a property which will be the same for all samplevertex2's.
        sampleEdge["samplevertex2"]["prop1"] = "prop1"
        sampleEdge["samplevertex2"]["prop2"] = it["prop2"]

        //Null check for the vertices.
        sampleEdge["samplevertex1"] = removeBlank(sampleEdge["samplevertex1"])
        sampleEdge["samplevertex2"] = removeBlank(sampleEdge["samplevertex2"])

        //Return the finished map.
        sampleEdge

        //Any remaining empty fields will be removed by this second transform.
    }.transform(removeBlank)

    //Note: by loading this edge, you will also be loading the two vertices.
    //If you have already loaded these vertices from a previous file, that is not a problem; we are
    //using idempotent vertices (custom IDs).
    //If you already loaded a vertex with all its properties and you load just its ID here, it will
    //continue to have the properties on reads (Cassandra writes are sparse).
    log.info("loading: sample edge")
    load(sampleEdgeInput).asEdges(sampleEdge)

}

//add other load blocks for other file types here:
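//A hypothetical skeleton for a second gzipped-JSON feed, reusing the same helpers
//(the file pattern, input name, and the asVertices loader call are illustrative assumptions):
/*
getFiles(inputfilename, /.+other_file_name.+\.json\.gz/).each { file ->
    otherInput = File.json(file).gzip()
    log.info("loading: sample vertex")
    load(otherInput.transform(removeBlank)).asVertices(sampleVertex)
}
*/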

rundgl
@@ -0,0 +1,62 @@
#!/bin/bash

set -e
#set -x
TMPDIR="/mnt/tmp"
if [ ! -d "$TMPDIR" ]; then
    sudo mkdir "$TMPDIR"
    sudo chown "$USER" "$TMPDIR"
fi

rm -f loader.log loader.gclog loadedfiles.txt
echo "Started: " + $(date) >> loadedfiles.txt

#remember the directory match needs to end in *
FILES=($(ls -1fd /mnt/new-data/* | grep json.gz|sort))
FILECOUNT=${#FILES[@]}
BUCKETSIZE=40

#Ceiling division: round up so a final partial bucket is still processed,
#without adding an empty extra bucket when FILECOUNT divides evenly.
BUCKETCOUNT=$(( (FILECOUNT + BUCKETSIZE - 1) / BUCKETSIZE ))
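#Illustrative arithmetic: 103 files with BUCKETSIZE=40 gives BUCKETCOUNT=3,
#producing buckets of 40, 40, and 23 files.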

trap "exit" INT
#for f in /mnt/new-data/new-data-10M.json.*a.gz; do
for bucket in $(seq "$BUCKETCOUNT"); do
    INDEX=$(( (bucket - 1) * BUCKETSIZE ))
    LENGTH=$BUCKETSIZE

    #echo $INDEX
    #echo "{FILES[@]:$INDEX:$LENGTH}"
    FILELISTCOUNT=$(echo "${FILES[@]:$INDEX:$LENGTH}" | wc -w)
    FILELIST=$(echo "${FILES[@]:$INDEX:$LENGTH}")
    echo "Processing $FILELISTCOUNT Files: $FILELIST"

    #grc is nice for aesthetics; if you don't want to install it, just comment out this line.
    grc \
        java \
        -Xloggc:loader.gclog \
        -XX:+PrintGCDateStamps \
        -XX:+PrintGCDetails \
        -XX:+PrintTenuringDistribution \
        -XX:+PrintGCApplicationStoppedTime \
        -XX:+PrintGCCause \
        -XX:+HeapDumpOnOutOfMemoryError \
        -XX:HeapDumpPath=. \
        -Djava.io.tmpdir=/mnt/tmp \
        -jar /path_to_dgl/dse-graph-loader-*-uberjar.jar \
        -graph graph_dgl \
        -address host \
        ./mapper/generic_mapper.groovy \
        -inputfilename "$FILELIST" \
        -load_vertex_threads 32 \
        -load_edge_threads 32 \
        -read_threads 8 \
        -abort_on_num_failures $((1 * 1000 * 1000)) \
        -driver_retry_attempts 100 \
        -preparation false \
        -load_new true \
        -vertex_complete false \
        && echo "$(date) Loaded " "$FILELIST" >> ./loadedfiles.txt

done
