In [13]:
#前処理後のindex作るための処理
import os
import elasticsearch as es
import math
import numpy as np
from itertools import chain
import sys
import tensorflow as tf
import matplotlib.pyplot as plt

ES_HOST = "localhost"
escon = es.Elasticsearch(ES_HOST, port=9200)
GACRP_HOME=os.environ['GACRP_HOME']

# ESのデータ量依存設定値のグローバル変数化
# scrollのタイムアウト時間
es_s_time='2m'

maxPartition=90


In [14]:
#訓練データの期間
indataStartYmd=20160801
indataEndYmd=20161031
forecastStartYmd=20161101
forecastEndYmd=20161231

#評価データの期間
evalIndataStartYmd=20170201
evalIndataEndYmd=20170431
evalForecastStartYmd=20170501
evalForecastEndYmd=20170631

#教師データから変換した項目数（one-hot後）
# Todo 配列の形式から自動設定にする。
num_elements=18

In [15]:
# 元データ取得

def getInputData(startymd,endymd):
    es_size=10000
    body = {
      "_source": [
        "fullVisitorId","date",
        "channelGrouping","visitNumber","isMobile","country","hits","bounces",
        "newVisits","source","medium","isTrueDirect","transactionRevenue"
      ],
      "query": {
        "bool": {
          "must": [
            {
              "range": {
                "date": {
                  "gt": startymd,
                  "lt": endymd
                }
              }
            }
          ],
          "must_not": [],
          "should": []
        }
      },
      "from": 0,
      "size": 0,
      "sort": [],
      "aggs": {}
    }
  
    res=escon.search(index='gacrp_index',body=body,scroll=es_s_time,size=es_size)
    es_s_id=res['_scroll_id']
    es_s_size=len(res["hits"]["hits"])
    
    return(res,es_s_id,es_s_size)


In [16]:
#fullVisitorIdごとに、指定した期間のrevenueを取得
def getRevenue(startYmd,endYmd,partitionNumber,maxPartition):
    es_size=10000
    body = {
        "size":0,
        "aggs":{
            "agg_fullVisitorId":{
                "terms":{
                    "field":"fullVisitorId.keyword",
                    "include":{
                        "partition":partitionNumber,
                        "num_partitions":maxPartition
                    },
                    "size":10000
                },
                "aggs":{
                    "agg_revenue":{
                        "sum":{"field":"transactionRevenue"}
                    }
                }
            }
        },
        "query":{
            "bool":{
                "must":[
                    {"range": {"date": {
                                "gt": startYmd,"lt": endYmd
                    }}}
                ]
            }
        }
    }
    #print(body)
    res=escon.search(index='gacrp_index',body=body)
    #revenue=math.log(int(res['aggregations']['agg_revenue']['value'])+1)
    #print(revenue)
    return(res['aggregations']['agg_fullVisitorId']['buckets'])


In [17]:
#oneHotベクトル化
def es2oneHot(res,data):
    for i in range(len(res["hits"]["hits"])):
        row=res["hits"]["hits"][i]["_source"]
        rec=[]

        col_date=row['date']
        col_fullVisitorId=row['fullVisitorId']
        
        if row['channelGrouping']=='Display':
            col_channelGrouping=1
        elif row['channelGrouping']=='Social':
            col_channelGrouping=2
        else:
            col_channelGrouping=0
        oneHot_col_channelGrouping=np.eye(2+1)[col_channelGrouping]
        
        #visitNumberは最大を5で打ち切って0～1にnormalize。
        col_visitNumber=min(int(row['visitNumber']),5)/5.0 

        if row['isMobile']=="TRUE":
            col_isMobile=1
        else:
            col_isMobile=0

        if row['country']=="United States":
            col_country =1
        elif row['country']=="Canada":
            col_country =2
        elif row['country']=="Japan":
            col_country=3
        else:
            col_country=0
        oneHot_col_country=np.eye(3+1)[col_country]
    
        col_hits=math.log10(int(row['hits'])*1.0)
    
        if row['bounces']=="1":
            col_bounces=1
        else:
            col_bounces=0

        if row['newVisits']== "1":
            col_newVisits=1
        else:
            col_newVisits=0
    
        if row['source']=="mail.googleplex.com" or row['source']=="dfa":
            col_source =1
        elif row['source']=="(direct)":
            col_source=2
        else:
            col_source=0
        oneHot_col_source=np.eye(2+1)[col_source]
        
        if row['medium']=="cpm":
            col_medium=1
        else:
            col_medium=0
        oneHot_col_medium=np.eye(1+1)[col_medium]
    
        if row['isTrueDirect']=="1":
            col_isTrueDirect=1
        else:
            col_isTrueDirect=0
    
        #revenue = math.log(int(row['transactionRevenue'])+1)
    
        l=[oneHot_col_channelGrouping,[col_visitNumber],[col_isMobile],oneHot_col_country,[col_hits],[col_bounces],[col_newVisits],oneHot_col_source,oneHot_col_medium,[col_isTrueDirect]]
        data.append([col_fullVisitorId,list(chain.from_iterable(l))])
        #data.append([col_fullVisitorId,l])
        #ans.append(revenue)
    
    return(data)

