Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

define .slice and dataframe methods #2

Merged
merged 6 commits into from
Mar 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ Encoding: UTF-8
LazyData: true
Imports:
httr,
parallel,
Suggests: testthat
91 changes: 87 additions & 4 deletions R/api.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ Sentenai <- setRefClass("Sentenai",
res <- GET(url, get_api_headers())
content(res)
},
# TODO: start, end
field_stats = function(stream, field) {
field_stats = function(stream, field, start = NULL, end = NULL) {
args = list()
if (!is.null(start)) {
args <- c(args, list(start = to_iso_8601(start)))
}
if (!is.null(end)) {
args <- c(args, list(end = to_iso_8601(end)))
}
url <- paste(c(host, 'streams', stream$name, 'fields', field, 'stats'), collapse = '/')
res <- GET(url, get_api_headers())
res <- GET(url, get_api_headers(), query = args)
content(res)
},
# TODO: filter by meta
Expand All @@ -44,6 +50,15 @@ Sentenai <- setRefClass("Sentenai",
streams[lapply(streams, function(s){ grepl(name, s$name)} ) == T]
}
},
range = function(stream, start, end) {
url <- paste(
c(host, 'streams', stream$name, 'start', to_iso_8601(start), 'end', to_iso_8601(end)),
collapse = '/'
)
res <- GET(url, get_api_headers())
# TODO: this returns \n separated JSON events, probably need to split/parse
content(res)
},
# `statements` is a JSON AST query string for now
query = function(statements, limit = Inf) {
Cursor$new(client = .self, query = statements, limit = limit)$get()
Expand Down Expand Up @@ -82,6 +97,10 @@ parse_iso_8601 <- function(str) {
as.POSIXlt(str, "UTC", "%Y-%m-%dT%H:%M:%S")
}

to_iso_8601 <- function(timestamp) {
strftime(timestamp, "%FT%H:%M:%OS3Z")
}

Cursor <- setRefClass("Cursor",
fields = list(client = "Sentenai", query = "character", limit = "numeric", query_id = "character"),
methods = list(
Expand All @@ -98,7 +117,7 @@ Cursor <- setRefClass("Cursor",
},
spans = function () {
cid <- query_id
spans <- c()
spans <- list()

while (!is.null(cid) & length(spans) < limit) {
if (is.finite(limit)) {
Expand All @@ -121,6 +140,70 @@ Cursor <- setRefClass("Cursor",
s
})
sps
},
.slice = function (cursor_id, start, end, max_retries = 3) {
retries <- 0
cursor <- sprintf(
"%s+%s+%s",
strsplit(cursor_id, "\\+")[[1]][[1]],
to_iso_8601(start),
to_iso_8601(end)
)
url <- sprintf("%s/query/%s/events", client$host, cursor)
events <- list()

while(!is.null(cursor)) {
r <- GET(url, client$get_api_headers())
code <- status_code(r)

if (code == 400) {
stop(sprintf("Client error in request for cursor: %s", cursor))
} else if (code != 200 & retries >= max_retries) {
stop('Failed to get cursor')
} else if (code != 200) {
retries <- retries + 1
} else {
# TODO: process this all, not sure how to mimic python lib
cont <- content(r)
# has info about which streams were queries
# print(cont[[1]])
# has actual events
# print(length(cont[[2]]))

cursor <- headers(r)$cursor
events <- c(events, cont[[2]])
}
}

events
},
dataframe = function (sp = spans()) {
c1 <- makeCluster(detectCores() - 1)

# this feels fragile
clusterExport(c1, "to_iso_8601")
clusterEvalQ(c1, library(httr))

df <- parLapply(c1, sp, function(span) {
.slice(query_id, span$start, span$end)
})

stopCluster(c1)
df
},
stats = function (sp = spans()) {
deltas <- unlist(lapply(sp, function(span) {
difftime(span$end, span$start, units = "secs")
}))
if (length(deltas) == 0) { return(list()) }

list(
min = min(deltas),
max = max(deltas),
mean = sum(deltas) / length(deltas),
median = sort(deltas)[[round(length(deltas) / 2)]],
count = length(deltas)
)
}
)
)