Skip to content

Commit

Permalink
Split scattering out into a separate class
Browse files Browse the repository at this point in the history
  • Loading branch information
shankar1729 committed Apr 22, 2024
1 parent 683fe33 commit 864d8a4
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 168 deletions.
5 changes: 3 additions & 2 deletions src/qimpy/transport/material/ab_initio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
__all__ = ["AbInitio", "PackedHermitian"]
__all__ = ["AbInitio", "fermi", "bose", "PackedHermitian", "Scattering"]

from ._packed_hermitian import PackedHermitian
from ._ab_initio import AbInitio
from ._ab_initio import AbInitio, fermi, bose
from ._scattering import Scattering
219 changes: 53 additions & 166 deletions src/qimpy/transport/material/ab_initio/_ab_initio.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,29 @@
from __future__ import annotations
from typing import Sequence, Callable, Union, Optional
from typing import Sequence, Callable, Optional
from functools import cache

import torch
import numpy as np

from qimpy import log, rc
from qimpy.mpi import BufferView
from qimpy.math import ceildiv
from qimpy.profiler import StopWatch, stopwatch
from qimpy.profiler import StopWatch
from qimpy.io import Checkpoint, CheckpointPath, Unit, InvalidInputException
from qimpy.mpi import ProcessGrid
from qimpy.transport import material
from .. import Material
from . import PackedHermitian


def fermi(E, mu, T):
return torch.special.expit((mu - E) / T)


def bose(omegaPh, T):
return 1 / torch.expm1(omegaPh / T)


def apply_batched(P: torch.Tensor, rho: torch.Tensor) -> torch.Tensor:
"""Apply batched flattened-rho operator P on batched rho.
Batch dimension is at end of input, and at beginning of output."""
result = (P @ rho.flatten(0, 2)).swapaxes(-2, -1)
return result.unflatten(-1, (-1,) + rho.shape[1:3])


class AbInitio(Material):
"""Ab initio material specification."""

T: float
mu: float
rotation: torch.Tensor
S: torch.Tensor
L: Optional[torch.Tensor]
P: torch.Tensor # P and Pbar operators stacked together
P: torch.Tensor #: Momentum matrix elements
S: Optional[torch.Tensor] #: Spin matrix elements
L: Optional[torch.Tensor] #: Angular momentum matrix elements
scattering: Optional[material.ab_initio.Scattering] #: scattering functional