In [18]:
rowdata=[]
ans = []

res,es_s_id,es_s_size=getInputData(indataStartYmd,indataEndYmd)
while(es_s_size>0):
    rowdata=es2oneHot(res,rowdata)
    res=escon.scroll(scroll_id=es_s_id,scroll=es_s_time)
    es_s_id=res['_scroll_id']
    es_s_size=len(res["hits"]["hits"])

    print(len(rowdata))

10000
20000
30000
40000
50000
60000
70000
80000
90000
100000
110000
120000
130000
140000
150000
160000
170000
180000
190000
200000
210000
220000
230000
237728


In [19]:
#rowdataを対象期間のfullVisitorIdごとに集約して予測期間のtransactionRevenueを当てる
def transformInputdata(rowdata):
    rowdata=sorted(rowdata,key=lambda x:x[0])
    print("ソート完了")
    key_fullVisitorId=rowdata[0][0]
    sumdata=[]

    tmp_indata=[]
    tmp_indata.append(rowdata[0][1])
    tmp_cnt=1
    for i in range(1,len(rowdata)):
        #if i %10000==0:
        #    print("集約処理実行中："+str(i) )
        if key_fullVisitorId==rowdata[i][0]:
            tmp_indata.append(rowdata[i][1])
            tmp_cnt+=1
        else:
            #列方向に平均をとってappend
            #print(tmp_indata)
            tmp_np = np.empty((0,18),float)
            for j in range(tmp_cnt):
                tmp_np = np.append(tmp_np,np.array([tmp_indata[j]]),axis=0)
            sumdata.append([key_fullVisitorId,np.average(tmp_np,axis=0).tolist()])
        #ans.append(getRevenueByFullVisitorId(key_fullVisitorId,forecastStartYmd,forecastEndYmd))
            key_fullVisitorId=rowdata[i][0]
            tmp_indata=[]
            tmp_indata.append(rowdata[i][1])
            tmp_cnt=1

    tmp_np = np.empty((0,18),float)
    for j in range(tmp_cnt):
        tmp_np = np.append(tmp_np,np.array([tmp_indata[j]]),axis=0)
    sumdata.append([key_fullVisitorId,np.average(tmp_np,axis=0)])
    print(len(sumdata))
    
    return(sumdata)

In [20]:
sumdata=transformInputdata(rowdata)

#partitionでぐるぐる回してtransactionRevenueをとってくる
#sumdataに合わせてansを当てて設定する。

tmpans=[]
for partitionNumber in range(maxPartition):
    #print(partitionNumber)
    res=getRevenue(forecastStartYmd,forecastEndYmd,partitionNumber,maxPartition)
    for i in range(len(res)):
        #if i==1:
        #    print(res[i])
        tmpans.append([res[i]['key'], math.log(int(res[i]['agg_revenue']['value'])+1)])


ソート完了
195131


