In [19]:
import os
from pathlib import Path, PurePath
import math
from fastai.vision.all import *
import torchaudio
import tempfile

In [20]:
# Figure out fft sizes
# STFT LF
sr = 32000
imgsize = 460
Fmax = sr / 2
Nfft_lf = 32768

Fbin_lf = Fmax / Nfft_lf
Nskip_lf = Nfft_lf //5  # 20% overlap  #int(round((sr * 60 - Nfft_lf) / (imgsize))) # makes a minute 
rng_lf = Nskip_lf * (imgsize-1)
       
stft_lf = torchaudio.transforms.Spectrogram(Nfft_lf,hop_length=Nskip_lf, power=2,return_complex=False ).cuda()

# STFT HF
Nfft_hf = 1024
Fbin_hf = Fmax / Nfft_hf
Nskip_hf = 1392 #int(round((sr * 60 - Nfft_hf*2) / imgsize)/2 ) # half a minute
rng_hf = Nskip_hf * imgsize + Nfft_hf
stft_hf = torchaudio.transforms.Spectrogram(Nfft_hf,hop_length=Nskip_hf, power=2,return_complex=False ).cuda()


In [21]:
with tempfile.TemporaryFile as tmp

SyntaxError: invalid syntax (2551211368.py, line 1)

In [22]:
# Patch learner for batch predictions
def predict_batch(self, item, rm_type_tfms=None, with_input=False):
    dl = self.dls.test_dl(item, rm_type_tfms=rm_type_tfms, num_workers=0)
    ret, _,mask = self.get_preds(dl=dl, with_decoded=True)
    return ret, mask
Learner.predict_batch = predict_batch

In [23]:
def readWav(p: Path, rand=True):
    frames = torchaudio.info(p).num_frames
    last = frames-rng_lf
    wav = torch.Tensor()  
    # Repeat wav if not long enough
    while last < 0:
        wav = torch.cat((wav,torchaudio.load(p)[0]),1)
        last += frames
    # Random start point
    start = random.randint(0,last) if rand else int(last/2)
    
    # If enough frames
    if frames-rng_lf > 0: 
        return torchaudio.load(p, num_frames=rng_lf, frame_offset=start)[0]
    else: 
        wav = torch.cat((wav,torchaudio.load(p)[0]),1)
        return wav[:,start:start+rng_lf]


In [24]:
def normSamp(audio):
    ret = audio - torch.mean(audio)
    return ret / torch.max(ret)

def normSpec(spec):
    # take the logarithm of the values
    ret = torch.log10(spec+1e-20)
    mean = torch.mean(ret)
    std = torch.std(ret)
    # Normalize each frame so its max 1, we dont need the extra dimension
    #return (ret / torch.transpose(torch.max(ret,2)[0],0,1))[0]
    #return (ret / torch.max(ret))[0]
    
    ret =  (ret - mean) / (std*4) + 0.5
    return torch.clamp(ret, min=0, max=1) 


In [25]:
def wavToSpecs(wavs : torch.Tensor, hf_idx=0):
    lf = stft_lf(wavs)[0]
    
    lf0 = normSpec(lf[:imgsize,:imgsize])
    lf1 = normSpec(lf[imgsize:imgsize*2,:imgsize])
    
    hf = stft_hf(wavs[:,hf_idx:hf_idx+rng_hf])[0]
    
    hf = normSpec(hf[12:imgsize+12,:imgsize])
    #return torch.stack((normSpec(lf[0, ]),mf, hf),0)
    return Spectrogram.create(torch.stack((lf0,lf1,hf),0))

