In [1]:

import numpy as np
import pandas as pd
import pymongo
import pandas as pd
import pickle
import datetime
import time
import gzip
import lzma
import pytz


def DB(host, db_name, user, passwd):
    """Connect to database *db_name* on *host* and return a ``DBObj`` for it.

    Users named 'admin' or 'root' authenticate against the 'admin' database;
    every other user authenticates against *db_name* itself.
    """
    return DBObj(_mongo_uri(host, db_name, user, passwd), db_name=db_name)


def _mongo_uri(host, db_name, user, passwd):
    """Build the ``mongodb://`` URI used by ``DB`` with escaped credentials.

    pymongo requires user names and passwords that contain reserved characters
    (':', '/', '@', '%', ...) to be percent-escaped; ``quote_plus`` does this
    and leaves plain alphanumeric credentials unchanged.
    """
    from urllib.parse import quote_plus

    auth_db = db_name if user not in ('admin', 'root') else 'admin'
    return 'mongodb://%s:%s@%s/?authSource=%s' % (
        quote_plus(user), quote_plus(passwd), host, auth_db)


class DBObj(object):
    """Store pandas DataFrames in MongoDB as compressed, pickled chunks.

    Every document holds at most ``chunk_size`` rows belonging to one
    (date, symbol) pair, in the fields ``ver`` (serialization version),
    ``data`` (serialized rows), ``date`` (``str`` of the DataFrame's date
    value, e.g. '20200818'), ``symbol`` (int) and ``start`` (row offset of
    the chunk within that pair).

    Args:
        uri: MongoDB connection URI handed to ``pymongo.MongoClient``.
        symbol_column: DataFrame column whose values become ``symbol``.
        db_name: name of the database to use.
    """

    def __init__(self, uri, symbol_column='skey', db_name='white_db'):
        self.db_name = db_name
        self.uri = uri
        self.client = pymongo.MongoClient(self.uri)
        self.db = self.client[self.db_name]
        # Maximum number of DataFrame rows serialized into a single document.
        self.chunk_size = 20000
        self.symbol_column = symbol_column
        self.date_column = 'date'

    def parse_uri(self, uri):
        """Split a URI such as 'mongodb://user:password@example.com' into
        ['user', 'password', 'example.com'].

        Only handles that simple form; ports or query strings produce
        additional/merged items.
        """
        return uri.strip().replace('mongodb://', '').strip('/').replace(':', ' ').replace('@', ' ').split(' ')

    def drop_table(self, table_name):
        """Drop the collection *table_name*."""
        self.db.drop_collection(table_name)

    def rename_table(self, old_table, new_table):
        """Rename collection *old_table* to *new_table*."""
        self.db[old_table].rename(new_table)

    def write(self, table_name, df):
        """Replace the stored rows of *table_name* for every (date, symbol) in *df*.

        For each (date, symbol) pair present in *df*, existing documents for
        that pair are deleted and *df*'s rows are inserted in chunks.
        An empty *df* is a no-op.

        Raises:
            Exception: if *df* has no date column.
        """
        if len(df) == 0:
            return
        if self.date_column not in df.columns:
            raise Exception('DataFrame should contain date column')

        multi_date = len(df[self.date_column].unique()) > 1

        collection = self.db[table_name]
        collection.create_index([('date', pymongo.ASCENDING), ('symbol', pymongo.ASCENDING)], background=True)
        collection.create_index([('symbol', pymongo.ASCENDING), ('date', pymongo.ASCENDING)], background=True)

        if multi_date:
            for (date, symbol), sub_df in df.groupby([self.date_column, self.symbol_column]):
                date = str(date)
                symbol = int(symbol)
                collection.delete_many({'date': date, 'symbol': symbol})
                self.write_single(collection, date, symbol, sub_df)
        else:
            date = str(df[self.date_column].iloc[0])
            # Group by the bare column name: grouping by a one-element list
            # yields 1-tuple keys on pandas >= 2, which would be stored as
            # BSON arrays. int() also makes numpy integers BSON-encodable,
            # matching the multi-date branch above.
            for symbol, sub_df in df.groupby(self.symbol_column):
                symbol = int(symbol)
                collection.delete_many({'date': date, 'symbol': symbol})
                self.write_single(collection, date, symbol, sub_df)

    def write_single(self, collection, date, symbol, df):
        """Insert *df* into *collection* as chunks of at most ``chunk_size`` rows."""
        version = 1
        for start in range(0, len(df), self.chunk_size):
            # Positional slicing (iloc) regardless of the index type.
            df_seg = df.iloc[start:start + self.chunk_size]
            seg = {'ver': version, 'data': self.ser(df_seg, version), 'date': date, 'symbol': symbol, 'start': start}
            collection.insert_one(seg)

    def build_query(self, start_date=None, end_date=None, symbol=None):
        """Build a MongoDB filter on the ``date`` and ``symbol`` fields.

        Args:
            start_date, end_date: inclusive bounds; each may be None, an
                8-character 'YYYYMMDD' string, a Python/numpy integer such as
                20200818, or a ``datetime.date``/``datetime.datetime``.
            symbol: None, a single id, or a list/tuple/set/array of ids;
                ids are converted with ``int``. An empty collection adds no
                symbol filter.

        Returns:
            The filter dict; ``{}`` when no argument restricts the query.

        Raises:
            Exception: for a malformed or unsupported date.
        """
        query = {}

        def parse_date(x):
            if isinstance(x, str):
                if len(x) != 8:
                    raise Exception("`date` must be YYYYMMDD format")
                return x
            # datetime.datetime is a subclass of datetime.date.
            if isinstance(x, datetime.date):
                return x.strftime("%Y%m%d")
            if isinstance(x, (int, np.integer)) and not isinstance(x, bool):
                return parse_date(str(int(x)))
            raise Exception("invalid `date` type: " + str(type(x)))

        if start_date is not None or end_date is not None:
            query['date'] = {}
            if start_date is not None:
                query['date']['$gte'] = parse_date(start_date)
            if end_date is not None:
                query['date']['$lte'] = parse_date(end_date)

        # `is not None` (rather than truthiness) so that symbol 0 still filters.
        if symbol is not None:
            if isinstance(symbol, (list, tuple, set, frozenset, np.ndarray)):
                if len(symbol) > 0:
                    query['symbol'] = {'$in': [int(x) for x in symbol]}
            else:
                query['symbol'] = int(symbol)

        return query

    def delete(self, table_name, start_date=None, end_date=None, symbol=None):
        """Delete the documents matching the filters.

        Refuses to delete without any filter: prints a message and returns None.
        """
        collection = self.db[table_name]

        query = self.build_query(start_date, end_date, symbol)
        if not query:
            print('cannot delete the whole table')
            return None

        collection.delete_many(query)

    def read(self, table_name, start_date=None, end_date=None, symbol=None):
        """Return the matching rows as one DataFrame.

        Chunks are concatenated in (symbol, date, start) order with a fresh
        index. Returns None if nothing matches, or (after printing a message)
        when no filter is given.
        """
        collection = self.db[table_name]

        query = self.build_query(start_date, end_date, symbol)
        if not query:
            print('cannot read the whole table')
            return None

        segs = []
        for x in collection.find(query):
            x['data'] = self.deser(x['data'], x['ver'])
            segs.append(x)
        segs.sort(key=lambda x: (x['symbol'], x['date'], x['start']))
        return pd.concat([x['data'] for x in segs], ignore_index=True) if segs else None

    def list_tables(self):
        """Return the names of the collections in this database."""
        # Database.collection_names() was removed in pymongo 4.0;
        # list_collection_names() is available since pymongo 3.6.
        if pymongo.version_tuple >= (3, 6):
            return self.db.list_collection_names()
        return self.db.collection_names()

    def list_dates(self, table_name, start_date=None, end_date=None, symbol=None):
        """Return the sorted distinct ``date`` values stored in *table_name*.

        Missing bounds default to the full 'YYYYMMDD' range, so this never
        hits the "whole table" refusal of ``read``/``delete``.
        """
        collection = self.db[table_name]
        if start_date is None:
            start_date = '00000000'
        if end_date is None:
            end_date = '99999999'
        query = self.build_query(start_date, end_date, symbol)
        dates = {x['date'] for x in collection.find(query, {"date": 1, '_id': 0})}
        return sorted(dates)

    def ser(self, s, version):
        """Pickle *s* (protocol 4) and compress it: gzip for version 1, lzma for version 2."""
        pickle_protocol = 4
        if version == 1:
            return gzip.compress(pickle.dumps(s, protocol=pickle_protocol), compresslevel=2)
        elif version == 2:
            return lzma.compress(pickle.dumps(s, protocol=pickle_protocol), preset=1)
        else:
            raise Exception('unknown version')

    def deser(self, s, version):
        """Inverse of ``ser``: decompress according to *version* and unpickle.

        NOTE(review): pickle.loads executes code embedded in the payload; only
        use this against databases whose writers are trusted.
        """
        if version == 1:
            return pickle.loads(gzip.decompress(s))
        elif version == 2:
            return pickle.loads(lzma.decompress(s))
        else:
            raise Exception('unknown version')


