In [1]:
import os
from datetime import datetime, date
from decimal import Decimal
from typing import List, Tuple

import pandas as pd
import matplotlib.pyplot as plt

from src.data_processing.fetch_data import fetch_oku_liquidity, fetch_pool_data
from src.models.pool import Pool
from src.models.liquidity_distribution import LiquidityDistribution
from src.utils.helper import organize_tick_data, sqrtPriceX96_to_price
from src.visualization.plot_liquidity import plot_liquidity_distribution
from config.api_config import API_KEY
from src.data_processing.swap_data_collector import SwapDataCollector

In [6]:
import pandas as pd
import numpy as np
from datetime import date, datetime
from decimal import Decimal
from typing import List, Tuple, Dict
import matplotlib.pyplot as plt
from src.models.pool import Pool
from src.models.liquidity_distribution import LiquidityDistribution
from src.data_processing.fetch_data import fetch_oku_liquidity
from src.utils.helper import organize_tick_data
import json
from pathlib import Path

class LiquidityAnalyzer:
    def __init__(self, pool_address: str, token0: str, token1: str, decimals0: int, decimals1: int, 
                 tick_spacing: int, start_date: date, end_date: date):
        self.pool_address = pool_address
        self.pool = Pool(token0, token1, decimals0, decimals1, tick_spacing)
        self.start_date = start_date
        self.end_date = end_date
        self.sampled_blocks = self.load_sampled_blocks()

    def load_sampled_blocks(self) -> pd.DataFrame:
        collector = SwapDataCollector(
            project_id="uniswap-v3-analytics",
            pool_address=self.pool_address,
            token0=self.pool.token0,
            token1=self.pool.token1,
            decimals0=self.pool.decimals0,
            decimals1=self.pool.decimals1,
            tick_spacing=self.pool.tick_spacing,
            start_date=self.start_date,
            end_date=self.end_date
        )
        blocks = collector.load_sampled_blocks_bars(method='dollar')
        
        df = pd.DataFrame(blocks)
        df['datetime'] = pd.to_datetime(df['block_timestamp'], unit='s')
        return df.sort_values('datetime')

    def select_blocks_by_date_range(self, start: date, end: date) -> List[int]:
        mask = (self.sampled_blocks['datetime'].dt.date >= start) & (self.sampled_blocks['datetime'].dt.date <= end)
        return self.sampled_blocks.loc[mask, 'block_number'].tolist()

    def analyze_liquidity(self, block_number: int) -> Tuple[List[dict], Decimal, Decimal]:
        data = fetch_oku_liquidity(pool_address=self.pool_address, block_number=block_number)
        tick_mapping = organize_tick_data(tick_data=data["ticks"])
        current_tick = int(data["current_pool_tick"])
        distribution = LiquidityDistribution(self.pool, tick_mapping, current_tick)
        result, total_amount0, total_amount1 = distribution.get_distribution()
        
        # Save the liquidity distribution
        self.save_liquidity_distribution(block_number, result)
        
        return result, total_amount0, total_amount1

    def save_liquidity_distribution(self, block_number: int, distribution: List[dict]):
        df = pd.DataFrame([d._asdict() for d in distribution])
        filename = self.liquidity_dir / f"liquidity_distribution_{block_number}.parquet"
        df.to_parquet(filename, compression='snappy')

    def load_liquidity_distribution(self, block_number: int) -> pd.DataFrame:
        filename = self.liquidity_dir / f"liquidity_distribution_{block_number}.parquet"
        return pd.read_parquet(filename)

    def calculate_liquidity_concentration(self, distribution: List[dict], num_buckets: int = 10) -> Dict[str, float]:
        total_liquidity = sum(d['liquidity'] for d in distribution)
        sorted_distribution = sorted(distribution, key=lambda x: x['liquidity'], reverse=True)
        
        concentrations = {}
        for i in range(1, num_buckets + 1):
            bucket_size = len(sorted_distribution) // num_buckets
            bucket_liquidity = sum(d['liquidity'] for d in sorted_distribution[:i*bucket_size])
            concentrations[f'top_{i*10}%'] = bucket_liquidity / total_liquidity * 100
        
        return concentrations

    def calculate_liquidity_depth(self, distribution: List[dict], price_ranges: List[Tuple[float, float]]) -> Dict[str, float]:
        depths = {}
        for start, end in price_ranges:
            depth = sum(d['liquidity'] for d in distribution if start <= d['price'] <= end)
            depths[f'{start}-{end}'] = depth
        return depths

    def analyze_liquidity_changes(self, start_date: date, end_date: date) -> pd.DataFrame:
        blocks = self.select_blocks_by_date_range(start_date, end_date)
        
        liquidity_data = []
        for block in blocks:
            distribution = self.load_liquidity_distribution(block)
            concentration = self.calculate_liquidity_concentration(distribution)
            depth = self.calculate_liquidity_depth(distribution, [(0.8, 1.2), (0.9, 1.1), (0.95, 1.05)])
            
            liquidity_data.append({
                'block_number': block,
                'datetime': self.sampled_blocks.loc[self.sampled_blocks['block_number'] == block, 'datetime'].iloc[0],
                **concentration,
                **depth
            })
        
        return pd.DataFrame(liquidity_data)

    def plot_liquidity_over_time(self, start: date, end: date, num_samples: int = 5):
        blocks = self.select_blocks_by_date_range(start, end)
        sampled_blocks = blocks[:num_samples]  # Take first num_samples blocks for simplicity
        
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 16))
        
        for block in sampled_blocks:
            result, _, _ = self.analyze_liquidity(block)
            timestamp = self.sampled_blocks.loc[self.sampled_blocks['block_number'] == block, 'datetime'].iloc[0]
            
            ticks = [info.tick for info in result]
            liquidities = [info.liquidity for info in result]
            
            ax1.plot(ticks, liquidities, label=f'Block {block} ({timestamp:%Y-%m-%d %H:%M})')
            ax2.plot([info.price for info in result], liquidities, label=f'Block {block} ({timestamp:%Y-%m-%d %H:%M})')
        
        ax1.set_xlabel('Tick')
        ax1.set_ylabel('Liquidity')
        ax1.set_title('Liquidity Distribution over Ticks')
        ax1.legend()
        
        ax2.set_xlabel('Price')
        ax2.set_ylabel('Liquidity')
        ax2.set_title(f'Liquidity Distribution over Price ({self.pool.token1}/{self.pool.token0})')
        ax2.set_xscale('log')
        ax2.legend()
        
        plt.tight_layout()
        plt.show()

