In [None]:
# Example notebook to create dataframes from pbn files. Compatible with jupyter and vscode.
# Minimal documentation provided. Assumes user is familiar with github, jupyter/vscode notebook and python.

# author: Robert Salita (research@aipolice.org)
# 1. read a pbn file (local file).
# 2. create a df of deals, par, double dummy, single dummy probabilities, expected values, best contract (max expected value contract).

# install:
# 1. git clone https://github.com/BSalita/Calculate_PBN_Results
# 2. pip install -U -r requirements.txt

# requirements (specific to this project):
# pandas
# endplay
# pathlib

In [None]:

import pathlib
import pandas as pd
from collections import defaultdict

from endplay.parsers import pbn
from endplay.types import Deal, Contract, Denom, Player, Penalty, Vul
from endplay.dds import par, calc_all_tables
from endplay.dealer import generate_deals


In [None]:
# configurations
direction_order = [0,2,1,3] # NSEW order
suit_order = [3,2,1,0,4] # SHDCN order?
pbn_filename = 'DDS_Camrose24_1- BENCAM22 v WBridge5.pbn' # local filename
sd_productions = 100 # number of random deals to generate for calculating single dummy probabilities. Use smaller number for testing.

In [None]:
pd.options.display.max_columns = 0
#pd.options.display.min_rows = 20

In [None]:
# read local pbn file
pbn_file = pathlib.Path(pbn_filename)
with open(pbn_file, 'r') as f:
    boards = pbn.load(f)
len(boards), vars(boards[0])

In [None]:
# create df from boards
df = pd.DataFrame([vars(b) for b in boards])
for col in df.columns:
    print(col, df[col].dtype)
    if df[col].dtype == 'object':
        if isinstance(df[col][0], dict):
            df = pd.concat([df,pd.DataFrame.from_records(df[col])],axis='columns')
df

In [None]:
# calculate double dummy and par
deals = df['deal']
batch_size = 40
t_t = []
tables = []
b_ptr = 0
for b in range(0,len(deals),batch_size):
    batch_tables = calc_all_tables(deals[b:min(b+batch_size,len(deals))])
    tables.extend(batch_tables)
    batch_t_t = (tt._data.resTable for tt in batch_tables)
    t_t.extend(batch_t_t)
    b_ptr += b

assert len(deals) == len(t_t) == len(tables)

In [None]:
# display a few hands and double dummy tables
dd_tricks_rows = []
max_display = 4
for ii,(dd,sd,tt) in enumerate(zip(deals,t_t,tables)):
    if ii < max_display:
        print(f"Deal: {ii+1}")
        dd.pprint()
        print()
        tt.pprint()
        print(tuple(tuple(sd[suit][direction] for suit in suit_order) for direction in direction_order))
        print()

In [None]:
# create dataframe of par scores (double dummy).
pars = [par(tt, b, Player.north) for tt, b in zip(tables, df['board_num'])]  # middle arg is board (if int) otherwise enum vul.
par_scores_ns = [parlist.score for parlist in pars]
par_scores_ew = [-score for score in par_scores_ns]
par_contracts = [[str(contract.level)+'SHDCN'[int(contract.denom)]+contract.declarer.abbr+contract.penalty.abbr+' '+str(contract.result) for contract in parlist] for parlist in pars]
par_df = pd.DataFrame({'Par_NS':par_scores_ns,'Par_EW':par_scores_ew,'Par_Contracts_Result':par_contracts})
par_df

In [None]:
# create dataframe of double dummy tricks per direction and suit.
dd_tricks_rows = [[sd[suit][direction] for direction in direction_order for suit in suit_order] for sd in t_t]
dd_tricks_df = pd.DataFrame(dd_tricks_rows,columns=['_'.join(['DD_Tricks',d,s]) for d in 'NSEW' for s in 'CDHSN'])
dd_tricks_df

