# Split `BSseq` objects into chunks to save memory

In [1]:
library(data.table)

In [2]:
df <- fread("09-OUT_matched_SNP_meth_cov.csv")

In [3]:
#' Generate Chunk Ranges with Fixed Number of Chunks
#'
#' This function divides a range into a specified number of chunks, with each chunk
#' having approximately the same size.
#'
#' @param start The starting point of the range to be divided.
#' @param end The ending point of the range to be divided.
#' @param num_chunks The fixed number of chunks to divide the range into.
#'
#' @return A matrix where each row represents a chunk, with the first column being
#'         the start of the chunk and the second column being the end of the chunk.
#' @examples
#' chunk_fixed_n(1, 100, 5)
#' @export
chunk_fixed_n <- function(start, end, num_chunks) {
  chunk_size <- ceiling((end - start + 1) / num_chunks)
  chunk_ranges <- sapply(1:num_chunks, function(i) {
    chunk_start <- start + (i - 1) * chunk_size
    chunk_end <- min(chunk_start + chunk_size - 1, end)
    c(chunk_start, chunk_end)
  })
  return(chunk_ranges)
}

#' Generate Chunk Ranges with Fixed Chunk Size
#'
#' This function divides a range into chunks up to a given maximum size, dynamically
#' determining the number of chunks based on the range and maximum chunk size.
#'
#' @param start The starting point of the range to be divided.
#' @param end The ending point of the range to be divided.
#' @param max_chunk_size The maximum size that each chunk can have.
#'bz
#' @return A matrix where each row represents a chunk, with the first column being
#'         the start of the chunk and the second column being the end of the chunk.
#'         The last chunk may be smaller than `max_chunk_size` to fit the range.
#' @examples
#' chunk_fixed_size(1, 100, 20)
#' @export
chunk_fixed_size <- function(start, end, max_chunk_size) {
  if (length(start) > 1 || length(end) > 1) {
    stop("start and end must be single values")
  }
  if (start > end || max_chunk_size <= 0) {
    stop("Invalid arguments: ensure start <= end and max_chunk_size > 0")
  }
  
  num_chunks <- ceiling((end - start + 1) / max_chunk_size)
  chunk_ranges <- matrix(nrow = num_chunks, ncol = 2)
  
  for (i in 1:num_chunks) {
    chunk_start <- start + (i - 1) * max_chunk_size
    chunk_end <- min(chunk_start + max_chunk_size - 1, end)
    chunk_ranges[i, ] <- c(chunk_start, chunk_end)
  }
  
  return(t(chunk_ranges))
}

In [4]:
head(df)

Chr,SNP_data,methylation_data,last_meth_value_with_SNP_coverage,first_meth_value_with_SNP_coverage,last_meth_index_with_SNP_coverage,first_meth_index_with_SNP_coverage,population,region,cov_file
<int>,<chr>,<chr>,<int>,<int>,<int>,<int>,<chr>,<chr>,<chr>
1,/expanse/lustre/projects/jhu152/naglemi/mwas/gwas//libd_chr1.pgen,/expanse/lustre/projects/jhu152/naglemi/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/expanse/lustre/projects/jhu152/naglemi/mwas/full_covariates/AA_caud.csv
2,/expanse/lustre/projects/jhu152/naglemi/mwas/gwas//libd_chr2.pgen,/expanse/lustre/projects/jhu152/naglemi/mwas/pheno/caud/out/chr2_AA.rda,241863783,10001,2019984,1,AA,caud,/expanse/lustre/projects/jhu152/naglemi/mwas/full_covariates/AA_caud.csv
3,/expanse/lustre/projects/jhu152/naglemi/mwas/gwas//libd_chr3.pgen,/expanse/lustre/projects/jhu152/naglemi/mwas/pheno/caud/out/chr3_AA.rda,198099789,11602,1538467,1,AA,caud,/expanse/lustre/projects/jhu152/naglemi/mwas/full_covariates/AA_caud.csv
4,/expanse/lustre/projects/jhu152/naglemi/mwas/gwas//libd_chr4.pgen,/expanse/lustre/projects/jhu152/naglemi/mwas/pheno/caud/out/chr4_AA.rda,189877411,69399,1387731,1,AA,caud,/expanse/lustre/projects/jhu152/naglemi/mwas/full_covariates/AA_caud.csv
5,/expanse/lustre/projects/jhu152/naglemi/mwas/gwas//libd_chr5.pgen,/expanse/lustre/projects/jhu152/naglemi/mwas/pheno/caud/out/chr5_AA.rda,181172584,44104,1409038,1,AA,caud,/expanse/lustre/projects/jhu152/naglemi/mwas/full_covariates/AA_caud.csv
6,/expanse/lustre/projects/jhu152/naglemi/mwas/gwas//libd_chr6.pgen,/expanse/lustre/projects/jhu152/naglemi/mwas/pheno/caud/out/chr6_AA.rda,170619093,192453,1412543,1138,AA,caud,/expanse/lustre/projects/jhu152/naglemi/mwas/full_covariates/AA_caud.csv


