In [1]:
!pip install numpy requests joblib pyquery

Collecting pyquery
  Downloading https://files.pythonhosted.org/packages/09/c7/ce8c9c37ab8ff8337faad3335c088d60bed4a35a4bed33a64f0e64fbcf29/pyquery-1.4.0-py2.py3-none-any.whl
Collecting cssselect>0.7.9 (from pyquery)
  Downloading https://files.pythonhosted.org/packages/7b/44/25b7283e50585f0b4156960691d951b05d061abf4a714078393e51929b30/cssselect-1.0.3-py2.py3-none-any.whl
Installing collected packages: cssselect, pyquery
Successfully installed cssselect-1.0.3 pyquery-1.4.0


  Cache entry deserialization failed, entry ignored
  Cache entry deserialization failed, entry ignored
  Cache entry deserialization failed, entry ignored
  Cache entry deserialization failed, entry ignored
You are using pip version 9.0.1, however version 19.1.1 is available.
You should consider upgrading via the 'python -m pip install --upgrade pip' command.


In [6]:
%%writefile Lottos.py
## numpy (1.16.3)
## requests (2.21.0)
## joblib (0.12.5)
## pyquery (1.4.0)

import numpy as np
import requests
import pyquery
from datetime import datetime as dt
from time import sleep
from joblib import Parallel, delayed, cpu_count
np.set_printoptions(precision=8, suppress=True, linewidth=120)
#np.set_printoptions()

class Lottos:
    def __init__(self, filename='./lottos_db.npz'):
        self.filename = filename
        try:
            with np.load(self.filename) as npz :
                self.lottos = npz['lottos']
                self.add_lottos=npz['add_lottos']
            print("loaded!! ", self.lottos.shape)
        except Exception as e:
            print(e)
            self.lottos=np.zeros((0,45),dtype=np.int16)
            self.add_lottos=np.zeros((0,45),dtype=np.int16)
        finally:
            self.update()
            self.create_statistic()
            
    def create_statistic(self):
        self.neg_lottos = (1-(self.lottos)).astype(np.int16) +self.add_lottos.astype(np.int16)
        self.tomap = np.zeros((45,70,2))
        for i in range(1,self.neg_lottos.shape[0]):
            self.neg_lottos[i] += self.neg_lottos[i]*self.neg_lottos[i-1]
        for x in range(45):
            bincnt = np.bincount(self.neg_lottos[:,x], minlength=69)
            for y in range(len(bincnt)-1):
                self.tomap[x,y,0] = bincnt[y]-bincnt[y+1]
                self.tomap[x,y,1] = bincnt[y+1]
                    
    def __len__(self):
        return self.lottos.shape[0]
    
    def get_url(self, no):
        return f'https://search.naver.com/search.naver?query={no+1}회로또'
    
    def update(self):
        starttime = dt.now()
        url = self.get_url(9998)
        body = requests.get(url)
        while body.status_code != 200:
            print(f'\r{body.status_code} {dt.now()}', end='')
            sleep(5)
            body = requests.get(url)
        d = pyquery.PyQuery(body.text)('._lotto-btn-current em')
        limit = int(d.html()[:-1])
        end = self.lottos.shape[0]
        
        def get_balls(no):
            result = []
            url = self.get_url(no)
            body = requests.get(url)
            while True:
                d = pyquery.PyQuery(body.text)('.num_box .num')
                result = [ int(x.text)-1 for x in d]
                if len(result)==7:
                    break
                sleep(5)
                body = requests.get(url)
            print(f'\r    {no+1} ', end='')
            if no%180 == 179:
                print(f'  {dt.now()-starttime}')
            return result
        
        if limit>end:
            self.lottos = np.append(self.lottos,np.zeros((limit-end,45)),axis=0)
            self.add_lottos = np.append(self.add_lottos,np.zeros((limit-end,45)),axis=0)
            print(self.lottos.shape)
            verb=0
            crawled = Parallel(n_jobs=10, backend='threading', verbose=verb)(
                delayed(get_balls)(x) for x in range(end,limit)
            )
            print(f'  {dt.now()-starttime}')
            for rowno,row in enumerate(crawled):
                self.lottos[end+rowno,row]=1
                self.add_lottos[end+rowno,row[-1]] = 1
            np.savez_compressed(self.filename, lottos=self.lottos, add_lottos=self.add_lottos)
            print(f'\nwe had {end}. so update to {limit}. now we have {self.lottos.shape[0]} rows.')
        else:
            print('\nno update')
        
    def get_probability(self, no=None, history=1):
        if no is None:
            no=self.lottos.shape[0]+1
        assert 1< no <= self.lottos.shape[0]+1
        assert history < no
        probability = np.zeros(45)
        predict_seed = self.neg_lottos[no-1-history:no-1]
        prob_bias = (1-np.power((38/45), predict_seed[-1]))*0.03
        #print(f'predict_seed{no} : \n{predict_seed}\n###\n{prob_bias}\n\
        #cov : {np.cov(predict_seed)}, mean : {np.mean(predict_seed)}')
        for x in range(45):
            rate = 1.
            for y in range(history):
                percent = (self.tomap[x,predict_seed[y,x],0]/
                           np.sum(self.tomap[x,predict_seed[y,x]]))
                probability[x] += percent *rate
                rate = rate * (1-percent)
        #probability += prob_bias
        return (probability/np.sum(probability)), predict_seed
    
    def get_real_history(self, no):
        assert 1< no <= self.lottos.shape[0]
        return np.where((self.lottos[no-1]-self.add_lottos[no-1])>0.5)[0]+1
    
    def recommend(self, prob=None, count=5):
        if prob is None:
            p = np.ones((45))/45
        result = np.zeros((count,6))
        for i in range(count):
            result[i] = np.sort(np.random.choice(45, 6, replace=False, p=prob)+1)
        result.astype(np.int8)
        return result
      
    def validation_history(self, recommends):
        assert recommends is not None
        assert recommends.ndim == 2
        assert recommends.shape[1] == 6
        rr = np.zeros((recommends.shape[0], 45))
        for i, r in enumerate(recommends):
            rr[i,r.astype(np.int32)-1] = 1
        
        rrr = np.expand_dims(rr,1)
        lll = np.expand_dims((self.lottos-self.add_lottos), 0)
        temp = np.sum((lll-rrr)**2, axis=2)
        for tt in temp:
            print(np.bincount(tt.astype(np.int32))[::2])

