# Sensor Logic and Server Logic and Communication

This includes intake and encrypting of the data by the sensor. It also shows the logic of our server and the communication between the sensors and our server throughout the course of a day. And an example output of what the server will send to BU at the end of the day.

In [None]:
!pip install phe

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting phe
  Downloading phe-1.5.0-py2.py3-none-any.whl (53 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.7/53.7 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: phe
Successfully installed phe-1.5.0


In [None]:
import hashlib
import random
from typing import List
import pandas as pd
from phe import paillier
import time

In [None]:
# this is the sample data the sensors will record every minute
s1 = [random.randint(1,51) for x in range(1, 3600)]
s2 = [random.randint(1,51) for x in range(1, 3600)]
s3 = [random.randint(1,51) for x in range(1, 3600)]

# we will have a public and secret key pair generated
# generate pk and sk here

pk1, sk1 = paillier.generate_paillier_keypair()
pk2, sk2 = paillier.generate_paillier_keypair()
pk3, sk3 = paillier.generate_paillier_keypair()

# map the keys to each sensor
sensorList = [pk1, pk2, pk3]

In [None]:
# Sensor Logic

class Sensor:
    def __init__(self, pubKey): # takes the sensors public key as an argument and initializes the sum for a sensor
        self.pubKey = pubKey
        self.vals = []
        self.sum = 0

    def collectData(self, value): # every minute this method is called to make a list of sensor values over the course of an hour
        self.vals.append(value)

    def sumData(self): # this is the hourly sum for the sensor
        self.sum = sum(self.vals)
        return self.sum

    def hashData(self): # this method performs homomorphic encryption on the hourly sensor data
        encrypted_sum = self.pubKey.encrypt(self.sum)
        return encrypted_sum

    def send(self, encrypted_sum): # sends the encrypted hourly data to our server
        return encrypted_sum

In [None]:
class Server1:
    def __init__(self, initialData): # this method initialized the daily sums of each sensor in the first hour of the day
        self.sum1 = initialData[0]
        self.sum2 = initialData[1]
        self.sum3 = initialData[2]
    
    def sumData(self, encryptedData): # for every hour after the first hour this method keeps a running sum for each sensor
        self.sum1 += encryptedData[0]
        self.sum2 += encryptedData[1]
        self.sum3 += encryptedData[2]

    def send(self):
        return [self.sum1, self.sum2, self.sum3] # at the end of the day it can output the daily sums to BU in a manageable list

In [None]:
# CODE TO SHOW HOW THE SENSORS WOULD PROCESS AND ENCRYPT THEIR HOURLY DATA AND HOW THE SERVER WOULD PROCESS AND SEND IT OUT THROUGH THE DAY

for hour in range(24):
    sensor1 = Sensor(pk1)
    sensor2 = Sensor(pk2)
    sensor3 = Sensor(pk3)
    for minute in range(60):
      sensor1.collectData(s1[(minute + (hour * 60))])
      sensor2.collectData(s2[(minute + (hour * 60))])
      sensor3.collectData(s3[(minute + (hour * 60))])

      time.sleep(60)  # sleep for 60 seconds (1 minute)

    # each sensor sums up their hourly data
    sensor1.sumData()
    sensor2.sumData()
    sensor3.sumData()

    # each sensor encrypts their hourly data
    encryptedSum1 = sensor1.hashData()
    encryptedSum2 = sensor2.hashData()
    encryptedSum3 = sensor3.hashData()

    # each sensor sends their hourly encrypted sum out for the server to take
    sentSum1 = sensor1.send(encryptedSum1)
    sentSum2 = sensor2.send(encryptedSum2)
    sentSum3 = sensor3.send(encryptedSum3)

    # server takes in hourly data
    if hour == 0: # initialize for the first hour
      server = Server1([sentSum1, sentSum2, sentSum3])
    else: # keep running sum for rest of the hours in the day
      server.sumData([sentSum1, sentSum2, sentSum3])
      

# at the end of 24 hours output the three encrypted sums to BU 
result = server.send()
print(result)

[<phe.paillier.EncryptedNumber object at 0x7ff2a40e8100>, <phe.paillier.EncryptedNumber object at 0x7ff2ce311270>, <phe.paillier.EncryptedNumber object at 0x7ff2a3f63fa0>]


In [None]:
# test code
# shows that the loop above takes the daily sum of each sensor correctly
# the sum of the first 1440 values is the number of minutes in a day 

decrypted1 = sk1.decrypt(result[0])
decrypted2 = sk2.decrypt(result[1])
decrypted3 = sk3.decrypt(result[2])

sum1_of_first_1440 = sum(s1[:1440])
sum2_of_first_1440 = sum(s2[:1440])
sum3_of_first_1440 = sum(s3[:1440])

print("Sensor 1 decrypted value:", decrypted1)
print("Sensor 1 sum for randomly generated list:", sum1_of_first_1440)
print("Sensor 2 decrypted value:", decrypted2)
print("Sensor 2 sum for randomly generated list:", sum2_of_first_1440)
print("Sensor 3 decrypted value:", decrypted3)
print("Sensor 3 sum for randomly generated list:", sum3_of_first_1440)

Sensor 1 decrypted value: 37360
Sensor 1 sum for randomly generated list: 37360
Sensor 2 decrypted value: 38009
Sensor 2 sum for randomly generated list: 38009
Sensor 3 decrypted value: 37100
Sensor 3 sum for randomly generated list: 37100


# Zero Knowledge Proof

This is how BU can share the public key of a sensor with a third party to prove that the daily sum collected from the sensor is indeed accurate.

In [None]:
# BU's Code: private script that allows for validity checking using a homomorphic ZKP
#
def provide_verification():
    output = []

    # provide a verification check for each type of sensor
    for sensor in range(len(result)):
        daily_ciphertext = result[sensor]
        pubkey = sensorList[sensor]
        max_int = sensorList[sensor].max_int

        # creating and encrypting randint r within specified range
        r = random.randint(int("1"+("0"*(len(str(max_int))-2))), max_int)
        encrypted_r = pubkey.encrypt(r)

        # using homomorphic encryption to add encrypted values
        cipher_sum = encrypted_r._add_encrypted(daily_ciphertext)

        # BU sends non-obfuscated ciphertext of cipher_sum, which is already obfuscated
        output.append([cipher_sum.ciphertext(be_secure=False), daily_ciphertext, encrypted_r])

    return(output)

In [None]:
# Verifier's Code: publicly available for third-party use
#
def verify():
    for msg in provide_verification():
        # messages in format: [cipher_sum, daily_ciphertext, encrytped_r]
        verifier_encryption = msg[2]._add_encrypted(msg[1]).ciphertext(be_secure=False)

        # final verification: compare user-generated ciphertext with BU's
        assert verifier_encryption == msg[0], "Invalid Checksum"
    
    return True

verify()

True