**TODO:** переделать на количество обращений в час.

См.: [Goodness of fit for presumably Poisson distributed data (Cross Validated)](https://stats.stackexchange.com/questions/555291/goodness-of-fit-for-presumably-poisson-distributed-data)

In [2]:
import json
import os
import re

import duckdb
import jmespath
import matplotlib.pyplot as plt
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from scipy.stats import kstest
from scipy.stats import poisson

In [3]:
def preprocess_step1():
    """Turn each contract ABI into a bag of identifier words and save it to ``abi.parquet``.

    Reads ``contracts_abi.parquet``, whose rows carry a ``key`` (used as the
    contract address) and a ``value`` (a JSON document). For every contract,
    the names of its ``function``/``event`` ABI entries and the names of their
    input parameters are split on camelCase boundaries. Underscores are removed
    and empty fragments are dropped. The resulting word list replaces the
    contract's ``abi`` field. Each JSON document, plus an ``address`` field,
    becomes one row of ``abi.parquet``.
    """
    # NOTE(review): assumes every JSON value has an ``abi`` list of dicts with
    # ``type``/``name`` keys -- confirm against the producer of contracts_abi.parquet.
    contracts = {}
    for row in pq.read_table('contracts_abi.parquet').to_pylist():
        # A later row with the same key overwrites an earlier one.
        contracts[row['key']] = json.loads(row['value'])

    records = []
    for address, contract in contracts.items():
        contract['address'] = address
        # dict.fromkeys de-duplicates while keeping first-seen order, so the word
        # list is reproducible (iteration order of a set of str depends on PYTHONHASHSEED).
        abi_names = list(dict.fromkeys(
            entry['name'] for entry in contract['abi'] if entry['type'] in ['function', 'event']
        ))
        words = []
        for name in abi_names:
            words += re.split(r'(?<![A-Z])(?=[A-Z])', name)
            # Input names of the first matching ABI entry that has inputs
            # (later overloads with the same name are ignored).
            matches = jmespath.search(f"abi[?name=='{name}'].inputs[*].name", contract)
            # No matching entry with inputs -> empty result; the old `[0]` raised IndexError here.
            inputs = matches[0] if matches else []
            for input_name in inputs:
                words += re.split(r'(?<![A-Z])(?=[A-Z])', input_name)
        # Filter empties *before* stripping underscores, as before ("_" survives as "").
        contract['abi'] = [word.replace('_', '') for word in words if word]
        records.append(contract)

    abi_df = pd.DataFrame(data=records)
    pq.write_table(pa.Table.from_pandas(abi_df), 'abi.parquet')

In [4]:
def preprocess_step2():
    """Load the ABI table and raw event files into DuckDB and export TetherToken datasets.

    Builds these in-memory tables on a fresh DuckDB connection:

    * ``t_abi`` from ``abi.parquet`` (written by ``preprocess_step1``).
    * ``t_events``: UNION ALL of every file in the working directory whose
      name contains ``events_``.
    * ``t_token_transfers``: ``Transfer`` events joined to their contract's
      address and name.
    * ``t_edges_weighted``: TetherToken transfers collapsed to unordered
      address pairs ``(u, v)`` with ``u <= v``. ``norm_w`` is the pair's total
      transferred value, min-max normalised over all pairs and rounded to 3
      decimals. Exported to ``edges_weighted.parquet``.
    * ``t_tether_events``: TetherToken ``Transfer`` events (``fr``, ``hash``,
      ``to``, ``v``, ``timestamp``), newest first. Exported to
      ``tether_events.parquet``.

    Raises:
        FileNotFoundError: if no file with ``events_`` in its name is found.
    """
    directory = '.'
    # os.listdir order is arbitrary; sort so t_events is assembled reproducibly.
    event_files = sorted(
        filename for filename in os.listdir(directory)
        if 'events_' in filename and os.path.isfile(os.path.join(directory, filename))
    )
    if not event_files:
        # Without this guard the query degenerates to "create ... as ()" and
        # DuckDB fails with an opaque parser error.
        raise FileNotFoundError(
            f"no files with 'events_' in their name found in {os.path.abspath(directory)!r}"
        )
    # Double embedded single quotes so a file name cannot break the SQL string literal.
    q = ' union all '.join(
        "select * from '{}'".format(os.path.join(directory, filename).replace("'", "''"))
        for filename in event_files
    )

    token_transfers_sql = """\
    create or replace table t_token_transfers as (
    select a.address, a.name, e.event, e.args from t_events e join t_abi a on e.address=a.address where e.event='Transfer')
    """

    # Inner query sums value per directed (from, to) pair; the middle one merges
    # both directions into one undirected (LEAST, GREATEST) pair; the outer one
    # min-max normalises the weights over all pairs.
    edges_weighted_sql = """\
    create or replace table t_edges_weighted as (
    select xxx.u, xxx.v, round((xxx.w - min(xxx.w) over ()) / ((max(xxx.w) over ()) - (min(xxx.w) over ())), 3) as norm_w from (
    select xx.u, xx.v, sum(xx.w) w from (
    select LEAST(x.f, x.t) u, GREATEST(x.f, x.t) v, sum(x.v) w from (
    select tt.args->>'from' as f, tt.args->>'to' as t, cast(tt.args->>'value' as INT64) as v from t_token_transfers tt where tt.name='TetherToken'
    ) x group by x.f, x.t
    ) xx group by xx.u, xx.v
    ) xxx
    )
    """

    tether_events_sql = """\
    create or replace table t_tether_events as (
    select e.args->>'from' as fr, e.hash, e.args->>'to' as to, cast(e.args->>'value' as INT64) as v, e.timestamp
    from t_events e join t_abi a on e.address=a.address where e.event='Transfer' and a.name='TetherToken'
    order by e.timestamp desc
    )
    """

    cursor = duckdb.connect()
    try:
        cursor.sql("create or replace table t_abi as select * from 'abi.parquet'")
        cursor.sql(f'create or replace table t_events as ({q})')
        cursor.sql(token_transfers_sql)
        cursor.sql(edges_weighted_sql)
        cursor.sql("COPY (select * from t_edges_weighted) TO 'edges_weighted.parquet' (FORMAT PARQUET)")
        cursor.sql(tether_events_sql)
        cursor.sql("COPY (select * from t_tether_events) TO 'tether_events.parquet' (FORMAT PARQUET)")
    finally:
        # Release the connection even if one of the statements fails.
        cursor.close()

In [99]:
def preprocess_step3():
    """Compute per-address hourly TetherToken transfer-count statistics.

    Splits the time span of ``tether_events.parquet`` into consecutive windows
    ``[ts, ts + 3599]``. The first window starts at the earliest ``timestamp``,
    and ``timestamp`` is passed to ``to_timestamp``, i.e. treated as Unix
    seconds. Window starts stop at ``max(timestamp) - 3600``, so no window
    extends past the latest event and a trailing partial hour is not counted.

    Only addresses that both sent more than one transfer and received more
    than one transfer are considered. For each such address and window, the
    number of outgoing (resp. incoming) transfers is counted. The per-address
    list of hourly counts (``list_cnt``), their mean (``avg_cnt``) and sample
    standard deviation (``stdev_cnt``) are written to
    ``adr_output_freq.parquet`` (outgoing) and ``adr_input_freq.parquet``
    (incoming).

    Raises:
        ValueError: if ``tether_events.parquet`` has no non-NULL timestamps.
    """
    span_sql = "select min(timestamp) t_min, max(timestamp) t_max, date_diff('hour', to_timestamp(min(timestamp)), to_timestamp(max(timestamp))) diff from 'tether_events.parquet'"

    # {0} = first window start, {1} = last window start; `diff` is a per-window key.
    windows_sql = """\
    create or replace table time_series as (
    select tt.ts, 
    tt.ts + 60*60 - 1 as te,
    date_diff('hour', to_timestamp({0}), to_timestamp(tt.ts + 60*60 - 1)) diff
    from (
    SELECT epoch(t.generate_series)::INTEGER as ts FROM generate_series(to_timestamp({0}), to_timestamp({1}), INTERVAL '1' HOUR) t
    ) tt)
    """

    # {0} = counterpart column, {1} = column matched against the address,
    # {2} = output-file tag ('output' for sent, 'input' for received).
    freq_sql = """\
    COPY (
    with tmp1 as (select t.fr as adr from 'tether_events.parquet' t group by t.fr having count(t.to) > 1),
    tmp2 as (select t.to as adr from 'tether_events.parquet' t group by t.to having count(t.fr) > 1),
    tmp3 as (select a.adr from tmp1 a join tmp2 b on a.adr=b.adr),
    base as (select * from tmp3 cross join time_series),
    join_base as (select b.*, e.{0} as adj from base b left join 'tether_events.parquet' e on b.adr=e.{1} and e.timestamp between ts and te),
    gr_base as (select adr, diff, sum(CASE WHEN adj is null THEN 0 ELSE 1 END)::integer cnt from join_base group by adr, diff)
    select adr, list(cnt) list_cnt, avg(cnt) avg_cnt, stddev(cnt) stdev_cnt from gr_base group by adr) TO 'adr_{2}_freq.parquet' (FORMAT PARQUET)
    """

    cursor = duckdb.connect()
    try:
        t_min, t_max, _hours = cursor.sql(span_sql).fetchall()[0]
        if t_min is None:
            # Otherwise the literal text "None" would be formatted into the SQL below.
            raise ValueError("'tether_events.parquet' has no non-NULL timestamps; nothing to bucket into hours")
        cursor.sql(windows_sql.format(t_min, t_max - 60*60))
        cursor.sql(freq_sql.format('to', 'fr', 'output'))
        cursor.sql(freq_sql.format('fr', 'to', 'input'))
    finally:
        # Release the connection even if one of the statements fails.
        cursor.close()

In [100]:
def preprocess_step4():
    """Assemble the per-address TetherToken feature table ``tether_dataset.parquet``.

    Reads ``tether_events.parquet``, ``abi.parquet``,
    ``adr_output_freq.parquet`` and ``adr_input_freq.parquet``. Each distinct
    sender or recipient address gets one row with:

    * ``name``: contract name from ``abi.parquet`` (NULL when there is no match).
    * ``cnt_uniq_out`` / ``cnt_uniq_in``: number of distinct counterparties
      sent to / received from.
    * ``sum_out`` / ``sum_in``: total value sent / received.
    * ``lifetime``: latest minus earliest transfer timestamp over both
      directions. NOTE(review): for addresses that only send or only receive,
      this depends on DuckDB's NULL handling in GREATEST/LEAST -- confirm.
    * ``avg_cnt_out`` / ``avg_cnt_in`` and ``stdev_cnt_out`` / ``stdev_cnt_in``:
      hourly-count statistics from the frequency files, rounded to 2 decimals.
      NULL for addresses absent from those files.
    * ``adr_type``: ``'wallet'`` when ``name`` is NULL, otherwise ``'contract'``.
    """
    sql = """\
    COPY (
    with base_adr as (select distinct adr from (select a.fr as adr from 'tether_events.parquet' a 
    union all select b.to as adr from 'tether_events.parquet' b)),
    base_type as (select b.adr, a.name from base_adr b left join 'abi.parquet' a on a.address=b.adr),
    base_type_counts_out as (select t.fr, 
                                    count(distinct t.to)::integer cnt_uniq_out, 
                                    sum(v) sum_out, 
                                    min(timestamp) min_ts_out,
                                    max(timestamp) max_ts_out from 'tether_events.parquet' t group by t.fr),
    base_type_counts_in as (select t.to, 
                                    count(distinct t.fr)::integer cnt_uniq_in, 
                                    sum(v) sum_in,
                                    min(timestamp) min_ts_in,
                                    max(timestamp) max_ts_in from 'tether_events.parquet' t group by t.to),
    base_total as (select x.*, y.cnt_uniq_out, y.sum_out, z.cnt_uniq_in, z.sum_in, (greatest(y.max_ts_out, z.max_ts_in) - least(y.min_ts_out, z.min_ts_in)) lifetime from 
        base_type x left join base_type_counts_out y on x.adr=y.fr
        left join base_type_counts_in z on x.adr=z.to)
    select x.*, 
    round(y.avg_cnt, 2) avg_cnt_out, round(z.avg_cnt, 2) avg_cnt_in,
    round(y.stdev_cnt, 2) stdev_cnt_out, round(z.stdev_cnt, 2) stdev_cnt_in, 
    (CASE WHEN x.name is null THEN 'wallet' ELSE 'contract' END) adr_type from base_total x 
    left join 'adr_output_freq.parquet' y on x.adr=y.adr
    left join 'adr_input_freq.parquet' z on x.adr=z.adr) TO 'tether_dataset.parquet' (FORMAT PARQUET)
    """
    cursor = duckdb.connect()
    try:
        cursor.sql(sql)
    finally:
        # Release the connection even when an input file is missing or the query fails.
        cursor.close()

In [102]:
# Builds tether_dataset.parquet. Only step 4 runs here: abi.parquet (step 1),
# tether_events.parquet (step 2) and adr_output_freq.parquet /
# adr_input_freq.parquet (step 3) must already exist in the working directory.
preprocess_step4()