# Price data quality dashboard

## Imports and set up django environment

In [11]:
import os
import pandas as pd
import django
from django.db import connection
from IPython.core.display import display, HTML
os.chdir('..')

# Allows async calls to django ORM in Jupyter. Required.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'rest.settings')
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
django.setup()

from pricedata import models





## Time ORM vs  Raw SQL
Retrieve data into dsataframe. We need:
* Symbol Name;
* Instrument Type Name;
* DataSource Name; and
* Time.

Where Retrieve Price Data is True and period is '1S'

## Django ORM QuertSet then convert to dataframe

In [32]:
%%timeit
price_data =  models.Candle.objects.filter(datasource_symbol__retrieve_price_data=True, period='1S')


df = pd.DataFrame(list(price_data.values('datasource_symbol__symbol__name',
                                                 'datasource_symbol__symbol__instrument_type',
                                                 'datasource_symbol__datasource__name', 'time')))

print(len(df.index))

2147012
2147012
2147012
2147012
2147012
2147012
2147012
2147012
25.4 s ± 1.45 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Raw sql straight into dataframe


In [61]:
%%timeit
sql = """SELECT	s.name AS symbol,
                s.instrument_type AS instrument_type,
                ds.name AS datasource,
                cdl.time AS time
        FROM public.pricedata_datasourcesymbol dss
            INNER JOIN pricedata_datasource ds ON dss.datasource_id = ds.id
            INNER JOIN pricedata_symbol s ON dss.symbol_id = s.id
            INNER JOIN pricedata_candle cdl ON cdl.datasource_symbol_id = dss.id  
        WHERE dss.retrieve_price_data = true
            AND cdl.period = %(period)s"""
    
df = pd.read_sql_query(sql=sql, con=connection, params={'period': '1S'})

print(len(df.index))

2240566


## % difference between results

In [55]:
100 - 22.2 / 25.4 * 100

12.5984251968504

## Results
### Django
Time: 25.4 s ± 1.45 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
Rowcount: 2147012
    
### Raw
Time: 22.2 s ± 221 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Rowcount:214012

#### We have a winner. Raw SQL is 12.5% faster than Django
    


## Get the data using raw SQL, this time without %%timeit

In [2]:
sql = """SELECT	s.name AS symbol,
                s.instrument_type AS instrument_type,
                ds.name AS datasource,
                cdl.time AS time
        FROM public.pricedata_datasourcesymbol dss
            INNER JOIN pricedata_datasource ds ON dss.datasource_id = ds.id
            INNER JOIN pricedata_symbol s ON dss.symbol_id = s.id
            INNER JOIN pricedata_candle cdl ON cdl.datasource_symbol_id = dss.id  
        WHERE dss.retrieve_price_data = true
            AND cdl.period = %(period)s"""
    
df = pd.read_sql_query(sql=sql, con=connection, params={'period': '1S'})

df

Unnamed: 0,symbol,instrument_type,datasource,time
0,CADCHF,FOREX,MT5,2021-07-16 00:00:00+00:00
1,CADCHF,FOREX,MT5,2021-07-16 00:05:01+00:00
2,CADCHF,FOREX,MT5,2021-07-16 00:05:27+00:00
3,CADCHF,FOREX,MT5,2021-07-16 00:05:43+00:00
4,CADCHF,FOREX,MT5,2021-07-16 00:05:51+00:00
...,...,...,...,...
4285989,USDCNH,FOREX,MT5,2021-07-20 10:25:13+00:00
4285990,USDCNH,FOREX,MT5,2021-07-20 10:25:14+00:00
4285991,USDCNH,FOREX,MT5,2021-07-20 10:25:15+00:00
4285992,USDCNH,FOREX,MT5,2021-07-20 10:25:16+00:00


## Add another datasource for testing

In [136]:
nds = df.copy()
nds['datasource'] = 'NDS'
df = df.append(nds)

print(f"NDS has {len(df[df['datasource'] == 'NDS'].index)} rows. MT5 has {len(df[df['datasource'] == 'MT5'].index)} rows.")

df

NDS has 2473058 rows. MT5 has 2473058 rows.


Unnamed: 0,symbol,instrument_type,datasource,time
0,CADCHF,FOREX,MT5,2021-07-16 00:00:00+00:00
1,CADCHF,FOREX,MT5,2021-07-16 00:05:01+00:00
2,CADCHF,FOREX,MT5,2021-07-16 00:05:27+00:00
3,CADCHF,FOREX,MT5,2021-07-16 00:05:43+00:00
4,CADCHF,FOREX,MT5,2021-07-16 00:05:51+00:00
...,...,...,...,...
2473053,USDCNH,FOREX,NDS,2021-07-19 12:25:07+00:00
2473054,USDCNH,FOREX,NDS,2021-07-19 12:25:08+00:00
2473055,USDCNH,FOREX,NDS,2021-07-19 12:25:10+00:00
2473056,USDCNH,FOREX,NDS,2021-07-19 12:25:11+00:00


