### BTC to NEO4J

### Import libs and credentials

In [58]:
from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
import json
import itertools
import pandas as pd
import time
from neo4j import GraphDatabase
import pyarrow.parquet as pq
import numpy as np
import os



In [2]:
# JSON file that holds the Bitcoin RPC and Neo4j secrets
secrets_file = ".secrets/credentials.json"

with open(secrets_file, "r") as file:
    credentials = json.load(file)

# Bitcoin RPC connection
rpc_section = credentials["BitcoinRPC"]
rpc_user = rpc_section["rpc_user"]
rpc_password = rpc_section["rpc_password"]

# Neo4j connection
neo4j_section = credentials["Neo4j"]
username = neo4j_section["username"]
password = neo4j_section["password"]

### RPC credentials - consider btc.config

In [65]:
# Setup Bitcoin RPC connection (rpc_user / rpc_password come from the credentials cell)
rpc_host = "localhost"
rpc_port = "8332"
rpc_url = f"http://{rpc_user}:{rpc_password}@{rpc_host}:{rpc_port}"
rpc_connection = AuthServiceProxy(rpc_url)

### Neo4J local credentials

In [5]:
# Neo4j connection: Bolt URI of the instance running on this machine.
uri = "bolt://localhost:7687" 
# username / password are loaded from the secrets file in the credentials cell;
# the hard-coded values below are intentionally left disabled.
# username = "tester"      # 
# password = "testusxxx"

### Parsing function from RPC Bitcoin core and writing to parquet

In [67]:



def process_chunk(start, end):
    '''
    Read a range of blocks from the Bitcoin Core RPC, flatten their
    transactions into rows and write them to a parquet file.

    For every transaction in blocks ``start`` .. ``end - 1`` one row is
    produced per input whose previous output can be resolved, and one row
    per output that exposes an ``address`` field. Inputs/outputs that
    cannot be resolved are skipped (best effort).

    The rows are written to ``blocks/export_blok_{start}_to_{end}.parquet``
    and a checkpoint is written to ``blocks/last_processed_block.txt``.

    Args:

        start: (int) Blocknumber start (inclusive)
        end: (int) Blocknumber end (exclusive)

    Relies on the module-level names ``rpc_connection``, ``blockStart`` and
    ``chunk_size`` being defined before the call.
    '''

    start_time = time.time()
    output_dir = "blocks"

    # Create the output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Errors that only mean "this single input/output cannot be resolved":
    # a missing key (e.g. an input without 'txid'/'vout', or a script without
    # 'address'), an out-of-range output index, or an RPC-level error for the
    # previous transaction. Anything else (Ctrl+C, connection failures, ...)
    # must propagate instead of silently producing an incomplete export.
    skippable_errors = (KeyError, IndexError, JSONRPCException)

    results_in = []
    results_out = []
    for b in range(start, end):
        blockhash = rpc_connection.getblockhash(b)
        block = rpc_connection.getblock(blockhash)

        for txid in block['tx']:
            raw_tx = rpc_connection.getrawtransaction(txid, 1, blockhash)
            tiempo = raw_tx['time']
            transactionid = raw_tx['txid']

            for i, vin in enumerate(raw_tx['vin']):
                try:
                    in_n = vin['vout']
                    # The spent output lives in the previous transaction
                    vin_tx = rpc_connection.getrawtransaction(vin['txid'], 1)
                    in_vout = vin_tx['vout'][in_n]
                    in_value = in_vout['value']
                    in_address = in_vout['scriptPubKey']['address']
                except skippable_errors:
                    continue

                results_in.append({"tx":transactionid,"address_in": in_address,"input_pos":i, "value_in": in_value, "time":tiempo})

            for vout_out in raw_tx['vout']:
                try:
                    out_n = vout_out['n']
                    out_value = vout_out['value']
                    out_address = vout_out['scriptPubKey']['address']
                except KeyError:
                    continue

                results_out.append({"tx":transactionid,"address_out": out_address,"output_pos":out_n, "value_out": out_value, "time":tiempo})

    # Input rows first, then output rows; columns missing on one side become NaN
    results_total = results_in + results_out
    df_export = pd.DataFrame(results_total)
    df_export.to_parquet(f'{output_dir}/export_blok_{start}_to_{end}.parquet',engine="pyarrow")

    end_time = time.time()
    print(f"Processed block {start} to {end} in {end_time - start_time} seconds")

    # Save the last processed block number. The resume logic reads the third
    # field (end), which is the first block that has NOT been processed yet.
    # NOTE: blockStart and chunk_size are module-level names set by the driver cell.
    with open(f"{output_dir}/last_processed_block.txt", "w") as file:
        file.write(f"{blockStart}, {chunk_size}, {end}")

    




