Skip to content

Commit

Permalink
Add doc on ingesting from curl
Browse files Browse the repository at this point in the history
  • Loading branch information
dhutchis committed Aug 10, 2016
1 parent f8e2e5e commit aabe6f9
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 0 deletions.
39 changes: 39 additions & 0 deletions ingest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Myria Ingest from `curl`

The following snippet is useful for streaming data into Myria.
Modify it to fit your needs, especially the "relationKey" and "schema".
The `curl` command reads data from stdin. You can pipe data into it from a file or program.

```bash
TDIR="."
MyriaHostAndPort="localhost:8753"
# Write the relation name and schema into temporary files
echo "{ \"userName\" : \"public\", \"programName\" : \"adhoc\", \"relationName\" : \"MyRelation\" }" > "$TDIR/relationKey.json"
echo '{ "columnTypes": ["STRING_TYPE", "LONG_TYPE"], "columnNames": ["kmer", "cnt"] }' > "$TDIR/schema.json"

curl -i -XPOST "$MyriaHostAndPort"/dataset -H "Content-type: multipart/form-data" \
-F "relationKey=@$TDIR/relationKey.json; type=application/json" \
-F "schema=@$TDIR/schema.json; type=application/json" \
-F "delimiter=," \
-F "binary=false" \
-F "overwrite=false" \
-F "data=@-; type=application/octet-stream"
```

The "binary" option indicates a binary file stream.
This should come from some equivalent of the Java `DataOutputStream`. See Myria's `BinaryFileScan` class for reference.
Otherwise CSV format is assumed. It should be possible to change the delimiter,
except that there seems to be trouble communicating the delimiter character to the server.


## Script Starting Points

Included are some scripts that were used in a genomics/oceanography data ingest scenario.
You may find it easier to start with these and modify them to suit your needs.

The first is for ingesting a single file: [ingest_myria.sh](ingest_myria.sh).
The second is for ingseting a directory of files, each one as an individual relation: [ingest_directory.sh](ingest_directory.sh).
The third generates a MyriaL query that unions together all the dataests that match a search term: [combine_tables.sh](combine_tables.sh).
The fourth is for ingesting from S3: [ingest_from_s3.sh](ingest_from_s3.sh).


48 changes: 48 additions & 0 deletions ingest/combine_tables.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/bin/bash
set -e #command fail -> script fail
set -u #unset variable reference causes script fail

_usage() {
echo "Usage: $0 myria_host:myria_port QueryTerm ResultRelation"
echo " QueryTerm: the search term for relations in Myria to combine"
echo " ResultRelation: the name of the new relation to store in Myria"
echo " ex: $0 node-109:8753 kmercnt_11_forward_S kmercnt_11_forward"
echo " This script queries a Myria instance for all the relations that match a search term."
echo " It creates a MyriaL query that unions together all the matching relations and stores them."
exit 1
}

if [ "$#" -lt "3" ]; then
_usage
fi

MyriaHostAndPort="${1-node-109:8753}"
QueryTerm="${2-kmercnt_11_forward_S000}"
ResultRelation="${3-}"

# Check pre-requisite
command -v jsawk >/dev/null 2>&1 || { echo >&2 "I require 'jsawk' but it's not installed. Aborting."; exit 1; }

str=""
while read rn; do {

sid=`expr "$rn" : '.*\(S[0-9]\{4\}\)'`

if [ -z "$str" ]; then
str="$rn = scan($rn); R = [from $rn emit \"$sid\" as sampleid, kmer, cnt];
"
else
str="$str$rn = scan($rn); R = R + [from $rn emit \"$sid\" as sampleid, kmer, cnt];
"
fi
}; done < <(curl -s -XGET "$MyriaHostAndPort"/dataset/search?q="$QueryTerm" \
| jsawk 'return this.relationName' -a 'return this.join("\n")' ) || :

str="$str
store(R, ${ResultRelation}_Pkmer, [kmer]);
store(R, ${ResultRelation}_Psampleid, [sampleid]);"

echo "$str"

# to remove newlines from the output
# | tr '\n' ' ' | less
83 changes: 83 additions & 0 deletions ingest/ingest_directory.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/bin/bash
set -e #command fail -> script fail
set -u #unset variable reference causes script fail

_usage() {
echo "Usage: $0 [-f] myria_host:myria_port InputDirectory RelationPrefix RelationSuffix JAVA_COMMAND"
echo " InputDirectory: the csv kmer files to ingest. Processed files end in '.csv'"
echo " RelationPrefix and RelationSuffix: creates relations of the template \"PrefixSXXXXSuffix\""
echo " The first argument can be '-f'. If so, the relation forcefully overwrites existing relations."
echo " The JAVA_COMMAND are the arguments to java. It takes files from InputDirectory as input and pipes to a curl POST."
exit 1
}
#node-109
#/home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_11_cnt/S0002_11_cnt.csv
#'/home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar'
#./ingest_myria.sh -f localhost:8753 /home/dhutchis/gits/istc_oceanography/myria_ingest/S0002_11_cnt_cut.csv kmercnt_11_forward_S0002_cut -cp /home/dhutchis/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 11

