In [271]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from google.cloud import bigquery
import time
# Export the path of the service-account key file via the GOOGLE_APPLICATION_CREDENTIALS variable.
%env GOOGLE_APPLICATION_CREDENTIALS=../secrets/bigquery-service-account.json 
# BigQuery client; used at the end of the notebook to delete tables.
client = bigquery.Client()
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression of a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

env: GOOGLE_APPLICATION_CREDENTIALS=../secrets/bigquery-service-account.json


In [272]:
# Load the extension that provides the %%bigquery cell magic used by the query cells.
%load_ext google.cloud.bigquery

The google.cloud.bigquery extension is already loaded. To reload it, use:
  %reload_ext google.cloud.bigquery


In [273]:
%%bigquery metadata --project masterarbeit-245718 --verbose 
-- Column metadata of the public crypto_ethereum "traces" table; the result is stored in `metadata`.
SELECT * FROM `bigquery-public-data`.crypto_ethereum.INFORMATION_SCHEMA.COLUMNS where table_name = "traces"

Executing query with job ID: 0b78bc30-b38d-4ee0-9277-217494f6d92b
Query executing: 0.60s
Query complete after 1.42s


In [274]:
# Number of synthetic accounts (5% exchanges, 95% speculators); the sample trace table
# gets 2 * size_sample_data rows.
size_sample_data = 200

# Seed NumPy's global RNG so that every randomly sampled column below is reproducible
# under Restart Kernel -> Run All.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

### Generate sample data for "call_type"

In [275]:
# %%bigquery call_types_sql_result --project masterarbeit-245718 --verbose 
# select DISTINCT call_type from `bigquery-public-data.crypto_ethereum.traces`
# where DATE(block_timestamp) >= '2019-07-6' AND DATE(block_timestamp) <= '2019-7-7'

In [276]:
# Possible values of the "call_type" column (None = no call type), each paired with the
# probability used when sampling it.
_call_type_weights = [
    ('call', 0.7),
    ('delegatecall', 0.05),
    ('staticcall', 0.025),
    ('callcode', 0.025),
    (None, 0.2),
]
call_types = [name for name, _ in _call_type_weights]
prob_call_types = [weight for _, weight in _call_type_weights]

In [277]:
# One call type per trace row (2 * size_sample_data rows), drawn with the configured probabilities.
call_type_sample = np.random.choice(
    a=call_types,
    size=2 * size_sample_data,
    p=prob_call_types,
)

### Generate sample data for "trace_type"

In [278]:
# Trace types for rows that have no call type, with their sampling probabilities.
additional_trace_types = ["create", "suicide", "reward", "genesis", "daofork"]
prob_additional_trace_types = [0.025, 0.025, 0.9, 0.025, 0.025]

# Rows with a call type are "call" traces; rows whose call type is None get one of the
# additional trace types drawn at random. `is not None` is the identity check for the
# None entries stored in the sampled array.
trace_type_sample = [
    "call" if ct is not None
    else np.random.choice(additional_trace_types, p=prob_additional_trace_types)
    for ct in call_type_sample
]

### Generate sample data for "status"

In [279]:
# %%bigquery status_values --project masterarbeit-245718 --verbose 
# select DISTINCT status from `bigquery-public-data.crypto_ethereum.traces`
# where DATE(block_timestamp) >= '2019-07-6' AND DATE(block_timestamp) <= '2019-7-7'

In [280]:
# Possible "status" values and the probability of drawing each of them (0 -> 5%, 1 -> 95%).
status_values, probs_status_values = [0, 1], [0.05, 0.95]

In [281]:
# NOTE(review): assigns the same value that probs_status_values already receives in the
# previous cell; this cell looks redundant.
probs_status_values = [0.05, 0.95]

In [282]:
# One status per trace row (2 * size_sample_data rows), drawn with the configured probabilities.
status_sample = np.random.choice(
    a=status_values,
    size=2 * size_sample_data,
    p=probs_status_values,
)

### Generate sample accounts 

