In [6]:
!pip install numpy
!pip install click
!pip install phe

Collecting numpy
  Downloading numpy-1.22.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.8 MB)
[K     |████████████████████████████████| 16.8 MB 3.0 MB/s eta 0:00:01
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.22.3


In [2]:
#---------------------------
# mvg_mechanism.py
# Author: Thee Chanyaswad
#
# Version 1.0
#	- Initial implementation.
#---------------------------
import numpy as np
def compute_precision_budget(m, n, gamma, s2, epsilon, delta):
	maxRank = np.min((n,m))
	harmR1 = _get_harmonic_num(maxRank,1)
	harmR12 = _get_harmonic_num(maxRank,0.5)
	alpha = (harmR1 + harmR12) * (gamma ** 2) + 2*harmR1*gamma*s2
	zeta = 2*np.sqrt(-m*n*np.log(delta))-2*np.log(delta) + m*n
	beta = 2*((m*n)**0.25)*zeta*harmR1*s2
	
	#get total precision budget
	precisionBudget = ((-beta + np.sqrt(beta**2 + 8*alpha*epsilon))**2) / (4*(alpha**2))
	
	return precisionBudget
def generate_mvg_noise_via_affine_tx(rowCov,colCov):
	n = len(colCov)
	m = len(rowCov)
	mu = np.zeros((n,m))
	sampIid = np.random.normal(0,1,size=mu.shape) # sample iid gauss first
	
	uSig,s,vSig = np.linalg.svd(rowCov)
	sSig = np.diag(np.sqrt(s))
	Bsig = np.inner(uSig,sSig.T)
	uPsi,s,vPsi = np.linalg.svd(colCov)
	sPsi = np.diag(np.sqrt(s))
	Bpsi = np.inner(uPsi,sPsi.T)
	
	#affine tx
	sampMvg = np.inner(np.inner(Bpsi,sampIid.T),Bsig).T
	
	return sampMvg
def generate_mvg_noise_via_multivariate_gaussian(rowCov,colCov):
	cov = np.kron(colCov,rowCov)
	muVec = np.zeros(len(cov))
	sampVect = np.random.multivariate_normal(muVec, cov, size=1)
	sampMvg = sampVect.reshape((len(rowCov),len(colCov)),order='F')
	return sampMvg
def _get_harmonic_num(order,power=1.0):
	if order == 1:
		value = 1.0
	else:
		value = 1.0/(order**power) + _get_harmonic_num(order-1,power=power)
	return value

###Gen MVG DP noise####

In [4]:
import os
import time

for DataSetNo in range(1, 5):
    if DataSetNo == 1:
        DataSet = "HeartScale"
        DataSize_n = 270
    if DataSetNo == 2:
        DataSet = "BreastcancerScale"
        DataSize_n = 683
    if DataSetNo == 3:
        DataSet = "DiabetesScale"
        DataSize_n = 768
    if DataSetNo == 4:
        DataSet = "SpliceScale"
        DataSize_n = 1000

    DataSize_m = DataSize_n

    #EpsilonDegree = 12
    #round_RemainFloatingPointNo = 1

    for round_RemainFloatingPointNo in range(1 ,4):
        for EpsilonDegree in range(10, 19):        

            # parameters
            gamma = DataSize_n  #sup: "Max" of every value in kernel matrix is 1
            s2 = 2 * DataSize_n 
            epsilon = 10**EpsilonDegree
            delta = 1/DataSize_n

            #init
            fmt_npsavetxt = '%1.' + str(round_RemainFloatingPointNo) + 'f'

            precisionBudget = compute_precision_budget(DataSize_m, DataSize_n, gamma, s2, epsilon, delta)
            p_i = 1/DataSize_m * precisionBudget
            oi =  1/np.sqrt(p_i)
            A_E = np.diag(np.full(DataSize_m, oi))  
            CovMatrix = A_E  #W_E和W_E_T在此皆為I

            # #make dir
            # path = "../DataGenerated/"+DataSet
            # if not os.path.isdir(path):    
            #     os.mkdir(path)       
            # path = "../DataGenerated/"+DataSet+"/"+DataSet+"_round"+str(round_RemainFloatingPointNo)
            # if not os.path.isdir(path):    
            #     os.mkdir(path)         
            # path = "../DataGenerated/"+DataSet+"/FinalResult"
            # if not os.path.isdir(path):
            #     os.mkdir(path)

            ############Part 1~3
            for PartNo in range(1, 4):            
                start = time.process_time()                       
                mvg_noise_via_affine_tx_ScaleHeart = generate_mvg_noise_via_affine_tx(CovMatrix, CovMatrix)
                end = time.process_time()
                GenDPNoiseTime = (end - start)             
                np.savetxt("../DataGenerated/"+DataSet+"/"+DataSet+"_round"+str(round_RemainFloatingPointNo)+"/mvg_noise_via_affine_tx_"+DataSet+"_Part"+str(PartNo)+"_Epsn"+str(EpsilonDegree)+"_round"+str(round_RemainFloatingPointNo)+".csv", np.round(mvg_noise_via_affine_tx_ScaleHeart, decimals = round_RemainFloatingPointNo), delimiter=",", fmt = fmt_npsavetxt)    
                #print('precisionBudget=', precisionBudget)
                #print('A_E=', A_E)
                #print('mvg_noise_via_affine_tx_ScaleHeart=', mvg_noise_via_affine_tx_ScaleHeart)


            ####Save File####
            FinalResultDirFile = "../DataGenerated/"+DataSet+"/FinalResult/" + DataSet + "_DP_MVG_FinalResult"
            lines = [str(DataSet) + ',' + str(DataSize_n) + ',' + str(round_RemainFloatingPointNo) + ',' + str(EpsilonDegree) + ',' + str(GenDPNoiseTime)]
            with open(FinalResultDirFile, 'a') as f:
                for line in lines:
                    f.write(line)
                    f.write('\n')

FileNotFoundError: [Errno 2] No such file or directory: '../DataGenerated/HeartScale/HeartScale_round1/mvg_noise_via_affine_tx_HeartScale_Part1_Epsn10_round1.csv'

###Paillier Homo: For BreastcancerScale Dataset####

In [2]:
from phe import paillier
import numpy as np
import math
import time

DataSet = "BreastcancerScale"
EpsilonDegree = 12
round_RemainFloatingPointNo = 1

####Load DP-Noise Data####
mvg_noise_via_affine_tx_1 = np.loadtxt("../DataGenerated/"+DataSet+"/"+DataSet+"_round"+str(round_RemainFloatingPointNo)+"/mvg_noise_via_affine_tx_"+DataSet+"_Part1_Epsn"+str(EpsilonDegree)+"_round"+str(round_RemainFloatingPointNo)+".csv", delimiter=",")
mvg_noise_via_affine_tx_2 = np.loadtxt("../DataGenerated/"+DataSet+"/"+DataSet+"_round"+str(round_RemainFloatingPointNo)+"/mvg_noise_via_affine_tx_"+DataSet+"_Part2_Epsn"+str(EpsilonDegree)+"_round"+str(round_RemainFloatingPointNo)+".csv", delimiter=",")
mvg_noise_via_affine_tx_3 = np.loadtxt("../DataGenerated/"+DataSet+"/"+DataSet+"_round"+str(round_RemainFloatingPointNo)+"/mvg_noise_via_affine_tx_"+DataSet+"_Part3_Epsn"+str(EpsilonDegree)+"_round"+str(round_RemainFloatingPointNo)+".csv", delimiter=",")

####Gen Key####
pub, priv = paillier.generate_paillier_keypair()

####Encrypt####
print("Encrpt 1")
start = time.process_time()
EncMatrix1 = [pub.encrypt(mvg_noise_via_affine_tx_1[i, j]) for i in range(mvg_noise_via_affine_tx_1.shape[0]) for j in range(mvg_noise_via_affine_tx_1.shape[1])]
end = time.process_time()
encrypt_all = end - start
encrypt_avg = encrypt_all / (mvg_noise_via_affine_tx_1.shape[0]*mvg_noise_via_affine_tx_1.shape[1])
#print("encrypt_avg = %f seconds" % encrypt_avg)
#print("encrypt_all = %f seconds" % encrypt_all)

print("Encrpt 2")
EncMatrix2 = [pub.encrypt(mvg_noise_via_affine_tx_2[i, j]) for i in range(mvg_noise_via_affine_tx_2.shape[0]) for j in range(mvg_noise_via_affine_tx_2.shape[1])]

print("Encrpt 3")
EncMatrix3 = [pub.encrypt(mvg_noise_via_affine_tx_3[i, j]) for i in range(mvg_noise_via_affine_tx_3.shape[0]) for j in range(mvg_noise_via_affine_tx_3.shape[1])]



####Decrypt####
print("Decrpt")

start = time.process_time()
DecMatrix = np.array([priv.decrypt(EncMatrix1[k]+EncMatrix2[k]+EncMatrix3[k]) for k in range(mvg_noise_via_affine_tx_1.shape[0] * mvg_noise_via_affine_tx_1.shape[1])])
end = time.process_time()
decrypt_all = (end - start)
decrypt_avg = decrypt_all /(mvg_noise_via_affine_tx_1.shape[0]*mvg_noise_via_affine_tx_1.shape[1])
#print("decrypt_avg = %f seconds" % decrypt_avg)
#print("decrypt_all = %f seconds" % decrypt_all)

DecMatrix = DecMatrix.reshape(mvg_noise_via_affine_tx_1.shape[0], mvg_noise_via_affine_tx_1.shape[1])
#DecMatrix

####Compare####
Error = 0
for i in range(mvg_noise_via_affine_tx_1.shape[0]): 
    for j in range(mvg_noise_via_affine_tx_1.shape[1]):
        #because float cannot be copmared directly (PC minor error), we should compare with the tolerance diff
        if (math.isclose(round(DecMatrix[i, j],5) , round((mvg_noise_via_affine_tx_1[i, j] + mvg_noise_via_affine_tx_2[i, j] + mvg_noise_via_affine_tx_3[i, j]),5) , rel_tol=1e-10) == False):     
            #if (math.isclose(0 , round((mvg_noise_via_affine_tx_1[i, j] + mvg_noise_via_affine_tx_2[i, j] + mvg_noise_via_affine_tx_3[i, j]), 5) , rel_tol=1e-10) == False):     
            print("i = " + str(i) + ",j = " + str(j) + ", num1 = " + str(mvg_noise_via_affine_tx_1[i, j]) + ", num2 = " + str(mvg_noise_via_affine_tx_2[i, j]) + ", num3 = " + str(mvg_noise_via_affine_tx_3[i, j]) + ", dec_num_sum = " + str(DecMatrix[i, j]))          
            Error += 1

####Save File####
FinalResultDirFile = "../DataGenerated/"+DataSet+"/FinalResult/" + DataSet + "_FinalResult"

lines = ['encrypt_avg: ' + str(encrypt_avg), ', encrypt_all: ' + str(encrypt_all), ', decrypt_avg: ' + str(decrypt_avg), ', decrypt_all: ' + str(decrypt_all), ', Error: ' + str(Error)]
with open(FinalResultDirFile, 'w') as f:
    for line in lines:
        f.write(line)
        f.write('\n')

        
####Load File####
f = open(FinalResultDirFile)
lines = f.read()
print(lines)
print(type(lines))
f.close()

Encrpt 1
Encrpt 2
Encrpt 3
Decrpt
i = 0,j = 126, num1 = 0.2, num2 = -1.8, num3 = 1.6, dec_num_sum = 5.551115123125783e-17
i = 1,j = 473, num1 = 0.8, num2 = 3.0, num3 = -3.8, dec_num_sum = 2.220446049250313e-16
i = 2,j = 121, num1 = 0.1, num2 = 1.2, num3 = -1.3, dec_num_sum = -8.326672684688674e-17
i = 2,j = 130, num1 = 1.5, num2 = -0.1, num3 = -1.4, dec_num_sum = 8.326672684688674e-17
i = 2,j = 137, num1 = 1.9, num2 = 0.7, num3 = -2.6, dec_num_sum = -2.220446049250313e-16
i = 3,j = 591, num1 = 2.0, num2 = -0.9, num3 = -1.1, dec_num_sum = -1.1102230246251565e-16
i = 4,j = 127, num1 = -4.1, num2 = 1.7, num3 = 2.4, dec_num_sum = 2.220446049250313e-16
i = 4,j = 182, num1 = -0.4, num2 = -0.2, num3 = 0.6, dec_num_sum = -5.551115123125783e-17
i = 5,j = 269, num1 = 0.1, num2 = 3.4, num3 = -3.5, dec_num_sum = -8.326672684688674e-17
i = 5,j = 294, num1 = 2.9, num2 = 0.6, num3 = -3.5, dec_num_sum = -1.1102230246251565e-16
i = 6,j = 203, num1 = -1.2, num2 = 7.2, num3 = -6.0, dec_num_sum = 2.220446

i = 375,j = 638, num1 = 3.1, num2 = 0.6, num3 = -3.7, dec_num_sum = -1.1102230246251565e-16
i = 376,j = 181, num1 = 0.1, num2 = -1.7, num3 = 1.6, dec_num_sum = 1.3877787807814457e-16
i = 376,j = 204, num1 = -2.5, num2 = -3.4, num3 = 5.9, dec_num_sum = 4.440892098500626e-16
i = 376,j = 327, num1 = 4.1, num2 = 0.7, num3 = -4.8, dec_num_sum = -2.220446049250313e-16
i = 376,j = 546, num1 = -2.8, num2 = -2.1, num3 = 4.9, dec_num_sum = 4.440892098500626e-16
i = 376,j = 590, num1 = 0.7, num2 = 0.6, num3 = -1.3, dec_num_sum = -1.1102230246251565e-16
i = 377,j = 441, num1 = -0.1, num2 = 2.6, num3 = -2.5, dec_num_sum = 8.326672684688674e-17
i = 377,j = 516, num1 = -0.2, num2 = -0.4, num3 = 0.6, dec_num_sum = -5.551115123125783e-17
i = 377,j = 517, num1 = -2.8, num2 = 0.3, num3 = 2.5, dec_num_sum = 1.6653345369377348e-16
i = 378,j = 137, num1 = 0.6, num2 = 0.1, num3 = -0.7, dec_num_sum = 2.7755575615628914e-17
i = 378,j = 308, num1 = 0.3, num2 = 0.8, num3 = -1.1, dec_num_sum = -5.551115123125783e

FileNotFoundError: [Errno 2] No such file or directory: '../DataGenerated/BreastcancerScale_FinalResult/BreastcancerScale_FinalResult'

In [28]:
####Compare####
Error = 0
for i in range(mvg_noise_via_affine_tx_1.shape[0]): 
    for j in range(mvg_noise_via_affine_tx_1.shape[1]):
        #because float cannot be copmared directly (PC minor error), we should compare with the tolerance diff
        if (math.isclose(round(DecMatrix[i, j],5) , round((mvg_noise_via_affine_tx_1[i, j] + mvg_noise_via_affine_tx_2[i, j] + mvg_noise_via_affine_tx_3[i, j]),5) , rel_tol=1e-10) == False):     
            #if (math.isclose(0 , round((mvg_noise_via_affine_tx_1[i, j] + mvg_noise_via_affine_tx_2[i, j] + mvg_noise_via_affine_tx_3[i, j]), 5) , rel_tol=1e-10) == False):     
            print("i = " + str(i) + ",j = " + str(j) + ", num1 = " + str(mvg_noise_via_affine_tx_1[i, j]) + ", num2 = " + str(mvg_noise_via_affine_tx_2[i, j]) + ", num3 = " + str(mvg_noise_via_affine_tx_3[i, j]) + ", dec_num_sum = " + str(DecMatrix[i, j]))          
            Error += 1

####Save File####
FinalResultDirFile = "../DataGenerated/"+DataSet+"/FinalResult/" + DataSet + "_FinalResult"

lines = ['encrypt_avg: ' + str(encrypt_avg), ', encrypt_all: ' + str(encrypt_all), ', decrypt_avg: ' + str(decrypt_avg), ', decrypt_all: ' + str(decrypt_all), ', Error: ' + str(Error)]
with open(FinalResultDirFile, 'w') as f:
    for line in lines:
        f.write(line)
        f.write('\n')

        
####Load File####
f = open(FinalResultDirFile)
lines = f.read()
print(lines)
print(type(lines))
f.close()

encrypt_avg: 0.08080141841438275
, encrypt_all: 37692.972874707
, decrypt_avg: 0.02299351007083982
, decrypt_all: 10726.219519435996
, Error: 0

<class 'str'>


============以下是測試過程用的而已======================================================================

In [None]:
import time

from phe import paillier
pub, priv = paillier.generate_paillier_keypair()


start = time.process_time()
Enc1 = pub.encrypt(mvg_noise_via_affine_tx_1[0, 0])
end = time.process_time()
print("encrypt = %f seconds" % (end - start))

Enc2 = pub.encrypt(mvg_noise_via_affine_tx_1[0, 1])
Enc3 = pub.encrypt(mvg_noise_via_affine_tx_1[0, 2])

start = time.process_time()
Enc_All = Enc1+Enc2+Enc3
end = time.process_time()
print("homo_add = %f seconds" % (end - start))

start = time.process_time()
Dec_All = priv.decrypt(Enc_All)
end = time.process_time()
print("decrypt = %f seconds" % (end - start))

print("num1 = " + str(mvg_noise_via_affine_tx_1[0, 0]) + ", num2 = " + str(mvg_noise_via_affine_tx_1[0, 1]) + ", num3 = " + str(mvg_noise_via_affine_tx_1[0, 2]))
print("dec_num_sum = " + str(Dec_All))

Error = 0
if ( Dec_All != mvg_noise_via_affine_tx_1[0, 0] + mvg_noise_via_affine_tx_1[0, 1] + mvg_noise_via_affine_tx_1[0, 2]):
    Error += 1
print("Error = " + str(Error))

In [None]:
####Encrypt####
start = time.process_time()
EncMatrix1 = [pub.encrypt(mvg_noise_via_affine_tx_1[i, j]) for i in range(mvg_noise_via_affine_tx_1.shape[0]) for j in range(mvg_noise_via_affine_tx_1.shape[1])]
end = time.process_time()
print("encrypt_avg = %f seconds" % ((end - start)/(mvg_noise_via_affine_tx_1.shape[0]*mvg_noise_via_affine_tx_1.shape[1])))
print("encrypt_all = %f seconds" % ((end - start)))


In [7]:
import json
import random
import unittest
from unittest import TestCase
import tempfile

import io

import sys
import click
from click.testing import CliRunner

import phe.command_line
from phe.command_line import cli


class TestConsoleBasics(TestCase):

    def test_cli_includes_help(self):
        runner = CliRunner()
        result = runner.invoke(cli, ['--help'])
        assert result.exit_code == 0

        assert 'Usage' in result.output
        assert 'Options' in result.output
        assert 'Commands' in result.output

    def test_generate_keypair_to_file(self):
        runner = CliRunner()

        with tempfile.NamedTemporaryFile() as outfile:
            result = runner.invoke(cli, ['genpkey', '--keysize', '256', outfile.name])
            print(result.output)
            assert result.exit_code == 0

            outfile.seek(0)
            written_data = outfile.read()

            priv_key = json.loads(written_data.decode('utf-8'))

        assert 'pub' in priv_key
        assert 'kty' in priv_key
        assert 'p' in priv_key

    def test_generate_keypair_to_stdout(self):
        runner = CliRunner()

        result = runner.invoke(cli, ['genpkey', '--keysize', '256', '-'])

        assert 'pub' in result.output
        assert 'kty' in result.output
        assert 'p' in result.output

    def test_extract_public_key(self):
        runner = CliRunner()

        with tempfile.NamedTemporaryFile() as private_keyfile:
            runner.invoke(cli, ['genpkey', '--keysize', '256', private_keyfile.name])

            with tempfile.NamedTemporaryFile() as public_keyfile:
                result = runner.invoke(cli, ['extract', private_keyfile.name, public_keyfile.name])
                assert result.exit_code == 0

                public_keyfile.seek(0)
                written_data = public_keyfile.read().decode('utf-8')

                assert '"kty":' in written_data
                assert '"n":' in written_data
                assert '"alg":' in written_data

                assert '"p":' not in written_data
                assert '"q":' not in written_data


class TestConsoleEncryption(TestCase):

    @classmethod
    def setUpClass(cls):
        """Generate a keypair and extract the public key.
        """
        cls.private_keyfile = tempfile.NamedTemporaryFile()
        cls.public_keyfile = tempfile.NamedTemporaryFile()

        cls.runner = CliRunner()
        cls.runner.invoke(cli, ['genpkey', '--keysize', '256', cls.private_keyfile.name])
        cls.runner.invoke(cli, ['extract', cls.private_keyfile.name, cls.public_keyfile.name])

    @classmethod
    def tearDownClass(cls):
        cls.private_keyfile.close()
        cls.public_keyfile.close()

    def test_encrypt_positive_integers(self):
        numbers = [0, 1, 2, 5, 10, '1', '10550']

        for num in numbers:
            result = self.runner.invoke(cli, ['encrypt', self.public_keyfile.name, str(num)])
            assert result.exit_code == 0

    def test_encrypt_signed_integers(self):
        """encrypting positive and negative integer"""
        numbers = [0, 1, -1, 10, '1', '-10550']

        for num in numbers:
            result = self.runner.invoke(cli, ['encrypt', self.public_keyfile.name, "--", str(num)])
            assert result.exit_code == 0

    def test_encrypt_float(self):
        numbers = [0.0, 1.1, -0.0001, 100000.01, '1e-20', '-10550e20']

        for num in numbers:
            result = self.runner.invoke(cli, ['encrypt', self.public_keyfile.name, "--", str(num)])
            assert result.exit_code == 0

    def test_encrypt_to_stdout(self):
        """Test encrypting and writing output to a file"""
        numbers = [0.0, 1.1, -0.0001, 100000.01, '1e-20', '-10550e20']

        for num in numbers:
            result = self.runner.invoke(cli, ['encrypt', self.public_keyfile.name, "--", str(num)])
            assert result.exit_code == 0

    def test_decrypt_positive_integers(self):
        numbers = [0, 1, 2, 5, 10, '1', '10550']

        for num in numbers:
            with tempfile.NamedTemporaryFile() as encfile:
                fname = encfile.name

                self.runner.invoke(cli, [
                    'encrypt', self.public_keyfile.name, str(num), '--output', fname
                ])

                result = self.runner.invoke(cli, [
                    'decrypt', self.private_keyfile.name, fname
                ])
                assert result.exit_code == 0

                assert "{}".format(num) in result.output

    def test_decrypt_signed_integers(self):
        numbers = [0, 1, -1, 10, '1', '-10550']

        for num in numbers:
            with tempfile.NamedTemporaryFile() as encfile:
                fname = encfile.name
                self.runner.invoke(cli, [
                    'encrypt', self.public_keyfile.name, '--output', fname, '--', str(num),
                ])

                result = self.runner.invoke(cli, [
                    'decrypt', self.private_keyfile.name, fname
                ])
                assert result.exit_code == 0

                print(result.output)
                assert "{}".format(num) in result.output

    def test_decrypt_float(self):
        numbers = [0.0, 1.1, -0.0001, 100000.01, '1e-20', '-10550e20']

        for num in numbers:
            with tempfile.NamedTemporaryFile() as encfile:
                fname = encfile.name
                self.runner.invoke(cli, [
                    'encrypt', self.public_keyfile.name, '--output', fname, '--', str(num),
                ])

                with tempfile.NamedTemporaryFile() as outfile:
                    result = self.runner.invoke(cli, [
                        'decrypt', self.private_keyfile.name, fname, '--output', outfile.name
                    ])
                    assert result.exit_code == 0

                    out = outfile.read()
                    self.assertAlmostEqual(float(num), float(out))


class TestConsoleHelpers(TestCase):

    @classmethod
    def setUpClass(cls):
        """Generate a keypair, extract the public key, and encrypt
        a list of numbers

        """
        cls.private_keyfile = tempfile.NamedTemporaryFile()
        cls.public_keyfile = tempfile.NamedTemporaryFile()


        cls.runner = CliRunner()
        cls.runner.invoke(cli, ['genpkey', '--keysize', '256', cls.private_keyfile.name])
        cls.runner.invoke(cli, ['extract', cls.private_keyfile.name, cls.public_keyfile.name])

    def setUp(self):
        self.enc_a_file = tempfile.NamedTemporaryFile()
        self.enc_b_file = tempfile.NamedTemporaryFile()
        self.enc_result_file = tempfile.NamedTemporaryFile()

    def encrypt_and_add(self, a, b):
        self.runner.invoke(cli,
                           ['encrypt', self.public_keyfile.name, '--output', self.enc_a_file.name, '--', str(a)])
        self.runner.invoke(cli,
                           ['encrypt', self.public_keyfile.name, '--output', self.enc_b_file.name, '--', str(b)])

        result = self.runner.invoke(cli, [
            'addenc',
            self.public_keyfile.name,
            self.enc_a_file.name,
            self.enc_b_file.name,
            '--output',
            self.enc_result_file.name
        ])

        assert result.exit_code == 0

        with tempfile.NamedTemporaryFile() as outfile:
            result = self.runner.invoke(cli, [
                'decrypt', self.private_keyfile.name, self.enc_result_file.name, '--output', outfile.name
            ])
            assert result.exit_code == 0

            out = outfile.read()
            return float(out)

    def _a_b_encrypt_helper(self, a, b, operation):
        self.runner.invoke(cli,
                           [
                               'encrypt',
                               self.public_keyfile.name,
                                '--output',
                               self.enc_a_file.name,
                               '--',
                                str(a)
                           ])

        result = self.runner.invoke(cli, [
            operation,
            '--output',
            self.enc_result_file.name,
            self.public_keyfile.name,
            self.enc_a_file.name,
            '--',
            str(b)
        ])

        assert result.exit_code == 0, "Problem carrying out the {} operation".format(operation)

        with tempfile.NamedTemporaryFile() as outfile:
            result = self.runner.invoke(cli, [
                'decrypt', self.private_keyfile.name, self.enc_result_file.name, '--output', outfile.name
            ])
            assert result.exit_code == 0

            out = outfile.read()
            return float(out)

    def encrypt_a_and_add_b(self, a, b):
        return self._a_b_encrypt_helper(a, b, 'add')

    def encrypt_a_and_multiply_b(self, a, b):
        return self._a_b_encrypt_helper(a, b, 'multiply')


class TestConsoleAddition(TestConsoleHelpers):

    def test_addenc_int(self):
        a, b = 12345, 6789
        out = self.encrypt_and_add(a, b)
        self.assertAlmostEqual(float(a + b), float(out))

    def test_add_int(self):
        a, b = 12345, 6789
        out = self.encrypt_a_and_add_b(a, b)
        self.assertAlmostEqual(float(a + b), float(out))

    def test_addenc_large_ints(self):
        """Test adding large integers.
        """
        a, b = int(1.2e10), int(1e15)
        out = self.encrypt_and_add(a, b)
        self.assertAlmostEqual(float(a + b), float(out))

    def test_add_large_ints(self):
        """Test adding large integers.
        """
        a, b = int(1.2e10), int(1e15)
        out = self.encrypt_a_and_add_b(a, b)
        self.assertAlmostEqual(float(a + b), float(out))


    def test_addenc_signed_int(self):
        a, b = 12345, -6789
        out = self.encrypt_and_add(a, b)
        self.assertAlmostEqual(float(a + b), float(out))

    def test_add_signed_int(self):
        a, b = 12345, -6789
        out = self.encrypt_a_and_add_b(a, b)
        self.assertAlmostEqual(float(a + b), float(out))

    def test_addenc_floats(self):
        a, b = 123.45, 67.89
        out = self.encrypt_and_add(a, b)
        self.assertAlmostEqual(float(a + b), float(out))

    def test_add_floats(self):
        a, b = 123.45, 67.89
        out = self.encrypt_a_and_add_b(a, b)
        self.assertAlmostEqual(float(a + b), float(out))

    def test_addenc_large_floats(self):
        """Test adding large integers.
        """
        a, b = 2.3e32, 1.4e32
        out = self.encrypt_and_add(a, b)
        self.assertAlmostEqual(float(a + b), float(out))

    def test_add_large_floats(self):
        """Test adding large integers.
        """
        a, b = 2.3e32, 1.4e32
        out = self.encrypt_a_and_add_b(a, b)
        self.assertAlmostEqual(float(a + b), float(out))


class TestConsoleMultiplication(TestConsoleHelpers):
    """
    Expected to fail until we decide if encrypted numbers with different
    exponents are allowed for this CLI...
    """
    def test_multiply_ints(self):
        a, b = 15, 6
        out = self.encrypt_a_and_multiply_b(a, b)
        self.assertAlmostEqual(int(a * b), int(out))

    def test_multiply_floats(self):
        a, b = 1.2345, 0.6
        out = self.encrypt_a_and_multiply_b(a, b)
        self.assertAlmostEqual(float(a * b), float(out))

    def test_multiply_random_ints(self):
        """
        """
        MAX = 100000000000
        MIN = -MAX

        for _ in range(50):
            a, b = random.randrange(MIN, MAX), random.randrange(MIN, MAX)
            out = self.encrypt_a_and_multiply_b(a, b)
            self.assertAlmostEqual(float(a * b), float(out))


class TestFuzz(TestConsoleHelpers):

    def test_addenc_random_ints(self):
        """Test adding random ints
        """
        MAX = 1000000000000000
        MIN = -MAX

        for _ in range(20):
            a, b = random.randrange(MIN, MAX), random.randrange(MIN, MAX)
            out = self.encrypt_and_add(a, b)
            self.assertAlmostEqual(float(a + b), float(out))

    def test_add_random_ints(self):
        """Test adding random ints
        """
        MAX = 1000000000000000
        MIN = -MAX

        for _ in range(20):
            a, b = random.randrange(MIN, MAX), random.randrange(MIN, MAX)
            out = self.encrypt_a_and_add_b(a, b)
            self.assertAlmostEqual(float(a + b), float(out))

    def test_addenc_random_floats(self):
        """Test adding random floating point numbers from the range [0.0, 1.0)
        """
        for _ in range(20):
            a, b = random.random(), random.random()
            out = self.encrypt_and_add(a, b)
            self.assertAlmostEqual(float(a + b), float(out))

    def test_add_random_floats(self):
        """Test adding random floating point numbers from the range [0.0, 1.0)
        """
        for _ in range(20):
            a, b = random.random(), random.random()
            out = self.encrypt_a_and_add_b(a, b)
            self.assertAlmostEqual(float(a + b), float(out))


    def test_multiply_random_ints(self):
        """
        """
        MAX = 10000
        MIN = -MAX

        for _ in range(20):
            a, b = random.randrange(MIN, MAX), random.randrange(MIN, MAX)
            out = self.encrypt_a_and_multiply_b(a, b)
            self.assertAlmostEqual(float(a * b), float(out))




===TEST===

In [52]:
#mvg_noise_via_affine_tx_TEST = np.genfromtxt('../DataGenerated/mvg_noise_via_affine_tx_TEST.csv',delimiter=',')


from phe import paillier
pub, priv = paillier.generate_paillier_keypair()
EncMatrix1 = [pub.encrypt(mvg_noise_via_affine_tx_1[i, j]) for i in range(mvg_noise_via_affine_tx_1.shape[0]) for j in range(mvg_noise_via_affine_tx_1.shape[1])]
EncMatrix2 = [pub.encrypt(mvg_noise_via_affine_tx_2[i, j]) for i in range(mvg_noise_via_affine_tx_2.shape[0]) for j in range(mvg_noise_via_affine_tx_2.shape[1])]
EncMatrix3 = [pub.encrypt(mvg_noise_via_affine_tx_3[i, j]) for i in range(mvg_noise_via_affine_tx_3.shape[0]) for j in range(mvg_noise_via_affine_tx_3.shape[1])]

DecMatrix = np.array([priv.decrypt(EncMatrix1[k]+EncMatrix2[k]+EncMatrix3[k]) for k in range(mvg_noise_via_affine_tx_1.shape[0] * mvg_noise_via_affine_tx_1.shape[1])])
DecMatrix = DecMatrix.reshape(mvg_noise_via_affine_tx_1.shape[0], mvg_noise_via_affine_tx_1.shape[1])
DecMatrix
#np.shape(DecMatrix)


array([[ -54143.09537717, -450885.13176675, -104468.85459476],
       [ 155118.15510393,   38704.38791559,  246842.01847356],
       [ -68098.37066743, -649267.06426103, -135033.51318208]])

In [9]:
mvg_noise_via_affine_tx_TEST

array([[ -18047.69845906, -150295.04392225,  -34822.95153159],
       [  51706.05170131,   12901.46263853,   82280.67282452],
       [ -22699.45688914, -216422.35475368,  -45011.17106069]])

In [16]:
from phe import paillier
pub, priv = paillier.generate_paillier_keypair()
a = pub.encrypt(-18047.69845906)
b = pub.encrypt(51706.05170131) 
out = priv.decrypt(a + b)
out

33658.35324225

In [17]:
51706.05170131-18047.69845906

33658.35324225

In [18]:
!python setup.py test

python: can't open file 'G:\��������脩垢蝖祉��\Sync\Research\Paper\My Paper\Differentially Private SVM based on Matrix-Variate Gaussian Mechanism\Experiment\Program\setup.py': [Errno 2] No such file or directory