# # Read the last processed block number from a file
# try:
#     with open("last_processed_block.txt", "r") as file:
#         blockStart = int(file.read())
# except FileNotFoundError:
#     pass  


output_dir = "blocks"

# Resume from the checkpoint written by process_chunk. The file holds
# "<blockStart>, <chunk_size>, <end>"; the third field is the next block to process.
try:
    with open(f"{output_dir}/last_processed_block.txt", "r") as file:
        blockStart = int(file.read().split(', ')[2])
except (FileNotFoundError, IndexError, ValueError) as err:
    # No usable checkpoint: keep a blockStart defined by an earlier cell, if any.
    print(f"No usable checkpoint in {output_dir}/last_processed_block.txt ({err!r})")

if "blockStart" not in globals():
    # Fail with a clear message instead of an opaque NameError further down.
    raise RuntimeError(
        "blockStart is undefined: no checkpoint could be read; "
        "set blockStart manually before running this cell"
    )

print('Blockstart',blockStart)

##### Start Parsing
# Uncomment to override the checkpoint and start from a fixed block:
#blockStart = 111000
blockLimit = 116000
chunk_size = 1000  # Define your chunk size here

# Process the blockchain in chunks; each chunk writes its own parquet file and
# updates the checkpoint, so an interrupted run can be resumed.
try:
    for i in range(blockStart, blockLimit, chunk_size):
        process_chunk(i, min(i + chunk_size, blockLimit))
except KeyboardInterrupt:
    print("CTL+C, resume later")


Blockstart 113000
Processed block 113000 to 114000 in 42.19905114173889 seconds
Processed block 114000 to 115000 in 69.95119500160217 seconds
Processed block 115000 to 116000 in 95.35189366340637 seconds


### Import to NEO4J

In [None]:
# with open(f"export_blok_{blockStart}_to_{blockLimit}.txt", 'w+') as f:
#     df_string = df_export.to_string(header=False, index=False)
#     f.write(df_string)

# f.close()

#### Creating neo4j connection

In [19]:
# Open the Neo4j driver with the URI and credentials defined in the earlier cells,
# then start a session bound to one specific database.
# NOTE(review): neither the driver nor the session is closed in the cells shown here.
driver = GraphDatabase.driver(uri, auth=(username, password))
session = driver.session(database="testdb")  ### change name of Neo4J database

### Importing files

In [20]:
# Without function
# df_import = pd.read_parquet(f"export_blok_{blockStart}_to_{blockLimit}.parquet")
# df_sort = df_import.sort_values(by=['tx'],ascending=False)
# df_sort.replace(to_replace=[None], value=np.nan, inplace=True)
# df = df_sort


## reading chunks
#Old
# def read_chunk(blockStart, blockEnd):
#     df_import = pd.read_parquet(f"export_blok_{blockStart}_to_{blockEnd}.parquet")
#     df_sort = df_import.sort_values(by=['tx'],ascending=False)
#     df_sort.replace(to_replace=[None], value=np.nan, inplace=True)
#     df = df_sort
#     return df

##### Reading based on chunksize

In [68]:
def read_chunks(blockStart, blockEnd, chunk_size):
    """Load the per-chunk parquet exports for a block range into one DataFrame.

    The range is walked in steps of ``chunk_size``; for every step the file
    ``blocks/export_blok_{start}_to_{end}.parquet`` is read, where ``end`` is
    capped at ``blockEnd``. The frames are concatenated with a fresh index.
    """
    output_dir = "blocks"
    frames = [
        pd.read_parquet(
            f"{output_dir}/export_blok_{chunk_start}_to_{min(chunk_start + chunk_size, blockEnd)}.parquet"
        )
        for chunk_start in range(blockStart, blockEnd, chunk_size)
    ]
    return pd.concat(frames, ignore_index=True)

In [22]:
# blockStart = 105000
# blockEnd = 110000
# chunk_size = 1000

# df = read_chunks(blockStart, blockEnd, chunk_size)