In [283]:
# Exchange account names "exchange_1", "exchange_2", ... for 5% of the sample size.
exchanges = list(map("exchange_{}".format, range(1, int(0.05*size_sample_data + 1))))

In [284]:
# Speculator account names "speculator_1", "speculator_2", ... for 95% of the sample size.
speculators = list(map("speculator_{}".format, range(1, int(0.95*size_sample_data + 1))))

In [285]:
# All sample accounts in one list: speculators first, then exchanges.
addresses_testdata = [*speculators, *exchanges]

In [286]:
# Turn the list of account names into a one-column frame ("address").
address_columns = ["address"]
addresses_testdata = pd.DataFrame(data=addresses_testdata, columns=address_columns)

## Upload 'addresses' table to bigquery 

In [287]:
# Upload the address list, replacing the table if it already exists.
addresses_testdata.to_gbq(
    destination_table='ethereum_us.addresses_testdata',
    if_exists="replace",
)

1it [00:02,  2.77s/it]


### Generate sample transactions (speculators to exchanges)

In [288]:
# Speculator -> exchange transfers: a random sender, a random receiver and a value in 1..19 per row.
n_spec_to_ex = int(2*size_sample_data/2)
from_spec_addresses = [np.random.choice(speculators) for _ in range(n_spec_to_ex)]
to_ex_addresses = [np.random.choice(exchanges) for _ in range(n_spec_to_ex)]
values_spec_to_ex = np.random.randint(1, 20, n_spec_to_ex)

In [289]:
# One row per speculator -> exchange transfer.
spec_to_ex_rows = zip(from_spec_addresses, to_ex_addresses, values_spec_to_ex)
txdata1 = pd.DataFrame(
    spec_to_ex_rows,
    columns=["from_address", "to_address", "value"],
)

### Generate sample transactions (exchanges to speculators)

In [290]:
# Exchange -> speculator transfers: a random receiver, a random sender and a value in 1..4 per row.
n_ex_to_spec = int(2*size_sample_data/2)
to_spec_addresses = [np.random.choice(speculators) for _ in range(n_ex_to_spec)]
from_ex_addresses = [np.random.choice(exchanges) for _ in range(n_ex_to_spec)]
values_ex_to_spec = np.random.randint(1, 5, n_ex_to_spec)

In [291]:
# One row per exchange -> speculator transfer.
ex_to_spec_rows = zip(from_ex_addresses, to_spec_addresses, values_ex_to_spec)
txdata2 = pd.DataFrame(
    ex_to_spec_rows,
    columns=["from_address", "to_address", "value"],
)

Note: the speculators send more money to the exchanges than vice versa (transfer values are drawn from 1–19 towards the exchanges, but only from 1–4 in the other direction).

In [292]:
# Stack both transfer directions into one frame with a fresh 0..n-1 index.
# pd.concat replaces DataFrame.append, which was deprecated and later removed from pandas.
txdata = pd.concat([txdata1, txdata2], ignore_index=True)

### Generate sample "block_timestamps"

In [293]:
import datetime as datetime

# Naive UTC "now". datetime.utcnow() is deprecated in recent Python versions, so the same
# value is built from a timezone-aware timestamp with the tzinfo dropped.
base = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
# One timestamp per trace row (2 * size_sample_data rows), newest first, 12 seconds apart.
block_timestamps = [base - datetime.timedelta(seconds=12 * i) for i in range(2 * size_sample_data)]

### Merge data to sample "traces" table

In [294]:
# Start the sample "traces" table from an independent copy of the transfer rows.
traces_sampleData = txdata.copy(deep=True)

In [295]:
# Attach the sampled status values as the "status" column.
traces_sampleData = traces_sampleData.assign(status=status_sample)

In [296]:
# Attach the sampled call types as the "call_type" column.
traces_sampleData = traces_sampleData.assign(call_type=call_type_sample)

In [297]:
# Attach the derived trace types as the "trace_type" column.
traces_sampleData = traces_sampleData.assign(trace_type=trace_type_sample)

In [298]:
# Attach the generated timestamps as the "block_timestamp" column.
traces_sampleData = traces_sampleData.assign(block_timestamp=block_timestamps)

