In [0]:
# default_exp core

In [0]:
#hide
# Colab-only setup: mount Google Drive at /content/drive so later cells can reach files stored there.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
#hide
# Install the notebook tooling used below (nbdev for export/show_doc, fastcore for the test helpers).
# NOTE(review): versions are not pinned, so a fresh run installs whatever is current.
!pip install nbdev
!pip install fastcore

In [0]:
#hide
#required for the tests using previously saved learner
try:
  import fastai2.basics
except:
  !pip install fastai2
else:
  print('fastai2 already installed')

In [0]:
#hide
# Switch the working directory to the project folder on the mounted Drive; the fixture files
# opened by `get_test_vars` use relative paths and are resolved against this directory.
% cd /content/drive/My\ Drive/fa_convnav

In [0]:
#hide
from fastai2.basics import *
from fastai2.callback.all import *
from fastai2.vision.all import *
from torch import torch

In [0]:
#hide
#not deps but we need them to use nbdev and run tests
from nbdev import * 
from nbdev.showdoc import *
from fastcore.test import *

In [0]:
#hide
def get_test_vars():
  """Return `(test_learner, test_summary, test_df)` unpickled from gzip fixtures in the current directory.

  Each fixture is read at most once: loaded objects are cached on the function
  object, so later calls return the same objects without touching the files.
  (The previous `try: name / except:` probes tested function-local names, which
  are never bound at that point, so every call silently re-read all the files.)

  Raises whatever `gzip.open` / `pickle.load` raise (e.g. `FileNotFoundError`)
  when a fixture that is not cached yet cannot be read.
  """
  # NOTE(review): `pickle.load` can execute arbitrary code; only use fixture files you trust.
  cache = get_test_vars.__dict__.setdefault('_cache', {})
  fixtures = (
      ('test_learner', "test_learner_resnet18"),
      ('test_summary', "test_summary_resnet18"),
      ('test_df', "test_df_resnet18"),
  )
  for var_name, file_name in fixtures:
    if var_name not in cache:
      with gzip.open(file_name, "rb") as f:
        cache[var_name] = pickle.load(f)
  return cache['test_learner'], cache['test_summary'], cache['test_df']

# Core

> Core functionality for fa_convnav

In [0]:
#export
import gzip, pickle
from dataclasses import dataclass
from pandas import DataFrame, option_context
from fa_convnav.models import models

In [0]:
#export
def find_model(n):
    """Return `(model_type, model_name)` (e.g. `('resnet', 'resnet50')`) for a model with `n` named modules.

    `n` is the number of entries in `Learner.model.named_modules()`. `models` is
    searched in order; each element is a dict mapping a model type to an iterable
    of `(model_name, number_of_named_modules)` pairs, and the first pair whose
    count equals `n` wins.

    Raises `ValueError` when no supported model has `n` named modules. (The
    previous `assert True, ...` could never fire, so the function returned `None`.)
    """
    for d in models:
      for model_type, variants in d.items():
        for model_name, num_modules in variants:
          if num_modules == n: return (model_type, model_name)
    raise ValueError('Model not supported. Use `supported_models()` to get a list of supported models.')

In [0]:
# The module counts below must match the entries registered in `fa_convnav.models.models`.
test_eq(find_model(162), ('resnet', 'resnet50'))
test_eq(find_model(585), ('densenet', 'densenet161'))

