In [None]:
#!pip install pyarrow

In [1]:
# Importazione librerie
import findspark
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark import StorageLevel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator



In [2]:
# Creation of ChessGame class, used to get game state metrics from a sequence of moves.
class ChessGame:

    """Classe ChessGame che, presa in input una sequenza di mosse in notazione statistica tradizionale, restituisce una serie di 
    features in merito allo stato del gioco sottoforma di dizionario Python."""

    def __init__(self, moves):
        self.board = self.create_board()
        self.current_player = 'w'  # Inizia con giocatore bianco
        self.play_moves(moves)

    def create_board(self):

        """Inizializza la scacchiera."""

        chess_board = pd.DataFrame(None, index=range(1, 9), columns=range(1, 9))
        chess_board.loc[8] = [('b', 'R'), ('b', 'N'), ('b', 'B'), ('b', 'Q'), ('b', 'K'), ('b', 'B'), ('b', 'N'),('b', 'R')]
        chess_board.loc[7] = [('b', 'P')] * 8
        chess_board.loc[2] = [('w', 'P')] * 8
        chess_board.loc[1] = [('w', 'R'), ('w', 'N'), ('w', 'B'), ('w', 'Q'), ('w', 'K'), ('w', 'B'), ('w', 'N'),('w', 'R')]
        return chess_board

    def play_moves(self, moves):
        """Effettua la sequenza di mosse fornita in input, e tiene traccia delle catture."""
        self.captured_pieces = {"white":[],"black":[]} #log dei pezzi catturati
        self.has_checked = 0
        for move in moves: 
            if "+" in move:
                if self.current_player == "w":
                    self.has_checked += 1
                else:
                    self.has_checked -= 1
            #pulizia dal segno di check
            move = move.strip("+")
            self.make_move(move)
            self.current_player = 'w' if self.current_player == 'b' else 'b'  # Cambio giocatore

    def make_move(self, move):
        """Effettua una singola mossa."""
        if move == "O-O":
            self.perform_castling(short=True)
            return
        elif move == "O-O-O":
            self.perform_castling(short=False)
            return
        else:
            start_pos = self.find_start_position(move)
            if start_pos == None:
                #interruzione del gioco se non il metodo non è riuscito a trovare la starting position. 
                return self.eval_status()
            end_pos = self.convert_position(move[-2:])
            piece = self.board.loc[start_pos]
            self.board.loc[start_pos] = np.NaN
            if self.board.loc[end_pos] is not np.NaN:
                captured_piece = self.board.loc[end_pos]
                self.captured_pieces["white" if self.current_player == "w" else "black"].append(captured_piece) #aggiorna log catture
            self.board.loc[end_pos] = piece

    def find_start_position(self, move):
        end_pos = self.convert_position(move[-2:])
        piece_name = move[0] if move[0] in ['N', 'B', 'Q', 'K', 'R'] else "P"
        piece_color = self.current_player

        if piece_name == "P" and 'x' in move:
            start_file = self.file_to_num(move[0])
            valid_pawn_captures = []
            # Trova i pedoni alla colonna di partenza, indicata nel caso in cui il pedone effettua una cattura.
            for index, row in self.board.iterrows():
                for file, piece in enumerate(row):
                    if piece == (piece_color, piece_name):
                        if file + 1 != start_file:  # Skip pawns not on the specified starting file
                            continue
                        valid_moves = self.get_valid_moves((index, file + 1))
                        if end_pos in valid_moves:
                            valid_pawn_captures.append((index, file + 1))

            # Choose the pawn capture with the same starting rank as the given move
            for start_pos in valid_pawn_captures:
                if start_pos[1] == self.file_to_num(move[0]):
                    return start_pos

        # Estrae l'informazione della casella di partenza se questa è presente nella mossa
        start_file = None
        if len(move) >= 4 and move[1] != "x":
            start_file = self.file_to_num(move[1])

        # Se la mossa non ricade nei casi precedenti, si itera su tutti i pezzi per vedere quali possono aver effettuato la mossa.
        for index, row in self.board.iterrows():
            for file, piece in enumerate(row):
                if piece == (piece_color, piece_name):
                    if start_file is not None and file + 1 != start_file:
                        continue
                    valid_moves = self.get_valid_moves((index, file + 1))
                    if end_pos in valid_moves:
                        return index, file + 1
                    else:
                        continue
        return None

    def file_to_num(self, file):
        """Converte la lettera indicante la colonna della scacchiera in un numero da 1 a 8."""
        return ord(file) - ord('a') + 1

    def convert_position(self, pos):
        """Converte in coordinate numeriche le coordinate in notazione scacchistica tradizionale"""
        file = ord(pos[0]) - ord('a') + 1
        rank = int(pos[1])
        return rank, file

    def perform_castling(self, short=True):

        """Effettua l'arrocco."""

        rank = 1 if self.current_player == 'w' else 8
        king_start_file = 5
        rook_start_file = 8 if short else 1
        king_end_file = 7 if short else 3

        king_start_pos = (rank, king_start_file)
        king_end_pos = (rank, king_end_file)
        rook_start_pos = (rank, rook_start_file)
        rook_end_pos = (rank, king_end_file - 1)

        king = self.board.loc[king_start_pos]
        rook = self.board.loc[rook_start_pos]

        self.board.loc[king_start_pos] = np.NaN
        self.board.loc[king_end_pos] = king
        self.board.loc[rook_start_pos] = np.NaN
        self.board.loc[rook_end_pos] = rook

    def get_valid_moves(self, position):
        """Restituisce le possibile mosse valide per ciascun tipo di pezzo, chiamando per ciascuno il metodo specifico."""
        piece = self.board.loc[position]
        if pd.isna(piece):
            return []

        color, name = piece

        if name == 'P':
            return self.get_pawn_moves(position)
        elif name == 'R':
            return self.get_rook_moves(position)
        elif name == 'N':
            return self.get_knight_moves(position)
        elif name == 'B':
            return self.get_bishop_moves(position)
        elif name == 'Q':
            return self.get_queen_moves(position)
        elif name == 'K':
            return self.get_king_moves(position)


    def get_rook_moves(self, position):
        moves = []
        rank, file = position

        # Mosse orizzontali
        for f in range(file + 1, 9):
            if pd.isna(self.board.loc[(rank, f)]):
                moves.append((rank, f))
            else:
                piece = self.board.loc[(rank, f)]
                if piece[0] != self.board.loc[position][0]:
                    moves.append((rank, f))
                break

        for f in range(file - 1, 0, -1):
            if pd.isna(self.board.loc[(rank, f)]):
                moves.append((rank, f))
            else:
                piece = self.board.loc[(rank, f)]
                if piece[0] != self.board.loc[position][0]:
                    moves.append((rank, f))
                break

        # mosse verticali 
        for r in range(rank + 1, 9):
            if pd.isna(self.board.loc[(r, file)]):
                moves.append((r, file))
            else:
                piece = self.board.loc[(r, file)]
                if piece[0] != self.board.loc[position][0]:
                    moves.append((r, file))
                break

        for r in range(rank - 1, 0, -1):
            if pd.isna(self.board.loc[(r, file)]):
                moves.append((r, file))
            else:
                piece = self.board.loc[(r, file)]
                if piece[0] != self.board.loc[position][0]:
                    moves.append((r, file))
                break

        return moves

    def get_pawn_moves(self, position):
        moves = []
        rank, file = position
        piece = self.board.loc[position]
        color = piece[0]  # Colore

        # Pedina bianca 
        if color == 'w':
            # Singolo passo avanti 
            if rank < 8 and pd.isna(self.board.loc[(rank + 1, file)]):
                moves.append((rank + 1, file))

            # Doppio passo avanti se la pedina parte dalla riga di partenza
            if rank == 2 and pd.isna(self.board.loc[(rank + 2, file)]):
                moves.append((rank + 2, file))

            # Cattura diagonale a sinistra
            if rank < 8 and file > 1:
                if not pd.isna(self.board.loc[(rank + 1, file - 1)]) and self.board.loc[(rank + 1, file - 1)][0] == 'b':
                    moves.append((rank + 1, file - 1))

            # Cattura diagonale a destra
            if rank < 8 and file < 8:
                if not pd.isna(self.board.loc[(rank + 1, file + 1)]) and self.board.loc[(rank + 1, file + 1)][0] == 'b':
                    moves.append((rank + 1, file + 1))

        # Pedina nera
        elif color == 'b':
            # Singolo passo avanti
            if rank > 1 and pd.isna(self.board.loc[(rank - 1, file)]):
                moves.append((rank - 1, file))

            # Doppio passo avanti se parte dalla casella di partenza
            if rank == 7 and pd.isna(self.board.loc[(rank - 2, file)]):
                moves.append((rank - 2, file))

            # Cattura diagonale a sinistra
            if rank > 1 and file > 1:
                if not pd.isna(self.board.loc[(rank - 1, file - 1)]) and self.board.loc[(rank - 1, file - 1)][0] == 'w':
                    moves.append((rank - 1, file - 1))

            # Cattura diagonale a destra
            if rank > 1 and file < 8:
                if not pd.isna(self.board.loc[(rank - 1, file + 1)]) and self.board.loc[(rank - 1, file + 1)][0] == 'w':
                    moves.append((rank - 1, file + 1))

        return moves

    def get_knight_moves(self, position):
        moves = []
        rank, file = position
        knight_moves = [(-2, -1), (-2, 1), (-1, -2), (-1, 2),
                        (1, -2), (1, 2), (2, -1), (2, 1)]

        for move in knight_moves:
            new_rank = rank + move[0]
            new_file = file + move[1]

            if 1 <= new_rank <= 8 and 1 <= new_file <= 8:
                if pd.isna(self.board.loc[(new_rank, new_file)]):
                    moves.append((new_rank, new_file))
                else:
                    piece = self.board.loc[(new_rank, new_file)]
                    if piece[0] != self.board.loc[position][0]:
                        moves.append((new_rank, new_file))

        return moves

    def get_bishop_moves(self, position):
        moves = []
        rank, file = position

        # Mosse diagonali in alto a sinistra
        r, f = rank + 1, file - 1
        while r <= 8 and f >= 1:
            if pd.isna(self.board.loc[(r, f)]):
                moves.append((r, f))
            else:
                piece = self.board.loc[(r, f)]
                if piece[0] != self.board.loc[position][0]:
                    moves.append((r, f))
                break
            r += 1
            f -= 1

        # Mosse diagonali in alto a destra
        r, f = rank + 1, file + 1
        while r <= 8 and f <= 8:
            if pd.isna(self.board.loc[(r, f)]):
                moves.append((r, f))
            else:
                piece = self.board.loc[(r, f)]
                if piece[0] != self.board.loc[position][0]:
                    moves.append((r, f))
                break
            r += 1
            f += 1

        # Mosse diagonali in alto a sinistra
        r, f = rank - 1, file - 1
        while r >= 1 and f >= 1:
            if pd.isna(self.board.loc[(r, f)]):
                moves.append((r, f))
            else:
                piece = self.board.loc[(r, f)]
                if piece[0] != self.board.loc[position][0]:
                    moves.append((r, f))
                break
            r -= 1
            f -= 1

        # Mosse diagonali in basso a destra
        r, f = rank - 1, file + 1
        while r >= 1 and f <= 8:
            if pd.isna(self.board.loc[(r, f)]):
                moves.append((r, f))
            else:
                piece = self.board.loc[(r, f)]
                if piece[0] != self.board.loc[position][0]:
                    moves.append((r, f))
                break
            r -= 1
            f += 1

        return moves

    def get_queen_moves(self, position):
        """Combinazione delle mosse di torre e alfiere. Restituisce le possibile mosse della regina."""
        moves = []
        moves.extend(self.get_rook_moves(position))
        moves.extend(self.get_bishop_moves(position))
        return moves

    def get_king_moves(self, position):
        moves = []
        rank, file = position

        king_moves = [(-1, -1), (-1, 0), (-1, 1),
                      (0, -1), (0, 1),
                      (1, -1), (1, 0), (1, 1)]

        for move in king_moves:
            new_rank = rank + move[0]
            new_file = file + move[1]

            if 1 <= new_rank <= 8 and 1 <= new_file <= 8:
                if pd.isna(self.board.loc[(new_rank, new_file)]): #check if potential destination square is empty
                    moves.append((new_rank, new_file))
                else:
                    piece = self.board.loc[(new_rank, new_file)]
                    if piece[0] != self.board.loc[position][0]: #check if potential destination square is occupied by enemy piece
                        moves.append((new_rank, new_file))

        return moves

    def eval_status(self):
        """Restituisce lo stato della partita all'ultima mossa fornita in input, sottoforma di dizionario python"""
        state = {}

        # Number of Passed Pawns
        white_passed_pawns = 0
        black_passed_pawns = 0

        for file in range(1, 9):
            white_pawn_found = False
            black_pawn_found = False

            for rank in range(1, 9):
                piece = self.board.loc[(rank, file)]

                if piece == ('w', 'P'):
                    white_pawn_found = True
                elif piece == ('b', 'P'):
                    black_pawn_found = True

                if white_pawn_found and black_pawn_found:
                    break

            if white_pawn_found and not black_pawn_found:
                white_passed_pawns += 1
            elif black_pawn_found and not white_pawn_found:
                black_passed_pawns += 1

        state['Passed Pawns'] = {
            'white': white_passed_pawns,
            'black': black_passed_pawns
        }

        # Piece Advantage
        """Assegna un valore a ciascun pezzo mediante l'apposito metodo, 'get_piece_value'. Per ciascun pezzo catturato, se la cattura è stata effettuata dal bianco
        incrementa il valore, viceversa se la cattura è effettuata dal nero. Restituisce dunque un unico valore che indica, mediante il segno, quale
        giocatore è in vantaggio; mediante il valore assoluto l'entità del vantaggio"""
        piece_advantage = 0
        for rank in range(1, 9):
            for file in range(1, 9):
                piece = self.board.loc[(rank, file)]
                if piece is not np.NaN :
                    color, name = piece[0], piece[1]
                    if color == 'w':
                        piece_advantage += self.get_piece_value(name)
                    else:
                        piece_advantage -= self.get_piece_value(name)

        state['Piece Advantage'] = piece_advantage

        #Captured pieces
        """Restituisce il log dei pezzi catturati da ciascun giocatore."""
        state["Captured Pieces_val"] = {"white": 0,"black":0} #total value captured
        for piece in self.captured_pieces["white"]:
            piece_type = piece[1]
            value = self.get_piece_value(piece_type)
            state["Captured Pieces_val"]["white"] += value
        for piece in self.captured_pieces["black"]:
            piece_type = piece[1]
            value = self.get_piece_value(piece_type)
            state["Captured Pieces_val"]["black"] += value

        #Files controlled by rooks
        state["Files_controlled_by_Rooks"] = self.get_files_controlled_by_rooks()
        #CheckCount
        state["check_count"] = self.has_checked

        return state

    def get_piece_value(self, piece_name):
        piece_values = {
            'P': 1,
            'N': 3,
            'B': 3,
            'R': 5,
            'Q': 9,
            'K': 0
        }
        return piece_values.get(piece_name, 0)

    def get_files_controlled_by_rooks(self):
        files_controlled = {"white": 0, "black": 0}

        for file in range(1, 9):
            pieces = self.board[file].dropna()
            if len(pieces) == 1 and pieces.iloc[0][1] == 'R':
                color = pieces.iloc[0][0]
                files_controlled["white" if color == "w" else "black"] += 1

        return files_controlled

    def print_board(self):
        print(self.board)