In [299]:
# Preview only the first rows instead of rendering the whole sample table.
traces_sampleData.head(10)

Unnamed: 0,from_address,to_address,value,status,call_type,trace_type,block_timestamp
0,speculator_82,exchange_7,14,1,call,call,2020-02-05 15:35:25.667779
1,speculator_97,exchange_9,9,1,call,call,2020-02-05 15:35:13.667779
2,speculator_30,exchange_3,15,1,call,call,2020-02-05 15:35:01.667779
3,speculator_55,exchange_6,19,1,call,call,2020-02-05 15:34:49.667779
4,speculator_127,exchange_6,10,1,call,call,2020-02-05 15:34:37.667779
5,speculator_108,exchange_1,9,1,call,call,2020-02-05 15:34:25.667779
6,speculator_13,exchange_2,18,1,call,call,2020-02-05 15:34:13.667779
7,speculator_13,exchange_1,7,1,call,call,2020-02-05 15:34:01.667779
8,speculator_184,exchange_5,5,1,,reward,2020-02-05 15:33:49.667779
9,speculator_28,exchange_4,15,1,call,call,2020-02-05 15:33:37.667779


### Upload "traces" table to bigquery

In [300]:
# Upload the sample traces, replacing the table if it already exists.
traces_sampleData.to_gbq(
    destination_table='ethereum_us.traces_sampleData',
    if_exists="replace",
)

1it [00:05,  5.50s/it]


### Test SQL command for retrieving "weiReceived", "weiSent"

In [301]:
%%bigquery res1 --project masterarbeit-245718 --verbose 
-- For every address in addresses_testdata: summed value received and summed value sent.
-- Only rows with status = 1 whose call_type is NULL or not one of
-- delegatecall / callcode / staticcall are counted; addresses without such rows yield NULLs.
WITH weiReceivedView AS (
  -- debits
  SELECT to_address, SUM(IFNULL(value, 0)) AS weiReceived
  FROM `ethereum_us.traces_sampleData`
  WHERE to_address IS NOT NULL
    AND status = 1
    AND (call_type IS NULL OR call_type NOT IN ('delegatecall', 'callcode', 'staticcall'))
  GROUP BY to_address
),
weiSentView AS (
  -- credits
  SELECT from_address, SUM(IFNULL(value, 0)) AS weiSent
  FROM `ethereum_us.traces_sampleData`
  WHERE from_address IS NOT NULL
    AND status = 1
    AND (call_type IS NULL OR call_type NOT IN ('delegatecall', 'callcode', 'staticcall'))
  GROUP BY from_address
),
weiView AS (
  SELECT
    COALESCE(to_address, from_address) AS address,
    IFNULL(weiReceived, 0) AS weiReceived,
    IFNULL(weiSent, 0) AS weiSent
  FROM weiReceivedView
  FULL OUTER JOIN weiSentView ON from_address = to_address
)
SELECT address, weiReceived, weiSent
FROM weiView
RIGHT JOIN `ethereum_us.addresses_testdata` USING (address)

Executing query with job ID: 88fe9fed-9dc7-4a00-ba67-7f6d2bdb1630
Query executing: 2.49s
Query complete after 4.23s


In [302]:
# SQL-side result: add balance = weiReceived - weiSent, index by address (descending),
# cast everything to float and replace missing values by 0.
balance_result_sql = (
    res1.assign(balance=res1["weiReceived"] - res1["weiSent"])
    .set_index("address")
    .sort_values(by="address", ascending=False)
    .astype("float")
    .fillna(0.)
)
res2 = balance_result_sql

In [303]:
%%bigquery traces_sampleData --project masterarbeit-245718 --verbose 
-- Read the uploaded sample table back; the result replaces the local traces_sampleData frame.
select * from `ethereum_us.traces_sampleData`

Executing query with job ID: ea1bd493-51dd-43b9-9c4a-2cd58c9881fa
Query executing: 1.75s
Query complete after 2.60s