# ./ingest_directory.sh localhost:8753 /home/dhutchis/gits/istc_oceanography/extracted-kmers/nohead testpre_ _forward -cp /home/dhutchis/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 11
# ./ingest_directory.sh http://ec2-54-213-84-235.us-west-2.compute.amazonaws.com:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_11_cnt kmercnt_11_rc_ _ol -cp /home/gridsan/groups/istcdata/fromDylan/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 11 -rc

# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_11_cnt kmercnt_11_forward_ _ol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 11
# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_11_cnt kmercnt_11_rc_ _ol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 11 -rc
# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_11_cnt kmercnt_11_lex_ _ol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 11 -lex
# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_non_overlapped_11_cnt kmercnt_11_forward_ _nol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 11
# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_non_overlapped_11_cnt kmercnt_11_rc_ _nol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 11 -rc
# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_non_overlapped_11_cnt kmercnt_11_lex_ _nol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 11 -lex

# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_13_cnt kmercnt_13_forward_ _ol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 13
# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_13_cnt kmercnt_13_rc_ _ol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 13 -rc
# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_13_cnt kmercnt_13_lex_ _ol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 13 -lex
# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_non_overlapped_13_cnt kmercnt_13_forward_ _nol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 13
# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_non_overlapped_13_cnt kmercnt_13_rc_ _nol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 13 -rc
# ./ingest_directory.sh node-109:8753 /home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_non_overlapped_13_cnt kmercnt_13_lex_ _nol -cp /home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 13 -lex

if [ "$#" -lt "5" ]; then
_usage
fi

force="false"
if [ "$1" == "-f" ]; then
force="true"
shift
fi

if [ "$#" -lt "5" ]; then
_usage
fi

