In [1]:
import pymongo
import pandas as pd
import pickle
import datetime
import time
import gzip
import lzma
import pytz


def DB(host, db_name, user, passwd):
    """Build a ``DBObj`` connected to ``db_name`` on ``host``.

    Args:
        host: MongoDB host (optionally ``host:port``) placed verbatim in the URI.
        db_name: Database to open; also used as ``authSource`` unless the
            user is ``admin`` or ``root``, in which case ``admin`` is used.
        user: Plain (not percent-escaped) user name.
        passwd: Plain (not percent-escaped) password.

    Returns:
        A ``DBObj`` bound to the constructed connection URI.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote_plus

    auth_db = db_name if user not in ('admin', 'root') else 'admin'
    # Credentials must be percent-escaped, otherwise characters such as
    # '@', ':', '/' or '%' in the password corrupt the URI.
    uri = 'mongodb://%s:%s@%s/?authSource=%s' % (
        quote_plus(user), quote_plus(passwd), host, auth_db)
    return DBObj(uri, db_name=db_name)


class DBObj(object):
    """Store and load pandas DataFrames in MongoDB as compressed pickles.

    Each document holds one chunk of at most ``chunk_size`` rows for a single
    ``(date, symbol)`` pair, with the keys ``ver``, ``data``, ``date``,
    ``symbol`` and ``start`` (row offset of the chunk inside its group).
    """

    def __init__(self, uri, symbol_column='skey', db_name='white_db'):
        """Open a client for ``uri`` and select database ``db_name``.

        Args:
            uri: MongoDB connection URI.
            symbol_column: DataFrame column used as the document ``symbol``.
            db_name: Name of the database to operate on.
        """
        self.db_name = db_name
        self.uri = uri
        self.client = pymongo.MongoClient(self.uri)
        self.db = self.client[self.db_name]
        self.chunk_size = 20000  # maximum number of rows per stored document
        self.symbol_column = symbol_column
        self.date_column = 'date'

    def parse_uri(self, uri):
        """Split ``mongodb://user:password@example.com`` into its parts.

        Returns:
            The list obtained by splitting on ``:`` and ``@`` after removing
            the scheme, e.g. ``['user', 'password', 'example.com']``.
        """
        stripped = uri.strip().replace('mongodb://', '').strip('/')
        return stripped.replace(':', ' ').replace('@', ' ').split(' ')

    def drop_table(self, table_name):
        """Drop the collection ``table_name``."""
        self.db.drop_collection(table_name)

    def rename_table(self, old_table, new_table):
        """Rename collection ``old_table`` to ``new_table``."""
        self.db[old_table].rename(new_table)

    def write(self, table_name, df):
        """Replace the stored data of every ``(date, symbol)`` group in ``df``.

        Existing documents for each group are deleted before the group is
        re-inserted in chunks. An empty ``df`` is a no-op.

        Args:
            table_name: Target collection.
            df: DataFrame containing the date column and the symbol column.

        Raises:
            Exception: If ``df`` has no date column.
        """
        if len(df) == 0:
            return

        if self.date_column not in df.columns:
            raise Exception('DataFrame should contain date column')

        multi_date = len(df[self.date_column].unique()) > 1

        collection = self.db[table_name]
        collection.create_index([('date', pymongo.ASCENDING), ('symbol', pymongo.ASCENDING)], background=True)
        collection.create_index([('symbol', pymongo.ASCENDING), ('date', pymongo.ASCENDING)], background=True)

        if multi_date:
            for (date, symbol), sub_df in df.groupby([self.date_column, self.symbol_column]):
                date = str(date)
                symbol = int(symbol)
                collection.delete_many({'date': date, 'symbol': symbol})
                self.write_single(collection, date, symbol, sub_df)
        else:
            date = str(df[self.date_column].iloc[0])
            # Group by the bare column name: grouping by a one-element list
            # yields tuple keys on newer pandas versions.
            for symbol, sub_df in df.groupby(self.symbol_column):
                # Store a plain int, matching the multi-date branch and the
                # int symbols that build_query() searches for.
                symbol = int(symbol)
                collection.delete_many({'date': date, 'symbol': symbol})
                self.write_single(collection, date, symbol, sub_df)

    def write_single(self, collection, date, symbol, df):
        """Insert ``df`` into ``collection`` as chunks of ``chunk_size`` rows."""
        version = 1
        for start in range(0, len(df), self.chunk_size):
            # iloc makes the positional slicing explicit regardless of index type.
            df_seg = df.iloc[start:start + self.chunk_size]
            seg = {'ver': version, 'data': self.ser(df_seg, version), 'date': date, 'symbol': symbol, 'start': start}
            collection.insert_one(seg)

    def build_query(self, start_date=None, end_date=None, symbol=None):
        """Build the MongoDB filter for a date range and/or symbol selection.

        Args:
            start_date: Inclusive lower bound; ``YYYYMMDD`` string, integer,
                or ``datetime.date``/``datetime.datetime`` (subclasses such as
                ``pandas.Timestamp`` included).
            end_date: Inclusive upper bound, same accepted types.
            symbol: A single symbol or a list/tuple of symbols; every value is
                converted with ``int``. Falsy values add no symbol filter.

        Returns:
            A filter dict; empty when no argument constrains the query.

        Raises:
            ValueError: If a date string is not 8 characters long.
            TypeError: If a date has an unsupported type.
        """
        import numbers

        def parse_date(x):
            if isinstance(x, str):
                if len(x) != 8:
                    raise ValueError("`date` must be YYYYMMDD format")
                return x
            if isinstance(x, datetime.date):  # also covers datetime.datetime
                return x.strftime("%Y%m%d")
            if isinstance(x, numbers.Integral):  # int and numpy integer scalars
                return parse_date(str(int(x)))
            raise TypeError("invalid `date` type: " + str(type(x)))

        query = {}

        if start_date is not None or end_date is not None:
            date_filter = {}
            if start_date is not None:
                date_filter['$gte'] = parse_date(start_date)
            if end_date is not None:
                date_filter['$lte'] = parse_date(end_date)
            query['date'] = date_filter

        if symbol:
            if isinstance(symbol, (list, tuple)):
                query['symbol'] = {'$in': [int(x) for x in symbol]}
            else:
                query['symbol'] = int(symbol)

        return query

    def delete(self, table_name, start_date=None, end_date=None, symbol=None):
        """Delete the documents matching the filter; refuses an empty filter."""
        collection = self.db[table_name]

        query = self.build_query(start_date, end_date, symbol)
        if not query:
            print('cannot delete the whole table')
            return None

        collection.delete_many(query)

    def read(self, table_name, start_date=None, end_date=None, symbol=None):
        """Load the matching chunks and concatenate them into one DataFrame.

        Chunks are ordered by ``(symbol, date, start)`` before concatenation.

        Returns:
            The concatenated DataFrame, or ``None`` when the filter is empty
            (reading a whole table is refused) or nothing matches.
        """
        collection = self.db[table_name]

        query = self.build_query(start_date, end_date, symbol)
        if not query:
            print('cannot read the whole table')
            return None

        segs = []
        for doc in collection.find(query):
            doc['data'] = self.deser(doc['data'], doc['ver'])
            segs.append(doc)
        if not segs:
            return None
        segs.sort(key=lambda doc: (doc['symbol'], doc['date'], doc['start']))
        return pd.concat([doc['data'] for doc in segs], ignore_index=True)

    def list_tables(self):
        """Return the names of the collections in the database."""
        # collection_names() was removed in PyMongo 4. Check the class rather
        # than the instance: Database.__getattr__ turns unknown attribute
        # names into Collection objects, so hasattr(self.db, ...) is always true.
        if hasattr(type(self.db), 'list_collection_names'):
            return self.db.list_collection_names()
        return self.db.collection_names()

    def list_dates(self, table_name, start_date=None, end_date=None, symbol=None):
        """Return the sorted distinct ``date`` values matching the filter."""
        collection = self.db[table_name]
        if start_date is None:
            start_date = '00000000'
        if end_date is None:
            end_date = '99999999'
        query = self.build_query(start_date, end_date, symbol)
        dates = {doc['date'] for doc in collection.find(query, {"date": 1, '_id': 0})}
        return sorted(dates)

    def ser(self, s, version):
        """Pickle ``s`` and compress it (version 1: gzip, version 2: lzma).

        Raises:
            ValueError: If ``version`` is neither 1 nor 2.
        """
        pickle_protocol = 4
        if version == 1:
            return gzip.compress(pickle.dumps(s, protocol=pickle_protocol), compresslevel=2)
        elif version == 2:
            return lzma.compress(pickle.dumps(s, protocol=pickle_protocol), preset=1)
        else:
            raise ValueError('unknown version')

    def deser(self, s, version):
        """Inverse of ``ser``: decompress and unpickle ``s``.

        Raises:
            ValueError: If ``version`` is neither 1 nor 2.
        """
        # NOTE(review): pickle.loads executes arbitrary code embedded in the
        # payload; only read collections whose writers are trusted.
        if version == 1:
            return pickle.loads(gzip.decompress(s))
        elif version == 2:
            return pickle.loads(lzma.decompress(s))
        else:
            raise ValueError('unknown version')


def patch_pandas_pickle():
    """Register a stub ``pandas.core.internals.managers`` on pandas < 0.24.

    When the running pandas is older than 0.24 and that module is not in
    ``sys.modules``, a module exposing ``BlockManager`` is registered under
    that name, so pickles that reference ``BlockManager`` through that module
    path can be loaded. On pandas 0.24 or later nothing is done.
    """
    import re

    # Compare numerically: a plain string comparison mis-orders versions
    # (for example '0.9' < '0.24' is False lexicographically).
    match = re.match(r'(\d+)\.(\d+)', pd.__version__)
    if match is None or tuple(int(part) for part in match.groups()) >= (0, 24):
        return

    import sys
    from types import ModuleType
    from pandas.core.internals import BlockManager
    pkg_name = 'pandas.core.internals.managers'
    if pkg_name not in sys.modules:
        m = ModuleType(pkg_name)
        m.BlockManager = BlockManager
        sys.modules[pkg_name] = m


patch_pandas_pickle()

import pandas as pd
import random
import numpy as np
import glob
import os
from unrar import rarfile
import py7zr
import pickle
import datetime
import time
# Use the fully qualified option name: the bare "max_columns" pattern matches
# more than one option on recent pandas versions and raises OptionError.
pd.set_option("display.max_columns", 200)

# Load every daily stock file whose name starts with 'SZ' into one DataFrame.
startTm = datetime.datetime.now()
readPath = r'\\192.168.10.30\Kevin_zhenyu\day_stock\***'
# Explicit string dtype keeps the array well-typed when the glob matches nothing.
dataPathLs = np.array(glob.glob(readPath), dtype=str)
# Index with the boolean mask itself; wrapping it in a list (as before) is a
# deprecated form that newer numpy no longer treats as a plain mask.
isSZ = np.array([os.path.basename(i).split('.')[0][:2] == 'SZ' for i in dataPathLs], dtype=bool)
dataPathLs = dataPathLs[isSZ]
# Read everything first and concatenate once instead of re-copying the
# accumulated frame on every iteration.
dayFrames = [pd.read_csv(p, compression='gzip') for p in dataPathLs]
db = pd.concat(dayFrames) if dayFrames else pd.DataFrame()
print(datetime.datetime.now() - startTm)

# Select the order directories under J:\<year>\*\Order\SZ whose folder name
# falls inside [startDate, endDate] (compared as strings).
year = "2017"
startDate = '20170101'
endDate = '20170228'
readPath = 'J:\\' + year + '\\***\\Order\\SZ\\***'
# Explicit string dtype: an empty glob would otherwise produce float arrays,
# and comparing those with the date strings below is invalid.
dataPathLs = np.array(glob.glob(readPath), dtype=str)
dateLs = np.array([os.path.basename(i) for i in dataPathLs], dtype=str)
inRange = (dateLs >= startDate) & (dateLs <= endDate)
dataPathLs = dataPathLs[inRange]
# Distinct folder names of the selected directories, sorted by np.unique.
date_list = np.unique(dateLs[inRange])
wr_ong = []
mi_ss = []

# Per-date pipeline: read the raw order files of one date, normalise them,
# cross-check them against the day-stock table and the stored trades, then
# write the result to the 'md_order' collection.

# NOTE(review): credentials are hard-coded in the notebook; consider loading
# them from the environment or a config file that is not version-controlled.
database_name = 'com_md_eq_cn'
user = "zhenyuy"
password = "bnONBrzSMGoE"
# One connection is enough for both the trade read and the order write.
db1 = DB("192.168.10.178", database_name, user, password)

# Folder name of every selected directory; it does not change between dates.
folderDates = np.array([os.path.basename(i) for i in dataPathLs])

# NOTE(review): the first entry of date_list is skipped - confirm this is intentional.
for date in date_list[1:]:
    # Started once per date so both timings printed below refer to this date.
    startTm = datetime.datetime.now()
    path1 = dataPathLs[folderDates == date]
    OrderLog = []
    ll = []  # numeric file stems of the files that could not be read
    for data in path1:
        readPath = data + '\\***'
        dp = np.array(glob.glob(readPath))
        dateLs = np.array([int(os.path.basename(i).split('.')[0]) for i in dp])
        # Keep only files whose numeric stem is below 4000 or in (300000, 310000).
        dp = dp[(dateLs < 4000) | ((dateLs > 300000) & (dateLs < 310000))]
        for i in dp:
            try:
                df = pd.read_csv(i)
            except Exception:
                # Best effort: report and skip unreadable files. Exception
                # (not a bare except) lets Ctrl-C interrupt the long loop.
                print("empty data")
                print(i)
                ll.append(int(os.path.basename(i).split('.')[0]))
                continue
            OrderLog.append(df)
    OrderLog = pd.concat(OrderLog).reset_index(drop=True)
    print(datetime.datetime.now() - startTm)

    # skey = numeric part of WindCode + 2000000.
    OrderLog["skey"] = OrderLog['WindCode'].apply(lambda x: int(x.split('.')[0])) + 2000000
    OrderLog = OrderLog.rename(columns={"Volume":"order_qty", "Price":"order_price", "FunctionCode":"order_side",
                                        'Date':"date", "Order":"ApplSeqNum", "Time":"time", "OrderKind":"order_type"})
    OrderLog['order_price'] = OrderLog['order_price']/10000
    # date * 1e9 + time forms one number that is parsed below as
    # '%Y%m%d%H%M%S%f' (presumably time is HHMMSSmmm - TODO confirm).
    OrderLog['TransactTime'] = OrderLog['time'] + OrderLog['date'] * 1000000000
    # NOTE(review): strptime yields a naive datetime, so timestamp() uses the
    # machine's local time zone.
    OrderLog["clockAtArrival"] = OrderLog["TransactTime"].astype(str).apply(lambda x: np.int64(datetime.datetime.strptime(x, '%Y%m%d%H%M%S%f').timestamp()*1e6))
    OrderLog['datetime'] = OrderLog["clockAtArrival"].apply(lambda x: datetime.datetime.fromtimestamp(x/1e6))
    OrderLog["time"] = OrderLog["time"]*1000
    # order_side must contain exactly 'B' and 'S'; encode them as 1 and 2.
    assert((OrderLog['order_side'].nunique() == 2) & ('B' in OrderLog['order_side'].unique()) & ('S' in OrderLog['order_side'].unique()))
    OrderLog["order_side"] = np.where(OrderLog["order_side"] == 'B', 1, 2)
    OrderLog['order_type'] = OrderLog['order_type'].astype(str)
    # order_type must contain exactly 'U', '0' and '1'; encode them as 3, 2 and 1.
    assert((OrderLog['order_type'].nunique() == 3) & ('U' in OrderLog['order_type'].unique()) & ('0' in OrderLog['order_type'].unique()) \
          & ('1' in OrderLog['order_type'].unique()))
    OrderLog['order_type'] = np.where(OrderLog["order_type"] == 'U', 3, np.where(OrderLog['order_type'] == '0', 2, 1))

    for col in ["skey", "date", "ApplSeqNum", "order_qty", "order_side", "order_type"]:
        OrderLog[col] = OrderLog[col].astype('int32')
    # Show the distinct numbers of decimal digits found in order_price
    # (display is provided by the IPython/Jupyter session).
    display(OrderLog["order_price"].astype(str).apply(lambda x: len(x.split('.')[1])).unique())

    assert(OrderLog[((OrderLog["order_side"] != 1) & (OrderLog["order_side"] != 2)) | (OrderLog["order_type"].isnull())].shape[0] == 0)
    da_te = str(OrderLog["date"].iloc[0])
    da_te = da_te[:4] + '-' + da_te[4:6] + '-' + da_te[6:8]
    # skeys listed in the day-stock table for this date.
    dayRef = db[db["date"] == da_te]
    sl = (dayRef["ID"].str[2:].astype(int) + 2000000).unique()
    del dayRef
    # Report (without failing) the skeys that have no order data.
    missingSkeys = set(sl) - set(OrderLog["skey"].unique())
    if missingSkeys:
        print(missingSkeys)

    OrderLog = OrderLog[["skey", "date", "time", "clockAtArrival", "datetime", "ApplSeqNum", "order_side", "order_type", "order_price",
                                                 "order_qty"]]
    startDate = str(OrderLog['date'].iloc[0])
    endDate = str(OrderLog['date'].iloc[0])
    trade = db1.read('md_trade', start_date=startDate, end_date=endDate)
    if trade is None:
        # read() returns None when nothing matches; fail with a clear message
        # instead of a TypeError on the subscript below.
        raise RuntimeError('no md_trade data found for ' + startDate)
    trade = trade[trade['skey'] > 2000000]
    # For every skey, compare the order ids referenced by trades with the ids
    # present in the order log.
    t1 = trade.groupby('skey')['BidApplSeqNum'].unique().reset_index()
    t2 = trade.groupby('skey')['OfferApplSeqNum'].unique().reset_index()
    t3 = OrderLog.groupby('skey')['ApplSeqNum'].unique().reset_index()
    t = pd.merge(t1, t2, on='skey', how='outer')
    t['union'] = [list(set(a) | set(b)) for a, b in zip(t.BidApplSeqNum, t.OfferApplSeqNum)]
    t = pd.merge(t, t3, on='skey')
    unmatched = [set(a) - set(b) for a, b in zip(t['union'], t['ApplSeqNum'])]
    t['less'] = [len(ids) for ids in unmatched]
    # next(..., None) tolerates an empty difference; indexing [0] raised
    # IndexError before the check below could run.
    t['less1'] = [next(iter(ids), None) for ids in unmatched]
    # At most one trade-referenced id per skey may be absent from the order log
    # (presumably a placeholder id - TODO confirm).
    assert(t[t['less'] > 1].shape[0] == 0)

    print(da_te)
    print("order finished")

    db1.write('md_order', OrderLog)

    print(datetime.datetime.now() - startTm)



0:04:25.058853


  interactivity=interactivity, compiler=compiler, result=result)


0:00:15.396732


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-04
order finished
0:05:36.814233
0:00:15.712347


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-05
order finished
0:05:51.556125
0:00:15.580596


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-06
order finished
0:05:38.360134
0:00:14.408573


array([1, 2], dtype=int64)

{2001872, 2001914}
2017-01-09
order finished
0:05:00.262694
0:00:14.518433


array([1, 2], dtype=int64)

{2001872, 2001914}
2017-01-10
order finished
0:05:05.278325
0:00:19.901957


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-11
order finished
0:05:39.615952
0:00:14.214792


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-12
order finished
0:05:22.522924
0:00:14.358781


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-13
order finished
0:05:23.026311
0:00:15.743892


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-16
order finished
0:05:48.577237
0:00:19.867432


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-17
order finished
0:04:56.434645
0:00:13.859026


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-18
order finished
0:04:19.740795
0:00:13.717395


array([1, 2], dtype=int64)

{2001872, 2001914}
2017-01-19
order finished
0:04:13.054375
0:00:13.403078


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-20
order finished
0:04:29.923279
0:00:13.377974


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-01-23
order finished
0:04:24.375849
0:00:13.334772


array([1, 2], dtype=int64)

{2001872, 2001914}
2017-01-24
order finished
0:04:12.193667
0:00:16.934236


array([1, 2], dtype=int64)

{2001872, 2001914}
2017-01-25
order finished
0:03:58.922037
0:00:12.124716


array([1, 2], dtype=int64)

{2001872, 2001914}
2017-01-26
order finished
0:03:34.815480
0:00:12.242689


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-03
order finished
0:03:23.056411
0:00:16.484509


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-06
order finished
0:04:28.160817
0:00:14.731003


array([1, 2], dtype=int64)

{2001872, 2001914}
2017-02-07
order finished
0:04:51.132170
0:00:15.355201


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-08
order finished
0:05:12.813119
0:00:16.052672


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-09
order finished
0:05:42.929708
0:00:21.896601


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-10
order finished
0:05:53.819988
0:00:15.407845


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-13
order finished
0:05:33.398155
0:00:14.690234


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-14
order finished
0:05:14.576801
0:00:15.946021


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-15
order finished
0:05:49.533855
0:00:20.366764


array([1, 2], dtype=int64)

{2001872, 2001914}
2017-02-16
order finished
0:05:23.010092
0:00:21.525176


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-17
order finished
0:06:00.271161
0:00:15.962189


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-20
order finished
0:05:37.056275
0:00:15.988860


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-21
order finished
0:06:04.233880
0:00:16.947185


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-22
order finished
0:06:09.871857
0:00:17.545170


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-23
order finished
0:06:21.057604
0:00:17.349071


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-24
order finished
0:05:59.360585
0:00:21.523296


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-27
order finished
0:05:56.686400
0:00:15.823503


array([2, 1], dtype=int64)

{2001872, 2001914}
2017-02-28
order finished
0:05:30.765078
