In [2]:
# Steps 1, 2 and 4 (CPU only): maximum environment of 96 CPUs and 624 GB RAM.
# Steps 3 and 5 (training, GPU): 32 CPUs and 120 GB RAM, with 8 NVIDIA Tesla V100 GPUs enabled.

# Step 1. Download omop table

In [1]:
!mkdir linear_prob
!mkdir aou_cehrgpt
!mkdir allofus_omop_v8
!pip install cehrbert_data

In [6]:
import os
import subprocess

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Workspace context: the curated BigQuery dataset (CDR) to export, the
# workspace bucket, and the local folder that receives the parquet files.
CDR = os.environ['WORKSPACE_CDR']
my_bucket = os.getenv('WORKSPACE_BUCKET')
local_folder = "allofus_omop_v8"

# Build (or reuse) a Spark session that has the BigQuery connector available.
spark = (
    SparkSession.builder
    .appName('BigQuery with Spark')
    .config(
        'spark.jars.packages',
        'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2',
    )
    .config('spark.driver.memory', '24g')
    .config('spark.executor.cores', '32')
    .config('spark.executor.memory', '10g')
    .getOrCreate()
)

In [7]:
# Sanity check: echo back the Spark settings requested when the session was built.
for conf_key in (
    "spark.driver.memory",
    "spark.executor.cores",
    "spark.executor.memory",
):
    print(conf_key + ": " + spark.conf.get(conf_key))

In [8]:
# OMOP tables to pull out of the workspace CDR.
omop_tables = (
    "person",
    "visit_occurrence",
    "drug_exposure",
    "procedure_occurrence",
    "condition_occurrence",
    "measurement",
    "concept",
    "concept_ancestor",
    "concept_relationship",
    "death",
)

# Read each table through the BigQuery connector and store it as parquet under
# <local_folder>/<table>, replacing any previous export of that table.
for omop_table in omop_tables:
    print(f"converting {omop_table} now")
    parquet_path = os.path.join(local_folder, omop_table)
    bigquery_reader = spark.read.format('bigquery').option('table', f'{CDR}.{omop_table}')
    omop_table_df = bigquery_reader.load()
    omop_table_df.write.mode("overwrite").parquet(parquet_path)

# Step 2. Generate training data

In [None]:
##########SPARK process for preparing training data
```bash

# 1. Set all Spark configs once in SPARK_SUBMIT_OPTIONS. The spark-submit commands
#    in this notebook expand the variable unquoted, so every flag in the string
#    is passed as a separate argument; export it in the same shell that runs them.
# NOTE(review): with --master local[N] everything presumably runs inside a single
#    JVM, so --driver-memory is likely the setting that bounds the heap - confirm
#    whether --executor-memory / --executor-cores have any effect in this mode.
export SPARK_SUBMIT_OPTIONS="--master local[16] --driver-memory 16g --executor-memory 36g --executor-cores 4 --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.coalescePartitions.enabled=true --conf spark.serializer=org.apache.spark.serializer.KryoSerializer"

```

# 2. Run this to find where Python's site-packages lives (look for the hidden .local folder in the listing):

ls -al ~/.


# 3. Generate a list of concepts to use (qualified_concept_list folder - concept list should sit under allofus_omop_v8)

```bash
# Run cehrbert_data's generate_included_concept_list.py through spark-submit,
# with allofus_omop_v8 passed as both -i and -o (presumably the input and
# output folders - confirm against the script's --help).
# NOTE(review): --min_num_of_patients 100 presumably keeps only concepts that
# occur in at least 100 patients - confirm.
# The script path assumes a user-level install under Python 3.10; adjust it if
# site-packages lives elsewhere.
spark-submit $SPARK_SUBMIT_OPTIONS ~/.local/lib/python3.10/site-packages/cehrbert_data/apps/generate_included_concept_list.py \
    -i allofus_omop_v8 \
    -o allofus_omop_v8 \
    --min_num_of_patients 100
```

# 4. Create the train and test folders inside the patient_sequence folder before
#    generating the training data.
# Path is imported here because nothing else in this part of the notebook
# brings it into scope; without the import this snippet raises NameError.
from pathlib import Path

SEQ_DIR = Path("allofus_omop_v8/patient_sequence/patient_sequence")
TRAIN_DIR = SEQ_DIR / "train"
TEST_DIR = SEQ_DIR / "test"

# parents=True builds the whole chain of folders; exist_ok=True makes re-runs harmless.
for split_dir in (TRAIN_DIR, TEST_DIR):
    split_dir.mkdir(parents=True, exist_ok=True)

