<a href="https://colab.research.google.com/github/shilz1007/shilz1007/blob/main/P%2B%2B.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import sys
import datetime
import tensorflow as tf
import numpy as np
from tensorflow.keras import Model
from tensorflow.keras.layers import Layer,Dense, Dropout, BatchNormalization,MaxPool1D


In [None]:
def square_distance(src, dst):
    """Compute pairwise squared Euclidean distances between two point sets.

    Uses the expansion ``|s - d|^2 = |s|^2 - 2 * s.d + |d|^2`` so the whole
    computation is one batched matrix product plus two broadcast additions.

    Args:
        src: Rank-3 tensor of shape [B, N, C].
        dst: Rank-3 tensor of shape [B, M, C].

    Returns:
        Tensor of shape [B, N, M] where entry ``[b, i, j]`` is the squared
        distance between ``src[b, i]`` and ``dst[b, j]``. Because of
        floating-point cancellation, values that should be exactly zero may
        come out as tiny negative numbers.
    """
    # [B, N, C] x [B, C, M] -> [B, N, M]
    cross = tf.linalg.matmul(src, dst, transpose_b=True)
    # keepdims / newaxis give broadcastable shapes without needing the
    # static batch size, so this also works when B is unknown at trace time.
    src_sq = tf.math.reduce_sum(src ** 2, axis=-1, keepdims=True)     # [B, N, 1]
    dst_sq = tf.math.reduce_sum(dst ** 2, axis=-1)[:, tf.newaxis, :]  # [B, 1, M]
    return -2 * cross + src_sq + dst_sq

In [None]:
from numpy import int64
def index_points(points, idx):
    """Select points from each batch element by index.

    The original body built explicit batch indices and used PyTorch-style
    fancy indexing (``points[batch_indices, idx, :]``), which TensorFlow
    tensors do not support. ``tf.gather`` with ``batch_dims=1`` performs the
    same per-batch lookup directly.

    Args:
        points: Rank-3 tensor of shape [B, N, C].
        idx: Integer tensor of shape [B, S] or [B, S, K] holding indices into
            axis 1 of ``points``.

    Returns:
        Tensor of shape ``idx.shape + [C]``, i.e. [B, S, C] or [B, S, K, C].
    """
    return tf.gather(points, idx, batch_dims=1)

In [None]:
def farthest_point_sample(xyz, npoint):
    """Pick ``npoint`` points from one point cloud by farthest point sampling.

    Starting from a random seed point, each step selects the point whose
    distance to the already-selected set is largest.

    Args:
        xyz: Array-like of shape (N, D) holding a single (unbatched) point
            cloud.
        npoint: Number of points to select.

    Returns:
        NumPy array of shape (npoint, D) containing the selected rows of
        ``xyz`` in selection order. Note that this returns the points
        themselves, not their indices.
    """
    xyz = np.asarray(xyz)
    N, _ = xyz.shape
    # Integer dtype so the result can be used directly as an index array.
    centroids = np.zeros((npoint,), dtype=np.int64)
    # distance[j] = squared distance from point j to the nearest selected point.
    distance = np.full((N,), 1e10)
    farthest = np.random.randint(0, N)
    for i in range(npoint):
        centroids[i] = farthest
        centroid = xyz[farthest, :]
        dist = np.sum((xyz - centroid) ** 2, axis=1)
        distance = np.minimum(distance, dist)
        farthest = np.argmax(distance, -1)
    return xyz[centroids]

In [None]:
def query_ball_point(radius, nsample, xyz, new_xyz):
    """Find up to ``nsample`` neighbours of each query point within ``radius``.

    Args:
        radius: Search radius (compared against squared distances as
            ``radius ** 2``).
        nsample: Maximum number of neighbours kept per query point.
        xyz: Rank-3 tensor [B, N, C] of candidate points.
        new_xyz: Rank-3 tensor [B, S, C] of query points.

    Returns:
        int64 tensor of shape [B, S, nsample] with indices into axis 1 of
        ``xyz``. When a query has fewer than ``nsample`` neighbours inside
        the ball, the remaining slots repeat its first neighbour. If a query
        has no neighbour at all, its slots hold the out-of-range value N.
    """
    sqrdists = square_distance(new_xyz, xyz)  # [B, S, N]
    dims = tf.shape(sqrdists)
    num_points = dims[2]
    # N is used as a sentinel: it sorts after every valid index 0..N-1.
    sentinel = tf.cast(num_points, tf.int64)

    group_idx = tf.broadcast_to(tf.range(num_points, dtype=tf.int64), dims)
    # Tensors are immutable, so mask with tf.where instead of item assignment.
    group_idx = tf.where(sqrdists > radius ** 2, sentinel, group_idx)
    # After sorting, in-ball indices come first; keep the first nsample.
    group_idx = tf.sort(group_idx, axis=-1)[:, :, :nsample]

    # Pad unfilled slots with the first neighbour (broadcast over last axis).
    group_first = group_idx[:, :, :1]
    group_idx = tf.where(group_idx == sentinel, group_first, group_idx)
    return group_idx

