Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add last_cached_at metadata column for existing shards #122

Merged
merged 12 commits into from
Jan 15, 2016
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: cachemeifyoucan
Type: Package
Title: Cache Me If You Can
Version: 0.2.3.5
Version: 0.2.3.6
Description: One of the most frustrating parts about being a data scientist
is waiting for data or other large downloads. This package offers a caching
layer for arbitrary functions that relies on a PostgreSQL backend.
Expand Down
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Version 0.2.3.6
* Fix `last_cached_at` bug for existing shards.

# Version 0.2.3.5
* Fix `safe_column` logic to do what it actually intended.

Expand Down
22 changes: 16 additions & 6 deletions R/db.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ CLASS_MAP <- list(

# Metadata columns appended to every shard, plus their SQL storage types.
# Kept separate from user columns so schema checks can ignore them.
META_COLS <- "last_cached_at"
META_COLS_TYPE <- list(last_cached_at = "text")

#' Database table name for a given prefix and salt.
#'
Expand Down Expand Up @@ -89,8 +90,7 @@ db2df <- function(df, dbconn, key) {
#' @param key. Identifier of database table.
add_index <- function(dbconn, tblname, key, idx_name = paste0("idx_", digest::digest(tblname))) {
if (!tolower(substring(idx_name, 1, 1)) %in% letters) {
stop(sprintf("Invalid index name '%s': must begin with an alphabetic character",
idx_name))
stop(sprintf("Invalid index name '%s': must begin with an alphabetic character", idx_name))
}
DBI::dbGetQuery(dbconn, paste0("CREATE INDEX ", idx_name, " ON ", tblname, "(", key, ")"))
TRUE
Expand Down Expand Up @@ -335,28 +335,38 @@ write_data_safely <- function(dbconn, tblname, df, key, safe_columns) {
new_names_raw <- c(colnames(df), id_cols, META_COLS)
missing_cols <- setNames(!is.element(new_names, colnames(one_row)), new_names_raw)

if (any(missing_cols)) {
missing_user_cols <- missing_cols[setdiff(new_names_raw, META_COLS)]

if (any(missing_user_cols)) {
if (isTRUE(safe_columns)) {
stop("Safe Columns Error: Your function call is adding additional ",
"columns to a cache that already has pre-existing columns. This ",
"would suggest your cache is invalid and you should wipe the cache ",
"and start over.")
} else if (is.function(safe_columns)) {
safe_columns(missing_cols)
safe_columns(missing_user_cols)
}
}

# TODO: (RK) Check reverse, that we're not missing any already-present columns
removes <- integer(0)

if (any(missing_cols) && isTRUE(getOption("cachemeifyoucan.debug"))) {
message("Add columns to", tblname, ":", paste(names(which(missing_cols)), collapse = ", "))
}

for (index in which(missing_cols)) {
col <- new_names[index]
if (!all(vapply(col, nchar, integer(1)) > 0))
stop("Failed to retrieve MD5 hashed column names in write_data_safely")
# TODO: (RK) Figure out how to filter all NA columns without wrecking
# the tables.
if (index > length(df)) index <- col
sql <- paste0("ALTER TABLE ", tblname, " ADD COLUMN ",
col, " ", CLASS_MAP[[class(df[[index]])[1]]])

if (col %in% META_COLS) col_type <- META_COLS_TYPE[[col]]
else col_type <- CLASS_MAP[[class(df[[index]])[1]]]

sql <- paste("ALTER TABLE", tblname, "ADD COLUMN", col, col_type)
suppressWarnings(DBI::dbGetQuery(dbconn, sql))
}

Expand Down
14 changes: 8 additions & 6 deletions tests/testthat/test-data_integrity.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ describe("data integrity", {
}, no_check = TRUE)
})

test_that('it crashes when trying to expand a table on new column when safe_columns is TRUE', {
dbtest::with_test_db({
lapply(dbListTables(test_con), function(t) dbRemoveTable(test_con, t))
cached_fcn <- cache(batch_data, key = c(key = "id"), c("model_version", "type"), con = test_con, prefix = prefix, safe_columns = TRUE)
cached_fcn(key = 5:1, model_version, type)
expect_error(cached_fcn(key = 1:10, model_version, type, add_column = TRUE))
describe("when safe_column is TRUE", {
test_that('it crashes when trying to expand a table on new column', {
dbtest::with_test_db({
lapply(dbListTables(test_con), function(t) dbRemoveTable(test_con, t))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to do this anymore... @peterhurford ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kirillseva Just did a little test. Tables persist between runs. So this line is still needed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No — you do have to do this when using `with_test_db`. But `dbtest::db_test_that` would do this cleanup automatically.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peterhurford oh woah.. Nice! That's really helpful. Thanks for clearing it up.

cached_fcn <- cache(batch_data, key = c(key = "id"), c("model_version", "type"), con = test_con, prefix = prefix, safe_columns = TRUE)
cached_fcn(key = 5:1, model_version, type)
expect_error(cached_fcn(key = 1:10, model_version, type, add_column = TRUE))
})
})
})

Expand Down
53 changes: 53 additions & 0 deletions tests/testthat/test-last_cached_at.R
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,57 @@ describe("last_cached_at", {
expect_false(identical(sample_rows$last_cached_at, new_sample_rows$last_cached_at))
expect_true(all(sample_rows$last_cached_at < new_sample_rows$last_cached_at))
})

# Regression tests for shards created before the last_cached_at metadata
# column existed: writing through the cache should transparently add the
# column back (see write_data_safely in R/db.R).
describe("when last_cached_at does not yet exist in shard", {
  test_that("it adds the last_cached_at column with new data", {
    dbtest::with_test_db({
      # Start from a clean database: tables persist between runs otherwise.
      lapply(dbListTables(test_con), function(t) dbRemoveTable(test_con, t))

      cached_fcn <- cache(batch_data, key = c(key = "id"), c("model_version", "type"), con = test_con, prefix = prefix)
      cached_fcn(key = 5:1, model_version, type)

      # dry. = TRUE resolves the shard table name without touching the cache.
      shard_name <- cached_fcn(key = 5:1, model_version, type, dry. = TRUE)$shard_names[[1]]

      # Drop last_cached_at column to mimic tables in older versions of cmiyc
      DBI::dbGetQuery(test_con, paste("alter table", shard_name, "drop column", "last_cached_at"))

      df <- DBI::dbGetQuery(test_con, paste("select * from ", shard_name, "limit 1"))
      expect_false("last_cached_at" %in% names(df))

      # Cache new data (keys 6:10 are uncached, forcing a write to the shard)
      cached_fcn(key = 5:10, model_version, type)

      # Verify that the last_cached_at column exists
      df <- DBI::dbGetQuery(test_con, paste("select * from ", shard_name, "limit 1"))
      expect_true("last_cached_at" %in% names(df))
    })
  })

  test_that("it adds the last_cached_at column with forced data", {
    dbtest::with_test_db({
      # Start from a clean database: tables persist between runs otherwise.
      lapply(dbListTables(test_con), function(t) dbRemoveTable(test_con, t))

      cached_fcn <- cache(batch_data, key = c(key = "id"), c("model_version", "type"), con = test_con, prefix = prefix)
      cached_fcn(key = 5:1, model_version, type)

      # dry. = TRUE resolves the shard table name without touching the cache.
      shard_name <- cached_fcn(key = 5:1, model_version, type, dry. = TRUE)$shard_names[[1]]

      # Drop last_cached_at column to mimic tables in older versions of cmiyc
      DBI::dbGetQuery(test_con, paste("alter table", shard_name, "drop column", "last_cached_at"))

      df <- DBI::dbGetQuery(test_con, paste("select * from ", shard_name, "limit 1"))
      expect_false("last_cached_at" %in% names(df))

      # 1. Unforced cache retrieval of already-cached keys doesn't change anything
      cached_fcn(key = 5:1, model_version, type)
      df <- DBI::dbGetQuery(test_con, paste("select * from ", shard_name, "limit 1"))
      expect_false("last_cached_at" %in% names(df))

      # 2. Forcing the cache write (force. = TRUE) adds the last_cached_at column
      cached_fcn(key = 5:1, model_version, type, force. = TRUE)
      df <- DBI::dbGetQuery(test_con, paste("select * from ", shard_name, "limit 1"))
      expect_true("last_cached_at" %in% names(df))
    })
  })
})
})