def _pandas_version_tuple(version):
    """Return the leading (major, minor) integers of a version string, or None.

    e.g. '0.23.4' -> (0, 23), '2.0.0rc1' -> (2, 0), 'dev' -> None.
    """
    import re

    match = re.match(r'\s*(\d+)\.(\d+)', version)
    return (int(match.group(1)), int(match.group(2))) if match else None


def patch_pandas_pickle():
    """On pandas < 0.24, register a ``pandas.core.internals.managers`` alias module.

    The alias exposes ``BlockManager`` from ``pandas.core.internals`` under the
    newer module path, presumably so that pickles produced by pandas >= 0.24
    can be loaded by older pandas. Does nothing on pandas >= 0.24, if the
    version cannot be parsed, or if the module is already registered.
    """
    # Compare numerically: as strings, '0.9.0' < '0.24' is False.
    version = _pandas_version_tuple(pd.__version__)
    if version is not None and version < (0, 24):
        import sys
        from types import ModuleType
        from pandas.core.internals import BlockManager
        pkg_name = 'pandas.core.internals.managers'
        if pkg_name not in sys.modules:
            m = ModuleType(pkg_name)
            m.BlockManager = BlockManager
            sys.modules[pkg_name] = m
patch_pandas_pickle()


In [76]:
# NOTE(review): credentials are hard-coded in the notebook; consider loading
# them from an environment variable or a config file instead.
database_name = 'com_md_eq_cn'
user = "zhenyuy"
password = "bnONBrzSMGoE"

