
# KIN Economy Graph


Draw a graph with matplotlib, color by degree.
You must have matplotlib for this to work.



###### Imports and Init

In [6]:
%matplotlib inline

In [7]:
import matplotlib.pyplot as plt
# import seaborn as sns # For styling plots
import networkx as nx
import pandas as pd
import os
from sqlalchemy import create_engine
from google.cloud import bigquery
from IPython.display import display, Markdown

%matplotlib nbagg
# %matplotlib notebook
plt.ion()


data_path = '/Users/skulas/Dev/kre_cache_folder/'


## Dawing Basic Graphs

### Cycle Graph

In [None]:
# Author: Aric Hagberg (hagberg@lanl.gov)

G = nx.cycle_graph(24)
pos = nx.spring_layout(G, iterations=200)
nx.draw(G, pos, node_color=range(24), node_size=800, cmap=plt.cm.Blues)
plt.show()

### Single DS sub-ecosystem without P2P

In [None]:
# Author: Aric Hagberg (hagberg@lanl.gov)

G = nx.star_graph(20)
pos = nx.spring_layout(G)
colors = range(20)
nx.draw(G, pos, node_color='#A0CBE2', edge_color=colors,
        width=4, edge_cmap=plt.cm.Blues, with_labels=False)
plt.show()

### Single DS sub-ecoystem with P2P

In [None]:
# Author:  Drew Conway (drew.conway@nyu.edu)

from operator import itemgetter

if __name__ == '__main__':
    # Create a BA model graph
    n = 1000
    m = 2
    G = nx.generators.barabasi_albert_graph(n, m)
    # find node with largest degree
    node_and_degree = G.degree()
    (largest_hub, degree) = sorted(node_and_degree, key=itemgetter(1))[-1]
    # Create ego graph of main hub
    hub_ego = nx.ego_graph(G, largest_hub)
    # Draw graph
    pos = nx.spring_layout(hub_ego)
    nx.draw(hub_ego, pos, node_color='b', node_size=50, with_labels=False)
    # Draw ego as large and red
    nx.draw_networkx_nodes(hub_ego, pos, nodelist=[largest_hub], node_size=300, node_color='r')
    plt.show()

## Data Functions

### HC Queries

In [1]:
## Querying HC
ron_pwd = os.environ['RON_PG_PWD']
ron_engine = create_engine(f'postgresql://postgres:{ron_pwd}@history-collector.'
                           'cmmqkby1prsb.us-east-1.rds.amazonaws.com:5432/kin')


def run_query_on_postgres(query_str, engine=ron_engine):
    dataframe = pd.read_sql_query(query_str, con=engine)

    return dataframe

def get_transactions_between_dates(date_start = '2019-01-01', date_end = '2019-03-15', limit = 5000):
    query_str = f"""
    SELECT source AS tx_source, destination AS tx_destination, 
            amount AS tx_amount, extract(epoch from "time") AS tx_time_epoch
        FROM public.payments
        WHERE tx_status = 'txSUCCESS'
        AND "time" > '{date_start}'
        AND "time" < '{date_end}'
        -- ORDER BY "time"
        LIMIT {limit};
    """

    df = run_query_on_postgres(query_str)
    return df
    
def get_some_transactions(tx_count=500):
    query_str = f"""
        SELECT source, destination, COUNT(source) num_of_tx, SUM(amount) as total, MAX("time") as last_tx
        FROM public.payments
        WHERE tx_status = 'txSUCCESS'
        AND destination in ('GCSMRVRSWROT5BYFPPGZEB6CJBPSRX6A4XW3HG7YBXN6B72E5T2DFNA6',
                            'GBOXPDOLQUNDLWTKYFLQG4AL57VVGFQUHM3SKB73EX5VN5AGKQWBDHZJ',
                            'GBX2P4BBBHDXLK22GVUJE3LSUGLE5B35O7XBY5QUP7Q7SVYPD4QA2OV2',
                            'GBPHY5HEAIY7ZT63O5HH3ZB2PHUDFZHMJ77PWJ7ILQTL66JKDGJT7TUI',
                            'GDRVYZ7JRADW3SEUKWYDR5JCKDENJTLGURBKSQFI5AGNERWO2PXCSWBS')
        GROUP BY destination, source
        ORDER BY num_of_tx DESC
        LIMIT {tx_count};
    """

    df = run_query_on_postgres(query_str)
#     print(df.head(10))

    return df

NameError: name 'os' is not defined

### BI Queries

In [2]:
global devs_lookup
devs_lookup = None

def query_bi(query_str):
    client = bigquery.Client()
    query_job = client.query(query_str)
    results = query_job.result() 
    
    return results

def query_bi_into_df(query_str):
    results = query_bi(query_str)
    df = results.to_dataframe()
    
    return df

def devs_df():
    global devs_lookup
    if devs_lookup is None:
        print('Quering developers from BI')
        query_str = "SELECT * FROM `kin-bi.kin.digital_services_lookup_view`"
        devs_lookup = query_bi_into_df(query_str)
    return devs_lookup

def devs_ids_list():
    devs_table = devs_df()
    return devs_table['digital_service_id'].tolist()
    
def dev_lookup_by_id(dev_id):
    devs_table = devs_df()
    dev_row = devs_table.loc[devs_table['digital_service_id'] == dev_id]
    
    return dev_row

def dev_name_by_id(dev_id):
    dev_row = dev_lookup_by_id(dev_id)
    dev_name = dev_row['digital_service_name'].iloc[0]
    
    return dev_name

def get_history_between_dates(date_start, date_end):
    query_str = f"""
    SELECT date, digital_service_id, tx_type, 
            COUNT(DISTINCT user_wallet_id) as num_of_uAU,
            SUM(amount) as total,
            COUNT(tx_hash) num_of_tx
        FROM `kin-bi.stellar.payments_with_tx_types_view` 
        WHERE time > '{date_start}'
            AND time < '{date_end}'
        GROUP BY date, digital_service_id, tx_type
        ORDER BY date, digital_service_id ASC
    """
    
    return query_bi_into_df(query_str)