In [None]:
def sample_and_group(npoint, radius, nsample, xyz, points, knn=False, use_xyz=True):
    """Sample centroids and group neighbouring points around each of them.

    Args:
        npoint: Number of centroids to sample.
        radius: Ball-query radius around each centroid.
        nsample: Number of neighbours gathered per centroid.
        xyz: Point coordinates; assumed [B, N, 3] -- TODO confirm.
        points: Optional per-point features aligned with ``xyz`` (or None).
        knn: Accepted for interface compatibility; not used by this function.
        use_xyz: When features are given, whether to concatenate the local
            coordinates onto them.

    Returns:
        Tuple ``(new_xyz, new_points, idx, grouped_xyz)``:
        the sampled centroids, the grouped features fed to the MLP, the
        neighbour indices, and the neighbour coordinates expressed relative
        to their centroid.
    """
    # farthest_point_sample is declared as (xyz, npoint); the original call
    # passed the arguments in the opposite order.
    # NOTE(review): its result is used here as indices into xyz -- confirm the
    # helper in scope returns [B, npoint] indices for batched input.
    new_xyz = index_points(xyz, farthest_point_sample(xyz, npoint))

    # The ball query takes the radius (the original passed npoint) and
    # returns a single index tensor.
    idx = query_ball_point(radius, nsample, xyz, new_xyz)

    grouped_xyz = index_points(xyz, idx)
    # Translate each neighbourhood so its centroid is the origin; the
    # expanded centroid axis broadcasts across the nsample neighbours.
    grouped_xyz = grouped_xyz - tf.expand_dims(new_xyz, 2)

    if points is None:
        new_points = grouped_xyz
    else:
        grouped_points = index_points(points, idx)
        if use_xyz:
            new_points = tf.concat([grouped_xyz, grouped_points], axis=-1)
        else:
            new_points = grouped_points

    return new_xyz, new_points, idx, grouped_xyz
     
   

In [None]:
def sample_and_group_all(xyz, points, use_xyz=True):
    """Treat the whole point cloud as one group centred on the origin.

    Args:
        xyz: Tensor with static shape [B, N, 3] (it is reshaped to
            [B, 1, N, 3] below).
        points: Optional per-point features [B, N, C], or None.
        use_xyz: When features are given, whether to concatenate ``xyz``
            onto them along the channel axis.

    Returns:
        Tuple ``(new_xyz, new_points, idx, grouped_xyz)``:
        ``new_xyz`` is a [B, 1, 3] float32 tensor of zeros, ``new_points``
        has a leading group axis of size 1, ``idx`` is [B, 1, N] holding
        0..N-1 for every batch element, and ``grouped_xyz`` is [B, 1, N, 3].
    """
    batch_size = xyz.get_shape()[0]
    nsample = xyz.get_shape()[1]

    new_xyz = tf.zeros((batch_size, 1, 3), dtype=tf.float32)

    idx = tf.constant(
        np.tile(np.arange(nsample).reshape((1, 1, nsample)), (batch_size, 1, 1))
    )
    grouped_xyz = tf.reshape(xyz, (batch_size, 1, nsample, 3))

    if points is not None:
        if use_xyz:
            # tf.concat requires an explicit axis; join along channels.
            new_points = tf.concat([xyz, points], axis=2)
        else:
            new_points = points
        new_points = tf.expand_dims(new_points, 1)
    else:
        new_points = grouped_xyz
    return new_xyz, new_points, idx, grouped_xyz