db1 = DB("192.168.10.178", database_name, user, password)
# md_trade rows for skey 1600000 on 20200818 (start_date == end_date).
data = db1.read('md_trade', 20200818, 20200818, 1600000)
# Fully qualified option name: the short 'max_columns' is ambiguous on
# pandas >= 1.4 (it also matches 'styler.render.max_columns') and raises.
pd.set_option('display.max_columns', 200)
data[data['trade_flag'] == 1].head(10)

Unnamed: 0,skey,date,time,clockAtArrival,datetime,ApplSeqNum,trade_type,trade_flag,trade_price,trade_qty,BidApplSeqNum,OfferApplSeqNum
253,1600000,20200818,93000680000,1597714200680000,2020-08-18 09:30:00.680,43863,1,1,10.85,9800,270557,249731
254,1600000,20200818,93000680000,1597714200680000,2020-08-18 09:30:00.680,43864,1,1,10.85,200,270557,259836
262,1600000,20200818,93000910000,1597714200910000,2020-08-18 09:30:00.910,45215,1,1,10.85,300,274163,259836
263,1600000,20200818,93000910000,1597714200910000,2020-08-18 09:30:00.910,45216,1,1,10.85,700,274163,262142
266,1600000,20200818,93000910000,1597714200910000,2020-08-18 09:30:00.910,45452,1,1,10.85,300,274826,262142
267,1600000,20200818,93000910000,1597714200910000,2020-08-18 09:30:00.910,45453,1,1,10.85,700,274826,265955
268,1600000,20200818,93001030000,1597714201030000,2020-08-18 09:30:01.030,45868,1,1,10.85,300,275845,265955
269,1600000,20200818,93001030000,1597714201030000,2020-08-18 09:30:01.030,45869,1,1,10.85,1700,275845,267679
290,1600000,20200818,93001030000,1597714201030000,2020-08-18 09:30:01.030,46196,1,1,10.85,1000,276855,267679
291,1600000,20200818,93001140000,1597714201140000,2020-08-18 09:30:01.140,46475,1,1,10.85,2000,277730,267679


In [46]:
import datetime