In [7]:
POOL_ADDRESS = "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8"
START_DATE = date(2024, 8, 1)
END_DATE = date(2024, 8, 31)

analyzer = LiquidityAnalyzer(
    pool_address=POOL_ADDRESS,
    token0="WETH",
    token1="USDC",
    decimals0=18,
    decimals1=6,
    tick_spacing=60,
    start_date=START_DATE,
    end_date=END_DATE
)

In [8]:
analyzer.sampled_blocks

Unnamed: 0,block_number,block_timestamp,cumulative_dollar_volume,datetime
0,20430379,1722475295,1.024129e+06,2024-08-01 01:21:35
1,20430449,1722476135,2.046662e+06,2024-08-01 01:35:35
2,20430453,1722476183,3.550200e+06,2024-08-01 01:36:23
3,20430453,1722476183,5.999740e+06,2024-08-01 01:36:23
4,20430460,1722476267,7.017654e+06,2024-08-01 01:37:47
...,...,...,...,...
380,20638625,1724987507,4.132868e+08,2024-08-30 03:11:47
381,20641869,1725026651,4.142962e+08,2024-08-30 14:04:11
382,20642308,1725031943,4.152978e+08,2024-08-30 15:32:23
383,20643178,1725042407,4.163208e+08,2024-08-30 18:26:47


In [None]:


# Analyze liquidity for a specific date range
analysis_start = date(2024, 8, 15)
analysis_end = date(2024, 8, 20)
analyzer.plot_liquidity_over_time(analysis_start, analysis_end)