In [3]:
# Avvio della SparkSession

findspark.init()
conf = SparkConf()
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
conf.set("spark.executor.heartbeatInterval", "50000s")
conf.set("spark.network.timeout","1000000000s")
spark = SparkSession.builder.config(conf=conf).master("local[4]").appName("Chess Project").getOrCreate()

In [4]:
spark

## Importazione del dataset

In [5]:
n_rows = 50000

In [6]:
# Caricamento del file
file_path = r"all_with_filtered_anotations_since1998.txt"
df = spark.read.csv(file_path, sep = " ", comment = "#" , nullValue= "None", mode = "DROPMALFORMED").limit(n_rows)

In [7]:
df.show(2,vertical = True)

-RECORD 0-------------
 _c0  | 1             
 _c1  | 2000.03.14    
 _c2  | 1-0           
 _c3  | 2851          
 _c4  | null          
 _c5  | 67            
 _c6  | date_false    
 _c7  | result_false  
 _c8  | welo_false    
 _c9  | belo_true     
 _c10 | edate_true    
 _c11 | setup_false   
 _c12 | fen_false     
 _c13 | result2_false 
 _c14 | oyrange_false 
 _c15 | blen_false    
 _c16 | ###           
 _c17 | W1.d4         
 _c18 | B1.d5         
 _c19 | W2.c4         
 _c20 | B2.e6         
 _c21 | W3.Nc3        
 _c22 | B3.Nf6        
 _c23 | W4.cxd5       
 _c24 | B4.exd5       
 _c25 | W5.Bg5        
 _c26 | B5.Be7        
 _c27 | W6.e3         
 _c28 | B6.Ne4        
 _c29 | W7.Bxe7       
 _c30 | B7.Nxc3       
 _c31 | W8.Bxd8       
 _c32 | B8.Nxd1       
 _c33 | W9.Bxc7       
 _c34 | B9.Nxb2       
 _c35 | W10.Rb1       
 _c36 | B10.Nc4       
 _c37 | W11.Bxc4      
 _c38 | B11.dxc4      
 _c39 | W12.Ne2       
 _c40 | B12.O-O       
 _c41 | W13.Nc3       
 _c42 | B13