In [304]:
# Python counterpart of the SQL row filter: keep rows with status == 1 whose call type is
# missing or is not one of the excluded call types.
excluded_call_types = ['delegatecall', 'callcode', 'staticcall']
# isin() is False for missing call types, so negating it keeps those rows as well.
call_type_ok = ~traces_sampleData["call_type"].isin(excluded_call_types)
status_ok = traces_sampleData["status"] == 1
# Boolean indexing keeps the original index, columns and dtypes, and still yields a frame
# with all columns when no row matches.
data1 = traces_sampleData[call_type_ok & status_ok]

In [305]:
# Python-side reference for weiReceived / weiSent / balance, one row per address in
# addresses_testdata.
# Aggregate only the "value" column: summing every column fails on non-numeric columns
# (e.g. timestamps) in current pandas versions.
wei_received = data1.groupby("to_address")["value"].sum()
wei_sent = data1.groupby("from_address")["value"].sum()
# Building the frame from both Series aligns them on the union of their addresses.
data2 = pd.DataFrame({"weiReceived": wei_received, "weiSent": wei_sent})
# The SQL-side result is cast to float, so make the dtype explicit here as well.
data2 = data2.fillna(0.).astype("float")
data2["balance"] = data2["weiReceived"] - data2["weiSent"]
data2.index = data2.index.rename("address")
# Addresses without any matching row get 0 in every column.
data2 = data2.reindex(addresses_testdata["address"], fill_value=0.)
data2 = data2.sort_values(by="address", ascending=False)
balance_result_py = data2

In [306]:
# Show the first rows of the Python-side and the SQL-side result for a visual comparison.
balance_result_py.head()
balance_result_sql.head()

Unnamed: 0_level_0,weiReceived,weiSent,balance
address,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
speculator_99,4.0,3.0,1.0
speculator_98,12.0,16.0,-4.0
speculator_97,3.0,46.0,-43.0
speculator_96,6.0,10.0,-4.0
speculator_95,0.0,22.0,-22.0


Unnamed: 0_level_0,weiReceived,weiSent,balance
address,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
speculator_99,4.0,3.0,1.0
speculator_98,12.0,16.0,-4.0
speculator_97,3.0,46.0,-43.0
speculator_96,6.0,10.0,-4.0
speculator_95,0.0,22.0,-22.0


In [307]:
# The SQL result must match the Python reference; raises AssertionError otherwise.
pd.testing.assert_frame_equal(balance_result_py, balance_result_sql)
print("weiSent, weiReceived Test succeeded!!")

weiSent, weiReceived Test succeeded!!


### Test SQL command for retrieving "txSent" and "txReceived"

In [308]:
%%bigquery res2 --project masterarbeit-245718 --verbose 
-- For every address in addresses_testdata: number of rows received and number of rows sent.
-- Only rows with a non-NULL to_address, status = 1 and a call_type that is NULL or not one of
-- delegatecall / callcode / staticcall are counted; addresses without such rows yield NULLs.
WITH txSent AS (
  SELECT from_address, COUNT(*) AS numberOfTranscationsSent
  FROM `masterarbeit-245718.ethereum_us.traces_sampleData`
  WHERE to_address IS NOT NULL
    AND status = 1
    AND (call_type IS NULL OR call_type NOT IN ('delegatecall', 'callcode', 'staticcall'))
  GROUP BY from_address
),
txReceived AS (
  SELECT to_address, COUNT(*) AS numberOfTranscationsReceived
  FROM `masterarbeit-245718.ethereum_us.traces_sampleData`
  WHERE to_address IS NOT NULL
    AND status = 1
    AND (call_type IS NULL OR call_type NOT IN ('delegatecall', 'callcode', 'staticcall'))
  GROUP BY to_address
),
txView AS (
  SELECT
    COALESCE(to_address, from_address) AS address,
    IFNULL(numberOfTranscationsReceived, 0) AS numberOfTranscationsReceived,
    IFNULL(numberOfTranscationsSent, 0) AS numberOfTranscationsSent
  FROM txReceived
  FULL OUTER JOIN txSent ON to_address = from_address
)
SELECT address, numberOfTranscationsReceived, numberOfTranscationsSent
FROM txView
RIGHT JOIN `ethereum_us.addresses_testdata` USING (address)

