# This file prototypes job processing for scheduling and utilization of hardware (hw).



In [1]:
from pathlib import Path
import time

import numexpr as ne
import torch
import yaml

In [2]:
# Directory (relative to the current working directory) that get_jobs scans for job files.
jobs_path = Path('./jobs')

def get_jobs(jobs_path):
    '''
    Return the files located directly inside jobs_path that should be processed.

    Entries that are not files, and files whose name starts with "_"
    (like the default job), are left out.
    '''
    job_files = []
    for entry in jobs_path.iterdir():
        # Guard clause: skip anything that is not a file or is marked ignored with "_".
        if not entry.is_file() or entry.name.startswith('_'):
            continue
        job_files.append(entry)
    return job_files

# Collect the job files once; the cells below report on and iterate over this list.
jobs = get_jobs(jobs_path)

In [3]:
# Report how many jobs were found, using the singular wording only for exactly one job.
job_count = len(jobs)
message_template = 'There is {} job to process' if job_count == 1 else 'There are {} jobs to process'
print(message_template.format(job_count))

There is 1 job to process


In [4]:
class CustomNN(torch.nn.Module):
    '''Named module that wraps an ordered collection of layers in a torch.nn.Sequential.'''

    def __init__(self, name, sequential_arch):
        '''
        name: label stored on the instance as self.name.
        sequential_arch: iterable of modules; they are unpacked, in order,
            into the torch.nn.Sequential stored as self.model.
        '''
        super().__init__()
        self.name = name
        stacked_layers = torch.nn.Sequential(*sequential_arch)
        self.model = stacked_layers

    def forward(self, X):
        '''Pass X through the sequential stack and return the result.'''
        output = self.model(X)
        return output

In [5]:
def evaluate_expression(value):
    '''
    Evaluate value as an arithmetic expression when it looks like one.

    A value is treated as an expression only if it is a str containing at
    least one of the characters + - * /. Anything else (numbers, None,
    plain strings) is returned unchanged.

    Returns the evaluated result converted to a Python scalar via .item(),
    or value itself when no evaluation is needed.

    Raises ValueError if the expression cannot be evaluated; the underlying
    error is attached as the exception's __cause__.
    '''
    if not isinstance(value, str) or not any(op in value for op in ('+', '-', '*', '/')):
        return value

    try:
        # Empty namespaces keep the expression from picking up variables of
        # this function or of the module: by default numexpr resolves names
        # from the calling frame.
        # NOTE(security): numexpr still compiles the string it is given, so
        # job files must come from a trusted source.
        return ne.evaluate(value, local_dict={}, global_dict={}).item()
    except Exception as e:
        raise ValueError('Failed to evaluate expression {}. {}'.format(value, e)) from e