## Data Cleaning

In [8]:
#limit the number of moves
non_moves_columns = 17
n_moves = 40
tot_columns = non_moves_columns + n_moves
n_cols = len(df.columns)
columns_to_drop = ["_c"+str(i) for i in range(tot_columns,n_cols)]
df_limited = df.drop(*columns_to_drop)


In [9]:
#df_limited.show(10, vertical = True)

In [9]:
#rimozione delle righe in cui non è il numero minimo di mosse
df_filtered = df_limited.dropna(how = "any").cache()
df_filtered.show(10,vertical = True)

-RECORD 0-------------
 _c0  | 5             
 _c1  | 2000.02.20    
 _c2  | 1/2-1/2       
 _c3  | 2851          
 _c4  | 2633          
 _c5  | 97            
 _c6  | date_false    
 _c7  | result_false  
 _c8  | welo_false    
 _c9  | belo_false    
 _c10 | edate_false   
 _c11 | setup_false   
 _c12 | fen_false     
 _c13 | result2_false 
 _c14 | oyrange_false 
 _c15 | blen_false    
 _c16 | ###           
 _c17 | W1.e4         
 _c18 | B1.e5         
 _c19 | W2.Nf3        
 _c20 | B2.Nc6        
 _c21 | W3.Bb5        
 _c22 | B3.a6         
 _c23 | W4.Ba4        
 _c24 | B4.Nf6        
 _c25 | W5.O-O        
 _c26 | B5.Be7        
 _c27 | W6.Re1        
 _c28 | B6.b5         
 _c29 | W7.Bb3        
 _c30 | B7.d6         
 _c31 | W8.c3         
 _c32 | B8.O-O        
 _c33 | W9.h3         
 _c34 | B9.Na5        
 _c35 | W10.Bc2       
 _c36 | B10.c5        
 _c37 | W11.d4        
 _c38 | B11.Qc7       
 _c39 | W12.Nbd2      
 _c40 | B12.Bd7       
 _c41 | W13.Nf1       
 _c42 | B13

