# Dependencies

In [None]:
pip install pycryptodome



In [None]:
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
from time import time
from typing import List
import json
import enum
import hashlib
import copy
import sys

def hashStr(str_in):
  str_enc = str_in.encode()
  raw_hash = hashlib.sha256(str_enc)
  hex_hash = raw_hash.hexdigest()
  return hex_hash

def merkleRec(hashes):
  if len(hashes) == 0:
    return None
  if len(hashes) == 1:
    return hashStr(hashes[0])
  if len(hashes) == 2:
    return hashStr(hashes[0] + hashes[1])
  size = len(hashes)
  above_layer = []
  for i in range(0, size-1, 2):
    hash = hashStr(hashes[i] + hashes[i+1])
    above_layer.append(hash)
  if size % 2 == 1:
    above_layer.append(hashStr(hashes[-1]))
  return merkleRec(above_layer)

def merkleTree(l):
  strs = list(map(str, l))
  strs.sort()
  return merkleRec(strs)

# Owner

In [None]:
'''
Owner is any entity that can own an IC. This class constructs an owner object with all relevant information to participate in the ESC. 
'''
class Owner(object):
  def __init__(self, address: str, key: RSA.RsaKey):
      super().__init__
      self.address = address
      self.privateKey = key.export_key().hex()
      self.publicKey  = key.publickey().export_key().hex()
  
  def to_json_string(self):
    return json.dumps(self.__dict__, sort_keys = True)

  def encryptICkey(self, ICkey):
    pubKey = RSA.import_key(bytes.fromhex(self.publicKey))
    cipher_rsa = PKCS1_OAEP.new(pubKey)
    return cipher_rsa.encrypt(bytes.fromhex(ICkey)).hex()

  def decryptICkey(self, encICkey):
    privKey = RSA.import_key(bytes.fromhex(self.privateKey))
    cipher_rsa = PKCS1_OAEP.new(privKey)
    return cipher_rsa.decrypt(bytes.fromhex(encICkey)).hex()

  def ICkeySwap(self, encICkey, newOwner):
    ICkey = self.decryptICkey(encICkey)
    cipher_rsa = PKCS1_OAEP.new(RSA.import_key(bytes.fromhex(newOwner.publicKey)))
    return  cipher_rsa.encrypt(bytes.fromhex(ICkey)).hex()

  def signMessage(self, message):  
    key = RSA.import_key(bytes.fromhex(self.privateKey))
    h = SHA256.new(bytes.fromhex(message))
    signature = pkcs1_15.new(key).sign(h)
    return signature.hex()

  def verifySign(self, message, signature):
    key = RSA.import_key(bytes.fromhex(self.publicKey))
    h = SHA256.new(bytes.fromhex(message))
    try:
        pkcs1_15.new(key).verify(h, bytes.fromhex(signature))
        return True
    except (ValueError, TypeError):
        return False


'''
This class constructs the public image of the owner, how it is stored in the ICtracker Database. 
'''
class OwnerPublic(object):
  def __init__(self, Owner):
      self.address = Owner.address
      self.publicKey = Owner.publicKey
  
  def to_json_string(self):
    return json.dumps(self.__dict__, sort_keys = True)

  def verifySign(self, message, signature):
    key = RSA.import_key(bytes.fromhex(self.publicKey))
    h = SHA256.new(bytes.fromhex(message))
    try:
        pkcs1_15.new(key).verify(h, bytes.fromhex(signature))
        return True
    except (ValueError, TypeError):
        return False

In [None]:
ownr1 = Owner('A Devices', RSA.generate(2048))
ownr1Pub = OwnerPublic(ownr1)
ownr2 = Owner('IC Corp', RSA.generate(2048))
ownr2Pub = OwnerPublic(ownr2)

ICkey = get_random_bytes(16).hex()
ICkeyHash = hashStr(ICkey)

encICkey1 = ownr1.encryptICkey(ICkey)
decICkey1 = ownr1.decryptICkey(encICkey1)

encICkey2 = ownr1.ICkeySwap(encICkey1, ownr2Pub)
decICkey2 = ownr2.decryptICkey(encICkey2)
decICkey2 == ICkey

True

In [None]:
len(ICkeyHash)*4

256

In [None]:
len(RSA.generate(2048).export_key().hex())

3348

In [None]:
len(RSA.generate(2048).publickey().export_key().hex())

900

In [None]:
message = get_random_bytes(16).hex()
signature = ownr1.signMessage(message)

ownr1Pub.verifySign(message, signature)

True

# ICtoken

In [None]:
'''
Contains all information unique to the IC that is stored as metadata for the icToken.
'''

