Skip to content

Commit

Permalink
Added support for Elasticsearch 6
Browse files Browse the repository at this point in the history
  • Loading branch information
James Lamb committed May 21, 2018
1 parent 18fef75 commit 668ebdd
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 19 deletions.
22 changes: 19 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ env:
- ES_VERSION=5.3.0
- ES_VERSION=5.4.0
- ES_VERSION=5.5.0
- ES_VERSION=6.2.0

before_install:
- case "$ES_VERSION" in
Expand Down Expand Up @@ -79,18 +80,33 @@ before_install:
export ES_VERSION=5.5.0 ;
curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-$ES_VERSION.deb && sudo dpkg -i --force-confnew elasticsearch-$ES_VERSION.deb && sudo service elasticsearch start
;;

"6.0.0")
export ES_VERSION=6.0.0 ;
curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-$ES_VERSION.deb && sudo dpkg -i --force-confnew elasticsearch-$ES_VERSION.deb && sudo service elasticsearch start
;;

"6.1.0")
export ES_VERSION=6.1.0 ;
curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-$ES_VERSION.deb && sudo dpkg -i --force-confnew elasticsearch-$ES_VERSION.deb && sudo service elasticsearch start
;;

"6.2.0")
export ES_VERSION=6.2.0 ;
curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-$ES_VERSION.deb && sudo dpkg -i --force-confnew elasticsearch-$ES_VERSION.deb && sudo service elasticsearch start
;;
esac

- sleep 20
- sudo service elasticsearch status
- wget https://download.elastic.co/demos/kibana/gettingstarted/shakespeare.json -O shakespeare.json
- mv inst/testdata/shakespeare_mapping.json shakespeare_mapping.json
- echo $(ls)
- curl --silent -X PUT "http://127.0.0.1:9200/shakespeare" -d @shakespeare_mapping.json
- curl --silent -X PUT "http://127.0.0.1:9200/shakespeare" -H 'Content-Type:application/json' -d @shakespeare_mapping.json
- head -10000 shakespeare.json > sample_data.json
- split -l 1000 sample_data.json data_
- for filename in $(ls | grep data_); do curl --silent -X POST "http://127.0.0.1:9200/shakespeare/_bulk" --data-binary "@$filename"; done
- curl -X GET "http://127.0.0.1:9200/shakespeare/_search?size=1"
- for filename in $(ls | grep data_); do curl --silent -X POST "http://127.0.0.1:9200/shakespeare/_bulk" -H 'Content-Type:application/json' --data-binary "@$filename"; done
- curl -X GET "http://127.0.0.1:9200/shakespeare/_search?size=1" -H 'Content-Type:application/json'

after_success:
- Rscript -e 'install.packages("covr"); covr::codecov()'
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ importFrom(futile.logger,flog.info)
importFrom(futile.logger,flog.warn)
importFrom(httr,GET)
importFrom(httr,RETRY)
importFrom(httr,add_headers)
importFrom(httr,content)
importFrom(httr,stop_for_status)
importFrom(jsonlite,fromJSON)
Expand Down
32 changes: 24 additions & 8 deletions R/elasticsearch_eda_funs.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#' @name get_counts
#' @description For a given field, return a data.table with its unique values in a time range.
#' @importFrom data.table := data.table setnames setkeyv
#' @importFrom httr content RETRY
#' @importFrom httr add_headers content RETRY
#' @importFrom purrr transpose
#' @export
#' @param field A valid field in whatever Elasticsearch index you are querying
Expand Down Expand Up @@ -64,8 +64,13 @@ get_counts <- function(field
]}}}}, "aggs": {"', field, '": {"terms": {"field": "', field, '", "size":', max_terms,'}}}}')

#===== Build search URL =====#
searchURL <- paste0(es_host, "/", es_index, "/_search?size=0")
result <- httr::RETRY(verb = "POST", url = searchURL, body = aggsQuery)
searchURL <- paste0(es_host, "/", es_index, "/_search?size=0")
result <- httr::RETRY(
verb = "POST"
, httr::add_headers(c('Content-Type' = 'application/json'))
, url = searchURL
, body = aggsQuery
)
counts <- httr::content(result, as = "parsed")[["aggregations"]][[field]][["buckets"]]

#===== Get data =====#
Expand Down Expand Up @@ -93,7 +98,12 @@ get_counts <- function(field
{"missing": {"field": "', field, '"}}]}}}}}')