In [11]:
#df_filtered.count()

In [10]:
#recupero del nome delle colonne(dal file)
columns = "1.t 2.date 3.result 4.welo 5.belo 6.len 7.date_c 8.resu_c 9.welo_c 10.belo_c 11.edate_c 12.setup 13.fen 14.resu2_c 15.oyrange 16.bad_len 17.game"
columns = columns.split(" ")
column_names = [col.split(".")[1] for col in columns]

In [11]:
#Adjust column names
columns = [str(i) for i in column_names[:] + list(range(1, n_moves + 1))]

In [12]:
#Add new names
df_filtered = df_filtered.toDF(*columns)

In [15]:
#df_filtered.show(2,vertical = True)

In [13]:
#Eliminazione colonne superflue
df_filtered = df_filtered.drop(
    *["edate_c",
    "game",
    "fen"
     ]
    )

In [14]:
# Eliminazione delle righe contenenti dati corrotti
df_filtered = df_filtered.filter(
col(r"`resu_c`").contains("result_false")
).filter(
col(r"`welo_c`").contains("welo_false")
).filter(
col(r"`belo_c`").contains("belo_false")
).filter(
col(r"`resu2_c`").contains("result2_false")
)

Eliminazione delle colonne di check che sono state appena filtrate, in quanto contenenti tutte valori uguali, allegerendo ulteriormente il dataset.

In [15]:
df_filtered = df_filtered.drop(
    *[
        "date_c", 
        "resu_c",
        "welo_c",
        "belo_c",
        "setup", 
        "resu2_c",
        "oyrange",
        "bad_len" 
     ]
    )

In [16]:
df_filtered.show(2, vertical = True)