class ICmetaData(object):
  def __init__(self, ECID, PID, SID, netHash, prevIdx, stage = None, status = None):
    self.ECID    = ECID
    self.PID     = PID
    self.SID     = SID
    self.netHash = netHash
    self.stage   = stage
    self.status  = status
    
    self.prevIdx = prevIdx
    self.version = 0

  def to_json_string(self):
    metaDataDict = self.__dict__.copy()
    metaDataDict.pop("version")
    metaDataDict.pop("prevIdx")
    return json.dumps(metaDataDict, sort_keys = True)


'''
The logic locking key (IC key) object. Stores the encryption of the IC key under 
the current owner's public key and a hash/digest of the plaint text key to ensure 
it is not changed along the way.
'''

class ICkeyPublic(object):
  def __init__(self, ICkey, owner: Owner):
    super().__init__()
    self.keyHash = hashStr(ICkey)
    self.keyEncr = owner.encryptICkey(ICkey)
      
  def to_json_string(self):
    return json.dumps(self.__dict__, sort_keys = True)

"""
An enumeration of the possible stages that a chip could be in
"""
class Stage(enum.Enum):
  fabrication = 1
  pcb_assembly = 2
  system_integrator = 3
  end_user = 4


class ICtoken(object):
  def __init__(self, metaData: ICmetaData, ICkeyPub: ICkeyPublic, ownerPub: OwnerPublic):

    self.metaData  = metaData    # all info unique to the IC
    self.ICkey     = ICkeyPub    # public image of the logic locking key: 1) hash digest for ID and 2) encrypted key
    self.owner     = ownerPub    # public image of the owner: 1) address and 2) Public Key

    icTokenStr  = self.metaData.to_json_string()
    icTokenStr += self.ICkey.to_json_string()
    icTokenStr += self.owner.to_json_string()

    self.trnsaxnID = hashStr(icTokenStr) # Indetifier for the transaction
    self.signature = bytes(6).hex()      # owner's signature for validation of the transaction

  def to_json_string(self):
    ICtokenDict = self.__dict__.copy()
    strOut      = self.metaData.to_json_string()
    ICtokenDict.pop("metaData")
    strOut     += self.ICkey.to_json_string()
    ICtokenDict.pop("ICkey")
    strOut     += self.owner.to_json_string()
    ICtokenDict.pop("owner")
    return strOut + json.dumps(ICtokenDict, sort_keys = True)

#Wallet

In [None]:
class Wallet(object):
  def __init__(self, owner: Owner):
    self.owner      = owner
    self.ICdb       = {}
  
  def enrollIC(self, ECID, ICkey):
    metaData  = ICmetaData(ECID, None, None,None, None, 
                           Stage.fabrication.value, 0)
    icToken   = ICtoken(metaData, 
                        ICkeyPublic(ICkey, self.owner), 
                        OwnerPublic(self.owner))
    icToken.signature  = self.owner.signMessage(icToken.trnsaxnID)
    return icToken

  def updateStage(self, ECID, stage, status):
    if ECID not in self.ICdb:
      print("Error: No IC with given ECID in Wallet")
      return None
    prevICtoken = copy.deepcopy(self.ICdb[ECID].ICtoken)
    prevICtoken.metaData.stage  = stage
    prevICtoken.metaData.status = status
    icToken     = ICtoken(prevICtoken.metaData,
                          prevICtoken.ICkey, 
                          prevICtoken.owner)
    icToken.signature = self.owner.signMessage(icToken.trnsaxnID)
    return icToken

  def updatePIDorSID(self, ECIDs, updatePID):

    merkleHash = merkleTree(ECIDs)

    icTokens = []
    owner_pub = None
    for ECID in ECIDs:
      if ECID not in self.ICdb:
        print("Error: No IC with given ECID in Wallet")
        return None  
      prevICtoken = copy.deepcopy(self.ICdb[ECID].ICtoken)
      if updatePID:
        prevICtoken.metaData.PID = merkleHash
      else:
        prevICtoken.metaData.SID = merkleHash
      icToken   = ICtoken(prevICtoken.metaData,
                          prevICtoken.ICkey, 
                          prevICtoken.owner)
      owner_pub = prevICtoken.owner
      icToken.signature = self.owner.signMessage(icToken.trnsaxnID)
      icTokens.append(icToken)
    return icTokens
  
  def transferIC(self, ECID, newOwner: OwnerPublic):
    if ECID not in self.ICdb:
      print("Error: No IC with this ECID# in Wallet")
      return None
    prevICtoken       = copy.deepcopy(self.ICdb[ECID].ICtoken)
    newKey            = copy.deepcopy(prevICtoken.ICkey)
    newKey.keyEncr     = self.owner.ICkeySwap(newKey.keyEncr, newOwner)
    icToken           = ICtoken(prevICtoken.metaData, newKey, newOwner)
    icToken.signature = self.owner.signMessage(icToken.trnsaxnID)
    return icToken

  def updateICdb(self, icTracker):
    try:
      self.ICdb = icTracker.ICdb[self.owner.address]
      return self.ICdb
    except (ValueError, TypeError, AttributeError):
      return None

