-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
executable file
·127 lines (104 loc) · 4.42 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
import numpy as np
import os
import xlsxwriter
import requests
import sys
import argparse
import logging
import csv
import multiprocessing
import glob
class StockInfo:
    """Small HTTP client for the ``api.wmcloud.com`` equity data endpoint.

    One ``requests.Session`` is created per instance and carries the
    ``Authorization`` and ``Accept-Encoding`` headers for every request made
    through it.
    """

    def __init__(self, token):
        """Build the session used for all subsequent requests.

        Args:
            token: Access token string; sent as ``Authorization: Bearer <token>``.
        """
        self.token = token
        session = requests.Session()
        session.headers['Authorization'] = 'Bearer ' + token
        session.headers['Accept-Encoding'] = 'gzip, deflate'
        self._sess = session

    def get_history(self, sec_id, timeout=30):
        """Query the ``getMktEqudCCXE`` endpoint for one security.

        Args:
            sec_id: Security identifier, sent as the ``secID`` query parameter.
            timeout: Seconds to wait for the server before giving up. Without
                a timeout a stalled connection would block the caller
                indefinitely, so a finite default is always applied.

        Returns:
            The response body decoded from JSON.

        Raises:
            Whatever the session's ``get`` raises on connection failure or
            timeout, and whatever ``Response.json`` raises when the body is
            not valid JSON.
        """
        url = 'https://api.wmcloud.com/data/v1//api/equity/getMktEqudCCXE.json'
        r = self._sess.get(url, params={'secID': sec_id}, timeout=timeout)
        return r.json()
def moving_mean(array, window_size):
    """Return trailing means of ``array`` computed from a running total.

    Element ``i`` of the result is the mean of
    ``array[i + 1 : i + 1 + window_size]``, so the result has
    ``len(array) - window_size`` elements; the window covering the first
    ``window_size`` items is not part of the output.

    Args:
        array: One-dimensional sequence of numbers.
        window_size: Number of consecutive items averaged per output element.

    Returns:
        A NumPy array of window means.
    """
    running_total = np.cumsum(array)
    scale = 1.0 / window_size
    # Difference of two running totals `window_size` apart is the window sum.
    window_sums = running_total[window_size:] - running_total[:-window_size]
    return scale * window_sums
def mm_actions(prices, mm):
    """Derive buy/sell round trips from prices crossing a moving mean.

    Walking both sequences in step: when no position is open and the price is
    at or above the mean, a buy is recorded at that price; when a position is
    open and the price is below the mean, it is sold at that price. A position
    still open when the data ends is dropped (no pair is emitted for it).

    Args:
        prices: Sequence of prices.
        mm: Sequence of moving-mean values, same length as ``prices``.

    Returns:
        A NumPy array of shape ``(n, 2)`` whose rows are
        ``(buy_price, sell_price)``. When no round trip completes the result
        has shape ``(0, 2)`` so that column indexing still works.
    """
    assert len(prices) == len(mm)
    result = []
    last_buy = None
    for p, m in zip(prices, mm):
        if p >= m and last_buy is None:
            last_buy = p
        elif p < m and last_buy is not None:
            result.append((last_buy, p))
            last_buy = None
    # np.array([]) would be 1-D; force two columns even when there are no rows.
    return np.array(result).reshape(-1, 2)
def analyze_actions(actions):
    """Summarise a set of (buy_price, sell_price) round trips.

    Args:
        actions: Array-like of shape ``(n, 2)``; column 0 is the buy price and
            column 1 the sell price. An empty input (of any shape) is accepted
            and yields an all-zero summary instead of raising.

    Returns:
        A 6-tuple of builtin numbers:
        ``(count, total_return, best_gain, worst_loss, wins, losses)`` where
        ``total_return`` is the product of all sell/buy ratios minus 1,
        ``best_gain`` is the largest single-trade return clamped to be >= 0,
        ``worst_loss`` is the smallest single-trade return clamped to be <= 0,
        and ``wins`` / ``losses`` count trades that sold strictly above /
        below their buy price.
    """
    actions = np.asarray(actions)
    if actions.size == 0:
        # np.max/np.min reject empty arrays, and a 1-D empty array cannot be
        # column-indexed; report "no trades" explicitly.
        return (0, 0.0, 0.0, 0.0, 0, 0)

    buys = actions[:, 0]
    sells = actions[:, 1]
    ratios = sells / buys
    total_win_ratio = np.prod(ratios) - 1.0
    best_gain = max(1.0, np.max(ratios)) - 1.0
    worst_loss = min(1.0, np.min(ratios)) - 1.0
    return (
        len(actions),
        float(total_win_ratio),
        float(best_gain),
        float(worst_loss),
        int(np.sum(buys < sells)),
        int(np.sum(buys > sells)),
    )