def __init__(
self,
Expand Down Expand Up @@ -69,14 +54,15 @@ def __init__(
self.eph_scatt = eph_scatt
self.rotation = torch.tensor(rotation, device=rc.device)
watch = StopWatch("Dynamics.read_checkpoint")
with (Checkpoint(fname) as data_file):
with Checkpoint(fname) as data_file:
attrs = data_file.attrs
ePhEnabled = bool(attrs["ePhEnabled"])
spinorial = bool(attrs["spinorial"])
haveL = bool(attrs["haveL"])
self.T = float(attrs["Tmax"])
wk = 1 / float(attrs["nkTot"])
nk, n_bands = data_file["E"].shape
log.info(f"Initializing AbInitio material with {nk = } and {n_bands = }")
super().__init__(
wk=wk,
nk=nk,
Expand All @@ -88,35 +74,29 @@ def __init__(

self.k[:] = self.read_scalars(data_file, "k")
self.E[:] = self.read_scalars(data_file, "E")
P = self.read_vectors(data_file, "P")
self.P = self.read_vectors(data_file, "P")
self.S = self.read_vectors(data_file, "S") if spinorial else None
self.L = self.read_vectors(data_file, "L") if haveL else None
watch.stop()

self.v = torch.einsum("kibb->kbi", P).real
self.eye_bands = torch.eye(n_bands, device=rc.device)
self.packed_hermitian = PackedHermitian(n_bands)
self.v = torch.einsum("kibb->kbi", self.P).real
self.eye_bands = torch.eye(n_bands, device=rc.device)
self.packed_hermitian = PackedHermitian(n_bands)

# Zeroth order Hamiltonian:
H0 = torch.diag_embed(self.E) + self.zeemanH(
torch.tensor([[0.0, 0.0, 0.0]]).to(rc.device)
)
self.rho0, _, _ = self.rho_fermi(H0, self.mu)
# Construct P operators from matrix elements in checkpoint:
if eph_scatt:
if not ePhEnabled:
raise InvalidInputException("No e-ph scattering available in h5 input")
self.P = self.constructP(fname)
self.P_eye = apply_batched(
self.P, torch.tile(self.eye_bands[None], (nk, 1, 1))[..., None]
# Zeroth order Hamiltonian:
H0 = torch.diag_embed(self.E) + self.zeemanH(
torch.tensor([[0.0, 0.0, 0.0]]).to(rc.device)
)
nnzP = self.comm.allreduce(torch.count_nonzero(self.P))
ntotP = self.comm.allreduce(np.prod(self.P.shape))
fill_percent_P = 100.0 * nnzP / ntotP
log.info(f"P tensor fill fraction: {fill_percent_P:.1f}%")
self.rho_dot_scatter0 = self.rho_dot_scatter(
self.packed_hermitian.unpack(self.rho0)
) # for detailed balance correction
self.rho0, _, _ = self.rho_fermi(H0, self.mu)

if eph_scatt:
if not ePhEnabled:
raise InvalidInputException(
f"No e-ph scattering available in {fname}"
)
self.scattering = material.ab_initio.Scattering(self, data_file)
else:
self.scattering = None

def read_scalars(self, data_file: Checkpoint, name: str) -> torch.Tensor:
"""Read quantities that don't transform with rotations from data_file."""
Expand All @@ -136,110 +116,6 @@ def read_vectors(self, data_file: Checkpoint, name: str) -> torch.Tensor:
"ij, kj... -> ki...", self.rotation.to(result.dtype), result
)

@stopwatch
def constructP(self, fname: str, n_blocks: int = 100) -> torch.Tensor:
log.info("Constructing P tensor")
nk = self.k_division.n_tot
ik_start = self.k_division.i_start
ik_stop = self.k_division.i_stop
nk_mine = ik_stop - ik_start
n_bands_sq = self.n_bands**2
ph = self.packed_hermitian
block_shape_flat = (-1, n_bands_sq, n_bands_sq)
P_shape = (2, nk_mine * nk, n_bands_sq, n_bands_sq)
P = torch.zeros(P_shape, dtype=torch.double, device=rc.device)
prefactor = np.pi * self.wk

def get_mine(ik) -> Union[torch.Tensor, slice, None]:
"""Utility to fetch efficient slices of relevant k-points."""
if self.k_division.n_procs == 1:
return slice(None) # no split, so bypass search
sel = torch.where(torch.logical_and(ik >= ik_start, ik < ik_stop))[0]
if not len(sel):
return None
sel_start = sel[0].item()
sel_stop = sel[-1].item() + 1
if sel_stop - sel_start == len(sel):
return slice(sel_start, sel_stop) # contiguous
return sel # general selection

def pack_real(einsum_path, G1, G2):
"""Pack Hermitian `einsum_path` combination of G1 and G2 to real"""
out = torch.einsum(einsum_path, G1, G2).reshape(block_shape_flat)
return torch.einsum("AB, kBC, CD -> kAD", ph.Rinv, out, ph.R).real

# Operate in blocks to reduce working memory:
with Checkpoint(
fname
) as checkpoint: # change this somehow to avoid 2 Opened checkpoint file for reading
cp_ikpair = checkpoint["ikpair"]
n_pairs = cp_ikpair.shape[0]
block_size = ceildiv(n_pairs, n_blocks)
block_lims = np.minimum(
np.arange(0, n_pairs + block_size - 1, block_size), n_pairs
)
cp_omega_ph = checkpoint["omega_ph"]
cp_G = checkpoint["G"]
for block_start, block_stop in zip(block_lims[:-1], block_lims[1:]):
# Read current slice of data:
cur = slice(block_start, block_stop)
ik, jk = torch.from_numpy(cp_ikpair[cur]).to(rc.device).T
omega_ph = torch.from_numpy(cp_omega_ph[cur]).to(rc.device)
G = torch.from_numpy(cp_G[cur]).to(rc.device)
bose_occ = bose(omega_ph, self.T)[:, None, None]
wm = prefactor * bose_occ
wp = prefactor * (bose_occ + 1.0)

# Contributions to dynamics of ik:
if (sel := get_mine(ik)) is not None:
i_pair = (ik[sel] - ik_start) * nk + jk[sel]
Gcur = G[sel]
Gsq = pack_real("kac, kbd -> kabcd", Gcur, Gcur.conj())
P[0].index_add_(0, i_pair, wm[sel] * Gsq) # P contribution
P[1].index_add_(0, i_pair, wp[sel] * Gsq) # Pbar contribution

# Contributions to dynamics of jk:
if (sel := get_mine(jk)) is not None:
i_pair = (jk[sel] - ik_start) * nk + ik[sel]
Gcur = G[sel]
Gsq = pack_real("kca, kdb -> kabcd", Gcur.conj(), Gcur)
P[0].index_add_(0, i_pair, wp[sel] * Gsq) # P contribution
P[1].index_add_(0, i_pair, wm[sel] * Gsq) # Pbar contribution

op_shape = (2, nk_mine * n_bands_sq, nk * n_bands_sq)
return P.unflatten(1, (nk_mine, nk)).swapaxes(2, 3).reshape(op_shape)

def collectT(self, rho: torch.Tensor) -> torch.Tensor:
"""Collect rho from all MPI processes and transpose batch dimension.
Batch dimension is put at end for efficient matrix multiplication."""
if self.comm.size == 1:
return rho.permute(1, 2, 3, 0)
nk = self.k_division.n_tot
n_bands = self.n_bands
n_batch = rho.shape[0]
sendbuf = rho.reshape(n_batch, -1).T.contiguous()
recvbuf = torch.zeros(
(n_batch, nk * n_bands * n_bands), dtype=rho.dtype, device=rc.device
)
mpi_type = rc.mpi_type[rho.dtype]
recv_prev = self.k_division.n_prev * n_bands * n_bands * n_batch
self.comm.Allgatherv(
(BufferView(sendbuf), np.prod(rho.shape), 0, mpi_type),
(BufferView(recvbuf), np.diff(recv_prev), recv_prev[:-1], mpi_type),
)
return recvbuf.reshape(nk, n_bands, n_bands, n_batch)

def rho_dot_scatter(self, rho: torch.Tensor) -> torch.Tensor:
"""drho/dt due to scattering in Schrodinger picture.
Input and output rho are in unpacked (complex Hermitian) form."""
ph = self.packed_hermitian
eye = self.eye_bands
rho_all = self.collectT(ph.pack(rho)) # packed, all k
Prho_packed = apply_batched(self.P, rho_all)
Prho_packed[1] -= self.P_eye[1] # convert [1] to Pbar @ (rho - eye)
Prho, minus_Prhobar = ph.unpack(Prho_packed)
return (eye - rho) @ Prho + rho @ minus_Prhobar # unpacked, my k only

def schrodingerV(self, t: float) -> torch.Tensor:
"""Compute unitary rotations from interaction to Schrodinger picture."""
phase = torch.exp((-1j * t) * self.E)
Expand Down Expand Up @@ -274,31 +150,34 @@ def get_reflector(
) -> Callable[[torch.Tensor], torch.Tensor]: # absorbing boundary
return torch.zeros_like

@stopwatch
def rho_dot(self, rho: torch.Tensor, t: float) -> torch.Tensor:
"""Overall drho/dt in interaction picture.
Input and output rho are in packed (real) form."""
if not self.eph_scatt:
if not self.eph_scatt: # TODO: check for coherent evolution too
return torch.zeros_like(rho)
ik_start = self.k_division.i_start
ik_stop = self.k_division.i_stop
nk_mine = ik_stop - ik_start
n_spatial_1 = rho.shape[0]
n_spatial_2 = rho.shape[1]
rho = rho.flatten(0, 1).unflatten(1, (nk_mine, self.n_bands, self.n_bands))
# Compute scattering in Schrodinger picture:
watch = StopWatch("AbInitio.rho_dot_pre")
shape_in = rho.shape
rho = rho.reshape(-1, self.nk_mine, self.n_bands, self.n_bands)

# Switch to Schrodinger picture for scattering / coherent evolution:
ph = self.packed_hermitian
phase = self.schrodingerV(t)
rho_I = ph.unpack(rho) # interaction picture, unpacked to complex
rho_S = rho_I * phase
rho_dot_S = self.rho_dot_scatter(rho_S) - self.rho_dot_scatter0
watch.stop()

# Compute rho_dot (upto an overall +h.c.) in Schrodinger picture:
rho_dot_S = torch.zeros_like(rho_S)
if self.scattering is not None:
rho_dot_S += self.scattering.rho_dot(rho_S, t)

# Convert result back to interaction picture:
watch = StopWatch("AbInitio.rho_dot_post")
rho_dot_I = rho_dot_S * phase.conj()
rho = rho.flatten(1, 3).unflatten(0, (n_spatial_1, n_spatial_2))
return (
(ph.pack(rho_dot_I + rho_dot_I.conj().swapaxes(-1, -2)))
.flatten(1, 3)
.unflatten(0, (n_spatial_1, n_spatial_2))
) # + h.c.
rho_dot_I += rho_dot_I.conj().swapaxes(-1, -2) # + h.c.
result = ph.pack(rho_dot_I).reshape(shape_in)
watch.stop()
return result

def get_observable_names(self) -> list[str]:
return ["q", "Sx", "Sy", "Sz"] # charge, components of spin operator
Expand Down Expand Up @@ -346,3 +225,11 @@ def __call__(self, t: float) -> torch.Tensor:
phase = ab_initio.schrodingerV(t)
rho0_I = ph.pack(ph.unpack(self.rho0_S) * phase.conj())
return torch.flatten(rho0_I)


def fermi(E, mu, T):
return torch.special.expit((mu - E) / T)


def bose(omegaPh, T):
return 1 / torch.expm1(omegaPh / T)

0 comments on commit 864d8a4

Please sign in to comment.