In [5]:
df$SNP_data <- gsub("/expanse/lustre/projects/jhu152/naglemi/mwas/gwas/", "/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas/", df$SNP_data)
df$methylation_data <- gsub("/expanse/lustre/projects/jhu152/naglemi/mwas/pheno/", "/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/", df$methylation_data)
df$cov_file <- gsub("/expanse/lustre/projects/jhu152/naglemi/mwas/full_covariates/", "/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/", df$cov_file)

In [6]:
i <- 1

chunk_ranges <- chunk_fixed_size(df$first_meth_index_with_SNP_coverage[i],
                                 df$last_meth_index_with_SNP_coverage[i],
                                 20000)

In [7]:
chunk_ranges <- t(chunk_ranges)
colnames(chunk_ranges) <- c("chunk_start", "chunk_end")

In [8]:
library(data.table)

# Assume df is your existing data.table

# Function to apply chunk_fixed_size for each row and expand the data.table
expand_df_with_chunks <- function(df) {
  expanded_list <- lapply(1:nrow(df), function(i) {
    # Use the chunk_fixed_size function to get chunk start and end for the row
    chunk_ranges <- t(chunk_fixed_size(df$first_meth_index_with_SNP_coverage[i],
                                       df$last_meth_index_with_SNP_coverage[i],
                                       20000))
    
    # Create a data.table for the chunks of the current row
    chunks_dt <- data.table(chunk_ranges)
    setnames(chunks_dt, c("V1", "V2"), c("chunk_start", "chunk_end"))
    
    # Repeat the i-th row of df for each chunk
    repeated_rows <- df[rep(i, nrow(chunks_dt)), ]
    
    # Combine the chunks with the repeated rows
    cbind(repeated_rows, chunks_dt)
  })
  
  # Combine all expanded rows into a single data.table
  return(rbindlist(expanded_list))
}

# Use the function to expand the original df with chunk information
df_expanded <- expand_df_with_chunks(df)

df_expanded


Chr,SNP_data,methylation_data,last_meth_value_with_SNP_coverage,first_meth_value_with_SNP_coverage,last_meth_index_with_SNP_coverage,first_meth_index_with_SNP_coverage,population,region,cov_file,chunk_start,chunk_end
<int>,<chr>,<chr>,<int>,<int>,<int>,<int>,<chr>,<chr>,<chr>,<dbl>,<dbl>
1,/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas//libd_chr1.pgen,/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/AA_caud.csv,8982,28981
1,/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas//libd_chr1.pgen,/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/AA_caud.csv,28982,48981
1,/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas//libd_chr1.pgen,/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/AA_caud.csv,48982,68981
1,/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas//libd_chr1.pgen,/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/AA_caud.csv,68982,88981
1,/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas//libd_chr1.pgen,/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/AA_caud.csv,88982,108981
1,/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas//libd_chr1.pgen,/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/AA_caud.csv,108982,128981
1,/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas//libd_chr1.pgen,/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/AA_caud.csv,128982,148981
1,/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas//libd_chr1.pgen,/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/AA_caud.csv,148982,168981
1,/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas//libd_chr1.pgen,/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/AA_caud.csv,168982,188981
1,/dcs04/lieber/statsgen/shizhong/michael/mwas/gwas//libd_chr1.pgen,/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/caud/out/chr1_AA.rda,248918358,1069461,2202702,8982,AA,caud,/dcs04/lieber/statsgen/mnagle/mwas/full_covariates/AA_caud.csv,188982,208981


