# import packages

In [2]:
from neo4j import GraphDatabase
import multiprocessing as mp
import pandas as pd
import random
import string
import os
import pickle
import time
from py2neo import Graph 

# EXECUTE: WalletClustering_neo4jConnect notebook

In [None]:
#%run ./WalletClustering_neo4jConnect.ipynb # includes Neo4J connector
# methods & variables of notebook can be referenced

#session = Graph("neo4j://127.0.0.1:7687", auth=("team", "F0110wTh€M0n€y"))

In [3]:
class Neo4jConnection:
    """Thin wrapper around a neo4j driver that runs Cypher and returns records.

    Errors are printed rather than raised. A failed ``query`` returns ``None``,
    and callers in this notebook rely on that: iterating the ``None`` result
    raises ``TypeError``, which ``clusterAddresses`` catches. The object can
    also be used as a context manager, which closes the driver on exit.
    """

    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            # Best effort: leave __driver as None; query() reports it later.
            print("Failed to create the driver:", e)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def close(self):
        """Close the underlying driver, if one was created."""
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, db=None, parameters=None):
        """Run *query* and return all of its records as a list.

        Args:
            query: Cypher query text.
            db: name of the database to open the session on; the driver's
                default database is used when None.
            parameters: optional dict of Cypher parameters (``$name`` in the
                query). Prefer this over formatting values into *query*.

        Returns:
            list of records, or None if opening the session or running the
            query failed (the error is printed).

        Raises:
            RuntimeError: if the driver could not be created in __init__.
        """
        # An explicit raise instead of assert, so the check survives `python -O`.
        if self.__driver is None:
            raise RuntimeError("Driver not initialized!")
        try:
            session_ctx = self.__driver.session(database=db) if db is not None else self.__driver.session()
            # The with-block closes the session even when run() raises.
            with session_ctx as session:
                return list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
            return None

# Connection settings can be overridden via the NEO4J_URI, NEO4J_USER and
# NEO4J_PASSWORD environment variables so credentials need not live in the
# notebook. The fallbacks reproduce the previously hard-coded values.
# NOTE(review): the fallback password is still committed in plain text;
# consider removing it once NEO4J_PASSWORD is configured everywhere.
conn = Neo4jConnection(
    uri=os.environ.get("NEO4J_URI", "neo4j://127.0.0.1:7687"),
    user=os.environ.get("NEO4J_USER", "team"),
    pwd=os.environ.get("NEO4J_PASSWORD", "F0110wTh€M0n€y"),
)

# cluster wallets

In [4]:
# Cypher query for multi-input clustering of a single address: returns every
# Address that SENDS into a Transaction that the given address also SENDS into.
# Usage: mihTemplate % address  (the %s placeholder is the address string).
# NOTE(review): the address is interpolated into the query text rather than
# passed as a Cypher parameter.
mihTemplate = '''
MATCH (:Address{address:"%s"})-[:SENDS]->(t:Transaction),
(walletMember:Address)-[:SENDS]->(t:Transaction)
RETURN DISTINCT walletMember
'''
# %s above = the address to expand

In [5]:
# Reuse the cached terrorAddressList pickle if it exists; otherwise build it first.
# NOTE(review): createTerrorAddressList() is not defined in this section; it is
# presumably provided by another notebook/cell and must write the pickle path below.
# NOTE(review): the backslash path only resolves as a directory on Windows.
terror_pickle_path = 'output\\terrorAddressList.pickle'
if not os.path.exists(terror_pickle_path):
    createTerrorAddressList()

# `with` closes the file handle (the previous bare open() leaked it).
# pickle.load can execute arbitrary code: only load pickles this project wrote itself.
with open(terror_pickle_path, 'rb') as pickle_file:
    terrorAddressList = pickle.load(pickle_file)
print(terrorAddressList)

