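*This note describes a method for analyzing stock data. The basic idea is as follows:*
1. Historical data contains many similarly shaped price movements, and some of these shapes may indicate the probability of what happens next.
2. To extract shapes, the daily K-line (candlestick) data is cut into segments with a rolling window. The closing price of the first bar in each segment is taken as the base, and the segment keeps only the percentage change of each close relative to that base.
      This is effectively a normalization step: price differences between stocks and between periods are removed, leaving only the shape of the movement.
3. Each segment is effectively a vector. All segments have the same length and start at 0, so clustering them with k-means amounts to grouping segments of similar shape.
4. Each segment is paired with the movement that follows it: the change of the highest and lowest prices over the next n trading days, relative to the last close of the segment, is kept.
5. Using the clustering result, the subsequent movements are grouped by cluster, and for each cluster the distribution of the highest and lowest changes relative to a threshold is counted. Some clusters turn out to have a clearly different distribution.
6. For clusters whose distribution stands out, plotting the mean line shows that their segments share a fairly similar shape.
7. Based on these shapes, the previously trained clustering model is applied to the latest data to look for similar shapes. This is one way to pick stocks.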
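
The rolling split and normalization described in step 2 can be sketched as follows. This is a minimal illustration in plain Python, not the code in `tushare_ut`: the function name `split_segments` is hypothetical, and expressing the change in percent (rather than as a fraction) is an assumption.

```python
def split_segments(closes, lsize=30, skip=5):
    """Cut a closing-price series into rolling windows of `lsize` bars,
    moving `skip` bars each step, and express every bar as the percentage
    change relative to the first close of its window."""
    segments = []
    for start in range(0, len(closes) - lsize + 1, skip):
        window = closes[start:start + lsize]
        base = window[0]  # first close of the segment is the base
        segments.append([(c / base - 1.0) * 100.0 for c in window])
    return segments


# Two made-up series with the same shape at different price levels
cheap = [10.0, 11.0, 12.0, 11.0]
pricey = [100.0, 110.0, 120.0, 110.0]
seg_cheap = split_segments(cheap, lsize=4, skip=1)[0]
seg_pricey = split_segments(pricey, lsize=4, skip=1)[0]
print(seg_cheap)
print(seg_pricey)
```

Both series produce the same vector up to floating-point rounding, which is why segments from different stocks and periods can be clustered together.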

  Whether this actually works is still being verified...

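The data is the free data provided by tushare.org; backward-adjusted daily bars are used. The tools are Python + NumPy and Spark's clustering algorithm. Clustering is the key computation. I compared tensorflow-gpu with Spark
   and found little difference in speed in single-machine mode, even though the former was running on a CUDA-capable GPU. By that comparison Spark comes out ahead, since it also supports clusters natively. TensorFlow certainly supports clusters as well, but I don't yet know how to use it that way.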

 The procedure is as follows.

In [1]:
# set path=d:\dev\Anaconda3\Scripts;d:\dev\Anaconda3;C:\WINDOWS\system32;C:\WINDOWS
# set QT_PLUGIN_PATH=d:\dev\Anaconda3\Library\plugins
import os
import sys
from importlib import reload
sys.path.insert(0, 'e:/worksrc/pycode/stock')
import tushare_ut as tu
import numpy as np
import time
import shutil

In [None]:
# 1. Download the stock list
k_data_path = "e:/stock/list11"
stocklist_file = 'e:/stock/stocklist.txt'

tu.downtushare_stocklist(stocklist_file) # the argument is the name of the file to save the list to

# 2. Full download of the K-line data; the first argument is the output directory, the last is the number of concurrent processes
tu.downtushare_hday(k_data_path, stocklist_file , process_count=8)

In [None]:
# 3. Incremental download
tu.ts_down_increasely(k_data_path, process_count=8)

In [None]:
# 4. Split the K-line data into segments, plus the label data for the movement that follows each segment
lines_file="e:/stock/2010_lines0330.txt"
tags_file = "e:/stock/2010_tag0330.txt"
k_data_path = "e:/stock/list11"
spark_cluster_out_path = "e:/stock/clustering_out"
spark_kmeans_model_path = "e:/stock/kmeans_spark_model"
if os.path.exists(spark_cluster_out_path):
    shutil.rmtree(spark_cluster_out_path)
if os.path.exists(spark_kmeans_model_path):
    shutil.rmtree(spark_kmeans_model_path)
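num_clusters = 120 # number of clusters
tu.split_k_data(k_data_path, # directory containing the K-line data files
                 lines_file, # output segment file
                 tags_file, # output tag file; same number of rows as the segment file; format: code date maxclose minclose
                 lsize=30,  # segment length
                 skip=5,    # rolling step: number of bars skipped between consecutive segments
                 nextdays=6, # number of following trading days used for the label
                 start_year="2010" # first year of data to use
                )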
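
The label attached to each segment can be sketched in the same spirit. This is an assumption-laden illustration, not the actual `split_k_data` code: `label_segment` is a hypothetical name, and it works on closing prices because the tag format above lists `maxclose` and `minclose`, even though the overview speaks of highest and lowest prices.

```python
def label_segment(closes, end, nextdays=6):
    """Return (maxclose, minclose): the highest and lowest close during the
    `nextdays` bars after index `end`, as percentage change from closes[end].
    Returns None when fewer than `nextdays` bars follow the segment."""
    future = closes[end + 1:end + 1 + nextdays]
    if len(future) < nextdays:
        return None
    last = closes[end]  # last close of the segment
    return ((max(future) / last - 1.0) * 100.0,
            (min(future) / last - 1.0) * 100.0)


# Made-up closes; the segment ends at index 1
closes = [10.0, 10.0, 10.5, 9.5, 10.2]
label = label_segment(closes, end=1, nextdays=3)
print(label)
```

Segments near the end of the series have no complete label yet, which is the situation for the daily selection data in step 7.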

In [None]:
# 5. Cluster the segments; this part is Scala code run in spark-shell
# 
'''
bin\spark-shell --driver-memory=4g


import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

val model_path="e:/stock/kmeans_spark_model"
val lines_path="e:/stock/2010_lines0330.txt"
val output_path="e:/stock/clustering_out"

val numClusters = 120
val numIterations = 100

// Load and parse the data
val data = sc.textFile(lines_path)
val parsedData = data.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))
parsedData.cache
parsedData.first
parsedData.count

val clusters = KMeans.train(parsedData, numClusters, numIterations)
clusters.save(sc, model_path)
val sameModel = KMeansModel.load(sc, model_path)

val out = sameModel.predict(parsedData)
out.saveAsTextFile(output_path)

// test different num_clusters and compare costs
//val ks:Array[Int] = Array(10,15, 20, 25, 30, 50)
//val ks:Array[Int] = Array(60,70,100,150)

//ks.foreach(cluster => {
// val model:KMeansModel = KMeans.train(parsedData, cluster, numIterations)
// val ssd = model.computeCost(parsedData)
// println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd)
//})

// here is what I added to predict data points that are within the clusters
//sameModel.predict(parsedData).foreach(println)
'''
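
For readers without a Spark installation, what `predict` and `computeCost` compute can be sketched in plain Python. This is only a toy illustration under stated assumptions: the function names are hypothetical, the initialization simply takes the first k points (Spark's own initialization differs), and it is far too slow for the real data set.

```python
def sq_dist(a, b):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def predict(point, centers):
    """Index of the nearest center."""
    return min(range(len(centers)), key=lambda i: sq_dist(point, centers[i]))


def compute_cost(points, centers):
    """Sum of squared distances from each point to its nearest center."""
    return sum(sq_dist(p, centers[predict(p, centers)]) for p in points)


def kmeans(points, k, iterations=100):
    """Plain Lloyd iterations, seeded with the first k points for repeatability."""
    centers = [list(p) for p in points[:k]]
    for _ in range(iterations):
        groups = [[] for _ in range(k)]
        for p in points:
            groups[predict(p, centers)].append(p)
        # New center = point-by-point mean of the group; keep the old one if empty
        new_centers = [
            [sum(col) / len(g) for col in zip(*g)] if g else centers[i]
            for i, g in enumerate(groups)
        ]
        if new_centers == centers:
            break
        centers = new_centers
    return centers


# Made-up toy segments: two rising, two falling
points = [[0.0, 1.0, 2.0], [0.0, -1.0, -2.0], [0.0, 1.2, 2.2], [0.0, -0.8, -2.1]]
for k in (1, 2):
    print(k, compute_cost(points, kmeans(points, k)))
```

Comparing the cost for several values of k, as the commented-out Scala loop does, is a common way to judge how many clusters are worth using.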

In [4]:
# 6. Compute statistics on the clustering result
lines_file="e:/stock/2010_lines0330.txt"
tags_file = "e:/stock/2010_tag0330.txt"
spark_cluster_out_path = "e:/stock/clustering_out"
num_clusters = 120 # number of clusters
lines, tag, clusters = tu.load_cluster_datasets(lines_file, # segment data file
                                                tags_file, # tag data file
                                                spark_cluster_out_path)
win_point, lose_point = 5.0, 0.0
centers = tu.check_clusters(tag, clusters, lines, num_clusters, win_point, lose_point)

 cluster 13  count: 4 win: 0.00 lose 100.00
 cluster 43  count: 22 win: 13.64 lose 54.55
 cluster 27  count: 30084 win: 16.48 lose 23.97
 cluster 45  count: 32981 win: 17.54 lose 22.62
 cluster 22  count: 26519 win: 17.67 lose 24.23
 cluster 33  count: 31274 win: 18.15 lose 23.07
 cluster 39  count: 26703 win: 20.60 lose 21.98
 cluster 101  count: 19781 win: 21.70 lose 23.80
 cluster 86  count: 20969 win: 22.35 lose 21.99
 cluster 73  count: 14998 win: 23.20 lose 20.98
 cluster 5  count: 26482 win: 23.48 lose 21.97
 cluster 107  count: 26783 win: 23.58 lose 21.37
 cluster 84  count: 25689 win: 23.64 lose 20.39
 cluster 7  count: 17527 win: 23.75 lose 20.13
 cluster 63  count: 25936 win: 23.94 lose 22.16
 cluster 76  count: 15265 win: 24.38 lose 22.26
 cluster 113  count: 27881 win: 24.46 lose 21.50
 cluster 89  count: 4 win: 25.00 lose 50.00
 cluster 8  count: 22916 win: 25.21 lose 20.23
 cluster 64  count: 17789 win: 25.30 lose 20.55
 cluster 2  count: 22820 win: 25.43 lose 20.61
 ...

The output of check_clusters looks like this:
 ...
 cluster 58  count: 1706 win: 58.44 lose 18.93
 cluster 97  count: 3976 win: 58.63 lose 12.37
 cluster 25  count: 3472 win: 58.78 lose 12.99
 cluster 27  count: 1335 win: 59.10 lose 16.78
 cluster 114  count: 733 win: 59.62 lose 23.06
 cluster 64  count: 2443 win: 62.05 lose 13.67
 cluster 51  count: 1692 win: 63.06 lose 20.92
 cluster 113  count: 1660 win: 67.77 lose 13.49
 cluster 91  count: 2491 win: 73.30 lose 8.87
 cluster 35  count: 1333 win: 73.82 lose 11.33
 cluster 31  count: 1 win: 100.00 lose 0.00

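The columns are: cluster ID, number of segments, win rate in percent (maxclose >= 5.0), and loss rate in percent (maxclose <= 0.0). The rows are sorted by win rate.
The last four or five clusters have a high win rate and a low loss rate, so these are presumably the shapes we are looking for.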
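
The statistics above can be reproduced with a short sketch. It assumes only what is stated here (a win is maxclose >= win_point, a loss is maxclose <= lose_point, rows sorted by win rate). The name `cluster_stats` and the sample values are made up for illustration and are not taken from `check_clusters`.

```python
from collections import defaultdict


def cluster_stats(maxcloses, cluster_ids, win_point=5.0, lose_point=0.0):
    """Per cluster: (cluster_id, count, win %, lose %), sorted by win %."""
    counts = defaultdict(lambda: [0, 0, 0])  # total, wins, losses
    for maxclose, cid in zip(maxcloses, cluster_ids):
        c = counts[cid]
        c[0] += 1
        c[1] += maxclose >= win_point
        c[2] += maxclose <= lose_point
    rows = [(cid, n, 100.0 * w / n, 100.0 * l / n)
            for cid, (n, w, l) in counts.items()]
    return sorted(rows, key=lambda r: r[2])


# Made-up labels and cluster assignments
maxcloses = [6.0, 7.5, -1.0, 2.0, 0.0, 5.0]
cluster_ids = [1, 1, 1, 2, 2, 2]
rows = cluster_stats(maxcloses, cluster_ids)
for cid, n, win, lose in rows:
    print(" cluster %d  count: %d win: %.2f lose %.2f" % (cid, n, win, lose))
```

Clusters with very few samples, such as the rows with count 1 or 4 in the output above, are probably too small to read anything into.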

In [5]:
# 6.1 Continue the analysis: plot the mean lines of the clusters with the higher win rates
tu.draw_centers(centers, [50, 51, 108, 77, 41, 96])

<Figure size 640x480 with 1 Axes>
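
The mean line of a cluster is the point-by-point average of its segments. A minimal sketch of that computation, assuming the segments and their cluster IDs are already loaded as Python lists (the function name is hypothetical and plotting is left out):

```python
def cluster_mean_lines(lines, cluster_ids):
    """Average the segments of each cluster point by point."""
    groups = {}
    for line, cid in zip(lines, cluster_ids):
        groups.setdefault(cid, []).append(line)
    return {cid: [sum(col) / len(col) for col in zip(*g)]
            for cid, g in groups.items()}


# Made-up segments: two in cluster 7, one in cluster 9
lines = [[0.0, 1.0, 3.0], [0.0, 3.0, 5.0], [0.0, -2.0, -1.0]]
cluster_ids = [7, 7, 9]
means = cluster_mean_lines(lines, cluster_ids)
print(means[7])
print(means[9])
```

For k-means these mean lines coincide with the cluster centers once the algorithm has converged, so plotting them shows the shape each cluster stands for.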

git exclude:
*/target/*
*/.ipynb_checkpoints/*
*/__pycache__/*
*.class
.idea/
*.iml
*.pyc

In [6]:
# 7. What follows is the daily stock-picking routine; first, update the K-line data incrementally
import sys
from importlib import reload
sys.path.insert(0, 'e:/worksrc/pycode/stock')
import tushare_ut as tu
import numpy as np
import time

k_data_path = "e:/stock/list11"
cluster_result_path = "e:/stock/clustering_out_0330"
spark_kmeans_model_path = "e:/stock/kmeans_spark_model"
day_lines_file = "e:/stock/day_lines%s.txt"%(time.strftime("%m%d"))
day_tags_file = "e:/stock/day_tags%s.txt"%(time.strftime("%m%d"))

# 7.1 Incremental download of the K-line data
#tu.ts_down_increasely(k_data_path, process_count=8)


In [8]:
# 7.2 Split into segments
start_year = "2018"
lsize = 30
skip = 2
reload(tu)
tu.daily_split_k_data(k_data_path, start_year, day_lines_file, day_tags_file, lsize, skip)

done


7.3 Classify the segments with the existing Spark model

bin\spark-shell --driver-memory=4g

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vectors

val model_path="e:/stock/kmeans_spark_model"
val lines_path="e:/stock/day_lines0330.txt"
val cluster_result_path="e:/stock/clustering_out_0330"

val data = sc.textFile(lines_path)
val parsedData = data.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))
parsedData.cache
parsedData.first
parsedData.count

val sameModel = KMeansModel.load(sc, model_path)

val out = sameModel.predict(parsedData)
out.saveAsTextFile(cluster_result_path)


In [None]:
# 7.4 Find the stocks that match the high-probability shapes. The cluster IDs have to be taken from the statistics above; here they are [50, 51, 108, 77, 41, 96]
clusters_want = [50, 51, 108, 77, 41, 96]
tu.daily_check_clusters(day_tags_file, cluster_result_path, clusters_want)
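
How the Spark output might be joined back to the tag rows can be sketched as follows. `saveAsTextFile` writes a directory of `part-*` files with one value per line. The sketch assumes that reading those files in name order reproduces the row order of the segment file, so row i of the tag file matches cluster ID i. The function names and the sample rows are hypothetical, not the behavior of `daily_check_clusters`.

```python
import glob
import os
import tempfile


def read_cluster_ids(result_dir):
    """Read the part-* files written by saveAsTextFile, in file-name order."""
    ids = []
    for path in sorted(glob.glob(os.path.join(result_dir, "part-*"))):
        with open(path) as f:
            ids.extend(int(line) for line in f if line.strip())
    return ids


def pick_candidates(tag_rows, cluster_ids, clusters_want):
    """Keep the tag rows whose segment fell into one of the wanted clusters."""
    wanted = set(clusters_want)
    return [row for row, cid in zip(tag_rows, cluster_ids) if cid in wanted]


# Simulate a small Spark output directory with made-up cluster IDs
with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, "part-00000"), "w") as f:
        f.write("50\n3\n")
    with open(os.path.join(d, "part-00001"), "w") as f:
        f.write("96\n")
    ids = read_cluster_ids(d)

# Made-up tag rows (code and date only)
tag_rows = ["600000 2018-03-28", "000001 2018-03-28", "600519 2018-03-29"]
picked = pick_candidates(tag_rows, ids, [50, 51, 108, 77, 41, 96])
print(picked)
```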