Skip to content

Commit

Permalink
Bring data into R before doing a big left join -- Acero has a bug resulting in a limit of 4GB on the right table (apache/arrow#37655)
Browse files Browse the repository at this point in the history
  • Loading branch information
skysyzygy committed Jun 9, 2024
1 parent 4e6f0ff commit 360841e
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export(setnafill)
export(setunite)
export(stream_debounce)
export(tessi_changed_emails)
importFrom(arrow,as_arrow_table)
importFrom(arrow,concat_tables)
importFrom(censusxy,cxy_geography)
importFrom(checkmate,assert)
Expand Down
8 changes: 4 additions & 4 deletions R/email_stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,10 @@ email_fix_eaddress <- function(email_stream) {
setleftjoin(email_matches, emails_cleaned, by = "eaddress")

# provide event_type and event_subtype and remove columns
email_stream <- email_stream %>% select(-eaddress) %>% compute %>%
email_stream <- email_stream %>% select(-eaddress) %>% collect %>%
left_join(email_matches[,.(customer_no,timestamp_id,eaddress,domain)],
by = c("customer_no","timestamp_id")) %>%
compute
as_arrow_table

}

Expand Down Expand Up @@ -289,8 +289,8 @@ email_stream_base <- function(from_date = as.POSIXct("1900-01-01"), to_date = no
return(arrow::arrow_table(group_customer_no=integer(0)))
}

email_stream %>% email_data_append(...) %>% email_fix_timestamp %>% email_fix_eaddress %>%
filter(timestamp >= from_date & timestamp < to_date) %>%
email_stream %>% email_data_append(...) %>% email_fix_timestamp %>%
filter(timestamp >= from_date & timestamp < to_date) %>% email_fix_eaddress %>%
transmute(group_customer_no = as.integer(group_customer_no), customer_no,
timestamp, timestamp_id, event_type = "Email", event_subtype,
source_no, appeal_no, campaign_no, source_desc, extraction_desc,
Expand Down
2 changes: 1 addition & 1 deletion tests/testthat/test-email_stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ test_that("email_fix_timestamp recalculates send timestamps based on earliest pr
test_that("email_fix_eaddress fills in customer emails based on the send date", {
email_fix_timestamp <- email_data_stubbed() %>% email_data_append_stubbed() %>%
email_fix_timestamp %>% collect %>% setDT
email_fix_eaddress <- email_fix_eaddress_stubbed(email_fix_timestamp)
email_fix_eaddress <- email_fix_eaddress_stubbed(email_fix_timestamp) %>% collect %>% setDT
emails <- readRDS(rprojroot::find_testthat_root_file("email_stream-emails.Rds")) %>% setDT %>%
setkey(customer_no, timestamp)

Expand Down

0 comments on commit 360841e

Please sign in to comment.