In [0]:
#export
def get_row(l, m):
  """Construct dataframe row from `l` (Learner.named_modules() module) and `m` (model_type)

  `l` is a `(name, object)` pair; only `str()` and `type()` of the object are used
  here, and the object itself is stored under 'lyr_obj'. `m` selects the
  model-specific adjustments and must be one of 'vgg', 'alexnet', 'squeezenet',
  'resnet', 'densenet' or 'xresnet'; any other value raises `Exception`.

  Returns a dict with 22 keys. The summary-derived fields ('Output_dimensions',
  'Parameters', 'Trainable', 'Currently', 'out_dim', 'current', 'lyr_blk',
  'lyr_chd', 'blk_chd') are always returned as empty strings.
  """

  # create generic row data from `l` (model.named_module() layer) and `m` (model_type)
  lyr_name = l[0]
  lyr_obj = l[1]
  # dotted module name split into at most 5 parts (4 splits); the depth drives the classification below
  ln_split = str(lyr_name).split('.', 4)
  ln_n_splits = len(ln_split)
  lyr_str = str(lyr_obj)

  # str(type(obj)) looks like "<class 'pkg.mod.Name'>": slice out the quoted dotted path,
  # then keep its last component as the short class name
  tch_cls_str = str(type(lyr_obj))
  tch_cls_substr =  tch_cls_str[tch_cls_str.find("<class")+8: tch_cls_str.find(">")-1]
  tch_cls = tch_cls_substr.split('.')[-1]

  # generic classification by name depth: '0'/'1' are divisions, 2-part names are
  # container children, 3-part names outside division '1' are container blocks
  div = tch_cls if lyr_name == '0' or lyr_name == '1' else ''
  mod = tch_cls if ln_n_splits == 2 else ''
  blk = tch_cls if ln_n_splits == 3 and not lyr_name.startswith('1') else ''
  lyr = lyr_str[:90]   # description is the module's string form, truncated to 90 characters

  # customise generic row for peculiarities of specific models
  if m == 'vgg' or m == 'alexnet':
    # no block level for these models: blank the third name part (-> 'blk_id') and the block label
    if ln_n_splits >2: ln_split[2] = '' 
    blk = ''

  elif m == 'squeezenet':
     # only `Fire` modules count as blocks; a block row carries no layer description
     blk = tch_cls if ln_n_splits == 3 and tch_cls == 'Fire' else '' 
     if blk == 'Fire': lyr = ''   

  elif m == 'resnet':
    # block rows carry no layer description
    if blk == 'BasicBlock' or blk == 'Bottleneck': lyr = ''
    else:
      # 5-part names get a '. . ' prefix (description truncated to 86 chars so the total stays at 90)
      if ln_n_splits > 4: lyr = f". . {lyr_str[:86]}" 
      # a 4-part name whose last part is 'downsample' is labelled as a container
      if ln_n_splits == 4 and ln_split[3] == 'downsample': lyr = f'Container{tch_cls}'
       
  elif m == 'densenet':
    # strip the 'denseblock'/'denselayer' words from the name, leaving whatever followed them;
    # the rewritten name is also what is returned as 'Module_name'
    lyr_name = lyr_name.replace('denseblock', '').replace('denselayer', '')
    ln_split = str(lyr_name).split('.', 5)
    # outside division '1', drop the second name part so the remaining parts shift up one level
    if len(ln_split) > 1 and ln_split[0] != '1': del ln_split[1] 
    ln_n_splits = len(ln_split)

    mod = tch_cls if (lyr_name.startswith('0') and ln_n_splits == 2) or (lyr_name.startswith('1') and ln_n_splits == 2) else ''
    blk = tch_cls if ln_n_splits == 3 and tch_cls == '_DenseLayer' else '' 
    # container rows (dense block, transition, dense layer) carry no layer description
    if mod == '_DenseBlock' or mod == '_Transition' or blk == '_DenseLayer': 
      lyr = ''
    else: 
      if lyr_name == '0' or lyr_name == '1': div = tch_cls 
      if lyr_name == '0.0': div = f'. . {tch_cls}'
      
  elif m == 'xresnet':
    # only `ResBlock` modules count as blocks
    blk = tch_cls if ln_n_splits == 3 and tch_cls == 'ResBlock' else '' 
    if mod == 'ConvLayer' or blk == 'ResBlock': 
      lyr = ''
    else:
      # the description format depends on name depth and class; the deepest rows are
      # prefixed with '. . . . ' and truncated to 32 characters
      if ln_n_splits < 4: lyr =  lyr_str[:90]
      elif ln_n_splits == 4 and tch_cls == 'Sequential': lyr =  f'Container{tch_cls}'
      elif ln_n_splits == 4 and tch_cls == 'ReLU': lyr =  lyr_str[:90]
      elif ln_n_splits == 5 and tch_cls == 'ConvLayer': lyr =  f'. . Container{tch_cls}'
      else: lyr =  f'. . . . {lyr_str[:32]}'

  else:
    raise Exception("Model type not recognised")
  
  return {
      'Module_name': lyr_name, 
      'Model': tch_cls if lyr_name == '' else '',   # only the root module (empty name) gets a 'Model' label
      'Division': div,
      'Container_child': mod,
      'Container_block': blk, 
      'Layer_description': lyr,
      'Torch_class': tch_cls_substr,
      'Output_dimensions': '',
      'Parameters': '',
      'Trainable': '',
      'Currently': '',
      # positional ids taken from the (possibly model-adjusted) split name
      'div_id': ln_split[0] if ln_n_splits >0 else '',  
      'chd_id': ln_split[1] if ln_n_splits >1 else '',  
      'blk_id': ln_split[2] if ln_n_splits >2 else '',
      'lyr_id': ln_split[3] if ln_n_splits >3 else '',
      'tch_cls': tch_cls,
      'out_dim': '',
      'current': '',
      'lyr_blk': '', 
      'lyr_chd': '',
      'blk_chd': '',
      'lyr_obj': lyr_obj
      }

