In [None]:
import os
import math
import zipfile
import datetime
import matplotlib.pyplot as plt
%matplotlib inline

import numpy as np
import pandas as pd
from tqdm.notebook import tqdm as tqdm

import torch  # root package
from torch.utils.data import Dataset, DataLoader  # dataset representation and loading
import torch.autograd as autograd  # computation graph
from torch import Tensor  # tensor node in the computation graph
import torch.nn as nn  # neural networks
import torch.nn.functional as F  # layers, activations and more
import torch.optim as optim  # optimizers e.g. gradient descent, ADAM, etc.
from torch.jit import script, trace  # hybrid frontend decorator and tracing jit

from model import ManyMLP

# pd.set_option('display.max_rows', 200)
pd.set_option('display.min_rows', 200)
pd.set_option('display.max_columns', 10)
pd.set_option('display.max_colwidth', 100)
pd.set_option('precision', 2)

np.set_printoptions(precision=4)

torch.set_default_dtype(torch.float64)

In [None]:
test_file_names = ['test_easy.csv', 'test_normal.csv', 'test_hard.csv']
test_df = [pd.read_csv(f'input_data/test/{file_name}', dtype={'motion': str}) for file_name in test_file_names]

parts = test_df[0].keys()[2:]

frames_length = [5, 15, 45]
target_name = ['easy', 'normal', 'hard']

In [None]:
try:
    os.mkdir('outputs/MMLP')
except Exception as e:
    print(e)

try:
    os.mkdir('outputs/MMLP-PPed')
except Exception as e:
    print(e)

In [None]:
def zipping(res_dir):
    t = datetime.datetime.now().strftime('%Y%m%d%H%M%S')

    with zipfile.ZipFile(f'outputs/{res_dir}-{t}.zip', 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
        new_zip.write(f'outputs/{res_dir}/test_easy.csv', arcname='test_easy.csv')
        new_zip.write(f'outputs/{res_dir}/test_normal.csv', arcname='test_normal.csv')
        new_zip.write(f'outputs/{res_dir}/test_hard.csv', arcname='test_hard.csv')

In [None]:
def fft_a(x, fc):
    N = len(x)
    
    F = np.fft.fft(x, N, axis=0)
    
    fq = np.linspace(0, N-1, N)
    F[(fq > fc)] = 0
    
    F_ifft = np.fft.ifft(F, N, axis=0)

    return F_ifft.real

def fft_a_2(x, frames):
    N = len(x)
    
    F = np.fft.fft(x, N, axis=0)
    
    fq = np.linspace(0, N-1, N)
    F[(fq == frames+1)] = 0
    
    F_ifft = np.fft.ifft(F, N, axis=0)

    return F_ifft.real

def fft_b(x, ac):
    N = len(x)
    
    F = np.fft.fft(x, N, axis=0)    
    
    F[(np.abs(F) < ac)] = 0
    
    F_ifft = np.fft.ifft(F, N, axis=0)

    return F_ifft.real

In [None]:
D_FFN = 128
DEPTH = 16

JOINT_NUM = 21
JOINT_DIM = 3
D_TYPE = 'float64'

predict_df = [pd.DataFrame([]) for _ in range(3)]

for INDEX in range(2, 3):
    FRAMES = frames_length[INDEX]
    TARGET = target_name[INDEX]

    model = ManyMLP((FRAMES+1, JOINT_NUM, JOINT_DIM), d_ffn=D_FFN, depth=DEPTH)
    model.load_state_dict(torch.load(f"models/{TARGET}/latest.pth"))
    model.eval()

    parts = test_df[INDEX].keys()[2:]

    for m, v in tqdm(test_df[INDEX].groupby('motion')):
        t = v[parts].values
        for j in range(0, t.shape[0] - FRAMES, FRAMES):
            X = np.zeros((FRAMES + 1, JOINT_NUM, JOINT_DIM))
            X[0] = t[j].reshape(JOINT_NUM, JOINT_DIM)
            X[FRAMES] = t[j + FRAMES].reshape(JOINT_NUM, JOINT_DIM)

            X = torch.tensor(np.array([X.astype(D_TYPE)]))

            with torch.no_grad():
                pred = model(X)[0]

            for f in range(1, FRAMES):
                fid = j + f + 1
                idx = v[v['frame_id'] == fid].index
                
                v.loc[idx] = [m, fid] + pred[f].flatten().tolist()

        predict_df[INDEX] = pd.concat([predict_df[INDEX], v])

for INDEX in range(0, 3):
    TARGET = target_name[INDEX]
    predict_df[INDEX].to_csv(f'outputs/MMLP/test_{TARGET}.csv', index=False)

zipping('MMLP')

In [None]:
df = [d.copy() for d in predict_df]

def FC(m, mv, base):
    if mv < 100:
        base -= 1
    
    return base

def AC(m, mv, base=5):
    if m in ['032', '059']:
        base *= 2
    if mv >= 500:
        base /= 2
    
    return base

def WIDTH(m, mv, base=1):
    if m in ['032', '059']:
        base += 1
    
    return base

with tqdm(range(3)) as tqdm_:
    for INDEX in range(0, 3):
        FRAMES = frames_length[INDEX]
        TARGET = target_name[INDEX]
        
        # FFTによるノイズ除去
        for m, v in df[INDEX].groupby('motion'):
            t = v[parts].values
            N = len(t)

            mv = np.linalg.norm(t[-1]-t[0]) # 移動量の目安
            
            fc = FC(m, mv, N)
            ac = AC(m, mv)
            
            # 周波数と振幅でノイズ除去
            t_fft = t
#             t_fft = fft_a(t, fc)
            t_fft = fft_b(t_fft, ac)
            for f0 in range(0, N-1, FRAMES):
                for p in range(1, FRAMES + 1):
                    f = f0 + p + 1
                    idx = v[v['frame_id'] == f].index

                    df[INDEX].loc[idx] = [m, f] + t_fft[f-1].tolist()

            print(m, mv, fc, ac, np.mean(np.square(t - t_fft)))
        
        # 平均によるスムージング
        for m, v in df[INDEX].groupby('motion'):
            t = v[parts].values
            N = len(t)
            
            mv = np.linalg.norm(t[-1]-t[0]) # 移動量の目安
            
            w = WIDTH(m, mv, INDEX+1)

            tmp = {}
            for f0 in range(0, N-1, FRAMES):
                for p in range(1, FRAMES + 1):
                    f = f0 + p + 1
                    idx = v[v['frame_id'] == f].index

                    l = max(0, f - w - 1)
                    r = min(N, f + w)
                    v0 = np.mean(t[l:r], 0)

                    tmp[idx[0]] = [m, f] + v0.tolist()

            for key, value in tmp.items():
                df[INDEX].loc[key] = value

        TARGET = target_name[INDEX]
        df[INDEX].to_csv(f'outputs/MMLP-PPed/test_{TARGET}.csv', index=False)
        
        tqdm_.postfix = str(INDEX)
        tqdm_.update()

zipping('MMLP-PPed')