['12sDU3FyYJXc2oRzE6XXuuhVHCBJvaoCC8', '1348ThkNoDupq1bws95diMiL8haGs61K7M', '13iQsrwBYdrLpnitG5EV79o3PeHjH8XUBc', '13Pcmh4dKJE8Aqrhq4ZZwmM1sbKFcMQEEV', '15K9Zj1AU2hjT3ebZMtWqDsMv3fFxTNwpf', '15soXrE3NJBMkkQhrccXonTT9bpjpPvE67', '164fawNZVwsR5SamAJypvCMtkMx4Xv1B3f', '179bzhS4FY7qLDza9YjuorhWyXVVYZu2YH', '17QAWGVpFV4gZ25NQug46e5mBho4uDP6MD', '17UUXDzPGkMwWrabhtk7YCha88tSoua2Vr', '19D1iGzDr7FyAdiy3ZZdxMd6ttHj1kj6WW', '19XVEDZCGVMA9WCF1qUayxtnjUnyD7zDDQ', '1A7pDH1EdrkH9YZtsPnc8uzirBFnAN9Eay', '1BPf9qr7M5xUgNHUYtrQtEKvUKcyERzXao', '1C6hetVWVXZnS6P2BYBNu5Y1ZJ57JyXGac', '1DrhHEkv42JVwiDQNi28JFdSuiSGgPNXwP', '1EDcKCRypUTFoTZbxDWF9MBAT4W7XUGB32', '1EfmRn6Bp3cjrTBubaH8MzRRc2ikSjNGXw', '1EnX6BuJiGWydqXJT9BN5dSvfLg3QW4Mdz', '1EVTZmTMqZPMzGxsug9TXBtvPJZH8dXSCK', '1EYya5dfNvuYDwpeboGKBtkXzJcEHMCQXR', '1GALPyvUDDXqA6H2eHQ9Y1yidfQ6T1Drvn', '1GC2SjzCyCwxo1uxTi28oqn9L3mJj7bLPs', '1Gg25VzQkqCizXHNSNet4RoysLEe19su4s', '1JpSBaUwrZaEgmsYka7mzm9t3Z4syyaw7A', '1LhRW1msre1cFgT7fBY2BRrZ4ANMPwVj9u', '1Lm9BCDUKo

In [6]:
# Batched variant of mihTemplate for exactly ten addresses: returns every Address
# that SENDS into a Transaction together with any of the ten given addresses.
# Usage: mihWhere.format(a0, a1, ..., a9); used by iterMultiInputClustering_old.
mihWhere = """MATCH (a:Address)-[:SENDS]->(t:Transaction), (walletMember:Address)-[:SENDS]->(t:Transaction) 
where a.address in [\"{0}\",\"{1}\",\"{2}\",\"{3}\",\"{4}\",\"{5}\",\"{6}\",\"{7}\",\"{8}\",\"{9}\"]
RETURN DISTINCT walletMember"""

In [7]:
def updateWalletAddresses(address, walletName):
    """Return (but do not run) an APOC batch query that tags a wallet.

    The query matches every Address that sends into a Transaction that
    *address* also sends into, and sets its ``association`` property to
    *walletName*. APOC processes it in parallel batches of 1000.
    """
    # The second line of this literal keeps its two leading spaces: they are
    # part of the query text, not Python indentation.
    template = """CALL apoc.periodic.iterate( 'MATCH (a:Address {address: "%s"})-[:SENDS]->(t:Transaction), (walletMember:Address)-[:SENDS]->(t:Transaction) RETURN  walletMember',
  'set walletMember.association = "%s"', {batchSize:1000, parallel:true})"""
    filled = template % (address, walletName)
    return filled


In [8]:
# Cypher check: yields one `true` row when the Address node exists and already
# has a non-null `association` property, and no rows otherwise, so an empty
# result list is falsy. Usage: check_association % address.
check_association = """Match (a:Address {address: '%s'}) 
where a.association is not null
return true"""

In [22]:
# Legacy multi-input clustering (superseded by iterMultiInputClustering_chunks).
# Starting from one address, repeatedly ask Neo4j for every address that sends
# into a transaction together with an address already in the wallet. Addresses
# are the keys of an insertion-ordered dict, so duplicates vanish automatically.
# While more than 10 addresses are still unresolved, they are looked up ten at a
# time with mihWhere; the remaining ones are looked up individually with mihTemplate.
def iterMultiInputClustering_old(address):
    """Cluster the wallet containing *address* and tag it in the database.

    Returns:
        (dict, str): the wallet's addresses as dict keys (values are always 1)
        and the random 32-character id written to their ``association``.
    """
    batch = 10
    members = {address: 1}

    def absorb(records):
        # Record every returned walletMember; iterating a failed (None)
        # query result raises TypeError.
        for rec in records:
            members[rec[0]._properties["address"]] = 1

    absorb(conn.query(mihTemplate % address, db='neo4j'))

    # Cursor over the dict's keys; index 0 (the start address) is already resolved.
    cursor = 1
    while cursor < len(members):
        pending = list(members)
        if len(members) - cursor > batch:
            absorb(conn.query(mihWhere.format(*pending[cursor:cursor + batch]), db='neo4j'))
            cursor += batch
        else:
            absorb(conn.query(mihTemplate % pending[cursor], db='neo4j'))
            cursor += 1

    alphabet = string.ascii_letters + string.digits
    walletString = ''.join(random.choice(alphabet) for _ in range(32))

    print(f"Updating ... {address}, with size {len(members)}: {walletString}")
    list_of_Addresses = str(list(members))

    query = """CALL apoc.periodic.iterate( 'UNWIND $addresses as item return item',
                    'Match (a:Address {address: item}) set a.association = "%s" return a', 
                    {batchSize:1000, parallel:true, iterateList:true, params:{addresses:%s}})""" % (walletString, list_of_Addresses)
    result = conn.query(query, db='neo4j')
    print(result)
    return members, walletString


In [12]:
# additional remarks and examples
#print(len(wallets))
#output of 13iQsrwBYdrLpnitG5EV79o3PeHjH8XUBc:  
"""137373
Execution time Query: 3572.6374530792236 seconds
Execution time inWallet: 158.30259037017822 seconds
Execution time IF: 0.022434473037719727 seconds   | Going into statement:  2  times."""
#1EYya5dfNvuYDwpeboGKBtkXzJcEHMCQXR '1PeSDEMzi7nj1ah4YFcgnRmijWpgQqP3Yp' '1P963yWMBFkUouU2Me7cQ6136orZDD4gTf' '1KFiRjjvE4rtheuEYGo9VeDDBvGgmm7nRg' exchanges: Btc38.com-1CELa15H4DMzHtHnuz7LCpSFgFWf61Ra6A, 
# QuadrigaCX.com-1LQF9Suqgm4YtxY6kriiE8DJftNTPTqwAm, CoinHako.com- 3PpSAGEGfA9e995bpCkAFdKaw3fMmo8Eyw, MaiCoin.com - 1Lfktsua4x25UcsqDeuXUrXZq3jSoPpJ1b, Hashnest - 1D7JStLYKJ2ma6yfH7a7DXSom5ZPfyfNM3
#wallets
#no batching: 1EYya5dfNvuYDwpeboGKBtkXzJcEHMCQXR - 7sec, 13iQsrwBYdrLpnitG5EV79o3PeHjH8XUBc - cancelled after 4 hours, 
#batching with 6: 1EYya5dfNvuYDwpeboGKBtkXzJcEHMCQXR - 1.3sec, 13iQsrwBYdrLpnitG5EV79o3PeHjH8XUBc - cancelled after 6 hours

In [9]:
# Variable-length batched variant of mihTemplate: returns every Address that
# SENDS into a Transaction together with any address in the given list.
# Usage: mihWhereList % str(list_of_addresses). Python's repr of a list of plain
# strings (e.g. ['a', 'b']) is also a valid Cypher list literal.
# NOTE(review): this relies on addresses containing no quote characters.
mihWhereList = """MATCH (a:Address)-[:SENDS]->(t:Transaction), (walletMember:Address)-[:SENDS]->(t:Transaction) 
where a.address in %s
RETURN DISTINCT walletMember"""

In [10]:
# Multi-input clustering with variable-size batches.
# Starting from one address, repeatedly ask Neo4j for every address that sends
# into a transaction together with an address already in the wallet, until no
# new addresses appear. Each address is resolved exactly once, in batches of up
# to `chunk_size` addresses per query (mihWhereList accepts any list length).
def iterMultiInputClustering_chunks(address, chunk_size=500):
    """Cluster the wallet containing *address* and tag it in the database.

    Args:
        address: the address to start clustering from.
        chunk_size: maximum number of addresses resolved per Neo4j query
            (must be >= 1). Defaults to 500.

    Returns:
        (dict, str): the wallet's addresses as dict keys (values are always 1,
        in discovery order) and the random 32-character id written to each
        address's ``association`` property.

    Raises:
        TypeError: if a Neo4j query fails (``conn.query`` returns None).
            ``clusterAddresses`` catches this and records the address as failed.
        ValueError: if *chunk_size* is smaller than 1 (it would never advance).
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1, got %r" % (chunk_size,))

    walletAddresses = {address: 1}
    # The same addresses as walletAddresses, in insertion order, so a batch can
    # be sliced directly instead of copying all keys into a new list per query.
    ordered = [address]

    def absorb(response):
        # Add every returned walletMember that is not yet part of the wallet.
        if response is None:
            raise TypeError("Neo4j query failed while clustering " + str(address))
        for record in response:
            member = record[0]._properties["address"]
            if member not in walletAddresses:
                walletAddresses[member] = 1
                ordered.append(member)

    absorb(conn.query(mihTemplate % address, db='neo4j'))

    # Index 0 (the start address) was resolved above. The tail shorter than
    # chunk_size is batched as well, instead of being resolved one address per
    # query; the resulting wallet (transitive closure) is the same.
    i = 1
    while i < len(ordered):
        batch = ordered[i:i + chunk_size]
        absorb(conn.query(mihWhereList % str(batch), db='neo4j'))
        i += len(batch)

    source = string.ascii_letters + string.digits
    walletString = ''.join(random.choice(source) for _ in range(32))

    # TODO (translated from the original German notes):
    # - Before updating, check whether the wallet already contains associations
    #   (e.g. a new address that belongs to Binance).
    # - What happens to wallets that get merged at a later point?
    #   -> Keep the first association.
    # - Look up all associations of the wallet and compare them with the
    #   blacklist; on a match, use the blacklist's association.

    print("Updating ... " + str(address) + ", with size " + str(len(walletAddresses)) + ": " + walletString)
    list_of_Addresses = str(ordered)

    # NOTE: the whole address list is embedded in the query text as a literal;
    # for very large wallets this produces a very large query string.
    query = """CALL apoc.periodic.iterate( 'UNWIND $addresses as item return item',
                    'Match (a:Address {address: item}) set a.association = "%s" return a', 
                    {batchSize:1000, parallel:true, iterateList:true, params:{addresses:%s}})""" % (walletString, list_of_Addresses)
    result = conn.query(query, db='neo4j')
    print(result)
    if result is None:
        # The association was not written; surface it so the caller records
        # this address as failed instead of silently skipping it.
        raise TypeError("Writing association failed for " + str(address))
    return walletAddresses, walletString


In [11]:
def clusterAddresses(addressList):
    """Cluster every address in *addressList* that has no association yet.

    An address whose node already carries an ``association`` is skipped with a
    message. An address whose clustering raises ``TypeError`` (e.g. because a
    Neo4j query failed and returned None) is reported and collected.

    Returns:
        list: the addresses that could not be clustered.
    """
    failed = []
    for candidate in addressList:
        already_tagged = conn.query(check_association % candidate, db='neo4j')
        if already_tagged:
            print(candidate + ": There is already an association in the DB")
            continue
        try:
            iterMultiInputClustering_chunks(candidate)
        except TypeError:
            print(candidate + " could not be clustered due to internal failure!")
            failed.append(candidate)
    return failed
#terrorAddressList.remove("1QH9hfeSSb2iftcVpgpQp3NsFD174crFoW") -> 2.1 mio entries
#terrorAddressList.remove("3PajPWymUexhewHPczmLQ8CMYatKAGNj3y") -> 34 mio entries
#addresses = ["12sDU3FyYJXc2oRzE6XXuuhVHCBJvaoCC8"]
#failed_addresses = clusterAddresses(addresses)

#addresses = ["1JpSBaUwrZaEgmsYka7mzm9t3Z4syyaw7A"]
#codes = clusterAddresses(addresses)

#print(list(codes.keys()))

12sDU3FyYJXc2oRzE6XXuuhVHCBJvaoCC8: There is already an association in the DB