In [None]:
class Conv2d(Layer):
    """1x1 2-D convolution with optional batch normalization and activation.

    The kernel has spatial size 1x1, so the layer applies the same linear map
    to the last axis of every position of its 4-D input.

    Args:
        filters: Number of output channels.
        strides: Convolution strides passed to ``tf.nn.conv2d``.
        activation: Callable applied to the output, or a falsy value to skip.
        padding: Padding mode for ``tf.nn.conv2d``; string values are
            upper-cased so the historical default ``'Valid'`` is accepted.
        initializer: Initializer for the kernel weights.
        bn: Whether to apply batch normalization before the activation.
    """

    def __init__(self, filters, strides=(1, 1), activation=tf.nn.relu,
                 padding='Valid', initializer='glorot_normal', bn=False):
        # The original referenced __init__ without calling it, so the Keras
        # base class was never initialized.
        super(Conv2d, self).__init__()

        self.filters = filters
        # Copy sequences so a caller's list is never shared or mutated.
        self.strides = list(strides) if isinstance(strides, (list, tuple)) else strides
        self.activation = activation
        # tf.nn.conv2d documents the upper-case names "SAME" / "VALID".
        self.padding = padding.upper() if isinstance(padding, str) else padding
        self.initializer = initializer
        self.bn = bn

    def build(self, input_shape):
        """Create the 1x1 kernel (and the batch-norm sub-layer if enabled)."""
        self.w = self.add_weight(
            shape=(1, 1, input_shape[-1], self.filters),
            initializer=self.initializer,
            trainable=True,
            name='pnet_conv',
        )

        if self.bn:
            self.bn_layer = BatchNormalization()

        super(Conv2d, self).build(input_shape)

    def call(self, inputs, training=True):
        """Apply convolution, then optional batch norm and activation."""
        points = tf.nn.conv2d(inputs, filters=self.w, strides=self.strides,
                              padding=self.padding)

        if self.bn:
            points = self.bn_layer(points, training=training)
        if self.activation:
            points = self.activation(points)

        return points


In [None]:
class Pointnet_SA(Layer):
    """Single-scale set-abstraction layer.

    Samples and groups points (or groups the whole cloud when ``group_all``
    is set), runs every group through a stack of ``Conv2d`` layers and
    max-pools over the neighbours of each group.

    Args:
        npoint: Number of centroids to sample (unused when ``group_all``).
        radius: Ball-query radius (unused when ``group_all``).
        nsample: Neighbours per centroid (unused when ``group_all``).
        mlp: Iterable of output channel counts, one per ``Conv2d`` layer.
        group_all: If True, treat all points as a single group.
        knn: Forwarded to ``sample_and_group``.
        use_xyz: Whether coordinates are concatenated onto the features.
        activation: Activation used by every ``Conv2d`` layer.
        bn: Whether every ``Conv2d`` layer applies batch normalization.
    """

    def __init__(
            self, npoint, radius, nsample, mlp, group_all=False, knn=False,
            use_xyz=True, activation=tf.nn.relu, bn=False):
        super(Pointnet_SA, self).__init__()

        self.npoint = npoint
        self.radius = radius
        self.nsample = nsample
        self.mlp = mlp
        self.group_all = group_all
        self.knn = knn
        # The original assigned the undefined name `xyz` here.
        self.use_xyz = use_xyz
        self.activation = activation
        self.bn = bn

        self.mlp_list = []

    def build(self, input_shape):
        """Instantiate one ``Conv2d`` per entry of ``mlp``."""
        # Rebuild the list from scratch so a repeated build cannot stack
        # duplicate layers.
        self.mlp_list = [
            Conv2d(n_filters, activation=self.activation, bn=self.bn)
            for n_filters in self.mlp
        ]

        super(Pointnet_SA, self).build(input_shape)

    def call(self, xyz, points, training=True):
        """Run the layer.

        Args:
            xyz: Point coordinates.
            points: Optional per-point features; a rank-2 input gets a
                leading batch axis added.
            training: Forwarded to the ``Conv2d`` layers.

        Returns:
            Tuple ``(new_xyz, new_points)``. ``new_points`` has the
            neighbour axis pooled away; with ``group_all`` the single group
            axis is removed as well.
        """
        if points is not None and len(points.shape) < 3:
            points = tf.expand_dims(points, axis=0)

        if self.group_all:
            new_xyz, new_points, idx, grouped_xyz = sample_and_group_all(
                xyz, points, self.use_xyz)
        else:
            new_xyz, new_points, idx, grouped_xyz = sample_and_group(
                self.npoint,
                self.radius,
                self.nsample,
                xyz,
                points,
                self.knn,
                use_xyz=self.use_xyz)

        for mlp_layer in self.mlp_list:
            new_points = mlp_layer(new_points, training=training)

        # Max-pool over the neighbour axis (axis 2). Dropping exactly that
        # axis, rather than calling a bare tf.squeeze, keeps a batch or
        # channel dimension of size 1 intact.
        new_points = tf.math.reduce_max(new_points, axis=2)
        if self.group_all:
            # group_all yields one group per cloud; remove that axis too.
            new_points = tf.squeeze(new_points, axis=1)

        return new_xyz, new_points

		  

 



	   