# Render a microseconds-since-epoch timestamp in UTC+8 (China Standard Time,
# no DST) explicitly, instead of in whatever local zone the host uses.
# Integer microsecond arithmetic avoids float rounding of the /1e6 division.
CST = datetime.timezone(datetime.timedelta(hours=8))
clock_str = (datetime.datetime.fromtimestamp(0, tz=CST)
             + datetime.timedelta(microseconds=1597713300020000)).strftime("%Y-%m-%d %H:%M:%S %f")
clock_str

'2020-08-18 09:15:00 020000'

In [80]:
# Fully qualified option name: the short 'max_rows' is ambiguous on
# pandas >= 1.4 (it also matches 'styler.render.max_rows') and raises.
pd.set_option('display.max_rows', 200)
data.dtypes

skey                        int32
date                        int32
time                        int64
clockAtArrival              int64
datetime           datetime64[ns]
ApplSeqNum                  int32
trade_type                  int32
trade_flag                  int32
trade_price               float64
trade_qty                   int32
BidApplSeqNum               int32
OfferApplSeqNum             int32
dtype: object

In [38]:
# Rows whose timestamp is not exactly 1 s after the previous row. The first
# row's missing gap is filled with 0, so it is always listed.
# total_seconds() keeps fractional and day parts; timedelta.seconds dropped
# them, so e.g. a 1.5 s gap used to pass as 1 s.
gap = data['datetime'].diff().fillna(pd.Timedelta(0)).dt.total_seconds()
data[gap != 1]

Unnamed: 0,skey,date,time,clockAtArrival,datetime,ordering,cum_volume,cum_amount,open,close
0,1000905,20200818,92503000000,1597713903000000,2020-08-18 09:25:03,1,1638911,1567733000.0,6747.6059,6747.6059
7798,1000905,20200818,125500000000,1597726500000000,2020-08-18 12:55:00,7799,117241673,127657700000.0,6747.6059,6802.9655
15302,1000905,20200818,150003000000,1597734003000000,2020-08-18 15:00:03,15303,189166175,208576400000.0,6747.6059,6789.3979


In [28]:
import glob

y = '20200818'
readPath = '/mnt/dailyRawData/' + y + '/logs_' + y + '_zt_88_03_day_pcap/mdIndexPcap_SH_*'
# glob returns files in arbitrary order; sort so dataPathLs[0] is deterministic.
dataPathLs = np.array(sorted(glob.glob(readPath)))
if len(dataPathLs) == 0:
    raise FileNotFoundError('no file matches ' + readPath)
startTm = datetime.datetime.now()
SH = pd.read_csv(dataPathLs[0])
print(datetime.datetime.now() - startTm)

0:00:01.752759


In [31]:
# Last 50 raw rows whose ID is 1000016.
SH.loc[SH['ID'].eq(1000016)].tail(50)

Unnamed: 0,clockAtArrival,sequenceNo,ID,time,cum_volume,cum_amount,close,open,prevClose
703599,1597733932995781,56480934,1000016,145822630,46931579,922876426690000,33691790,33698674,33688488
703786,1597733934948752,56485317,1000016,145822630,46931579,922876426690000,33691790,33698674,33688488
703823,1597733937963522,56490677,1000016,145828640,46931579,922876426690000,33691790,33698674,33688488
704034,1597733942935583,56500201,1000016,145833750,46931579,922876426690000,33691790,33698674,33688488
704254,1597733947925004,56508817,1000016,145837620,46931579,922876426690000,33691790,33698674,33688488
704464,1597733952995183,56518552,1000016,145843570,46931579,922876426690000,33691790,33698674,33688488
704679,1597733957955937,56527888,1000016,145848720,46931579,922876426690000,33691790,33698674,33688488
704898,1597733962945567,56536425,1000016,145852600,46931579,922876426690000,33691790,33698674,33688488
705109,1597733967913964,56547503,1000016,145858620,46931579,922876426690000,33691790,33698674,33688488
705325,1597733972976293,56557238,1000016,145903760,46931579,922876426690000,33691790,33698674,33688488


In [19]:
import glob

y = '20200818'
readPath = '/mnt/dailyRawData/' + y + '/logs_' + y + '_zt_88_03_day_pcap/mdIndexPcap_SH_*'
# glob returns files in arbitrary order; sort so dataPathLs[0] is deterministic.
dataPathLs = np.array(sorted(glob.glob(readPath)))
if len(dataPathLs) == 0:
    raise FileNotFoundError('no file matches ' + readPath)
