In [1]:
import json
from websocket import WebSocketApp
from graph_utils import  insert_transaction
from config import BLOCKCHAIN_WS_URL,BLOCKSTREAM_API
import requests
import time
import os
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from tqdm import tqdm

In [2]:
FINAL_JSON_PATH = "bitcoin_transactions_backup.json"

FOR GETTING THE LATEST BLOCK TRANSACTIONS

In [3]:
CONFIRMED_OUTPUT_FILE = "bitcoin_confirmed_transactions_data.jsonl"

# Robust session with retries
session = requests.Session()
retries = Retry(
    total=5,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["HEAD", "GET", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retries)
session.mount("http://", adapter)
session.mount("https://", adapter)

def safe_get(url, **kwargs):
    try:
        response = session.get(url, timeout=10, **kwargs)
        response.raise_for_status()
        return response
    except requests.exceptions.RequestException as e:
        print(f"Request failed for {url}: {e}")
        return None

def get_latest_block_height():
    r = safe_get(f"{BLOCKSTREAM_API}/blocks/tip/height")
    if r:
        return int(r.text)
    return None

def get_block_hash(height):
    r = safe_get(f"{BLOCKSTREAM_API}/block-height/{height}")
    if r:
        return r.text
    return None

def get_block_time(block_hash):
    r = safe_get(f"{BLOCKSTREAM_API}/block/{block_hash}")
    if r:
        return r.json().get('timestamp')
    return None

def get_block_txids(block_hash):
    r = safe_get(f"{BLOCKSTREAM_API}/block/{block_hash}/txids")
    if r:
        return r.json()
    return []

def get_tx(txid):
    r = safe_get(f"{BLOCKSTREAM_API}/tx/{txid}")
    if r:
        return r.json()
    return None

def filter_and_format_tx(tx):
    status = tx.get("status", {})
    return {
        "txid": tx["txid"],
        "status": {
            "confirmed": status.get("confirmed", False),
            "block_height": status.get("block_height"),
            "block_hash": status.get("block_hash"),
            "block_time": status.get("block_time")
        },
        "vin": [
            {
                "prevout": {
                    "scriptpubkey_address": vin["prevout"].get("scriptpubkey_address"),
                    "value": vin["prevout"].get("value")
                } if vin.get("prevout") else {}
            } for vin in tx.get("vin", [])
        ],
        "vout": [
            {
                "scriptpubkey_address": vout.get("scriptpubkey_address"),
                "value": vout.get("value")
            } for vout in tx.get("vout", [])
        ]
    }

def fetch_and_store_confirmed_transactions(last_height, output_file=CONFIRMED_OUTPUT_FILE, seen_blocks=None):
    if seen_blocks is None:
        seen_blocks = set()
    current_height = get_latest_block_height()
    if current_height is None:
        print("Could not get latest block height. Retrying in 10 seconds...")
        time.sleep(10)
        return last_height, seen_blocks

    for height in range(last_height, current_height + 1):
        if height in seen_blocks:
            continue
        block_hash = get_block_hash(height)
        if not block_hash:
            continue
        block_time = get_block_time(block_hash)
        print(f"Processing block {height} at {block_time}...")
        txids = get_block_txids(block_hash)
        for txid in tqdm(txids):
            tx = get_tx(txid)
            if tx:
                tx_struct = filter_and_format_tx(tx)
                with open(output_file, "a", encoding="utf-8") as f:
                    f.write(json.dumps(tx_struct) + "\n")
            time.sleep(0.05)
        seen_blocks.add(height)
        print(f"Block {height} processed with {len(txids)} transactions.")
        time.sleep(0.2)

    last_height = current_height + 1
    return last_height, seen_blocks

if __name__ == "__main__":
    seen_blocks = set()
    last_height = get_latest_block_height()
    if last_height is None:
        print("Could not get latest block height. Exiting.")
        exit(1)
    print(f"Starting from block height {last_height}")

    last_height, seen_blocks = fetch_and_store_confirmed_transactions(last_height, seen_blocks=seen_blocks)
    print("Sleeping before next poll...")


Starting from block height 896614
Processing block 896614 at 1747187102...


  4%|â–Ž         | 45/1270 [00:06<02:44,  7.45it/s]


KeyboardInterrupt: 

WEB SOCKET API - For Fetching the Unconfirmed Transactions in Real time

In [4]:

TMP_JSONL_PATH = "bitcoin_transactions_realtime_tmp.jsonl"


def format_unconfirmed_tx(tx_raw):
    return {
        "txid": tx_raw.get("hash"),
        "status": {
            "confirmed": False,
            "block_height": None,
            "block_hash": None,
            "block_time": None
        },
        "vin": [
            {
                "prevout": {
                    "scriptpubkey_address": vin.get("prev_out", {}).get("addr"),
                    "value": vin.get("prev_out", {}).get("value")
                } if vin.get("prev_out") else {}
            } for vin in tx_raw.get("inputs", [])
        ],
        "vout": [
            {
                "scriptpubkey_address": vout.get("addr"),
                "value": vout.get("value")
            } for vout in tx_raw.get("out", [])
        ]
    }

def handle_message(message):
    try:
        data = json.loads(message)
        tx_raw = data.get("x", {})
        tx_data = format_unconfirmed_tx(tx_raw)
        # Write each tx as a line to a temp JSONL file
        with open(TMP_JSONL_PATH, "a", encoding="utf-8") as f:
            f.write(json.dumps(tx_data) + "\n")
        insert_transaction(tx_data)
        print(f"Ingested TX: {tx_data['txid']} | Confirmed: {tx_data['status']['confirmed']}")
    except Exception as e:
        print(f"Error processing message: {e}")

def on_open(ws):
    print("Subscribed to unconfirmed transactions")
    ws.send(json.dumps({"op": "unconfirmed_sub"}))

def merge_jsonl_to_master(jsonl_path, master_path):
    # Load existing master file (list of dicts)
    if os.path.exists(master_path):
        with open(master_path, "r", encoding="utf-8") as f:
            try:
                master_list = json.load(f)
            except Exception:
                master_list = []
    else:
        master_list = []
    master_txids = set(tx["txid"] for tx in master_list if "txid" in tx)

    # Read all new transactions from the temp JSONL file
    new_txs = []
    if os.path.exists(jsonl_path):
        with open(jsonl_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    tx = json.loads(line)
                    if tx.get("txid") not in master_txids:
                        new_txs.append(tx)
                        master_txids.add(tx.get("txid"))
                except Exception:
                    continue

    # Merge and save
    if new_txs:
        master_list.extend(new_txs)
        with open(master_path, "w", encoding="utf-8") as f:
            json.dump(master_list, f, indent=2)
        print(f"Merged {len(new_txs)} new transactions into {master_path}")
    else:
        print("No new transactions to merge.")

    # Remove temp file
    if os.path.exists(jsonl_path):
        os.remove(jsonl_path)

if __name__ == "__main__":
    print("Starting real-time Bitcoin transaction ingestion...")
    ws = WebSocketApp(
        BLOCKCHAIN_WS_URL,
        on_open=on_open,
        on_message=lambda ws, msg: handle_message(msg),
        on_error=lambda ws, err: print(f"WebSocket error: {err}"),
        on_close=lambda ws, code, msg: print("WebSocket closed")
    )
    try:
        ws.run_forever()
    except KeyboardInterrupt:
        print("\nInterrupted! Merging session transactions into master file...")
        merge_jsonl_to_master(TMP_JSONL_PATH, FINAL_JSON_PATH)
        print("Safe exit. All transactions are now in the master JSON list.")


Starting real-time Bitcoin transaction ingestion...
Subscribed to unconfirmed transactions
Ingested TX: 41c1e4274c40a118d7f35b269cfddab3ff867a97b339d357008c2ade8d2082f6 | Confirmed: False
Ingested TX: af898bf31ef2ca6e577df5bc3934df2d47e181b47bf64d506f212919b69aa74a | Confirmed: False
Ingested TX: 3ae9737eccf0f01961e70e4f20a2882809160a2013ffa61a236f5669de57899c | Confirmed: False
Ingested TX: 65da833515c3ac7ce18ee7e36326cb4709afdf03242c2966a2dbd15a6cec79ca | Confirmed: False
Ingested TX: f3c1dcc8f2496637213d958dca3b7f92b855e400ff02953cd7a522cea2339718 | Confirmed: False
Ingested TX: 635fc26e710e4663981260486f26512afb95053a4bc63137839e48fdfc96adbd | Confirmed: False
Ingested TX: 6ab423702590db4d69fe96c28dff0585d17a0e54c6f6d017a1f61d2b5aae347a | Confirmed: False
Ingested TX: e083b3e577e013d9b2cc762cc84bf9199cb22edd34954ab839d870b0cd518ed0 | Confirmed: False
Ingested TX: 6caa211f75b6f343b169f1809f700069ac3efbe6a75b5f2eaadd7ab4e73e5296 | Confirmed: False
Ingested TX: b903a267978989a342c59cd

In [10]:
# Merging both the files and the temp file wil be deleted
merge_jsonl_to_master(CONFIRMED_OUTPUT_FILE, FINAL_JSON_PATH)
merge_jsonl_to_master(TMP_JSONL_PATH, FINAL_JSON_PATH)

Merged 1914 new transactions into bitcoin_transactions_backup.json