In [0]:
# Check `get_row` against two modules of the saved test learner: the sixth named module and the last one.
test_learner, _, _ = get_test_vars()
module5 = list(test_learner.model.named_modules())[5]
module_last = list(test_learner.model.named_modules())[-1]

test_eq(get_row(module5, 'resnet')['Module_name'], '0.3')
test_eq(get_row(module_last, 'resnet')['Layer_description'], 'Linear(in_features=512, out_features=37, bias=False)')

## Utilities

In [0]:
#export
def supported_models():
  "Print a 'Supported models' heading followed by the name of every model listed in `models`, one per line."
  print('Supported models')
  print('================\n')
  # `models` is a sequence of dicts whose values are iterables of (model_name, count) pairs
  for family in models:
    for variants in family.values():
      for name, _ in variants:
        print(name)

In [0]:
#export
def get_inp_sz(infos):
  "Return the text found between `['` and `']` in the first element of `infos` (the model input sizes as a string)"
  header = infos[0]
  start = header.find("['") + 2   # first character after the opening `['`
  stop = header.find("']")        # position of the closing `']`
  return header[start:stop]

In [0]:
# The input-size string is whatever sits between `['` and `']` on the first summary line.
test_eq(get_inp_sz(["Sequential (Input shape: ['128 x 3 x 224 x 224'])"]), "128 x 3 x 224 x 224" )

In [0]:
#export
def infos_to_gen(infos):
  "Parse the layer rows of `infos` into `(m, o, p, t)` tuples (layer, output dimensions, parameters, trainable) and return them in a generator"
  # Layer rows are every second line after the first 4 lines, excluding the last 17 lines.
  parsed = []
  for line in infos[4:-17:2]:
    tokens = [tok for tok in line.split(' ') if tok]
    # first token is the layer, the last two are parameters and trainable; everything between is the output size
    name, *dims, params, trainable = tokens
    parsed.append((name, "[" + ' '.join(dims) + "]", params, trainable))
  return (entry for entry in parsed)

In [0]:
# The first tuple yielded for the saved summary should describe the first layer row of that summary.
_, test_summary, _ = get_test_vars()
gen = infos_to_gen(test_summary.split('\n'))
test_eq(next(gen), ('Conv2d', '[128 x 64 x 112 x 11]', '9,408', 'False'))