def parse_csv_file(filename):
    """Read closing prices and 20-day moving averages from one GBK-encoded CSV.

    The file must have a header row containing the columns ``股票代码``,
    ``股票名称``, ``交易日期``, ``收盘价`` and ``MA_20``. Rows that lack a column
    or whose numeric fields cannot be converted are logged and skipped.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        ``(stock_id, stock_name, first_date, prices, ma20s)``. The three text
        values come from the last row in which they could be read (including
        rows later rejected for bad numbers). ``prices`` and ``ma20s`` are
        NumPy arrays in the reverse of file order — presumably the files list
        the newest day first, so this yields chronological order; confirm
        against the data source.

    Raises:
        ValueError: If no row provided the stock id, name and date, e.g. the
            file has a header but no data rows.
    """
    prices = []
    ma20s = []
    stock_id = stock_name = first_date = None
    with open(filename, encoding='gbk') as f:
        reader = csv.DictReader(f)
        for row in reader:
            try:
                stock_id = row['股票代码']
                stock_name = row['股票名称']
                first_date = row['交易日期']
                close_price = float(row['收盘价'])
                ma20 = float(row['MA_20'])
            except (KeyError, ValueError, TypeError) as e:
                # TypeError: DictReader fills missing trailing fields with
                # None, and float(None) raises TypeError rather than ValueError.
                # reader.line_num stays correct even when blank lines are skipped.
                logging.error('解析第%d行时发生错误: %r', reader.line_num, e)
            else:
                prices.append(close_price)
                ma20s.append(ma20)
    if stock_id is None or stock_name is None or first_date is None:
        # Previously this surfaced as an UnboundLocalError at the return below.
        raise ValueError(f'文件{filename!r}中没有可解析的数据行')
    return stock_id, stock_name, first_date, np.array(prices)[::-1], np.array(ma20s)[::-1]
def _worker_main(filename):
try:
stock_id, stock_name, first_date, prices, ma20 = parse_csv_file(filename)
return (stock_id, stock_name, first_date) + analyze_actions(mm_actions(prices, ma20))
except Exception as e:
logging.error('解析文件%r时发生错误: %r', filename, e)
def _write_result_row(sheet, row, info, percent_format):
    """Write one result tuple into worksheet row ``row``, one value per column."""
    for col, value in enumerate(info):
        if isinstance(value, str):
            sheet.write_string(row, col, value)
        elif isinstance(value, (float, np.floating)):
            # Every float in a result row is a ratio, shown as a percentage.
            sheet.write_number(row, col, value, percent_format)
        elif isinstance(value, (int, np.integer)):
            sheet.write_number(row, col, value)
        else:
            raise AssertionError(f'Invalid value {value} of type {type(value)}')


def main():
    """Command-line entry point: analyse every CSV in a directory into one Excel file.

    Command-line arguments:
        -f / --file: name of the Excel file to write (required).
        -d / --dir: directory containing the ``*.csv`` input files (required).

    Each CSV is handed to ``_worker_main`` in a process pool. Files that fail
    are skipped; successful results are written on consecutive rows below the
    header row, in whatever order the workers finish.
    """
    logging.basicConfig(level=logging.INFO, format='[%(levelname)s %(asctime)s] %(message)s')
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--file', required=True, help='输出Excel文件名')
    parser.add_argument('-d', '--dir', required=True, help='CSV文件目录')
    args = parser.parse_args()

    filenames = glob.glob(os.path.join(args.dir, '*.csv'))
    logging.info('总共有%d个文件待处理', len(filenames))

    # The context manager shuts the worker processes down even if writing fails.
    with multiprocessing.Pool() as pool:
        lazy_results = pool.imap_unordered(_worker_main, filenames, 16)
        writer = xlsxwriter.Workbook(args.file)
        logging.info('打开文件 %s', args.file)
        try:
            percent_format = writer.add_format({'num_format': '0.00%'})
            sheet = writer.add_worksheet()
            headers = ['股票代码', '股票名称', '数据最早日期', '累计次数',
                       '累计收益率', '单次最大收益率', '单次最大亏损率', '收益次数', '亏损次数']
            for col, name in enumerate(headers):
                sheet.write_string(0, col, name)

            next_row = 1  # Row 0 holds the headers.
            for done, info in enumerate(lazy_results):
                if info:
                    # A separate row counter keeps failed files from leaving
                    # blank rows in the sheet.
                    _write_result_row(sheet, next_row, info, percent_format)
                    next_row += 1
                if done % 10 == 0:
                    logging.info('完成%f%%', (done + 1) / len(filenames) * 100)
            logging.info('完成100%')
        finally:
            writer.close()
            logging.info('关闭文件 %s', args.file)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported (e.g. by multiprocessing worker processes).
if __name__ == '__main__':
    main()