def get_tx_log_between_dates(date_start, date_end):
    query_str = f"""
    SELECT source AS tx_source,
        destination AS tx_destination,
        UNIX_MILLIS(time) AS tx_time_epoch,
        amount AS tx_amount,
        digital_service_name AS ds_name
        FROM `kin-bi.stellar.payments_with_tx_types_view`
        WHERE time > '{date_start}'
          AND time < '{date_end}'
    """
    
    df = query_bi_into_df(query_str)

    return df

def monthly_evolution_table():
    query_str = """
        SELECT DATE_TRUNC(date, MONTH) as month, 
            digital_service_id, 
            tx_type,
            SUM(amount) AS total,
            COUNT(tx_hash) AS num_of_tx, 
            COUNT(DISTINCT destination) as dest,
            COUNT(DISTINCT source) as src
        FROM `kin-bi.stellar.payments_with_tx_types_view` 
        GROUP BY month, digital_service_id, tx_type
        ORDER BY month, digital_service_id, tx_type
    """
    
    df = query_bi_into_df(query_str)
    
    return df

"""
start_date < end_date
date format: YYYY-MM-DD
"""
def daily_evolution_table(start_date, end_date, index_data=True):
    query_str = f"""
        SELECT date, 
          source,
          destination,
          digital_service_id, 
          tx_type,
          SUM(amount) AS total,
          COUNT(tx_hash) AS num_of_tx
        FROM `kin-bi.stellar.payments_with_tx_types_view` 
        WHERE DATE(time) >= '{start_date}'
          AND DATE(time) <= '{end_date}'  
        GROUP BY date, source, destination, digital_service_id, tx_type
        ORDER BY date, digital_service_id, tx_type
    """
    
    df = query_bi_into_df(query_str)
    if index_data:
        df = df.set_index(['date', 'source', 'destination'])
    
    return df

def daily_evolution_table_for_dev(start_date, end_date, dev_id, tx_types=['earn', 'spend', 'p2p']):
    tx_types_list = [f"""'{a}'""" for a in tx_types]
    
    query_str = f"""
        SELECT date, 
          source,
          destination,
          tx_type,
          SUM(amount) AS total,
          COUNT(tx_hash) AS num_of_tx
        FROM `kin-bi.stellar.payments_with_tx_types_view` 
        WHERE DATE(time) >= '{start_date}'
          AND DATE(time) <= '{end_date}'
          AND digital_service_id = '{dev_id}'
          AND tx_type in ({','.join(tx_types_list)})
        GROUP BY date, source, destination, digital_service_id, tx_type
        ORDER BY date, tx_type
    """
    
    df = query_bi_into_df(query_str)
    
    return df
    

def all_paying_wallets_till_date(start_date, inclusive=False):
    if inclusive:
        sign = '<='
    else:
        sign = '<'
    
    query_str = f"""
        SELECT DISTINCT source
        FROM `kin-bi.stellar.payments_with_tx_types_view` 
        WHERE date {sign} '{start_date}'
    """
    
    df = query_bi_into_df(query_str)
    
    return df

def all_earning_wallets_till_date(start_date, inclusive=False):
    if inclusive:
        sign = '<='
    else:
        sign = '<'
    
    query_str = f"""
        SELECT DISTINCT destination
        FROM `kin-bi.stellar.payments_with_tx_types_view` 
        WHERE date {sign} '{start_date}'
    """
    
    df = query_bi_into_df(query_str)
    
    return df


def monthly_unique_wallets():
    query_str = """
        SELECT DATE_TRUNC(date, MONTH) as month, 
            source,
            destination,
            digital_service_id, 
            SUM(amount) AS total,
            COUNT(tx_hash) AS num_of_tx
        FROM `kin-bi.stellar.payments_with_tx_types_view` 
        GROUP BY month, digital_service_id, source, destination
        ORDER BY month, digital_service_id
    """
    
    df = query_bi_into_df(query_str).set_index(['month', 'source', 'destination'])
    
    return df
    

# Graph Functions

In [None]:
def create_tx_graph(df_data, source_col='tx_source',
                    target_col='tx_destination',
                    amount_col='tx_amount', 
                    time_col='tx_time_epoch',
                    ds_name_col=None):
    attrs = [amount_col, time_col]
    if ds_name_col:
        attrs.append(ds_name_col)
    
    G = nx.from_pandas_edgelist(df_data,
                                source=source_col,
                                target=target_col,
                                create_using=nx.DiGraph,
                                edge_attr=attrs)
    return G

def draw_graph(G):
#     nx.draw(G)
#     nx.draw(G, with_labels=True, font_weight='bold')
    # Find and draw dominant node
    node_and_degree = G.degree()
    (largest_hub, degree) = sorted(node_and_degree, key=itemgetter(1))[-1]
    pos = nx.spring_layout(G)

    fig = plt.figure('Tx Graph', figsize=(16,9))
#     nx.draw(G, node_color='b', node_size=50, with_labels=False)
    nx.draw(G, pos, node_color='b', node_size=50, with_labels=False)
    
#     nx.draw_networkx_nodes(G, pos, nodelist=[largest_hub], node_size=300, node_color='r')

## Run Tests

In [None]:
## Transactions from selected, very busy, wallets
# num_of_txs = 50 # 100 # 500 # 100 # 10
# df_data = get_some_transactions(num_of_txs)
# df_data.to_pickle(path=f'/Users/skulas/Dev/TEMP/kin_graph_{num_of_txs}_test.pkl')

# df_data = pd.read_pickle(path='/Users/skulas/Dev/TEMP/kin_graph_10_test.pkl')
# df_data = pd.read_pickle(path='/Users/skulas/Dev/TEMP/kin_graph_50_test.pkl')
# df_data = pd.read_pickle(path='/Users/skulas/Dev/TEMP/kin_graph_100_test.pkl')
# df_data = pd.read_pickle(path='/Users/skulas/Dev/TEMP/kin_graph_500_test.pkl')
# df_data = pd.read_pickle(path='/Users/skulas/Dev/TEMP/kin_graph_5500_test.pkl')
# display(df_data.head(15))

# draw_graph(None)

## Transaction stats by dates
date_start = '2019-02-15'
date_end = '2019-03-15'
# tx_df = get_history_between_dates(date_start, date_end)
# tx_df.to_pickle(path=f'/Users/skulas/Dev/TEMP/kin_tx_stats_history_{date_start}_{date_end}.pkl')