In [21]:
#集約後のfullVisitorIdにjoinする
#print(tmpans[0])
def joinRevenue(sumdata,tmpans):
    ans=[]
    #npans=np.array(tmpans)
    #print(npans[0])
    print(sumdata[0][0])

    tmpans=sorted(tmpans,key=lambda x:x[0])
    print("tmpansソート完了"+str(len(tmpans)) )
    print(tmpans[0])

    key_ans=tmpans[0][0]
    ix_ans=0
    key_data=sumdata[0][0]
    ix_data=0
    inf = '9999999999999999999'

    while key_ans != inf or key_data != inf:
        if key_ans == key_data:
            ans.append(tmpans[ix_ans][1])
            ix_data+=1
            ix_ans+=1
        elif key_ans > key_data:
            ans.append(0)
            ix_data+=1
        else:
            ix_ans+=1
        if ix_ans >= len(tmpans):
            key_ans=inf
        else:
            key_ans=tmpans[ix_ans][0]

        if ix_data >= len(sumdata):
            key_data=inf
        else:
            key_data=sumdata[ix_data][0]

#for ix_data in range(len(sumdata)):
#    if ix_data % 10000 ==0:
#        print("join中："+str(ix_data))
    #r = np.where(npans[:,0]==sumdata[i][0])
    
    #print(r[0])
    #print("len:",len(r[0]))
#    if len(r[0])==0:
#        ans.append(0)
#    else:
#        ans.append(r[1]['value'])

    #print(getRevenueByFullVisitorId(rowdata[i][0],"20160101","20180101"))

    print(len(ans))
    print(sum(ans))
    
    return(ans)


In [22]:
ans=joinRevenue(sumdata,tmpans)


0000010278554503158
tmpansソート完了155959
[u'0000020424342248747', 0.0]
195131
5665.12325108


In [23]:
#評価データ読み込み
evalrowdata=[]
evalans = []

res,es_s_id,es_s_size=getInputData(evalIndataStartYmd,evalIndataEndYmd)
while(es_s_size>0):
    rowdata=es2oneHot(res,evalrowdata)
    res=escon.scroll(scroll_id=es_s_id,scroll=es_s_time)
    es_s_id=res['_scroll_id']
    es_s_size=len(res["hits"]["hits"])

    print(len(evalrowdata))

evalsumdata=transformInputdata(evalrowdata)

#partitionでぐるぐる回してtransactionRevenueをとってくる
#sumdataに合わせてansを当てて設定する。

evaltmpans=[]
for partitionNumber in range(maxPartition):
    #print(partitionNumber)
    res=getRevenue(evalForecastStartYmd,evalForecastEndYmd,partitionNumber,maxPartition)
    for i in range(len(res)):
        evaltmpans.append([res[i]['key'], math.log(int(res[i]['agg_revenue']['value'])+1)])

evalans=joinRevenue(evalsumdata,evaltmpans)


10000
20000
30000
40000
50000
60000
70000
80000


ConnectionTimeout: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host=u'localhost', port=9200): Read timed out. (read timeout=10))

In [None]:
#tensorflowセットアップ
datalength=len(sumdata)

#print(data[0],ans[0])
#inputData=tf.transpose(data)
debugFlg=False
batch_size=datalength/2

P=int(36)
Q=int(36)

num_rows=len(sumdata)
#sumdataから１列目を削除してinputDataに渡す
#data=[]
#for i in range(len(sumdata)):
    #data.append(sumdata[i][1][0])

#print(sumdata[14])

#高速化のため見直し
#data=np.empty((0,num_elements))
#for i in range(len(sumdata)):
#    data=np.append(data,[np.array(sumdata[i][1])],axis=0 )
#    if i % 10000==0:
#        print(i)

##訓練データをtensorflowが食えるように変換
data=np.empty((0,num_elements))
tmp_array=data.tolist()
for i in range(len(sumdata)):
    tmp_array.append(np.array(sumdata[i][1]))
data=np.asarray(tmp_array)        
print(data.shape)

inputData=data
inputAns=np.reshape(ans,(num_rows,1))


#print(data[0:20])

##評価データを同様に設定
evalnum_rows=len(evalsumdata)
evaldata=np.empty((0,num_elements))
evaltmp_array=evaldata.tolist()
for i in range(len(evalsumdata)):
    evaltmp_array.append(np.array(evalsumdata[i][1]))
evaldata=np.asarray(evaltmp_array)
print(evaldata.shape)

