In [1]:
import math
import numpy as np
import random
from copy import deepcopy

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.optim import Adam

In [2]:
import pooltool as pt

In [3]:
import pandas as pd

In [4]:
# Load the previously saved value network; find_best_stroke() and get_data()
# below read it through the module-level name `reg_net`.
# NOTE(review): torch.load unpickles the file, so only load checkpoints that
# come from a trusted source.
reg_net = torch.load('reg_net.pt')

In [5]:
# Ball identifiers in a fixed order: the cue ball first, then '1' .. '9'.
# generate_balls() and obs_to_ball() index into this list by position.
labels = ['cue', '1', '2', '3', '4', '5', '6', '7', '8', '9']

def shoot(balls, act):
    """Strike the cue ball with discrete action `act` and simulate the shot.

    The cue is aimed (via ``aim_for_best_pocket``) at the ball stored under the
    second key of `balls`, the action id is decoded with ``act_to_aim`` and the
    resulting system is simulated.  Relies on the module-level ``table`` and
    ``pockets`` objects being defined before the call.

    Parameters
    ----------
    balls : dict
        Mapping label -> ball; must contain ``'cue'`` and at least one more
        entry.  The ball objects are handed to the simulation directly, so
        pass a copy if the caller needs the original state afterwards.
    act : int
        Discrete action id understood by ``act_to_aim``.

    Returns
    -------
    dict
        The entries of `balls` whose ``rvw[0, 2]`` is still positive after the
        simulation (presumably the balls that have not been pocketed -- confirm
        against pooltool's conventions).  Empty when no ball qualifies.
    """
    cue = pt.Cue(cueing_ball=balls['cue'])
    # The target is whichever ball was inserted second into the dict.
    target = list(balls.keys())[1]
    cue.aim_for_best_pocket(balls[target], pockets.values())

    speed, vspin, hspin = act_to_aim(act)
    cue.strike(V0=speed, b=vspin, a=hspin)

    # Evolve the shot
    shot = pt.System(cue=cue, table=table, balls=balls)

    try:
        shot.simulate(continuize=True)
    except Exception:
        # Best effort: a failed simulation is reported and whatever state the
        # balls ended up in is still used.  Only Exception is caught so that
        # KeyboardInterrupt / SystemExit can still stop a long search.
        print('exception')

    # An empty selection naturally yields {}, so no special case is needed.
    return {key: ball for key, ball in balls.items() if ball.rvw[0, 2] > 0}

        
def random_ball():
    """Draw one random 2-D point from NumPy's global random state.

    The first coordinate falls in [0.03, 0.96) and the second in
    [0.03, 1.95): two uniform samples are scaled by 0.93 and 1.92 and both
    are shifted by 0.03.

    Returns
    -------
    numpy.ndarray
        Array of shape (2,).
    """
    extent = np.array([0.93, 1.92])
    return extent * np.random.rand(2) + 0.03

def generate_balls(n):
    """Create a cue ball plus `n` further balls at random, well-separated spots.

    Positions come from ``random_ball``.  A candidate is redrawn for as long as
    its squared distance to any already accepted position is below 0.06.  The
    accepted positions are paired with the first ``n + 1`` entries of
    ``labels``.

    Parameters
    ----------
    n : int
        Number of balls in addition to the cue ball.

    Returns
    -------
    dict
        Mapping label -> ``pt.Ball`` in ``labels`` order.
    """
    positions = [random_ball()]

    while len(positions) < n + 1:
        candidate = random_ball()
        too_close = any(
            np.square(placed - candidate).sum() < 0.06 for placed in positions
        )
        if not too_close:
            positions.append(candidate)

    return {
        labels[idx]: pt.Ball(labels[idx], xyz=positions[idx])
        for idx in range(n + 1)
    }