## Table showing num prices by symbol, by DS for each aggregarion period

In [82]:
# First group by to get min and max times
grouped = df.groupby(['symbol', 'instrument_type', 'datasource']).agg(first=('time', 'min'), last=('time', 'max'), count=('time', 'count'))

# Now get min, max and mean for each aggregation period for each datasource.
# 'minutes': 'T', 'hours': 'H', 'days': 'D', 'weeks': 'W', 'months': 'M'
aggs = {'minutes': 'T', 'hours': 'H', 'days': 'D', 'weeks': 'W', 'months': 'M'}

for key in aggs:
    # Get counts for aggregation period, then group by symbol, instrument type and datasource to get min, max and avg counts for aggregation period
    agg_period_ungrouped = df.groupby(['symbol', 'instrument_type', 'datasource', pd.Grouper(key='time', freq=aggs[key])]).agg(count=('time', 'count'))
    agg_period_grouped = agg_period_ungrouped.groupby(['symbol', 'instrument_type', 'datasource']).agg(min=('count', 'min'), max=('count', 'max'), avg=('count', 'median'))

    # Rename columns to include aggregation period key, then merge into original dataframe
    agg_period_grouped = agg_period_grouped.rename(columns={'min': f'{key}_min', 'max': f'{key}_max', 'avg': f'{key}_avg'})
    grouped = grouped.join(agg_period_grouped, on=['symbol', 'instrument_type', 'datasource'])
    
# Now unstack, so we show columns for each datasource
grouped = grouped.unstack()

# Create the same aggregations but across all datasources
agg_cols = set([x[0] for x in grouped.columns])
ds_cols = set([x[1] for x in grouped.columns])

for agg_col in agg_cols:
    if '_min' in agg_col or agg_col == 'first':
        grouped[(agg_col, 'all')] = grouped[[(agg_col, ds) for ds in ds_cols]].min(axis=1)
    elif '_max' in agg_col or agg_col == 'last':
        grouped[(agg_col, 'all')] = grouped[[(agg_col, ds) for ds in ds_cols]].max(axis=1)
    elif '_avg' in agg_col:
        grouped[(agg_col, 'all')] = grouped[[(agg_col, ds) for ds in ds_cols]].mean(axis=1)
        

# Flatten the columns and reset the index
grouped.columns = ['_'.join(col).strip() for col in grouped.columns]
grouped = grouped.reset_index()
                          
grouped


Unnamed: 0,symbol,instrument_type,first_MT5,last_MT5,count_MT5,minutes_min_MT5,minutes_max_MT5,minutes_avg_MT5,hours_min_MT5,hours_max_MT5,...,weeks_max_all,months_min_all,hours_avg_all,days_max_all,months_max_all,minutes_min_all,minutes_avg_all,days_min_all,weeks_avg_all,months_avg_all
0,AUDCAD,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:47+00:00,75431,1,60,27.0,126,3341,...,39258,75431,1499.0,39258,75431,1,27.0,36173,37715.5,75431.0
1,AUDCHF,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:51+00:00,71641,1,60,26.0,99,3310,...,37219,71641,1432.5,37219,71641,1,26.0,34422,35820.5,71641.0
2,AUDJPY,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:52+00:00,76895,1,60,27.0,134,3319,...,40185,76895,1575.5,40185,76895,1,27.0,36710,38447.5,76895.0
3,AUDNZD,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:47+00:00,72478,1,60,25.0,117,3338,...,38308,72478,1492.5,38308,72478,1,25.0,34170,36239.0,72478.0
4,AUDUSD,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:47+00:00,70547,1,60,24.0,133,3308,...,38767,70547,1351.0,38767,70547,1,24.0,31780,35273.5,70547.0
5,CADCHF,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:23+00:00,54384,1,59,18.0,80,3040,...,29427,54384,958.5,29427,54384,1,18.0,24957,27192.0,54384.0
6,CADJPY,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:26+00:00,64055,1,59,22.0,137,3081,...,33468,64055,1339.0,33468,64055,1,22.0,30587,32027.5,64055.0
7,CHFJPY,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:23+00:00,58093,1,59,20.0,134,2684,...,30346,58093,1178.5,30346,58093,1,20.0,27747,29046.5,58093.0
8,CHFSGD,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:58+00:00,41969,1,56,14.0,9,2384,...,22202,41969,743.5,22202,41969,1,14.0,19767,20984.5,41969.0
9,EURAUD,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:55+00:00,95460,1,60,36.0,198,3508,...,49112,95460,2128.0,49112,95460,1,36.0,46348,47730.0,95460.0


