In [None]:
import sys
sys.path.append("../lib/")

from pyzbar import pyzbar
import time
from pathlib import Path
import os
import cv2
import numpy as np
from TIS import TIS
from SignalEmitter import SignalEmitter
from BarcodeScanResult import BarcodeScanResult
from EvaluationUtils import EvaluationUtils
import pandas as pd
from multiprocessing import Process

In [None]:
# Number of frames to be collected per call to capture_frames,
# i.e. for every (exposure, camera gain) combination that is evaluated
frames = 50

# Number of barcodes expected
# Passed through unchanged to BarcodeScanResult
# NOTE(review): presumably the number of barcodes visible in each frame - confirm
expected = 2

# Expected barcode content
# A decoded barcode counts as "correct" only if its text is one of these strings
barcode_content = ["6009800461091", "*25X20X4*"]

# Color used to highlight detected barcodes (passed to cv2.rectangle)
# NOTE(review): the camera is opened with the BGRA sink format, so this 4-tuple is
# presumably in (B, G, R, A) order - confirm
dark_blue = (216,118,0,255)

In [None]:
def capture_frames(Tis, freq, exposure, camera_gain, store_frames=False, num_frames=None):
    """Capture a fixed number of frames and count the barcodes found in them.

    The camera is switched to manual exposure/gain, then frames are snapped
    until the requested number of frames has been collected. Every frame is run
    through pyzbar; each decoded barcode counts as "detected", and additionally
    as "correct" when its text is listed in the module-level ``barcode_content``.

    Args:
        Tis: Opened camera object; must provide ``Set_Property``, ``Snap_image``
            and ``Get_image``.
        freq: Attack signal frequency of this run. Only used to label the result
            and the frame storage folder.
        exposure: Value written to the camera's "Exposure" property.
        camera_gain: Value written (as float) to the camera's "Gain" property.
        store_frames: If True, every captured frame is written as PNG below
            ``/home/code/data/barcode_scanner/<freq>/<exposure>/<camera_gain>``.
            WARNING: depending on the number of frames this can consume a lot
            of space.
        num_frames: Number of frames to collect. Defaults to the module-level
            ``frames`` setting when None.

    Returns:
        BarcodeScanResult built from the run parameters, the detected/correct
        totals, the number of frames collected and the module-level ``expected``.

    Raises:
        Any exception raised while capturing (including KeyboardInterrupt) is
        propagated to the caller. Frames cached up to that point are still
        written to disk when ``store_frames`` is set.
    """
    target_frames = frames if num_frames is None else num_frames

    total_correct = 0
    total_detected = 0

    # Frames captured so far; only filled when store_frames is set
    images_cache = []

    # Set camera specific settings, such as exposure and gain - has to be done before each capture
    # to ensure settings are correct, even if the camera lost the connection between captures
    Tis.Set_Property("Trigger Mode", False)
    Tis.Set_Property("Exposure Auto", False)
    Tis.Set_Property("Exposure", exposure)
    Tis.Set_Property("Gain Auto", False)
    Tis.Set_Property("Gain", float(camera_gain))

    # Sleep to make sure the settings have been applied before the collection starts
    time.sleep(3)

    try:
        count = 0
        # "<" instead of "!=" so a target of zero (or less) terminates immediately
        while count < target_frames:

            # Snap an image with one second timeout; a failed snap is retried
            # and does not count towards the number of collected frames
            if Tis.Snap_image(1) is not True:
                continue

            # Get the image. It is a numpy array
            image = Tis.Get_image()

            # Extract the barcodes from the frame
            results = pyzbar.decode(image)
            total_detected += len(results)

            # Cache the captured frame to store it later. The cache holds the same
            # array object that is drawn on below, so stored frames show the highlights.
            if store_frames:
                images_cache.append(image)

            # Iterate over all detected barcodes
            for result in results:

                # Check if the barcode has been detected correctly. Undecodable bytes
                # are replaced rather than raising: a garbled read simply never
                # matches the expected content.
                if result.data.decode("utf-8", errors="replace") in barcode_content:
                    total_correct += 1

                # Draw a rectangle around the detected barcode for visualization purposes
                (x, y, w, h) = result.rect
                cv2.rectangle(image, (x - 7, y - 7), (x + w + 15, y + h + 15), dark_blue, 8)

            count += 1

    finally:

        # Store the cached frames even if the capture was aborted, so the frames
        # collected so far are not lost
        if store_frames:

            # Path where to store the captured frames - if folder does not exist, create it
            frame_storage_path = f"/home/code/data/barcode_scanner/{freq}/{exposure}/{camera_gain}"
            Path(frame_storage_path).mkdir(parents=True, exist_ok=True)

            # Iterate over the cached frames to store them at the previously defined path
            for indx, img in enumerate(images_cache):
                cv2.imwrite(f"{frame_storage_path}/frame_{indx}.png", img)

    # Deliberately NOT inside the finally block: a return statement there would
    # discard any in-flight exception and report an incomplete capture as complete
    return BarcodeScanResult(freq, exposure, camera_gain, total_detected, total_correct, target_frames, expected)