class TrainingJob():
    '''
    Context manager that loads a YAML job description and exposes its settings as attributes.

    Entering the context reads the file at job_path, validates its top-level
    sections, builds the torch.nn layers listed under model.architecture and
    copies every key of ml_parameters, audio_parameters and job_parameters
    onto the instance. Leaving the context prints the elapsed time.
    '''

    def __init__(self, job_path):
        '''Store job_path; the file itself is only read in __enter__.'''
        self.path = job_path
        self.job = None
        self.start_time = 0.
        self.end_time = 0.
        self.layers = []

    def train(self):
        '''Training is not implemented in this prototype.'''
        raise NotImplementedError

    def get_new_model(self):
        '''
        Return a CustomNN named after the job and built from self.layers.

        NOTE(review): every call passes the same layer instances held in
        self.layers, so models returned by repeated calls share those
        modules -- confirm this is intended.
        '''
        return CustomNN(self.model_name, self.layers)

    def __enter__(self):
        '''Start the timer, load and validate the job file, then populate the instance.'''
        self.start_time = time.time()

        with open(self.path, 'r') as f:
            # NOTE(security): FullLoader is not meant for untrusted input;
            # job files are assumed to be authored locally.
            self.job = yaml.load(f, yaml.FullLoader)

        self._validate_job()
        self.__inject()

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        '''Record the end time and print the job duration; exceptions are not suppressed.'''
        self.end_time = time.time()

        # todo: save job data: logs, pictures, model, etc... in completed dir.
        print('Job duration: {:.2f} seconds'.format(self.end_time-self.start_time))

        # TODO: call _failure_processing() when exc_type is not None and
        # _success_processing() otherwise, once those hooks are implemented.

        # Returning False lets any exception raised inside the with-block propagate.
        return False

    def _validate_job(self):
        '''
        Perform minimal/basic job validation by checking YAML configuration for inconsistencies.

        Raises AssertionError when the parsed job is not a mapping or when a
        required section or model key is missing. The checks raise explicitly
        instead of using the assert statement so they are not stripped when
        Python runs with -O.
        '''
        if not isinstance(self.job, dict):
            # An empty YAML file parses to None; report that clearly.
            raise AssertionError('Job {} does not contain a mapping'.format(self.path))

        if 'model' not in self.job:
            raise AssertionError("Job {} is missing the 'model' section".format(self.path))
        model = self.job['model']
        if not isinstance(model, dict):
            raise AssertionError("Job {}: 'model' must be a mapping".format(self.path))
        for key in ('name', 'architecture'):
            if key not in model:
                raise AssertionError("Job {}: 'model' is missing '{}'".format(self.path, key))

        for section in ('ml_parameters', 'audio_parameters', 'job_parameters'):
            if section not in self.job:
                raise AssertionError("Job {} is missing the '{}' section".format(self.path, section))

    def __inject(self):
        '''Injects dependencies. Assumes inputs are validated / healthy'''
        self.model_name = self.job['model']['name']

        # Start from an empty list so that entering the context more than once
        # does not append a second copy of every layer.
        self.layers = []
        for layer in self.job['model']['architecture']:
            cls = getattr(torch.nn, layer['layer_type'])
            layer_params = {key: evaluate_expression(value) for key, value in layer.items() if key != 'layer_type'}
            self.layers.append(cls(**layer_params))

        # NOTE(review): keys are copied verbatim, so a key such as 'path' or
        # 'train' would replace an existing attribute or method.
        # Sections are applied in this order; a later section wins on duplicate keys.
        for section in ('ml_parameters', 'audio_parameters', 'job_parameters'):
            for key, value in self.job[section].items():
                setattr(self, key, value)

        # Overrides for non-numeric / non-str types.
        # These keys are optional: the None default skips the override when a
        # key is absent instead of raising AttributeError.
        if getattr(self, 'loss_fn', None):
            self.loss_fn = getattr(torch.nn, self.loss_fn)
        if getattr(self, 'device', None):
            self.device = torch.device(self.device)
        if getattr(self, 'data_path', None):
            self.data_path = Path(self.data_path).expanduser()

    def _failure_processing(self):
        '''Hook for handling a failed job; not implemented yet.'''
        raise NotImplementedError

    def _success_processing(self):
        '''Hook for handling a successfully completed job; not implemented yet.'''
        raise NotImplementedError

In [6]:
# Open each job in turn, build its model and display the resulting architecture.
for job_path in jobs:
    with TrainingJob(job_path) as job:
        print(f'Processing: {job.path.name}')
        model = job.get_new_model()
        print(model)

Processing: sample_cnn_1.yaml
CustomNN(
  (model): Sequential(
    (0): Conv2d(2, 60, kernel_size=(5, 5), stride=(1, 1))
    (1): BatchNorm2d(60, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): ReLU()
    (4): Conv2d(60, 120, kernel_size=(5, 5), stride=(1, 1))
    (5): BatchNorm2d(120, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): ReLU()
    (8): Flatten(start_dim=1, end_dim=-1)
    (9): Dropout(p=0.5, inplace=False)
    (10): Linear(in_features=57600, out_features=256, bias=True)
    (11): ReLU()
    (12): Dropout(p=0.5, inplace=False)
    (13): Linear(in_features=256, out_features=10, bias=True)
    (14): Softmax(dim=1)
  )
)
Job duration: 0.10 seconds