## We will keep the full data table intact, and create a new one for display purposes. This will:
* Remove the DS agg columns; 
* Move the agg functions into the cells; 
* Rename the columns; and
* Reorder columns.

In [83]:
display = grouped

# Merge the agg functions for _all into cells for each aggregation period
for agg in ['minutes', 'hours', 'days', 'weeks', 'months']:
    # Get the min, max and avg columns and consolidate into a new column. Drop the existing ones.
    min_col = f'{agg}_min_all'
    max_col = f'{agg}_max_all'
    avg_col = f'{agg}_avg_all'
    display[agg] = 'min:' + display[min_col].astype(str) + ' max:' + display[max_col].astype(str) + ' avg:' + display[avg_col].astype(str)

# Rename the columns that we will be keeping
display = display.rename(
    columns={'symbol': 'Symbol', 'instrument_type': 'Instrument Type', 'first_all': 'First Price',
             'last_all': 'Last Price', 'minutes': 'Minutes', 'hours': 'Hours', 'days': 'Days', 'weeks': 'Weeks',
             'months': 'Months'})

# Reorder the columns, excluding any that we dont want to keep
display = display[
    ['Symbol', 'Instrument Type', 'First Price', 'Last Price', 'Minutes', 'Hours', 'Days', 'Weeks', 'Months']]

display

Unnamed: 0,Symbol,Instrument Type,First Price,Last Price,Minutes,Hours,Days,Weeks,Months
0,AUDCAD,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:47+00:00,min:1 max:60 avg:27.0,min:126 max:3341 avg:1499.0,min:36173 max:39258 avg:37715.5,min:36173 max:39258 avg:37715.5,min:75431 max:75431 avg:75431.0
1,AUDCHF,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:51+00:00,min:1 max:60 avg:26.0,min:99 max:3310 avg:1432.5,min:34422 max:37219 avg:35820.5,min:34422 max:37219 avg:35820.5,min:71641 max:71641 avg:71641.0
2,AUDJPY,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:52+00:00,min:1 max:60 avg:27.0,min:134 max:3319 avg:1575.5,min:36710 max:40185 avg:38447.5,min:36710 max:40185 avg:38447.5,min:76895 max:76895 avg:76895.0
3,AUDNZD,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:47+00:00,min:1 max:60 avg:25.0,min:117 max:3338 avg:1492.5,min:34170 max:38308 avg:36239.0,min:34170 max:38308 avg:36239.0,min:72478 max:72478 avg:72478.0
4,AUDUSD,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:47+00:00,min:1 max:60 avg:24.0,min:133 max:3308 avg:1351.0,min:31780 max:38767 avg:35273.5,min:31780 max:38767 avg:35273.5,min:70547 max:70547 avg:70547.0
5,CADCHF,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:23+00:00,min:1 max:59 avg:18.0,min:80 max:3040 avg:958.5,min:24957 max:29427 avg:27192.0,min:24957 max:29427 avg:27192.0,min:54384 max:54384 avg:54384.0
6,CADJPY,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:26+00:00,min:1 max:59 avg:22.0,min:137 max:3081 avg:1339.0,min:30587 max:33468 avg:32027.5,min:30587 max:33468 avg:32027.5,min:64055 max:64055 avg:64055.0
7,CHFJPY,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:23+00:00,min:1 max:59 avg:20.0,min:134 max:2684 avg:1178.5,min:27747 max:30346 avg:29046.5,min:27747 max:30346 avg:29046.5,min:58093 max:58093 avg:58093.0
8,CHFSGD,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:58+00:00,min:1 max:56 avg:14.0,min:9 max:2384 avg:743.5,min:19767 max:22202 avg:20984.5,min:19767 max:22202 avg:20984.5,min:41969 max:41969 avg:41969.0
9,EURAUD,FOREX,2021-07-16 00:00:00+00:00,2021-07-19 21:24:55+00:00,min:1 max:60 avg:36.0,min:198 max:3508 avg:2128.0,min:46348 max:49112 avg:47730.0,min:46348 max:49112 avg:47730.0,min:95460 max:95460 avg:95460.0


### This is slow. Timeit shows: 21.9 s ± 60.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

## Tieing it all together