# Blockchain

In [None]:
class Block(object):
  def __init__(self, metaData: ICmetaData, ICkey: ICkeyPublic, owner: OwnerPublic, index = None, prevHash = None):

    # Block Header
    self.ICtoken = ICtoken(metaData, ICkey, owner)

    # Blockchain attributes
    self.index     = index
    self.prevHash  = prevHash
    self.timestamp = time()
  
  def to_json_string(self):
    blockDict = self.__dict__.copy()
    strOut  = self.ICtoken.to_json_string()
    blockDict.pop("ICtoken")
    return strOut + json.dumps(blockDict, sort_keys = True)

In [None]:
def genGenesisBlock():
  genesis_metaD = ICmetaData(bytes(4).hex(), None, None, None, None)
  genesis_owner = Owner('No(o)ne', RSA.generate(2048))
  genesis_ICkey = ICkeyPublic(bytes(4).hex(), genesis_owner)
  genesis_ICkey.keyHash = bytes(4).hex()
  genesis_ICkey.keyEncr = bytes(4).hex()
  genesis_owner = OwnerPublic(genesis_owner)
  genesis_owner.publicKey = bytes(4).hex()
  return Block(genesis_metaD, genesis_ICkey, genesis_owner, 0)

"""
The basic blockchain structure
"""
class Blockchain(object):
  def __init__(self):
    self.chain = []
    genesis_block = genGenesisBlock()
    self.chain.append(genesis_block)

  @property
  def last_block(self):
    return self.chain[-1]


  """
  Adds a new block to the blockchain

  return: returns the block added or None on failures
  """
  def new_block(self, block : Block):
    # Check that we only add Block objects
    if not type(block) is Block:
      print('Error: attempted to add non-Block object')
      return None

    blockCopy = copy.deepcopy(block)

    # Gets hash of the previous block
    blockCopy.prevHash = hashStr(self.last_block.to_json_string())
    # Set the index of this block
    blockCopy.index = len(self.chain)
    # Set the timestamp of this block on the blockchain
    blockCopy.timestamp = time()

    self.chain.append(blockCopy)
    return blockCopy


  """
  Formats the contents of the blockchain in a pretty string
  """
  def chain_to_string(self):
    res = '['
    for block in self.chain:
      res += '\n\t' + block.to_json_string() + ''
    res += '\n]'
    return res

# ICtracker

