In [88]:
import pandas as pd
import random
from datetime import datetime, timedelta
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import hashes
import json

# Step 1: Generate a large dataset
def generate_prescription_data(num_records):
    doctors = ["Dr. Ahsan Habib", "Dr. Sarah Khan", "Dr. John Doe"]
    patients = [f"Patient_{i}" for i in range(1, num_records + 1)]
    medicines = [
        {"name": "Paracetamol", "dose": "100mg", "frequency": "3 times a day"},
        {"name": "Amoxicillin", "dose": "250mg", "frequency": "1 times a day"},
        {"name": "Ibuprofen", "dose": "100mg", "frequency": "2 times a day"},
        {"name": "Metformin", "dose": "500mg", "frequency": "2 times a day"},
        {"name": "Atorvastatin", "dose": "10mg", "frequency": "1 times a day"},
        {"name": "Omeprazole", "dose": "20mg", "frequency": "1 times a day"},
        {"name": "Amlodipine", "dose": "5mg", "frequency": "1 times a day"},
        {"name": "Losartan", "dose": "50mg", "frequency": "1 times a day"},
        {"name": "Gabapentin", "dose": "300mg", "frequency": "3 times a day"},
        {"name": "Levothyroxine", "dose": "50mcg", "frequency": "1 times a day"},
        {"name": "Sertraline", "dose": "50mg", "frequency": "1 times a day"},
        {"name": "Albuterol", "dose": "90mcg", "frequency": "as needed"},
        {"name": "Prednisone", "dose": "10mg", "frequency": "1 times a day"},
        {"name": "Cetirizine", "dose": "10mg", "frequency": "1 times a day"},
        {"name": "Lisinopril", "dose": "10mg", "frequency": "1 times a day"},
        {"name": "Simvastatin", "dose": "20mg", "frequency": "1 times a day"},
        {"name": "Furosemide", "dose": "40mg", "frequency": "1 times a day"},
        {"name": "Doxycycline", "dose": "100mg", "frequency": "2 times a day"},
        {"name": "Clonazepam", "dose": "0.5mg", "frequency": "2 times a day"},
        {"name": "Tramadol", "dose": "50mg", "frequency": "2 times a day"},
        {"name": "Ciprofloxacin", "dose": "500mg", "frequency": "2 times a day"},
        {"name": "Hydrochlorothiazide", "dose": "25mg", "frequency": "1 times a day"},
        {"name": "Metoprolol", "dose": "50mg", "frequency": "2 times a day"},
        {"name": "Ranitidine", "dose": "150mg", "frequency": "2 times a day"},
        {"name": "Warfarin", "dose": "5mg", "frequency": "1 times a day"},
        {"name": "Morphine", "dose": "10mg", "frequency": "as needed"},
        {"name": "Diazepam", "dose": "5mg", "frequency": "2 times a day"},
        {"name": "Insulin", "dose": "varies", "frequency": "as needed"},
        {"name": "Pantoprazole", "dose": "40mg", "frequency": "1 times a day"},
        {"name": "Naproxen", "dose": "250mg", "frequency": "2 times a day"},
        {"name": "Duloxetine", "dose": "30mg", "frequency": "1 times a day"},
        {"name": "Bupropion", "dose": "150mg", "frequency": "1 times a day"},
        {"name": "Methylprednisolone", "dose": "4mg", "frequency": "1 times a day"},
        {"name": "Erythromycin", "dose": "500mg", "frequency": "4 times a day"},
        {"name": "Risperidone", "dose": "2mg", "frequency": "1 times a day"},
        {"name": "Loratadine", "dose": "10mg", "frequency": "1 times a day"},
        {"name": "Carvedilol", "dose": "6.25mg", "frequency": "2 times a day"},
        {"name": "Tamsulosin", "dose": "0.4mg", "frequency": "1 times a day"},
        {"name": "Propranolol", "dose": "40mg", "frequency": "2 times a day"},
        {"name": "Fluoxetine", "dose": "20mg", "frequency": "1 times a day"},
        {"name": "Methotrexate", "dose": "10mg", "frequency": "weekly"},
        {"name": "Sitagliptin", "dose": "100mg", "frequency": "1 times a day"},
        {"name": "Montelukast", "dose": "10mg", "frequency": "1 times a day"},
        {"name": "Glimepiride", "dose": "2mg", "frequency": "1 times a day"},
        {"name": "Hydroxyzine", "dose": "25mg", "frequency": "1 times a day"},
        {"name": "Escitalopram", "dose": "10mg", "frequency": "1 times a day"},
        {"name": "Azithromycin", "dose": "500mg", "frequency": "1 times a day"},
        {"name": "Famotidine", "dose": "20mg", "frequency": "2 times a day"},
        {"name": "Bisoprolol", "dose": "5mg", "frequency": "1 times a day"},
        {"name": "Dapagliflozin", "dose": "10mg", "frequency": "1 times a day"},
        {"name": "Empagliflozin", "dose": "10mg", "frequency": "1 times a day"},
        {"name": "Rosuvastatin", "dose": "10mg", "frequency": "1 times a day"}
    ]
    
    data = []
    for i in range(num_records):
        doctor = random.choice(doctors)
        patient = patients[i]
        age = random.randint(18, 80)
        date = (datetime.now() - timedelta(days=random.randint(0, 365))).strftime('%Y-%m-%d')
        medicine = random.choice(medicines)
        remarks = random.choice(["Take after food.", "Take before food.", "Avoid alcohol."])
        
        record = {
            "doctor": doctor,
            "patient": patient,
            "age": age,
            "date": date,
            "medicine": medicine,
            "remarks": remarks
        }
        data.append(record)
    
    return pd.DataFrame(data)

