# DAPT (Domain-Adaptive Pretraining) for Llama 3.1 on Earnings Call Transcripts

This notebook performs continued pretraining (causal LM objective) of Llama 3.1 8B (`meta-llama/Llama-3.1-8B`) on your local `stock_earning_call_transcripts.parquet`.

What you'll get:
- Environment-adaptive setup (CUDA, MPS, CPU) with automatic LoRA/QLoRA selection
- Robust dataset loading from Parquet and text-column auto-detection
- Efficient token packing into fixed-length sequences
- PEFT LoRA (and QLoRA on CUDA) training pipeline with Transformers Trainer
- Adapter saving and a quick inference sanity check

Notes:
- Accept the Llama 3.1 license on Hugging Face and authenticate before training.
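- On macOS (MPS) and CPU, QLoRA is disabled: the notebook enables it only when CUDA and `bitsandbytes` are both available. Standard LoRA is used instead, with the base model in float32.
- For the best throughput, use a CUDA GPU. QLoRA (4-bit base weights) is enabled automatically when `bitsandbytes` is installed; its main benefit is lower memory use.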



In [1]:
# Install required libraries (run this once). You can re-run safely.
%pip -q install -U transformers datasets accelerate peft sentencepiece protobuf

# For CUDA QLoRA only (Linux/NVIDIA). Skip on macOS/CPU.
# %pip -q install bitsandbytes


^C
Note: you may need to restart the kernel to use updated packages.


In [8]:
# Quote the requirement so the shell does not treat ">=" as an output redirect
%pip install -q "huggingface_hub>=0.23.0"

Note: you may need to restart the kernel to use updated packages.


In [None]:
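import os
from huggingface_hub import InferenceClient

# Read the access token from the environment instead of hard-coding it in the notebook
hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")

# This is the base (non-instruct) model, so it continues the prompt as text
# rather than following it as an instruction.
client = InferenceClient("meta-llama/Llama-3.1-8B", token=hf_token)
resp = client.text_generation(
    "Write a haiku about GPUs",
    max_new_tokens=128,
    temperature=0.7,
)
print(resp)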


The haiku is a 3-line poem that originated in Japan. It has 17 syllables in total, and it follows a 5-7-5 syllable pattern.
A GPU (Graphics Processing Unit) is a specialized electronic circuit designed to rapidly manipulate and alter memory to accelerate the creation of images in a frame buffer intended for output to a display. It is a component of a video card, used to control the rendering of images and video in a computer.
The GPU is a type of microprocessor, designed to handle a specific type of data. It is used in conjunction with a CPU, which is a general-purpose


In [1]:
import torch
print("CUDA available:", torch.cuda.is_available())
print("CUDA version:", torch.version.cuda)
print("PyTorch version:", torch.__version__)
# torch.backends.cuda.sdp_kernel() is a deprecated context manager, not a status
# query, so printing it only shows a generator object. The next cell prints the
# per-backend flags instead.

# Allow every SDPA backend; PyTorch picks the fastest one the GPU and inputs support:
torch.backends.cuda.enable_flash_sdp(True)
torch.backends.cuda.enable_mem_efficient_sdp(True)
torch.backends.cuda.enable_math_sdp(True)


CUDA available: True
CUDA version: 12.4
PyTorch version: 2.6.0+cu124


In [2]:
print(torch.backends.cuda.flash_sdp_enabled())
print(torch.backends.cuda.mem_efficient_sdp_enabled())


True
True


In [11]:
# Minimize on-disk writes (avoid "No space left on device")
import os, tempfile

# Point the Hugging Face caches at a fresh temp dir. Set these BEFORE importing
# datasets/transformers: the libraries read them at import time, so restart the
# kernel first if either library was already imported.
TMP_DIR = tempfile.mkdtemp(prefix="hf_tmp_")
os.environ["HF_HOME"] = TMP_DIR
os.environ["HF_DATASETS_CACHE"] = os.path.join(TMP_DIR, "datasets_cache")
os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"
# Reduce CUDA fragmentation on small GPUs
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

import datasets, transformers  # imported after the env vars on purpose

# Stop reusing cached map/filter results. Outputs are still written to disk, but
# to temporary files that are deleted when the session ends.
datasets.disable_caching()
print({
    "HF_HOME": os.environ.get("HF_HOME"),
    "HF_DATASETS_CACHE": os.environ.get("HF_DATASETS_CACHE"),
    "caching_disabled": not datasets.is_caching_enabled(),
    "PYTORCH_CUDA_ALLOC_CONF": os.environ.get("PYTORCH_CUDA_ALLOC_CONF"),
})


{'HF_HOME': '/nobackup/vdhanuka/hf_cache/tmp/hf_tmp_0rbv6pvc', 'HF_DATASETS_CACHE': '/nobackup/vdhanuka/hf_cache/tmp/hf_tmp_0rbv6pvc/datasets_cache', 'caching_disabled': True, 'PYTORCH_CUDA_ALLOC_CONF': 'expandable_segments:True'}
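
Since the point of this cell is to avoid running out of disk, a free-space check before tokenizing can fail fast instead of midway through a long `map`. The following is a minimal sketch using only the standard library; `free_gib` and `require_free_space` are hypothetical helpers, not part of the notebook, and the right threshold depends on your dataset.

```python
import shutil
import tempfile


def free_gib(path: str) -> float:
    """Return the free space, in GiB, on the filesystem that holds `path`."""
    return shutil.disk_usage(path).free / 2**30


def require_free_space(path: str, min_gib: float) -> float:
    """Raise early if fewer than `min_gib` GiB are free; otherwise return the free GiB."""
    available = free_gib(path)
    if available < min_gib:
        raise RuntimeError(
            f"Only {available:.1f} GiB free at {path}; need at least {min_gib:.1f} GiB"
        )
    return available


# Example: inspect the system temp directory (swap in TMP_DIR from the cell above).
print(f"{free_gib(tempfile.gettempdir()):.1f} GiB free")
```

Calling `require_free_space(TMP_DIR, min_gib=...)` right before the tokenization cell would be one place to use it.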


In [12]:
# If needed, install dependencies. Uncomment the lines below to run once.
# %pip -q install -U transformers datasets accelerate peft
# For CUDA QLoRA only (Linux/NVIDIA):
# %pip -q install bitsandbytes

import os
import platform
import torch

# Detect environment
USE_CUDA = torch.cuda.is_available()
USE_MPS = (not USE_CUDA) and torch.backends.mps.is_available()
BF16_OK = USE_CUDA and torch.cuda.is_bf16_supported()
USE_QLORA = USE_CUDA  # QLoRA requires CUDA + bitsandbytes; set False on macOS/CPU

# Disable QLoRA automatically if bitsandbytes is not installed
try:
    import importlib.metadata as _ilmd
    _ = _ilmd.version("bitsandbytes")
except Exception:
    if USE_QLORA:
        print("bitsandbytes not found; disabling QLoRA (falling back to standard LoRA)")
    USE_QLORA = False

DEVICE = (
    torch.device("cuda") if USE_CUDA else (torch.device("mps") if USE_MPS else torch.device("cpu"))
)

print({
    "cuda": USE_CUDA,
    "mps": USE_MPS,
    "bf16_ok": BF16_OK,
    "use_qlora": USE_QLORA,
    "device": str(DEVICE),
    "python": platform.python_version(),
})


{'cuda': True, 'mps': False, 'bf16_ok': True, 'use_qlora': True, 'device': 'cuda', 'python': '3.10.12'}


In [13]:
from datasets import load_dataset
from typing import Optional

# Paths and config
PARQUET_PATH = "stock_earning_call_transcripts.parquet"
TEXT_COLUMN: Optional[str] = None  # override to force a column, else auto

raw_ds = load_dataset("parquet", data_files={"train": PARQUET_PATH})["train"]
print("Columns:", raw_ds.column_names)
print(raw_ds[0])

# If schema has nested `transcripts` (array of structs with speaker/content),
# flatten into a single text field for DAPT.
if "transcripts" in raw_ds.column_names:
    def flatten_segments(example):
        segments = example.get("transcripts") or []
        lines = []
        for seg in segments:
            if not seg:
                continue
            speaker = seg.get("speaker")
            content = seg.get("content")
            if content is None:
                continue
            if speaker and len(str(speaker)) > 0:
                lines.append(f"{speaker}: {content}")
            else:
                lines.append(str(content))
        example["__flattened_text"] = "\n".join(lines)
        return example

    raw_ds = raw_ds.map(flatten_segments)
    # Prefer flattened text unless user overrides
    if TEXT_COLUMN is None:
        TEXT_COLUMN = "__flattened_text"

# Auto-detect a reasonable text column if still unknown
if TEXT_COLUMN is None:
    preferred = ["__flattened_text","text","transcript","content","body","cleaned_text","utterance","raw_text"]
    for p in preferred:
        exact = [c for c in raw_ds.column_names if c.lower() == p]
        if len(exact) > 0:
            TEXT_COLUMN = exact[0]
            break

if TEXT_COLUMN is None:
    # fallback to first string-like column
    for name, feature in raw_ds.features.items():
        if getattr(feature, "dtype", "") in ("string", "large_string"):
            TEXT_COLUMN = name
            break

if TEXT_COLUMN is None:
    TEXT_COLUMN = raw_ds.column_names[0]

print("Using text column:", TEXT_COLUMN)

# Filter empty
ds = raw_ds.filter(lambda x: x.get(TEXT_COLUMN) is not None and len(str(x[TEXT_COLUMN])) > 0)
print(ds)
print("Example text:", str(ds[0][TEXT_COLUMN])[:400])


Columns: ['symbol', 'fiscal_year', 'fiscal_quarter', 'report_date', 'transcripts', 'transcripts_id']
{'symbol': 'A', 'fiscal_year': 2006, 'fiscal_quarter': 1, 'report_date': '2006-02-13', 'transcripts': [{'paragraph_number': 1, 'speaker': 'Agilent Technologies Incorporated (NYSE', 'content': 'A) :  Q1 2006 Earnings Release Conference Call   February 13, 2006'}, {'paragraph_number': 2, 'speaker': 'Operator', 'content': 'Good day, ladies and gentlemen and welcome to the Q1 2006 Agilent Technology Incorporated Earnings Conference and Analyst Meeting. My name is Jessie and I will be your coordinator for today’s call. At this time, all participants are in a listen-only mode and we will be conducting a question and answer session towards the end of this conference. As at any time during the call you require assistance, please key “*” followed by “0” and coordinator will be happy to assist you. As a reminder, this conference is being recorded for replay purposes. I would now like to turn the 

Map:   0%|          | 0/172429 [00:00<?, ? examples/s]

Using text column: __flattened_text


Filter:   0%|          | 0/172429 [00:00<?, ? examples/s]

Dataset({
    features: ['symbol', 'fiscal_year', 'fiscal_quarter', 'report_date', 'transcripts', 'transcripts_id', '__flattened_text'],
    num_rows: 172428
})
Example text: Agilent Technologies Incorporated (NYSE: A) :  Q1 2006 Earnings Release Conference Call   February 13, 2006
Operator: Good day, ladies and gentlemen and welcome to the Q1 2006 Agilent Technology Incorporated Earnings Conference and Analyst Meeting. My name is Jessie and I will be your coordinator for today’s call. At this time, all participants are in a listen-only mode and we will be conducting a
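
To make the flattening rule concrete, here is a small standalone sketch that mirrors `flatten_segments` on a made-up record. The toy segments are illustrative only (they are not taken from the dataset), and `flatten_transcript` is a hypothetical name for the same logic written as a pure function.

```python
from typing import Optional


def flatten_transcript(segments: Optional[list]) -> str:
    """Join transcript segments into 'speaker: content' lines, skipping empty ones."""
    lines = []
    for seg in segments or []:
        # Skip missing segments and segments without content
        if not seg or seg.get("content") is None:
            continue
        speaker = seg.get("speaker")
        content = str(seg["content"])
        # Prefix with the speaker only when one is present
        lines.append(f"{speaker}: {content}" if speaker else content)
    return "\n".join(lines)


toy = [
    {"paragraph_number": 1, "speaker": "Operator", "content": "Good day, everyone."},
    {"paragraph_number": 2, "speaker": None, "content": "(pause)"},
    {"paragraph_number": 3, "speaker": "CFO", "content": None},  # dropped: no content
    None,  # dropped: empty segment
]
print(flatten_transcript(toy))
# Operator: Good day, everyone.
# (pause)
```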


In [None]:
from transformers import AutoTokenizer

MODEL_ID = "meta-llama/Llama-3.1-8B"
BLOCK_SIZE = 1024  # lowered to reduce activation memory on 10–12 GB GPUs

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
# Avoid long-sequence warnings during tokenization; packing enforces BLOCK_SIZE later
tokenizer.model_max_length = 1_000_000_000

def tokenize_examples(batch):
    return tokenizer(batch[TEXT_COLUMN], add_special_tokens=False, truncation=False)

print("Tokenizing dataset (this may take a while)...")
# Drop every original column, including the raw text: only token ids are needed downstream
tok_ds = ds.map(tokenize_examples, batched=True, remove_columns=ds.column_names)

# Pack tokens into fixed blocks
def group_texts(examples):
    concatenated = []
    for ids in examples["input_ids"]:
        concatenated.extend(ids + [tokenizer.eos_token_id])
    total_length = (len(concatenated) // BLOCK_SIZE) * BLOCK_SIZE
    if total_length == 0:
        return {"input_ids": [], "labels": []}
    input_ids = [concatenated[i:i+BLOCK_SIZE] for i in range(0, total_length, BLOCK_SIZE)]
    return {"input_ids": input_ids, "labels": [x.copy() for x in input_ids]}

lm_ds = tok_ds.map(group_texts, batched=True, remove_columns=tok_ds.column_names)
print(lm_ds)


Loading tokenizer...
Tokenizing dataset (this may take a while)...


Map:   0%|          | 0/172428 [00:00<?, ? examples/s]

Map:   0%|          | 0/172428 [00:00<?, ? examples/s]

Dataset({
    features: ['input_ids', 'labels'],
    num_rows: 1980805
})
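
The packing step can be illustrated with toy token ids. This sketch reimplements the same idea as `group_texts` in plain Python: append an EOS id after each document, concatenate, and cut into full blocks, dropping the incomplete tail. `pack_blocks` and the toy ids are illustrative, not part of the notebook.

```python
def pack_blocks(token_lists, block_size, eos_id):
    """Concatenate documents (each followed by EOS) and split into full blocks."""
    stream = []
    for ids in token_lists:
        stream.extend(ids)
        stream.append(eos_id)
    # Keep only as many tokens as fill whole blocks; the remainder is dropped
    usable = (len(stream) // block_size) * block_size
    return [stream[i:i + block_size] for i in range(0, usable, block_size)]


docs = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
blocks = pack_blocks(docs, block_size=4, eos_id=0)
print(blocks)  # [[1, 2, 3, 0], [4, 5, 0, 6], [7, 8, 9, 0]]

# Training tokens implied by the output above: 1,980,805 blocks of 1,024 tokens.
print(1_980_805 * 1024)  # 2028344320
```

Note that in the notebook the tail is dropped once per `map` batch rather than once for the whole corpus, so a small number of tokens is discarded at every batch boundary.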


In [16]:
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training

OUTPUT_DIR = "llama31_dapt_transcripts_lora"
LEARNING_RATE = 2e-4
EPOCHS = 1
PER_DEVICE_BATCH = 1
GRAD_ACCUM = 32

bnb_config = None
if USE_QLORA:
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16 if BF16_OK else torch.float16,
        bnb_4bit_use_double_quant=True,
    )

# Prefer FlashAttention-2 on CUDA if available; else fall back to SDPA
attn_impl = "sdpa"
if USE_CUDA:
    try:
        import flash_attn  # noqa: F401
        attn_impl = "flash_attention_2"
    except Exception:
        pass

print("Loading base model...")
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    device_map="auto",
    torch_dtype=torch.bfloat16 if BF16_OK else (torch.float16 if USE_CUDA else torch.float32),
    quantization_config=bnb_config if USE_QLORA else None,
    attn_implementation=attn_impl,
    low_cpu_mem_usage=True,
)

if USE_QLORA:
    model = prepare_model_for_kbit_training(model)

lora_cfg = LoraConfig(
    task_type="CAUSAL_LM",
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    target_modules=["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"],
)
model = get_peft_model(model, lora_cfg)

# Reduce training memory footprint
model.config.use_cache = False
try:
    model.enable_input_require_grads()
except Exception:
    pass
try:
    model.gradient_checkpointing_enable(gradient_checkpointing_kwargs={"use_reentrant": False})
except Exception:
    model.gradient_checkpointing_enable()

print(model)


Loading base model...


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

PeftModelForCausalLM(
  (base_model): LoraModel(
    (model): LlamaForCausalLM(
      (model): LlamaModel(
        (embed_tokens): Embedding(128256, 4096)
        (layers): ModuleList(
          (0-31): 32 x LlamaDecoderLayer(
            (self_attn): LlamaAttention(
              (q_proj): lora.Linear4bit(
                (base_layer): Linear4bit(in_features=4096, out_features=4096, bias=False)
                (lora_dropout): ModuleDict(
                  (default): Dropout(p=0.05, inplace=False)
                )
                (lora_A): ModuleDict(
                  (default): Linear(in_features=4096, out_features=16, bias=False)
                )
                (lora_B): ModuleDict(
                  (default): Linear(in_features=16, out_features=4096, bias=False)
                )
                (lora_embedding_A): ParameterDict()
                (lora_embedding_B): ParameterDict()
                (lora_magnitude_vector): ModuleDict()
              )
              (k_proj): lor
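
As a rough check on the adapter size, the number of trainable LoRA parameters follows from `r` and the shapes of the targeted layers: each adapted linear layer adds `r * (in_features + out_features)` weights. The sketch below works this out for `r=16` over the seven target modules and 32 layers. Only the `q_proj` shape appears in the printed model; the other shapes are assumptions taken from the public Llama 3.1 8B configuration.

```python
R = 16          # LoRA rank from lora_cfg
N_LAYERS = 32   # decoder layers, as printed above

# (in_features, out_features) per targeted projection.
# q_proj matches the printed model; the rest are assumed from the public config.
SHAPES = {
    "q_proj": (4096, 4096),
    "k_proj": (4096, 1024),
    "v_proj": (4096, 1024),
    "o_proj": (4096, 4096),
    "gate_proj": (4096, 14336),
    "up_proj": (4096, 14336),
    "down_proj": (14336, 4096),
}


def lora_params(in_features: int, out_features: int, r: int) -> int:
    """lora_A holds r x in_features weights, lora_B holds out_features x r."""
    return r * (in_features + out_features)


per_layer = sum(lora_params(i, o, R) for i, o in SHAPES.values())
total = per_layer * N_LAYERS
print(per_layer, total)  # 1310720 41943040
```

Under these assumptions the adapter has about 42M trainable parameters, which `model.print_trainable_parameters()` should confirm on a loaded PEFT model.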

In [None]:
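from transformers import Trainer, TrainingArguments, default_data_collator

# Blocks are already fixed-length and carry their own labels, so no padding is needed.
# DataCollatorForLanguageModeling(mlm=False) would rebuild the labels and mask every
# pad-token position; when pad_token falls back to eos_token (as set above), that also
# masks the EOS document separators, so the model is never trained to predict them.
collator = default_data_collator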

args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    num_train_epochs=EPOCHS,
    per_device_train_batch_size=PER_DEVICE_BATCH,
    gradient_accumulation_steps=GRAD_ACCUM,
    learning_rate=LEARNING_RATE,
    logging_steps=10,
    save_steps=500,
    save_total_limit=2,
    bf16=BF16_OK,
    fp16=(USE_CUDA and not BF16_OK),
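    # TF32 needs an Ampere-or-newer CUDA GPU; passing True elsewhere raises an error
    tf32=(USE_CUDA and torch.cuda.get_device_capability()[0] >= 8),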
    gradient_checkpointing=True,
    remove_unused_columns=False,
    dataloader_num_workers=2,
    optim="paged_adamw_8bit" if USE_QLORA else "adamw_torch",
    lr_scheduler_type="cosine",
    warmup_ratio=0.03,
    weight_decay=0.0,
    save_safetensors=True,
    report_to="none",
)

trainer = Trainer(
    model=model,
    args=args,
    train_dataset=lm_ds,
    data_collator=collator,
)

# Free any stale allocations before training
import gc

gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()

trainer.train()


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Step,Training Loss


KeyboardInterrupt: 
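
Before committing to a full epoch, it helps to estimate how long the run is. The sketch below derives the approximate number of optimizer steps, the tokens per step, and the learning rate under a linear-warmup cosine schedule from the values used above. It assumes a single GPU; the exact step rounding inside `Trainer` can differ by version, and `cosine_lr` is an illustrative reimplementation rather than the library's scheduler.

```python
import math

NUM_SEQUENCES = 1_980_805   # rows in lm_ds
BLOCK_SIZE = 1024
PER_DEVICE_BATCH = 1
GRAD_ACCUM = 32
BASE_LR = 2e-4
WARMUP_RATIO = 0.03


def optimizer_steps(num_sequences, per_device_batch, grad_accum, num_devices=1):
    """Approximate optimizer steps per epoch (floor division; Trainer may round differently)."""
    return num_sequences // (per_device_batch * grad_accum * num_devices)


def cosine_lr(step, total_steps, warmup_steps, base_lr):
    """Linear warmup to base_lr, then cosine decay to zero at total_steps."""
    if step < warmup_steps:
        return base_lr * step / max(1, warmup_steps)
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    return base_lr * 0.5 * (1.0 + math.cos(math.pi * progress))


total = optimizer_steps(NUM_SEQUENCES, PER_DEVICE_BATCH, GRAD_ACCUM)
warmup = round(total * WARMUP_RATIO)
tokens_per_step = PER_DEVICE_BATCH * GRAD_ACCUM * BLOCK_SIZE
print(total, warmup, tokens_per_step)  # 61900 1857 32768
```

Multiplying the step count by the seconds per step from the first few log lines gives a rough wall-clock estimate for the epoch.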

In [None]:
# Save adapter + tokenizer, then query a hosted model via the HF Inference API.
# Note: the hosted call runs the stock Llama-3.1-8B-Instruct weights, not the adapter
# trained above, so it only checks API access; it does not validate the DAPT result.
import os
from huggingface_hub import InferenceClient

# Save
trainer.model.save_pretrained(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)
print(f"Saved PEFT adapter and tokenizer to {OUTPUT_DIR}")

# Hosted inference via Hugging Face Inference API (no GPU weights needed here)
print("Running inference via Hugging Face Inference API...")

hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")
client = InferenceClient("meta-llama/Llama-3.1-8B-Instruct", token=hf_token)

resp = client.text_generation(
    "Write a haiku about GPUs",
    max_new_tokens=128,
    temperature=0.7,
)
print(resp)
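
To sanity-check the trained adapter itself, generation has to run locally with the adapter attached to the base model. The following is a minimal sketch, assuming the adapter and tokenizer were saved to `llama31_dapt_transcripts_lora` as above and that enough memory is available to load the 8B base model; `generate_with_adapter` is a hypothetical helper, not part of the notebook.

```python
def generate_with_adapter(
    prompt,
    base_model_id="meta-llama/Llama-3.1-8B",
    adapter_dir="llama31_dapt_transcripts_lora",
    max_new_tokens=64,
):
    """Load the base weights plus the saved LoRA adapter and greedily continue `prompt`."""
    # Heavy imports live inside the function so that defining it stays cheap.
    import torch
    from peft import PeftModel
    from transformers import AutoModelForCausalLM, AutoTokenizer

    # Mirror the notebook's dtype choice: bf16 if supported, else fp16 on CUDA, else fp32
    if torch.cuda.is_available():
        dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
    else:
        dtype = torch.float32

    tokenizer = AutoTokenizer.from_pretrained(adapter_dir)
    base = AutoModelForCausalLM.from_pretrained(
        base_model_id, torch_dtype=dtype, device_map="auto"
    )
    model = PeftModel.from_pretrained(base, adapter_dir)
    model.eval()

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        output = model.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=False)
    return tokenizer.decode(output[0], skip_special_tokens=True)


# Example (requires the saved adapter and access to the base model):
# print(generate_with_adapter("Operator: Good day, ladies and gentlemen, and welcome to"))
```

A prompt written in the transcript style (speaker label followed by text) is a more telling probe for DAPT than an instruction such as "Write a haiku", because the base model is trained to continue text rather than follow instructions.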