In [None]:
"""
Uses the blockchain to implement ICtracker
"""
class ICtracker(object):
  def __init__(self):
    self.blockchain = Blockchain()
    self.ownersdb = {}
    self.ecid_to_index = {}
    self.ICdb = {}

  """
  Gets the most current block with a given ECID
  """
  def get_ECID_block(self, ECID):
    if ECID in self.ecid_to_index:
      index = self.ecid_to_index[ECID]
      return self.blockchain.chain[index], index
    return None

  """
  Gets the histroy of an ECID on blockchain
  """
  def get_ECID_history(self, ECID):
    if ECID not in self.ecid_to_index:
      return None
    history = []
    index = self.ecid_to_index[ECID]
    block = self.blockchain.chain[index]
    blockCopy = copy.deepcopy(block)
    history.append(blockCopy)
    while block.ICtoken.metaData.prevIdx is not None:
      block = self.blockchain.chain[block.ICtoken.metaData.prevIdx]
      blockCopy = copy.deepcopy(block)
      history.append(blockCopy)
    return history

  def enrollOwner(self, owner: OwnerPublic):
    if owner.address in self.ownersdb:
      print('Error: Owner already enrolled')
      return None
    self.ownersdb[owner.address] = owner
    self.ICdb[owner.address] = {}
    return True


  def verifyTransaxn(self, owner: OwnerPublic, icToken = None):
    if owner.address not in self.ownersdb:
      print('Error: Owner not authorized')
      return None
    if icToken is not None:
      if not self.ownersdb[owner.address].verifySign(icToken.trnsaxnID, icToken.signature):
        print('Error: Owner verification failed')
        return None
    return owner.address


  def enrollIC(self, icToken: ICtoken):

    if icToken.metaData.ECID in self.ecid_to_index:
      print('Error: cannot enroll an already enrolled ECID')
      return None

    ownerID = self.verifyTransaxn(icToken.owner, icToken)
    if ownerID is None:
      print('Error: unable to verify owner')
      return None

    PIDcheck    = (icToken.metaData.PID == None)
    SIDcheck    = (icToken.metaData.SID == None)
    stageChk    = (icToken.metaData.stage == Stage.fabrication.value)
    statusChk   = (icToken.metaData.status  == 0)
    prevIdxChk  = (icToken.metaData.prevIdx == None)
    versionChk  = (icToken.metaData.version == 0)
    if (PIDcheck + SIDcheck + stageChk + statusChk + prevIdxChk + versionChk) < 6:
      print('Error: incorrect metadata for Enrollment')
      return None

    metaDataCopy = copy.deepcopy(icToken.metaData)
    ICkeyCopy    = copy.deepcopy(icToken.ICkey)
    ownerCopy    = copy.deepcopy(icToken.owner)
    block        = Block(metaDataCopy, ICkeyCopy, ownerCopy)
    block.ICtoken.trnsaxnID = copy.deepcopy(icToken.trnsaxnID)
    block.ICtoken.signature = copy.deepcopy(icToken.signature)

    res = self.blockchain.new_block(block)

    if res is None:
      print('Error: failed adding block')
      return None

    self.ecid_to_index[res.ICtoken.metaData.ECID] = res.index
    self.ICdb[ownerID][res.ICtoken.metaData.ECID] = res

    return res

  def updateStage(self, icToken: ICtoken):

    if icToken.metaData.ECID not in self.ecid_to_index:
      print('Error: chip not enrolled!')
      return None

    prevBlock = self.get_ECID_block(icToken.metaData.ECID)[0]
    
    ownerID = self.verifyTransaxn(prevBlock.ICtoken.owner, icToken)
    if ownerID is None:
      return None

    if  icToken.metaData.stage <= prevBlock.ICtoken.metaData.stage:
      if  icToken.metaData.status <= prevBlock.ICtoken.metaData.status:
        print('Error: Can\'t rollback stage!')
        return None

    newBlock = copy.deepcopy(prevBlock)
    newBlock.ICtoken.metaData.stage    =  icToken.metaData.stage
    newBlock.ICtoken.metaData.status   =  icToken.metaData.status
    newBlock.ICtoken.metaData.prevIdx  =  self.ecid_to_index[icToken.metaData.ECID]
    newBlock.ICtoken.metaData.version +=  1 
    newBlock.ICtoken.trnsaxnID = icToken.trnsaxnID
    newBlock.ICtoken.signature = icToken.signature
    
    res = self.blockchain.new_block(newBlock)
    if res is None:
      print('Error: failed adding block')

    self.ecid_to_index[res.ICtoken.metaData.ECID] = res.index
    self.ICdb[ownerID][res.ICtoken.metaData.ECID] = res
    
    return res

  def updatePIDorSID(self, icTokens: list):
    can_add_PID = True
    can_add_SID = True
    ECIDs = []
    for icToken in icTokens:
      metaData = icToken.metaData
      ICkeyPublic = icToken.ICkey
      ECID = metaData.ECID
    
      if icToken.metaData.ECID not in self.ecid_to_index:
        print('Error: chip not enrolled!')
        return None
      ECIDs.append(ECID)

      prevICtoken = self.get_ECID_block(ECID)[0].ICtoken
      
      ownerID = self.verifyTransaxn(prevICtoken.owner, icToken)
      if ownerID is None:
        return None

      if prevICtoken.metaData.stage != Stage.pcb_assembly.value or not prevICtoken.metaData.PID is None:
        can_add_PID = False
      if prevICtoken.metaData.stage != Stage.system_integrator.value or not prevICtoken.metaData.SID is None:
        can_add_SID = False
    
    if not (can_add_PID or can_add_SID):
      print('Error: cannot add PID or SID')
      return None

    output_hash = merkleTree(ECIDs)

    res_list = []
    for icToken in icTokens:
      prevBlock = self.get_ECID_block(icToken.metaData.ECID)[0]
      newBlock = copy.deepcopy(prevBlock)
      if can_add_PID:
        newBlock.ICtoken.metaData.PID = output_hash
      else:
        newBlock.ICtoken.metaData.SID = output_hash
      newBlock.ICtoken.metaData.prevIdx  = self.ecid_to_index[icToken.metaData.ECID]
      newBlock.ICtoken.metaData.version += 1
      newBlock.ICtoken.trnsaxnID = icToken.trnsaxnID
      newBlock.ICtoken.signature = icToken.signature
      res = self.blockchain.new_block(newBlock)
      if res is None:
        print('Error: failed adding block')
      self.ecid_to_index[res.ICtoken.metaData.ECID] = res.index
      self.ICdb[ownerID][res.ICtoken.metaData.ECID] = res
      res_list.append(res)
    return res_list

  def transferIC(self, icToken: dict):

    if icToken.metaData.ECID not in self.ecid_to_index:
      print('Error: chip not enrolled!')
      return None
    
    newOwnerID  = self.verifyTransaxn(icToken.owner)
    if newOwnerID is None:
      print("\tNew Owner not autherized!")
      return None

    prevBlock = self.get_ECID_block(icToken.metaData.ECID)[0]
    oldOwnerID = self.verifyTransaxn(prevBlock.ICtoken.owner, icToken)
    if oldOwnerID is None:
      print('\toldOwner verification failed')
      return None

    newBlock = copy.deepcopy(prevBlock)
    newBlock.ICtoken.metaData.prevIdx  = self.ecid_to_index[icToken.metaData.ECID]
    newBlock.ICtoken.metaData.version += 1
    newBlock.ICtoken.owner     = icToken.owner
    newBlock.ICtoken.ICkey     = icToken.ICkey
    newBlock.ICtoken.trnsaxnID = icToken.trnsaxnID
    newBlock.ICtoken.signature = icToken.signature
    res = self.blockchain.new_block(newBlock)
    if res is None:
      print('Error: failed adding block')

    self.ecid_to_index[res.ICtoken.metaData.ECID] = res.index
    self.ICdb[oldOwnerID].pop(res.ICtoken.metaData.ECID)
    self.ICdb[newOwnerID][res.ICtoken.metaData.ECID] = res
    return res