### Create a pd.Series of the unique addresses

In [23]:
def address_transformation(df):
    """Return the distinct values found in the address_in and address_out columns.

    Both columns are stacked (inputs first) and duplicates are dropped.
    Missing values are not removed here and the original row labels are kept.
    """
    address_columns = [df[column] for column in ("address_in", "address_out")]
    return pd.concat(address_columns).drop_duplicates()

### Create the tx format for writing to neo4j

In [24]:
# OLD
# def tx_ftransformation0 (df):

#     # unique transaction IDs
#     transaction_ids = df["tx"].dropna().unique()


#     transactions = []

#     # Iterate over the transaction IDs
#     for transaction_id in transaction_ids:
#         # Filter the DataFrame for the current transaction ID
#         df_transaction = df[df["tx"] == transaction_id]

#         # Extract the inputs
#         inputs = []
#         df_inputs = df_transaction.dropna(subset=["address_in"])
#         for _, row in df_inputs.iterrows():
#             address = row["address_in"]
#             amount = row["value_in"]
#             if pd.notnull(address) and pd.notnull(amount):
#                 inputs.append({"address": address, "amount": amount})

#         # Calculate the total amount
#         total_amount = sum([float(input_data["amount"]) for input_data in inputs])

#         #### COMPARING INPUT vs OUTPUT Sum ... difference are fees

#         # Extract the outputs
#         outputs = []
#         df_outputs = df_transaction.dropna(subset=["address_out"])
#         for _, row in df_outputs.iterrows():
#             address = row["address_out"]
#             amount = row["value_out"]
#             if pd.notnull(address) and pd.notnull(amount):
#                 outputs.append({"address": address, "amount": amount})

#         # Create the transaction dictionary
#         transaction_data = {
#             "transaction_id": transaction_id,
#             "total_amount": total_amount,
#             "inputs": inputs,
#             "outputs": outputs
#         }

#         transactions.append(transaction_data)
        
#     return transactions

In [29]:
def _collect_legs(rows, address_col, value_col):
    """Build the input (or output) entries for the rows of one transaction.

    A row contributes an entry only when both its address and its amount are
    non-null. ``.tolist()`` is used so the values are plain Python scalars.
    """
    addresses = rows[address_col].tolist()
    amounts = rows[value_col].tolist()
    timestamps = rows["time"].tolist()
    return [
        {"address": address, "amount": amount, "time": timestamp}
        for address, amount, timestamp in zip(addresses, amounts, timestamps)
        if pd.notnull(address) and pd.notnull(amount)
    ]


def tx_ftransformation(df):
    """Group the flat input/output rows into one dictionary per transaction.

    Args:
        df: DataFrame with the columns ``tx``, ``time``, ``address_in``,
            ``value_in``, ``address_out`` and ``value_out``.

    Returns:
        A list of dictionaries, in order of first appearance of each
        transaction id, with the keys ``transaction_id``, ``total_amount``
        (sum of the input amounts as float, ``0`` when there are no inputs),
        ``inputs`` and ``outputs``. Each input/output entry is a dictionary
        with ``address``, ``amount`` and ``time``. Rows without a
        transaction id are ignored.
    """
    transactions = []

    # A single grouping pass replaces re-filtering the whole frame once per
    # transaction id. sort=False keeps the order of first appearance, and
    # groupby drops rows whose key is null.
    for transaction_id, rows in df.groupby("tx", sort=False):
        inputs = _collect_legs(rows, "address_in", "value_in")
        outputs = _collect_legs(rows, "address_out", "value_out")

        transactions.append({
            "transaction_id": transaction_id,
            "total_amount": sum(float(entry["amount"]) for entry in inputs),
            "inputs": inputs,
            "outputs": outputs,
        })

    return transactions


### Write to Neo4J

In [46]:
# # Create unique constraint on Address nodes
# session.run("CREATE CONSTRAINT IF NOT EXISTS ON (a:Address) ASSERT a.address IS UNIQUE")
# session.run("CREATE CONSTRAINT IF NOT EXISTS ON (t:Transaction) ASSERT t.transaction_id IS UNIQUE")

# # Filter out null addresses
# filtered_addresses = addresses.dropna()

# # Create the Address nodes
# for address in filtered_addresses:
#     session.run(
#         """
#         MERGE (:Address {address: $address})
#         """,
#         address=address
#     )