# Step 2: Generate DSA Key Pair
private_key = dsa.generate_private_key(key_size=2048)
public_key = private_key.public_key()

# Step 3: Generate a large dataset with 10,000 records
large_dataset = generate_prescription_data(10000)

# Step 4: Convert each row to JSON string
json_records = large_dataset.to_dict(orient='records')

# Step 5: Sign each record
signed_records = []
for record in json_records:
    record_bytes = json.dumps(record, indent=4).encode()
    signature = private_key.sign(
        record_bytes,
        hashes.SHA256()
    )
    signed_records.append({
        "record": record,
        "signature": signature.hex()  # Store signature as hexadecimal string
    })

# Step 6: Verify signatures
def verify_signature(record, signature_hex, public_key):
    try:
        signature = bytes.fromhex(signature_hex)  # Convert hex string back to bytes
        public_key.verify(
            signature,
            json.dumps(record, indent=4).encode(),
            hashes.SHA256()
        )
        return True
    except:
        return False

# Test verification for all records
verification_results = []
for signed_record in signed_records:
    is_valid = verify_signature(
        signed_record["record"],
        signed_record["signature"],
        public_key
    )
    verification_results.append(is_valid)

# Check if all signatures are valid
all_valid = all(verification_results)
print("All signatures verified:", all_valid)

# Step 7: Save signed records to a JSON file
with open('signed_prescriptions.json', 'w') as f:
    json.dump(signed_records, f, indent=4)

print("Signed prescriptions saved to 'signed_prescriptions.json'")

# Step 8: Load signed records from file and verify
with open('signed_prescriptions.json', 'r') as f:
    loaded_signed_records = json.load(f)

# Verify loaded records
loaded_verification_results = []
for loaded_record in loaded_signed_records:
    is_valid = verify_signature(
        loaded_record["record"],
        loaded_record["signature"],
        public_key
    )
    loaded_verification_results.append(is_valid)

# Check if all loaded signatures are valid
all_loaded_valid = all(loaded_verification_results)
print("All loaded signatures verified:", all_loaded_valid)

All signatures verified: True
Signed prescriptions saved to 'signed_prescriptions.json'
All loaded signatures verified: True


In [89]:
# Verify the signature of a specific record (e.g., first record)
first_record = signed_records[0]
is_first_record_valid = verify_signature(
    first_record["record"],
    first_record["signature"],
    public_key
)

print(f"First record signature verified: {is_first_record_valid}")

First record signature verified: True


In [90]:
# Inspect and verify a specific record
specific_record_index = 100  # Example: Check the 43rd record (index 42)
specific_record = signed_records[specific_record_index]

print("Record Details:")
print(json.dumps(specific_record["record"], indent=4))

is_specific_record_valid = verify_signature(
    specific_record["record"],
    specific_record["signature"],
    public_key
)

print(f"Signature for record #{specific_record_index} verified: {is_specific_record_valid}")

Record Details:
{
    "doctor": "Dr. Ahsan Habib",
    "patient": "Patient_101",
    "age": 34,
    "date": "2024-04-15",
    "medicine": {
        "name": "Gabapentin",
        "dose": "300mg",
        "frequency": "3 times a day"
    },
    "remarks": "Avoid alcohol."
}
Signature for record #100 verified: True


In [91]:
# Modify a record's data to test signature verification failure
modified_record = signed_records[0]["record"].copy()
modified_record["medicine"]["name"] = "Dapagliflozin"  # Change the medicine name