In [None]:
def genRandECID(numBytes):
  return get_random_bytes(numBytes).hex()

In [None]:
icTracker = ICtracker()
wallet1 = Wallet(ownr1)

# Testing Enroll owner
icTracker.enrollOwner(ownr1Pub)

True

In [None]:
# generating random ECID and ICkey for testing

ECID1      = genRandECID(6)
ICkey1     = genRandECID(6)

In [None]:
# Testing Enrollment of new IC in wallet

icToken1 = wallet1.enrollIC(ECID1, ICkey1)

In [None]:
sys.getsizeof(icToken1)

64

In [None]:
len(icToken1.ICkey.keyEncr)/2

256.0

In [None]:
# Testing Enrollment of new IC in ICtracker

res = icTracker.enrollIC(icToken1)

print("Owner1's ICtokens :\n", wallet1.updateICdb(icTracker))

Owner1's ICtokens :
 {'93838f111967': <__main__.Block object at 0x7fdb530c8190>}


In [None]:
# Testing transfer of IC to unenrolled Owner

icToken2 = wallet1.transferIC(icToken1.metaData.ECID, ownr2Pub)

res = icTracker.transferIC(icToken2)

Error: Owner not authorized
	New Owner not autherized!


In [None]:
# Enrolling new owner into ICtracker

wallet2 = Wallet(ownr2)

icTracker.enrollOwner(ownr2Pub)

True

In [None]:
# Testing transfer of IC to an enrolled Owner

res = icTracker.transferIC(icToken2)

print("Owner1's ICtokens :\n", wallet1.updateICdb(icTracker))
print("\nOwner2's ICtokens :\n",wallet2.updateICdb(icTracker))

Owner1's ICtokens :
 {}

Owner2's ICtokens :
 {'93838f111967': <__main__.Block object at 0x7fdb530cbed0>}


#Testing

In [None]:
from numpy import random

# Test Params: number of total users and number of total chips
num_ownersdb = 2
num_chips = 100

# Our own mapping of ownersdb to data for testing purposes
test_ownersdb = {}

# Our own mapping of ECIDs to current owner for testing purposes
test_ecid_to_data = {}


def verify_blockchain_correctness(ownersdb, ecid_to_data, supply_chain):
  for ecid in ecid_to_data:
    block, idx = supply_chain.get_ECID_block(ecid)
    if block is None:
      assert False, ('ERROR: Did not find block with ECID ' + str(ecid) + ' in blockchain')
    test_owner_index = ecid_to_data[ecid]['owner_num']
    
    # Assert owner is correct
    assert ownersdb[test_owner_index]['owner_pub'].address == block.ICtoken.owner.address, ('ERROR: Incorrect address: ECID ' + str(ecid) + ' is owned by ' + ownersdb[test_owner_index]['owner_pub'].address + ', but blockchain has ' + block.ICtoken.owner.address)
    assert ownersdb[test_owner_index]['owner_pub'].publicKey == block.ICtoken.owner.publicKey, ('ERROR: Incorrect Public Key: ECID ' + str(ecid) + ' is owned by ' + ownersdb[test_owner_index]['owner_pub'].address + ', but blockchain has ' + block.ICtoken.owner.address)

    # Assert metadata is correct
    assert ecid_to_data[ecid]['SID'] == block.ICtoken.metaData.SID, ('ERROR: ECID ' + str(ecid) + ' has SID ' + str(ecid_to_data[ecid]['SID']) + ', but blockchain has ' + str(block.ICtoken.metaData.SID))
    assert ecid_to_data[ecid]['PID'] == block.ICtoken.metaData.PID, ('ERROR: ECID ' + str(ecid) + ' has PID ' + str(ecid_to_data[ecid]['PID']) + ', but blockchain has ' + str(block.ICtoken.metaData.PID))
    assert ecid_to_data[ecid]['stage'] == block.ICtoken.metaData.stage, ('ERROR: ECID ' + str(ecid) + ' has stage ' + str(ecid_to_data[ecid]['stage']) + ', but blockchain has ' + str(block.ICtoken.metaData.stage))
    assert ecid_to_data[ecid]['status'] == block.ICtoken.metaData.status, ('ERROR: ECID ' + str(ecid) + ' has status ' + str(ecid_to_data[ecid]['status']) + ', but blockchain has ' + str(block.ICtoken.metaData.status))
    assert ecid_to_data[ecid]['version'] == block.ICtoken.metaData.version, ('ERROR: ECID ' + str(ecid) + ' has version ' + str(ecid_to_data[ecid]['version']) + ', but blockchain has ' + str(block.ICtoken.metaData.version))