tx_df = pd.read_pickle(path=f'/Users/skulas/Dev/TEMP/kin_tx_stats_history_{date_start}_{date_end}.pkl')
display(tx_df.head(15))
print(tx_df.shape)
print(dev_name_by_id('g58b'))

In [None]:
## Transactions based on time. Exporting data for visualization on external tool
num_of_txs = 50 # 2500 # 500
# df_data = get_transactions_between_dates(date_start = '2019-02-01', date_end = '2019-03-15', limit = num_of_txs)
# df_data.to_pickle(path=f'/Users/skulas/Dev/TEMP/kin_graph_history_{num_of_txs}_test.pkl')

df_data = pd.read_pickle(path=f'/Users/skulas/Dev/TEMP/kin_graph_history_{num_of_txs}_test.pkl')

display(df_data.head(10))

G = create_tx_graph(df_data)
# nx.write_gml(G, f'/Users/skulas/Dev/TEMP/kin_graph_history_{num_of_txs}_g.gml')
nx.write_graphml_lxml(G, f'/Users/skulas/Dev/TEMP/kin_graph_history_{num_of_txs}_g.graphml')

# draw_graph(G)

In [None]:
date_start = '2019-03-15'
date_end = '2019-04-30'
# tx_log = get_tx_log_between_dates(date_start, date_end)
# tx_log.to_pickle(path=f'/Users/skulas/Dev/TEMP/tx_log_for_graph__{date_start}_{date_end}__.pkl')
tx_log = pd.read_pickle(path=f'/Users/skulas/Dev/TEMP/tx_log_for_graph__{date_start}_{date_end}__.pkl')
display(tx_log.head(3))
print(tx_log.shape)

G = create_tx_graph(tx_log, source_col='tx_source',
                    target_col='tx_destination',
                    amount_col='tx_amount', 
                    time_col='tx_time_epoch',
                    ds_name_col='ds_name')
nx.write_graphml_lxml(G, f'/Users/skulas/Dev/TEMP/graph_tx_log__{date_start}_{date_end}_.graphml')

In [None]:
# grouped_src = tx_log.groupby(['tx_source'])
# grouped_dst = tx_log.groupby(['tx_destination'])
# count_unique_dst = grouped_src['tx_source'].unique()
# count_unique_dst.tx_log
display(count_unique_dst.head(3))
# print(dir(count_unique_dst))
count_unique_dst[0][0]

## Graph Theory

