In [34]:
import networkx as nx
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Set
from datetime import datetime
from collections import defaultdict

class TransactionPatternAnalyzer:
    """Derive behavioural patterns for addresses from their transactions.

    The analyzer relies on a ``data_loader`` object that provides
    ``get_all_addresses()``, ``is_contract(address)``,
    ``get_address_info(address)``, ``get_node_transactions(address)``,
    ``get_contract_addresses()`` and a ``nodes_data`` DataFrame whose rows
    carry a ``txs`` collection of transaction dicts.

    Transaction dicts are read through the keys ``from``, ``to``,
    ``value$``, ``value``, ``contract`` and ``timestamp``.

    Results are memoised per address: wallets in ``pattern_cache`` and
    contracts in ``contract_interaction_patterns``.
    """

    # Timestamp layouts accepted by _parse_timestamp, tried in order.
    _TIMESTAMP_FORMATS = ('%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ')

    def __init__(self, data_loader):
        """Store the loader and create an empty graph and empty caches."""
        self.data_loader = data_loader
        self.G = nx.DiGraph()
        self.pattern_cache = {}
        self.contract_interaction_patterns = {}

    @classmethod
    def _parse_timestamp(cls, raw: str) -> datetime:
        """Parse a ``...Z`` timestamp string with or without fractional seconds.

        Raises:
            ValueError: if ``raw`` matches none of the supported layouts.
        """
        for fmt in cls._TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(raw, fmt)
            except ValueError:
                continue
        raise ValueError(f"Unsupported timestamp format: {raw!r}")

    def build_interaction_graph(self):
        """Populate ``self.G`` with typed address nodes and transaction edges.

        Known addresses become ``'contract'`` or ``'wallet'`` nodes carrying
        the loader's label and KYC flag. Addresses that only appear inside a
        transaction are added as ``'unknown'`` nodes.
        """
        # First add every known address together with its type.
        for address in self.data_loader.get_all_addresses():
            is_contract = self.data_loader.is_contract(address)
            info = self.data_loader.get_address_info(address)
            self.G.add_node(
                address,
                type='contract' if is_contract else 'wallet',
                label=info.get('label', 'Unknown'),
                kyc=info.get('hasKyc', False)
            )

        # Then add one edge per transaction.
        # NOTE: nx.DiGraph keeps a single edge per (from, to) pair, so a
        # later transaction between the same pair overwrites the attributes
        # (weight, token, timestamp) written by an earlier one.
        for _, node in self.data_loader.nodes_data.iterrows():
            for tx in node['txs']:
                sender = tx['from']
                receiver = tx['to']

                # Make sure both endpoints exist before linking them.
                for endpoint in (sender, receiver):
                    if endpoint not in self.G:
                        self.G.add_node(
                            endpoint,
                            type='unknown',
                            label='Unknown',
                            kyc=False
                        )

                self.G.add_edge(
                    sender,
                    receiver,
                    weight=float(tx['value$']),
                    token=tx['contract'],
                    timestamp=tx['timestamp']
                )

    def analyze_wallet_patterns(self, address: str) -> Dict:
        """Return the pattern dict for ``address``.

        Dispatches to the contract analysis when the loader reports the
        address as a contract, otherwise to the regular wallet analysis.
        Either analysis returns ``{}`` when the address has no transactions.
        """
        if self.data_loader.is_contract(address):
            return self._analyze_contract_patterns(address)

        return self._analyze_regular_wallet_patterns(address)

    def _analyze_contract_patterns(self, address: str) -> Dict:
        """Return (and cache) interaction, value and temporal patterns of a contract.

        Returns ``{}`` without caching when there are no transactions.
        """
        if address in self.contract_interaction_patterns:
            return self.contract_interaction_patterns[address]

        transactions = self.data_loader.get_node_transactions(address)
        if not transactions:
            return {}

        # Group transactions by counterparty. Anything not addressed to the
        # contract itself is treated as outgoing.
        incoming = defaultdict(list)
        outgoing = defaultdict(list)

        for tx in transactions:
            if tx['to'] == address:
                incoming[tx['from']].append(tx)
            else:
                outgoing[tx['to']].append(tx)

        patterns = {
            'interaction_summary': {
                'unique_senders': len(incoming),
                'unique_receivers': len(outgoing),
                'total_incoming': sum(len(txs) for txs in incoming.values()),
                'total_outgoing': sum(len(txs) for txs in outgoing.values())
            },
            'value_patterns': self._analyze_contract_value_patterns(transactions),
            'temporal_patterns': self._analyze_temporal_patterns(transactions),
            'type': 'contract'
        }

        self.contract_interaction_patterns[address] = patterns
        return patterns

    def _analyze_regular_wallet_patterns(self, address: str) -> Dict:
        """Return (and cache) all pattern groups for a non-contract address.

        Returns ``{}`` without caching when there are no transactions.
        """
        if address in self.pattern_cache:
            return self.pattern_cache[address]

        transactions = self.data_loader.get_node_transactions(address)
        if not transactions:
            return {}

        patterns = {
            'temporal': self._analyze_temporal_patterns(transactions),
            'value': self._analyze_value_patterns(transactions),
            'interaction': self._analyze_interaction_patterns(address),
            'contract_usage': self._analyze_contract_usage_patterns(address),
            'sequence': self._analyze_sequence_patterns(transactions),
            'type': 'wallet'
        }

        self.pattern_cache[address] = patterns
        return patterns

    def _analyze_contract_value_patterns(self, transactions: List[Dict]) -> Dict:
        """Summarise the ``value$`` field of a contract's transactions.

        Callers are expected to pass a non-empty list; the mean, median and
        standard deviation of an empty list are NaN.
        """
        values = [float(tx['value$']) for tx in transactions]

        return {
            'mean_value': np.mean(values),
            'median_value': np.median(values),
            'std_value': np.std(values),
            'value_distribution': self._get_value_distribution(values),
            'typical_values': self._find_typical_values(values)
        }

    def _analyze_temporal_patterns(self, transactions: List[Dict]) -> Dict:
        """Describe when transactions happen and how regularly.

        Intervals between consecutive transactions are measured in hours on
        the chronologically sorted timestamps, so the result does not depend
        on the order in which the loader returns the transactions.
        """
        parsed = [self._parse_timestamp(tx['timestamp']) for tx in transactions]

        # Gaps are only meaningful between chronologically adjacent events.
        ordered = sorted(parsed)
        time_diffs = [
            (later - earlier).total_seconds() / 3600
            for earlier, later in zip(ordered, ordered[1:])
        ]

        # Distributions are filled in the original transaction order so the
        # resulting dicts keep the same key insertion order as before.
        hour_dist = defaultdict(int)
        weekday_dist = defaultdict(int)
        for ts in parsed:
            hour_dist[ts.hour] += 1
        for ts in parsed:
            weekday_dist[ts.weekday()] += 1

        return {
            'avg_time_between_txs': np.mean(time_diffs) if time_diffs else 0,
            'std_time_between_txs': np.std(time_diffs) if time_diffs else 0,
            'hour_distribution': dict(hour_dist),
            'weekday_distribution': dict(weekday_dist),
            'activity_regularity': self._calculate_activity_regularity(time_diffs)
        }

    def _analyze_interaction_patterns(self, address: str) -> Dict:
        """Classify the graph neighbours of ``address`` by node type.

        Builds the interaction graph on first use. An address that is not a
        node of the graph yields empty neighbour lists instead of raising.
        """
        if len(self.G) == 0:
            self.build_interaction_graph()

        if address in self.G:
            predecessors = list(self.G.predecessors(address))
            successors = list(self.G.successors(address))
        else:
            predecessors, successors = [], []

        def of_type(nodes, node_type):
            # Nodes lacking a 'type' attribute count as 'unknown'.
            return [
                n for n in nodes
                if self.G.nodes[n].get('type', 'unknown') == node_type
            ]

        interactions = {
            f'{node_type}_interactions': {
                'in': of_type(predecessors, node_type),
                'out': of_type(successors, node_type)
            }
            for node_type in ('wallet', 'contract', 'unknown')
        }

        return {
            'unique_contacts': len(set(predecessors + successors)),
            'interactions': interactions,
            'interaction_stats': {
                'total_incoming': len(predecessors),
                'total_outgoing': len(successors),
                'wallet_incoming': len(interactions['wallet_interactions']['in']),
                'wallet_outgoing': len(interactions['wallet_interactions']['out']),
                'contract_incoming': len(interactions['contract_interactions']['in']),
                'contract_outgoing': len(interactions['contract_interactions']['out']),
                'unknown_incoming': len(interactions['unknown_interactions']['in']),
                'unknown_outgoing': len(interactions['unknown_interactions']['out'])
            }
        }

    def _analyze_contract_usage_patterns(self, address: str) -> Dict:
        """Summarise the transactions of ``address`` that are sent to contracts."""
        transactions = self.data_loader.get_node_transactions(address)
        # Fetch the contract list once; a set keeps each membership test O(1).
        contract_addresses = set(self.data_loader.get_contract_addresses())
        contract_usage = defaultdict(list)

        for tx in transactions:
            if tx['to'] in contract_addresses:
                contract_usage[tx['to']].append({
                    'timestamp': tx['timestamp'],
                    'value': float(tx['value$'])
                })

        # Per-contract statistics. first/last compare the raw timestamp
        # strings, which sort chronologically for a uniform ISO-like layout.
        contract_patterns = {}
        for contract, txs in contract_usage.items():
            contract_patterns[contract] = {
                'interaction_count': len(txs),
                'total_value': sum(tx['value'] for tx in txs),
                'avg_value': np.mean([tx['value'] for tx in txs]),
                'first_interaction': min(tx['timestamp'] for tx in txs),
                'last_interaction': max(tx['timestamp'] for tx in txs)
            }

        return {
            'unique_contracts': len(contract_usage),
            'total_contract_interactions': sum(len(txs) for txs in contract_usage.values()),
            'contract_patterns': contract_patterns
        }

    def _analyze_sequence_patterns(self, transactions: List[Dict],
                                   max_gap_seconds: float = 3600.0) -> Dict:
        """Group transactions into bursts and summarise them.

        A burst is a run of at least two chronologically consecutive
        transactions in which each gap is at most ``max_gap_seconds``
        (one hour by default). Isolated transactions form no burst.
        """
        # Sort by parsed time; the key avoids comparing the dicts themselves.
        stamped = sorted(
            ((self._parse_timestamp(tx['timestamp']), tx) for tx in transactions),
            key=lambda pair: pair[0]
        )

        sequences = []
        current_seq = []

        for (t1, tx), (t2, _) in zip(stamped, stamped[1:]):
            if (t2 - t1).total_seconds() <= max_gap_seconds:
                current_seq.append(tx)
            else:
                if current_seq:
                    # tx is the last member of the burst that just ended.
                    current_seq.append(tx)
                    sequences.append(current_seq)
                current_seq = []

        if current_seq:
            # The loop only appends the first element of each close pair, so
            # the final transaction of a still-open burst must be added here.
            current_seq.append(stamped[-1][1])
            sequences.append(current_seq)

        return {
            'sequence_count': len(sequences),
            'avg_sequence_length': np.mean([len(seq) for seq in sequences]) if sequences else 0,
            'sequence_patterns': self._analyze_sequence_types(sequences)
        }

    def _get_value_distribution(self, values: List[float]) -> Dict:
        """Bucket ``values`` into low/medium/high by their 25th and 75th percentiles.

        Returns ``{}`` for empty input.
        """
        if not values:
            return {}

        percentiles = np.percentile(values, [25, 50, 75])
        return {
            'low': len([v for v in values if v <= percentiles[0]]),
            'medium': len([v for v in values if percentiles[0] < v <= percentiles[2]]),
            'high': len([v for v in values if v > percentiles[2]]),
            'percentiles': {
                '25th': percentiles[0],
                '50th': percentiles[1],
                '75th': percentiles[2]
            }
        }

    def _find_typical_values(self, values: List[float], threshold: float = 0.1) -> List[float]:
        """Return the values that make up at least ``threshold`` of all values.

        Returns ``[]`` for empty input.
        """
        if not values:
            return []

        value_counts = pd.Series(values).value_counts()
        return list(value_counts[value_counts >= len(values) * threshold].index)

    def _calculate_activity_regularity(self, time_diffs: List[float]) -> Dict:
        """Score how regular the gaps between transactions are.

        Uses the coefficient of variation (std / mean) of ``time_diffs``.
        A non-positive mean is treated as infinitely irregular, which gives
        a score of 0.
        """
        if not time_diffs:
            return {'is_regular': False, 'regularity_score': 0}

        std = np.std(time_diffs)
        mean = np.mean(time_diffs)
        cv = std / mean if mean > 0 else float('inf')

        return {
            'is_regular': cv < 0.5,  # coefficient of variation below 50%
            'regularity_score': 1 / (1 + cv),  # maps cv in [0, inf) to (0, 1]
            'std_hours': std,
            'mean_hours': mean
        }

    def _analyze_sequence_types(self, sequences: List[List[Dict]]) -> Dict:
        """Count bursts by their chain of step kinds.

        Each transaction becomes ``'contract_interaction'`` or
        ``'wallet_transfer'`` depending on whether its recipient is a
        contract; the kinds are joined with ``'->'`` to form the key.
        Sequences shorter than two transactions are ignored.
        """
        pattern_counts = defaultdict(int)

        for seq in sequences:
            if len(seq) < 2:
                continue

            pattern = [
                'contract_interaction' if self.data_loader.is_contract(tx['to'])
                else 'wallet_transfer'
                for tx in seq
            ]
            pattern_counts['->'.join(pattern)] += 1

        return dict(pattern_counts)

    def get_interaction_summary(self, address: str) -> Dict:
        """Return a compact summary of the interactions of ``address``.

        An address without transactions yields a zeroed summary rather than
        raising ``KeyError`` on the empty pattern dict.
        """
        if self.data_loader.is_contract(address):
            patterns = self._analyze_contract_patterns(address)
            if not patterns:
                return {
                    'type': 'contract',
                    'total_interactions': 0,
                    'unique_interactors': 0,
                    'value_patterns': {}
                }
            summary = patterns['interaction_summary']
            return {
                'type': 'contract',
                'total_interactions': summary['total_incoming'] + summary['total_outgoing'],
                'unique_interactors': summary['unique_senders'] + summary['unique_receivers'],
                'value_patterns': patterns['value_patterns']
            }

        patterns = self._analyze_regular_wallet_patterns(address)
        if not patterns:
            return {
                'type': 'wallet',
                'total_transactions': 0,
                'contract_interactions': 0,
                'unique_contacts': 0
            }
        return {
            'type': 'wallet',
            'total_transactions': len(self.data_loader.get_node_transactions(address)),
            'contract_interactions': patterns['contract_usage']['total_contract_interactions'],
            'unique_contacts': patterns['interaction']['unique_contacts']
        }

    def _analyze_value_patterns(self, transactions: List[Dict]) -> Dict:
        """Summarise the ``value`` field of a wallet's transactions.

        Returns ``{}`` for empty input.
        """
        # NOTE(review): this reads tx['value'] while every other method in
        # the class reads tx['value$']; confirm which field is intended.
        values = [float(tx['value']) for tx in transactions]
        if not values:
            return {}

        value_stats = {
            'mean': np.mean(values),
            'median': np.median(values),
            'std': np.std(values),
            'min': min(values),
            'max': max(values)
        }

        value_clusters = self._get_value_distribution(values)
        typical_values = self._find_typical_values(values)

        return {
            'stats': value_stats,
            'value_clusters': value_clusters,
            'typical_values': typical_values,
            # Coefficient of variation; 0 when the mean is not positive.
            'value_volatility': value_stats['std'] / value_stats['mean'] if value_stats['mean'] > 0 else 0
        }

