In [1]:
from datetime import datetime
import numpy as np
import pandas as pd
from store import *

In [2]:
# Can be downloaded from https://mega.nz/file/rOJmhLIR#5j7wko32Mh0MlsQnC9yVG6jCvPql7Isqcyvgh3kmxKk
# See cell one before the last for avoiding this download
pre_checkpoint_store = Store(r'/home/pool/data/kaspa-data-22-11-21-correct-utxo-commit')
# The current node's datadir
current_store = Store(r'/home/pool/.kaspad/kaspa-mainnet/datadir2')

In [3]:
'''
Builds a sample of headers throughout the history
'''
def build_header_samples(store):
    # We first read the historic pruning point list. This gives
    # us a per pruning-point period sample throughout history (~ 1 or 2 a day)
    samples = store.pruning_points_chain()
    # Pruning points are returned from later to earlier so we reverse
    samples.reverse()
    # Get the chain index of the last block (aka the current pruning point)
    low = store.get_chain_block_index_by_hash(samples[-1])
    # Get the index of the highest chain block (aka the sink/virtual-selected-parent)
    high = store.get_highest_chain_block_index()
    # Add a few more samples from recent data
    for i in range(low + (high - low) // 3, high + 1, (high - low) // 3):
        samples.append(store.get_chain_block_hash_by_index(i))
    return [store.get_raw_header(h) for h in samples]

In [4]:
'''
Estimate timestamp from DAA score by interpolating over the samples
'''
def estimate_timestamp(samples, daa_score):
    if daa_score < samples[0].daaScore or daa_score > samples[-1].daaScore:
        raise '{} is out of range'.format(daa_score)
    # Optimization: this search can be done with a binary search 
    # since DAA score is monotonically increasing over the chain
    for i in range(len(samples) - 1):
        current, next = samples[i], samples[i+1]
        if daa_score >= current.daaScore and daa_score < next.daaScore:
            frac = (daa_score - current.daaScore) / (next.daaScore - current.daaScore)
            interpolated_timestamp = int(current.timeInMilliseconds + 
                                         (next.timeInMilliseconds - current.timeInMilliseconds) * frac)
            return datetime.fromtimestamp(interpolated_timestamp // 1000)
    raise 'unreachable'

In [5]:
pre_checkpoint_samples = build_header_samples(pre_checkpoint_store)
current_samples = build_header_samples(current_store)
# Combine samples from both network phases
samples = pre_checkpoint_samples + current_samples

In [6]:
estimate_timestamp(samples, 55), estimate_timestamp(samples, 55137666), estimate_timestamp(samples, 55137650)

(datetime.datetime(2021, 11, 7, 17, 27, 23),
 datetime.datetime(2023, 8, 9, 13, 36, 32),
 datetime.datetime(2023, 8, 9, 13, 36, 16))

In [7]:
# Print the (DAA score, timestamp) tuples from the pre-halt store.
# Since these values are fixed, one can simply use this list
for header in pre_checkpoint_samples:
    print(header.daaScore, header.timeInMilliseconds)

0 1636298787842
87133 1636386662010
176797 1636473700804
264837 1636560706885
355974 1636650005662
445152 1636737841327
536709 1636828600930
624635 1636912614350
712234 1636999362832
801831 1637088292662
890716 1637174890675
978396 1637260956454
1068387 1637349078269
1139626 1637418723538
1218320 1637495941516
1312860 1637609671037


In [8]:
# Close the opened resources
pre_checkpoint_store.close()
current_store.close()