In [13]:
def retrieve_price_data(period: str) -> pd.DataFrame:
    """
    Retrieves the price data.
    
    :param period: The candle period that we are producing the dashboard for
    
    :return: DataFrame containing price data
    """
    sql = """SELECT	s.name AS symbol,
                s.instrument_type AS instrument_type,
                ds.name AS datasource,
                cdl.time AS time
        FROM public.pricedata_datasourcesymbol dss
            INNER JOIN pricedata_datasource ds ON dss.datasource_id = ds.id
            INNER JOIN pricedata_symbol s ON dss.symbol_id = s.id
            INNER JOIN pricedata_candle cdl ON cdl.datasource_symbol_id = dss.id  
        WHERE dss.retrieve_price_data = true
            AND cdl.period = %(period)s"""
    
    price_data = pd.read_sql_query(sql=sql, con=connection, params={'period': period})
    
    return price_data
    

def summarise_data(price_data: pd.DataFrame) -> pd.DataFrame:
    """
    Produces a data summary dashboard from provided data showing:
        * The datetime of the first and last candles for each datasource;
        * The number of candles for each datasource;
        * The minimum, maximum and average number of candles for each aggregation period for each datasource; and
        * The minimum, maximum and average number of candles for each aggregation period across all datasources.

    :param price_data: The candle period that we are producing the dashboard for
    """
    # First group by to get min and max times
    grouped = price_data.groupby(['symbol', 'instrument_type', 'datasource']).agg(first=('time', 'min'), last=('time', 'max'), count=('time', 'count'))

    # Now get min, max and mean for each aggregation period for each datasource.
    # 'minutes': 'T', 'hours': 'H', 'days': 'D', 'weeks': 'W', 'months': 'M'
    aggs = {'minutes': 'T', 'hours': 'H', 'days': 'D', 'weeks': 'W', 'months': 'M'}

    for key in aggs:
        # Get counts for aggregation period, then group by symbol, instrument type and datasource to get min, max and avg counts for aggregation period
        agg_period_ungrouped = price_data.groupby(['symbol', 'instrument_type', 'datasource', pd.Grouper(key='time', freq=aggs[key])]).agg(count=('time', 'count'))
        agg_period_grouped = agg_period_ungrouped.groupby(['symbol', 'instrument_type', 'datasource']).agg(min=('count', 'min'), max=('count', 'max'), avg=('count', 'median'))

        # Rename columns to include aggregation period key, then merge into original dataframe
        agg_period_grouped = agg_period_grouped.rename(columns={'min': f'{key}_min', 'max': f'{key}_max', 'avg': f'{key}_avg'})
        grouped = grouped.join(agg_period_grouped, on=['symbol', 'instrument_type', 'datasource'])

    # Now unstack, so we show columns for each datasource
    grouped = grouped.unstack()

    # Create the same aggregations but across all datasources
    agg_cols = set([x[0] for x in grouped.columns])
    ds_cols = set([x[1] for x in grouped.columns])

    for agg_col in agg_cols:
        if '_min' in agg_col or agg_col == 'first':
            grouped[(agg_col, 'all')] = grouped[[(agg_col, ds) for ds in ds_cols]].min(axis=1)
        elif '_max' in agg_col or agg_col == 'last':
            grouped[(agg_col, 'all')] = grouped[[(agg_col, ds) for ds in ds_cols]].max(axis=1)
        elif '_avg' in agg_col:
            grouped[(agg_col, 'all')] = grouped[[(agg_col, ds) for ds in ds_cols]].mean(axis=1)


    # Flatten the columns and reset the index
    grouped.columns = ['_'.join(col).strip() for col in grouped.columns]
    grouped = grouped.reset_index()
    
    return grouped

def format_data(summarised_data: pd.DataFrame) -> str:
    """
    Returns the HTML for the provided summary data. Formats as required for display.
    
    :param summary_data: A dataframe containing the summary data
    
    :return: html as string
    """
    
    display = summarised_data

    # Merge the agg functions for _all into cells for each aggregation period
    for agg in ['minutes', 'hours', 'days', 'weeks', 'months']:
        # Get the min, max and avg columns and consolidate into a new column. Drop the existing ones.
        min_col = f'{agg}_min_all'
        max_col = f'{agg}_max_all'
        avg_col = f'{agg}_avg_all'
        display[agg] = 'min:' + display[min_col].astype(str) + ' max:' + display[max_col].astype(str) + ' avg:' + display[avg_col].astype(str)

    # Rename the columns that we will be keeping
    display = display.rename(
        columns={'symbol': 'Symbol', 'instrument_type': 'Instrument Type', 'first_all': 'First Price',
                 'last_all': 'Last Price', 'minutes': 'Minutes', 'hours': 'Hours', 'days': 'Days', 'weeks': 'Weeks',
                 'months': 'Months'})

    # Reorder the columns, excluding any that we dont want to keep
    display = display[
        ['Symbol', 'Instrument Type', 'First Price', 'Last Price', 'Minutes', 'Hours', 'Days', 'Weeks', 'Months']]
    
    # Convert to HTML
    table_styles = 'table table-striped thead-dark'
    html = display.to_html(classes=table_styles)

    return html