In [43]:
%run data_loader.ipynb

import json
from datetime import datetime
from pprint import pprint

def analyze_address(analyzer, address: str):
    """Print a pattern report for a single address.

    Asks ``analyzer.analyze_wallet_patterns`` for the pattern dict and
    forwards it to the contract or wallet report printer according to its
    ``'type'`` entry.

    An empty pattern dict (which the analyzer returns for an address
    without transactions) produces a short notice instead of a report,
    because the report printers index keys that would be missing.
    """
    print(f"\nAnalyzing address: {address}")
    print("=" * 50)

    patterns = analyzer.analyze_wallet_patterns(address)

    address_type = patterns.get('type', 'unknown')
    print(f"Address Type: {address_type}")

    if not patterns:
        # Nothing to report; the printers below would raise KeyError.
        print("No transaction patterns available for this address")
        return

    if address_type == 'contract':
        analyze_contract_patterns(patterns)
    else:
        analyze_wallet_patterns(patterns)
        
def analyze_contract_patterns(patterns: dict):
    """Print the report for a contract's pattern dict.

    Reads the ``'interaction_summary'``, ``'value_patterns'`` and
    ``'temporal_patterns'`` entries of ``patterns`` and writes them to
    stdout section by section.
    """
    # Counterparty and transaction counts.
    counts = patterns['interaction_summary']
    print("\nInteraction Summary:")
    for label, key in (
        ("Unique Senders", 'unique_senders'),
        ("Unique Receivers", 'unique_receivers'),
        ("Total Incoming", 'total_incoming'),
        ("Total Outgoing", 'total_outgoing'),
    ):
        print(f"{label}: {counts[key]}")

    # Value statistics, two decimals each.
    value_info = patterns['value_patterns']
    print("\nValue Patterns:")
    for label, key in (
        ("Mean Value", 'mean_value'),
        ("Median Value", 'median_value'),
        ("Standard Deviation", 'std_value'),
    ):
        print(f"{label}: {value_info[key]:.2f}")

    # Percentile buckets, pretty-printed as a dict.
    print("\nValue Distribution:")
    pprint(value_info['value_distribution'])

    # Timing.
    timing = patterns['temporal_patterns']
    print("\nTemporal Patterns:")
    print(f"Average Time Between Transactions: {timing['avg_time_between_txs']:.2f} hours")

    # Transactions per hour of day, in ascending hour order.
    print("\nHourly Activity:")
    per_hour = timing['hour_distribution']
    for hour, tx_count in sorted(per_hour.items()):
        print(f"Hour {hour:02d}:00 - {tx_count} transactions")