# # Create the transaction nodes and relationships
# for transaction in transactions:
#     transaction_id = transaction["transaction_id"]
#     total_amount = transaction["total_amount"]
#     inputs = transaction["inputs"]
#     outputs = transaction["outputs"]
    
#     # Create the Transaction node
#     session.run(
#         """
#         CREATE (t:Transaction {transaction_id: $transaction_id, total_amount: $total_amount})
#         """,
#         transaction_id=transaction_id,
#         total_amount=total_amount
#     )
    
#     # Create the input relationships with individual amounts
#     for input_data in inputs:
#         input_address = input_data["address"]
#         input_amount = float(input_data["amount"])  # Convert amount to float
        
#         session.run(address_transformation
#             MATCH (a:Address {address: $input_address})
#             MATCH (t:Transaction {transaction_id: $transaction_id})
#             MERGE (a)-[:TRANSACTION_INPUT {amount: $input_amount}]->(t)
#             """,
#             input_address=input_address,
#             transaction_id=transaction_id,
#             input_amount=input_amount
#         )
    
#     # Create the output relationships with individual amounts
#     for output_data in outputs:
#         output_address = output_data["address"]
#         output_amount = float(output_data["amount"])  # Convert amount to float
        
#         session.run(
#             """
#             MATCH (a:Address {address: $output_address})
#             MATCH (t:Transaction {transaction_id: $transaction_id})
#             MERGE (t)-[:TRANSACTION_OUTPUT {amount: $output_amount}]->(a)
#             """,
#             output_address=output_address,
#             transaction_id=transaction_id,
#             output_amount=output_amount
#         )


In [25]:
## OLD

# def write_to_neo0 (session, addresses, transactions):

#     # Create unique constraint on Address nodes
#     session.run("CREATE CONSTRAINT IF NOT EXISTS ON (a:Address) ASSERT a.address IS UNIQUE")
#     session.run("CREATE CONSTRAINT IF NOT EXISTS ON (t:Transaction) ASSERT t.transaction_id IS UNIQUE")

#     filtered_addresses = addresses.dropna()
#     # Create the Address nodes
#     for address in filtered_addresses:
#         session.run(
#             """
#             MERGE (:Address {address: $address})
#             """,
#             address=address
#         )

#     # Create the transaction nodes and relationships
#     for transaction in transactions:
#         transaction_id = transaction["transaction_id"]
#         total_amount = transaction["total_amount"]
#         inputs = transaction["inputs"]
#         outputs = transaction["outputs"]
        
#         # Check if the Transaction node already exists
#         result = session.run(
#             """
#             MATCH (t:Transaction {transaction_id: $transaction_id})
#             RETURN count(t) AS count
#             """,
#             transaction_id=transaction_id
#         )
        
#         count = result.single()["count"]
        
#         if count == 0:
#             # Create the Transaction node
#             session.run(
#                 """
#                 CREATE (t:Transaction {transaction_id: $transaction_id, total_amount: $total_amount})
#                 """,
#                 transaction_id=transaction_id,
#                 total_amount=total_amount
#             )
        
#         # Create the input relationships with individual amounts
#         for input_data in inputs:
#             input_address = input_data["address"]
            
#             input_amount = float(input_data["amount"])  

#             session.run(
#                 """
#                 MATCH (a:Address {address: $input_address})
#                 MATCH (t:Transaction {transaction_id: $transaction_id})
#                 MERGE (a)-[:TRANSACTION_INPUT {amount: $input_amount}]->(t)
#                 """,
#                 input_address=input_address,
#                 transaction_id=transaction_id,
#                 input_amount=input_amount
#             )
        
#         # Create the output relationships 
#         for output_data in outputs:
#             output_address = output_data["address"]

#             output_amount = float(output_data["amount"])  
#             output_time = 
            
#             session.run(
#                 """
#                 MATCH (a:Address {address: $output_address})
#                 MATCH (t:Transaction {transaction_id: $transaction_id})
#                 MERGE (t)-[:TRANSACTION_OUTPUT {amount: $output_amount}]->(a)
#                 """,
#                 output_address=output_address,
#                 transaction_id=transaction_id,
#                 output_amount=output_amount
#             )

#     return True

In [37]:
from datetime import datetime