# Initialize ICtracker
icTracker = ICtracker()

# Init ownersdb
for i in range(num_ownersdb):
  ownr = Owner('Owner ' + str(i), RSA.generate(2048))
  ownr_pub = OwnerPublic(ownr)
  wallet = Wallet(ownr)
  test_ownersdb[i] = {'owner': ownr, 'owner_pub': ownr_pub, 'wallet': wallet}
  icTracker.enrollOwner(ownr_pub)

# Init Chips, Assign to ownersdb, and ownersdb Enroll Chips to Blockchain
for i in range(num_chips):
  ownr_num = random.randint(0, num_ownersdb)
  ownr = test_ownersdb[ownr_num]['owner']
  ownr_pub = test_ownersdb[ownr_num]['owner_pub']
  ownr_wallet = test_ownersdb[ownr_num]['wallet']
  logic_lock_key = get_random_bytes(16).hex()
  icToken = ownr_wallet.enrollIC(i, logic_lock_key)
  res = icTracker.enrollIC(icToken)
  ownr_wallet.updateICdb(icTracker)
  test_ecid_to_data[i] = {'owner_num': ownr_num,
                          'stage': Stage.fabrication.value,
                          'status': 0,
                          'version': 0,
                          'PID': None,
                          'SID': None}

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Enrolling Chips Successful\n\n")


Enrolling Chips Successful




In [None]:
# Update a Chip Stage
ownr_num = random.randint(0, num_ownersdb)
owned_chips_cnt = len(test_ownersdb[ownr_num]['wallet'].ICdb)
while owned_chips_cnt == 0:
  ownr_num = random.randint(0, num_ownersdb)
  owned_chips_cnt = len(test_ownersdb[ownr_num]['wallet'].ICdb)

ownr_wallet = test_ownersdb[ownr_num]['wallet']
ecid_of_chip_to_update = 0
for x in ownr_wallet.ICdb:
  ecid_of_chip_to_update = x
  break
icToken = ownr_wallet.updateStage(ecid_of_chip_to_update, Stage.fabrication.value, 1)
res = icTracker.updateStage(icToken)
ownr_wallet.updateICdb(icTracker)
test_ecid_to_data[ecid_of_chip_to_update]['status'] = 1
test_ecid_to_data[ecid_of_chip_to_update]['version'] = 1

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Updating Single Chip Stage Successful\n\n")


Updating Single Chip Stage Successful




In [None]:
# Update Stage of Multiple Chips
ownr_num = random.randint(0, num_ownersdb)
owned_chips_cnt = len(test_ownersdb[ownr_num]['wallet'].ICdb)
while owned_chips_cnt == 0:
  ownr_num = random.randint(0, num_ownersdb)
  owned_chips_cnt = len(test_ownersdb[ownr_num]['wallet'].ICdb)

ownr_wallet = test_ownersdb[ownr_num]['wallet']
for x in ownr_wallet.ICdb:
  ecid_of_chip_to_update = x
  icToken = ownr_wallet.updateStage(ecid_of_chip_to_update, Stage.pcb_assembly.value, 0)
  res = icTracker.updateStage(icToken)
  ownr_wallet.updateICdb(icTracker)
  test_ecid_to_data[ecid_of_chip_to_update]['stage'] = Stage.pcb_assembly.value
  test_ecid_to_data[ecid_of_chip_to_update]['status'] = 0
  test_ecid_to_data[ecid_of_chip_to_update]['version'] += 1

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Updating Stage of Multiple Chips Successful\n\n")

Updating Stage of Multiple Chips Successful




In [None]:
# Add PID to Chips in PCB Assembly Stage
ecid_list = []