In [0]:
#export
@dataclass
class CNDF:
  "Compile information from fastai `Learner.model` and `layer_info(Learner)` into a dataframe"
  learner: object          # fastai `Learner`; must expose `.model` (`.opt` is read by `frozen_to` / `num_param_groups`)
  learner_summary: str     # text of `Learner.summary()`

  def __post_init__(self):
    "Build the dataframe: one row per named module, then merge in the summary text and backfill container rows"
    assert hasattr(self.learner, 'model'), "Invalid learner: no 'model' attribute"
    self.model = self.learner.model                                         # fastai `Learner.model` object
    self.layers = list(self.learner.model.named_modules())                  # fastai `named_modules` method
    self.num_layers = len(self.layers)                    
    self.model_type, self.model_name = find_model(self.num_layers)

    infos_split = self.learner_summary.split('\n')                         # fastai `Learner.summary()` string
    self.inp_sz = get_inp_sz(infos_split)
    # `inp_sz` is a string such as '128 x 3 x 224 x 224'; the batch size is its first ' x '-separated
    # field (indexing the string with [0] would only give its first character).
    self.bs = self.inp_sz.split(' x ')[0].strip()
    info_gen = infos_to_gen(infos_split)
               
    # create base dataframe `df` from a list of formatted rows in `layers`
    df = DataFrame([get_row(l, self.model_type) for l in self.layers]) 

    # remove layer descriptions from container rows
    df.at[0, 'Layer_description'] = ''
    df.loc[(df['Division'].str.contains('Sequential')) | \
          (df['Container_child'] == 'Sequential') | \
          (df['Container_child'] == 'AdaptiveConcatPool2d'), 'Layer_description'] = ''

    # every remaining (non-container) row consumes the next (layer, out, params, trainable) tuple of the summary
    for row in df.itertuples(): 
      idx = row.Index
      if row.Layer_description not in {'', '. . ContainerConvLayer', 'ContainerSequential'}:
        _, o, p, t = next(info_gen)
        df.at[idx, 'Output_dimensions'] = str(o)
        df.at[idx, 'Parameters'] =  p
        df.at[idx, 'Trainable'] = t
        if 'Conv2d' in row.Torch_class:
          df.at[idx, 'Currently'] = 'Unfrozen' if t == 'True' else 'Frozen'

    # backfill container rows with summary layer information and layer/block counts
    # 1.set up index stores and counters
    m, b  = 0, 0                                                  # index of the open child / block container row
    layer_count = [0, 0, 0]                                       # layers in [div, child, blocks]
    block_count = [0, 0]                                          # blocks in [div, childs]
    frozen_count=[[0,0], [0, 0], [0,0]]                           # [Frozen, Unfrozen] layers in [div, child, blocks],  

    # 2.iterate over rows, incrementing counters with each new row
    for row in df.itertuples():
      idx = row.Index
      if row.Currently == 'Frozen':
        for i in [0,1,2]: frozen_count[i][0] += 1 
      if row.Currently == 'Unfrozen':
        for i in [0,1,2]: frozen_count[i][1] += 1
      if row.Layer_description != '':
        for i in [0,1,2]: layer_count[i] += 1 

      # backfill 'child' container rows with layer_info and block and layer counts
      if (row.Output_dimensions == '' and row.Container_child != '') or row.Module_name == '1':
        m = idx if m == 0 else m
        df.at[m, 'out_dim'] = df.at[idx-1, 'Output_dimensions']
        # `.at` addresses a single cell, so each column is written on its own
        df.at[m, 'current'] = self.get_frozen(frozen_count[1])
        df.at[m, 'blk_chd'] = block_count[1]
        df.at[m, 'lyr_chd'] = layer_count[1]
        m = idx
        layer_count[1] = block_count[1] = 0
        for i in [0, 1]: frozen_count[1][i] = 0

      # backfill 'block' container rows with layer_info and layer counts
      if (row.Output_dimensions == '' and row.Container_block != '') or row.Module_name == '1':
        b = idx if b == 0 else b
        df.at[b, 'out_dim'] = df.at[idx-1, 'Output_dimensions'] or df.at[idx-2, 'Output_dimensions']  
        df.at[b, 'current'] = self.get_frozen(frozen_count[2])
        df.at[b, 'lyr_blk'] = layer_count[2]
        b = idx
        layer_count[2] = 0
        for i in [0, 1]: block_count[i] += 1
        for i in [0, 1]: frozen_count[2][i] = 0
      
    # 3.backfill division container rows with summary layer_info and block and layer counts
    div0_idx = df[df['Module_name'] == '0'].index.tolist()[0]
    div1_idx = df[df['Module_name'] == '1'].index.tolist()[0] 

    df.at[div0_idx, 'out_dim'] = df.at[div1_idx-1, 'Output_dimensions'] or df.at[div1_idx-2, 'Output_dimensions']  
    df.at[div0_idx, 'current'] = self.get_frozen(frozen_count[0])
    df.at[div0_idx, 'lyr_chd'] = layer_count[0]
    df.at[div0_idx, 'blk_chd'] = block_count[0]

    df.at[div1_idx, 'out_dim'] = df.iloc[-1]['Output_dimensions']

    self._cndf = df

  @staticmethod
  def get_frozen(f):
    "Returns a string interpretation of the number of frozen/unfrozen layers in the pair `f` = (frozen, unfrozen)"
    if f[0] == 0: return 'Unfrozen'
    elif f[1] == 0: return 'Frozen'
    else: return f'{f[0]}/{(f[0]+f[1])} layers frozen'

  @property
  def output_dimensions(self):
    "Returns output dimensions of model (bs, classes)"
    return self._cndf.iloc[-1]['Output_dimensions']

  @property
  def frozen_to(self):
    "Returns parameter group model is currently frozen to"
    return self.learner.opt.frozen_idx + 1

  @property
  def num_param_groups(self):
    "Returns number of parameter groups"
    return len(self.learner.opt.param_groups)

  @property
  def batch_size(self):
    "Returns the batch size of the current learner (as a string, taken from the summary's input shape)"
    return self.bs

  @property
  def input_sizes(self):
    "Returns the sizes (dimensions bs, ch, h, w) of the model input"
    return self.inp_sz

  @property
  def model_info(self):
    "Return an info string derived from `Learner.model`"  
    res = f"{self.model_type.capitalize()}: {self.model_name.capitalize()}\n"
    res += f"Input shape: [{self.inp_sz}] (bs, ch, h, w)\n"
    res += f"Output features: {self.output_dimensions} (bs, classes)\n" 
    res += f"Currently frozen to parameter group {self.frozen_to} out of {self.num_param_groups}" 
    return res

  def cndf(self, with_modules=False):
    "Returns a copy of the ConvNav dataframe; the last column ('lyr_obj') is dropped unless `with_modules` is True"
    df = self._cndf.copy() 
    if not with_modules: df = df.iloc[:,:-1]
    return df

  def layer_params(self, idx):
    "Returns the number of parameters (as an int) of layer with Index `idx`; asserts that `idx` is a layer, not a container"
    m = get_layer(self._cndf, idx)
    assert m is not None, f'Module {idx} is a container module, not a layer'
    return int(m['Parameters'].replace(',', ''))

  def layer_dims(self, idx):
    "Return tuple of input and output dimensions (as strings) of layer with index `idx`"
    m_out = get_layer(self._cndf, idx)
    assert m_out is not None, f'Module {idx} is a container module, not a layer'
    # The input of a layer is the output of the closest preceding layer row; walk back over rows
    # idx-1 .. 1 and fall back to the model input size when there is none.
    m_in = None
    for prev_idx in range(idx-1, 0, -1):
      m_in = get_layer(self._cndf, prev_idx)
      if m_in is not None: break
    return (self.inp_sz if m_in is None else m_in['Output_dimensions'], m_out['Output_dimensions'])

