<a href="https://colab.research.google.com/github/jckim79/jckim79/blob/main/keras_lstm_lotto_968_ipynb%EC%9D%98_%EC%82%AC%EB%B3%B8%EC%9D%98_%EC%82%AC%EB%B3%B8%EC%9D%98_%EC%82%AC%EB%B3%B8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 딥러닝 로또 번호 예측

- 딥러닝 로또 예측 번호 확인: https://animalface.site/lotto

- 동영상 설명: 유튜브 조코딩 채널(https://www.youtube.com/channel/UCQNE2JmbasNYbjGAcuBiRRg)

- 참고 문헌: 김태영님 블로그(https://tykimos.github.io/2020/01/25/keras_lstm_lotto_v895/)

In [None]:
# NOTE(review): this cell runs before the later cell that installs python3.10 via apt,
# so `python3.10` may be missing on a fresh runtime - confirm the intended cell order.
!python3.10 -m venv my_env

In [None]:
# NOTE(review): every `!` line runs in its own short-lived subshell, so this activation
# does not carry over to later cells or to the notebook kernel.
!source my_env/bin/activate

In [None]:
!apt-get install -y python3.10

In [None]:
 !apt install python3.10-venv


In [None]:
!python3.10 -m venv tf210_env

In [None]:
!source tf210_env/bin/activate

In [None]:
# Same two commands as the two cells directly above (create tf210_env, then activate it).
!python3.10 -m venv tf210_env  # Create a new environment
!source tf210_env/bin/activate  # Activate the environment

In [None]:
# NOTE(review): `--python-version 39` selects CPython 3.9 wheels while the environments
# above are python3.10, and `--target=/tmp/tensorflow_gpu` installs into a directory that
# is presumably not on the kernel's import path - confirm both are intended.
!pip install --platform manylinux2014_x86_64 --python-version 39 tensorflow_gpu==2.10.0 --only-binary=:all: --target=/tmp/tensorflow_gpu
!pip install numpy pandas matplotlib # any other requirements

In [None]:
import numpy as np

# One parser for every loaded column: trim whitespace, drop double quotes and
# read an empty field as 0. Any other non-numeric text still makes float() raise.
converters = dict.fromkeys(
    range(7),
    lambda s: float(s.strip().replace('"', '') or 0),
)

# Only columns 0-6 are read; per the original note this sidesteps the error
# caused by rows that carry a different number of columns.
rows = np.loadtxt(
    "./lotto.csv",
    delimiter=",",
    converters=converters,
    encoding='utf-8-sig',
    usecols=range(7),
)
row_count = len(rows)
print(row_count)

In [None]:
import numpy as np

# Convert one draw's winning numbers into a 45-slot 0/1 vector (ohbin)
def numbers2ohbin(numbers):
    """Encode the first six entries of ``numbers`` as a length-45 vector.

    Each entry is truncated with ``int()``; ball number ``k`` switches on
    position ``k - 1`` because ball numbers start at 1 and indices at 0.
    Entries beyond the sixth are ignored.
    """
    slots = [int(numbers[pos]) - 1 for pos in range(6)]

    ohbin = np.zeros(45)  # one slot per ball number, all off to begin with
    ohbin[slots] = 1

    return ohbin

# Convert a 0/1 vector (ohbin) back into ball numbers
def ohbin2numbers(ohbin):
    """Return the 1-based positions of all entries equal to 1.0, in ascending order."""
    return [slot + 1 for slot, flag in enumerate(ohbin) if flag == 1.0]

In [None]:
# Columns 1-6 of each row are taken as that draw's six winning numbers
numbers = rows[:, 1:7]
ohbins = [numbers2ohbin(draw) for draw in numbers]

# Pair every draw (input) with the draw that follows it (target)
x_samples = ohbins[:row_count - 1]
y_samples = ohbins[1:row_count]

# Show the first pair as 0/1 vectors
print("ohbins")
print(f"X[0]: {x_samples[0]}")
print(f"Y[0]: {y_samples[0]}")

# Show the same pair as ball numbers
print("numbers")
print(f"X[0]: {ohbin2numbers(x_samples[0])}")
print(f"Y[0]: {ohbin2numbers(y_samples[0])}")

In [None]:
# Train / validation / test splits as half-open [start, stop) index ranges into
# x_samples / y_samples. The loops that consume them use range(start, stop), so
# each split has to begin exactly where the previous one stops; a gap would
# silently drop that sample and break the draw-by-draw sequence that the
# stateful LSTM is fed.
train_idx = (0, 800)
val_idx = (train_idx[1], 900)
test_idx = (val_idx[1], len(x_samples))

print("train: {0}, val: {1}, test: {2}".format(train_idx, val_idx, test_idx))

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import models

# Build and compile the network: one stateful LSTM layer followed by a 45-unit
# sigmoid layer. Other cells reach the LSTM layer as model.layers[1].

# Define the input shape with batch size
input_shape = (1, 1, 45)  # (batch_size, timesteps, features)

# Define the input layer, using only batch_shape for stateful LSTM
inputs = keras.Input(batch_shape=input_shape)

# Define the LSTM layer
# stateful=True: the layer state is carried from one batch to the next until
# reset_states() is called
lstm_layer = layers.LSTM(128, return_sequences=False, stateful=True)(inputs)

# Define the output layer
# One sigmoid output per position of the 45-slot target vector
outputs = layers.Dense(45, activation='sigmoid')(lstm_layer)

# Create the model
model = keras.Model(inputs=inputs, outputs=outputs)

# Compile the model
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
# Per-epoch history of training and validation loss / accuracy
train_loss = []
train_acc = []
val_loss = []
val_acc = []

# Run 100 epochs; each epoch is one training pass over the train range followed
# by one evaluation pass over the validation range, one sample per batch
for epoch in range(100):

    # Reset the state of the LSTM layer instead of the entire model
    model.layers[1].reset_states()  # Important: every epoch starts again from the first draw, so the carried state must be cleared

    batch_train_loss = []
    batch_train_acc = []

    for i in range(train_idx[0], train_idx[1]):

        xs = x_samples[i].reshape(1, 1, 45)
        ys = y_samples[i].reshape(1, 45)

        loss, acc = model.train_on_batch(xs, ys) # one training step on this single-sample batch

        batch_train_loss.append(loss)
        batch_train_acc.append(acc)

    train_loss.append(np.mean(batch_train_loss))
    train_acc.append(np.mean(batch_train_acc))

    batch_val_loss = []
    batch_val_acc = []

    # The LSTM state is not reset here, so validation continues from the state
    # left by the training pass
    for i in range(val_idx[0], val_idx[1]):

        xs = x_samples[i].reshape(1, 1, 45)
        ys = y_samples[i].reshape(1, 45)

        loss, acc = model.test_on_batch(xs, ys) # feed the batch and compare the model's answer with the target

        batch_val_loss.append(loss)
        batch_val_acc.append(acc)

    val_loss.append(np.mean(batch_val_loss))
    val_acc.append(np.mean(batch_val_acc))

    print('epoch {0:4d} train acc {1:0.3f} loss {2:0.3f} val acc {3:0.3f} loss {4:0.3f}'.format(epoch, np.mean(batch_train_acc), np.mean(batch_train_loss), np.mean(batch_val_acc), np.mean(batch_val_loss)))

In [None]:
# Re-derive the train / validation / test splits as contiguous half-open
# [start, stop) ranges, clamped to the number of available samples.
# Each split starts where the previous one stops because the consuming loops
# use range(start, stop); a gap would silently drop that sample.
total_samples = len(x_samples)

train_idx = (0, min(800, total_samples))
# min() keeps the stop inside the data; when there are fewer than 800 samples
# this collapses to an empty validation range (total_samples, total_samples)
val_idx = (train_idx[1], max(train_idx[1], min(900, total_samples)))

# Everything after the validation range is test data
test_idx = (val_idx[1], total_samples)

print("train: {0}, val: {1}, test: {2}".format(train_idx, val_idx, test_idx))

In [None]:
import matplotlib.pyplot as plt

# Loss curves are drawn on the left axis, accuracy curves on a twin right axis
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

# (target axis, per-epoch series, matplotlib colour code, legend label)
curves = [
    (ax1, train_loss, 'y', 'train loss'),
    (ax1, val_loss, 'r', 'val loss'),
    (ax2, train_acc, 'b', 'train acc'),
    (ax2, val_acc, 'g', 'val acc'),
]
for ax, data, color, label in curves:
    ax.plot(data, color, label=label)

ax1.set_xlabel('epoch')
ax1.set_ylabel('loss')
ax2.set_ylabel('accuracy')

ax1.legend(loc='upper left')
ax2.legend(loc='lower left')

plt.show()

In [None]:
# Average the 1st-5th prize amounts from draw 88 (row index 87) to the latest
# draw; columns 8-12 are read as the five prize tiers in order.
# NOTE(review): needs `rows` to have at least 13 columns; the 7-column load
# elsewhere in this notebook is not enough - confirm the cell order.
mean_prize = [np.mean(rows[87:, prize_col]) for prize_col in range(8, 13)]

print(mean_prize)

In [None]:
import numpy as np

# One parser for all 13 loaded columns: trim whitespace, drop double quotes and
# read an empty field as 0. Any other non-numeric text still makes float() raise.
# NOTE(review): the file is split on every ',' - if prize amounts are quoted with
# thousands separators they would be broken apart; verify against lotto.csv.
converters = dict.fromkeys(
    range(13),
    lambda s: float(s.strip().replace('"', '') or 0),
)

# Reload columns 0-12 so the draw numbers and the prize-money columns are both available
rows = np.loadtxt(
    "./lotto.csv",
    delimiter=",",
    converters=converters,
    encoding='utf-8-sig',
    usecols=range(13),
)

# Columns 1-6 are assumed to hold the six drawn numbers
lottery_number_columns = rows[:, 1:7]

# A row is invalid as soon as one of its six numbers lies outside 1..45
out_of_range = (lottery_number_columns < 1) | (lottery_number_columns > 45)
invalid_rows_mask = out_of_range.any(axis=1)

# Keep only the valid rows and report how many were dropped
filtered_rows = rows[~invalid_rows_mask]
row_count = len(filtered_rows)
print(f"Original row count: {len(rows)}")
print(f"Filtered row count: {row_count}")

# From here on `rows` refers to the filtered table.
# NOTE(review): x_samples / y_samples are not rebuilt in this cell, so they still
# reflect the rows loaded earlier - confirm they stay aligned with `rows`.
rows = filtered_rows

numbers = rows[:, 1:7]

# ... (rest of your code that uses the 'rows' variable should now work)

In [None]:
# Return the prize grade and the payout for one ticket.
# Grades 0-4 stand for 1st-5th prize; a ticket that wins nothing gets grade 5 and payout 0.
def calc_reward(true_numbers, true_bonus, pred_numbers):
    """Grade ``pred_numbers`` against a draw.

    Args:
        true_numbers: the draw's winning numbers.
        true_bonus: the draw's bonus number (only decides 2nd vs 3rd prize).
        pred_numbers: the numbers on the ticket.

    Returns:
        ``(grade, payout)`` where the payout is read from the module-level
        ``mean_prize`` list, or ``(5, 0)`` when fewer than three numbers match.
    """
    hits = sum(1 for ball in pred_numbers if ball in true_numbers)

    if hits == 6:
        return 0, mean_prize[0]
    if hits == 5:
        # five matches plus the bonus ball is 2nd prize, otherwise 3rd
        grade = 1 if true_bonus in pred_numbers else 2
        return grade, mean_prize[grade]
    if hits in (3, 4):
        grade = 7 - hits  # 4 matches -> grade 3, 3 matches -> grade 4
        return grade, mean_prize[grade]

    return 5, 0

In [None]:
def gen_numbers_from_probability(nums_prob):
    """Draw six distinct ball numbers, favouring balls with a higher score.

    Ball ``k`` (1-45) is placed in a virtual box ``int(nums_prob[k - 1] * 100 + 1)``
    times. Balls are then picked uniformly from the box with ``np.random`` and
    repeats are discarded until six different numbers have been collected.

    Returns:
        The six selected ball numbers in the order they were drawn.
    """
    # number of copies of each ball that go into the box
    copies = [int(nums_prob[slot] * 100 + 1) for slot in range(45)]
    ball_box = np.repeat(np.arange(1, 46), copies)  # ball numbers start at 1

    selected_balls = []

    while len(selected_balls) != 6:
        pick = ball_box[np.random.randint(len(ball_box), size=1)[0]]

        # a ball that was already drawn is simply drawn again
        if pick not in selected_balls:
            selected_balls.append(pick)

    return selected_balls

In [None]:
# Replay every sample through the model, play 10 tickets per draw and tally the
# prize grades and winnings separately for the train / validation / test splits.
train_total_reward = []
train_total_grade = np.zeros(6, dtype=int)

val_total_reward = []
val_total_grade = np.zeros(6, dtype=int)

test_total_reward = []
test_total_grade = np.zeros(6, dtype=int)

# (half-open index range, grade tally, per-draw reward list) for each split.
# Routing through one table keeps the grade and reward bookkeeping in sync; the
# previous hand-written branches credited test-split grades to the validation tally.
split_buckets = [
    (train_idx, train_total_grade, train_total_reward),
    (val_idx, val_total_grade, val_total_reward),
    (test_idx, test_total_grade, test_total_reward),
]

model.layers[1].reset_states()  # Assuming your LSTM layer is the second layer in the model (index 1)

print('[No. ] 1st 2nd 3rd 4th 5th 6th Rewards')

# Sample i is scored against rows[i+1], so stop before i+1 runs past `rows`
# (this matters when `rows` was filtered after x_samples had been built).
eval_count = min(len(x_samples), len(rows) - 1)

for i in range(eval_count):
    xs = x_samples[i].reshape(1, 1, 45)
    ys_pred = model.predict_on_batch(xs)  # model output for this input draw

    sum_reward = 0
    sum_grade = np.zeros(6, dtype=int)  # index 0-4 = 1st-5th prize, index 5 = no prize

    for n in range(10):  # play 10 tickets
        numbers = gen_numbers_from_probability(ys_pred[0])

        # the output produced from draw i is compared with draw i+1
        grade, reward = calc_reward(rows[i+1,1:7], rows[i+1,7], numbers)

        sum_reward += reward
        sum_grade[grade] += 1

    # Credit this draw to the split whose range contains i (none if i falls outside all of them)
    for (start, stop), grade_tally, reward_log in split_buckets:
        if start <= i < stop:
            grade_tally += sum_grade  # in-place add, updates the matching *_total_grade array
            reward_log.append(sum_reward)
            break

    print('[{0:4d}] {1:3d} {2:3d} {3:3d} {4:3d} {5:3d} {6:3d} {7:15,d}'.format(i+1, sum_grade[0], sum_grade[1], sum_grade[2], sum_grade[3], sum_grade[4], sum_grade[5], int(sum_reward)))

print('Total')
print('==========')
print('Train {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(train_total_grade[0], train_total_grade[1], train_total_grade[2], train_total_grade[3], train_total_grade[4], train_total_grade[5], int(sum(train_total_reward))))
print('Val   {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(val_total_grade[0], val_total_grade[1], val_total_grade[2], val_total_grade[3], val_total_grade[4], val_total_grade[5], int(sum(val_total_reward))))
print('Test  {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(test_total_grade[0], test_total_grade[1], test_total_grade[2], test_total_grade[3], test_total_grade[4], test_total_grade[5], int(sum(test_total_reward))))
print('==========')

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

# Per-draw winnings in split order: train, then validation, then test
total_reward = [*train_total_reward, *val_total_reward, *test_total_reward]

reward_fig, reward_ax = plt.subplots()
reward_ax.plot(total_reward)
reward_ax.set_ylabel('rewards')
plt.show()

In [None]:
# Replay every sample through the model, play 10 tickets per draw and tally the
# prize grades and winnings separately for the train / validation / test splits.
train_total_reward = []
train_total_grade = np.zeros(6, dtype=int)

val_total_reward = []
val_total_grade = np.zeros(6, dtype=int)

test_total_reward = []
test_total_grade = np.zeros(6, dtype=int)

# Average the 1st-5th prize amounts from draw 88 (row index 87) onward.
# Computed here, before calc_reward is called, so the name is guaranteed to exist.
mean_prize = [ np.mean(rows[87:, 8]),
           np.mean(rows[87:, 9]),
           np.mean(rows[87:, 10]),
           np.mean(rows[87:, 11]),
           np.mean(rows[87:, 12])]

# (half-open index range, grade tally, per-draw reward list) for each split
split_buckets = [
    (train_idx, train_total_grade, train_total_reward),
    (val_idx, val_total_grade, val_total_reward),
    (test_idx, test_total_grade, test_total_reward),
]

model.layers[1].reset_states()  # Assuming your LSTM layer is the second layer in the model (index 1)

print('[No. ] 1st 2nd 3rd 4th 5th 6th Rewards')

# Sample i feeds x_samples[i] and is scored against rows[i+1]; bound the loop by
# both so neither lookup can run past its end when their lengths differ.
eval_count = min(len(x_samples), len(rows) - 1)

for i in range(eval_count):
    xs = x_samples[i].reshape(1, 1, 45)
    ys_pred = model.predict_on_batch(xs)  # model output for this input draw

    sum_reward = 0
    sum_grade = np.zeros(6, dtype=int)  # index 0-4 = 1st-5th prize, index 5 = no prize

    for n in range(10):  # play 10 tickets
        numbers = gen_numbers_from_probability(ys_pred[0])

        # the output produced from draw i is compared with draw i+1
        grade, reward = calc_reward(rows[i+1,1:7], rows[i+1,7], numbers)

        sum_reward += reward
        sum_grade[grade] += 1

    # Split membership is decided by the sample index i, exactly as the training
    # and validation loops do (they iterate range(start, stop) over x_samples[i]).
    # Testing i+1 instead would shift every boundary by one and credit samples to
    # a split they were not trained or validated in.
    for (start, stop), grade_tally, reward_log in split_buckets:
        if start <= i < stop:
            grade_tally += sum_grade  # in-place add, updates the matching *_total_grade array
            reward_log.append(sum_reward)
            break

    print('[{0:4d}] {1:3d} {2:3d} {3:3d} {4:3d} {5:3d} {6:3d} {7:15,d}'.format(i+1, sum_grade[0], sum_grade[1], sum_grade[2], sum_grade[3], sum_grade[4], sum_grade[5], int(sum_reward)))


print('Total')
print('==========')
print('Train {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(train_total_grade[0], train_total_grade[1], train_total_grade[2], train_total_grade[3], train_total_grade[4], train_total_grade[5], int(sum(train_total_reward))))
print('Val   {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(val_total_grade[0], val_total_grade[1], val_total_grade[2], val_total_grade[3], val_total_grade[4], val_total_grade[5], int(sum(val_total_reward))))
# Test results are accumulated in test_total_grade / test_total_reward
print('Test  {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(test_total_grade[0], test_total_grade[1], test_total_grade[2], test_total_grade[3], test_total_grade[4], test_total_grade[5], int(sum(test_total_reward))))
print('==========')

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator

# Bar chart of the total winnings collected in each split
reward_fig = plt.figure()
ax = reward_fig.gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))  # integer tick positions on the x axis

rewards = [
    sum(split_rewards)
    for split_rewards in (train_total_reward, val_total_reward, test_total_reward)
]

class_color = ['green', 'blue', 'red']

ax.bar(['train', 'val', 'test'], rewards, color=class_color)
ax.set_ylabel('rewards')
plt.show()

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import models
import matplotlib.pyplot as plt

# ... (previous code for data loading, preprocessing, etc.) ...

# Rebuild the stateful LSTM from scratch so this run starts from fresh weights.
input_shape = (1, 1, 45)  # (batch_size, timesteps, features)
inputs = keras.Input(batch_shape=input_shape)  # fixed batch shape, used here for the stateful LSTM
lstm_layer = layers.LSTM(128, return_sequences=False, stateful=True)(inputs)
outputs = layers.Dense(45, activation='sigmoid')(lstm_layer)  # one score per ball number
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# ... (previous code for calc_reward, gen_numbers_from_probability) ...

# Early stopping is implemented by hand. A keras.callbacks.EarlyStopping object
# only takes effect when handed to model.fit(); this loop drives train_on_batch()
# directly, so the same policy (patience on val_loss, restore the best weights)
# is applied explicitly below.
patience = 10
max_epochs = 100

# Per-epoch history of training and validation loss / accuracy
train_loss = []
train_acc = []
val_loss = []
val_acc = []

best_val_loss = np.inf
best_epoch = None
best_weights = None

# With an empty validation range there is nothing to monitor, so all epochs run
has_validation = val_idx[1] > val_idx[0]

for epoch in range(max_epochs):

    # Every epoch replays the draws from the start, so clear the carried LSTM state
    model.layers[1].reset_states()

    batch_train_loss = []
    batch_train_acc = []

    for i in range(train_idx[0], train_idx[1]):
        xs = x_samples[i].reshape(1, 1, 45)
        ys = y_samples[i].reshape(1, 45)
        loss, acc = model.train_on_batch(xs, ys)
        batch_train_loss.append(loss)
        batch_train_acc.append(acc)

    train_loss.append(np.mean(batch_train_loss))
    train_acc.append(np.mean(batch_train_acc))

    batch_val_loss = []
    batch_val_acc = []

    # Validation continues from the state left by the training pass (no reset)
    for i in range(val_idx[0], val_idx[1]):
        xs = x_samples[i].reshape(1, 1, 45)
        ys = y_samples[i].reshape(1, 45)
        loss, acc = model.test_on_batch(xs, ys)
        batch_val_loss.append(loss)
        batch_val_acc.append(acc)

    val_loss.append(np.mean(batch_val_loss))
    val_acc.append(np.mean(batch_val_acc))

    print('epoch {0:4d} train acc {1:0.3f} loss {2:0.3f} val acc {3:0.3f} loss {4:0.3f}'.format(epoch, np.mean(batch_train_acc), np.mean(batch_train_loss), np.mean(batch_val_acc), np.mean(batch_val_loss)))

    if not has_validation:
        continue

    if val_loss[-1] < best_val_loss:
        # New best epoch: remember it together with a snapshot of the weights
        best_val_loss = val_loss[-1]
        best_epoch = epoch
        best_weights = model.get_weights()
    elif best_epoch is not None and epoch - best_epoch >= patience:
        # val_loss has not beaten its best value for `patience` epochs in a row
        print("Early stopping triggered at epoch", epoch)
        break

# Roll the model back to the best epoch seen; without a usable validation loss
# the final epoch is kept as it is.
if best_weights is not None:
    model.set_weights(best_weights)
    optimal_epoch = best_epoch
else:
    optimal_epoch = len(val_loss) - 1

# Print optimal epoch (same 0-based numbering as the per-epoch log above)
print("Optimal epoch:", optimal_epoch)

# Get probabilities for each number
# NOTE(review): x_samples[-1] is the second-to-last loaded draw (its target
# y_samples[-1] is the latest one), and the LSTM state at this point is whatever
# the last validation pass left behind - confirm this is the intended input.
xs = x_samples[-1].reshape(1, 1, 45)  # Use the last input sequence for prediction
ys_pred = model.predict_on_batch(xs)
probabilities = ys_pred[0]  # one score per output position; position k maps to ball k+1

# Print probabilities
print("Probabilities for each number (1-45):")
for i, prob in enumerate(probabilities):
    print(f"{i + 1}: {prob:.4f}")

In [None]:
# Infer the next draw with the model trained up to the latest draw

print('receive numbers')

# Rebuild the LSTM state by replaying every known draw in order, ending with the
# most recent one (ohbins[-1]). The output produced after that last input is the
# forecast for the draw that has not happened yet. Feeding x_samples[-1] instead
# would "predict" the latest draw, which is already known, from a stale state.
model.layers[1].reset_states()

for known_draw in ohbins:
    xs = known_draw.reshape(1, 1, 45)
    ys_pred = model.predict_on_batch(xs)

list_numbers = []

# Generate 10 candidate tickets from the forecast scores
for n in range(10):
    numbers = gen_numbers_from_probability(ys_pred[0])
    numbers.sort()
    print('{0} : {1}'.format(n, numbers))
    list_numbers.append(numbers)