for x in ownr_wallet.ICdb:
  ecid_of_chip_to_update = x
  ecid_list.append(ecid_of_chip_to_update)

pid_hash = merkleTree(ecid_list)

for x in ownr_wallet.ICdb:
  test_ecid_to_data[x]['PID'] = pid_hash
  test_ecid_to_data[x]['version'] += 1

icToken = ownr_wallet.updatePIDorSID(ecid_list, True)
res_list = icTracker.updatePIDorSID(icToken)
for res in res_list:
  ownr_wallet.updateICdb(icTracker)

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Updating PID Successful\n\n")

Updating PID Successful




In [None]:
# Moving Back Stage Unsuccessful
ownr_wallet = test_ownersdb[ownr_num]['wallet']
for x in ownr_wallet.ICdb:
  ecid_of_chip_to_update = x
  icToken = ownr_wallet.updateStage(ecid_of_chip_to_update, Stage.fabrication.value, 0)
  res = icTracker.updateStage(icToken)
  ownr_wallet.updateICdb(icTracker)

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Verified Moving Back Stage Unsuccessful\n\n")


Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't rollback stage!
Error: Can't r

In [None]:
# Adding PID to Block with Existing PID Unsuccessful
icToken = ownr_wallet.updatePIDorSID(ecid_list, True)
res_list = icTracker.updatePIDorSID(icToken)
assert res_list is None 

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Verified Updating PID Unsuccessful\n\n")


Error: cannot add PID or SID
Verified Updating PID Unsuccessful




In [None]:
# Check the history of blocks
for x in ownr_wallet.ICdb:
  history = icTracker.get_ECID_history(x)
  assert len(history)-1 == test_ecid_to_data[x]['version']

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Check History of ECID with Version\n\n")


Check History of ECID with Version




In [None]:
# Move blocks to system integration
for x in ownr_wallet.ICdb:
  ecid_of_chip_to_update = x
  icToken = ownr_wallet.updateStage(ecid_of_chip_to_update, Stage.system_integrator.value, 0)
  res = icTracker.updateStage(icToken)
  ownr_wallet.updateICdb(icTracker)
  test_ecid_to_data[ecid_of_chip_to_update]['stage'] = Stage.system_integrator.value
  test_ecid_to_data[ecid_of_chip_to_update]['status'] = 0
  test_ecid_to_data[ecid_of_chip_to_update]['version'] += 1

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Updating Stage of Multiple Chips Successful\n\n")


Updating Stage of Multiple Chips Successful




In [None]:
# Test Transferring

# First create a new user
ownr_new = Owner('Owner X', RSA.generate(2048))
ownr_new_pub = OwnerPublic(ownr_new)
wallet_new = Wallet(ownr_new)


# Transferring to unenrolled owner should fail
ecid_of_chip_to_update = 0
for x in ownr_wallet.ICdb:
  ecid_of_chip_to_update = x
  break
icToken = ownr_wallet.transferIC(x, ownr_new_pub)
res = icTracker.transferIC(icToken)
verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print('Verified Transferring to Unenrolled Owner Fails\n\n')

# Transferring to enrolled owner sould succeed
icTracker.enrollOwner(ownr_new_pub)
test_ownersdb[num_ownersdb] = {'owner': ownr_new, 'owner_pub': ownr_new_pub, 'wallet': wallet_new}
ICdb_copy = copy.deepcopy(ownr_wallet.ICdb)
for x in ICdb_copy:
  ecid_of_chip_to_update = x
  icToken = ownr_wallet.transferIC(ecid_of_chip_to_update, ownr_new_pub)
  res = icTracker.transferIC(icToken)
  ownr_wallet.updateICdb(icTracker)
  wallet_new.updateICdb(icTracker)
  test_ecid_to_data[x]['version'] += 1
  test_ecid_to_data[x]['owner_num'] = num_ownersdb

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print('Verified Transferring to Enrolled Owner Succeeds')


Error: Owner not authorized
	New Owner not autherized!
Verified Transferring to Unenrolled Owner Fails


Verified Transferring to Enrolled Owner Succeeds


In [None]:
# New owner gets enrolls more chips and combines all to add SID
for i in range(num_chips, num_chips+5):
  logic_lock_key = get_random_bytes(16).hex()
  icToken = wallet_new.enrollIC(i, logic_lock_key)
  res = icTracker.enrollIC(icToken)
  wallet_new.updateICdb(icTracker)
  test_ecid_to_data[i] = {'owner_num': num_ownersdb, 'stage': Stage.fabrication.value, 'status': 0, 'version': 0, 'PID': None, 'SID': None}

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print('Verified New Owner Adding More Chips\n\n')