Writing Lottos.py


In [1]:
from Lottos import Lottos
import numpy as np
from datetime import datetime as dt
from time import sleep
from joblib import Parallel, delayed, cpu_count
np.set_printoptions(precision=8, suppress=True, linewidth=120)

def aug_pb(pb, pb_pow=3):
    pb=pb**pb_pow
    pb=pb/np.sum(pb)
    #pb = pb + (1/100)
    #pb = pb / np.sum(pb)
    return pb

def correct_test(i, pb_power=1, testtime = 70000):
    starttime = dt.now()
    pb = lottos.get_probability(i)
    pb = aug_pb(pb, pb_pow=pb_power)
    y_ = lottos.get_real_history(i)
    count_pb = np.bincount(np.sum(np.isin( lottos.recommend(prob=pb, count=testtime),y_), axis=1), minlength=7)
    count_no = np.bincount(np.sum(np.isin( lottos.recommend(count=testtime),y_)         , axis=1), minlength=7)
    #(np.sort(np.argsort(-pb)[(y_-1)])+1)}\
    print(f"\n========  {i} : {(y_)} ========\
    \npred : {count_pb}\nunif : {count_no}\nrate : {(count_pb/count_no)[:-1]}\
    \npredict_acc : {count_pb/testtime}\npred_cumsum : {np.cumsum(count_pb/testtime)}\
    \nuniform_acc : {count_no/testtime}\nunif_cumsum : {np.cumsum(count_no/testtime)}\
    \n{dt.now()-starttime}\n")
    
    pb_correct_rate = np.sum(count_pb[3:])/np.sum(count_pb[:3])
    no_correct_rate = np.sum(count_no[3:])/np.sum(count_no[:3])
    return (count_pb / testtime)


lottos = Lottos()

pb, seed = lottos.get_probability()
r= lottos.recommend(pb)

print(lottos.get_real_history(len(lottos)))
print(r)
print(seed)
print(pb)
print(lottos.validation_history(r))

print("\n\n===================\n\n")

pb, seed = lottos.get_probability()
r= lottos.recommend(pb)
pb = aug_pb(pb)
r= lottos.recommend(pb)
print(r)
print(pb)
print(lottos.validation_history(r))