Executing query with job ID: fee582b1-023f-4683-9480-7d79921b4bf3
Query executing: 1.57s
Query complete after 2.11s


In [309]:
# SQL-side transaction counts: missing counts become 0, rows sorted by address (descending),
# then indexed by address.
res3 = res2.fillna(0.).sort_values(by="address", ascending=False)
tx_sent_received_result_sql = res3.set_index("address", drop=True)

In [310]:
# Python-side reference for the transaction counts, one row per address in addresses_testdata.
received_counts = data1.groupby("to_address")["value"].count()
sent_counts = data1.groupby("from_address")["value"].count()
# Building the frame from both Series aligns them on the union of their addresses.
data3 = pd.DataFrame({
    "numberOfTranscationsReceived": received_counts,
    "numberOfTranscationsSent": sent_counts,
})
data3 = data3.fillna(0).astype("int")
data3.index = data3.index.rename("address")
data3 = data3.reindex(addresses_testdata["address"], fill_value=0.)
data3 = data3.sort_values(by="address", ascending=False)
tx_sent_received_result_py = data3

In [311]:
# The SQL result must match the Python reference; raises AssertionError otherwise.
pd.testing.assert_frame_equal(tx_sent_received_result_py, tx_sent_received_result_sql)
print("txSent, txReceived Test succeeded !!")

txSent, txReceived Test succeeded !!


### Test avg time between tx

In [312]:
%%bigquery res4 --project masterarbeit-245718 --verbose 
-- Average number of seconds between received transactions per address:
-- (latest - earliest block_timestamp) / (number of received rows - 1), or 0 when an address
-- has fewer than two such rows. Only rows with a non-NULL to_address, status = 1 and a
-- call_type that is NULL or not delegatecall / callcode / staticcall are considered.
-- The final right join returns one row per address in addresses_testdata.
with timeRecView as (

  with receivedTx as (
    SELECT to_address, count(*) as numberOfTranscationsReceived FROM `masterarbeit-245718.ethereum_us.traces_sampleData` 
    where to_address is not null 
      and status = 1 
      and (call_type not in ('delegatecall', 'callcode', 'staticcall') or call_type is null)
    group by to_address),
  
  timeStampDiffs as (
    SELECT to_address, TIMESTAMP_DIFF(MAX(block_timestamp), MIN( block_timestamp ), second ) as timestampDiff
    FROM `masterarbeit-245718.ethereum_us.traces_sampleData`
    where to_address is not null 
      and status = 1 
      and (call_type not in ('delegatecall', 'callcode', 'staticcall') or call_type is null)
    group by to_address
  
  ) select to_address as address, 
  CASE 
    when (numberOfTranscationsReceived - 1)  > 0 then timestampDiff / (numberOfTranscationsReceived - 1) 
    else 0
  end as avgTimeDiffBetweenReceivedTransactions
  from receivedTx inner join  timeStampDiffs using(to_address)
)

select address, ifnull(avgTimeDiffBetweenReceivedTransactions,0) as avgTimeDiffBetweenReceivedTransactions from timeRecView right join `ethereum_us.addresses_testdata` using(address)

Executing query with job ID: b7860bb1-f7cf-4afa-9d09-6a15ec620784
Query executing: 1.02s
Query complete after 1.84s


In [313]:
# SQL-side average time between received transactions: indexed and sorted by address,
# missing values replaced by 0.
res6 = (
    res4.set_index("address", drop=True)
    .fillna(0.)
    .sort_values(by="address")
)
avg_time_diff_receivedtx_result_sql = res6