In [None]:
# create dataframe of double dummy scores per direction and suit.
def Tricks_To_Score(sd):
    return [Contract(level=level,denom=suit,declarer=direction,penalty=Penalty.passed if sd[suit][direction]-6-level>=0 else Penalty.doubled,result=sd[suit][direction]-6-level).score(0) for direction in direction_order for suit in suit_order for level in range(1,8)]

direction_order = [0,2,1,3] # NSEW order
suit_order = [3,2,1,0,4] # SHDCN order?
dd_score_rows = [Tricks_To_Score(sd) for sd in t_t]
dd_score_df = pd.DataFrame(dd_score_rows,columns=['_'.join(['DD_Score',str(l)+s,d]) for d in 'NSEW' for s in 'CDHSN' for l in range(1,8)])
dd_score_df


In [None]:
# functions to calculate single dummy probabilities.

# todo: obsolete these constants?
CDHS = 'CDHS' # string ordered by suit rank - low to high
CDHSN = CDHS+'N' # string ordered by strain
NSHDC = 'NSHDC' # order by highest score value. useful for idxmax(). coincidentally reverse of CDHSN.
SHDC = 'SHDC' # Hands, PBN, board_record_string (brs) ordering
NSEW = 'NSEW' # double dummy solver ordering
NESW = 'NESW' # Hands and PBN order
NWES = 'NWES' # board_record_string (brs) ordering
SHDCN = 'SHDCN' # ordering used by dds

# todo: could save a couple seconds by creating dict of deals
def calc_double_dummy_deals(deals, batch_size=40):
    t_t = []
    tables = []
    for b in range(0,len(deals),batch_size):
        batch_tables = calc_all_tables(deals[b:min(b+batch_size,len(deals))])
        tables.extend(batch_tables)
        batch_t_t = (tt._data.resTable for tt in batch_tables)
        t_t.extend(batch_t_t)
    assert len(t_t) == len(tables)
    return deals, t_t, tables
    return df

def constraints(deal):
    return True

def generate_single_dummy_deals(predeal_string, produce, env=dict(), max_attempts=1000000, seed=None, show_progress=True, strict=True, swapping=0):
    
    predeal = Deal(predeal_string)

    deals_t = generate_deals(
        constraints,
        predeal=predeal,
        swapping=swapping,
        show_progress=show_progress,
        produce=produce,
        seed=seed,
        max_attempts=max_attempts,
        env=env,
        strict=strict
        )

    deals = tuple(deals_t) # create a tuple before interop memory goes wonky
    
    return calc_double_dummy_deals(deals)

def calculate_single_dummy_probabilities(deal, produce=100):

    ns_ew_rows = {}
    for ns_ew in ['NS','EW']:
        s = deal[2:].split()
        if ns_ew == 'NS':
            s[1] = '...'
            s[3] = '...'
        else:
            s[0] = '...'
            s[2] = '...'
        predeal_string = 'N:'+' '.join(s)
        #print_to_log(f"predeal:{predeal_string}")

        d_t, t_t, tables = generate_single_dummy_deals(predeal_string, produce, show_progress=False)

        rows = []
        max_display = 4 # pprint only the first n generated deals
        direction_order = [0,2,1,3] # NSEW order
        suit_order = [3,2,1,0,4] # SHDCN order?
        for ii,(dd,sd,tt) in enumerate(zip(d_t,t_t,tables)):
            # if ii < max_display:
                # print_to_log(f"Deal:{ii+1} Fixed:{ns_ew} Generated:{ii+1}/{produce}")
                # dd.pprint()
                # print_to_log()
                # tt.pprint()
                # print_to_log()
            nswe_flat_l = [sd[suit][direction] for direction in direction_order for suit in suit_order]
            rows.append([dd.to_pbn()]+nswe_flat_l)

        dd_df = pd.DataFrame(rows,columns=['Deal']+[d+s for d in NSEW for s in CDHSN])
        for d in NSEW:
            for s in SHDCN:
                ns_ew_rows[(ns_ew,d,s)] = dd_df[d+s].value_counts(normalize=True).reindex(range(14), fill_value=0).tolist() # ['Fixed_Direction','Direction_Declarer','Suit']+['SD_Prob_Take_'+str(n) for n in range(14)]
    
    return ns_ew_rows