# Gets these chips to PCB assembly
for x in range(num_chips, num_chips+5):
  ecid_of_chip_to_update = x
  icToken = wallet_new.updateStage(ecid_of_chip_to_update, Stage.pcb_assembly.value, 0)
  res = icTracker.updateStage(icToken)
  wallet_new.updateICdb(icTracker)
  test_ecid_to_data[ecid_of_chip_to_update]['stage'] = Stage.pcb_assembly.value
  test_ecid_to_data[ecid_of_chip_to_update]['status'] = 0
  test_ecid_to_data[ecid_of_chip_to_update]['version'] += 1

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print('Verified New Owner Updating Stage\n\n')


# Adds PID to new chips
ecid_list = []
for x in range(num_chips, num_chips+5):
  ecid_of_chip_to_update = x
  ecid_list.append(ecid_of_chip_to_update)

pid_hash = merkleTree(ecid_list)

for x in range(num_chips, num_chips+5):
  test_ecid_to_data[x]['PID'] = pid_hash
  test_ecid_to_data[x]['version'] += 1

icToken = wallet_new.updatePIDorSID(ecid_list, True)
res_list = icTracker.updatePIDorSID(icToken)
wallet_new.updateICdb(icTracker)

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Verified New Owner Adding PID Successful\n\n")


# Check adding SID currently unsuccessful
ecid_list = []
for x in range(num_chips, num_chips+5):
  ecid_of_chip_to_update = x
  ecid_list.append(ecid_of_chip_to_update)

pid_hash = merkleTree(ecid_list)

icToken = wallet_new.updatePIDorSID(ecid_list, False)
res_list = icTracker.updatePIDorSID(icToken)
wallet_new.updateICdb(icTracker)

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Verified Adding SID Unsuccessful in Fab Stage\n\n")

# Move to system integration stage
for x in range(num_chips, num_chips+5):
  ecid_of_chip_to_update = x
  icToken = wallet_new.updateStage(ecid_of_chip_to_update, Stage.system_integrator.value, 0)
  res = icTracker.updateStage(icToken)
  wallet_new.updateICdb(icTracker)
  test_ecid_to_data[ecid_of_chip_to_update]['stage'] = Stage.system_integrator.value
  test_ecid_to_data[ecid_of_chip_to_update]['status'] = 0
  test_ecid_to_data[ecid_of_chip_to_update]['version'] += 1

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print('Verified New Owner Updating Stage\n\n')


# New owner combines all chips in system integration
ecid_list = []
for x in wallet_new.ICdb:
  ecid_of_chip_to_update = x
  ecid_list.append(ecid_of_chip_to_update)

sid_hash = merkleTree(ecid_list)

for x in wallet_new.ICdb:
  test_ecid_to_data[x]['SID'] = sid_hash
  test_ecid_to_data[x]['version'] += 1

icToken = wallet_new.updatePIDorSID(ecid_list, False)
res_list = icTracker.updatePIDorSID(icToken)
wallet_new.updateICdb(icTracker)

verify_blockchain_correctness(test_ownersdb, test_ecid_to_data, icTracker)
print("Updating SID Successful\n\n")

Verified New Owner Adding More Chips


Verified New Owner Updating Stage


Verified New Owner Adding PID Successful


Error: cannot add PID or SID
Verified Adding SID Unsuccessful in Fab Stage


Verified New Owner Updating Stage


Updating SID Successful




In [None]:
print(icTracker.blockchain.chain_to_string())

[
	{"ECID": "00000000", "PID": null, "SID": null, "netHash": null, "stage": null, "status": null}{"keyEncr": "00000000", "keyHash": "00000000"}{"address": "No(o)ne", "publicKey": "00000000"}{"signature": "000000000000", "trnsaxnID": "58724593f5930adf4d33e7a14e618c7ddf6db9d81e38773cd00e7e377a62869d"}{"index": 0, "prevHash": null, "timestamp": 1652703339.540596}
	{"ECID": 0, "PID": null, "SID": null, "netHash": null, "stage": 1, "status": 0}{"keyEncr": "0638574ac605fcc62e3488355307f3628afe9e4c592ae7fbf8be3bad2b33c888ac931d68d363695904dcdbebfbd2ff122628611f15fd98845d5653acf0f0884f30d4af4a2c118e9dbd0f19014f05612d3e81e3d8bacc73369e57b264af18d8b306f2e09a728b28d87763d376d3d61be88de99260093fae0ea1ed4fbdf3311e50891c2da1dff5c269354d13012d8bfdd9e1d96e62cb183bfe7965ffa17348154bedb016410f594f5065122c973a31def1941b394d88b2cdee47ab8eb9e80fb5e6b7b3f31dc4fb1402dfc35f332688596a6f3b99fc16a630ee3cfc882fea01f88538e6ef645c5b064c895299467c38db5c5ebb182ac9cb31dfc4e148c373afa350", "keyHash": "2b51d5ed0004d0187