def analyze_wallet_patterns(patterns: dict):
    """Print the report for a wallet's pattern dict.

    Reads the ``'temporal'``, ``'interaction'``, ``'contract_usage'`` and
    ``'sequence'`` entries of ``patterns`` and writes them to stdout
    section by section.
    """
    # Timing.
    timing = patterns['temporal']
    print("\nTemporal Patterns:")
    print(f"Average Time Between Transactions: {timing['avg_time_between_txs']:.2f} hours")
    print(f"Activity Regularity Score: {timing['activity_regularity']['regularity_score']:.2f}")

    # Graph neighbours.
    neighbours = patterns['interaction']
    print("\nInteraction Patterns:")
    print(f"Unique Contacts: {neighbours['unique_contacts']}")
    counts = neighbours['interaction_stats']
    print(f"Total Incoming: {counts['total_incoming']}")
    print(f"Total Outgoing: {counts['total_outgoing']}")
    contract_total = counts['contract_incoming'] + counts['contract_outgoing']
    print(f"Contract Interactions: {contract_total}")

    # Contract usage totals.
    usage = patterns['contract_usage']
    print("\nContract Usage:")
    print(f"Unique Contracts: {usage['unique_contracts']}")
    print(f"Total Contract Interactions: {usage['total_contract_interactions']}")

    # Up to five contracts, most frequently used first.
    per_contract = usage['contract_patterns']
    if per_contract:
        print("\nTop Contract Interactions:")
        ranked = sorted(
            per_contract.items(),
            key=lambda entry: entry[1]['interaction_count'],
            reverse=True
        )
        for contract, details in ranked[:5]:
            print(f"\nContract: {contract}")
            print(f"Interactions: {details['interaction_count']}")
            print(f"Total Value: {details['total_value']:.2f}")
            print(f"Average Value: {details['avg_value']:.2f}")

    # Transaction bursts.
    bursts = patterns['sequence']
    print("\nSequence Patterns:")
    print(f"Number of Sequences: {bursts['sequence_count']}")
    print(f"Average Sequence Length: {bursts['avg_sequence_length']:.2f}")

    burst_kinds = bursts['sequence_patterns']
    if burst_kinds:
        print("\nCommon Patterns:")
        for kind, occurrences in burst_kinds.items():
            print(f"{kind}: {occurrences} times")