def append_single_dummy_results(pbns,sd_cache_d,produce=100):

    for pbn in pbns:
        if pbn not in sd_cache_d:
            sd_cache_d[pbn] = calculate_single_dummy_probabilities(pbn, produce) # all combinations of declarer pair direction, declarer direciton, suit, tricks taken
    return sd_cache_d


In [None]:
# takes 1000 seconds for 100 sd calcs, or 10 sd calcs per second.
sd_cache_d = {}
pbns = [str(pbn) for pbn in deals]
for i,pbn in enumerate(pbns):
    print(f"{i} of {len(pbns)} boards. pbn:{pbn}")
    if pbn not in sd_cache_d:
        sd_cache_d[pbn] = calculate_single_dummy_probabilities(pbn, sd_productions) # all combinations of declarer pair direction, declarer direciton, suit, tricks taken


In [None]:
# calculate single dummy trick taking probability distribution
sd_probs_d = defaultdict(list)
for pbn in pbns:
    #d['PBN'].append(pbn)
    v = sd_cache_d[pbn]
    print(pbn,v)
    for (pair_direction,declarer_direction,suit),tricks in v.items():
        for i,t in enumerate(tricks):
            sd_probs_d['_'.join(['Probs',pair_direction,declarer_direction,suit,str(i)])].append(t)
print(sd_probs_d)
sd_probs_df = pd.DataFrame(sd_probs_d)
sd_probs_df

In [None]:
# calculate dict of contract result scores
sd_scores_d = {}
for suit in suit_order:
    for level in range(1,8): # contract level
        for tricks in range(14):
            result = tricks-6-level
            sd_scores_d[(level,'SHDCN'[suit],tricks,False)] = Contract(level=level,denom=suit,declarer=0,penalty=Penalty.passed if result>=0 else Penalty.doubled,result=result).score(Vul.none)
            sd_scores_d[(level,'SHDCN'[suit],tricks,True)] = Contract(level=level,denom=suit,declarer=0,penalty=Penalty.passed if result>=0 else Penalty.doubled,result=result).score(Vul.both)
sd_scores_d

In [None]:
# create score dataframe from dict
scores_d = defaultdict(list)
for suit in 'SHDCN':
    for level in range(1,8):
        for i in range(14):
            scores_d['_'.join(['Score',str(level)+suit])].append([sd_scores_d[(level,suit,i,False)],sd_scores_d[(level,suit,i,True)]])
print(scores_d)
sd_scores_df = pd.DataFrame(scores_d)
sd_scores_df.index.name = 'Taken'
sd_scores_df

In [None]:
# create dict of expected values (probability * score)
exp_d = defaultdict(list)
pbn_vul = zip(pbns,df['_vul'])
for pbn,vul in pbn_vul:
    #print(pbn,vul)
    for (pair_direction,declarer_direction,suit),probs in sd_cache_d[pbn].items():
        is_vul = vul == 1 or (declarer_direction in 'NS' and vul == 2) or (declarer_direction in 'EW' and vul == 3)
        #print(pair_direction,declarer_direction,suit,probs,is_vul)
        for level in range(1,8):
            #print(scores_d['_'.join(['Score',str(level)+suit])][is_vul])
            exp_d['_'.join(['Exp',pair_direction,declarer_direction,suit,str(level)])].append(sum([prob*score[is_vul] for prob,score in zip(probs,scores_d['_'.join(['Score',str(level)+suit])])]))
        #print(exp_d)
#print(exp_d)
sd_exp_df = pd.DataFrame(exp_d)
sd_exp_df