is_modified_record_valid = verify_signature(
    modified_record,
    signed_records[0]["signature"],
    public_key
)

print(f"Signature for modified record verified: {is_modified_record_valid}")  # Should print False

Signature for modified record verified: False


In [92]:
# Try to verify using the private key instead of the public key (should fail)
try:
    private_key.verify(
        bytes.fromhex(signed_records[0]["signature"]),
        json.dumps(signed_records[0]["record"], indent=4).encode(),
        hashes.SHA256()
    )
    print("Verification with private key succeeded (unexpected)")
except Exception as e:
    print(f"Verification with private key failed: {e}")  # Expected behavior

Verification with private key failed: 'cryptography.hazmat.bindings._rust.openssl.dsa.DSAPrivateKey' object has no attribute 'verify'


In [93]:
import time

# Add timestamp to each record before signing
timestamped_signed_records = []
for record in json_records:
    record_with_timestamp = record.copy()
    record_with_timestamp["timestamp"] = int(time.time())  # Add current timestamp
    
    record_bytes = json.dumps(record_with_timestamp, indent=4).encode()
    signature = private_key.sign(
        record_bytes,
        hashes.SHA256()
    )
    timestamped_signed_records.append({
        "record": record_with_timestamp,
        "signature": signature.hex()
    })

# Verify timestamped records
def verify_timestamped_signature(record, signature_hex, public_key):
    try:
        signature = bytes.fromhex(signature_hex)
        public_key.verify(
            signature,
            json.dumps(record, indent=4).encode(),
            hashes.SHA256()
        )
        return True
    except:
        return False

# Test verification for all timestamped records
timestamped_verification_results = []
for ts_record in timestamped_signed_records:
    is_valid = verify_timestamped_signature(
        ts_record["record"],
        ts_record["signature"],
        public_key
    )
    timestamped_verification_results.append(is_valid)

all_ts_valid = all(timestamped_verification_results)
print("All timestamped signatures verified:", all_ts_valid)

All timestamped signatures verified: True


In [94]:
# Save public key to a file
from cryptography.hazmat.primitives import serialization

with open('public_key.pem', 'wb') as f:
    f.write(public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    ))

print("Public key saved to 'public_key.pem'")

Public key saved to 'public_key.pem'


In [95]:
# Load public key from file
with open('public_key.pem', 'rb') as f:
    loaded_public_key = serialization.load_pem_public_key(f.read())

# Verify a record using the loaded public key
first_record = signed_records[0]
is_first_record_valid = verify_signature(
    first_record["record"],
    first_record["signature"],
    loaded_public_key
)

print(f"First record signature verified with loaded public key: {is_first_record_valid}")

First record signature verified with loaded public key: False


In [96]:
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization

# Generate a DSA private key
private_key = dsa.generate_private_key(key_size=2048)

# Extract the public key
public_key = private_key.public_key()

# Serialize the private key to PEM format
private_pem = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)

# Serialize the public key to PEM format
public_pem = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
)

# Save the keys to files
with open('private_key.pem', 'wb') as f:
    f.write(private_pem)

with open('public_key.pem', 'wb') as f:
    f.write(public_pem)

In [97]:
# import json
# from cryptography.hazmat.primitives import hashes
# from cryptography.hazmat.primitives.asymmetric import dsa
# from cryptography.hazmat.primitives import serialization

# # Load the private key
# with open('private_key.pem', 'rb') as key_file:
#     private_key = serialization.load_pem_private_key(
#         key_file.read(),
#         password=None
#     )

# # Sample prescriptions (replace with your actual data)
# prescriptions = [
#     {
#         "record": {
#             "doctor": "Dr. Sarah Khan",
#             "patient": "Patient_44",
#             "age": 52,
#             "date": "2024-08-28",
#             "medicine": {
#                 "name": "Paracetamol",
#                 "dose": "100mg",
#                 "frequency": "6 times a day"
#             },
#             "remarks": "Take before food."
#         },
#         "signature": "30450221009de05a8ab585e70732c4709e97284e766fcc0afbf32855ef92ac033a0dc9e845022005fa4fdbfe7acafc1121ba882e8ed0acc107f84bc87d1982a3b3ad44eb3fef1b"
#     },
#     {
#         "record": {
#             "doctor": "Dr. Ahsan Habib",
#             "patient": "Patient_45",
#             "age": 34,
#             "date": "2024-08-20",
#             "medicine": {
#                 "name": "Carvedilol",
#                 "dose": "6.25mg",
#                 "frequency": "2 times a day"
#             },
#             "remarks": "Take before food."
#         },
#         "signature": "30450220265c430fe083ec8db5a48214cc89ecd3c98136a0b52612e3c4d725089cd9989c022100b06003a8bb5633c1fc1758ddb533623c48457dee2ed365c3b4e6bbfb00bfc30a"
#     }
# ]