def ball_to_obs(balls):
    """Flatten the balls into one 1-D observation vector.

    For every ball, in dict order, the first two entries of ``rvw[0]`` are
    taken (presumably the x/y position -- confirm against pooltool) and all
    pairs are concatenated.

    Parameters
    ----------
    balls : dict
        Mapping label -> object exposing an ``rvw`` array.

    Returns
    -------
    numpy.ndarray
        Vector of length ``2 * len(balls)``.
    """
    pairs = []
    for ball in balls.values():
        first_row = ball.rvw[0]
        pairs.append(first_row[:2])
    return np.hstack(pairs)

def obs_to_ball(obs):
    """Rebuild a ball dict from a flat observation vector.

    Inverse of ``balls_to_obs`` / ``ball_to_obs``: `obs` is read as
    consecutive (x, y) pairs, and the i-th pair becomes the position of the
    ball named ``labels[i]``.

    The previous version read positions from an undefined name ``points``
    instead of from `obs`; the pairs are now sliced out of `obs` itself.

    Parameters
    ----------
    obs : sequence of float
        Flat vector ``[x0, y0, x1, y1, ...]``.  A trailing unpaired value is
        ignored.

    Returns
    -------
    dict
        Mapping label -> ``pt.Ball`` in ``labels`` order.
    """
    balls = {}
    for i in range(len(obs) // 2):
        # Same 2-element xy form that generate_balls() passes as `xyz`.
        balls[labels[i]] = pt.Ball(labels[i], xyz=obs[2 * i:2 * i + 2])
    return balls

def balls_to_obs(balls):
    """Concatenate ``rvw[0, :2]`` of every ball, in dict order, into one vector.

    Parameters
    ----------
    balls : dict
        Mapping label -> object exposing a 2-D ``rvw`` array.

    Returns
    -------
    numpy.ndarray
        1-D array of length ``2 * len(balls)``.
    """
    xy_pairs = [state.rvw[0, :2] for state in balls.values()]
    return np.hstack(tuple(xy_pairs))

def act_to_aim(act):
    """Decode a discrete action id into ``(speed, vspin, hspin)``.

    The id is read as a mixed-radix number with 9 speed levels, 11 vspin
    levels and 9 hspin levels, so ids ``0 .. 890`` (9 * 11 * 9 = 891) cover
    every combination exactly once.

    The previous version used ``act % 9`` for both speed and hspin, which tied
    the two together and left only 99 distinct shots among the 891 ids.

    Parameters
    ----------
    act : int
        Action id; ids outside ``0 .. 890`` wrap around.

    Returns
    -------
    tuple of float
        ``speed`` in 0.3 .. 2.7 (step 0.3), ``vspin`` in -0.5 .. 0.5
        (step 0.1) and ``hspin`` in -0.4 .. 0.4 (step 0.1).
    """
    speed_level = act % 9
    vspin_level = (act // 9) % 11
    hspin_level = (act // 99) % 9

    speed = speed_level * 0.3 + 0.3
    vspin = vspin_level * 0.1 - 0.5
    hspin = hspin_level * 0.1 - 0.4
    return speed, vspin, hspin


In [186]:
def find_best_stroke(balls):
    """Try every discrete action and return the one with the best appraisal.

    Each of the 891 action ids is simulated with ``shoot`` on a deep copy of
    `balls`.  An outcome counts only when exactly two balls remain and one of
    them is the cue ball; the first four observation values of that outcome
    are then scored by the module-level ``reg_net``.

    The running best starts at negative infinity.  The previous baseline of 0
    discarded every valid outcome with a negative appraisal and fell back to
    action 0, which may not even be a valid shot.

    Parameters
    ----------
    balls : dict
        Starting layout (label -> ball); it is not modified.

    Returns
    -------
    int
        Action id of the best valid outcome, or 0 when no action produced a
        valid outcome.
    """
    best_score = float('-inf')
    choice = 0
    for act in range(891):
        outcome = shoot(deepcopy(balls), act)
        if len(outcome) != 2 or 'cue' not in outcome:
            continue

        obs = torch.as_tensor(np.array(ball_to_obs(outcome)[:4]), dtype=torch.float32)
        # Pure inference: no gradients are needed for scoring.
        with torch.no_grad():
            appraisal = reg_net(obs).item()
        if appraisal > best_score:
            choice = act
            best_score = appraisal
    return choice

In [215]:
# View shot: build a random layout (cue ball plus balls '1' and '2'), pick the
# best discrete action with find_best_stroke(), then replay it in the viewer.
# `table` and `pockets` defined here are also the module-level objects that
# shoot() reads.
# NOTE(review): `interface` is created in a separate cell (pt.ShotViewer());
# that cell must have been run before this one.

table = pt.PocketTable(model_name="7_foot")
pockets = table.get_pockets()
balls = generate_balls(2)
act = find_best_stroke(balls)

speed, vspin, hspin = act_to_aim(act)

# Aim at the ball stored under the second key, as shoot() does.
target = list(balls.keys())[1]

cue = pt.Cue(cueing_ball=balls['cue'])
cue.aim_for_best_pocket(balls[target], pockets.values())
cue.strike(V0=speed, b=vspin, a=hspin,)

# Evolve the shot
shot = pt.System(cue=cue, table=table, balls=balls)

shot.simulate(continuize=True)

interface.show(shot)



In [7]:
# Create the shot viewer used by the `interface.show(...)` calls in this notebook.
interface = pt.ShotViewer()

Known pipe types:
  CocoaGraphicsPipe
(all display modules loaded.)


In [11]:
#Supervised learning model

def mlp(sizes, activation=nn.ReLU, output_activation=nn.Tanh):
    """Assemble a fully connected feed-forward network.

    Parameters
    ----------
    sizes : list of int
        Layer widths from input to output; each consecutive pair becomes one
        ``nn.Linear`` layer.
    activation : callable
        Factory for the module placed after every layer except the last.
    output_activation : callable
        Factory for the module placed after the last layer.

    Returns
    -------
    torch.nn.Sequential
        Alternating linear layers and activation modules.
    """
    final_idx = len(sizes) - 2
    modules = []
    for idx, (n_in, n_out) in enumerate(zip(sizes[:-1], sizes[1:])):
        make_act = output_activation if idx >= final_idx else activation
        modules.append(nn.Linear(n_in, n_out))
        modules.append(make_act())
    return nn.Sequential(*modules)

def get_batch(df, n):
    """Return `n` rows of `df` drawn at random with ``DataFrame.sample``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to draw from.
    n : int
        Number of rows to draw.

    Returns
    -------
    pandas.DataFrame
        The sampled rows, keeping their original index labels.
    """
    return df.sample(n=n)

def train(df, estimator=None, hidden_sizes=[32], lr=1e-2,
          epochs=50, batch_size=1000):
    """Fit a regression network to `df` by minibatch gradient descent.

    The last column of `df` is the target and every other column is an input
    feature.  The network output is multiplied by 100 before it is compared
    with the target using a mean-squared-error loss.

    Changes from the previous version:
    * targets are converted to ``float32`` instead of ``int8``, which silently
      truncated the fractional part of every label;
    * the batch size is a parameter and is clamped to ``len(df)``, so frames
      with fewer than 1000 rows no longer make the sampling step raise.

    Parameters
    ----------
    df : pandas.DataFrame
        Training data, features first and target last.
    estimator : torch.nn.Module, optional
        Existing network to keep training.  When omitted a new one is built
        with ``mlp``.
    hidden_sizes : list of int
        Hidden layer widths for a newly built network; ignored when
        `estimator` is given.  The default list is never mutated.
    lr : float
        Learning rate for Adam.
    epochs : int
        Number of gradient steps; each step uses one freshly sampled batch.
    batch_size : int
        Rows sampled per step (at most ``len(df)``).

    Returns
    -------
    torch.nn.Module
        The trained network (the same object as `estimator` when one was
        passed in).
    """
    n_features = df.shape[1] - 1

    # make core of regression network if it's not there
    if estimator is None:
        reg_net = mlp(sizes=[n_features] + list(hidden_sizes) + [1])
    else:
        reg_net = estimator

    optimizer = Adam(reg_net.parameters(), lr=lr)
    rows_per_step = min(batch_size, len(df))

    def train_one_epoch():
        """Run one gradient step on a random batch and return its loss."""
        sample = get_batch(df, rows_per_step)
        features = torch.as_tensor(np.array(sample.iloc[:, :-1]), dtype=torch.float32)
        # Keep targets as floats so fractional labels are not truncated.
        targets = torch.as_tensor(np.array(sample.iloc[:, -1:]), dtype=torch.float32)

        # The network output is scaled by 100 to match the target scale.
        pred = reg_net(features) * 100

        optimizer.zero_grad()
        batch_loss = torch.pow(pred - targets, 2).mean()
        batch_loss.backward()
        optimizer.step()
        return batch_loss

    # training loop
    for i in range(epochs):
        batch_loss = train_one_epoch()
        if i % 1000 == 0:
            print('epoch: %3d \t loss: %.3f' %
                  (i, batch_loss.item()))
    return reg_net

def test(reg_net, df):
    pred = reg_net(torch.as_tensor(np.array(df.iloc[:,:-1]), dtype=torch.float32))
    true = torch.as_tensor(np.array(df.iloc[:,-1:]), dtype=torch.int32)
    mse = torch.pow(pred*100 - true, 2).mean()
    return mse

def compare(reg_net, df):
    """Put targets and scaled predictions side by side.

    Parameters
    ----------
    reg_net : callable
        Network mapping a float32 feature tensor to predictions.
    df : pandas.DataFrame
        Data with features first and the target in the last column.

    Returns
    -------
    numpy.ndarray
        Array whose first column holds the targets and whose remaining
        column(s) hold the network output multiplied by 100.
    """
    features = torch.as_tensor(np.array(df.iloc[:, :-1]), dtype=torch.float32)
    scaled_pred = reg_net(features).detach().numpy() * 100
    targets = np.array(df.iloc[:, -1:])
    return np.hstack([targets, scaled_pred])

In [7]:
#Generate random hit data

def get_data(steps):
    """Simulate random shots on random layouts and score the outcomes.

    For each step a three-ball layout (cue, '1', '2') is generated, the cue is
    aimed at the ball stored under the second key, and a random strike is
    drawn with speed in [0, 3), vspin in [-0.5, 0.5) and hspin in [-0.3, 0.3).

    Scoring after the simulation:
    * 0 when the position of ball '2' contains NaN;
    * ``reg_net`` output * 100, evaluated on the final cue and ball-'2'
      positions, when the cue and '2' are still on the table and '1' is not;
    * -100 otherwise.

    Steps whose simulation or scoring raises are skipped, so the result can
    hold fewer than `steps` rows.

    Changes from the previous version: rows are collected in a list and the
    frame is built once instead of growing it row by row; only ``Exception``
    is caught, so the loop can still be interrupted; the NaN check uses
    ``np.isnan`` instead of a self-inequality comparison.

    Parameters
    ----------
    steps : int
        Number of shots to attempt.

    Returns
    -------
    pandas.DataFrame
        Columns: initial ball positions, strike parameters and ``value``.
    """
    table = pt.PocketTable(model_name="7_foot")
    pockets = table.get_pockets()

    columns = ['cue_x', 'cue_y', 'one_x', 'one_y', 'two_x', 'two_y',
               'speed', 'vspin', 'hspin', 'value']
    rows = []

    for _ in range(steps):
        balls = generate_balls(2)
        # Record the layout before the shot changes the ball states.
        obs = balls_to_obs(balls)
        target = list(balls.keys())[1]
        cue = pt.Cue(cueing_ball=balls['cue'])
        cue.aim_for_best_pocket(balls[target], pockets.values())

        # randomly generate hit: scale and shift three uniform samples
        a = np.random.rand(3)
        a = np.array([3, 1, 0.6]) * a + np.array([0, -0.5, -0.3])

        # execute shot
        cue.strike(V0=a[0], b=a[1], a=a[2])
        shot = pt.System(cue=cue, table=table, balls=balls)

        try:
            shot.simulate()
            on_table = [key for key, value in balls.items() if value.rvw[0, 2] > 0]

            if np.isnan(balls['2'].rvw[0, :2]).any():
                score = 0

            elif 'cue' in on_table and '1' not in on_table and '2' in on_table:
                final_obs = np.hstack((balls['cue'].rvw[0, :2],
                                       balls['2'].rvw[0, :2]))
                # Pure inference: no gradients are needed for scoring.
                with torch.no_grad():
                    pred = reg_net(torch.as_tensor(final_obs, dtype=torch.float32))
                score = pred.item() * 100

            else:
                score = -100

        except Exception:
            # Deliberate best effort: drop samples that fail to simulate/score.
            continue

        rows.append(np.hstack((obs, a, [score])))

    return pd.DataFrame(rows, columns=columns)

In [54]:
# Attempt 100,000 random shots; get_data() skips any step that raises, so the
# resulting frame may hold fewer rows.
df = get_data(100000)

In [34]:
# Average of the `value` column over the generated shots.
df.value.mean()

-8.855270114759163

In [55]:
# Put the newly generated `df` in front of the previously accumulated `cdf`.
# NOTE(review): `cdf` must already exist when this runs.  In this notebook it
# is only assigned here and by the read_csv cell further down, so on a fresh
# kernel this cell raises NameError.  Re-running it also appends `df` again.
cdf = pd.concat([df, cdf])

In [56]:
# Display the combined frame.
cdf

Unnamed: 0,cue_x,cue_y,one_x,one_y,two_x,two_y,speed,vspin,hspin,value
0,0.763430,1.440257,0.169685,1.462369,0.240710,0.644611,2.604395,0.167688,-0.174082,-100.000000
1,0.291909,1.510707,0.564651,1.801589,0.331328,1.121993,0.739555,0.430319,0.298032,98.997408
2,0.380535,0.230496,0.666107,0.420873,0.371221,1.475326,2.132442,-0.214466,0.286296,19.203024
3,0.263268,1.839438,0.918798,1.886039,0.597970,1.379408,0.828687,0.155783,-0.192457,66.579920
4,0.494110,0.173006,0.958246,1.710236,0.928815,0.030223,2.364499,0.206003,-0.187373,63.674426
...,...,...,...,...,...,...,...,...,...,...
399887,0.346444,1.947722,0.775000,0.456012,0.781778,1.712285,0.826913,-0.041827,-0.161269,41.300017
399888,0.376960,1.468631,0.587283,0.823795,0.653248,1.766823,2.743712,-0.010850,0.202223,27.634087
399889,0.065372,0.475757,0.484679,1.140897,0.131480,1.376145,1.081166,0.115485,-0.226151,1.667846
399890,0.198073,1.307141,0.100374,0.540258,0.851241,1.017936,0.484090,0.244350,-0.297313,50.665212


In [57]:
# Save the combined data set without the index column.
# NOTE(review): this writes 'two_ball_value.csv', while the cells below read and
# write 'two_ball_value2.csv' -- confirm which file is the current one.
cdf.to_csv('two_ball_value.csv', index = False)

In [75]:
# Reload the accumulated data set from disk, replacing the in-memory `cdf`.
cdf = pd.read_csv('two_ball_value2.csv')

In [74]:
# Save `cdf` without the index column to the file the read_csv cell above loads.
# NOTE(review): placed after that read_csv cell, this just rewrites the file
# that was loaded when the notebook is run top to bottom.
cdf.to_csv('two_ball_value2.csv', index = False)

In [89]:
# Continue training `reg_net_2` on the combined data set.
# NOTE(review): `reg_net_2` is passed in as `estimator`, so it must already
# exist before this cell runs; no visible cell creates it from scratch.
# Because an estimator is supplied, train() does not use `hidden_sizes`.
reg_net_2 = train(cdf, estimator=reg_net_2, hidden_sizes = [32, 32, 32, 32], epochs = 10000, lr = 0.001)

epoch:   0 	 loss: 650.294
epoch: 1000 	 loss: 637.390
epoch: 2000 	 loss: 658.594
epoch: 3000 	 loss: 655.265
epoch: 4000 	 loss: 675.034
epoch: 5000 	 loss: 636.930
epoch: 6000 	 loss: 614.431
epoch: 7000 	 loss: 634.954
epoch: 8000 	 loss: 586.945
epoch: 9000 	 loss: 619.419


In [90]:
# Persist the trained network object to disk.
torch.save(reg_net_2, 'reg_net_2c.pt')

In [19]:
# Attempt 5,000 more random shots and keep them in `test_df`.
test_df = get_data(5000)

In [20]:
from sklearn.metrics import r2_score

In [91]:
# R^2 of the network on `cdf`: predictions are multiplied by 100, the same
# scaling train() applies before comparing with the `value` column.
# NOTE(review): `cdf` is the frame that was passed to train(), so this is a
# training-set score; `test_df` is not used here.
pred = reg_net_2(torch.as_tensor(np.array(cdf.iloc[:,:-1]), dtype=torch.float32))
pred = pred * 100
r2_score(cdf.iloc[:,-1], pred.detach().numpy())

0.3388182369503405

In [26]:
from scipy.optimize import differential_evolution

In [27]:
#find the best stroke using reg_net_2

def get_hit(balls):
    """Search for the strike that ``reg_net_2`` values most for this layout.

    The network input is the ball observation followed by the three strike
    parameters; ``differential_evolution`` minimises the negated prediction
    over those three parameters.

    Changes from the previous version:
    * the vspin bounds are (-0.5, 0.5), matching the range sampled in
      ``get_data``; the old (-1, 1) bounds let the optimiser pick strikes
      outside the data the network was trained on;
    * the objective returns a plain float instead of a one-element array;
    * the observation is built once rather than on every evaluation.

    Parameters
    ----------
    balls : dict
        Current layout (label -> ball).

    Returns
    -------
    numpy.ndarray
        ``[speed, vspin, hspin]`` of the best strike found.
    """
    # Ball positions do not change during the search, so encode them once.
    positions = balls_to_obs(balls)

    def get_loss(hit_array, model):
        """Negated model prediction for one candidate strike."""
        obs = np.hstack((positions, hit_array))
        with torch.no_grad():
            pred = model(torch.as_tensor(obs, dtype=torch.float32))
        return -pred.item()

    args = (reg_net_2,)

    # Same ranges get_data() samples from: speed, vspin, hspin.
    bounds = [(0, 3), (-0.5, 0.5), (-0.3, 0.3)]

    result = differential_evolution(get_loss, bounds, args, maxiter=1000, tol=1e-7)
    return result.x

In [29]:
#view shot: for ten random layouts, show the starting position and then the
# shot chosen by get_hit().
# NOTE(review): relies on `interface` (pt.ShotViewer()) having been created in
# another cell before this one runs.

table = pt.PocketTable(model_name="7_foot")
pockets = table.get_pockets()
    
for i in range(10):

    balls = generate_balls(2)
    # Strike parameters from the optimiser, passed below as V0, b and a.
    hit = get_hit(balls)

    # Aim at the ball stored under the second key of the layout dict.
    target = list(balls.keys())[1]

    cue = pt.Cue(cueing_ball=balls['cue'])
    cue.aim_for_best_pocket(balls[target], pockets.values())
    
    # show starting point
    shot = pt.System(cue=cue, table=table, balls=balls)
    interface.show(shot)

    # show actual shot
    cue.strike(V0=hit[0], b=hit[1], a=hit[2])
    shot.simulate()

    interface.show(shot)



In [28]:
# Create the shot viewer used by the `interface.show(...)` calls above.
# NOTE(review): the same assignment also appears in an earlier cell; one
# definition placed before the first use would be enough.
interface = pt.ShotViewer()

Known pipe types:
  CocoaGraphicsPipe
(all display modules loaded.)
