In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [2]:
def _make_divisible(v, divisor, min_value=None):
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    """
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v

In [3]:
def hard_sigmoid(x, inplace: bool = False):
    """
    Piecewise-linear sigmoid approximation: ``relu6(x + 3) / 6``.

    Args:
        x: Input tensor.
        inplace: When True, ``x`` itself is overwritten with the result and
            returned; otherwise a new tensor is returned and ``x`` is untouched.

    Returns:
        Tensor with values in ``[0, 1]``.
    """
    if not inplace:
        return F.relu6(x + 3.) / 6.
    # Shift, clamp to [0, 6] and rescale directly in ``x``.
    x.add_(3.)
    x.clamp_(0., 6.)
    x.div_(6.)
    return x

In [4]:
class SqueezeExcite(nn.Module):
    """
    Squeeze-and-excitation channel gate.

    The input is globally average-pooled, passed through a 1x1 reduce conv,
    an activation and a 1x1 expand conv, and the result (after ``gate_fn``)
    multiplies the input.

    Args:
        in_chs: Number of input (and output) channels.
        se_ratio: Fraction of the base channel count used for the bottleneck.
        reduced_base_chs: Base channel count for the bottleneck width;
            ``in_chs`` is used when this is falsy.
        act_layer: Activation class, instantiated with ``inplace=True``.
        gate_fn: Callable applied to the expand-conv output to form the gate.
        divisor: Granularity passed to ``_make_divisible`` for the bottleneck.
        **_: Extra keyword arguments are accepted and ignored.
    """

    def __init__(self, in_chs, se_ratio=0.25, reduced_base_chs=None,
                 act_layer=nn.ReLU, gate_fn=hard_sigmoid, divisor=4, **_):
        super().__init__()
        self.gate_fn = gate_fn
        base_chs = reduced_base_chs or in_chs
        squeezed_chs = _make_divisible(base_chs * se_ratio, divisor)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.conv_reduce = nn.Conv2d(in_chs, squeezed_chs, kernel_size=1, bias=True)
        self.act1 = act_layer(inplace=True)
        self.conv_expand = nn.Conv2d(squeezed_chs, in_chs, kernel_size=1, bias=True)

    def forward(self, x):
        """Return ``x`` scaled per channel by the learned gate."""
        gate = self.conv_reduce(self.avg_pool(x))
        gate = self.conv_expand(self.act1(gate))
        return x * self.gate_fn(gate)

In [5]:
class ConvBnAct(nn.Module):
    """
    Bias-free Conv2d followed by BatchNorm2d and an activation.

    Args:
        in_chs: Input channels.
        out_chs: Output channels.
        kernel_size: Square kernel size; padding is ``kernel_size // 2``.
        stride: Convolution stride.
        act_layer: Activation class, instantiated with ``inplace=True``.
    """

    def __init__(self, in_chs, out_chs, kernel_size,
                 stride=1, act_layer=nn.ReLU):
        super().__init__()
        self.conv = nn.Conv2d(
            in_chs, out_chs, kernel_size,
            stride=stride, padding=kernel_size // 2, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_chs)
        self.act1 = act_layer(inplace=True)

    def forward(self, x):
        """Apply conv -> batch norm -> activation."""
        return self.act1(self.bn1(self.conv(x)))

In [6]:
class AdaptiveDFC(nn.Module):
    """
    Multi-branch attention gate that rescales a target feature map.

    Pipeline (see ``forward``):
      1. A 1x1 conv + BN + ReLU reduces ``channels`` to ``max(channels // redu, 1)``.
      2. For every kernel size in ``kern_sizes`` a depthwise ``(1, k)`` conv
         followed by a depthwise ``(k, 1)`` conv produces one branch map.
      3. A softmax gate (global average pool -> Linear -> ReLU -> Linear)
         yields one weight per branch; branches are summed with these weights.
      4. A 1x1 conv + BN + sigmoid expands back to ``channels``.
      5. The attention map is bilinearly resized to the target's spatial size
         (only when the sizes differ) and multiplied into the target.

    Args:
        channels: Channel count of ``input_feat`` (and of the attention map).
        redu: Channel reduction factor for the intermediate width.
        kern_sizes: Kernel sizes of the branches; padding is ``k // 2``.
        resid: When True the output is ``target * (1 + alpha * A)`` with a
            learnable scalar ``alpha`` initialised to zero (so the module starts
            as an identity on the target); otherwise it is ``target * A``.
    """

    def __init__(self, channels, redu = 4, kern_sizes = (5,9), resid = True):
        super(AdaptiveDFC, self).__init__()
        self.resid = resid
        self.kern_sizes = kern_sizes
        self.num_branch = len(kern_sizes)

        # Channel reduction
        mid_pt = max(channels // redu, 1)
        self.redu = nn.Conv2d(channels, mid_pt, kernel_size=1, bias=False)
        self.batch_redu = nn.BatchNorm2d(mid_pt)
        self.activ_redu = nn.ReLU(inplace=True)

        # One pair of depthwise 1-D convolutions per kernel size
        self.conv_h = nn.ModuleList()
        self.conv_w = nn.ModuleList()
        for ker in kern_sizes:
            k_padding = ker // 2
            # Horizontal: kernel spans the width axis
            self.conv_h.append(
                nn.Conv2d(mid_pt, mid_pt, kernel_size=(1, ker), padding=(0, k_padding),
                          groups=mid_pt, bias=False)
            )
            # Vertical: kernel spans the height axis
            self.conv_w.append(
                nn.Conv2d(mid_pt, mid_pt, kernel_size=(ker, 1), padding=(k_padding, 0),
                          groups=mid_pt, bias=False)
            )

        # Projection back to the original channel count
        self.expand = nn.Conv2d(mid_pt, channels, kernel_size=1, bias=False)
        self.batch_expand = nn.BatchNorm2d(channels)

        # Gating mechanism (soft selection among the branches)
        hidden_gate = max(channels // 16, 1)
        self.gap = nn.AdaptiveAvgPool2d(1)
        self.fc_layer1 = nn.Linear(channels, hidden_gate, bias=False)
        self.fc_layer2 = nn.Linear(hidden_gate, self.num_branch, bias=False)

        if resid:
            self.alpha = nn.Parameter(torch.zeros(1))
        else:
            self.register_parameter("alpha", None)

    def forward(self, input_feat, target_feature):
        """
        Compute the attention map from ``input_feat`` and apply it to ``target_feature``.

        Args:
            input_feat: 4-D tensor ``(B, channels, H, W)`` the attention is computed from.
            target_feature: Tensor to be rescaled; its last two dimensions define
                the size the attention map is resized to, and it must be
                broadcast-compatible with a ``(B, channels, H_t, W_t)`` map.

        Returns:
            ``target_feature`` multiplied by the (optionally residual) attention map.
        """
        b, c = input_feat.shape[:2]

        # Reduce channels
        u = self.activ_redu(self.batch_redu(self.redu(input_feat)))

        # One attention map per kernel-size branch: horizontal then vertical conv
        atten_maps = [cnv_w(cnv_h(u)) for cnv_h, cnv_w in zip(self.conv_h, self.conv_w)]
        atten_stack = torch.stack(atten_maps, dim=1)  # (B, num_branch, mid, H, W)

        # Per-sample softmax weights over the branches
        gate = self.gap(input_feat).view(b, c)
        gate = F.relu(self.fc_layer1(gate))
        gate = F.softmax(self.fc_layer2(gate), dim=1)
        gate = gate.view(b, self.num_branch, 1, 1, 1)

        # Weighted combination of the branches
        A_midpt = (atten_stack * gate).sum(dim=1)

        # Expand back to ``channels`` and squash to (0, 1)
        A = torch.sigmoid(self.batch_expand(self.expand(A_midpt)))

        # Resize only when the spatial sizes really differ. The comparison must
        # use ``target_feature.shape``; comparing against a slice of the tensor
        # itself never yields a meaningful size check.
        target_size = target_feature.shape[-2:]
        if A.shape[-2:] != target_size:
            A = F.interpolate(A, size=target_size, mode='bilinear', align_corners=False)

        # Residual scaling
        if self.resid:
            scaling = 1.0 + self.alpha * A
        else:
            scaling = A

        return target_feature * scaling

In [7]:
class GhostModuleV2(nn.Module):
    """
    Ghost module with an optional attention branch.

    Both modes build ``oup`` output channels from a primary convolution plus a
    cheap depthwise convolution whose outputs are concatenated and truncated
    to ``oup`` channels.

    * ``mode='original'``: returns those ghost features directly.
    * ``mode='attn'``: additionally projects a (2x average-pooled) copy of the
      input to ``oup`` channels and feeds it, together with the ghost
      features, to ``AdaptiveDFC``, which returns the rescaled ghost features.

    Args:
        inp: Input channels.
        oup: Output channels.
        kernel_size: Kernel size of the primary convolution.
        ratio: Ghost ratio; the primary conv produces ``ceil(oup / ratio)`` channels.
        dw_size: Kernel size of the cheap depthwise convolution.
        stride: Stride of the primary convolution.
        relu: Whether the primary/cheap branches end with a ReLU.
        mode: ``'original'`` or ``'attn'``.
        args: Accepted for interface compatibility; not used by this class.

    Raises:
        ValueError: If ``mode`` is not one of the supported values. (Previously
            such a module was built without layers and ``forward`` returned ``None``.)
    """

    _SUPPORTED_MODES = ('original', 'attn')

    def __init__(self, inp, oup, kernel_size=1, ratio=2, dw_size=3, stride=1, relu=True,mode=None,args=None):
        super(GhostModuleV2, self).__init__()
        if mode not in self._SUPPORTED_MODES:
            raise ValueError(
                f"Unsupported GhostModuleV2 mode {mode!r}; expected one of {self._SUPPORTED_MODES}"
            )
        self.mode = mode
        # Kept so existing code that accesses ``gate_fn`` keeps working;
        # ``forward`` does not use it (AdaptiveDFC applies its own sigmoid).
        self.gate_fn = nn.Sigmoid()
        self.oup = oup

        init_channels = math.ceil(oup / ratio)
        new_channels = init_channels * (ratio - 1)

        # Shared by both modes: primary conv and cheap depthwise "ghost" conv.
        self.primary_conv = self._conv_bn_act(
            inp, init_channels, kernel_size, stride, groups=1, relu=relu)
        self.cheap_operation = self._conv_bn_act(
            init_channels, new_channels, dw_size, 1, groups=init_channels, relu=relu)

        if self.mode == 'attn':
            # 1x1 projection of the input to the channel count AdaptiveDFC expects.
            self.dfc_projection = nn.Sequential(
                nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
                nn.ReLU(inplace=True)
            )
            # Attention branch that rescales the ghost features.
            self.short_conv = AdaptiveDFC(oup, kern_sizes=(5, 9), resid=True)

    @staticmethod
    def _conv_bn_act(in_chs, out_chs, kernel_size, stride, groups, relu):
        """Build Conv2d (no bias, padding ``kernel_size // 2``) -> BatchNorm2d -> ReLU or no-op."""
        return nn.Sequential(
            nn.Conv2d(in_chs, out_chs, kernel_size, stride, kernel_size // 2,
                      groups=groups, bias=False),
            nn.BatchNorm2d(out_chs),
            nn.ReLU(inplace=True) if relu else nn.Sequential(),
        )

    def forward(self, x):
        """Return the ghost features, rescaled by the attention branch in ``'attn'`` mode."""
        x1 = self.primary_conv(x)
        x2 = self.cheap_operation(x1)
        ghost_feat = torch.cat([x1, x2], dim=1)[:, :self.oup, :, :]
        if self.mode == 'original':
            return ghost_feat

        # 'attn' mode: compute attention on a 2x downsampled copy of the input
        # (skipped when a spatial dimension is already 1).
        if x.shape[2] > 1 and x.shape[3] > 1:
            x_downspl = F.avg_pool2d(x, kernel_size=2, stride=2)
        else:
            x_downspl = x

        # Project the input to ``oup`` channels, then let AdaptiveDFC resize the
        # attention map to the ghost features and apply it.
        x_projec = self.dfc_projection(x_downspl)
        return self.short_conv(x_projec, ghost_feat)

In [8]:
class GhostBottleneckV2(nn.Module):
    """
    Ghost bottleneck: expand (ghost1) -> optional strided depthwise conv ->
    optional squeeze-excite -> project (ghost2), added to a shortcut branch.

    Args:
        in_chs: Input channels.
        mid_chs: Expanded (hidden) channels.
        out_chs: Output channels.
        dw_kernel_size: Kernel size of the depthwise convolutions.
        stride: Block stride; values > 1 add the depthwise downsampling conv.
        act_layer: Accepted for interface compatibility; not used by this class.
        se_ratio: Squeeze-excite ratio; SE is enabled only when it is > 0.
        layer_id: Index of the block in the network. Blocks with
            ``layer_id <= 1`` use the plain ghost module for the expansion,
            later ones use the attention variant. Must be a number: the
            comparison is always evaluated, so the default ``None`` is not usable.
        args: Forwarded to the ghost modules.
    """

    def __init__(self, in_chs, mid_chs, out_chs, dw_kernel_size=3,
                 stride=1, act_layer=nn.ReLU, se_ratio=0.,layer_id=None,args=None):
        super().__init__()
        use_se = se_ratio is not None and se_ratio > 0.
        self.stride = stride
        dw_padding = (dw_kernel_size - 1) // 2

        # Point-wise expansion: the first two blocks skip the attention branch.
        expand_mode = 'original' if layer_id <= 1 else 'attn'
        self.ghost1 = GhostModuleV2(in_chs, mid_chs, relu=True, mode=expand_mode, args=args)

        # Depth-wise convolution (only needed when downsampling)
        if self.stride > 1:
            self.conv_dw = nn.Conv2d(
                mid_chs, mid_chs, dw_kernel_size,
                stride=stride, padding=dw_padding, groups=mid_chs, bias=False,
            )
            self.bn_dw = nn.BatchNorm2d(mid_chs)

        # Squeeze-and-excitation
        self.se = SqueezeExcite(mid_chs, se_ratio=se_ratio) if use_se else None

        # Point-wise linear projection
        self.ghost2 = GhostModuleV2(mid_chs, out_chs, relu=False, mode='original', args=args)

        # Shortcut: identity when shapes already match, otherwise a depthwise
        # conv (carrying the stride) followed by a 1x1 conv.
        needs_projection = not (in_chs == out_chs and self.stride == 1)
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_chs, in_chs, dw_kernel_size, stride=stride,
                          padding=dw_padding, groups=in_chs, bias=False),
                nn.BatchNorm2d(in_chs),
                nn.Conv2d(in_chs, out_chs, 1, stride=1, padding=0, bias=False),
                nn.BatchNorm2d(out_chs),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Run the bottleneck and add the shortcut branch."""
        shortcut_input = x
        out = self.ghost1(x)
        if self.stride > 1:
            out = self.bn_dw(self.conv_dw(out))
        if self.se is not None:
            out = self.se(out)
        out = self.ghost2(out)
        out += self.shortcut(shortcut_input)
        return out

In [9]:
class GhostNetV2(nn.Module):
    """
    GhostNetV2 image classifier.

    Layout: 3x3 stride-2 stem -> stages of ``block`` instances described by
    ``cfgs`` -> 1x1 ConvBnAct -> global average pool -> 1x1 conv head
    (1280 channels) -> dropout -> linear classifier.

    Args:
        cfgs: List of stages; each stage is a list of rows
            ``[k, exp_size, c, se_ratio, s]`` (kernel size, expansion size,
            output channels, SE ratio, stride).
        num_classes: Size of the classifier output.
        width: Width multiplier applied to channel counts (rounded with
            ``_make_divisible(..., 4)``).
        dropout: Dropout probability applied before the classifier.
        block: Bottleneck class (or factory). It is called as
            ``block(in_chs, hidden_chs, out_chs, k, s, se_ratio=..., layer_id=..., args=...)``.
        args: Forwarded to every block.

    Raises:
        ValueError: If ``cfgs`` does not describe at least one layer.
    """

    def __init__(self, cfgs, num_classes=1000, width=1.0, dropout=0.2,block=GhostBottleneckV2,args=None):
        super(GhostNetV2, self).__init__()
        self.cfgs = cfgs
        self.dropout = dropout

        # building first layer
        output_channel = _make_divisible(16 * width, 4)
        self.conv_stem = nn.Conv2d(3, output_channel, 3, 2, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(output_channel)
        self.act1 = nn.ReLU(inplace=True)
        input_channel = output_channel

        # building inverted residual blocks
        stages = []
        layer_id = 0
        last_hidden_channel = None
        for cfg in self.cfgs:
            layers = []
            for k, exp_size, c, se_ratio, s in cfg:
                output_channel = _make_divisible(c * width, 4)
                hidden_channel = _make_divisible(exp_size * width, 4)
                # Always honour the ``block`` argument. The former
                # ``block == GhostBottleneckV2`` guard silently produced empty
                # stages for any other (e.g. subclassed) block.
                layers.append(block(input_channel, hidden_channel, output_channel, k, s,
                                    se_ratio=se_ratio, layer_id=layer_id, args=args))
                input_channel = output_channel
                last_hidden_channel = hidden_channel
                layer_id += 1
            stages.append(nn.Sequential(*layers))

        if last_hidden_channel is None:
            raise ValueError("cfgs must describe at least one layer")

        # Final 1x1 conv widens to the expansion width of the last block
        # (tracked explicitly instead of relying on a leaked loop variable).
        output_channel = last_hidden_channel
        stages.append(nn.Sequential(ConvBnAct(input_channel, output_channel, 1)))
        input_channel = output_channel

        self.blocks = nn.Sequential(*stages)

        # building last several layers
        output_channel = 1280
        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.conv_head = nn.Conv2d(input_channel, output_channel, 1, 1, 0, bias=True)
        self.act2 = nn.ReLU(inplace=True)
        self.classifier = nn.Linear(output_channel, num_classes)

    def forward(self, x):
        """Map an image batch ``(B, 3, H, W)`` to class scores ``(B, num_classes)``."""
        x = self.conv_stem(x)
        x = self.bn1(x)
        x = self.act1(x)
        x = self.blocks(x)
        x = self.global_pool(x)
        x = self.conv_head(x)
        x = self.act2(x)
        x = x.view(x.size(0), -1)
        if self.dropout > 0.:
            # Functional dropout follows the module's train/eval state.
            x = F.dropout(x, p=self.dropout, training=self.training)
        x = self.classifier(x)
        return x

In [10]:
def ghostnetv2(num_classes=1000, width=1.0, dropout=0.2, args=None):
    """
    Build a GhostNetV2 model using the stage layout defined in this function.

    Args:
        num_classes (int): Number of output classes (default: 1000)
        width (float): Width multiplier for channels (default: 1.0)
        dropout (float): Dropout rate (default: 0.2)
        args: Additional arguments, forwarded to GhostNetV2

    Returns:
        GhostNetV2 model
    """
    # One outer entry per stage; each row is
    # [k (kernel), t (expansion size), c (output channels), SE ratio, s (stride)].
    stage_cfgs = [
        [[3, 16, 16, 0, 1]],
        [[3, 48, 24, 0, 2]],
        [[3, 72, 24, 0, 1]],
        [[5, 72, 40, 0.25, 2]],
        [[5, 120, 40, 0.25, 1]],
        [[3, 240, 80, 0, 2]],
        [
            [3, 200, 80, 0, 1],
            [3, 184, 80, 0, 1],
            [3, 184, 80, 0, 1],
            [3, 480, 112, 0.25, 1],
            [3, 672, 112, 0.25, 1],
        ],
        [[5, 672, 160, 0.25, 2]],
        [
            [5, 960, 160, 0, 1],
            [5, 960, 160, 0.25, 1],
            [5, 960, 160, 0, 1],
            [5, 960, 160, 0.25, 1],
        ],
    ]
    net = GhostNetV2(
        stage_cfgs,
        num_classes=num_classes,
        width=width,
        dropout=dropout,
        args=args,
    )
    return net

In [11]:
# Instantiate the default-width network with a 1000-way classifier (ImageNet).
model = ghostnetv2(
    num_classes=1000,
    width=1.0,
    dropout=0.2,
    args=None,
)
print("Model created successfully!")
print(f"Model: {model.__class__.__name__}")

Model created successfully!
Model: GhostNetV2


In [12]:
def count_parameters(model):
    """Return the total element count of all parameters of ``model`` that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Count the trainable parameters of `model` and print the total with thousands separators.
num_params = count_parameters(model)
print(f"Total trainable parameters: {num_params:,}")

Total trainable parameters: 8,945,182


In [13]:
# Put the model in evaluation mode before running inference.
model.eval()

# Random stand-in for one RGB image (batch_size=1, channels=3, height=224, width=224).
dummy_input = torch.randn((1, 3, 224, 224))

# Forward pass without gradient tracking.
with torch.no_grad():
    output = model(dummy_input)

print(f"Input shape: {dummy_input.shape}")
print(f"Output shape: {output.shape}")
print(f"Output type: {output.dtype}")

Input shape: torch.Size([1, 3, 224, 224])
Output shape: torch.Size([1, 1000])
Output type: torch.float32


In [14]:
# Repeat the forward pass on a mini-batch of four random images.
batch_input = torch.randn((4, 3, 224, 224))

# Gradients are not needed for this shape check.
with torch.no_grad():
    batch_output = model(batch_input)

print(f"Batch input shape: {batch_input.shape}")
print(f"Batch output shape: {batch_output.shape}")

Batch input shape: torch.Size([4, 3, 224, 224])
Batch output shape: torch.Size([4, 1000])


In [15]:
# Build a narrower (width=0.5) and a wider (width=1.3) variant of the network.
small_model = ghostnetv2(num_classes=1000, width=0.5, dropout=0.2, args=None)
large_model = ghostnetv2(num_classes=1000, width=1.3, dropout=0.2, args=None)

# Compare their trainable parameter counts with the width=1.0 model.
small_params = count_parameters(small_model)
large_params = count_parameters(large_model)

print(f"Standard model (width=1.0): {num_params:,} parameters")
print(f"Small model (width=0.5): {small_params:,} parameters")
print(f"Large model (width=1.3): {large_params:,} parameters")

Standard model (width=1.0): 8,945,182 parameters
Small model (width=0.5): 3,547,360 parameters
Large model (width=1.3): 13,680,404 parameters
