<a href="https://colab.research.google.com/github/pseudonacer/pose_conversion_pfe/blob/main/Linear_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Imports

In [None]:
import tensorflow as tf
import matplotlib.pyplot
import numpy as np
import tqdm

# Load dataset

In [None]:
import h5py
import numpy as np


In [None]:
# Mount Google Drive so the dataset files under /content/gdrive/MyDrive can be read.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Training pairs: pose2d is the model input and pose3d the regression target
# (they are passed to fit() as x and y respectively).
pose2d = np.load("/content/gdrive/MyDrive/PFE/pose2D.npy")
pose3d = np.load("/content/gdrive/MyDrive/PFE/pose3D.npy")

In [None]:
# Held-out arrays; pose2d_test feeds predict() and the TFLite latency check.
# NOTE(review): pose3d_test is loaded but not referenced by any visible cell.
pose2d_test = np.load("/content/gdrive/MyDrive/PFE/pose2D_test.npy")
pose3d_test = np.load("/content/gdrive/MyDrive/PFE/pose3D_test.npy")

# First architecture: residual fully-connected network

In [None]:
class Block(tf.keras.Model):
    """Dense -> BatchNormalization -> ReLU -> Dropout unit.

    Args:
        embed: number of units of the dense layer.
        dropout_rate: rate handed to the Dropout layer.
    """

    def __init__(self, embed = 1024, dropout_rate = 0.3):
        super().__init__()
        layers = tf.keras.layers
        self.d1 = layers.Dense(units = embed)
        self.bn1 = layers.BatchNormalization()
        self.ac1 = layers.ReLU()
        self.dp1 = layers.Dropout(rate = dropout_rate)

    def call(self, x):
        """Apply the four layers in sequence and return the result."""
        hidden = self.d1(x)
        hidden = self.bn1(hidden)
        hidden = self.ac1(hidden)
        return self.dp1(hidden)

In [None]:
class OuterBlock(tf.keras.Model):
    """Two stacked ``Block`` units plus a learned linear shortcut.

    The shortcut is a Dense layer (named "resize") that projects the input to
    ``embed`` units so it can be summed with the output of the two blocks.

    Args:
        embed: width of the blocks and of the shortcut projection.
        dropout_rate: dropout rate forwarded to both blocks.
    """

    def __init__(self,embed = 1024, dropout_rate = 0.3):
        super().__init__()
        self.fc = tf.keras.layers.Dense(units = embed, name = "resize")
        self.b1 = Block(embed, dropout_rate)
        self.b2 = Block(embed, dropout_rate)

    def call(self, inputs):
        """Return ``fc(inputs) + b2(b1(inputs))``."""
        shortcut = self.fc(inputs)
        hidden = self.b2(self.b1(inputs))
        return shortcut + hidden