### Bridge
An edge that if removed, it deteches different zones in the graph.<br>
[Wikipedia](https://en.wikipedia.org/wiki/Bridge_(graph_theory)) definition: an edge of a graph whose deletion increases its number of connected components. Equivalently, an edge is a bridge if and only if it is not contained in any cycle. A graph is said to be bridgeless or isthmus-free if it contains no bridges.<br><br>
In our case, the bridges would probably mean the transactions across DSs or to or from exchange wallets

###  Centrality
The number of edges comming in or out a node.

### Clique
Subsets of vertices, all adjacent to each other, also called complete subgraphs.<br>
Check [wikipedia's explanation](https://en.wikipedia.org/wiki/Clique_problem) of the problem finding cliques in a graph.

### Clustering
Algorithms to characterize the number of triangles in a graph.<br>
Use to detect sub-ecosystems.<br>
Use it to detect **monkey bussiness** within a DS by the DS or users.

### Coloring
We could use [coloring](https://en.wikipedia.org/wiki/Greedy_coloring) to detect areas in the ecosystem.

### Communities
In the study of complex networks, a network is said to have [community structure](https://en.wikipedia.org/wiki/Community_structure) if the nodes of the network can be easily grouped into (potentially overlapping) sets of nodes such that each set of nodes is densely connected internally

### Cores
We could use them to detect exchanges, DS wallets and dominant users within DS sub-ecosystems.

### Cycles
In graph theory, [a cycle](https://en.wikipedia.org/wiki/Cycle_(graph_theory)) is a path of edges and vertices wherein a vertex is reachable from itself. There are several different types of cycles, principally a closed walk and a simple cycle; also, e.g., an element of the cycle space of the graph. If a graph contains no cycles it is referred to as being acyclic.<br>
An edge is in a cycle if and only if it is in a **strongly connected component**.

### Stronly Connected Component (aka disconnected)
In directed graphs, a graph is said to be strongly connected or diconnected if every vertex is reachable from every other vertex. The strongly connected components or diconnected components of an arbitrary directed graph form a partition into subgraphs that are themselves strongly connected. It is possible to test the strong connectivity of a graph, or to find its strongly connected components, in linear time.

### Degree Histogram
A list of the frequency of each degree value. The list index is the degree of the node (number of edges, in and out).

### Dominators
Not sure it'd be relevant for us, but still ...<br>
a node d [dominates](https://en.wikipedia.org/wiki/Dominator_(graph_theory)) a node n if every path from the entry node to n must go through d. Notationally, this is written as d dom n (or sometimes d {\displaystyle \gg } \gg  n). By definition, every node dominates itself. 

### Eulerian
A graph is Eulerian if it has an _Eulerian circuit_.<br>
An Eulerian circuit is a closed walk that includes each edge of a graph exactly once.

### Flow Hierarchy
The fraction of edges not participating in cycles in a directed graph.

# Evolution Analysis
Analyse evolution over time

## Basic Growth Analysis

In [8]:
## Load evolution from BI
# evolution_data = monthly_evolution_table()
# evolution_data.to_pickle(path='/Users/skulas/Dev/TEMP/RIMS_evolution_table_till_2019_June.pkl')

## Load evolution from pickle
# evolution_data = pd.read_pickle(path='/Users/skulas/Dev/TEMP/RIMS_evolution_table_till_2019_Apr.pkl')
evolution_data = pd.read_pickle(path='/Users/skulas/Dev/TEMP/RIMS_evolution_table_till_2019_June.pkl')
display(evolution_data.head(1))

Unnamed: 0,month,digital_service_id,tx_type,total,num_of_tx,dest,src
0,2018-06-01,kik,earn,70612.0,709,228,1


In [9]:
## Load wallets from BI
# wallets_evolution = monthly_unique_wallets()
# wallets_evolution.to_pickle(path='/Users/skulas/Dev/TEMP/RIMS_evolution_wallets_till_2019_June.pkl')

## Load wallets from pickle
wallets_evolution = pd.read_pickle(path='/Users/skulas/Dev/TEMP/RIMS_evolution_wallets_till_2019_June.pkl')
display(wallets_evolution.head(10))
# months = wallets_evolution.index.get_level_values(0).tolist()
months = wallets_evolution.index.unique(level='month')
display(months)
first_month = months[0]


monthly_data = wallets_evolution.loc[first_month]
srcs = monthly_data.index.unique(level='source')
dsts = monthly_data.index.unique(level='destination')
num_of_srcs = srcs.shape[0]
num_of_dsts = dsts.shape[0]
print(f'Number of sources: {num_of_srcs}, number of destinations: {num_of_dsts}')

srcs_evo = [num_of_srcs]
dsts_evo = [num_of_dsts]
srcs_ret = [0]
dsts_ret = [0]
# curr_month = months[1]
# monthly_data = wallets_evolution.loc[slice(curr_month)]
# new_srcs = monthly_data.index.unique(level='source')
# new_dsts = monthly_data.index.unique(level='destination')
# print(new_srcs.shape[0])
# new_srcs = new_srcs.drop(srcs)
# print(new_srcs.shape[0])
# srcs.append(new_srcs)

num_of_months = months.shape[0]
for month_ix in range(1, num_of_months):
    curr_month = months[month_ix]
    monthly_data = wallets_evolution.loc[curr_month]
    month_srcs = monthly_data.index.unique(level='source')
    month_dsts = monthly_data.index.unique(level='destination')
    
    
    new_srcs = month_srcs.drop(srcs, errors='ignore')
    new_dsts = month_dsts.drop(dsts, errors='ignore')
    num_of_new_srcs = new_srcs.shape[0]
    num_of_new_dsts = new_dsts.shape[0]
    num_of_ret_srcs = month_srcs.shape[0]-num_of_new_srcs
    num_of_ret_dsts = month_dsts.shape[0]-num_of_new_dsts
    
#     print(curr_month)
#     print(f'New sources: {num_of_new_srcs}, New destinations: {num_of_new_dsts}')
#     print(f'Retained sources: {num_of_ret_srcs}, Retained destinations: {num_of_ret_dsts}')
    srcs = srcs.append(new_srcs)
    dsts = dsts.append(new_dsts)
    srcs_evo.append(num_of_new_srcs)
    dsts_evo.append(num_of_new_dsts)
    srcs_ret.append(num_of_ret_srcs)
    dsts_ret.append(num_of_ret_dsts)


fig = plt.figure('Ecosystem Spending Wallets', figsize=(9,4))
plt.plot(months, srcs_evo, 'b-', label='Newly Created')
plt.plot(months, srcs_ret, 'b:', label='Returning')
plt.legend(loc='best')
plt.show()

fig = plt.figure('Ecosystem Earning Wallets', figsize=(9,4))
plt.plot(months, dsts_evo, 'r-', label='Newly Created')
plt.plot(months, dsts_ret, 'r:', label='Returning')
plt.legend(loc='best')
plt.show()



Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,digital_service_id,total,num_of_tx
month,source,destination,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2018-06-01,GAFOHSU3YU6QW2ZIXFUDNE6PKHFV2UE4T5A57KHGVUVFSASZX2J3ZJKK,GCSMRVRSWROT5BYFPPGZEB6CJBPSRX6A4XW3HG7YBXN6B72E5T2DFNA6,kik,100.0,1
2018-06-01,GDNJRI53DAO63JXQE2COPUDE3B3B6V5GGIIGE5QE4VZQZ5S2EHXHMZ6E,GBNXLYC7A6LKHP27GD3UDUO4OT6KL5TMZ5QNFUJGRPNNC5LNX25IPQMD,kik,130.0,1
2018-06-01,GDNJRI53DAO63JXQE2COPUDE3B3B6V5GGIIGE5QE4VZQZ5S2EHXHMZ6E,GAMO4T3DTF6ETSFPUG4V6DNUD4BZVA7OY4AYXUKTTXFNYMBZBNIR3EVP,kik,160.0,2
2018-06-01,GADG3E6JOUA5JICEGSODVXCNKBP4LVNU7NW3JI5SLQMLLBQLPY3K33TF,GCSMRVRSWROT5BYFPPGZEB6CJBPSRX6A4XW3HG7YBXN6B72E5T2DFNA6,kik,100.0,1
2018-06-01,GDNJRI53DAO63JXQE2COPUDE3B3B6V5GGIIGE5QE4VZQZ5S2EHXHMZ6E,GA7JXMXQBRI7ZUFUG7474QGMMUK5MM6BK6YRCXN2NDVLFM6WXEGBUVBI,kik,120.0,4
2018-06-01,GDNJRI53DAO63JXQE2COPUDE3B3B6V5GGIIGE5QE4VZQZ5S2EHXHMZ6E,GALG5XES4PCABGY4LJ3KKGANOGPUUDQGVEDXLGGY4MHAAAVGQXOGAVGV,kik,140.0,2
2018-06-01,GD7PI6PW7DYZZKLXRLS27E6K4UP6IRGRDBDQ4XP2NUTN6BNAUD2PFCX3,GCSMRVRSWROT5BYFPPGZEB6CJBPSRX6A4XW3HG7YBXN6B72E5T2DFNA6,kik,100.0,1
2018-06-01,GCLP7ND3IAFSADA6YJANBXCXJV2SFAIXB7CI2CYAAVZMXGN3DZQYVK32,GCSMRVRSWROT5BYFPPGZEB6CJBPSRX6A4XW3HG7YBXN6B72E5T2DFNA6,kik,100.0,1
2018-06-01,GDNJRI53DAO63JXQE2COPUDE3B3B6V5GGIIGE5QE4VZQZ5S2EHXHMZ6E,GCZXHFPPIS46W7E2FNWFBRTSDODAV2A5SBZUXH5NGG5Z7JQ3FAWWWEVX,kik,190.0,3
2018-06-01,GDXPTSZK6TZF3WW2WQCNJJJBMMCWOANHWKLUYMFBBAZNSJKICFZOMIBW,GCSMRVRSWROT5BYFPPGZEB6CJBPSRX6A4XW3HG7YBXN6B72E5T2DFNA6,kik,200.0,2


DatetimeIndex(['2018-06-01', '2018-07-01', '2018-08-01', '2018-09-01',
               '2018-10-01', '2018-11-01', '2018-12-01', '2019-01-01',
               '2019-02-01', '2019-03-01', '2019-04-01', '2019-05-01',
               '2019-06-01'],
              dtype='datetime64[ns]', name='month', freq=None)

Number of sources: 178, number of destinations: 258


<IPython.core.display.Javascript object>


To register the converters:
	>>> from pandas.plotting import register_matplotlib_converters
	>>> register_matplotlib_converters()


<IPython.core.display.Javascript object>

### DS by DS - Draw evolution

In [10]:
def wallets_evo(dev_id, data):
    dev_wallets = data.loc[data['digital_service_id'] == dev_id]
    
def draw_ds_evo(dev_id, data):
    dev_name = dev_name_by_id(dev_id)
    display(Markdown('## {}'.format(dev_name)))
    dev_evo = data.loc[data['digital_service_id'] == dev_id]
    # Index data by month
    dev_evo.set_index(['month'], inplace=True)
    spend_evo = dev_evo.loc[dev_evo['tx_type'] == 'spend']
    p2p_evo = dev_evo.loc[dev_evo['tx_type'] == 'p2p']
    earn_evo = dev_evo.loc[dev_evo['tx_type'] == 'earn']
    
    merged_for_drawing = pd.merge(spend_evo, p2p_evo, how='outer', left_index=True, right_index=True)
    merged_for_drawing = pd.merge(merged_for_drawing, earn_evo, how='outer', left_index=True, right_index=True)
    merged_for_drawing.fillna(0, inplace=True)
#     display(merged_for_drawing)
    

    fig = plt.figure('Spend Transactions and Wallets', figsize=(9,4))
    plt.plot(merged_for_drawing.index, merged_for_drawing['num_of_tx_x'], 'b-', label='# of Spend')
    plt.plot(merged_for_drawing.index, merged_for_drawing['src_x'], 'r-', label='# of Wallets')
    plt.legend(loc='best')
    plt.show()
    
    fig = plt.figure('P2P Transactions and Wallets', figsize=(9,4))
    plt.plot(merged_for_drawing.index, merged_for_drawing['num_of_tx_y'], 'b-', label='# of P2P')
    wallets_p2p_count = merged_for_drawing['src_y'] + merged_for_drawing['dest_y']
    plt.plot(merged_for_drawing.index, wallets_p2p_count, 'r-', label='# of Wallets')
    plt.legend(loc='best')
    plt.show()
    
    fig = plt.figure('Earn Transactions and Wallets', figsize=(9,4))
    plt.plot(merged_for_drawing.index, merged_for_drawing['num_of_tx'], 'b-', label='# of Earns')
    plt.plot(merged_for_drawing.index, merged_for_drawing['dest'], 'r-', label='# of Wallets')
    plt.legend(loc='best')
    plt.show()
    
    fig = plt.figure('Volume', figsize=(9,4))
    plt.plot(merged_for_drawing.index, merged_for_drawing['total_x'], 'b-', label='Spend')
    plt.plot(merged_for_drawing.index, merged_for_drawing['total_y'], 'r-', label='Earn')
    plt.plot(merged_for_drawing.index, merged_for_drawing['total'], 'g:', label='P2P')
    plt.legend(loc='best')
    plt.show()


In [11]:
draw_ds_evo('8vlz', evolution_data)

Quering developers from BI


## Nearby

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

## Daily Growth for MVP

In [12]:
import time
from multiprocessing import Process, Queue

RET_KEY = 'returning'
NEW_KEY = 'new'


def read_data_for_dates(start_date, end_date):
    pickle_filename = f'RIMS_daily_evolution_between_dates_{start_date}_{end_date}.pkl'
    
    try:
        df = pd.read_pickle(path=f'{data_path}{pickle_filename}')
    except FileNotFoundError:
        df = daily_evolution_table(start_date, end_date)
        df.to_pickle(path=f'{data_path}{pickle_filename}')

    return df

def read_paying_wallets(start_date):
    pickle_filename = f'RIMS_paying_wallets_till_date_{start_date}.pkl'

    try:
        df = pd.read_pickle(path=f'{data_path}{pickle_filename}')
    except FileNotFoundError:
        df = all_paying_wallets_till_date(start_date)
        df.to_pickle(path=f'{data_path}{pickle_filename}')

    return df

def read_earning_wallets(start_date):
    pickle_filename = f'RIMS_earning_wallets_till_date_{start_date}.pkl'

    try:
        df = pd.read_pickle(path=f'{data_path}{pickle_filename}')
    except FileNotFoundError:
        df = all_earning_wallets_till_date(start_date)
        df.to_pickle(path=f'{data_path}{pickle_filename}')

    return df

## Load Data
start_date = '2019-02-01'
end_date = '2019-06-01'

print(f'Loading daily data between {start_date} and {end_date}')
def f1(resout):
    print('start read history')
    res = read_data_for_dates(start_date, end_date)
    q1.put(res)
    print('Tx history ready')

def f2(resout):
    print('start read earning wallets')
    earning_wallets_df = read_earning_wallets(start_date)
    num_of_existing_wallets, _ = earning_wallets_df.shape
    resout.put(earning_wallets_df)
    print(f'Number of existing e wallets:{num_of_existing_wallets}')

def f3(resout):
    print('start read paying wallets')
    paying_wallets_df = read_paying_wallets(start_date)
    num_of_existing_wallets, _ = paying_wallets_df.shape
    resout.put(paying_wallets_df)
    print(f'Number of existing p wallets:{num_of_existing_wallets}')

q1 = Queue()
p1 = Process(target=f1, args=(q1,))
q2 = Queue()
p2 = Process(target=f2, args=(q2,))
q3 = Queue()
p3 = Process(target=f3, args=(q3,))

p1.start()
p2.start()
p3.start()

day_by_day_df = q1.get()
earning_wallets_df = q2.get()
paying_wallets_df = q3.get()

p1.join()
p2.join()
p3.join()
print('ALL DONE')


def new_returning_unique_wallets(data, prev_srcs, prev_dsts):
    # Internally used by build_report_by_dev_ids
    days = data.index.unique(level='date')
    first_day = days[0]

    daily_data = data.loc[first_day]
    srcs = daily_data.index.unique(level='source')
    dsts = daily_data.index.unique(level='destination')
    
    num_of_srcs = srcs.shape[0]
    num_of_dsts = dsts.shape[0]

    srcs = srcs.drop(prev_srcs, errors='ignore')
    dsts = dsts.drop(prev_dsts, errors='ignore')

    num_of_ret_srcs = num_of_srcs-srcs.shape[0]
    num_of_ret_dsts = num_of_dsts-dsts.shape[0]
#     print(f'Number of sources: {num_of_srcs}, number of destinations: {num_of_dsts}')

    srcs_evo = [num_of_srcs]
    dsts_evo = [num_of_dsts]
    srcs_ret = [num_of_ret_srcs]
    dsts_ret = [num_of_ret_dsts]

    num_of_days = days.shape[0]
    for day_ix in range(1, num_of_days):
        curr_day = days[day_ix]
        daily_data = data.loc[curr_day]
        day_srcs = daily_data.index.unique(level='source')
        day_dsts = daily_data.index.unique(level='destination')

        new_srcs = day_srcs.drop(srcs, errors='ignore')
        new_dsts = day_dsts.drop(dsts, errors='ignore')
        new_srcs = new_srcs.drop(prev_srcs, errors='ignore')
        new_dsts = new_srcs.drop(prev_dsts, errors='ignore')
        num_of_new_srcs = new_srcs.shape[0]
        num_of_new_dsts = new_dsts.shape[0]
        num_of_ret_srcs = day_srcs.shape[0]-num_of_new_srcs
        num_of_ret_dsts = day_dsts.shape[0]-num_of_new_dsts

#         print(curr_day)
#         print(f'New sources: {num_of_new_srcs}, New destinations: {num_of_new_dsts}')
#         print(f'Retained sources: {num_of_ret_srcs}, Retained destinations: {num_of_ret_dsts}')
        srcs = srcs.append(new_srcs)
        dsts = dsts.append(new_dsts)
        srcs_evo.append(num_of_new_srcs)
        dsts_evo.append(num_of_new_dsts)
        srcs_ret.append(num_of_ret_srcs)
        dsts_ret.append(num_of_ret_dsts)
    
    result_dic = {
                    'date': days,
                    'new_paying': srcs_evo,
                    'ret_paying': srcs_ret,
                    'new_earning': dsts_evo,
                    'ret_earning': dsts_ret
                 }
    res_df = pd.DataFrame(result_dic)
    
    return res_df

def build_report_by_dev_ids(ids, data=day_by_day_df):
    ## Number of Transactions vs Volume by tx type
    dev_name = dev_name_by_id(ids[0])
    display(Markdown(f'## {dev_name} ...'))
    dev_data = data[data['digital_service_id'].isin(ids)]
    grouped = dev_data.groupby(['date', 'tx_type'])
    grp_sum = grouped.sum()
    
    df_pivot = grp_sum.reset_index()#['tx_type', 'total', 'num_of_tx'])
    totals = df_pivot.pivot(index='date', columns='tx_type', values=['total', 'num_of_tx']).fillna(0).reset_index()
    
    ## Retention vs Aquisition
    prev_srcs = paying_wallets_df['source'].tolist()
    prev_dsts = earning_wallets_df['destination'].tolist()
    ret_vs_new = new_returning_unique_wallets(dev_data, prev_srcs, prev_dsts)

    ## Merge report data
    r_df = totals.merge(ret_vs_new, how='outer', on='date', left_index=False, right_index=False)

    ## Draw
    # Spenders retention
    ptitle = f'{dev_name} RIMS Spend Retention Report'
    fig1 = plt.figure(ptitle, figsize=(9,4))
    plt.stackplot(r_df.index,[r_df['ret_paying'].tolist(), r_df['new_paying'].tolist()], labels=['Returning','New'])
#     plt.stackplot(r_df['date'].tolist(),[r_df['ret_paying'].tolist(), r_df['new_paying'].tolist()], labels=['Returning','New'])
    plt.legend(loc='best')
    plt.title(ptitle)
    plt.show()
    
    time.sleep(1)

    # Earners retention
    ptitle = f'{dev_name} RIMS Earn Retention Report'
    fig2 = plt.figure(ptitle, figsize=(9,4))
    plt.stackplot(r_df['date'].tolist(),[r_df['ret_earning'].tolist(), r_df['new_earning'].tolist()], labels=['Returning', 'New'])
    plt.legend(loc='best')
    plt.title(ptitle)
    plt.show()
    
    time.sleep(1)
    
    # Earn vs Spend
    ptitle = f'{dev_name} RIMS P2P vs C2B Spend Report'
    fig3 = plt.figure(ptitle, figsize=(9,4))
    plt.stackplot(r_df['date'].tolist(),[r_df[('num_of_tx', 'p2p')].tolist(), r_df[('num_of_tx', 'spend')].tolist()], labels=['P2P','C2B Spend'])
    plt.legend(loc='best')
    plt.title(ptitle)
    plt.show()
    
    time.sleep(1)
    
    display(r_df)


Loading daily data between 2019-02-01 and 2019-06-01
start read history
start read earning wallets
start read paying wallets
Number of existing p wallets:305967
Number of existing e wallets:816118
Tx history ready
ALL DONE


In [13]:
build_report_by_dev_ids(['8vlz'], day_by_day_df)

## Nearby ...



<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Unnamed: 0,date,"(date, )","(total, earn)","(total, p2p)","(total, spend)","(num_of_tx, earn)","(num_of_tx, p2p)","(num_of_tx, spend)",new_paying,ret_paying,new_earning,ret_earning
0,2019-02-01,2019-02-01,91721.0,12509.0,48850.0,991.0,109.0,977.0,524,76,981,287
1,2019-02-02,2019-02-02,76232.0,6415.0,37800.0,875.0,64.0,756.0,297,52,290,576
2,2019-02-03,2019-02-03,77022.0,8890.0,34650.0,896.0,76.0,693.0,332,61,327,546
3,2019-02-04,2019-02-04,101130.0,7991.0,43250.0,1117.0,73.0,865.0,415,63,411,683
4,2019-02-05,2019-02-05,113568.0,9366.0,61300.0,1182.0,76.0,1226.0,525,66,522,641
5,2019-02-06,2019-02-06,85707.0,13056.0,39000.0,960.0,111.0,780.0,366,70,362,573
6,2019-02-07,2019-02-07,100932.0,9261.0,54000.0,1084.0,67.0,1080.0,475,60,470,583
7,2019-02-08,2019-02-08,92477.0,10250.0,49100.0,990.0,62.0,982.0,463,61,459,508
8,2019-02-09,2019-02-09,90568.0,1030.0,49250.0,1004.0,15.0,985.0,430,64,427,549
9,2019-02-10,2019-02-10,88969.0,31021.0,34500.0,972.0,179.0,690.0,407,72,406,538


In [None]:
## Build Report
from concurrent.futures import ThreadPoolExecutor as PoolExecutor

dev_lst = devs_ids_list()

def parallel_func(dev_id):
    print(f'Report for {dev_id}')
    build_report_by_dev_ids(list(dev_id), day_by_day_df)

with PoolExecutor(max_workers=8) as executor:
    print('Start')
    for _ in executor.map(parallel_func, dev_lst):
        pass # Wailts till all executor threads are done running
    
    print('All Done')

# build_report_by_dev_ids(['8vlz'], day_by_day_df)
# build_report_by_dev_ids(['8vlz'], df)
# display(df.head(3))

In [None]:
dev_lst = devs_ids_list()

for devid in dev_lst:
    build_report_by_dev_ids([devid], day_by_day_df)


In [None]:
### TESTS
dev_data = df[df['digital_service_id'] == '2bpx']
evo_d_df = new_returning_unique_wallets(dev_data)
display(evo_d_df)

# days = dev_data.index.unique(level='date')
# first_day = days[2]
# daily_data = dev_data.loc[first_day]
# daily_data = dev_data.loc[slice(first_day)]
# srcs = daily_data.index.unique(level='destination')
# display(daily_data)
# display(srcs)

# Retention Analysis

## Daily Returns Historgram

In [None]:
devs_table = devs_df()
display(devs_table)

In [6]:
## Data Aquisition

def read_daily_evolution_table_for_dev(start_date, end_date, dev_id, tx_types=['earn', 'spend', 'p2p']):
    tx_types_str = '-'.join(tx_types)
    pickle_filename = f'RIMS_daily_evo_for_{dev_id}__{start_date}_{end_date}__{tx_types_str}.pkl'
    
    try:
        df = pd.read_pickle(path=f'{data_path}{pickle_filename}')
    except FileNotFoundError:
        df = daily_evolution_table_for_dev(start_date, end_date, dev_id, tx_types)
        df.to_pickle(path=f'{data_path}{pickle_filename}')

    return df



In [7]:
dev_id = '8vlz'
dev_name = dev_name_by_id(dev_id)
start_date = '2019-05-1'
end_date = '2019-06-01'

spends_df = read_daily_evolution_table_for_dev(start_date, end_date, dev_id, ['spend'])
p2ps_df = read_daily_evolution_table_for_dev(start_date, end_date, dev_id, ['p2p'])
# Earn is passive, so we don't care about earns
# earns_df = read_daily_evolution_table_for_dev(start_date, end_date, dev_id, ['earn'])


Quering developers from BI


In [None]:
## DELETE THIS CELL
display(spends_df.head(2))
display(p2ps_df.head(2))


In [None]:
number_of_repeated_spenders = spends_df.groupby('source').count()
hist_of_repeated_spenders = number_of_repeated_spenders.groupby('destination').count()

unified = p2ps_df.groupby(['date','source']).count().reset_index()
number_of_p2p_payers = unified.groupby('source').count()
display(number_of_p2p_payers)
hist_of_repeated_p2p_payers = number_of_p2p_payers.groupby('destination').count()


In [None]:
# hist_of_repeated_spenders
hist_of_repeated_spenders.rename(columns={'date':'DS_returns'}, inplace=True)
hist_of_repeated_p2p_payers.rename(columns={'date':'P2P_returns'}, inplace=True)

display(hist_of_repeated_spenders)
display(hist_of_repeated_p2p_payers)

df_merge = pd.merge(hist_of_repeated_spenders, hist_of_repeated_p2p_payers, how='outer', left_index=True, right_index=True).fillna(0, downcast='infer') # {float64:int64})
spends_hist = df_merge[['DS_returns', 'P2P_returns']]
display(spends_hist)




In [None]:
f = bf(title=f"Number of Users by Number of Days - {dev_name}", plot_width=600, plot_height=600) #, y_axis_type="log")

# s = f.vbar(x='index', bottom=0, top='S_w', width=0.5, source=hist_of_repeated_spenders)
# p = f.vbar(x='index', bottom='S_w', top=1, width=0.5, source=hist_of_repeated_p2p_payers, color='orange')
# s = f.vbar(hist_of_repeated_spenders.index, 'date', bottom=0, top='S_w', width=0.5, source=hist_of_repeated_spenders)
# p = f.vbar(hist_of_repeated_p2p_payers.index, 'date', bottom='S_w', top=1, width=0.5, source=hist_of_repeated_p2p_payers, color='orange')

# s_label = f.text(x='index', y='date', source=hist_of_repeated_spenders, text='DS Spenders')
# p_label = f.text(x='index', y='date', source=hist_of_repeated_p2p_payers, text='P2P Spenders')

f.vbar_stack(stackers=['DS_returns', 'P2P_returns'],
             x='destination', source=spends_hist,
             legend = ['DS Spends', 'P2P Spends'],
             width=0.5, color=['#ff7f0e', '#ffbb78']) #, y_axis_type="log")
show(f)



### Functional Implementation

In [8]:
# Init bokeh
from bokeh.io import output_notebook, show
from bokeh.plotting import figure as bf
output_notebook()

def calculate_retention_histogram_by_dates(dev_id, start_date, end_date):
    dev_name = dev_name_by_id(dev_id)

    # Load Data - even from BI or hard disk cache
    spends_df = read_daily_evolution_table_for_dev(start_date, end_date, dev_id, ['spend'])
    p2ps_df = read_daily_evolution_table_for_dev(start_date, end_date, dev_id, ['p2p'])

    # Payments made to DS
    # Count the number of different paying wallets daily. spends_df already contains one transaction
    # per day for each wallet.
    number_of_repeated_spenders = spends_df.groupby('source').count()
    # 'destination' has no meaning here, is just to count how many times each number appears in the table
    # 'destination' contains the number of days 'source' made payments during the time frame
    hist_of_repeated_spenders = number_of_repeated_spenders.groupby('destination').count()
    
    # Similar trick for P2P payments
    # p2ps_df cointain one transaction per day between wallet A and wallet B. Thus, if in the same date
    # wallet A also payed to wallet C, wallet A would be counted twice.
    # Group by date first, so many P2P from same wallet are counted once a day.
    unified = p2ps_df.groupby(['date','source']).count().reset_index()
    number_of_p2p_payers = unified.groupby('source').count()
    hist_of_repeated_p2p_payers = number_of_p2p_payers.groupby('destination').count()

    # Rename columns for a meaningful title
    hist_of_repeated_spenders.rename(columns={'date':'DS_returns'}, inplace=True)
    hist_of_repeated_p2p_payers.rename(columns={'date':'P2P_returns'}, inplace=True)

    # Merge P2P and DS payments
    df_merge = pd.merge(hist_of_repeated_spenders, hist_of_repeated_p2p_payers, how='outer', left_index=True, right_index=True).fillna(0, downcast='infer') # {float64:int64})
    spends_hist = df_merge[['DS_returns', 'P2P_returns']]
    
    return spends_hist

def draw_retention_histogram_by_dates(dev_id, start_date, end_date):
    dev_name = dev_name_by_id(dev_id)
    f = bf(title=f"Number of Users by Number of Days - {dev_name}", plot_width=600, plot_height=600) #, y_axis_type="log")

    spends_hist = calculate_retention_histogram_by_dates(dev_id, start_date, end_date)
    
    f.vbar_stack(stackers=['DS_returns', 'P2P_returns'],
                 x='destination', source=spends_hist,
                 legend = ['DS Spends', 'P2P Spends'],
                 width=0.5, color=['#ff7f0e', '#ffbb78']) #, y_axis_type="log")
    show(f)



In [None]:
dev_ids = ['kik', 'l83h', '8vlz', 'lipz', 'p365', 'l68b', 'tapa', 'swel', 'lsff', 'rced']
start_date = '2019-04-01'
end_date = '2019-04-30'


for dev_id in dev_ids:
    draw_retention_histogram_by_dates(dev_id, start_date, end_date)

## DAE, DAS, P2P Medians

### Functions

In [9]:
import math
from bokeh.plotting import figure as bf
from bokeh.io import output_notebook, show
output_notebook()


def median_num_of_txs(dev_id, start_date, end_date, action):
    txs_df = read_daily_evolution_table_for_dev(start_date, end_date, dev_id, [action])
    
    if action == 'p2p':
        # reduce all repeated sources
#         txs_grouper = txs_df.groupby(['source', 'date'])
        txs_grouper = txs_df.groupby(['date', 'source'])
        txs_df = txs_grouper.count()
#         txs_grouper = txs_df.groupby('date')
#         txs_df = txs_grouper.count()        
#     else:
    txs_grouper = txs_df.groupby('date')
    txs_df = txs_grouper.count()
        
#     tx_count, _ = txs_df.shape
#     display(txs_df.head(5))
#     median_res = txs_df['destination'].median(skipna=True)
    median_res = txs_df['destination'].median()
    if math.isnan(median_res):
        median_res = 0
    
    return median_res

def calculate_median_histograms_by_dates(dev_id, start_date, end_date):
    # Load Data - even from BI or hard disk cache
    spends_count = median_num_of_txs(dev_id, start_date, end_date, 'spend')
    earn_count = median_num_of_txs(dev_id, start_date, end_date, 'earn')
    p2p_count = median_num_of_txs(dev_id, start_date, end_date, 'p2p')
    dev_name = dev_name_by_id(dev_id)
    
    return {'dev_id': dev_id, 'dev_name': dev_name, 'spend': spends_count, 'p2p': p2p_count, 'earn':earn_count }

def draw_devs_data(devs_df, start_date, end_date):
#     dev_names = []
#     for dev_id in devs_df['dev_id']:
#         dev_name = dev_name_by_id(dev_id)
#         dev_names.append(dev_name)

    f = bf(title=f"Median Daily Users from {start_date} to {end_date}",
           x_range = devs_df['dev_name'],
           plot_width=600, plot_height=1000) #, y_axis_type="log")

    f.vbar_stack(stackers=['spend', 'p2p', 'earn'],
                 x='dev_name', source=devs_df,
                 legend = ['Spends', 'P2Ps', 'Earns'],
                 width=0.5, color=['#0000ff', '#ff0000', '#00ff00']) #, y_axis_type="log")
    show(f)


In [None]:
# calculate_median_histograms_by_dates('l83h', start_date, end_date)
dev_id = 'lipz'
print(median_num_of_txs(dev_id, start_date, end_date, 'p2p'))
print(median_num_of_txs(dev_id, start_date, end_date, 'spend'))
print(median_num_of_txs(dev_id, start_date, end_date, 'earn'))

In [None]:
dev_ids = ['kik', 'l83h', '8vlz', 'lipz', 'p365', 'l68b', 'tapa', 'swel', 'lsff', 'rced']
start_date = '2019-04-01'
end_date = '2019-04-30'

devs_list = []

for dev_id in dev_ids:
    dev_numbers = calculate_median_histograms_by_dates(dev_id, start_date, end_date)
    devs_list.append(dev_numbers)

devs_data = pd.DataFrame(devs_list)
draw_devs_data(devs_data, start_date, end_date)