startTm = datetime.datetime.now()
SH = pd.read_csv(dataPathLs[0])
print(datetime.datetime.now() - startTm)

startTm = datetime.datetime.now()
# Keep only the tracked ids. .copy() so the column assignments below modify
# a real frame rather than a slice (avoids SettingWithCopyWarning).
SH = SH.rename(columns={"ID":"skey"})
in_dex = [1000016, 1000300, 1000852, 1000905]
SH = SH[SH['skey'].isin(in_dex)].copy()

# Raw amount/price columns are divided by 10000 and kept to 4 decimals.
for cols in ["cum_amount", "close", "open"]:
    SH[cols] = (SH[cols]/10000).round(4)

# Raw 'time' is HHMMSSmmm. Round it UP to the next whole second in real clock
# time, so e.g. 09:30:59.500 becomes 09:31:00. The former "+1000 then floor"
# produced the invalid second 60, which made strptime below fail.
# The raw value is kept in t1.
raw_time = SH['time']
ms_of_day = ((raw_time // 10000000) * 3600000 + (raw_time // 100000 % 100) * 60000
             + raw_time % 100000)
sec_of_day = -(-ms_of_day // 1000)  # ceiling division
SH['t'] = ((sec_of_day // 3600) * 10000 + (sec_of_day // 60 % 60) * 100 + sec_of_day % 60) * 1000
SH['t1'] = SH['time']
SH['time'] = SH['t']

SH['date'] = int(y)
SH['time1'] = int(y) * 1000000000 + SH['time']
# From here on 'time' is HHMMSS * 1000000.
SH['time'] = SH['time'].astype('int64') * 1000
# NOTE(review): .timestamp() / fromtimestamp() on naive datetimes use the
# host's local time zone; the outputs in this notebook match a UTC+8 host.
# Run on a machine set to China Standard Time (TODO confirm).
SH["clockAtArrival"] = SH["time1"].astype(str).apply(
    lambda x: np.int64(datetime.datetime.strptime(x, '%Y%m%d%H%M%S%f').timestamp() * 1e6))
SH.drop("time1", axis=1, inplace=True)
SH['datetime'] = SH["clockAtArrival"].apply(lambda x: datetime.datetime.fromtimestamp(x / 1e6))

SH = SH.fillna(0)
# close is zeroed until the skey has traded (cum_volume > 0).
SH['close'] = np.where(SH['cum_volume'] > 0, SH['close'], 0)
SH = SH.drop_duplicates(['cum_volume', 'open', 'close', 'cum_amount', 'skey', 
              'date', 'time', 'clockAtArrival', 'datetime'])
# Rows that still collide on (skey, rounded time) must share the same raw t1.
assert(SH[SH.duplicated(['skey', 'time'], keep=False)].drop_duplicates(['skey', 't1'], keep=False).shape[0] == 0)
assert(sum(SH['time']%1000000) == 0)
# For EVERY skey, its zero-volume rows must not come after its first traded
# row. The former assert(sum(...)) only required this for at least one skey.
last_zero = SH[SH['cum_volume'] == 0].groupby('skey')['time'].max()
first_traded = SH[SH['cum_volume'] > 0].groupby('skey')['time'].min()
common = last_zero.index.intersection(first_traded.index)
assert((last_zero.loc[common] <= first_traded.loc[common]).all())
assert(SH['time'].max() < 150500000000)
SH = SH[(SH['cum_volume'] > 0) & (SH['time'] <= 150500000000)]

# Build, per skey, a 1-second grid from its first to its last row.
k1 = SH.groupby('skey')['datetime'].min().reset_index()
k1 = k1.rename(columns={'datetime':'min'})
k2 = SH.groupby('skey')['datetime'].max().reset_index()
k2 = k2.rename(columns={'datetime':'max'})
k = pd.merge(k1, k2, on='skey')
k['diff'] = (k['max']-k['min']).apply(lambda x: x.seconds)
df = pd.DataFrame()
for i in np.arange(k.shape[0]):
    df1 = pd.DataFrame()
    df1['datetime1'] = [k.loc[i, 'min'] + datetime.timedelta(seconds=int(x)) for x in np.arange(0, k.loc[i, 'diff'] + 1)]
    df1['skey'] = k.loc[i, 'skey']
    assert(df1['datetime1'].min() == k.loc[i, 'min'])
    assert(df1['datetime1'].max() == k.loc[i, 'max'])
    df = pd.concat([df, df1])

# Align the rows onto the grid and forward-fill each skey's values.
SH = pd.merge(SH, df, left_on=['skey', 'datetime'], right_on=['skey', 'datetime1'], how='outer').sort_values(by=['skey', 'datetime1']).reset_index(drop=True)
assert(SH[SH['datetime1'].isnull()].shape[0] == 0)
for cols in ['date', 'cum_volume', 'cum_amount', 'open', 'close']:
    SH[cols] = SH.groupby('skey')[cols].ffill()
SH.drop(["datetime"],axis=1,inplace=True)
SH = SH.rename(columns={'datetime1':'datetime'})
SH['date'] = SH['date'].iloc[0]
SH['date'] = SH['date'].astype('int32')
SH['skey'] = SH['skey'].astype('int32')
# Recompute time (HHMMSS) and clockAtArrival from the grid datetime.
SH["time"] = SH['datetime'].astype(str).apply(lambda x: int(x.split(' ')[1].replace(':', ""))).astype(np.int64)
SH['SendingTime'] = SH['date'] * 1000000 + SH['time']
SH["clockAtArrival"] = SH["SendingTime"].astype(str).apply(lambda x: np.int64(datetime.datetime.strptime(x, '%Y%m%d%H%M%S').timestamp()*1e6))
SH.drop(["SendingTime"],axis=1,inplace=True)
SH['time'] = SH['time'] * 1000000

# Each skey must have a single non-zero open; propagate it to traded rows.
assert(sum(SH[SH["open"] != 0].groupby("skey")["open"].nunique() != 1) == 0)
SH["open"] = np.where(SH["cum_volume"] > 0, SH.groupby("skey")["open"].transform("max"), SH["open"])
assert(sum(SH[SH["open"] != 0].groupby("skey")["open"].nunique() != 1) == 0)
assert(SH[SH["cum_volume"] > 0]["open"].min() > 0)

for cols in ['open', 'close', 'cum_amount']:
    SH[cols] = SH[cols].apply(lambda x: round(x, 4)).astype('float64')
# Between the last row at/before 11:35:00 and the first row at/after 12:55:00,
# every skey's values must be constant; the rows strictly inside that window
# are then dropped.
m_in = SH[SH['time'] <= 113500000000].groupby('skey').last()['time'].min()
m_ax = SH[SH['time'] >= 125500000000].groupby('skey').first()['time'].max()
assert((SH[(SH['time'] >= m_in) & (SH['time'] <= m_ax)].drop_duplicates(['cum_volume', 'open', 
                                               'close', 'cum_amount', 'skey', 'date'], keep=False).shape[0] == 0)
          & (sum(SH[(SH['time'] >= m_in) & (SH['time'] <= m_ax)].groupby('skey')['cum_volume'].nunique() != 1) == 0) & 
           (sum(SH[(SH['time'] >= m_in) & (SH['time'] <= m_ax)].groupby('skey')['close'].nunique() != 1) == 0))
SH = pd.concat([SH[SH['time'] <= 113500000000], SH[SH['time'] >= 125500000000]])
    
# 1-based per-skey row number in (time, cum_volume) order.
SH = SH.sort_values(by=['skey', 'time', 'cum_volume'])
SH["ordering"] = SH.groupby("skey").cumcount()
SH["ordering"] = SH["ordering"] + 1
SH['ordering'] = SH['ordering'].astype('int32')
SH['cum_volume'] = SH['cum_volume'].astype('int64')

SH = SH[["skey", "date", "time", "clockAtArrival", "datetime", "ordering", "cum_volume", "cum_amount", 
         "open", "close"]]
        
print(SH["date"].iloc[0])
print("index finished")

# NOTE(review): credentials are hard-coded in the notebook; consider loading
# them from an environment variable or a config file instead.
database_name = 'com_md_eq_cn'
user = "zhenyuy"
password = "bnONBrzSMGoE"

db1 = DB("192.168.10.178", database_name, user, password)
db1.write('md_index', SH)

del SH
print(datetime.datetime.now() - startTm)

0:00:02.070481
20200818
index finished
0:00:07.721614