In [314]:
%%bigquery res5 --project masterarbeit-245718 --verbose 
-- Average number of seconds between sent transactions per address:
-- (latest - earliest block_timestamp) / (number of sent rows - 1), or 0 when an address
-- has fewer than two such rows. Only rows with a non-NULL to_address, status = 1 and a
-- call_type that is NULL or not delegatecall / callcode / staticcall are considered.
-- The final right join returns one row per address in addresses_testdata.
with timeSentView as (

  with sentTx as (
    SELECT from_address, count(*) as numberOfTranscationsSent FROM `masterarbeit-245718.ethereum_us.traces_sampleData` 
    where to_address is not null 
      and status = 1 
      and (call_type not in ('delegatecall', 'callcode', 'staticcall') or call_type is null)
    group by from_address),
  timeStampDiffs as (
    SELECT from_address, TIMESTAMP_DIFF(MAX(block_timestamp), MIN( block_timestamp ), second ) as timestampDiff
    FROM `masterarbeit-245718.ethereum_us.traces_sampleData`
    where to_address is not null 
      and status = 1 
      and (call_type not in ('delegatecall', 'callcode', 'staticcall') or call_type is null)
    group by from_address
  ) select from_address as address, 
  CASE 
    when (numberOfTranscationsSent - 1)  > 0 then timestampDiff / (numberOfTranscationsSent - 1) 
    else 0
  end as avgTimeDiffBetweenSentTransactions
     from sentTx inner join  timeStampDiffs using(from_address)
)

select address, ifnull(avgTimeDiffBetweenSentTransactions,0) as avgTimeDiffBetweenSentTransactions from timeSentView right join `ethereum_us.addresses_testdata` using(address)

Executing query with job ID: c0ad8837-0268-4ea0-a8b6-03326215608e
Query executing: 1.06s
Query complete after 1.87s


In [315]:
# SQL-side average time between sent transactions: indexed and sorted by address,
# missing values replaced by 0.
res6 = (
    res5.set_index("address", drop=True)
    .fillna(0.)
    .sort_values(by="address")
)
avg_time_diff_senttx_result_sql = res6

In [316]:
# Python-side reference for avgTimeDiffBetweenSentTransactions:
# (latest - earliest block_timestamp) / (number of sent rows - 1) per sender, in seconds,
# and 0 for addresses with fewer than two sent rows.
excluded_call_types = ['delegatecall', 'callcode', 'staticcall']
# Same row filter as the SQL query: status == 1 and call type missing or not excluded
# (isin() is False for missing values, so negating it keeps them).
res7 = traces_sampleData[
    ~traces_sampleData["call_type"].isin(excluded_call_types)
    & (traces_sampleData["status"] == 1)
]
sent_timestamps = res7.groupby("from_address")["block_timestamp"]
res8 = sent_timestamps.max()
res9 = sent_timestamps.min()
# Convert to float seconds right away, so the 0. fill values used below are never mixed
# into a timedelta column.
seconds_diff = pd.to_timedelta(res8 - res9).dt.total_seconds().rename("seconds_diff")
res10 = tx_sent_received_result_py[["numberOfTranscationsSent"]].join(seconds_diff, how="right")
res10 = res10.fillna(0.)
# Senders with a single row have no gap to average: mask the zero divisor instead of
# dividing by zero, and report 0 for them.
gap_counts = (res10["numberOfTranscationsSent"] - 1).where(lambda n: n > 0)
res10["avgTimeDiffBetweenSentTransactions"] = (res10["seconds_diff"] / gap_counts).fillna(0.)
res10 = res10[["avgTimeDiffBetweenSentTransactions"]]
# Addresses that never sent anything get 0.
res10 = res10.reindex(addresses_testdata["address"], fill_value=0.)
res10.index = res10.index.rename("address")
res10 = res10.sort_values(by="address")
avg_time_diff_senttx_result_py = res10

  result = self._data / other


In [317]:
# The SQL result must match the Python reference; raises AssertionError otherwise.
pd.testing.assert_frame_equal(avg_time_diff_senttx_result_sql, avg_time_diff_senttx_result_py)
print("avgTimeDiffBetweenSentTransactions Test succeeded !!")

avgTimeDiffBetweenSentTransactions Test succeeded !!


