In [None]:
import importlib
import subprocess
import sys
from utils.environment_specific import is_local_development

def install_if_missing(package_name, pip_name=None):
    """Install a package with pip when its module cannot be imported.

    Args:
        package_name: Importable module name used to probe availability.
        pip_name: Distribution name handed to ``pip install``. Falls back to
            ``package_name`` when omitted; the two differ for packages such
            as ``dotenv`` / ``python-dotenv``.

    Raises:
        subprocess.CalledProcessError: If the ``pip install`` command fails.
    """
    try:
        importlib.import_module(package_name)
    except ImportError:
        # Run pip through the current interpreter so the package is installed
        # into the environment this kernel is actually using.
        subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name or package_name])
        # Finders cache directory contents; refresh them so the package that
        # was just installed can be imported without restarting the kernel.
        importlib.invalidate_caches()

# Outside local development, install any of these packages whose module
# cannot be imported. Each entry is (module name, pip name or None when equal).
if not is_local_development():
    for module_name, pip_name in (
        ("dotenv", "python-dotenv"),
        ("tldextract", None),
        ("onnxruntime", None),
    ):
        install_if_missing(module_name, pip_name)

In [None]:
import argparse
import copy
import random
import os

from dotenv import load_dotenv
import numpy as np
import pandas as pd
import torch
import tldextract

from utils.dataset import get_dataset_from_args
from utils.experiments import setup_mlflow_client, get_run_name
from utils.environment_specific import is_local_development
from utils.transformers import TransformerModelManager

# Parameters

In [None]:
# fmt: off
# All experiment settings are declared through argparse so that every run starts
# from one set of defaults. The notebook parses an empty argv; later cells
# override individual attributes on copies of `default_args`.
parser = argparse.ArgumentParser()
parser.add_argument("--seed", default=42, type=int, help="Random seed")
# Model args
parser.add_argument("--model_type", default="bert_tiny", type=str, help="Type of BERT model to use. see 'get_model_checkpoint' method")
parser.add_argument("--dropout", default=0, type=float, help="Dropout rate on final classification layer")
parser.add_argument("--decision_threshold", default=0.5, type=float, help="If probability of a class 1 is higher than this, then the sample is classified as class 1")
# Typed so that a value given on the command line is an int, like the default.
parser.add_argument("--max_sequence_length", default=256, type=int)

# Training params
parser.add_argument("--bert_learning_rate", default=3e-5, type=float, help="AdamW learning rate for everything in model except final classifaction layer")
parser.add_argument("--classifier_learning_rate", default=2e-3, type=float, help="AdamW learning rate for classification layer of model")
parser.add_argument("--weight_decay", default=0.01, type=float, help="AdamW weight decay for both parts of the model")
parser.add_argument("--batch_size", default=128, type=int, help="Batch size")
parser.add_argument("--epochs_max", default=5, type=int, help="Maximum number of epochs. Can stop early, however")
parser.add_argument("--patience", default=3, type=int, help="Number of epochs to wait for validation accuracy increase before stopping. If it is set to None, then early stopping is not used")
parser.add_argument("--freeze_epochs", default=1, type=int, help="Number of epochs to freeze BERT non-final layers initially.")
parser.add_argument("--loss", default="focal", choices=["cross_entropy", "focal"], type=str, help="Loss function used")
# Typed as float so command-line values are numeric instead of strings.
parser.add_argument("--focal_loss_gamma", default=2, type=float, help="Pass gama parameter if focal loss is being used. Otherwise has no effect")
parser.add_argument("--focal_loss_alpha", default=0.5, type=float, help="Pass alpha parameter if focal loss is being used. Otherwise has no effect")

# Dataset args
# NOTE(review): the default "joined" is not one of `choices`; argparse does not
# validate defaults against choices, so this only works while the flag is omitted.
parser.add_argument("--dataset_name", default="joined", choices=["private_data", "any_public_dataset_name"])
parser.add_argument("--train_folds", default=None, type=str, help="Which folds of the dataset should be used for training")
# NOTE(review): the default is a list but command-line values are parsed as str;
# confirm which format the dataset loader expects.
parser.add_argument("--eval_folds", default=[4], type=str, help="Which folds of the dataset should be used for evaluation")
# The percent sign is written as "%%" because argparse applies %-formatting to
# help strings; a bare "%" makes help rendering raise ValueError.
parser.add_argument("--shorten_to_train", default=None, help="How much should train dataset be shortened (400u - 400 records), (10%% - 10 percent of all records)")
parser.add_argument("--shorten_to_eval", default=None, help="How much should test or validation set be shortened")

# Parse an empty argv: inside a notebook only the defaults are wanted.
default_args = parser.parse_args([])
# fmt: on

In [None]:
# Load environment variables from a .env file: try the working directory
# first and only fall back to the file two directory levels up if that fails.
loaded = load_dotenv(".env") or load_dotenv("../../.env")
assert loaded is True