get_script_dir () {
SOURCE="${BASH_SOURCE[0]}"
# While $SOURCE is a symlink, resolve it
while [ -h "$SOURCE" ]; do
DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
SOURCE="$( readlink "$SOURCE" )"
# If $SOURCE was a relative symlink (so no "/" as prefix, need to resolve it relative to the symlink base directory
[[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE"
done
DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
echo "$DIR"
}
SCRIPT_DIR="$(get_script_dir)"

# Check pre-requisite
command -v jsawk >/dev/null 2>&1 || { echo >&2 "I require 'jsawk' but it's not installed. Aborting."; exit 1; }
command -v "$SCRIPT_DIR/ingest_myria.sh" >/dev/null 2>&1 || { echo >&2 "I require 'ingest_myria' but it's not installed. Aborting."; exit 1; }

MyriaHostAndPort="${1}"
InputDirectory="${2}"
RelationPrefix="${3}"
RelationSuffix="${4}"
shift; shift; shift; shift;

while read fn; do {
sid=`expr "$fn" : '.*\(S[0-9]\{4\}\)'`
echo "$fn: $sid"

if [ "$force" == "true" ]; then
"$SCRIPT_DIR/ingest_myria.sh" -f "$MyriaHostAndPort" "$fn" "$RelationPrefix$sid$RelationSuffix" $@
else
"$SCRIPT_DIR/ingest_myria.sh" "$MyriaHostAndPort" "$fn" "$RelationPrefix$sid$RelationSuffix" $@
fi
}; done < <(ls "$InputDirectory"/*S*.csv -1 ) #> /tmp/o

34 changes: 34 additions & 0 deletions ingest/ingest_from_s3.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/bin/sh
command -v aws >/dev/null 2>&1 || { echo >&2 "aws-cli not found - aborting."; exit 1; }

if [ "$#" -lt "2" ]; then
echo "Usage: $0 s3Bucket ResultRelation"
echo " ResultRelation: the name of the new relation to store in Myria"
echo " ex: $0 oceankmers/overlapped kmercnt_11_ol"
echo " This creates a MyriaL query that unions together all the matching relations and stores them."
exit 1
fi

ResultRelation="${2-}"
s3bucket="$1"


str=""
for f in $(aws s3 ls $s3bucket/ | cut -d ' ' -f 6); do
sid=`expr "$f" : '.*\(S[0-9]\{4\}\)'`
s3path="https://s3-us-west-2.amazonaws.com/$s3bucket/$f"
if [ -z "$str" ]; then
str="$sid = load(\"$s3path\", csv(schema(kmer:string, cnt:float),skip=1)); R = [from $sid emit \"$sid\" as sampleid, kmer, cnt];
"
else
str="$str $sid = load(\"$s3path\", csv(schema(kmer:string, cnt:float),skip=1)); R = R + [from $sid emit \"$sid\" as sampleid, kmer, cnt];
"
fi
done

str="$str
store(R, ${ResultRelation}_Pkmer, [kmer]);
store(R, ${ResultRelation}_Psampleid, [sampleid]);"

echo "$str"

106 changes: 106 additions & 0 deletions ingest/ingest_myria.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#!/bin/bash
set -e #command fail -> script fail
set -u #unset variable reference causes script fail

_usage() {
echo "Usage: $0 [-f] myria_host:myria_port InputFile ResultRelation JAVA_COMMAND"
echo " InputFile: the csv kmer file to ingest"
echo " ResultRelation: the name of the new relation to store in Myria"
echo " The first argument can be '-f'. If so, the relation is force added."
echo " The JAVA_COMMAND are the arguments to java. It takes the InputFile as input and pipes to a curl POST."
exit 1
}
#node-109
#/home/gridsan/groups/istcdata/datasets/ocean_metagenome/csv_data/parsed_11_cnt/S0002_11_cnt.csv
#'/home/gridsan/dhutchison/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar'
#./ingest_myria.sh -f localhost:8753 /home/dhutchis/gits/istc_oceanography/myria_ingest/S0002_11_cnt_cut.csv kmercnt_11_forward_S0002_cut -cp /home/dhutchis/gits/graphulo/target/graphulo-1.0.0-SNAPSHOT-all.jar edu.mit.ll.graphulo_ocean.OceanPipeKmers -K 11

if [ "$#" -lt "3" ]; then
_usage
fi

force="false"
if [ "$1" == "-f" ]; then
force="true"
shift
fi

if [ "$#" -lt "3" ]; then
_usage
fi

# Check pre-requisite
command -v jsawk >/dev/null 2>&1 || { echo >&2 "I require 'jsawk' but it's not installed. Aborting."; exit 1; }

MyriaHostAndPort="${1}"
InputFile="${2}"
ResultRelation="${3}"
shift; shift; shift;

if [ "$force" == "true" ]; then
t=""
else
t=$(curl -s -XGET "$MyriaHostAndPort"/dataset/search?q="$ResultRelation" \
| jsawk 'return this.relationName' -a 'return this.join("\n")' )
fi

if [ ! -z "$t" ] && [ "$t" == "$ResultRelation" ]; then
echo "The relation $ResultRelation already exists."
else
# do the ingest

if [ -e "/tmp/myriaIngestDir_$(whoami)" ]; then
if [ -d "/tmp/myriaIngestDir_$(whoami)" ]; then
TDIR="/tmp/myriaIngestDir_$(whoami)"
else
echo "Warning: the path \"/tmp/myriaIngestDir\" is taken"
TDIR=`mktemp -d myriaIngestDir_XXXXXXXXXX`
fi
else
mkdir "/tmp/myriaIngestDir_$(whoami)"
TDIR="/tmp/myriaIngestDir_$(whoami)"
fi
echo "{ \"userName\" : \"public\", \"programName\" : \"adhoc\", \"relationName\" : \"$ResultRelation\" }" > "$TDIR/relationKey.json"
echo '{ "columnTypes": ["STRING_TYPE", "LONG_TYPE"], "columnNames": ["kmer", "cnt"] }' > "$TDIR/schema.json"
printf ',' > "$TDIR/delimiter.json"
echo 'true' > "$TDIR/overwrite.json"

java $@ < "$InputFile" \
| curl -i -XPOST "$MyriaHostAndPort"/dataset -H "Content-type: multipart/form-data" \
-F "relationKey=@$TDIR/relationKey.json; type=application/json" \
-F "schema=@$TDIR/schema.json; type=application/json" \
-F "delimiter=," \
-F "overwrite=$force" \
-F "data=@-; type=application/octet-stream"

fi




##################################
# -F "partitionFunction=a" \

# if [ ! -d ingestTmp ]; then
# mkdir ingestTmp
# fi
# echo "1,2
# 2,3
# 4,5
# 6,7" > ingestTmp/smallerTable.csv
# echo '{ "userName" : "public", "programName" : "adhoc", "relationName" : "test1" }' > ingestTmp/relationKey.json
# echo '{ "columnTypes": ["LONG_TYPE", "LONG_TYPE"], "columnNames": ["col1", "col2"] }' > ingestTmp/schema.json

# #http://ec2-52-39-96-185.us-west-2.compute.amazonaws.com
# curl -i -XPOST node-109:8753/dataset -H "Content-type: multipart/form-data" \
# -F "relationKey=@./ingestTmp/relationKey.json; type=application/json" \
# -F "schema=@./ingestTmp/schema.json; type=application/json" \
# -F "overwrite=true" \
# -F "data=@-; type=application/octet-stream"

# ./ingestTmp/smallerTable.csv
# -F "binary=false" \
# -F "isLittleEndian=false" \
# -F "delimitter=," \


0 comments on commit aabe6f9

Please sign in to comment.