In [None]:
class OuterBlock2(tf.keras.Model):
    """Full dense regressor: two ``OuterBlock`` stages followed by a linear head.

    Args:
        embed: hidden width forwarded to both ``OuterBlock`` stages.
        dropout_rate: dropout rate forwarded to both stages.
        output_dim: number of units of the final Dense layer. The default of
            156 preserves the previously hard-coded size (52 joints x 3
            coordinates, matching the ``reshape(52, 3)`` used when plotting).
    """

    def __init__(self, embed = 1024, dropout_rate = 0.3, output_dim = 156):
        super(OuterBlock2,  self).__init__()
        self.b1 = OuterBlock(embed, dropout_rate)
        self.b2 = OuterBlock(embed, dropout_rate)
        # Linear output head (no activation): one value per predicted coordinate.
        self.d3 = tf.keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Map a batch of inputs to a flat ``(batch, output_dim)`` prediction."""
        x = self.b1(inputs)
        x = self.b2(x)
        x = self.d3(x)
        return x

# Training

In [None]:
# Hyperparameters for the dense model.
dropout_rate = 0.3 #by default
epochs = 10  # NOTE(review): the fit() call passes epochs = 20 explicitly, so this value is currently unused
batch_size = 128 #or 64
# Pass the declared hyperparameter instead of repeating the literal, so changing
# dropout_rate above actually changes the model.
Model_DNN = OuterBlock2(embed = 512, dropout_rate = dropout_rate)

# Adam with learning rate 1e-3, optimising mean absolute error.
Model_DNN.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-3), loss = tf.keras.losses.MeanAbsoluteError())

In [None]:
# Train the dense model, holding out 20% of the training arrays for validation.
# NOTE(review): epochs is hard-coded to 20 here while the configuration cell
# defines epochs = 10 — confirm which value is intended.
history = Model_DNN.fit(x = pose2d, y = pose3d, batch_size = batch_size, epochs = 20, validation_split=0.2, shuffle= True)

In [None]:
# Save the trained Keras model to a directory, then convert that saved model to TFLite.
save_model_dir = "/content/conv_pose"
Model_DNN.save(save_model_dir)
converter = tf.lite.TFLiteConverter.from_saved_model(save_model_dir)
tflite_model = converter.convert()

# Write the serialized TFLite model returned by convert() to disk.
with open('/content/conv_pose.tflite', 'wb') as f:
  f.write(tflite_model)

In [None]:
import time

# Load the converted model into the TFLite interpreter.
interpreter = tf.lite.Interpreter(
    model_path = "/content/conv_pose.tflite",
    num_threads=6)
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# One test sample, cast to float32 and flattened into a batch of size 1.
input_data = np.float32(pose2d_test[0]).reshape(1,-1)
interpreter.set_tensor(input_details[0]['index'], input_data)

# perf_counter() is a monotonic, high-resolution clock intended for measuring
# short durations; time.time() is wall-clock time and may jump or be coarse.
start_time = time.perf_counter()
interpreter.invoke()
stop_time = time.perf_counter()
# Elapsed seconds for a single invoke() call.
print(stop_time - start_time)
output_data = interpreter.get_tensor(output_details[0]['index'])


In [None]:
# Shape of the TFLite prediction for the single sample fed to the interpreter.
print(output_data.shape)


In [None]:
# NOTE(review): this mounts Drive at '/content/drive', whereas every path used in
# the visible cells lives under '/content/gdrive' — confirm this second mount
# point is actually needed.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Run the trained dense model on the held-out 2D poses.
# NOTE(review): despite the "evaluate" label, no metric is computed here and
# pose3d_test is not compared against pred.
drive.mount('/content/gdrive')
pred =  Model_DNN.predict(pose2d_test)
# Optional: persist the predictions to Drive.
#np.save("/content/gdrive/MyDrive/PFE/pred",pred)

In [None]:
import matplotlib.pyplot as plt
from matplotlib import animation
from matplotlib import rc
# Render matplotlib animations as interactive JavaScript/HTML in the cell output.
rc('animation', html='jshtml')

drive.mount('/content/gdrive')

# Parent index for each of the 52 joints; presumably -1 marks the root joint (verify against the dataset).
PARENTS = [-1, 0, 1, 2, 0, 4, 5, 0, 7, 7, 6, 10, 11, 12, 13, 10, 15, 16, 17, 10, 19, 20, 21, 10, 23, 24, 25, 10, 27, 28, 29, 3, 31, 32, 33, 34, 31, 36, 37, 38, 31, 40, 41, 42, 31, 44, 45, 46, 31, 48, 49, 50]
# (joint index, parent index) pairs for every joint except the first one.
default_skeleton = list(zip(range(len(PARENTS)), PARENTS))[1:]


def draw_poses(ax, ar, data, default_skeleton, color = "blue"):
    """Draw one 52-joint pose (points plus bones) on a 3D axes.

    Args:
        ax: axes object providing ``scatter`` and ``plot``.
        ar: list that receives every created artist; it is extended in place.
        data: array with 156 values, reshaped to ``(52, 3)`` joint coordinates.
        default_skeleton: iterable of index pairs; a line is drawn between the
            two joints of each pair.
        color: colour used for both the points and the lines.

    Returns:
        The ``(ax, ar)`` pair that was passed in.
    """
    joints = data.reshape(52, 3)
    ar.append(ax.scatter(*joints.T, s = 10, color = color))
    for first, second in default_skeleton:
        # The line starts at joints[second] and ends at joints[first].
        sx, sy, sz = joints[second]
        fx, fy, fz = joints[first]
        bone, = ax.plot([sx, fx], [sy, fy], [sz, fz], color = color, linewidth = 2)
        ar.append(bone)
    return ax, ar

# Load the demo sequence. The context manager closes the HDF5 file as soon as
# both datasets have been copied into in-memory arrays (the original left the
# file handle open).
with h5py.File("/content/gdrive/MyDrive/PFE/demo_conan.hdf5", "r") as f:
    demo_pose2d = np.array(f['pose2d'])
    demo_pose3d = np.array(f['pose3d'])
demo_pred   = Model_DNN.predict(demo_pose2d)

artists = []
fig = plt.figure(figsize = (10,6))
ax = fig.add_subplot(projection = '3d')
# The camera angle is the same for every frame, so set it once instead of per iteration.
ax.view_init(45,45)
for i in range(len(demo_pose2d)):
  # One list of artists per frame: target pose in blue, prediction in red.
  ar = []
  ax, ar = draw_poses(ax, ar, demo_pose3d[i], default_skeleton, color = "blue")
  ax, ar = draw_poses(ax, ar, demo_pred[i], default_skeleton, color = "red")
  artists.append(ar)
ani = animation.ArtistAnimation(fig, artists, interval=20, blit=True, repeat_delay=200)
plt.show()

In [None]:
# Encode the animation to an MP4 file at 20 frames per second using the ffmpeg writer.
ani.save("/content/animation.mp4", writer =  animation.FFMpegWriter(fps=20))

In [None]:
!sudo apt-get install imagemagick

# Conv1D architectures

In [None]:
class cBlock(tf.keras.Model):
    """Conv1D -> BatchNormalization -> ReLU -> Dropout unit.

    The convolution uses a kernel of size 5 with 'same' padding.

    Args:
        num_filters: number of convolution filters.
        dropout_rate: rate handed to the Dropout layer.
    """

    def __init__(self, num_filters = 64, dropout_rate = 0.3):
        super().__init__()
        layers = tf.keras.layers
        self.d1 = layers.Conv1D(num_filters, 5, padding = 'same')
        self.bn1 = layers.BatchNormalization()
        self.ac1 = layers.ReLU()
        self.dp1 = layers.Dropout(rate = dropout_rate)

    def call(self,x):
        """Apply the four layers in sequence and return the result."""
        features = self.d1(x)
        features = self.bn1(features)
        features = self.ac1(features)
        return self.dp1(features)

In [None]:
class ConvBlock_inner(tf.keras.Model):
    """Residual unit: two ``cBlock`` stages whose output is added to the input.

    Args:
        num_filters: filter count forwarded to both ``cBlock`` stages.
        dropout_rate: dropout rate forwarded to both stages.
    """

    def __init__(self, num_filters = 64, dropout_rate = 0.3):
        super().__init__()
        self.b1 = cBlock(num_filters, dropout_rate)
        self.b2 = cBlock(num_filters, dropout_rate)
        # An Add layer is created here, but call() sums with the `+` operator.
        self.add = tf.keras.layers.Add()

    def call(self, inputs):
        """Return ``inputs + b2(b1(inputs))``."""
        residual = self.b2(self.b1(inputs))
        return inputs + residual

In [None]:
class ConvBlock_outer(tf.keras.Model):
    """Conv1D regressor: reshape -> two residual conv stages -> flatten -> Dense(156).

    Args:
        num_filters: two-element sequence giving the filter count of the first
            and second ``ConvBlock_inner`` stage. The default is an immutable
            tuple rather than a list so the default object cannot be shared and
            mutated between instances; lists are still accepted.
        dropout_rate: dropout rate forwarded to both stages.
    """

    def __init__(self, num_filters = (128, 128) , dropout_rate = 0.3):
        super(ConvBlock_outer,  self).__init__()
        # Turn each flat 104-value input into a length-104 sequence with one channel.
        self.reshape = tf.keras.layers.Reshape((104,1))
        self.b1 = ConvBlock_inner(num_filters[0], dropout_rate)
        self.b2 = ConvBlock_inner(num_filters[1], dropout_rate)
        self.flatten = tf.keras.layers.Flatten()
        # Linear output head with 156 units.
        self.d3 = tf.keras.layers.Dense(156)

    def call(self, inputs):
        """Map a batch of flat inputs to a ``(batch, 156)`` prediction."""
        x = self.reshape(inputs)
        # NOTE(review): the first stage adds a 1-channel input to a
        # num_filters[0]-channel output, which relies on broadcasting — confirm
        # this is the intended residual connection.
        x = self.b1(x)
        x = self.b2(x)
        x = self.flatten(x)
        x = self.d3(x)
        return x

In [None]:
# Hyperparameters for the Conv1D model.
dropout_rate = 0.3 #by default
epochs = 10  # NOTE(review): the fit() call passes epochs = 20 explicitly, so this value is currently unused
batch_size = 128 #or 64
# Forward the declared hyperparameter; previously the model silently used its
# own default and ignored dropout_rate above.
model_conv = ConvBlock_outer(dropout_rate = dropout_rate)

# Adam with learning rate 1e-3, optimising mean absolute error.
model_conv.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-3), loss = tf.keras.losses.MeanAbsoluteError())

In [None]:
# Train the Conv1D model, holding out 20% of the training arrays for validation.
# NOTE(review): this rebinds `history`, discarding the dense model's training
# history, and hard-codes epochs = 20 rather than using the `epochs` variable.
history = model_conv.fit(x = pose2d, y = pose3d, batch_size = batch_size, epochs = 20, validation_split=0.2, shuffle= True)


# SemGCN

In [None]:
! git clone https://github.com/garyzhao/SemGCN.git

Cloning into 'SemGCN'...
remote: Enumerating objects: 76, done.[K
remote: Counting objects: 100% (3/3), done.[K
remote: Compressing objects: 100% (3/3), done.[K
remote: Total 76 (delta 0), reused 1 (delta 0), pack-reused 73[K
Unpacking objects: 100% (76/76), 6.10 MiB | 14.55 MiB/s, done.


In [None]:
import sys
# Make the SemGCN sources stored on Drive importable (the `common` and `models` packages).
# NOTE(review): the git clone above lands in ./SemGCN, but this path points at
# the copy on Drive — confirm which one is intended.
sys.path.insert(1, "/content/gdrive/MyDrive/PFE/SemGCN/")

In [None]:
!python3 /content/gdrive/MyDrive/PFE/SemGCN/main_colab.py  --dropout 0.2 --hid_dim 256 --num_layers 3 --batch_size 64  --lr 0.001 --non_local --checkpoint /content/gdrive/MyDrive/PFE --resume /content/gdrive/MyDrive/PFE/2023-01-27T09:34:10.370082/ckpt_best.pth.tar


==> Using settings Namespace(actions='*', batch_size=64, checkpoint='/content/gdrive/MyDrive/PFE', dataset='h36m', downsample=1, dropout=0.2, epochs=100, evaluate='', hid_dim=256, keypoints='gt', lr=0.001, lr_decay=100000, lr_gamma=0.96, max_norm=True, non_local=True, num_layers=3, num_workers=8, resume='/content/gdrive/MyDrive/PFE/2023-01-27T09:34:10.370082/ckpt_best.pth.tar', snapshot=5)
==> Loading dataset...
==> Preparing data...
==> Loading 2D detections...
==> Creating model...
==> Total parameters: 1.33M
==> Loading checkpoint '/content/gdrive/MyDrive/PFE/2023-01-27T09:34:10.370082/ckpt_best.pth.tar'
==> Loaded checkpoint (Epoch: 30 | Error: 290.15651578903197)

Epoch: 31 | LR: 0.00100000
[KTrain |#######                         | (627/2689) Data: 0.000545s | Batch: 0.054s | Total: 0:00:34 | ETA: 0:01:27 | Loss:  0.1234Traceback (most recent call last):
  File "/content/gdrive/MyDrive/PFE/SemGCN/main_colab.py", line 304, in <module>
    main(parse_args())
  File "/content/gdriv

In [None]:
import torch
import torch.nn as nn
import torch.backends.cudnn as cudnn
from torch.utils.data import Dataset, DataLoader


from progress.bar import Bar
from common.log import Logger, savefig
from common.utils import AverageMeter, lr_decay, save_ckpt
from common.graph_utils import adj_mx_from_skeleton
from common.data_utils import fetch, read_3d_data, create_2d_data
from common.loss import mpjpe, p_mpjpe
from common.skeleton import  Skeleton
from models.sem_gcn import SemGCN


# Load the saved SemGCN checkpoint; its "state_dict" entry is restored into the model later.
# NOTE(review): map_location="cuda:0" requires a GPU runtime. torch.load also
# unpickles the file, so only load checkpoints from a trusted source.
check = torch.load("/content/gdrive/MyDrive/PFE/2023-01-27T09:34:10.370082/ckpt_best.pth.tar", map_location="cuda:0")

In [None]:
# 52-joint skeleton description passed to SemGCN's Skeleton helper.
# `parents` repeats the PARENTS list used by the plotting cell; joints_left and
# joints_right each list 25 joint indices.
# NOTE(review): the meaning of joints_group is defined by the project's Skeleton class — verify there.
pats_skeleton = Skeleton(parents = [-1, 0, 1, 2, 0, 4, 5, 0, 7, 7, 6, 10, 11, 12, 13, 10, 15, 16, 17, 10, 19, 20, 21, 10, 23, 24, 25, 10, 27, 28, 29, 3, 31, 32, 33, 34, 31, 36, 37, 38, 31, 40, 41, 42, 31, 44, 45, 46, 31, 48, 49, 50],
                            joints_left =  [4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
                            joints_right = [1, 2, 3, 8, 31, 32, 33, 34, 35, 36, 37 ,38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51],
                            joints_group = [[0,7], [8,9], [1,4], [2, 31], [5, 10], [11,12] , [13,14], [15,16], [17,18], [19, 20], [21, 22], [23,24], [25, 26], [27, 28], [29, 30], [32, 33], [34, 35], [36, 37], [38, 39], [40, 41], [42,43],[44,45],[46,47],[48,49],[50,51], [15, 27]])


In [None]:
# Use the GPU when one is attached, otherwise fall back to the CPU so the cell
# still runs on a CPU-only runtime (the original hard-coded "cuda").
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Adjacency matrix derived from the skeleton definition.
adj = adj_mx_from_skeleton(pats_skeleton)
# Same hyperparameters as the training command: hid_dim 256, 3 layers, dropout 0.2.
model_pos = SemGCN(adj, 256, num_layers=3, p_dropout=0.2,
                       nodes_group=pats_skeleton.joints_group()).to(device)

In [None]:
# Restore the trained weights from the checkpoint's "state_dict" entry.
model_pos.load_state_dict(check["state_dict"])

<All keys matched successfully>

# Export to tflite

In [None]:
!pip install onnx

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip install pytorch-to-tflite

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pytorch-to-tflite
  Using cached pytorch_to_tflite-0.0.4-py3-none-any.whl (9.7 kB)
Collecting pytorch
  Using cached pytorch-1.0.2.tar.gz (689 bytes)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pytorch
  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mpython setup.py bdist_wheel[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m See above for output.
  
  [1;35mnote[0m: This error originates from a subprocess, and is likely not a problem with pip.
  Building wheel for pytorch (setup.py) ... [?25lerror
[31m  ERROR: Failed building wheel for pytorch[0m[31m
[0m[?25h  Running setup.py clean for pytorch
Failed to build pytorch
Installing collected packages: pytorch, pytorch-to-tflite
  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mRunning 

In [None]:
! git clone https://github.com/onnx/onnx-tensorflow.git 
! cd onnx-tensorflow/ && pip install -e .

fatal: destination path 'onnx-tensorflow' already exists and is not an empty directory.
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Obtaining file:///content/onnx-tensorflow
  Preparing metadata (setup.py) ... [?25l[?25hdone
Installing collected packages: onnx-tf
  Attempting uninstall: onnx-tf
    Found existing installation: onnx-tf 1.10.0
    Uninstalling onnx-tf-1.10.0:
      Successfully uninstalled onnx-tf-1.10.0
  Running setup.py develop for onnx-tf
Successfully installed onnx-tf-1.10.0


In [None]:
import onnx

# Load the ONNX model
# NOTE(review): model_pos.onnx is only written by the torch.onnx.export cell
# further down, so this cell fails on a fresh top-to-bottom run.
model = onnx.load("model_pos.onnx")

from onnx_tf.backend import prepare

# Build the onnx-tf representation of the ONNX graph.
tf_rep = prepare(model)

# NOTE(review): export_graph() is given an empty output path — confirm the intended destination.
tf_rep.export_graph("")
# Check that the IR is well formed
# onnx.checker.check_model(model)

# # Print a Human readable representation of the graph
# onnx.helper.printable_graph(model.graph)

BackendIsNotSupposedToImplementIt: ignored

In [None]:
!pip install onnx2keras

In [None]:
"""
The PyTorch2Keras converter interface
"""

from onnx2keras import onnx_to_keras
import torch
import onnx
import io
import logging


def pytorch_to_keras(
    model, args, input_shapes=None,
    change_ordering=False, verbose=False, name_policy=None,
    do_constant_folding=False
):
    """
    Convert a PyTorch model to Keras by exporting it to an in-memory ONNX
    stream and handing the loaded ONNX model to ``onnx_to_keras``.

    Args:
        model: pytorch model
        args: pytorch model arguments (a single argument or a list of them)
        input_shapes: keras input shapes (using for each InputLayer); a
            non-list value is wrapped into a one-element list
        change_ordering: change CHW to HWC
        verbose: verbose output (also enables DEBUG logging)
        name_policy: use short names, use random-suffix or keep original names for keras layers
        do_constant_folding: forwarded to ``torch.onnx.export``
    Returns:
        model: created keras model.
    """
    logger = logging.getLogger('pytorch2keras')

    if verbose:
        logging.basicConfig(level=logging.DEBUG)

    logger.info('Converter is called.')

    # These two warnings are emitted as-is; both values are still forwarded to
    # onnx_to_keras below.
    if name_policy:
        logger.warning('Name policy isn\'t supported now.')

    if input_shapes:
        logger.warning('Custom shapes isn\'t supported now.')

    if input_shapes and not isinstance(input_shapes, list):
        input_shapes = [input_shapes]

    if not isinstance(args, list):
        args = [args]

    args = tuple(args)

    # This forward pass is only used to count the model's outputs, so run it
    # without recording an autograd graph.
    with torch.no_grad():
        dummy_output = model(*args)

    # torch.Tensor replaces the deprecated torch.autograd.Variable check; a
    # single tensor output is wrapped so len() below counts outputs.
    if isinstance(dummy_output, torch.Tensor):
        dummy_output = [dummy_output]

    input_names = ['input_{0}'.format(i) for i in range(len(args))]
    output_names = ['output_{0}'.format(i) for i in range(len(dummy_output))]

    logger.debug('Input_names:')
    logger.debug(input_names)

    logger.debug('Output_names:')
    logger.debug(output_names)

    # Export to an in-memory buffer instead of a file on disk.
    stream = io.BytesIO()
    torch.onnx.export(model, args, stream, do_constant_folding=do_constant_folding, verbose=verbose, input_names=input_names, output_names=output_names)

    # Rewind so onnx.load reads the exported bytes from the start.
    stream.seek(0)
    onnx_model = onnx.load(stream)

    k_model = onnx_to_keras(onnx_model=onnx_model, input_names=input_names,
                            input_shapes=input_shapes, name_policy=name_policy,
                            verbose=verbose, change_ordering=change_ordering)
    return k_model

In [None]:
# Convert the SemGCN model to Keras on the CPU.
# NOTE(review): `cpu` and `x` are only defined in the ONNX export cell further
# down, so this call depends on out-of-order execution.
k_model = pytorch_to_keras(model_pos.to(cpu), args = [x.to(cpu)], input_shapes = (52,2) , name_policy= "random")



ValueError: ignored

In [None]:
# Install the MLTK (if necessary)
!pip install --upgrade silabs-mltk

In [None]:
# Install the standard ONNX Python package
# so that we can read the .onnx formatted model file
!pip install onnx onnx_tf

In [None]:
# Install the onnsim Python package
# This can help reduce the complexity of the generated ONNX model file
# https://github.com/daquexian/onnx-simplifier
!pip install onnx-simplifier onnxruntime

In [None]:
# Install the openvino_dev Python package
# This allows for converting the ONNX model to an intermediate 
# format so we can then convert it to a TF-Lite model format
# https://docs.openvino.ai/
!pip install openvino_dev

In [None]:
# Install the openvino2tensorflow Python package
# This allows for converting from the openvino format to the .tflite model format
# We primarily need this so we can convert from the NCHW used by .onnx 
# to the NHWC used by .tflite
# https://github.com/PINTO0309/openvino2tensorflow
!pip install openvino2tensorflow  tensorflow_datasets

### Export the PyTorch model to ONNX

In [None]:
cpu = torch.device("cpu")
# Build the example input directly on the CPU: one training sample cast to
# float32 and reshaped to (1, 52, 2). The original moved it to `device` first
# and back to the CPU for the export, which needlessly required a GPU.
x = torch.from_numpy(pose2d[0]).float().reshape(1, 52, 2)
# .to(cpu) moves the model's parameters; x is the example input given to the exporter.
torch.onnx.export(model_pos.to(cpu), x, "model_pos.onnx", export_params=True, verbose=True)


In [None]:
k_model = pytorch_to_keras(


In [None]:
import onnxsim
import onnx

# Simplify the exported ONNX graph; simplify() returns the new model and a success flag.
simplified_onnx_model, success = onnxsim.simplify('model_pos.onnx')
assert success, 'Failed to simplify the ONNX model. You may have to skip this step'
# Output path of the simplified model; later cells read this variable.
simplified_onnx_model_path =  f'model_pos.simplified.onnx'
print(simplified_onnx_model_path)
print(f'Generating {simplified_onnx_model_path} ...')
onnx.save(simplified_onnx_model, simplified_onnx_model_path)
print('done')

model_pos.simplified.onnx
Generating model_pos.simplified.onnx ...
done


In [None]:
import sys
import os

# Import the model optimizer tool from the openvino_dev package
from openvino.tools.mo import main as mo_main
import onnx
from onnx_tf.backend import prepare
from mltk.utils.shell_cmd import run_shell_cmd

# Load the ONNX model
# (only used here to read the input shape through onnx-tf; the conversion
# itself runs on the simplified model file)
onnx_model = onnx.load("model_pos.onnx")
tf_rep = prepare(onnx_model)

# Get the input tensor shape
input_tensor = tf_rep.signatures[tf_rep.inputs[0]]
input_shape = input_tensor.shape
# Format the shape as a bracketed, comma-separated string for the command line.
input_shape_str = '[' + ','.join([str(x) for x in input_shape]) + ']'



openvino_out_dir = f'openvino'
os.makedirs(openvino_out_dir, exist_ok=True)


print(f'Generating openvino at: {openvino_out_dir}')
# Run the OpenVINO model optimizer script as a subprocess with the current Python interpreter.
cmd = [ 
    sys.executable, mo_main.__file__, 
    '--input_model', simplified_onnx_model_path,
    '--input_shape', input_shape_str,
    '--output_dir', openvino_out_dir,
    '--data_type', 'FP32'

]
retcode, retmsg = run_shell_cmd(cmd,  outfile=sys.stdout)
assert retcode == 0, 'Failed to do conversion' 

Generating openvino at: openvino
[ INFO ] The model was converted to IR v11, the latest model format that corresponds to the source DL framework input/output format. While IR v11 is backwards compatible with OpenVINO Inference Engine API v1.0, please use API v2.0 (as of 2022.1) to take advantage of the latest improvements in IR v11.
Find more information about API v2.0 and IR v11 at https://docs.openvino.ai/latest/openvino_2_0_transition_guide.html
[ SUCCESS ] Generated IR version 11 model.
[ SUCCESS ] XML file: /content/openvino/model_pos.simplified.xml
[ SUCCESS ] BIN file: /content/openvino/model_pos.simplified.bin


In [None]:
import os 
from mltk.utils.shell_cmd import run_shell_cmd

# Directory that receives the openvino2tensorflow outputs.
openvino2tensorflow_out_dir = f'openvino2tensorflow'
# The XML file name is the simplified ONNX file name with '.onnx' replaced by '.xml'.
openvino_xml_name = os.path.basename(simplified_onnx_model_path)[:-len('.onnx')] + '.xml'


# On Windows the tool is launched through the Python interpreter; elsewhere it is called directly.
# NOTE(review): `sys` is not imported in this cell — it relies on an earlier cell's import.
if os.name == 'nt':
  openvino2tensorflow_exe_cmd = [sys.executable, os.path.join(os.path.dirname(sys.executable), 'openvino2tensorflow')]
else:
  openvino2tensorflow_exe_cmd = ['openvino2tensorflow']

print(f'Generating openvino2tensorflow model at: {openvino2tensorflow_out_dir} ...')
# Request both a SavedModel and an unquantized float32 .tflite file.
cmd = openvino2tensorflow_exe_cmd + [ 
    '--model_path', f'{openvino_out_dir}/{openvino_xml_name}',
    '--model_output_path', openvino2tensorflow_out_dir,
    '--output_saved_model',
    '--output_no_quant_float32_tflite'
]

# A non-zero return code aborts the cell with the tool's message.
retcode, retmsg = run_shell_cmd(cmd)
assert retcode == 0, retmsg
print('done')

Generating openvino2tensorflow model at: openvino2tensorflow ...


AssertionError: ignored

In [None]:
import tensorflow as tf 

tflite_int8_model_path = 'model_pos.int8.tflite'

# from_saved_model() expects a SavedModel *directory*. The original passed the
# path of the model_float32.tflite file, which raises OSError. The conversion
# cell runs openvino2tensorflow with --output_saved_model and
# --model_output_path openvino2tensorflow, so load that directory instead.
# NOTE(review): confirm the SavedModel is written at the top level of this directory.
converter = tf.lite.TFLiteConverter.from_saved_model("/content/openvino2tensorflow")

# The original iterated over `x_test`, which is never defined in this notebook.
# Use the held-out 2D poses instead, shaped like the (1, 52, 2) example input
# used for the ONNX export.
# NOTE(review): assumes the converted model keeps that input layout — TODO confirm.
calibration_samples = np.float32(pose2d_test).reshape(-1, 52, 2)

def representative_dataset():
    """Yield single-sample float32 batches used to calibrate the quantization ranges."""
    for i, sample in enumerate(calibration_samples):
        yield [np.expand_dims(sample, axis=0)]
        if i >= 1000: # We only need a small portion of the dataset to do the quantization 
            break

converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8] # We only want to use int8 kernels
converter.inference_input_type = tf.float32 # Can also be tf.int8
converter.inference_output_type = tf.float32  # Can also be tf.int8
converter.representative_dataset = representative_dataset

print(f'Generating {tflite_int8_model_path} ...')
tflite_quant_model = converter.convert()

# Write the quantized model returned by convert() to disk.
with open(tflite_int8_model_path, 'wb') as f:
    f.write(tflite_quant_model)

print('done')

OSError: ignored