In [None]:
# Seed NumPy, PyTorch and the standard-library RNG (the latter in case any
# standard library code draws random numbers) with the configured seed.
for _seed_rng in (np.random.seed, torch.manual_seed, random.seed):
    _seed_rng(default_args.seed)

- Configure threading / parallelism environment variables and check that a GPU is available when not running locally.

In [None]:
# TODO: check if useful
# num_threads = os.cpu_count()
# torch.set_num_threads(num_threads)
# if not "called" in locals():
#     called = True
#     torch.set_num_interop_threads(num_threads)
# print(f"Using {num_threads} CPU threads.")

# Environment switches for the tokenizer and NCCL libraries; set here, before
# any model or data loader is created later in the notebook.
os.environ.update(
    {
        "TOKENIZERS_PARALLELISM": "true",
        "NCCL_SHM_DISABLE": "1",
    }
)

if is_local_development():
    print("Local development is running")
else:
    print("Databricks is running")
    # Non-local runs are required to have a CUDA device.
    assert torch.cuda.is_available(), "GPU is not available!"



In [None]:
# Raise pandas' display limits so long cell values and large frames are shown
# without truncation (set_option accepts several option/value pairs at once).
pd.set_option("display.max_colwidth", 120, "display.max_rows", 800)

# If the version-related and otherwise useless warnings get annoying, uncomment:
# import warnings
# warnings.filterwarnings("ignore", category=FutureWarning)
# warnings.filterwarnings("ignore", category=UserWarning, module="_distutils_hack")

# Dataset preparation

In [None]:
if is_local_development():
    # Local runs pass None as the Spark session to the dataset loader.
    spark = None
    # Local runs always use shortened datasets; whatever was stored on
    # default_args before is overwritten. Per the --shorten_to_train help text,
    # "<n>u" means n records.
    # Larger alternative kept for reference: "4000u" (train) / "2000u" (eval).
    default_args.shorten_to_train = "1500u"
    default_args.shorten_to_eval = "1000u"


In [None]:
def mask_second_level_domain(url):
    """Replace the domain label of ``url`` (as split by tldextract) with ``[MASK]``.

    The label is masked at its position inside the host. Masking the first
    textual occurrence instead would hit the wrong place whenever the same
    text appears earlier in the URL, e.g. the scheme in ``http://http.com``
    or the subdomain in ``https://mail.ai.com``.

    Args:
        url: URL string to mask.

    Returns:
        The URL with the domain label replaced by ``[MASK]``, or the URL
        unchanged when tldextract reports no domain.
    """
    ext = tldextract.extract(url)
    sld = ext.domain
    if not sld:
        # nothing to replace
        return url

    # Rebuild the host the way tldextract split it (subdomain.domain.suffix)
    # and look it up in the URL, so the offset of the domain label is known.
    prefix = f"{ext.subdomain}." if ext.subdomain else ""
    tail = f".{ext.suffix}" if ext.suffix else ""
    host_start = url.find(f"{prefix}{sld}{tail}")
    if host_start < 0:
        # The host does not appear verbatim in the URL; keep the previous
        # best-effort behaviour and mask the first occurrence.
        return url.replace(sld, "[MASK]", 1)

    sld_start = host_start + len(prefix)
    return f"{url[:sld_start]}[MASK]{url[sld_start + len(sld):]}"


def masking_batch_increaser(batch):
    """Double a batch by adding a domain-masked copy of every sample.

    Args:
        batch: Iterable of ``(url, label)`` pairs; it is consumed once.

    Returns:
        A shuffled list holding, for every input pair, the original pair and
        a pair with the masked URL and the same label.
    """
    augmented = [
        sample
        for url, label in batch
        for sample in ((url, label), (mask_second_level_domain(url), label))
    ]
    # Shuffle in place so originals and their masked copies are not adjacent.
    random.shuffle(augmented)
    return augmented

- Quick sanity check that the masking augmentation works on a few synthetic URLs

In [None]:
from utils.dataset import URLDatasetPart

# Smoke check: 20 synthetic URLs with alternating 0/1 labels are run through
# the augmentation, and the doubled, shuffled batch is printed for inspection.
labels = [0, 1] * 10
base_urls = [f"http://abc{i}.com/{i}" for i in range(len(labels))]

print(masking_batch_increaser(zip(base_urls, labels)))

In [None]:
# Build the train/eval datasets from the default arguments. `spark` is set to
# None by an earlier cell only in local development; otherwise it must already
# exist in the notebook namespace, or this line raises NameError.
default_train_dataset, default_eval_dataset = get_dataset_from_args(default_args, spark)

# Model

- load the model to all available GPUs

In [None]:
# Project helper from utils.experiments; presumably configures the MLflow
# tracking client used by the training run below — confirm.
setup_mlflow_client()

In [None]:
import mlflow

from torch.utils.data import DataLoader