CNDF class builds a dataframe using a fastai Learner and the output of Learner.summary() method. The resulting dataframe, called a CNDF dataframe, combines information from both sources and represents both the structure and information of Learner.model. 

CNDF is the parent class of ConvNav, and a CNDF dataframe is built automatically whenever a new ConvNav object is instantiated; for most use cases CNDF dataframes should be built this way. However, if you need to build a CNDF dataframe in isolation, use:

```
cndf = CNDF(Learner, Learner.summary())
```



In [0]:
# Build a CNDF from the saved learner and summary, then check the type and shape of the resulting dataframe.
test_learner, test_summary, _ = get_test_vars()
cndf_test = CNDF(test_learner, test_summary)

test_eq(type(cndf_test._cndf), DataFrame)   
test_eq(len(cndf_test._cndf), 79)             # rows
test_eq(len(cndf_test._cndf.columns), 22)     # columns

In [0]:
show_doc(CNDF.cndf)

<h4 id="CNDF.cndf" class="doc_header"><code>CNDF.cndf</code><a href="__main__.py#L124" class="source_link" style="float:right">[source]</a></h4>

> <code>CNDF.cndf</code>(**`with_modules`**=*`False`*)

Returns a ConvNav dataframe

In native format, CNDF dataframes include the module objects in a 'lyr_obj' column, and the combined size of the module objects can be quite large: 100-200 MB for a complex model such as a densenet or xresnet. Thus, by default, module objects are removed from the dataframe. Set `with_modules=True` to include module objects in the returned dataframe.