print("\n\n===================\n\n")

pb, seed = lottos.get_probability()
r= lottos.recommend(pb)
pb = aug_pb(pb,4)
r= lottos.recommend(pb)
print(r)
print(pb)
print(lottos.validation_history(r))

'''
test_pow = 8
history = 200
cpucnt = cpu_count()
d = len(lottos)
print(cpucnt,d)
teststarttime = dt.now()
report_str =""
for p in range(1,test_pow):
    print(f"\n\n%%%%%%%%%%%% ## pb_pow is {p} ## %%%%%%%%%%%%%%%\n\n")
    returns = Parallel(n_jobs=(cpucnt*2), backend='multiprocessing')(
        delayed(correct_test)(i,p) for i in range(d-history,d))
    returns_filter = np.sum(returns,axis=0)/history
    returns_filter_len = len([np.sum(x[3:]) for x in returns if np.sum(x[3:]) > (1-0.976) ])
    #'mean : {returns_filter.mean()}, {returns_filter.max()}~{returns_filter.min()}\n\'
    temp_report_str = f'#$#$#$#$aug_pow:{p}, history : {history}, wins :{returns_filter_len}\n\
    {returns_filter}\n\
    #$#$#$#$  test total time : {dt.now()- teststarttime}\n\n'
    report_str += temp_report_str
    #print(temp_report_str)
    #print(returns, returns_filter)
    
print(report_str)
####
'''

loaded!!  (857, 45)

no update
[ 6 10 16 28 34 38]
[[ 3. 13. 18. 23. 41. 45.]
 [ 7. 16. 17. 25. 38. 40.]
 [ 7. 21. 24. 28. 37. 41.]
 [ 2.  9. 20. 25. 27. 34.]
 [ 7. 12. 20. 24. 25. 40.]]
[[ 9  4 18 17  8  0 13  2 18  0  5 10  8  6  2  0  2  6  2  3 14  6  4  1  3  4  4  0  8  5  3  3  5  0  5  3 27  0  7
   1  1 10  1  1 11]]
[0.02241733 0.01705187 0.05572308 0.01625257 0.03095727 0.00963115 0.02889345 0.02022542 0.02000316 0.01918336
 0.01322243 0.03000474 0.01857436 0.01383197 0.01096559 0.01654807 0.01936476 0.01659837 0.01606995 0.02701725
 0.02052956 0.02167009 0.02097105 0.02437885 0.01881876 0.01529653 0.01746544 0.02105094 0.01902739 0.02698539
 0.02000316 0.02103273 0.02203738 0.01940605 0.01569213 0.02600411 0.06501026 0.01757034 0.02219863 0.01701203
 0.02097105 0.02516526 0.02065747 0.02210349 0.03640575]
[  0   0   3  18 125 361 350]
[  0   0   3  23 123 368 340]
[  0   0   0  13 129 370 345]
[  0   0   0  24 124 375 334]
[  0   0   3  24 141 347 342]
None




[[ 3.  5.  7

'\ntest_pow = 8\nhistory = 200\ncpucnt = cpu_count()\nd = len(lottos)\nprint(cpucnt,d)\nteststarttime = dt.now()\nreport_str =""\nfor p in range(1,test_pow):\n    print(f"\n\n%%%%%%%%%%%% ## pb_pow is {p} ## %%%%%%%%%%%%%%%\n\n")\n    returns = Parallel(n_jobs=(cpucnt*2), backend=\'multiprocessing\')(\n        delayed(correct_test)(i,p) for i in range(d-history,d))\n    returns_filter = np.sum(returns,axis=0)/history\n    returns_filter_len = len([np.sum(x[3:]) for x in returns if np.sum(x[3:]) > (1-0.976) ])\n    #\'mean : {returns_filter.mean()}, {returns_filter.max()}~{returns_filter.min()}\n\'\n    temp_report_str = f\'#$#$#$#$aug_pow:{p}, history : {history}, wins :{returns_filter_len}\n    {returns_filter}\n    #$#$#$#$  test total time : {dt.now()- teststarttime}\n\n\'\n    report_str += temp_report_str\n    #print(temp_report_str)\n    #print(returns, returns_filter)\n    \nprint(report_str)\n####\n'