Sanity checks

In [9]:
#df_expanded[which(df_expanded$chunk_end == 2202702), ]

In [10]:
df_expanded[, modified_methylation_data := sub("/dcs04/lieber/statsgen/shizhong/michael/mwas/pheno/", "/dcs04/lieber/statsgen/mnagle/mwas/pheno/", methylation_data)]
df_expanded[, modified_methylation_data := sub("\\.rda$", paste0("_", chunk_start, "-", chunk_end, ".rds"), modified_methylation_data), by = .I]

#df_expanded


In [12]:
fwrite(df_expanded, "09.5-OUT_matched_SNP_meth_cov_chunked_JHPCE.csv")

In [11]:
#df_expanded[which(df_expanded$Chr == 2), ]

## Test for first chunk

In [12]:
library(CpGWAS)

In [13]:
# num_cores <- future::availableCores()

# i <- 1

# load(df_expanded$methylation_data[i])

# methInput <- new("MethylationInput",
#                BSseq_obj = BSobj2,
#                snp_data_path = df_expanded$SNP_data[i],
#                cov_path = df_expanded$cov_file[i],
#                start_site = df_expanded$chunk_start[i],
#                end_site = df_expanded$chunk_end[i],
#                no_cores = num_cores)

# BSobj2 <- means <- sds <- NULL

# if(!dir.exists(dirname(df_expanded$modified_methylation_data[i]))) dir.create(dirname(df_expanded$modified_methylation_data[i]))

# object.size(methInput)/10^6

# saveRDS(methInput, df_expanded$modified_methylation_data[i])

## Run over all

In [14]:
library(plyr)
library(methods)

In [15]:
# Filter out rows where the modified methylation data file already exists
df_expanded <- df_expanded[!sapply(modified_methylation_data, file.exists)]

In [16]:
num_cores <- 7

In [17]:
# # Initialize last loaded methylation_data to manage repeated loading
# last_loaded <- "filler"

# # Initialize the progress bar
# #pb <- txtProgressBar(min = 0, max = nrow(df_expanded), style = 3)

# for (i in 1:nrow(df_expanded)) {
#     # Progress bar setup
#     #setTxtProgressBar(pb, i)

#     # Check if the file already exists to skip the processing for this row
#     if (file.exists(df_expanded$modified_methylation_data[i])) {
#         next
#     }

#     # Only reload the methylation data if it's different from the last loaded data
#     if (df_expanded$methylation_data[i] != last_loaded) {
#         BSobj2 <- means <- sds <- NULL    
#         load(df_expanded$methylation_data[i])
#         last_loaded <- df_expanded$methylation_data[i]
#     }
    
#     # Create MethylationInput object
#     methInput <- suppressWarnings(new("MethylationInput",
#                      BSseq_obj = BSobj2,
#                      snp_data_path = df_expanded$SNP_data[i],
#                      cov_path = df_expanded$cov_file[i],
#                      start_site = df_expanded$chunk_start[i],
#                      end_site = df_expanded$chunk_end[i],
#                      no_cores = num_cores))
    
#     # Ensure the directory exists before saving
#     if (!dir.exists(dirname(df_expanded$modified_methylation_data[i]))) {
#         dir.create(dirname(df_expanded$modified_methylation_data[i]), recursive = TRUE)
#     }
    
#     # Print object size for debugging
#     #print(paste("Size of methInput object:", object.size(methInput)/10^6, "MB"))

#     methInput@pvar_dt <- NULL # To save space on disk since we'll reload it from SNP data
    
#     # Save the RDS file
#     saveRDS(methInput, df_expanded$modified_methylation_data[i])
# }

In [18]:
library(parallel)
library(data.table)

# Number of cores to use
num_cores <- 7

In [19]:
num_cores

In [20]:
library(parallel)
library(data.table)

THIS ONE GIVES ERROR

In [21]:
# library(parallel)
# library(CpGWAS)  # Ensure the CpGWAS package is loaded in the main session

# # Initialize last loaded methylation_data to manage repeated loading
# last_loaded <- "filler"