In [0]:
show_doc(CNDF.batch_size)

<h4 id="CNDF.batch_size" class="doc_header"><code>CNDF.batch_size</code><a href="" class="source_link" style="float:right">[source]</a></h4>

Returns the batch size of the current learner

In [0]:
show_doc(CNDF.input_sizes)

<h4 id="CNDF.input_sizes" class="doc_header"><code>CNDF.input_sizes</code><a href="" class="source_link" style="float:right">[source]</a></h4>

Returns the sizes (dimensions bs, ch, h, w) of the model)

In [0]:
show_doc(CNDF.output_dimensions)

<h4 id="CNDF.output_dimensions" class="doc_header"><code>CNDF.output_dimensions</code><a href="" class="source_link" style="float:right">[source]</a></h4>

Returns output dimensions of model (bs, classes)

In [0]:
show_doc(CNDF.model_info)

<h4 id="CNDF.model_info" class="doc_header"><code>CNDF.model_info</code><a href="" class="source_link" style="float:right">[source]</a></h4>

Return an info string derived from`Learner.model`

In [0]:
show_doc(CNDF.layer_params)

<h4 id="CNDF.layer_params" class="doc_header"><code>CNDF.layer_params</code><a href="__main__.py#L130" class="source_link" style="float:right">[source]</a></h4>

> <code>CNDF.layer_params</code>(**`idx`**)

Returns the parameters of layer with Index `idx`

In [0]:
show_doc(CNDF.layer_dims)

<h4 id="CNDF.layer_dims" class="doc_header"><code>CNDF.layer_dims</code><a href="__main__.py#L136" class="source_link" style="float:right">[source]</a></h4>

> <code>CNDF.layer_dims</code>(**`idx`**)

Return tuple of input and output dimensions (as strings) of layer with index `idx`

## Element selectors

In [0]:
#export
def get_layer(df, idx):
  """Returns layer with Index `idx` from CNDF dataframe `df` or `None` if `idx` indexes a container element and an exception if `idx` is invalid.

  A row is a container when its 'Layer_description' is '', '. . ContainerConvLayer'
  or 'ContainerSequential'. Valid indices satisfy `0 < idx < len(df)`.

  Raises `TypeError` when `idx` is not an `int` and `IndexError` when it is outside
  the valid range. (The previous `assert True, ...` statements could never fire, so
  invalid indices silently returned `None`, indistinguishable from a container row.)
  """
  if not isinstance(idx, int):
    raise TypeError(f'Invalid index: expected an int, got {type(idx).__name__}')
  if not 0 < idx < len(df):
    raise IndexError('Index out of range.')
  m = df.iloc[idx]
  if m['Layer_description'] in {'', '. . ContainerConvLayer', 'ContainerSequential'}:
    return None
  return m

In [0]:
# `cndf_test` is rebound here to the third fixture returned by `get_test_vars` (the saved dataframe),
# replacing the CNDF instance that an earlier cell stored under the same name.
_, _, cndf_test = get_test_vars()
test_eq(get_layer(cndf_test, 2)['Module_name'], '0.0')
test_eq(get_layer(cndf_test, 1), None)

In [0]:
#hide
try:
  get_layer(cndf_test, "2")
  get_layer(cndf_test, 100)
  assert True, 'Failed: get_layer() accepted invalid arguments'
except:
  pass