# Get result
result <- httr::RETRY(verb = "POST", url = searchURL, body = missingQuery)
result <- httr::RETRY(
verb = "POST"
, httr::add_headers(c('Content-Type' = 'application/json'))
, url = searchURL
, body = missingQuery
)
numMissings <- httr::content(result, as = "parsed")[["hits"]][["total"]]

# Return now if user asked to only see NAs if there are any
Expand All @@ -116,7 +126,7 @@ get_counts <- function(field
#' @name get_fields
#' @description For a given Elasticsearch index, return the mapping from field name
#' to data type for all indexed fields.
#' @importFrom httr GET content stop_for_status
#' @importFrom httr add_headers content GET stop_for_status
#' @importFrom data.table := uniqueN
#' @param es_indices A character vector that contains the names of indices for
#' which to get mappings. Default is \code{'_all'}, which means
Expand Down Expand Up @@ -154,7 +164,10 @@ get_fields <- function(es_host
########################## make the query ################################
log_info(paste('Getting indexed fields for indices:', indices))

result <- httr::GET(url = url)
result <- httr::GET(
url = url
, httr::add_headers(c('Content-Type' = 'application/json'))
)
httr::stop_for_status(result)
resultContent <- httr::content(result, as = 'parsed')

Expand Down Expand Up @@ -211,14 +224,17 @@ get_fields <- function(es_host

# [title] Get a data.table containing names of indices and aliases
# [es_host] A string identifying an Elasticsearch host.
#' @importFrom httr content GET stop_for_status
#' @importFrom httr add_headers content GET stop_for_status
.get_aliases <- function(es_host) {

# construct the url to the alias endpoint
url <- paste0(es_host, '/_cat/aliases')

# make the request
result <- httr::GET(url = url)
result <- httr::GET(
url = url
, httr::add_headers(c('Content-Type' = 'application/json'))
)
httr::stop_for_status(result)
resultContent <- httr::content(result, as = 'text')

Expand Down
18 changes: 14 additions & 4 deletions R/elasticsearch_parsers.R
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ es_search <- function(es_host
# hits_to_pull - Total hits to be pulled (documents matching user's query).
# Or, in the case where max_hits < number of matching docs,
# max_hits.
#' @importFrom httr content RETRY stop_for_status
#' @importFrom httr add_headers content RETRY stop_for_status
#' @importFrom jsonlite fromJSON
#' @importFrom uuid UUIDgenerate
.keep_on_pullin <- function(scroll_id
Expand All @@ -958,7 +958,12 @@ es_search <- function(es_host
while (hits_pulled < max_hits){

# Grab a page of hits, break if we got back an error
result <- httr::RETRY(verb = "POST", url = scroll_url, body = scroll_id)
result <- httr::RETRY(
verb = "POST"
, httr::add_headers(c('Content-Type' = 'application/json'))
, url = scroll_url
, body = scroll_id
)
httr::stop_for_status(result)
resultJSON <- httr::content(result, as = "text")

Expand Down Expand Up @@ -1081,7 +1086,7 @@ es_search <- function(es_host
# write(result, 'results.json')
#
# }
#' @importFrom httr content RETRY stop_for_status
#' @importFrom httr add_headers content RETRY stop_for_status
.search_request <- function(es_host
, es_index
, trailing_args = NULL
Expand All @@ -1098,7 +1103,12 @@ es_search <- function(es_host
}

# Make request
result <- httr::RETRY(verb = "POST", url = reqURL, body = query_body)
result <- httr::RETRY(
verb = "POST"
, httr::add_headers(c('Content-Type' = 'application/json'))
, url = reqURL
, body = query_body
)
httr::stop_for_status(result)
result <- httr::content(result, as = "text")

Expand Down
2 changes: 1 addition & 1 deletion inst/testdata/shakespeare_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"type": "integer"
},
"text_entry": {
"type": "string"
"type": "text"
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions tests/testthat/test-integration.R
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ futile.logger::flog.threshold(0)
test_that("es_search works as expected for a simple request",
{testthat::skip_on_cran()

outDT <- es_search(es_host = "http://127.0.0.1:9200"
, es_index = "shakespeare"
, max_hits = 100)
outDT <- es_search(
es_host = "http://localhost:9200"
, es_index = "shakespeare"
, max_hits = 100
)

expect_true("data.table" %in% class(outDT))
expect_true(nrow(outDT) == 100)
Expand Down

0 comments on commit 668ebdd

Please sign in to comment.