# # Set up parallel computation using 7 cores
# num_cores <- 2
# cluster <- makeCluster(num_cores)

# # Load CpGWAS package on each cluster node
# clusterEvalQ(cluster, {
#   library(CpGWAS)
# })

# # Export variables to the cluster nodes
# clusterExport(cluster, c("last_loaded", "df_expanded", "num_cores"))

# process_row <- function(i) {
#   # Check if the file already exists to skip the processing for this row
#   if (file.exists(df_expanded$modified_methylation_data[i])) {
#     return(NULL)
#   }

#   # Only reload the methylation data if it's different from the last loaded data
#   if (df_expanded$methylation_data[i] != last_loaded) {
#     BSobj <- BSobj2 <- means <- sds <- NULL
#     load(df_expanded$methylation_data[i])
#     last_loaded <<- df_expanded$methylation_data[i]
#   }

#   # Determine which BSseq object to use directly in the new object creation
#   # Create MethylationInput object
#   methInput <- suppressWarnings(new("MethylationInput",
#                     BSseq_obj = if (exists("BSobj2")) BSobj2 else BSobj,
#                     snp_data_path = df_expanded$SNP_data[i],
#                     cov_path = df_expanded$cov_file[i],
#                     start_site = df_expanded$chunk_start[i],
#                     end_site = df_expanded$chunk_end[i],
#                     no_cores = num_cores))

#   # Ensure the directory exists before saving
#   if (!dir.exists(dirname(df_expanded$modified_methylation_data[i]))) {
#     dir.create(dirname(df_expanded$modified_methylation_data[i]), recursive = TRUE)
#   }

#   methInput@pvar_dt <- NULL # To save space on disk since we'll reload it from SNP data

#   # Save the RDS file
#   saveRDS(methInput, df_expanded$modified_methylation_data[i])
# }

# # Execute the process_row function in parallel
# results <- parLapply(cluster, 1:nrow(df_expanded), process_row)

# # Stop the cluster
# stopCluster(cluster)


In [22]:
library(parallel)
library(CpGWAS)  # Ensure the CpGWAS package is loaded in the main session

# Initialize last loaded methylation_data to manage repeated loading
last_loaded <- "filler"

# Set up parallel computation using 2 cores (as corrected)
cluster <- makeCluster(7)

# Load CpGWAS package on each cluster node
clusterEvalQ(cluster, {
  library(CpGWAS)
})

# Export variables to the cluster nodes
clusterExport(cluster, c("last_loaded", "df_expanded", "num_cores"))

process_row <- function(i) {
  tryCatch({
    # Check if the file already exists to skip the processing for this row
    if (file.exists(df_expanded$modified_methylation_data[i])) {
      return(NULL)
    }

    # Only reload the methylation data if it's different from the last loaded data
    if (df_expanded$methylation_data[i] != last_loaded) {
      BSobj <- BSobj2 <- means <- sds <- NULL
      load(df_expanded$methylation_data[i])
      last_loaded <<- df_expanded$methylation_data[i]
    }

    # Determine which BSseq object to use directly in the new object creation
    # Create MethylationInput object
    methInput <- suppressWarnings(new("MethylationInput",
                      BSseq_obj = if (exists("BSobj2")) BSobj2 else BSobj,
                      snp_data_path = df_expanded$SNP_data[i],
                      cov_path = df_expanded$cov_file[i],
                      start_site = df_expanded$chunk_start[i],
                      end_site = df_expanded$chunk_end[i],
                      no_cores = 1))

    # Ensure the directory exists before saving
    if (!dir.exists(dirname(df_expanded$modified_methylation_data[i]))) {
      dir.create(dirname(df_expanded$modified_methylation_data[i]), recursive = TRUE)
    }

    methInput@pvar_dt <- NULL # To save space on disk since we'll reload it from SNP data

    # Save the RDS file
    saveRDS(methInput, df_expanded$modified_methylation_data[i])
    return(list(success = TRUE, index = i))
  }, error = function(e) {
    return(list(success = FALSE, index = i, error_message = e$message))
  })
}

# Execute the process_row function in parallel
results <- parLapply(cluster, 1:nrow(df_expanded), process_row)

# Stop the cluster
stopCluster(cluster)