In [26]:
class Spectrogram(TensorImageBase):
    """Type to represent a spectogram which knows show itself"""
    @classmethod
    def create(cls, o: Tensor):
        return cls(o)
    
    def show(self, figsize=None, ctx=None, **kwargs): 
        channels = self.shape[0]
        t = self
        if not isinstance(t, Tensor): return ctx
        if figsize is None: figsize=(10,10)   
        #f, axarr = plt.subplots(1,3,figsize=(15,15))
        #axarr[0].imshow(specs[0,:,:].cpu(),extent=[0,imgsize,(imgsize+1)*Fbin_lf,Fbin_lf] ,aspect=1/Fbin_lf)
        #axarr[1].imshow(specs[1,:,:].cpu(),extent=[0,imgsize,(imgsize+50)*Fbin_hf,Fbin_hf*50] ,aspect=1/Fbin_hf)
        #axarr[2].imshow(specs[2,:,:].cpu())
        return show_images(t, nrows=1, ncols=channels)

In [27]:
# Make a fastai Transform
class SpectrogramTransform(RandTransform):
    "A transform handler for multiple `spect` transforms"
    split_idx,order=None,0  # 0 = HIGH prio
    #def __init__(self, train_aug, valid_aug): store_attr()
    def __init__(self): 
        store_attr()
    
    def before_call(self, b, split_idx):
        self.idx = split_idx
    
    def encodes(self, p : Path):
        
        if self.idx == 0: #Train transform
            hf_idx = random.randint(0,rng_lf-rng_hf)
            wav = readWav(p, True)
        else: #Valid transform
            hf_idx = (rng_lf-rng_hf) //2
            wav = readWav(p, False)
        return wavToSpecs(wav.cuda(), hf_idx)
    
    #def decodes(self, x): return TitledImage(x,'test')
def get_wavs(p : Path) :
    return get_files(p,'.wav')
#class ImgTransform(ItemTransform):
#    def __init__(self, vocab): self.vocab = vocab
#    def encodes(self, o): return o
    #def decodes(self, x): return TitledImage(x[0],self.vocab[x[1]])

In [28]:
def label_func(p : Path):
    if PurePath(p).parent.name == "AmbientSE": return []
    return [PurePath(p).parent.parent.name]

In [29]:
def predTensor(p, overlap=0.5):
    frames = torchaudio.info(p).num_frames
    wav,sr = torchaudio.load(p)
    
    # Resample audio if different samplerate
    if(sr != 32000): 
        print('resampling to 32000..')
        wav = torchaudio.functional.resample(wav, sr, 32000)
        sr = 32000
        frames = wav.shape[1]
        print('resampling done..')
    # Repeat wav if not long enough
    
    while wav.shape[1]-rng_lf < 0 :
        wav = torch.cat((wav,wav),1)
        
    spec = wavToSpecs(wav[:,0:rng_lf].cuda())[None,:,:,:]
    # cat rest of the frames
    
    hf_idx = (rng_lf-rng_hf) //2
    
    start = int(Nskip_lf *(1-overlap) * (imgsize))
    
    runs = int((frames-Nfft_lf)/(Nskip_lf*(1-overlap))/imgsize) +1 
    for i in range(1,runs):
        idx = start*i
        if idx+rng_lf > frames: # add last frame 
            spec = torch.cat((spec,wavToSpecs(wav[:,-rng_lf:].cuda(),hf_idx)[None,:,:,:]),0)
            break
        # Add frame according to index
        spec = torch.cat((spec,wavToSpecs(wav[:,idx:idx+rng_lf].cuda(),hf_idx)[None,:,:,:]),0)
    return spec

In [13]:
testfile = '../DeepShip/roro.wav'

In [14]:
specBatch = predTensor(testfile, 0.5)

formats: can't open input file `../DeepShip/roro.wav': No such file or directory


RuntimeError: Error loading audio file: failed to open file ../DeepShip/roro.wav

In [15]:
plt.imshow(specBatch[1,0].cpu())

NameError: name 'specBatch' is not defined

In [32]:
learn = load_learner('../models/resnet50-93')

In [31]:
preds = learn.predict_batch(specBatch); preds

NameError: name 'specBatch' is not defined

In [None]:
def pred(pbatch):
    p = torch.mean(pbatch[0]* (pbatch[1]+ 0.0* ~pbatch[1]) ,0).tolist()
    return {k: v for k,v in zip(learn.dls.vocab, p)}

In [None]:
pred(preds)