In [318]:
# Combine all SQL-side results into one feature table indexed by address, ordered by
# balance (largest first), with missing values set to 0.
features = (
    balance_result_sql
    .join(tx_sent_received_result_sql, how="left")
    .join(avg_time_diff_senttx_result_sql)
    .join(avg_time_diff_receivedtx_result_sql)
    .sort_values(by="balance", ascending=False)
    .fillna(0.0)
)
import sys
# An average of 0.0 (addresses with fewer than two sent/received transactions) is replaced
# by twice the largest average observed in that column.
for avg_col in ("avgTimeDiffBetweenSentTransactions", "avgTimeDiffBetweenReceivedTransactions"):
    features[avg_col] = features[avg_col].replace(to_replace=0.0, value=2 * max(features[avg_col]))

In [319]:
# Move the index back into a regular column before uploading.
features = features.reset_index(drop=False)

In [320]:
# Upload the feature table, replacing the table if it already exists.
features.to_gbq(
    destination_table='ethereum_us.features_sampleData',
    if_exists="replace",
)

1it [00:06,  6.59s/it]


# Test mined Blocks

In [321]:
%%bigquery sql_res_minedblocks --project masterarbeit-245718 --verbose 

-- Number of rows with trace_type = "reward" per receiving address; addresses in
-- addresses_testdata without such rows get 0.
WITH minedBlocksView AS (
    SELECT to_address AS address, COUNT(*) AS mined_blocks
    FROM `masterarbeit-245718.ethereum_us.traces_sampleData`
    WHERE trace_type = "reward"
    GROUP BY to_address
)
SELECT address, IFNULL(mined_blocks, 0) AS minedBlocks
FROM minedBlocksView
RIGHT JOIN `ethereum_us.addresses_testdata` USING (address)


Executing query with job ID: 932f13f5-9baf-4a7b-ae3e-360759195ecc
Query executing: 1.32s
Query complete after 1.84s


In [322]:
# Preview the first rows of the assembled feature table.
features.head()

Unnamed: 0,address,weiReceived,weiSent,balance,numberOfTranscationsReceived,numberOfTranscationsSent,avgTimeDiffBetweenSentTransactions,avgTimeDiffBetweenReceivedTransactions
0,exchange_4,227.0,45.0,182.0,23.0,17.0,134.25,101.454545
1,exchange_7,185.0,45.0,140.0,18.0,17.0,136.5,136.235294
2,exchange_2,177.0,47.0,130.0,17.0,17.0,134.25,144.0
3,exchange_1,153.0,32.0,121.0,18.0,13.0,158.0,120.705882
4,exchange_5,156.0,35.0,121.0,14.0,14.0,161.538462,173.538462


# Test stddev of the time differences between consecutive transactions

In [335]:
%%bigquery res_sql_stddev --project masterarbeit-245718 --verbose 
-- Sample standard deviation, per sender, of the seconds between each sent row and the
-- sender's preceding sent row (ordered by block_timestamp). Only rows with a non-NULL
-- from_address, status = 1 and a call_type that is NULL or not
-- delegatecall / callcode / staticcall are considered.

with timestamps_diffs as (
    
    with timestamps_preceding_tx as (
        
        with timestamps_sent_tx as (
            select from_address, block_timestamp from `masterarbeit-245718.ethereum_us.traces_sampleData`
                where from_address is not null 
                  and status = 1 
                  and (call_type not in ('delegatecall', 'callcode', 'staticcall') or call_type is null)
        )
        
        select from_address, block_timestamp,
            lag(block_timestamp) OVER (partition by from_address order by block_timestamp asc) as preceding_block_timestamp 
        from timestamps_sent_tx
    )
    
    select from_address, block_timestamp, preceding_block_timestamp, 
        TIMESTAMP_DIFF(block_timestamp, preceding_block_timestamp, second) as timestampdiff
    from timestamps_preceding_tx
)
select from_address, STDDEV_SAMP(timestampdiff) as stddev_sample  
from timestamps_diffs group by from_address

Executing query with job ID: d0b5a68e-b9aa-48fe-aa85-e80ce6a61d55
Query executing: 0.51s
Query complete after 1.01s