# Filter out error messages and indices
errors <- Filter(function(x) !x$success, results)

# Print errors and their indices
if (length(errors) > 0) {
  print("Errors occurred at the following rows:")
  for (error in errors) {
    print(paste("Row:", error$index, "Error:", error$error_message))
  }
} else {
  print("No errors occurred.")
}


[1] "Errors occurred at the following rows:"
[1] "Row: 2 Error: object 'BSobj' not found"
[1] "Row: 3 Error: object 'BSobj' not found"
[1] "Row: 4 Error: object 'BSobj' not found"
[1] "Row: 5 Error: object 'BSobj' not found"
[1] "Row: 6 Error: object 'BSobj' not found"
[1] "Row: 7 Error: object 'BSobj' not found"
[1] "Row: 8 Error: object 'BSobj' not found"
[1] "Row: 9 Error: object 'BSobj' not found"
[1] "Row: 10 Error: object 'BSobj' not found"
[1] "Row: 11 Error: object 'BSobj' not found"
[1] "Row: 12 Error: object 'BSobj' not found"
[1] "Row: 13 Error: object 'BSobj' not found"
[1] "Row: 14 Error: object 'BSobj' not found"
[1] "Row: 15 Error: object 'BSobj' not found"
[1] "Row: 16 Error: object 'BSobj' not found"
[1] "Row: 17 Error: object 'BSobj' not found"
[1] "Row: 18 Error: object 'BSobj' not found"
[1] "Row: 19 Error: object 'BSobj' not found"
[1] "Row: 20 Error: object 'BSobj' not found"
[1] "Row: 21 Error: object 'BSobj' not found"
[1] "Row: 22 Error: object 'BSobj' not foun

In [24]:
library(parallel)
library(CpGWAS)  # Ensure the CpGWAS package is loaded in the main session

# Set up parallel computation using 7 cores
cluster <- makeCluster(11)

# Load CpGWAS package on each cluster node
clusterEvalQ(cluster, {
  library(CpGWAS)
})

# Group data by unique methylation_data path for efficient loading
methylation_groups <- split(1:nrow(df_expanded), df_expanded$methylation_data)

process_group <- function(group_indices) {
  BSobj <- BSobj2 <- means <- sds <- NULL
  meth_data_path <- df_expanded$methylation_data[group_indices[1]]
  
  # Load methylation data only once per group
  load(meth_data_path)
  
  results <- lapply(group_indices, function(i) {
    tryCatch({
      # Skip processing if the modified methylation data already exists
      if (file.exists(df_expanded$modified_methylation_data[i])) {
        return(NULL)
      }

      # Create MethylationInput object
      methInput <- suppressWarnings(new("MethylationInput",
                        BSseq_obj = if (exists("BSobj2")) BSobj2 else BSobj,
                        snp_data_path = df_expanded$SNP_data[i],
                        cov_path = df_expanded$cov_file[i],
                        start_site = df_expanded$chunk_start[i],
                        end_site = df_expanded$chunk_end[i],
                        no_cores = 1))

      # Ensure the directory exists before saving
      if (!dir.exists(dirname(df_expanded$modified_methylation_data[i]))) {
        dir.create(dirname(df_expanded$modified_methylation_data[i]), recursive = TRUE)
      }

      # Nullify large variables to save space
      methInput@pvar_dt <- NULL
      
      # Save the RDS file
      saveRDS(methInput, df_expanded$modified_methylation_data[i])
      return(list(success = TRUE, index = i))
    }, error = function(e) {
      return(list(success = FALSE, index = i, error_message = e$message))
    })
  })
  return(results)
}

# Export necessary variables and execute the process_group function in parallel
clusterExport(cluster, c("df_expanded"))
results <- parLapply(cluster, methylation_groups, process_group)

# Stop the cluster
stopCluster(cluster)

# Filter out error messages and indices
errors <- Filter(function(x) !x$success, unlist(results, recursive = FALSE))

# Print errors and their indices
if (length(errors) > 0) {
  print("Errors occurred at the following rows:")
  for (error in errors) {
    print(paste("Row:", error$index, "Error:", error$error_message))
  }
} else {
  print("No errors occurred.")
}


ERROR: Error in !x$success: invalid argument type