# 5. Run the full job that generates the training data (first cd into the workspace folder so the relative paths below resolve)

```bash
# Run cehrbert_data's generate_training_data.py through spark-submit, reading
# from allofus_omop_v8 and writing to allofus_omop_v8/patient_sequence.
# Only condition_occurrence, procedure_occurrence and drug_exposure are listed
# in --domain_table_list.
# NOTE(review): the meaning of the short flags (-iv, -ip) and of the boolean
# switches comes from the cehrbert_data CLI - confirm against its --help.
# Expects SPARK_SUBMIT_OPTIONS to be exported in the same shell.
spark-submit $SPARK_SUBMIT_OPTIONS ~/.local/lib/python3.10/site-packages/cehrbert_data/apps/generate_training_data.py \
    --input_folder allofus_omop_v8 \
    --output_folder allofus_omop_v8/patient_sequence \
    -iv \
    -ip \
    --gpt_patient_sequence \
    --include_concept_list \
    --include_inpatient_hour_token \
    --att_type day \
    --inpatient_att_type day \
    --should_construct_artificial_visits \
    --disconnect_problem_list_records \
    --include_death \
    --domain_table_list condition_occurrence procedure_occurrence drug_exposure
```

# If there is a memory error: to make room for a larger SPARK_EXECUTOR_MEMORY, reduce SPARK_MASTER from local[64] to local[16]. With 2 cores per executor that leaves 8 executors available, and 8 x SPARK_EXECUTOR_MEMORY should not exceed the machine's RAM.


# Step 3. CEHR-GPT training 

In [1]:
import os

# Workspace bucket location, read from the WORKSPACE_BUCKET environment
# variable (None when the variable is not set).
my_bucket = os.environ.get('WORKSPACE_BUCKET')

In [2]:
mkdir -p /home/jupyter/workspaces/ehrcancermodelevaluation/allofus_omop_v8

In [3]:
# Recursively copy the allofus_omop_v8 folder from the workspace bucket to local disk.
# NOTE(review): the destination folder already exists, so gsutil presumably
# places the copy inside it (.../allofus_omop_v8/allofus_omop_v8) - that nested
# path is what CEHR_GPT_DATA_DIR points to in the training cell; confirm.
!gsutil -m cp -r {my_bucket}/allofus_omop_v8 /home/jupyter/workspaces/ehrcancermodelevaluation/allofus_omop_v8

In [11]:
import os
import subprocess

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Build (or reuse) the Spark session, with the same settings as in Step 1.
spark = (
    SparkSession.builder
    .appName('BigQuery with Spark')
    .config(
        'spark.jars.packages',
        'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2',
    )
    .config('spark.driver.memory', '24g')
    .config('spark.executor.cores', '32')
    .config('spark.executor.memory', '10g')
    .getOrCreate()
)

In [None]:
!mkdir aou_cehrgpt

In [None]:
# git clone https://github.com/knatarajan-lab/cehrgpt.git
# pip install .
# training loss needs to be under 3-4
# with learning rate 0.0002, training loss was 6.7
# reduced learning rate to 5e-5 and added logging_step 10

In [None]:
#################training
# Launch CEHR-GPT pretraining in the background. The model, tokenizer and
# output directory all point at CEHR_GPT_MODEL_DIR; stdout and stderr are
# collected in nohup.out.
export CEHR_GPT_MODEL_DIR=/home/jupyter/workspaces/ehrcancermodelevaluation/aou_cehrgpt
export CEHR_GPT_DATA_DIR=/home/jupyter/workspaces/ehrcancermodelevaluation/allofus_omop_v8/allofus_omop_v8
export TRANSFORMERS_VERBOSITY=info

nohup python -u -m cehrgpt.runners.hf_cehrgpt_pretrain_runner \
  --model_name_or_path "$CEHR_GPT_MODEL_DIR" \
  --tokenizer_name_or_path "$CEHR_GPT_MODEL_DIR" \
  --output_dir "$CEHR_GPT_MODEL_DIR" \
  --data_folder "$CEHR_GPT_DATA_DIR/patient_sequence/patient_sequence/train" \
  --dataset_prepared_path "$CEHR_GPT_DATA_DIR/dataset_prepared" \
  --do_train true \
  --seed 42 \
  --dataloader_num_workers 16 \
  --dataloader_prefetch_factor 8 \
  --hidden_size 768 \
  --num_hidden_layers 14 \
  --max_position_embeddings 2048 \
  --evaluation_strategy epoch \
  --save_strategy epoch \
  --warmup_steps 500 \
  --weight_decay 0.01 \
  --num_train_epochs 10 \
  --learning_rate 5e-5 \
  --use_early_stopping \
  --early_stopping_threshold 0.001 \
  --load_best_model_at_end \
  --report_to none \
  --per_device_train_batch_size 2 \
  --per_device_eval_batch_size 2 \
  --gradient_accumulation_steps 1 \
  --sample_packing \
  --max_tokens_per_batch 3072 \
  --logging_steps 10 > nohup.out 2>&1 &