In [None]:
# create columns for the column name of the max expected value, the max expected value, the contract having the max expected value.
def create_best_contracts(r):
    exp_tuples = tuple([(v,k) for k,v in r.items()])
    ex_tuples_sorted = sorted(exp_tuples,reverse=True)
    best_contract_tuple = ex_tuples_sorted[0]
    best_contract_split = best_contract_tuple[1].split('_')
    best_contract = best_contract_split[4]+best_contract_split[3]+best_contract_split[2]
    return [best_contract_tuple[1],best_contract_tuple[0],best_contract_tuple[0] if best_contract_tuple[1][-5] in ['N','S'] else -best_contract_tuple[0],best_contract]

sd_best_contract_l = sd_exp_df.apply(create_best_contracts,axis='columns')
sd_best_contract_df = pd.DataFrame(sd_best_contract_l.tolist(),columns=['Exp_Max_Col','Exp_Max','Exp_Max_NS','Best_Contract'])
sd_best_contract_df

In [None]:
merged_df = pd.concat([df,par_df,dd_tricks_df,dd_score_df,sd_best_contract_df],axis='columns')
merged_df

In [None]:
def convert_contract_to_contract(r):
    return str(r['_contract']).upper().replace('♠','S').replace('♥','H').replace('♦','D').replace('♣','C').replace('NT','N')

def convert_contract_to_declarer(r):
    declarer = pd.NA if r['Contract'] == 'PASS' else r['Contract'][2]
    assert declarer is pd.NA or declarer in 'NSEW', f"declarer:{declarer}"
    return declarer

def declarer_to_declarer_name(r):
    declarer_name = pd.NA if r['Declarer'] is pd.NA else r[r['Declarer']]
    return declarer_name

def convert_contract_to_result(r):
    result = pd.NA if r['Contract'] == 'PASS' else 0 if r['Contract'][-1] in ['=','0'] else int(r['Contract'][-1]) if r['Contract'][-2] == '+' else -int(r['Contract'][-1])
    assert result is pd.NA or result in range(-13,8), f"result:{result}"
    return result

def convert_contract_to_tricks(r):
    tricks = pd.NA if r['Contract'] == 'PASS' else int(r['Contract'][0])+6+r['Result']
    assert tricks is pd.NA or tricks in range(0,14), f"tricks:{tricks}"
    return tricks

def convert_contract_to_dd_tricks(r):
    dd_tricks = 0 if r['Contract'] == 'PASS' else dd_tricks_df['_'.join(['DD_Tricks',r['Declarer'],r['Contract'][1]])].iloc[r.name]
    assert dd_tricks in range(0,14), f"dd_tricks:{dd_tricks}"
    return dd_tricks

def convert_score_to_score(r):
    score_split = r['_score'].split()
    assert len(score_split) == 2, f"score_split:{score_split}"
    assert score_split[0] in ['NS','EW'], f"score_split:{score_split[0]}"
    assert score_split[1][0] == '-' or str.isdigit(score_split[1][0]), f"score_split:{score_split[1]}"
    score_split_direction = score_split[0]
    score_split_value = score_split[1]
    score_value = -int(score_split_value) if score_split_value[0] == '-' else int(score_split_value)
    return score_value if score_split_direction == 'NS' else -score_value