-RECORD 0------------
 t      | 5          
 date   | 2000.02.20 
 result | 1/2-1/2    
 welo   | 2851       
 belo   | 2633       
 len    | 97         
 1      | W1.e4      
 2      | B1.e5      
 3      | W2.Nf3     
 4      | B2.Nc6     
 5      | W3.Bb5     
 6      | B3.a6      
 7      | W4.Ba4     
 8      | B4.Nf6     
 9      | W5.O-O     
 10     | B5.Be7     
 11     | W6.Re1     
 12     | B6.b5      
 13     | W7.Bb3     
 14     | B7.d6      
 15     | W8.c3      
 16     | B8.O-O     
 17     | W9.h3      
 18     | B9.Na5     
 19     | W10.Bc2    
 20     | B10.c5     
 21     | W11.d4     
 22     | B11.Qc7    
 23     | W12.Nbd2   
 24     | B12.Bd7    
 25     | W13.Nf1    
 26     | B13.cxd4   
 27     | W14.cxd4   
 28     | B14.Rac8   
 29     | W15.Ne3    
 30     | B15.Nc6    
 31     | W16.d5     
 32     | B16.Nb4    
 33     | W17.Bb1    
 34     | B17.a5     
 35     | W18.a3     
 36     | B18.Na6    
 37     | W19.b4     
 38     | B19.Ra8    
 39     | 

### Data Casting
Conversione di ciascuna colonna nel DataType adeguato

In [17]:
df_filtered.schema