## Run it and display output

In [14]:
data = retrieve_price_data('1S')
summarised_data = summarise_data(data)
formatted_data = format_data(summarised_data)

display(HTML(formatted_data))

Unnamed: 0,Symbol,Instrument Type,First Price,Last Price,Minutes,Hours,Days,Weeks,Months
0,AUDCAD,FOREX,2021-07-16 00:00:00+00:00,2021-07-20 11:24:38+00:00,min:1 max:60 avg:26.0,min:126 max:3341 avg:1499.0,min:15545 max:43699 avg:36173.0,min:36173 max:59244 avg:47708.5,min:95417 max:95417 avg:95417.0
1,AUDCHF,FOREX,2021-07-16 00:00:00+00:00,2021-07-20 11:24:38+00:00,min:1 max:60 avg:24.0,min:99 max:3310 avg:1375.5,min:13660 max:40886 avg:34422.0,min:34422 max:54546 avg:44484.0,min:88968 max:88968 avg:88968.0
2,AUDJPY,FOREX,2021-07-16 00:00:00+00:00,2021-07-20 11:24:38+00:00,min:1 max:60 avg:27.0,min:134 max:3319 avg:1577.5,min:16942 max:44561 avg:36710.0,min:36710 max:61503 avg:49106.5,min:98213 max:98213 avg:98213.0
3,AUDNZD,FOREX,2021-07-16 00:00:00+00:00,2021-07-20 11:24:37+00:00,min:1 max:60 avg:25.0,min:117 max:3338 avg:1489.5,min:15575 max:42435 avg:34170.0,min:34170 max:58010 avg:46090.0,min:92180 max:92180 avg:92180.0
4,AUDUSD,FOREX,2021-07-16 00:00:00+00:00,2021-07-20 11:24:38+00:00,min:1 max:60 avg:24.0,min:133 max:3308 avg:1440.0,min:15003 max:43225 avg:31780.0,min:31780 max:58228 avg:45004.0,min:90008 max:90008 avg:90008.0
5,CADCHF,FOREX,2021-07-16 00:00:00+00:00,2021-07-20 11:24:20+00:00,min:1 max:59 avg:17.0,min:80 max:3040 avg:917.5,min:10286 max:32372 avg:24957.0,min:24957 max:42658 avg:33807.5,min:67615 max:67615 avg:67615.0
6,CADJPY,FOREX,2021-07-16 00:00:00+00:00,2021-07-20 11:24:25+00:00,min:1 max:59 avg:21.0,min:137 max:3081 avg:1232.5,min:12852 max:36651 avg:30587.0,min:30587 max:49503 avg:40045.0,min:80090 max:80090 avg:80090.0
7,CHFJPY,FOREX,2021-07-16 00:00:00+00:00,2021-07-20 11:24:25+00:00,min:1 max:59 avg:20.0,min:134 max:2684 avg:1217.5,min:15141 max:32785 avg:27747.0,min:27747 max:47926 avg:37836.5,min:75673 max:75673 avg:75673.0
8,CHFSGD,FOREX,2021-07-16 00:00:00+00:00,2021-07-20 11:24:46+00:00,min:1 max:56 avg:13.0,min:9 max:2384 avg:683.0,min:7382 max:23720 avg:19767.0,min:19767 max:31102 avg:25434.5,min:50869 max:50869 avg:50869.0
9,EURAUD,FOREX,2021-07-16 00:00:00+00:00,2021-07-20 11:24:44+00:00,min:1 max:60 avg:36.0,min:198 max:3508 avg:2128.0,min:21280 max:54363 avg:46348.0,min:46348 max:75643 avg:60995.5,min:121991 max:121991 avg:121991.0


In [28]:
import functools


def decorator_factory(request):
    def mydecorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Prints the caller signiture
            caller = f"{func.__name__}_{','.join([f'{x}' for x in args])}_" \
                          f"{','.join([f'{x}={kwargs[x]}' for x in kwargs.keys()])}"

            print(f'caller: {caller}')
        return wrapper
    return mydecorator


@decorator_factory('req')
def myfunc(myfuncparam):
    print('myfunc')


myfunc('myfuncparam')

caller: myfunc_myfuncparam_