cols = ['board_num','deal','Room','_contract','Score','_vul','Par_NS','Exp_Max_Col','Exp_Max','Exp_Max_NS','Best_Contract','North','East','South','West']
augmented_df = merged_df[cols].copy()
augmented_df.rename(columns={'North':'N','East':'E','South':'S','West':'W'},inplace=True)
augmented_df['Contract'] = augmented_df.apply(convert_contract_to_contract,axis='columns').astype('string')
augmented_df['Declarer'] = augmented_df.apply(convert_contract_to_declarer,axis='columns').astype('string')
augmented_df['Declarer_Name'] = augmented_df.apply(declarer_to_declarer_name,axis='columns').astype('string')
augmented_df['Result'] = augmented_df.apply(convert_contract_to_result,axis='columns').astype('Int16')
augmented_df['Tricks'] = augmented_df.apply(convert_contract_to_tricks,axis='columns').astype('UInt8')
augmented_df['DD_Tricks'] = augmented_df.apply(convert_contract_to_dd_tricks,axis='columns').astype('UInt8')
augmented_df.rename(columns={'Score':'_score'},inplace=True)
augmented_df['Score_NS'] = augmented_df.apply(convert_score_to_score,axis='columns').astype('int16')
augmented_df['Par_Diff_NS'] = augmented_df['Score_NS']-augmented_df['Par_NS'].astype('int16')
augmented_df['DD_Tricks_Diff'] = augmented_df['Tricks']-augmented_df['DD_Tricks'].astype('int8')
augmented_df['Exp_Max_Diff_NS'] = augmented_df['Score_NS']-augmented_df['Exp_Max_NS'].astype('int16')
augmented_df.drop(columns=['_contract','_score'],inplace=True)
augmented_df

Following cells contain WIP experiments with comparative statistics; BENCAM22 vs WBridge5, Open vs Closed rooms, Tricks vs DD, par diffs, expected max diffs.

In [None]:
# describe() over Par_Diff_NS for all, bencam22, wbridge5
print('Describe North, BENCAM22, Par_Diff_NS:')
print(augmented_df[augmented_df['N'].eq('BENCAM22')]['Par_Diff_NS'].describe())
print('Describe North, WBridge5, Par_Diff_NS:')
print(augmented_df[augmented_df['N'].eq('WBridge5')]['Par_Diff_NS'].describe())

# sum over Par_Diff_NS for all, bencam22, wbridge5
all, bencam22, wbridge5 = augmented_df['Par_Diff_NS'].sum(),augmented_df[augmented_df['N'].eq('BENCAM22')]['Par_Diff_NS'].sum(),augmented_df[augmented_df['N'].eq('WBridge5')]['Par_Diff_NS'].sum()
print(f"Sum of Par_Diff_NS: All:{all} BENCAM22:{bencam22} WBridge5:{wbridge5} BENCAM22-WBridge5:{bencam22-wbridge5}")

# frequency where par was exceeded for all, bencam22, wbridge5
all, bencam22, wbridge5 = sum(augmented_df['Par_Diff_NS'].gt(0)),sum(augmented_df['N'].eq('BENCAM22')&augmented_df['Par_Diff_NS'].gt(0)),sum(augmented_df['N'].eq('WBridge5')&augmented_df['Par_Diff_NS'].gt(0))
print(f"Frequency where exceeding Par: All:{all} BENCAM22:{bencam22} WBridge5:{wbridge5} BENCAM22-WBridge5:{bencam22-wbridge5}")

In [None]:
# describe() over DD_Tricks_Diff for all, bencam22, wbridge5
print('Describe Declarer, BENCAM22, DD_Tricks_Diff:')
print(augmented_df[augmented_df['Declarer_Name'].eq('BENCAM22')]['DD_Tricks_Diff'].describe())
print('Describe Declarer, WBridge5, DD_Tricks_Diff:')
print(augmented_df[augmented_df['Declarer_Name'].eq('WBridge5')]['DD_Tricks_Diff'].describe())

# sum over DD_Tricks_Diff for all, bencam22, wbridge5
all, bencam22, wbridge5 = augmented_df['DD_Tricks_Diff'].sum(),augmented_df[augmented_df['Declarer_Name'].eq('BENCAM22')]['DD_Tricks_Diff'].sum(),augmented_df[augmented_df['Declarer_Name'].eq('WBridge5')]['DD_Tricks_Diff'].sum()
print(f"Sum of DD_Tricks_Diff: All:{all} BENCAM22:{bencam22} WBridge5:{wbridge5} BENCAM22-WBridge5:{bencam22-wbridge5}")