In [None]:
def do_transformer_train_test(
    args, train_dataset, eval_dataset, num_workers_train=0, num_workers_eval=0, persistent_workers=False, prefetch_factor=None
):
    """Train a transformer model on one dataset pair inside a single MLflow run.

    A model manager is built from ``args``, both datasets are wrapped in
    ``DataLoader`` objects, and the loaders are handed to the manager's
    ``train`` method while parameters and dataset metadata are logged to MLflow.

    Training batches go through ``masking_batch_increaser`` first, so each
    training batch holds twice ``args.batch_size`` samples. Evaluation batches
    are passed to ``prepare_batch`` unchanged.

    Args:
        args: Namespace with at least ``model_type``, ``dataset_name`` and
            ``batch_size``, plus whatever ``TransformerModelManager.from_args``
            reads.
        train_dataset: Dataset used for training; must provide ``log_to_mlflow``.
        eval_dataset: Dataset used for evaluation; must provide ``log_to_mlflow``.
        num_workers_train: Worker processes for the training loader.
        num_workers_eval: Worker processes for the evaluation loader.
        persistent_workers: Keep worker processes alive between epochs. Applied
            only to loaders that have at least one worker.
        prefetch_factor: Batches prefetched per worker. Applied only to loaders
            that have at least one worker.

    Returns:
        None.
    """
    run_name = get_run_name("TRAIN", args.model_type, args.dataset_name)
    transformer_model = TransformerModelManager.from_args(args)
    print("[runner] Loading dataset...")
    print(f"[runner] Run name: {run_name}")
    print(f"[runner] Args: {args}")

    def collate_fn_combination(batch):
        # Augment with masked copies first, then let the model manager turn the
        # samples into a batch.
        # NOTE(review): as a local closure this may not be picklable when worker
        # processes are started with the "spawn" method — confirm if workers are used.
        new_batch = masking_batch_increaser(batch)
        return transformer_model.prepare_batch(new_batch)

    def worker_options(num_workers):
        # DataLoader rejects `persistent_workers=True` and a non-None
        # `prefetch_factor` when `num_workers` is 0, so these shared settings
        # are forwarded only to loaders that actually use worker processes.
        if num_workers > 0:
            return {
                "prefetch_factor": prefetch_factor,
                "persistent_workers": persistent_workers,
                "num_workers": num_workers,
            }
        return {
            "prefetch_factor": None,
            "persistent_workers": False,
            "num_workers": num_workers,
        }

    train_data_loader = DataLoader(
        train_dataset,
        batch_size=args.batch_size,
        shuffle=True,
        collate_fn=collate_fn_combination,
        **worker_options(num_workers_train),
    )
    eval_data_loader = DataLoader(
        eval_dataset,
        batch_size=args.batch_size,
        shuffle=False,
        collate_fn=transformer_model.prepare_batch,
        **worker_options(num_workers_eval),
    )
    print()
    with mlflow.start_run(run_name=run_name) as run:
        print(f"[runner]: Run id {run.info.run_id}")
        mlflow.log_params(vars(args))
        # log dataset info without actually storing the datasets
        train_dataset.log_to_mlflow("train", args.dataset_name, should_save=False)
        eval_dataset.log_to_mlflow("eval", args.dataset_name, should_save=False)
        transformer_model.train(train_data_loader=train_data_loader, eval_data_loader=eval_data_loader)

In [None]:

# Configuration for this run: start from an independent copy of the shared
# defaults and override the fields listed below.
args = copy.deepcopy(default_args)
vars(args).update(
    model_type="bert_mini",
    dropout=None,
    bert_learning_rate=1e-5,
    classifier_learning_rate=1e-5,
    weight_decay=0,
    max_sequence_length=256,
    # will become 2 times bigger at runtime
    batch_size=64,
    epochs_max=4,
    freeze_epochs=0,
    focal_loss_alpha=0.7,
    focal_loss_gamma=2,
    loss="focal",
)

do_transformer_train_test(args, train_dataset=default_train_dataset, eval_dataset=default_eval_dataset)

In [None]:

# args = copy.deepcopy(default_args)

# args.model_type="bert_small"
# args.dropout = None
# args.bert_learning_rate=1e-5
# args.classifier_learning_rate=1e-5
# args.weight_decay=0
# args.max_sequence_length=256
# args.batch_size=128
# args.epochs_max=3
# args.freeze_epochs=1
# args.focal_loss_alpha=-1
# args.loss="focal"

# do_transformer_train_test(args)

In [0]:
# args = copy.deepcopy(default_args)
# args.model_type = "bert_tiny"
# args.loss = "cross_entropy"
# args.epochs_max = 5
# do_transformer_train_test(args)

In [0]:
# args = copy.deepcopy(default_args)
# args.model_type = "bert_tiny"
# args.epochs_max = 5
# do_transformer_train_test(args)

In [0]:
# args = copy.deepcopy(default_args)
# args.model_type = "bert_small"
# do_transformer_train_test(args)