In [None]:
class Pointnet_SA_MSG(Layer):
    """Multi-scale-grouping set-abstraction layer.

    Samples one set of centroids, then for every (radius, nsample) scale
    groups neighbours, runs them through that scale's ``Conv2d`` stack,
    max-pools over neighbours and concatenates the per-scale features.

    Args:
        npoint: Number of centroids to sample.
        radius_list: Ball-query radius for each scale.
        nsample_list: Neighbours per centroid for each scale.
        mlp: One list of output channel counts per scale.
        use_xyz: Whether local coordinates are concatenated onto features.
        activation: Activation used by every ``Conv2d`` layer.
        bn: Whether every ``Conv2d`` layer applies batch normalization.
    """

    def __init__(self, npoint, radius_list, nsample_list, mlp, use_xyz=True,
                 activation=tf.nn.relu, bn=False):
        super(Pointnet_SA_MSG, self).__init__()
        self.npoint = npoint
        self.radius_list = radius_list
        self.nsample_list = nsample_list
        self.mlp = mlp
        self.use_xyz = use_xyz
        self.activation = activation
        self.bn = bn

        self.mlp_list = []

    def build(self, input_shape):
        """Instantiate one ``Conv2d`` stack per scale."""
        # Rebuild from scratch so a repeated build cannot stack duplicates.
        self.mlp_list = [
            [
                Conv2d(n_filters, activation=self.activation, bn=self.bn)
                for n_filters in self.mlp[scale]
            ]
            for scale in range(len(self.radius_list))
        ]
        super(Pointnet_SA_MSG, self).build(input_shape)

    def call(self, xyz, points, training=True):
        """Run the layer.

        Args:
            xyz: Point coordinates.
            points: Optional per-point features; a rank-2 input gets a
                leading batch axis added.
            training: Forwarded to the ``Conv2d`` layers.

        Returns:
            Tuple ``(new_xyz, new_points_concat)``: the sampled centroids and
            the per-scale pooled features concatenated on the last axis.
        """
        if points is not None and len(points.shape) < 3:
            points = tf.expand_dims(points, axis=0)

        # farthest_point_sample is declared as (xyz, npoint).
        # NOTE(review): its result is used here as indices into xyz -- confirm
        # the helper in scope returns [B, npoint] indices for batched input.
        new_xyz = index_points(xyz, farthest_point_sample(xyz, self.npoint))

        new_point_list = []

        for scale, (radius, nsample) in enumerate(
                zip(self.radius_list, self.nsample_list)):
            # query_ball_point returns a single index tensor.
            idx = query_ball_point(radius, nsample, xyz, new_xyz)
            grouped_xyz = index_points(xyz, idx)
            # Centre each neighbourhood on its centroid (broadcast over
            # the neighbour axis).
            grouped_xyz = grouped_xyz - tf.expand_dims(new_xyz, 2)

            if points is not None:
                grouped_points = index_points(points, idx)
                if self.use_xyz:
                    grouped_points = tf.concat(
                        [grouped_points, grouped_xyz], axis=-1)
            else:
                grouped_points = grouped_xyz

            for mlp_layer in self.mlp_list[scale]:
                grouped_points = mlp_layer(grouped_points, training=training)

            new_points = tf.math.reduce_max(grouped_points, axis=2)
            new_point_list.append(new_points)

        new_points_concat = tf.concat(new_point_list, axis=-1)

        return new_xyz, new_points_concat

    # Backward-compatible alias for the original, misspelled method name.
    Call = call




            


