In [2]:
%%writefile did.py

from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding, utils
from cryptography.hazmat.backends import default_backend
from datetime import datetime
import json
import os

# 
class DID:
    def __init__(self, name):
        self.name = name
        self.private_key, self.public_key, self.did = self.generate_keys_and_did()
        self.store_did_in_blockchain()
        self.wallet = {}

    # Wallet generation
    def store_vc_in_wallet(self, vc_type, vc):
        self.wallet[vc_type] = vc

    def get_vc_from_wallet(self, vc_type):
        return self.wallet.get(vc_type, None)
    
    # Generate DIDs
    def generate_keys_and_did(self):
        
        # Generate DID - public key pair using RSA encryption algorithm
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )

        public_key = private_key.public_key()
        public_key_pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

        # Create DID
        did = self.generate_did(public_key_pem)
        
        return private_key, public_key, did

    def generate_did(self, public_key_pem):
        # Generate DID using the public key pem
        return f"did:{self.name}:{hash(public_key_pem)}"
    
    # Storing DIDs on proxy blockchain
    def store_did_in_blockchain(self):
        # Blockchain proxy folder
        blockchain_folder = "../blockchain"
        if not os.path.exists(blockchain_folder):
            os.makedirs(blockchain_folder)

        # Replace so it can be saved as a filename
        filename_safe_did = self.did.replace(":", "_")

        # save DID and public key pair as a json file
        did_filename = os.path.join(blockchain_folder, f"{filename_safe_did}.json")
        with open(did_filename, "w") as file:
            json.dump({"public_key": self.public_key.public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo
            ).decode()}, file)

    @staticmethod
    def resolve_did_locally(did):
        # Simulated DID resolution locally from the blockchain folder
        blockchain_folder = "../blockchain"
        if not os.path.exists(blockchain_folder):
            raise FileNotFoundError("Blockchain folder not found.")
        
        # Replace colons with underscores in the DID for filename compatibility
        filename_safe_did = did.replace(":", "_")
        
        # Extract the public key from the corresponding DID 
        did_filename = os.path.join(blockchain_folder, f"{filename_safe_did}.json")
        if not os.path.exists(did_filename):
            return None
        
        with open(did_filename, "r") as file:
            data = json.load(file)
            public_key_pem = data.get("public_key", None)
        
        if public_key_pem:
            return serialization.load_pem_public_key(public_key_pem.encode(), backend=default_backend())
        else:
            return None
    
    def issue_vc(self, subject_did, data):
        # Resolve the subject's DID to retrieve the public key
        subject_public_key = self.resolve_did_locally(subject_did)
        if not subject_public_key:
            raise ValueError("Failed to resolve subject's DID or retrieve public key.")

        # Issue Verifiable Credential
        # General format of a VC
        vc = {
            "id": f"{self.did}/vc/{datetime.now().isoformat()}",
            "type": ["VerifiableCredential"],
            "issuer": self.did,
            "issuanceDate": datetime.now().isoformat(),
            "credentialSubject": {
                "id": subject_did,
                "data": data
            }
        }
        
        # Encoding VC as bytes
        vc_bytes = json.dumps(vc).encode()
        
        # Sign the VC with the private key
        signature = self.private_key.sign(
            vc_bytes,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        vc["signature"] = signature.hex()
        return vc
            
    def respond_to_request(self, requests):
        # Bundles VCs together per the requests
        response = {}
        for request in requests:
            if request in self.wallet:
                response[request] = self.wallet[request]

        return response
    
    def generate_vp(self, requests):
        # Generate a Verifiable Presentation 
        response = self.respond_to_request(requests)

        # Extract VCs from the response
        vcs = list(response.values())

        # Generate a presentation containing the VCs
        presentation = {
            "holder": self.did,
            "verifiable_credentials": vcs,
            "requests": list(response.keys())
        }

        # Serialize the presentation
        presentation_bytes = json.dumps(presentation).encode()

        # Sign the presentation with the private key
        signature = self.private_key.sign(
            presentation_bytes,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        presentation["signature"] = signature.hex()

        return presentation

    
    def verify_vp(self, vp):
        # Extract issuer and holder DIDs from the VC within the VP
        vc = vp["verifiable_credentials"][0]  
        issuer_did = vc["issuer"]
        holder_did = vp["holder"]

        # Resolve issuer and holder DIDs to retrieve public keys
        issuer_public_key = self.resolve_did_locally(issuer_did)
        holder_public_key = self.resolve_did_locally(holder_did)

        # Check if public keys are retrieved successfully
        if not issuer_public_key or not holder_public_key:
            print("Failed to resolve DIDs or retrieve public keys.")
            return None

        # Extract signature from VCs
        vc_signature = bytes.fromhex(vc["signature"])

        # Encode VC
        vc_copy = vc.copy()
        vc_copy.pop("signature")
        vc_bytes = json.dumps(vc_copy).encode()

        # Verify the issuer's signature using the issuer's public key
        try:
            issuer_public_key.verify(
                vc_signature,
                vc_bytes,
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            print("Issuer authenticity verified.")
        except Exception as e:
            print("Issuer verification failed:", e)
            return None

        # Extract signature from the VP
        vp_signature = bytes.fromhex(vp["signature"])

        # Encode VP
        vp_copy = vp.copy()
        vp_copy.pop("signature")
        vp_bytes = json.dumps(vp_copy).encode()

        # Verify the holder's signature using the holder's public key
        try:
            holder_public_key.verify(
                vp_signature,
                vp_bytes,
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            print("Holder authenticity verified.")
        except Exception as e:
            print("Holder verification failed:", e)
            return None

        # Extract 'credentialSubject' contained in the VC
        credential_subject = vc.get("credentialSubject", None)
        if credential_subject:
            print("Credential subject extracted successfully.")
            return credential_subject
        else:
            print("No credential subject found.")
            return None

Overwriting did.py


In [154]:
# Simulation of holder, issuer, and verifier
uni = DID('university')  # Issuer
student = DID('student')    # Holder
vendor = DID('vendor')  # Verifier

In [155]:
# University issues a verifiable credential to the student
vc_data = {"name": "Bob", "age": 30}
vc = uni.issue_vc(student.did, vc_data)
student.store_vc_in_wallet('degree', vc)

In [156]:
student.wallet

{'degree': {'id': 'did:university:388940476865698645/vc/2024-05-11T15:19:55.124957',
  'type': ['VerifiableCredential'],
  'issuer': 'did:university:388940476865698645',
  'issuanceDate': '2024-05-11T15:19:55.124957',
  'credentialSubject': {'id': 'did:student:-1716165634184872026',
   'data': {'name': 'Bob', 'age': 30}},
  'signature': 'b91402848bce04110607ae7c5e0b4bf275da17e2090194c25f560b57234d8cfcd3fe8b6896294dc1d79d6baf10fb195437ebf14fab2584df33ea565623697deede25d98a745ba3b94d74e58902048e3dbed3b4dec18816d25edbfe92789d09fc8941a9e3e4667a8df3467e80949183ac2d9196758addb39924c6c824d70140180f097fa9d55b9edb9c2dd0e7a86a587c8c8031e37e602157daf132881924439bdf9ff2fd971217f6668c84bf6d6d01c3188a1b56f898cf79f87db92cdae2af2797be6314c63a7e4908bd430e3ab3f2dfb91a0c084d2eb1369c0bfc98ceabcd92db027fe26e388a0d78a0309860a9d4a90b38b1fb1cc2ce9405b10e67260e2898'}}

In [157]:
requests = ["degree"]
proof = student.generate_vp(requests)

In [158]:
proof

{'holder': 'did:student:-1716165634184872026',
 'verifiable_credentials': [{'id': 'did:university:388940476865698645/vc/2024-05-11T15:19:55.124957',
   'type': ['VerifiableCredential'],
   'issuer': 'did:university:388940476865698645',
   'issuanceDate': '2024-05-11T15:19:55.124957',
   'credentialSubject': {'id': 'did:student:-1716165634184872026',
    'data': {'name': 'Bob', 'age': 30}},
   'signature': 'b91402848bce04110607ae7c5e0b4bf275da17e2090194c25f560b57234d8cfcd3fe8b6896294dc1d79d6baf10fb195437ebf14fab2584df33ea565623697deede25d98a745ba3b94d74e58902048e3dbed3b4dec18816d25edbfe92789d09fc8941a9e3e4667a8df3467e80949183ac2d9196758addb39924c6c824d70140180f097fa9d55b9edb9c2dd0e7a86a587c8c8031e37e602157daf132881924439bdf9ff2fd971217f6668c84bf6d6d01c3188a1b56f898cf79f87db92cdae2af2797be6314c63a7e4908bd430e3ab3f2dfb91a0c084d2eb1369c0bfc98ceabcd92db027fe26e388a0d78a0309860a9d4a90b38b1fb1cc2ce9405b10e67260e2898'}],
 'requests': ['degree'],
 'signature': '56ada1152947b39f0435608765d4dd319

In [159]:
# Verifier verifies the presentation
vendor.verify_vp(proof)

Issuer authenticity verified.
Holder authenticity verified.
Credential subject extracted successfully.


{'id': 'did:student:-1716165634184872026', 'data': {'name': 'Bob', 'age': 30}}

In [2]:
%%writefile flask/app.py
from flask import Flask, jsonify, request, send_file
from did import DID
import qrcode

app = Flask(__name__)
vendor_did = DID('vendor')

def get_streamlit_url():
    with open("../streamlit_url.txt", "r") as f:
        return f.read()

@app.route('/request-access')
def generate_qr():
    # Generate the data string to be encoded in the QR code
    address=get_streamlit_url()
    # address="http://10.132.198.247:8501"
    did = f"{vendor_did.did}".replace(":", "_")
    qr_data = f'{address}/?did={did}'

    # Create QR code
    qr = qrcode.QRCode(
        version=1, 
        error_correction=qrcode.constants.ERROR_CORRECT_L, 
        box_size=10, 
        border=4)
    qr.add_data(qr_data)
    qr.make(fit=True)

    # Generate QR code image
    img = qr.make_image(fill_color="black", back_color="white")
    img_path = "verifier_qr.png"
    img.save(img_path)  # Save QR code as an image file

    return send_file(img_path, mimetype='image/png')

@app.route('/verify_vp', methods=['POST'])
def verify_vp():
    data = request.json
    response = data.get('response')
    
    if response == 'fail':
        # Logic for handling failed response
        return jsonify({"message": "Holder rejected access"}), 200
    elif response:
        information = vendor_did.verify_vp(response)
        print(information)
        return jsonify({"message": information}), 200
    else:
        return jsonify({"error": "Invalid response"}), 400
    
if __name__ == '__main__':
    app.run(debug=True)

Overwriting flask/app.py


In [4]:
%%writefile streamlit/app.py

import streamlit as st
import requests
import socket
import urllib.parse
from did import DID

def initialize_did():
    
    # Write the network URL 
    ip = socket.gethostbyname(socket.gethostname())
    port = 8501  # The port you choose to run Streamlit on
    with open("../streamlit_url.txt", "w") as f:
        f.write(f"http://{ip}:{port}")
        
    student = DID("student")
    uni = DID('university')
    vc_data = {"name": "Bob", "age": 30}
    vc = uni.issue_vc(student.did, vc_data)
    student.store_vc_in_wallet('degree', vc)
    return student

def get_query_params():
    query_params = st.query_params
    return query_params

# Function to handle access request response
def handle_response(response):
    if response == "Accept":
        # st.success("Access request accepted")
        # Implement logic to send response to backend
        
        info_needed = ["degree"]
        proof = st.session_state.student.generate_vp(info_needed)
        
        response_data = {'response': proof}
        
    elif response == "Cancel":
        # st.warning("Access request canceled")
        response_data = {'response': 'fail'}
    else:
        st.error("Invalid response")
        return
    
    # Send POST request to server
    try:
        response = requests.post('http://127.0.0.1:5000/verify_vp', json=response_data)
        if response.status_code == 200:
            # st.success("Response sent successfully")
            if response_data['response'] == 'fail':
                st.warning("Access request canceled by holder")
            else:
                st.success("Verification success")
                data = response.json()
                information = data['message']
                st.json(information)
        else:
            st.error("Failed to send response to server")
    except requests.exceptions.ConnectionError:
        st.error("Failed to connect to the server")


# Streamlit app
def main():
    st.title("Decentralized Identity Wallet")

    # Initialize DID only once
    if 'student' not in st.session_state:
        st.session_state.student = initialize_did()

    # Extract verifier data from URL parameters
    query_params = get_query_params()
    did_str = query_params.get('did', 'No data provided')
    did = urllib.parse.unquote(did_str)
    answer = st.session_state.student.resolve_did_locally(did)

    if answer:
        st.write("Verifiable credential request received from:")
        st.write(f"DID: {did}")
        st.write("Verifier: Vendor")
        st.write("Requested Credential: Degree")

        # Buttons for response
        response = st.radio("Do you want to accept or cancel the request?", ("Accept", "Cancel"))
        if st.button("Submit"):
            handle_response(response)

if __name__ == "__main__":
    main()


Overwriting streamlit/app.py