In [None]:
def start_attack_signal(freq, sample_rate=1, sdr_gain=31, duration=3600):
    """Start emitting the attack signal in a separate process.

    Spawns a child process named "emit" that runs
    ``SignalEmitter.emit_random_noise(freq, sample_rate, sdr_gain, duration)``,
    then blocks for two seconds before returning.

    Args:
        freq: Forwarded to the emitter as first argument.
        sample_rate: Forwarded to the emitter as second argument.
        sdr_gain: Forwarded to the emitter as third argument.
        duration: Forwarded to the emitter as fourth argument.

    Returns:
        The started ``multiprocessing.Process``; the caller is responsible for
        terminating it.
    """
    emitter_args = (freq, sample_rate, sdr_gain, duration)

    # The USRP/emission of the attack signal is handled outside the main process
    emitter = Process(target=SignalEmitter.emit_random_noise, name="emit", args=emitter_args)
    emitter.start()

    # Wait 2 seconds so the USRP is already emitting when the caller continues
    time.sleep(2)

    return emitter

In [None]:
def run_barcode_scanner_evaluation(freq, store_frames, file_name):
    """Run the barcode scanner evaluation for every exposure/gain combination.

    If ``freq`` is above 0 an attack signal is emitted for the duration of the
    evaluation. The camera is opened, ``capture_frames`` is called once per
    (exposure, camera gain) pair, and the collected results are saved.

    Cleanup always runs, also when the evaluation is aborted: the results
    gathered so far are saved, the camera pipeline is stopped and the attack
    signal process is terminated. Each cleanup step runs even if a previous
    one fails.

    Args:
        freq: Attack signal frequency; 0 (or less) means no attack signal.
        store_frames: Forwarded to ``capture_frames``; whether frames are
            written to disk.
        file_name: Forwarded to ``EvaluationUtils.save_results`` as the target
            of the results.
    """
    results = []

    # Set of exposure settings to be tested
    exposures = [20000, 28000, 33000]
    # Set of camera gains to be tested
    camera_gains = [0.0, 4.67, 6.67, 8.67]

    # Both handles start out as None so the cleanup below can tell what was
    # actually acquired, instead of failing on an unbound name
    usrp_process = None
    Tis = None

    # If frequency is above 0, an attack signal is emitted
    if freq > 0:
        usrp_process = start_attack_signal(freq)

    try:
        # Open the camera
        Tis = TIS.TIS()
        Tis.openDevice("30610365", 1280, 960, 30, TIS.SinkFormats.BGRA, False)

        # Stop the pipeline first to make sure the camera is not blocked
        Tis.Stop_pipeline()
        Tis.Start_pipeline()  # Start the pipeline so the camera streams

        # Iterate over all defined exposure values
        for exposure in exposures:
            # Iterate over all camera gain values for each exposure value
            for camera_gain in camera_gains:
                results.append(capture_frames(Tis, freq, exposure, camera_gain, store_frames))

    # Nested try/finally: every cleanup step is attempted even if an earlier one raises
    finally:
        try:
            # Save results (whatever has been collected so far)
            EvaluationUtils.save_results(file_name, results)
        finally:
            try:
                # Stop the pipeline to prevent a deadlock - only if the camera
                # object could be created at all
                if Tis is not None:
                    Tis.Stop_pipeline()
            finally:
                # Stop emitting the attack signal
                if usrp_process is not None:
                    usrp_process.terminate()
                    # Reap the child so it does not linger as a zombie; bounded
                    # wait so a stuck emitter cannot hang the notebook
                    usrp_process.join(timeout=5)

In [None]:
##### RUN EVALUATION #######

# Carrier frequency of the attack signal in MHz
fc = 190

# Name and path of the file in which the results are stored
# NOTE(review): both runs below write to this same file; whether save_results
# appends or overwrites is not visible here - confirm the baseline is not lost
file_name = "/home/code/data/barcode_scanner/barcodescanner_results.csv"

# Baseline run: a frequency of 0 signals that no attack is executed
run_barcode_scanner_evaluation(freq=0, store_frames=True, file_name=file_name)

# Attack run: same evaluation while the attack signal is emitted at fc
run_barcode_scanner_evaluation(freq=fc, store_frames=True, file_name=file_name)