def test():
    """Run the analyzer end to end on three sample addresses.

    Loads addresses, tokens and nodes from the ``data/`` directory, builds
    the interaction graph, then prints the detailed report and the compact
    summary for each sample address.
    """
    # Load the data set.
    loader = DataLoader()
    loader.load_addresses("data/addresses.json")
    loader.load_tokens("data/tokens.json")
    loader.load_nodes("data/nodes_new.csv")

    # Build the graph once up front so every report reuses it.
    analyzer = TransactionPatternAnalyzer(loader)
    analyzer.build_interaction_graph()

    # Sample address -> informal description of what it is.
    samples = {
        '0xeba88149813bec1cccccfdb0dacefaaa5de94cb1': 'binance',
        '0x39cf2e49ea4d620e77d67088a8d815348e0abdf6': 'normal',
        '0xa1b1bbb8070df2450810b8eb2425d543cfcef79b': 'fund',
    }

    for target in samples:
        analyze_address(analyzer, target)
        summary = analyzer.get_interaction_summary(target)
        print(f"\nSummary for {target}:")
        print(summary)

In [44]:
test()

Loaded 1733 addresses
Loaded 5825 tokens
Loaded 1025 nodes
Contracts: 51
Non-contracts: 974

Analyzing address: 0xeba88149813bec1cccccfdb0dacefaaa5de94cb1
0xeba88149813bec1cccccfdb0dacefaaa5de94cb1
False
Address Type: wallet

Temporal Patterns:
Average Time Between Transactions: 0.43 hours
Activity Regularity Score: 0.50

Interaction Patterns:
Unique Contacts: 4438
Total Incoming: 2639
Total Outgoing: 1800
Contract Interactions: 0

Contract Usage:
Unique Contracts: 0
Total Contract Interactions: 0

Sequence Patterns:
Number of Sequences: 453
Average Sequence Length: 10.92

Common Patterns:
wallet_transfer->wallet_transfer->wallet_transfer: 37 times
wallet_transfer->wallet_transfer->wallet_transfer->wallet_transfer->wallet_transfer: 43 times
wallet_transfer->wallet_transfer->wallet_transfer->wallet_transfer: 34 times
wallet_transfer->wallet_transfer->wallet_transfer->wallet_transfer->wallet_transfer->wallet_transfer->wallet_transfer->wallet_transfer->wallet_transfer: 20 times
wallet_tra