def write_to_neo(session, addresses, transactions):
    """Write addresses, transactions and their relationships to Neo4j.

    Args:
        session: open Neo4j session; every statement is sent with ``session.run``.
        addresses: pandas Series of address strings; null entries are dropped.
        transactions: list of dictionaries with the keys ``transaction_id``,
            ``total_amount``, ``inputs`` and ``outputs``; each input/output
            entry has ``address``, ``amount`` and ``time``.

    Returns:
        Always ``True``. Failures are not reported through the return value;
        they surface as exceptions raised by the session.
    """
    # Create unique constraints on Address and Transaction nodes.
    # NOTE(review): "ON ... ASSERT" is the legacy constraint syntax; newer Neo4j
    # releases expect "FOR ... REQUIRE" - confirm against the server version in use.
    session.run("CREATE CONSTRAINT IF NOT EXISTS ON (a:Address) ASSERT a.address IS UNIQUE")
    session.run("CREATE CONSTRAINT IF NOT EXISTS ON (t:Transaction) ASSERT t.transaction_id IS UNIQUE")

    # Create the Address nodes in batches instead of one round trip per address.
    batch_size = 1000
    address_list = addresses.dropna().tolist()
    for offset in range(0, len(address_list), batch_size):
        session.run(
            """
            UNWIND $addresses AS address
            MERGE (:Address {address: address})
            """,
            addresses=address_list[offset:offset + batch_size]
        )

    # Create the transaction nodes and relationships
    for transaction in transactions:
        transaction_id = transaction["transaction_id"]

        # MERGE creates the node only when it is missing; total_amount is set
        # on creation only, so an already existing node is left untouched.
        session.run(
            """
            MERGE (t:Transaction {transaction_id: $transaction_id})
            ON CREATE SET t.total_amount = $total_amount
            """,
            transaction_id=transaction_id,
            total_amount=transaction["total_amount"]
        )

        # Amounts are converted to float before being sent as query parameters.
        inputs = [
            {"address": entry["address"], "amount": float(entry["amount"]), "time": entry["time"]}
            for entry in transaction["inputs"]
        ]
        outputs = [
            {"address": entry["address"], "amount": float(entry["amount"]), "time": entry["time"]}
            for entry in transaction["outputs"]
        ]

        # Create the input relationships with individual amounts and time
        if inputs:
            session.run(
                """
                MATCH (t:Transaction {transaction_id: $transaction_id})
                UNWIND $legs AS leg
                MATCH (a:Address {address: leg.address})
                MERGE (a)-[:TRANSACTION_INPUT {amount: leg.amount, time: leg.time}]->(t)
                """,
                transaction_id=transaction_id,
                legs=inputs
            )

        # Create the output relationships with individual amounts and time
        if outputs:
            session.run(
                """
                MATCH (t:Transaction {transaction_id: $transaction_id})
                UNWIND $legs AS leg
                MATCH (a:Address {address: leg.address})
                MERGE (t)-[:TRANSACTION_OUTPUT {amount: leg.amount, time: leg.time}]->(a)
                """,
                transaction_id=transaction_id,
                legs=outputs
            )

    return True


### Complete flow writing Neo4j

In [69]:
# blockStart = 104001
# blockEnd = 105000
#df = read_chunk(blockStart, blockEnd)

##Processed block 112000 to 113000 in 25.98978304862976 seconds
##Processed block 113000 to 114000 in 43.11733293533325 seconds
##

# Block range to import. read_chunks rebuilds the parquet file names from these
# three values, so they must line up with the files written during parsing.
blockStart = 113000
blockEnd = 116000
chunk_size = 1000

# Load all chunk files of the range into a single DataFrame
df = read_chunks(blockStart, blockEnd, chunk_size)

# Derive the two structures that write_to_neo consumes
addresses = address_transformation(df)
transactions = tx_ftransformation(df)

In [33]:
transactions