StructType([StructField('t', StringType(), True), StructField('date', StringType(), True), StructField('result', StringType(), True), StructField('welo', StringType(), True), StructField('belo', StringType(), True), StructField('len', StringType(), True), StructField('1', StringType(), True), StructField('2', StringType(), True), StructField('3', StringType(), True), StructField('4', StringType(), True), StructField('5', StringType(), True), StructField('6', StringType(), True), StructField('7', StringType(), True), StructField('8', StringType(), True), StructField('9', StringType(), True), StructField('10', StringType(), True), StructField('11', StringType(), True), StructField('12', StringType(), True), StructField('13', StringType(), True), StructField('14', StringType(), True), StructField('15', StringType(), True), StructField('16', StringType(), True), StructField('17', StringType(), True), StructField('18', StringType(), True), StructField('19', StringType(), True), StructField(

In [18]:
integers = ['T','Welo','Belo','Len']

for c in integers:
    df_filtered = df_filtered.withColumn(c,df_filtered[c].cast(IntegerType()))

In [19]:
df_filtered

DataFrame[T: int, date: string, result: string, Welo: int, Belo: int, Len: int, 1: string, 2: string, 3: string, 4: string, 5: string, 6: string, 7: string, 8: string, 9: string, 10: string, 11: string, 12: string, 13: string, 14: string, 15: string, 16: string, 17: string, 18: string, 19: string, 20: string, 21: string, 22: string, 23: string, 24: string, 25: string, 26: string, 27: string, 28: string, 29: string, 30: string, 31: string, 32: string, 33: string, 34: string, 35: string, 36: string, 37: string, 38: string, 39: string, 40: string]

In [20]:
# modifica della target variable "Result" da {1-0,1/2-1/2,0-1 a 1,0,-1}.
df_filtered= df_filtered.withColumn("Result", 
              when(col("Result")== "1-0", 1)
                .when(col("Result")== "1/2-1/2", 0)
                .when(col("Result") == "0-1", -1)
             )

In [21]:
df_filtered = df_filtered.drop("Date")

In [22]:
df_filtered = df_filtered.drop("T")

In [23]:
df_filtered.show(2)

+------+----+----+---+-----+-----+------+------+------+-----+------+-------+------+------+------+-----+-------+-------+-------+------+------+------+-------+--------+--------+-------+--------+--------+---------+--------+--------+--------+-------+-------+------+-------+-------+--------+--------+-------+-------+-------+--------+--------+
|Result|Welo|Belo|Len|    1|    2|     3|     4|     5|    6|     7|      8|     9|    10|    11|   12|     13|     14|     15|    16|    17|    18|     19|      20|      21|     22|      23|      24|       25|      26|      27|      28|     29|     30|    31|     32|     33|      34|      35|     36|     37|     38|      39|      40|
+------+----+----+---+-----+-----+------+------+------+-----+------+-------+------+------+------+-----+-------+-------+-------+------+------+------+-------+--------+--------+-------+--------+--------+---------+--------+--------+--------+-------+-------+------+-------+-------+--------+--------+-------+-------+-------+--------

### Creazione di feature aggiuntive

Utilizzo della classe "ChessGame" per ottenere feature relativi allo stato della partita alla mossa 40.

In [24]:
inputcols = [str(i) for i in range(1,n_moves + 1 )]
outputcols = df_filtered.columns[:4] + df_filtered.columns[ 4 + n_moves:] 
outputcols.append("Moves")
print(df_filtered.columns)
print(inputcols)
print(outputcols)

['Result', 'Welo', 'Belo', 'Len', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40']
['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40']
['Result', 'Welo', 'Belo', 'Len', 'Moves']


In [25]:
# Creazione del vettore contenente le mosse.
df_filtered = df_filtered.withColumn("Moves", array(inputcols)).select(*outputcols)

In [26]:
#Creazione della colonna Elo_Difference
df_filtered = df_filtered.withColumn("Elo_Difference", df_filtered["Welo"] - df_filtered["Belo"])

In [27]:
# Pulizia delle mosse bianche e nere, per tenere solo l'indicazione della mossa effettuata
df_filtered =  df_filtered.withColumn("Moves", transform("Moves", lambda x:substring_index(x,".", -1)))

In [28]:
df_filtered.show()

+------+----+----+---+--------------------+--------------+
|Result|Welo|Belo|Len|               Moves|Elo_Difference|
+------+----+----+---+--------------------+--------------+
|     0|2851|2633| 97|[e4, e5, Nf3, Nc6...|           218|
|     0|2851|2748| 52|[d4, e6, Nf3, Nf6...|           103|
|     1|2851|2191| 79|[e4, c5, Nf3, Nc6...|           660|
|     0|2851|2175| 72|[c4, e6, g3, d5, ...|           676|
|     1|2851|2646| 49|[e4, c5, Nf3, d6,...|           205|
|     0|2851|2725| 68|[e4, c5, Nf3, Nc6...|           126|
|     1|2851|2555|147|[d4, d5, c4, e6, ...|           296|
|     0|2851|2748| 75|[d4, e6, c4, b6, ...|           103|
|    -1|2851|2725| 94|[d4, Nf6, Nf3, g6...|           126|
|     1|2851|2658| 87|[e4, c5, Nf3, d6,...|           193|
|    -1|2851|2650|117|[e4, e5, Nf3, Nc6...|           201|
|     1|2851|2769| 73|[e4, e6, d4, d5, ...|            82|
|     0|2851|2332| 90|[c4, g6, g3, Bg7,...|           519|
|     0|2851|2656|110|[e4, e6, d4, d5, ...|           19

In [32]:
#first checkpoint
pandas_df = df_filtered.toPandas()

In [33]:
pd.set_option('display.max_colwidth', None)
pandas_df

Unnamed: 0,Result,Welo,Belo,Len,Moves,Elo_Difference
0,0,2851,2633,97,"[e4, e5, Nf3, Nc6, Bb5, a6, Ba4, Nf6, O-O, Be7, Re1, b5, Bb3, d6, c3, O-O, h3, Na5, Bc2, c5, d4, Qc7, Nbd2, Bd7, Nf1, cxd4, cxd4, Rac8, Ne3, Nc6, d5, Nb4, Bb1, a5, a3, Na6, b4, Ra8, Bd2, Rfc8]",218
1,0,2851,2748,52,"[d4, e6, Nf3, Nf6, c4, d5, Nc3, dxc4, e4, Bb4, Bg5, c5, Bxc4, cxd4, Nxd4, Qa5, Bd2, O-O, Nc2, Bxc3, Bxc3, Qg5, Qe2, Qxg2, O-O-O, Qxe4, Rhg1, g6, Ne3, e5, f4, Be6, Bd3, Qxf4, Rgf1, Qh4, Be1, Qa4, Rxf6, Nc6]",103
2,1,2851,2191,79,"[e4, c5, Nf3, Nc6, Bb5, g6, Bxc6, dxc6, d3, Bg7, h3, Nf6, Nc3, O-O, Be3, Qa5, Qd2, Rd8, O-O, Bd7, Bh6, Qc7, Bxg7, Kxg7, Qe3, b6, Nh2, Rf8, f4, Rad8, Rae1, Bc8, f5, e5, Rf2, Qd6, Ref1, h6, b3, Qd4]",660
3,0,2851,2175,72,"[c4, e6, g3, d5, Bg2, Nf6, Nf3, Be7, b3, O-O, O-O, c5, Bb2, Nc6, e3, b6, Nc3, Bb7, cxd5, Nxd5, Nxd5, Qxd5, d4, Rad8, Ne5, Qd6, dxc5, Qxc5, Qe2, Nxe5, Bxb7, Qc7, Bg2, Bc5, Rfd1, a5, Rxd8, Rxd8, Rd1, Rxd1+]",676
4,1,2851,2646,49,"[e4, c5, Nf3, d6, d4, cxd4, Nxd4, Nf6, Nc3, a6, Be3, e6, f3, b5, g4, h6, Qd2, Nbd7, O-O-O, Bb7, h4, b4, Na4, d5, Bh3, g5, Bg2, gxh4, Rxh4, dxe4, g5, Nd5, Rxe4, hxg5, Bxg5, Qa5, f4, Rh2, Nxe6, fxe6]",205
...,...,...,...,...,...,...
43930,1,2614,2459,62,"[d4, d5, Nf3, Nf6, c4, dxc4, e3, Bg4, Bxc4, e6, h3, Bh5, g4, Bg6, Ne5, Nbd7, Nxg6, hxg6, Nc3, c6, Bf1, e5, Bg2, exd4, exd4, Nb6, O-O, Be7, Bf4, Nbd5, Be5, O-O, Qf3, Qb6, g5, Nd7, Bg3, Nxc3, bxc3, Qa5]",155
43931,0,2614,2631,93,"[d4, f5, Nc3, d5, Bf4, Nf6, e3, a6, Nf3, e6, h3, c5, g4, Nc6, gxf5, exf5, Bg2, c4, O-O, Bb4, Ne2, O-O, Ne5, Be6, b3, cxb3, axb3, Ne4, Nxc6, bxc6, Be5, a5, Nf4, Bf7, Bxe4, dxe4, Kh2, Kh8, Rg1, Rg8]",-17
43932,0,2614,2443,131,"[d4, Nf6, Bg5, e6, e4, h6, Bxf6, Qxf6, Nc3, d6, Qd2, c6, f4, e5, dxe5, dxe5, f5, Bb4, Nf3, Nd7, a3, Ba5, Bc4, Qe7, O-O-O, Nf6, Rhe1, Bd7, Ba2, b5, Re3, Bc7, Rd3, a5, g4, O-O-O, Qe2, Kb7, a4, b4]",171
43933,-1,2614,2473,102,"[e4, e6, d4, d5, Nd2, Nf6, e5, Nfd7, f4, c5, c3, Nc6, Ndf3, Qb6, a3, a5, b3, Be7, h4, f5, h5, cxd4, cxd4, Ndb8, Bd3, Bd7, Ne2, a4, b4, Na7, Nc3, Nb5, Nxa4, Qc7, Nc5, Nc3, Qc2, Ne4, h6, Rg8]",141


In [34]:
pandas_df.to_csv(path_or_buf=r"C:\Users\mario\Documents\Mario Meloni\Università\Magistrale DSBAI\Laboratorio Big Data\Chess Project\save\class\part1\to_process{}.csv".format(str(n_rows)),
                 header= True,
                 index = False)

In [36]:
#restart spark session to have new sparkContext with free memory
spark.stop()

In [37]:
# Avvio della SparkSession
findspark.init()

In [38]:
conf = SparkConf()
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
conf.set("spark.executor.heartbeatInterval", "50000s")
conf.set("spark.network.timeout","1000000000s")
spark = SparkSession.builder.config(conf=conf).master("local[4]").appName("Chess Project").getOrCreate()
spark

In [39]:
n_rows2 = 1000

In [40]:
df = spark.read.csv(path = r"C:\Users\mario\Documents\Mario Meloni\Università\Magistrale DSBAI\Laboratorio Big Data\Chess Project\save\class\part1\to_process{}.csv".format(n_rows),
                   sep = ",",
                   header = True,
                    multiLine= True,
                    escape = '"'
                   ).limit(n_rows2)

In [41]:
df.show(vertical = True, truncate = False)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Result         | 0                                                                                                                                                                                                                                                            
 Welo           | 2851                                                                                                                                                                                                                                                         
 Belo           | 2633                                                                                                                                                                  

In [42]:
#from pyspark.sql.functions import regexp_replace

df = df.withColumn("Moves", regexp_replace(df.Moves, "\n", ""))
df = df.withColumn("Moves", translate(df.Moves,"[]",""))
df = df.withColumn("Moves", translate(df.Moves,"'","")).cache()


In [43]:
# Data Castning
column_data_types = {
    "Result": IntegerType(),
    "Welo": IntegerType(),
    "Belo": IntegerType(),
    "Len": IntegerType(),
    "Moves": ArrayType(StringType()),
    "Elo_Difference": IntegerType()
}

In [44]:
# Preprocess the Moves column by splitting the string into an array of strings
df = df.withColumn("Moves", split(col("Moves"), " "))

In [45]:
# Apply the data type conversions to the DataFrame
for column_name, data_type in column_data_types.items():
    df = df.withColumn(column_name, col(column_name).cast(data_type))

In [46]:
df.show(2,vertical = True, truncate = False)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Result         | 0                                                                                                                                                                                                             
 Welo           | 2851                                                                                                                                                                                                          
 Belo           | 2633                                                                                                                                                                                                          
 Len            | 97                                                                                

In [47]:
df

DataFrame[Result: int, Welo: int, Belo: int, Len: int, Moves: array<string>, Elo_Difference: int]

In [48]:
# Definizione della struttura dei dati che la user defined function dovrà restituire.
passed_pawns_type = StructType([
    StructField("white", IntegerType(), True),
    StructField("black", IntegerType(), True)
])


captured_pieces_type = StructType([
    StructField("white", IntegerType(), True),
    StructField("black", IntegerType(), True)
])


return_type = StructType([
    StructField("Passed Pawns", passed_pawns_type, True),
    StructField("Piece Advantage", IntegerType(), True),
    StructField("Captured Pieces_val", captured_pieces_type, True),
    StructField("Files_controlled_by_Rooks", captured_pieces_type, True),
    StructField("check_count",IntegerType(),True)
])


In [49]:
def handle_errors(moves):
    try:
        game = ChessGame(moves)
        game_state = game.eval_status()
        return game_state
    except Exception as e:
        # gestione eccezioni per maggiore robustezza
        return None

In [50]:
#definizione della user defined function
transform_moves_udf = udf(f = lambda row: handle_errors(row) , returnType = return_type)

In [51]:
df = df.withColumn("game_state" , transform_moves_udf(col("Moves")))

In [52]:
df.show(10, vertical = True, truncate = False)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Result         | 0                                                                                                                                                                                                             
 Welo           | 2851                                                                                                                                                                                                          
 Belo           | 2633                                                                                                                                                                                                          
 Len            | 97                                                                                

In [53]:
df = df.drop("Moves")

In [54]:
keys = ['Files_controlled_by_Rooks','check_count','Piece Advantage', 'Captured Pieces_val', 'Passed Pawns']
df = df.select(
    *df.columns,
    *[col("game_state").getItem(key) for key in keys]
).drop("game_state")

In [55]:
column_names = df.columns
clear_names = [column.replace("game_state.","") for column in df.columns]

In [56]:
df= df.toDF(*clear_names)

In [57]:
df.show(1, truncate = False, vertical = True)

-RECORD 0---------------------------
 Result                    | 0      
 Welo                      | 2851   
 Belo                      | 2633   
 Len                       | 97     
 Elo_Difference            | 218    
 Files_controlled_by_Rooks | {0, 0} 
 check_count               | 0      
 Piece Advantage           | 0      
 Captured Pieces_val       | {1, 1} 
 Passed Pawns              | {0, 0} 
only showing top 1 row



In [58]:
cols =  ["Files_controlled_by_Rooks", "Captured Pieces_val", "Passed Pawns"]

In [59]:
df = df.withColumns(
{"Files_controlled_by_Rooks_w": df.Files_controlled_by_Rooks.white,
 "Captured_Pieces_val_w" : col("Captured Pieces_val").white,
 "Passed_Pawns_w": col("Passed Pawns").white,
 "Files_controlled_by_Rooks_b": df.Files_controlled_by_Rooks.black,
 "Captured_Pieces_val_b" : col("Captured Pieces_val").black,
 "Passed_Pawns_b": col("Passed Pawns").black,
}).drop(*cols)

In [60]:
df.show(2, vertical = True, truncate = False)

-RECORD 0---------------------------
 Result                      | 0    
 Welo                        | 2851 
 Belo                        | 2633 
 Len                         | 97   
 Elo_Difference              | 218  
 check_count                 | 0    
 Piece Advantage             | 0    
 Files_controlled_by_Rooks_w | 0    
 Captured_Pieces_val_w       | 1    
 Passed_Pawns_w              | 0    
 Files_controlled_by_Rooks_b | 0    
 Captured_Pieces_val_b       | 1    
 Passed_Pawns_b              | 0    
-RECORD 1---------------------------
 Result                      | 0    
 Welo                        | 2851 
 Belo                        | 2748 
 Len                         | 52   
 Elo_Difference              | 103  
 check_count                 | 0    
 Piece Advantage             | 0    
 Files_controlled_by_Rooks_w | 0    
 Captured_Pieces_val_w       | 8    
 Passed_Pawns_w              | 0    
 Files_controlled_by_Rooks_b | 0    
 Captured_Pieces_val_b       | 8    
 

In [61]:
#save progress to disk
df_pandas = df.toPandas()

df_pandas.to_csv(r"C:\Users\mario\Documents\Mario Meloni\Università\Magistrale DSBAI\Laboratorio Big Data\Chess Project\save\class\part2\dataframe{}.csv".format(str(n_rows2)),
                index = False)

In [62]:
spark.stop()

In [63]:
conf = SparkConf()
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
conf.set("spark.executor.heartbeatInterval", "50000s")
conf.set("spark.network.timeout","1000000000s")
spark = SparkSession.builder.config(conf=conf).master("local[4]").appName("Chess Project").getOrCreate()
spark

In [64]:
n_rows2 = 1000
df = spark.read.csv(r"C:\Users\mario\Documents\Mario Meloni\Università\Magistrale DSBAI\Laboratorio Big Data\Chess Project\save\class\part2\dataframe{}.csv".format(n_rows2),
                   sep = ",",
                   header= True,
                   inferSchema= True)

In [65]:
df.show(2,truncate = False, vertical = True)

-RECORD 0---------------------------
 Result                      | 0    
 Welo                        | 2851 
 Belo                        | 2633 
 Len                         | 97   
 Elo_Difference              | 218  
 check_count                 | 0.0  
 Piece Advantage             | 0.0  
 Files_controlled_by_Rooks_w | 0.0  
 Captured_Pieces_val_w       | 1.0  
 Passed_Pawns_w              | 0.0  
 Files_controlled_by_Rooks_b | 0.0  
 Captured_Pieces_val_b       | 1.0  
 Passed_Pawns_b              | 0.0  
-RECORD 1---------------------------
 Result                      | 0    
 Welo                        | 2851 
 Belo                        | 2748 
 Len                         | 52   
 Elo_Difference              | 103  
 check_count                 | 0.0  
 Piece Advantage             | 0.0  
 Files_controlled_by_Rooks_w | 0.0  
 Captured_Pieces_val_w       | 8.0  
 Passed_Pawns_w              | 0.0  
 Files_controlled_by_Rooks_b | 0.0  
 Captured_Pieces_val_b       | 8.0  
 

In [66]:
df

DataFrame[Result: int, Welo: int, Belo: int, Len: int, Elo_Difference: int, check_count: double, Piece Advantage: double, Files_controlled_by_Rooks_w: double, Captured_Pieces_val_w: double, Passed_Pawns_w: double, Files_controlled_by_Rooks_b: double, Captured_Pieces_val_b: double, Passed_Pawns_b: double]

In [67]:
df = df.dropna()

In [68]:
df.count()

999

In [69]:
# Select the feature columns
feature_columns = df.columns[1:]

# Create a VectorAssembler to combine the feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Create a StandardScaler to scale the features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Create a StringIndexer to index the "Result" column
label_indexer = StringIndexer(inputCol="Result", outputCol="label")

In [70]:
# Create the Random Forest model
rf = RandomForestClassifier(featuresCol="scaledFeatures", labelCol="label")

# Create a pipeline to assemble, scale, index the label, and train the model
pipeline = Pipeline(stages=[assembler, scaler, label_indexer, rf])

# Split the data into train and test samples 
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# Fit the pipeline to the train data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)



### Model Evaluation

In [71]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

In [72]:
# Evaluate the model and compute the accuracy
accuracy = evaluator.evaluate(predictions)

In [73]:
print(accuracy)

0.611764705882353


In [114]:
spark.stop()