In [336]:
# Index the per-sender standard deviations by address, add the addresses that are missing
# from the query result and give them the largest observed value, then sort by address.
stddev_by_sender = res_sql_stddev.set_index("from_address").reindex(addresses_testdata["address"])
largest_sent_stddev = stddev_by_sender["stddev_sample"].max()
res_sql_stddev = stddev_by_sender.fillna(largest_sent_stddev).sort_values(by="address")
res_sql_stddev

Unnamed: 0_level_0,stddev_sample
address,Unnamed: 1_level_1
exchange_1,151.169983
exchange_10,235.643799
exchange_2,151.296398
exchange_3,83.609482
exchange_4,102.875653
exchange_5,137.400034
exchange_6,115.188140
exchange_7,127.439397
exchange_8,213.975864
exchange_9,86.671247


In [337]:
%%bigquery res_sql_stddev --project masterarbeit-245718 --verbose 
-- Sample standard deviation, per receiver, of the seconds between each received row and the
-- receiver's preceding received row (ordered by block_timestamp). Only rows with a non-NULL
-- to_address, status = 1 and a call_type that is NULL or not
-- delegatecall / callcode / staticcall are considered.
-- NOTE(review): stores its result under the same name (res_sql_stddev) as the sent-side query.

with timestamps_diffs as (
    
    with timestamps_preceding_tx as (
        
        with timestamps_received_tx as (
            select to_address, block_timestamp from `masterarbeit-245718.ethereum_us.traces_sampleData`
                where to_address is not null 
                  and status = 1 
                  and (call_type not in ('delegatecall', 'callcode', 'staticcall') or call_type is null)
        )
        
        select to_address, block_timestamp,
            lag(block_timestamp) OVER (partition by to_address order by block_timestamp asc) as preceding_block_timestamp 
        from timestamps_received_tx
    )
    
    select to_address, block_timestamp, preceding_block_timestamp, 
        TIMESTAMP_DIFF(block_timestamp, preceding_block_timestamp, second) as timestampdiff
    from timestamps_preceding_tx
)

select to_address, STDDEV_SAMP(timestampdiff) as stddev_sample  
from timestamps_diffs group by to_address

Executing query with job ID: 4533842a-9f53-4554-8cc2-bd87bbe9f5ba
Query executing: 0.59s
Query complete after 1.10s


In [338]:
# Index the per-receiver standard deviations by address, add the addresses that are missing
# from the query result and give them the largest observed value, then sort by address.
stddev_by_receiver = res_sql_stddev.set_index("to_address").reindex(addresses_testdata["address"])
largest_received_stddev = stddev_by_receiver["stddev_sample"].max()
res_sql_stddev = stddev_by_receiver.fillna(largest_received_stddev).sort_values(by="address")
res_sql_stddev

Unnamed: 0_level_0,stddev_sample
address,Unnamed: 1_level_1
exchange_1,126.745693
exchange_10,163.511467
exchange_2,135.126607
exchange_3,130.427352
exchange_4,93.666443
exchange_5,148.481545
exchange_6,167.405542
exchange_7,125.351271
exchange_8,173.093889
exchange_9,144.283811


# Deprecated

In [326]:
%%bigquery res6 --project masterarbeit-245718 --verbose 
-- Table metadata of the ethereum_us dataset; the result is stored in res6
-- (a name that earlier cells also use for intermediate frames).

SELECT
* 
FROM
`masterarbeit-245718.ethereum_us.INFORMATION_SCHEMA.TABLES`


Executing query with job ID: 5036d0d3-7765-405c-8eff-954e26a3cf6c
Query executing: 0.62s
Query complete after 1.40s


In [327]:
# Collect the tables whose name starts with "top40k_19_20" ...
has_prefix = res6["table_name"].str.startswith("top40k_19_20")
tablesToDelete = res6.loc[has_prefix, "table_name"].tolist()

# ... and delete them from the ethereum_us dataset (destructive); tables that are already
# gone are ignored.
for table_name in tablesToDelete:
    client.delete_table("ethereum_us.{}".format(table_name), not_found_ok=True)