# # Sign each prescription
# for prescription in prescriptions:
#     record_data = prescription['record']
#     canonical_data = json.dumps(record_data, separators=(',', ':'), sort_keys=True).encode('utf-8')
#     signature = private_key.sign(canonical_data, hashes.SHA256())
#     prescription['signature'] = signature.hex()
    

# prescriptions[0]['record']['patient'] = "Tampered_Patient"


# # Save the signed prescriptions to a JSON file
# with open('signed_prescriptions.json', 'w', encoding='utf-8') as file:
#     json.dump(prescriptions, file, indent=4)

In [98]:
# import json
# from cryptography.hazmat.primitives import hashes
# from cryptography.hazmat.primitives.asymmetric import dsa
# from cryptography.exceptions import InvalidSignature
# from cryptography.hazmat.primitives import serialization

# def verify_prescription_signatures(json_file_path, public_key_pem_path):
#     # Load the DSA public key from the provided PEM file path
#     try:
#         with open(public_key_pem_path, 'rb') as key_file:
#             public_key = serialization.load_pem_public_key(key_file.read())
#         print("Successfully loaded public key")
#     except Exception as e:
#         return [f"Error loading public key: {str(e)}"]
    
#     # Read the JSON file containing prescriptions
#     try:
#         with open(json_file_path, 'r', encoding='utf-8') as file:
#             prescriptions = json.load(file)
#             print(f"Successfully parsed JSON, found {len(prescriptions)} prescription(s)")
#     except Exception as e:
#         return [f"Error reading file: {str(e)}"]
    
#     verification_results = []
#     verified_count = 0
#     failed_count = 0
    
#     # Verify each prescription
#     for idx, prescription in enumerate(prescriptions, 1):
#         try:
#             if idx % 1000 == 0:  # Progress indicator
#                 print(f"Processing prescription {idx}...")
            
#             # Extract the record and signature
#             signature_hex = prescription.get('signature')
#             record_data = prescription.get('record')
            
#             if not signature_hex or not record_data:
#                 failed_count += 1
#                 verification_results.append(f"Prescription {idx}: Missing signature or record")
#                 continue
            
#             # Ensure consistent JSON formatting - must match exactly how it was signed
#             canonical_data = json.dumps(record_data, separators=(',', ':'), sort_keys=True).encode('utf-8')
            
#             try:
#                 # Convert signature from hex to bytes
#                 signature = bytes.fromhex(signature_hex)
                
#                 # Verify the signature using SHA256
#                 public_key.verify(
#                     signature,
#                     canonical_data,
#                     hashes.SHA256()
#                 )
#                 verified_count += 1
#                 if idx <= 5:  # Show first 5 successful verifications
#                     verification_results.append(f"Prescription {idx}: Signature verified successfully!")
#                     # Print details for debugging
#                     print(f"\nSuccessfully verified prescription {idx}:")
#                     print(f"Doctor: {record_data['doctor']}")
#                     print(f"Patient: {record_data['patient']}")
#                     print(f"Medicine: {record_data['medicine']['name']}")
#             except InvalidSignature:
#                 failed_count += 1
#                 if idx <= 5:  # Show first 5 failures
#                     verification_results.append(f"Prescription {idx}: Invalid signature")
                
#         except Exception as e:
#             failed_count += 1
#             if idx <= 5:
#                 verification_results.append(f"Prescription {idx}: Error - {str(e)}")
    
#     # Add summary information
#     verification_results.append(f"\nVerification Summary:")
#     verification_results.append(f"Total prescriptions processed: {len(prescriptions)}")
#     verification_results.append(f"Successfully verified: {verified_count}")
#     verification_results.append(f"Failed verifications: {failed_count}")
#     verification_results.append(f"Success rate: {(verified_count/len(prescriptions))*100:.2f}%")
    
#     return verification_results

# # Use the function with specific file paths
# try:
#     print("Starting verification process...")
#     results = verify_prescription_signatures(
#         'C:\\Users\\saniu\\Machine Learning 2025\\Block Chain\\signed_prescriptions.json',
#         'C:\\Users\\saniu\\Machine Learning 2025\\Block Chain\\public_key.pem'
#     )
#     print("\nVerification Results:")
#     for result in results:
#         print(result)
# except Exception as e:
#     print(f"Fatal error: {str(e)}")