[{'transaction_id': '69bfa9995a2e587e6b16e3d0292b79f5ab0cfe50ceb1bfbd80a0d30888c733e7',
  'total_amount': 6.0,
  'inputs': [{'address': '15mbsahyyG8UUPeX8BgAzntXgbbpDSCvKd',
    'amount': Decimal('6.00000000'),
    'time': 1299335141}],
  'outputs': [{'address': '1CLb2Xhd6Bmw9iX44iSdVzSoGxyyDBB7Bq',
    'amount': Decimal('4.00000000'),
    'time': 1299335141},
   {'address': '197knBpk846nki3R51rUCUAeLobKngKsVe',
    'amount': Decimal('2.00000000'),
    'time': 1299335141}]},
 {'transaction_id': '23c6e93f53b42de620ea663b151c9651802cc9dbf30f575f5dd4891eec80386c',
  'total_amount': 79.29,
  'inputs': [{'address': '1CAmvuizVuANavWSvAYqxypfTKFxz2cSPg',
    'amount': Decimal('79.29000000'),
    'time': 1299335141}],
  'outputs': [{'address': '16KADGbf7Lna9T3ohk8rGw5V418Xqv3krh',
    'amount': Decimal('79.23000000'),
    'time': 1299335141},
   {'address': '1C7QAuYEpXLWbVbtgD9EDY2khtNqdHe7V4',
    'amount': Decimal('0.05000000'),
    'time': 1299335141}]},
 {'transaction_id': '846067070c45b8d

In [None]:
df

Unnamed: 0,tx,address_in,input_pos,value_in,time,address_out,output_pos,value_out
0,69bfa9995a2e587e6b16e3d0292b79f5ab0cfe50ceb1bf...,15mbsahyyG8UUPeX8BgAzntXgbbpDSCvKd,0.0,6.00000000,1299335141,,,
1,23c6e93f53b42de620ea663b151c9651802cc9dbf30f57...,1CAmvuizVuANavWSvAYqxypfTKFxz2cSPg,0.0,79.29000000,1299335141,,,
2,846067070c45b8de8e30b676dd8491accb6a4987e629d8...,16KADGbf7Lna9T3ohk8rGw5V418Xqv3krh,0.0,79.23000000,1299335141,,,
3,19f4273b6d5bed678a6a8a225ec8b16838428e7a785a20...,19zzW2b2odPWPmX8m4Y3zgakUWHEEUf5Kk,0.0,79.17000000,1299335141,,,
4,a82882d74e52a7f2c99fde01e3c667854f9c8fb6b3632c...,19HY4dPZVLepSSviawjRbBwYiP329HnvpX,0.0,79.11000000,1299335141,,,
...,...,...,...,...,...,...,...,...
88488,8886e13c7c0f186ab5cf90bee0a585934e42fde805720d...,,,,1300441137,1MwKBcSCE1gQaCmK3wADPdQdMz37VB21cf,1.0,3.81000000
88489,6b7941d4ed25205ca1d7d5816bcf0125f1cd1860d5c363...,,,,1300441137,1NhDsbzw61UETUArd2oGDZS6nsFuBopdb8,0.0,0.18000000
88490,6b7941d4ed25205ca1d7d5816bcf0125f1cd1860d5c363...,,,,1300441137,19FSMkQBcqaLFJ1aX8YgyZxA91FhBCNkSB,1.0,2.25000000
88491,2820aee05bd4cf8cb9120386c54dc21c2ef895d3e881cf...,,,,1300441137,1CK4JK9uVw5oum2P2CqXBiKjDRZiTChQ5t,0.0,2.27000000


In [39]:
# write_to_neo signals failures by raising, not through its return value, so
# the error has to be reported here; it is re-raised to keep the traceback.
try:
    result = write_to_neo(session, addresses, transactions)
except Exception as err:
    print(f"An error occurred while writing to the neo4j db: {err}")
    raise

if result:
    print ("Blocks written to neo4j")
else:
    print ("An error occurred while writing to the neo4j db")

Blocks written to neo4j


##### Useful NEO4J Snippets

MATCH (n)
DETACH DELETE n



RETURN datetime({epochSeconds: 1299336076000 / 1000}) AS theDate



MATCH (s:Address {address: '1918yyUX3HEftoWZHq9qgKxWPQmCecCtJY'}), (t:Address {address: '1Jfz35JognNZRLAKbp8BnCuUWMmExQotf7'})
MATCH path = shortestPath((s)-[:TRANSACTION_OUTPUT|TRANSACTION_INPUT*1..10]-(t))
RETURN path



13MfK8Wsxz1eoZLBrYUShxZ29NtT3XNWSA
1EpEE1rp1nWqo6YP53w7rFgPbCZj1wCXL8

address: 1918yyUX3HEftoWZHq9qgKxWPQmCecCtJY
address: 1Jfz35JognNZRLAKbp8BnCuUWMmExQotf7