In [3]:
import numpy as np
import pandas as pd

import time
import datetime
import threading
import redis

import talib
import tushare as ts
from utils.time_util import get_market_date
from utils.stat_util import get_stat_info

# Shared Redis client for this notebook; connects to a server on localhost and
# leaves every other connection option at the redis-py default.
red = redis.Redis(host='localhost')

In [4]:
def resample_1min(df_buffer, date_time, df_last_1min):
    """Collapse one buffer of tick rows into a single 1-minute bar per code.

    Parameters
    ----------
    df_buffer : pandas.DataFrame
        Tick rows with at least the columns ``code``, ``price``, ``volume``
        and ``amount`` (values may be numeric strings; they are cast to
        float). Rows are used in their existing order: for each code the
        first row supplies the open and the last row supplies the close.
    date_time : object
        Stored unchanged in the ``datetime`` column of every returned row.
    df_last_1min : pandas.DataFrame or None
        Bars returned for the previous minute (must carry ``code``,
        ``vol_sum`` and ``amount_sum``). When given, ``vol`` and ``amount``
        become the difference between this bar's running totals and the
        previous bar's totals for the same code.

    Returns
    -------
    pandas.DataFrame
        One row per code with the columns ``code, datetime, open, high,
        close, low, vol, amount, vol_sum, amount_sum``. ``vol_sum`` and
        ``amount_sum`` are the last ``volume``/``amount`` values seen in the
        buffer. Without ``df_last_1min``, ``vol``/``amount`` equal those
        totals. A code missing from ``df_last_1min`` gets NaN increments.
    """
    columns = ['code', 'datetime', 'open', 'high', 'close', 'low',
               'vol', 'amount', 'vol_sum', 'amount_sum']
    value_cols = ['price', 'volume', 'amount']

    records = []
    # Group by the scalar key (not a one-element list) so each group key is
    # the code itself rather than a 1-tuple.
    for ticker, group in df_buffer.groupby('code'):
        # Cast only the columns that are aggregated, so unrelated
        # non-numeric columns in the buffer cannot break the conversion.
        ticks = group[value_cols].astype(float)
        price = ticks['price']
        vol_sum = ticks['volume'].iloc[-1]
        amount_sum = ticks['amount'].iloc[-1]
        # Records are keyed by column name so a value can never land under
        # the wrong label (previously close and low were swapped).
        records.append({
            'code': ticker,
            'datetime': date_time,
            'open': price.iloc[0],
            'high': price.max(),
            'low': price.min(),
            'close': price.iloc[-1],
            'vol': vol_sum,
            'amount': amount_sum,
            'vol_sum': vol_sum,
            'amount_sum': amount_sum,
        })
    df_1min = pd.DataFrame(records, columns=columns)

    if df_last_1min is not None:
        # Match the previous totals by code instead of by row position, so a
        # different ticker set or ordering cannot pair the wrong rows.
        previous = df_last_1min.drop_duplicates('code', keep='last').set_index('code')
        df_1min['vol'] = df_1min['vol_sum'] - df_1min['code'].map(previous['vol_sum'])
        df_1min['amount'] = df_1min['amount_sum'] - df_1min['code'].map(previous['amount_sum'])
    return df_1min

In [5]:
def run_resample_1min():
    """Poll Redis for today's tick buffers and append 1-minute bars to a daily table.

    Tick buffers are read from keys named ``t_tick_<YYYY-MM-DD> <HH:MM:SS>``
    and the growing bar table is written to ``t_1min_<YYYY-MM-DD>``, both as
    pandas msgpack bytes. If the bar table already exists, the run resumes
    after the last stored bar. Otherwise only keys whose time suffix is later
    than ``09:30:00`` are processed.

    The loop ends once a key with time suffix ``15:00:00`` or later has been
    processed, or when no unprocessed key is left and the local wall clock is
    past ``15:00:00``. Between polls the function sleeps for 50 seconds, so
    the call blocks for the rest of the session. Returns ``None``.
    """
    # NOTE(review): read_msgpack/to_msgpack are deprecated (see the warning in
    # the cell output) and removed in pandas 1.0. They are kept because the
    # format written by the tick producer is not visible from this notebook.
    market_close = '15:00:00'
    resample_time = '09:30:00'
    cur_date = datetime.datetime.now().strftime('%Y-%m-%d')
    key_1min = 't_1min_' + cur_date
    df_1min, df_last_1min = None, None

    df_bytes_1min = red.get(key_1min)
    if df_bytes_1min is not None:
        df_1min = pd.read_msgpack(df_bytes_1min)
        # An empty stored table has no "last bar" to resume from.
        if len(df_1min) > 0:
            last_datetime = df_1min['datetime'].iloc[-1].strftime('%Y-%m-%d %H:%M:%S')
            df_last_1min = df_1min.loc[df_1min['datetime'] == last_datetime]
            resample_time = last_datetime[-8:]

    while resample_time < market_close:
        all_tick_keys = [str(raw, encoding='utf-8')
                         for raw in red.keys('t_tick_' + cur_date + '*:*')]
        # The last 8 characters of a key are its HH:MM:SS suffix; keep only
        # keys newer than the last processed one, oldest first.
        t_tick_keys = sorted(key for key in all_tick_keys if key[-8:] > resample_time)

        # The wall clock must use the same colon-separated layout as
        # market_close. With '%H-%M-%S' the string comparison stayed False
        # until 16:00, because '-' sorts before ':'.
        now_time = datetime.datetime.now().strftime('%H:%M:%S')
        if len(t_tick_keys) == 0 and now_time > market_close:
            print('the market has closed!')
            break

        for key_tick in t_tick_keys:
            df_buffer = pd.read_msgpack(red.get(key_tick))
            # key_tick[7:] drops the 't_tick_' prefix, leaving the timestamp.
            df_cur_1min = resample_1min(df_buffer, pd.to_datetime(key_tick[7:]), df_last_1min)
            if df_1min is None:
                df_1min = df_cur_1min
            else:
                df_1min = pd.concat([df_1min, df_cur_1min])
            # Persist after every bar so a restart can resume from Redis.
            red.set(key_1min, df_1min.to_msgpack())
            print(key_tick + '  has resample into table ' + key_1min + '!')
            df_last_1min = df_cur_1min
            resample_time = key_tick[-8:]
        time.sleep(50)

In [7]:
# Start resampling; the call blocks while run_resample_1min polls Redis in its loop.
run_resample_1min()

It is recommended to use pyarrow for on-the-wire transmission of pandas objects.
  """Entry point for launching an IPython kernel.
It is recommended to use pyarrow for on-the-wire transmission of pandas objects.


t_tick_2020-03-03 11:30:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:00:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:01:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:02:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:03:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:04:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:05:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:06:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:07:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:08:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:09:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:10:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:11:00  has resample into table t_1min_2020-03-03!
t_tick_2020-03-03 13:12:00  has resample into table t_1min_2020-03-03!
t_tick