evalInputData=evaldata
evalAns=np.reshape(evalans,(evalnum_rows,1))


#inputData=np.reshape(np.delete(sumdata,0,1),(num_rows,num_elements))
#inputData=np.reshape(sumdata[:][1],(num_rows,num_elements))
##inputData=np.reshape(data,(num_rows,num_elements))
#inputData=data,(num_rows,num_elements)
x =tf.placeholder("float",[None,num_elements])
y_=tf.placeholder("float",[None,1])

a=tf.Variable( tf.random_uniform([num_elements,1],-1.0,1.0),name="weights")    
w1=tf.Variable( tf.random_uniform([num_elements,P],-1.0,1.0 )  )
w2=tf.Variable( tf.random_uniform([P,Q],-1.0,1.0 )  )
w3=tf.Variable( tf.random_uniform([Q,1],-1.0,1.0 )  )

#a=tf.Variable( tf.random_uniform([[18]],-1.0,1.0),name="weights")    
#b=tf.Variable(tf.zeros([1,1]),name="intercept")
b=tf.Variable(tf.random_normal([P]),name="intercept")
print("x:",x.shape,"a:",a.shape)
#y=tf.matmul(tf.transpose(a),x)+b
#y=tf.matmul(x,a)

x2=tf.sigmoid(tf.matmul(x,w1))+b
#y=tf.matmul(x2,w2)
x3=tf.sigmoid(tf.matmul(x2,w2))
y =tf.matmul(x3,w3)

init = tf.global_variables_initializer()
loss = tf.reduce_sum(tf.square(y_-y))/batch_size
train_step=tf.train.GradientDescentOptimizer(0.0001).minimize(loss)


In [None]:
#データ表示用セットアップ
graphLoss=[]
graphEval=[]
graphStep=[]
graphNum=30
repeatAll=3000
debugFlg=False
with tf.Session() as sess:
    sess.run(init)
    print('初期状態')
    inpShape=inputData[0:batch_size]
    ansShape=inputAns[0:batch_size]
    #print('誤差' + str(sess.run(loss, feed_dict={x: inputData[0:batch_size], y_: inputAns[0:batch_size]})))
    ##print('誤差' + str(sess.run(loss, feed_dict={x: inpShape, y_: ansShape})))
    #print("weigths: %f, intercept: %f" % (sess.run(a), sess.run(b) ))
    #print("input:",inputAns[batch_size*step+1:batch_size*step+1+batch_size])
    ##print("weigths:",sess.run(a),"intercept:",sess.run(b) )
    
    for step in range(repeatAll):
        if (batch_size*step) %datalength < (batch_size*step+batch_size) %datalength :
            batchInputData= inputData[(batch_size*step)%datalength:(batch_size*step+batch_size)%datalength]
            batchInputAns = inputAns[(batch_size*step)%datalength:(batch_size*step+batch_size)%datalength]
        else:
            batchInputData= inputData[(batch_size*step)%datalength:min( (batch_size*step+batch_size)%datalength,datalength-1 )]
            batchInputAns = inputAns[(batch_size*step)%datalength:min( (batch_size*step+batch_size)%datalength,datalength-1) ]
        sess.run(train_step, feed_dict={x: batchInputData, y_: batchInputAns })
        if (step+1) % (repeatAll/graphNum) == 0:
            print('\nStep: %s' % (step+1))
            print('誤差' + str(sess.run(loss, feed_dict={x: batchInputData, y_: batchInputAns })))
            print('評価' + str(sess.run(loss, feed_dict={x: evalInputData, y_: evalAns })))
            if debugFlg==True:
                print("weights:",sess.run(w1), " intercept:", sess.run(b) )
            graphLoss.append( sess.run(loss, feed_dict={x: batchInputData, y_: batchInputAns }) )
            graphEval.append( sess.run(loss, feed_dict={x: evalInputData, y_: evalAns }))
            graphStep.append(step)
    #最終状態を表示
    print("weights1:",sess.run(w1),"weights2:",sess.run(w2), " intercept:", sess.run(b))
    
    
# めも
#Step: 10000
#誤差0.574773
#評価1.05919

In [None]:
    

plt.plot(graphStep,graphLoss,graphStep,graphEval)
