<a href="https://colab.research.google.com/github/wdwzyyg/ElectronCounting/blob/master/Example_counting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install and import packages

In [None]:
!pip install ElectronCounting --upgrade
import CountingNN
import torch
import os

# Evaluation funciton

In [3]:
from sklearn.metrics import pairwise_distances_argmin_min
def pos_deviation(coords, truth, threshold):
    """
    Cal the root mean square error between detected electron incident positions and the ground truth positions in units of pixels.
    """
    # elements in pair 1 need to be no less than pair 2
    distances = []
    if len(coords):
      assigment,distances = pairwise_distances_argmin_min(coords, truth)

    return distances



def end2end_evaluation(groundtruth, predicted, tolerance):
  '''
  Args:
  groundtruth: the ground truth image in electron counts
  predicted: model predicted image in electron counts
  tolerance: predictions with position error no larger than "tolerance" pixels will be selected as truth positive
  Returns:
  recall
  precision
  f1
  dce: detector conversion efficiency, # of all detected e- / # of ground truth e-
  mae_position: mean position error (Euclidean distance) averaged over all the detected electrons within this test image.
  '''
  truth = []
  for value in range(1, 1+int(groundtruth.max())):
    truth_ = np.array(np.where(groundtruth==value))
    for i in range(value):
      truth.append(truth_)
  truth = np.hstack(truth).T

  predicted_coords = []
  for value in range(1, 1 + int(predicted.max())):
    coords = np.array(np.where(predicted==value))
    for i in range(value):
      predicted_coords.append(coords)
  predicted_coords = np.hstack(predicted_coords).T

  nume = np.sum(groundtruth) # real total number of electron

  deviations= pos_deviation(predicted_coords, truth, 6) # square root distance between the ground truth and predicted position for each electron

  tp= np.where(deviations<=tolerance)[0].shape[0] # get the true positives, which have the error no larger than "tolerance" pixels
  precision= tp/deviations.shape[0]
  recall = tp/nume

  f1 = 2*((precision*recall)/(precision + recall))

  dce= len(deviations)/np.sum(groundtruth) # dector conversion efficiency

  mae_position = deviations.mean()
  print('Recall', recall, 'Precision', precision, 'F1', f1, 'DCE', dce, 'MAE of positions', mae_position)
  return recall, precision, f1, dce,mae_position

# counting function using neural network

In [4]:
# load the model
# add map_location = 'cpu' when running with on CPU
model = torch.load(os.path.dirname(CountingNN.__file__) + '/modelweights/model_200kV_final.pt', map_location=torch.device('cpu'))

from CountingNN.locator import Locator

def fastrcnn_predict(model, arr, device, process_stride, **kwargs):
  """
  Implements Faster R-CNN on a single image to detect boxes for electron events,
  then use finding maximum to assign the entry positions

  Args:
      model: the loaded fast rcnn model
      arr: array of a single image, shape [H,W]
      device: torch.device('cpu') or torch.device('cuda')
      process_stride: divide the image into pieces when applying the fast rcnn, recommend between 32 and 64.
      meanADU: optional float for mean intensity per electron (ADU), if none, will use default 241 for 200kV.
      p_list: optional list of five multiplier for model tune, if none, will use default numbers: [6, 6, 1.3, 1.5, 23]
  """
  x = arr[None, ...]
  # device =  torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
  counting = Locator(model, device, process_stride, 'max', 30, None, 'dynamic_window', meanADU = kwargs.get('meanADU'), p_list=kwargs.get('p_list'))
  filtered, event_sizes =  counting.predict_sequence(x)
  filtered = filtered[0]

  return filtered


## Run inference and evaluation

In [36]:
example = np.load('/content/Stack016.npz')['X'][0,0]
gt = np.load('/content/Stack016.npz')['y'][0,0]
# Load your own image array instead
example.shape, gt.shape

((256, 256), (256, 256))

In [37]:
model_counted = fastrcnn_predict(model, example, device= torch.device('cpu'), process_stride=64)

In [39]:
end2end_evaluation(gt, model_counted, tolerance=1)

Recall 0.8203791469194313 Precision 0.7958620689655173 F1 0.8079346557759626 DCE 1.0308056872037914 MAE of positions 0.7154261507604853


(0.8203791469194313,
 0.7958620689655173,
 0.8079346557759626,
 1.0308056872037914,
 0.7154261507604853)

# counting function using Connected component analysis

In [None]:
import numpy as np
from scipy.ndimage import maximum_position
from scipy.ndimage import label

def counting_filter_max(arr, threshold=20, structure = np.ones((3,3))):
  """
  Implements CCA on a single image to detect blobs,
  then use finding maximum to assign the entry positions

  Args:
      arr: array of a single image, shape [H,W]
      threshold: dark noise thresholding
  """
  image_binary = arr > threshold
  all_labels, num = label(image_binary, structure = np.ones((3,3)))
  m=np.ones(shape=all_labels.shape)
  obj = maximum_position(arr, all_labels, range(1,num))
  obj = np.rint(obj).astype(int)
  x = np.zeros(shape=np.shape(arr))
  x[obj[:,0],obj[:,1]]=1

  return x

# Parallel processing

By using dask, you can create parallel tasks with multiple CPU cores or GPU cores(Dask-Cuda). Just map those counting functions for a lazy signal.