# frequency where Tricks > DD for all, bencam22, wbridge5
all, bencam22, wbridge5 = sum(augmented_df['DD_Tricks_Diff'].notna() & augmented_df['DD_Tricks_Diff'].gt(0)),sum(augmented_df[augmented_df['Declarer_Name'].eq('BENCAM22')]['DD_Tricks_Diff'].gt(0)),sum(augmented_df[augmented_df['Declarer_Name'].eq('WBridge5')]['DD_Tricks_Diff'].gt(0))
print(f"Frequency where Tricks > DD: All:{all} BENCAM22:{bencam22} WBridge5:{wbridge5} BENCAM22-WBridge5:{bencam22-wbridge5}")

# frequency where Tricks < DD for all, bencam22, wbridge5
all, bencam22, wbridge5 = sum(augmented_df['DD_Tricks_Diff'].notna() & augmented_df['DD_Tricks_Diff'].lt(0)),sum(augmented_df[augmented_df['Declarer_Name'].eq('BENCAM22')]['DD_Tricks_Diff'].lt(0)),sum(augmented_df[augmented_df['Declarer_Name'].eq('WBridge5')]['DD_Tricks_Diff'].lt(0))
print(f"Frequency where Tricks < DD: All:{all} BENCAM22:{bencam22} WBridge5:{wbridge5} BENCAM22-WBridge5:{bencam22-wbridge5}")

In [None]:
# describe() over Par_Diff_NS for all, open, closed
print(augmented_df['Par_Diff_NS'].describe(),augmented_df[augmented_df['Room'].eq('Open')]['Par_Diff_NS'].describe(),augmented_df[augmented_df['Room'].eq('Closed')]['Par_Diff_NS'].describe())
# sum over Par_Diff_NS for all, bencam22, wbridge5
all, bencam22, wbridge5 = augmented_df['Par_Diff_NS'].sum(),augmented_df[augmented_df['Room'].eq('Open')]['Par_Diff_NS'].sum(),augmented_df[augmented_df['Room'].eq('Closed')]['Par_Diff_NS'].sum()
print(f"Sum of Par_Diff_NS: All:{all} BENCAM22:{bencam22} WBridge5:{wbridge5} BENCAM22-WBridge5:{bencam22-wbridge5}")
all, open, closed = sum(augmented_df['Par_Diff_NS'].gt(0)),sum(augmented_df['Room'].eq('Open')&augmented_df['Par_Diff_NS'].gt(0)),sum(augmented_df['Room'].eq('Closed')&augmented_df['Par_Diff_NS'].gt(0))
print(f"Frequency where exceeding Par: All:{all} Open:{open} Closed:{closed} Open-Closed:{open-closed}")

In [None]:
# describe() over Exp_Max_Diff_NS for all, open, closed
print(augmented_df['Exp_Max_Diff_NS'].describe(),augmented_df[augmented_df['Room'].eq('Open')]['Exp_Max_Diff_NS'].describe(),augmented_df[augmented_df['Room'].eq('Closed')]['Exp_Max_Diff_NS'].describe())
# sum over Exp_Max_Diff_NS for all, bencam22, wbridge5
all, bencam22, wbridge5 = augmented_df['Exp_Max_Diff_NS'].sum(),augmented_df[augmented_df['Room'].eq('Open')]['Exp_Max_Diff_NS'].sum(),augmented_df[augmented_df['Room'].eq('Closed')]['Exp_Max_Diff_NS'].sum()
print(f"Sum of Exp_Max_Diff_NS: All:{all} BENCAM22:{bencam22} WBridge5:{wbridge5} BENCAM22-WBridge5:{bencam22-wbridge5}")
all, open, closed = sum(augmented_df['Exp_Max_Diff_NS'].gt(0)),sum(augmented_df['Room'].eq('Open')&augmented_df['Exp_Max_Diff_NS'].gt(0)),sum(augmented_df['Room'].eq('Closed')&augmented_df['Exp_Max_Diff_NS'].gt(0))
print(f"Frequency where exceeding Exp_Max_Diff_NS: All:{all} Open:{open} Closed:{closed} Open-Closed:{open-closed}")