# Step 4. Extract features

In [5]:
# Load the cohort from the bucket, cast prediction_time to string (it must be a
# string), and store the result as meds_data/meds_data.parquet.
import os

meds_data = read_df_from_bucket('meds_all_data_rev')
meds_data['prediction_time'] = meds_data['prediction_time'].astype(str)
# to_parquet does not create missing parent folders, and nothing else in the
# visible notebook creates meds_data/, so make sure it exists before writing.
os.makedirs('meds_data', exist_ok=True)
meds_data.to_parquet('meds_data/meds_data.parquet', index = False)

# # alternatively, run
# python -u -m cehrbert_data.tools.convert_prediction_time_to_str \
#     -i meds_data \
#     -o meds_data

In [6]:
# Check: reload the cohort file from disk and display the column dtypes
# (prediction_time is expected to show up as a string/object column).
meds_data_path = 'meds_data/meds_data.parquet'
meds_data = pd.read_parquet(meds_data_path)
meds_data.dtypes

subject_id          int64
prediction_time    object
boolean_value       int64
dtype: object

In [None]:
# Run cehrbert_data's extract_features.py through spark-submit for the cohort
# stored in meds_data/, using subject_id / prediction_time / boolean_value as
# the person-id, index-date and label columns. OMOP data is read from
# allofus_omop_v8 and the output goes to gpt_seq.
# NOTE(review): -dl / -du presumably are the lower and upper date bounds and
# --observation_window 0 presumably disables a fixed look-back window - confirm
# against the script's --help.
# This is a shell command: it expects SPARK_SUBMIT_OPTIONS to be exported in the
# same shell.
spark-submit $SPARK_SUBMIT_OPTIONS ~/.local/lib/python3.10/site-packages/cehrbert_data/tools/extract_features.py \
    -c year3_gpt_sequence \
    -i allofus_omop_v8 \
    -o gpt_seq \
    -dl 1985-01-01 \
    -du 2023-12-31 \
    --cohort_dir meds_data/ \
    --person_id_column subject_id \
    --index_date_column prediction_time \
    --label_column boolean_value \
    -ip \
    --gpt_patient_sequence \
    --att_type day \
    --inpatient_att_type day \
    -iv \
    --ehr_table_list condition_occurrence procedure_occurrence drug_exposure \
    --include_concept_list \
    --patient_splits_folder allofus_omop_v8/patient_splits/ \
    --cache_events \
    --should_construct_artificial_visits \
    --disconnect_problem_list_records \
    --observation_window 0

In [14]:
# List running processes whose command line mentions extract_features.py;
# the trailing `grep -v grep` drops the grep process itself from the listing.
# No output means no matching process is running.
!ps aux | grep extract_features.py | grep -v grep

# Step 5. Compute features

In [None]:
# Launch cehrgpt's compute_cehrgpt_features in the background, driven entirely by
# the YAML config file; stdout and stderr are written to config_pat_emb.yaml.out
# in the current directory.
nohup python -u -m cehrgpt.tools.linear_prob.compute_cehrgpt_features /home/jupyter/workspaces/ehrcancermodelevaluation/config_pat_emb.yaml &> config_pat_emb.yaml.out &

# Step 6. Linear probing

In [None]:
# Launch linear probing on the extracted CEHR-GPT features in the background;
# stdout and stderr are written to linear_prob.out in the current directory.
# NOTE(review): --features_data_dir and --output_dir point at the same folder
# (aou_cehrgpt/linear_prob) - confirm this is intended and that the features
# from Step 5 are written there.
nohup python -u -m cehrgpt.tools.linear_prob.train_with_cehrgpt_features \
--features_data_dir "/home/jupyter/workspaces/ehrcancermodelevaluation/aou_cehrgpt/linear_prob" \
--output_dir "/home/jupyter/workspaces/ehrcancermodelevaluation/aou_cehrgpt/linear_prob" \
&> "linear_prob.out" &