In [None]:
class CLS_MSG_Model(Model):
    """Point-cloud classifier built from two MSG layers and a global layer.

    Args:
        batch_size: Fixed batch size; used to flatten the global feature.
        num_classes: Number of output classes.
        bn: Whether the set-abstraction layers use batch normalization.
        activation: Activation for the set-abstraction and hidden dense
            layers.
        keep_prob: Value passed as the ``rate`` argument of both ``Dropout``
            layers. The original code read ``self.keep_prob`` without ever
            defining it; 0.4 is the default chosen for this fix --
            TODO confirm the intended value.
    """

    def __init__(self, batch_size, num_classes, bn=False,
                 activation=tf.nn.relu, keep_prob=0.4):
        super(CLS_MSG_Model, self).__init__()

        self.activation = activation
        self.batch_size = batch_size
        self.num_classes = num_classes
        self.bn = bn
        self.keep_prob = keep_prob

        self.kernel_initializer = 'glorot_normal'
        self.kernel_regularizer = None

        self.init_network()

    def init_network(self):
        """Create all sub-layers."""
        self.layer1 = Pointnet_SA_MSG(
            npoint=1024,
            radius_list=[0.1, 0.2, 0.4],
            nsample_list=[16, 32, 128],
            mlp=[[32, 32, 64], [64, 64, 128], [64, 96, 128]],
            activation=self.activation,
            bn=self.bn,
        )
        self.layer2 = Pointnet_SA_MSG(
            npoint=512,
            radius_list=[0.2, 0.4, 0.8],
            nsample_list=[32, 64, 128],
            mlp=[[64, 64, 128], [128, 128, 256], [128, 128, 256]],
            activation=self.activation,
            bn=self.bn,
        )
        self.layer3 = Pointnet_SA(
            npoint=None,
            radius=None,
            nsample=None,
            mlp=[256, 512, 1024],
            group_all=True,
            activation=self.activation,
            bn=self.bn,
        )
        self.dense1 = Dense(512, activation=self.activation)
        self.dropout1 = Dropout(self.keep_prob)
        self.dense2 = Dense(128, activation=self.activation)
        # The original defined dense2 twice and never created dropout2 or
        # dense3, both of which forward_pass uses.
        self.dropout2 = Dropout(self.keep_prob)
        # Softmax output, so predictions are class probabilities; pair with
        # a loss that expects probabilities -- TODO confirm intended head.
        self.dense3 = Dense(self.num_classes, activation=tf.nn.softmax)

    def forward_pass(self, input, training):
        """Map a batch of point clouds to class predictions."""
        xyz, points = self.layer1(input, None, training=training)
        xyz, points = self.layer2(xyz, points, training=training)
        xyz, points = self.layer3(xyz, points, training=training)

        net = tf.reshape(points, (self.batch_size, -1))
        net = self.dense1(net)
        # forward_pass is called directly from train_step / test_step, so
        # the training flag is passed explicitly to the dropout layers.
        net = self.dropout1(net, training=training)
        net = self.dense2(net)
        net = self.dropout2(net, training=training)
        pred = self.dense3(net)

        return pred

    def train_step(self, input):
        """Run one optimization step on ``input = (features, labels)``."""
        with tf.GradientTape() as tape:
            pred = self.forward_pass(input[0], True)
            loss = self.compiled_loss(input[1], pred)
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        self.compiled_metrics.update_state(input[1], pred)
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, input):
        """Evaluate on ``input = (features, labels)`` without updating weights."""
        pred = self.forward_pass(input[0], False)
        # Called for its effect on the compiled loss; the value is not needed.
        self.compiled_loss(input[1], pred)

        self.compiled_metrics.update_state(input[1], pred)
        return {m.name: m.result() for m in self.metrics}

    def call(self, input, training=False):
        """Keras entry point; delegates to ``forward_pass``."""
        return self.forward_pass(input, training)

In [None]:
import tensorflow as tf
import numpy as np

# Scratch cell: inspect the arrays used to initialise farthest point sampling.
npoints, N = 200, 5

# Shape (npoints, 0) has zero columns, so the array is empty and prints "[]".
centroids = np.zeros(shape=(npoints, 0))
print(centroids)

# N entries, each set to the large starting distance 1e5.
distance = np.full((N,), 1e5)
print(distance)

[]
[100000. 100000. 100000. 100000. 100000.]
