In [7]:
import os
import sys
import traceback
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window


class ChessPatternAnalysis:
    """Spark-backed loader and feature builder for a CSV of chess games.

    The constructor starts (or reuses, via ``getOrCreate``) a SparkSession;
    ``process_data`` reads the games CSV, fills missing rating values with 0
    and adds derived columns. Call ``stop()`` -- or use the instance as a
    context manager -- to release the Spark session when finished.
    """

    # Integer columns whose nulls are replaced with 0 after loading.
    RATING_COLUMNS = ("WhiteElo", "BlackElo", "WhiteRatingDiff", "BlackRatingDiff")

    def __init__(self, app_name="Chess Pattern Analysis", master="local[*]",
                 shuffle_partitions=10, driver_memory="4g"):
        """Initialize Spark session with configurations.

        Args:
            app_name: Spark application name.
            master: Spark master URL; the default runs locally on all cores.
            shuffle_partitions: value for ``spark.sql.shuffle.partitions``.
            driver_memory: value for ``spark.driver.memory``.
                NOTE(review): per Spark's configuration docs this only takes
                effect if no driver JVM is running yet (getOrCreate may reuse
                an existing session) -- verify for your setup.
        """
        self.spark = (
            SparkSession.builder
            .appName(app_name)
            .config("spark.sql.crossJoin.enabled", "true")
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.sql.shuffle.partitions", str(shuffle_partitions))
            .config("spark.driver.memory", driver_memory)
            .config("spark.sql.warehouse.dir", "./spark-warehouse")
            .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
            .master(master)
            .getOrCreate()
        )

        # Only errors are logged, to keep notebook output readable.
        self.spark.sparkContext.setLogLevel("ERROR")

        # Processed games; populated by process_data(), None until then.
        self.df = None

    def process_data(self, file_path, mode="PERMISSIVE", cache=False):
        """Load and process chess games read with an explicit schema.

        Args:
            file_path: path to a CSV file with a header row.
            mode: Spark CSV parse mode. "PERMISSIVE" (Spark's default) turns
                values that do not fit the schema into nulls, which are then
                filled with 0 for the rating columns; pass "FAILFAST" to raise
                on malformed rows instead.
            cache: if True, cache the processed DataFrame so the row count
                printed below and later actions do not re-read the CSV.

        Returns:
            The processed DataFrame (also stored on ``self.df``).
        """
        print(f"Processing data from: {file_path}")

        games = self._read_games(file_path, mode)
        games = self._fill_missing_ratings(games)
        games = self._add_derived_columns(games)
        if cache:
            games = games.cache()
        self.df = games

        # count() is a Spark action: it triggers a full pass over the input.
        print(f"Processed {self.df.count():,} games")
        return self.df

    def _read_games(self, file_path, mode):
        """Read the games CSV with a declared schema (no type inference)."""
        # Ratings are typed as integers; all other columns stay strings and
        # are parsed downstream where needed (e.g. UTCDate -> Date).
        schema = StructType([
            StructField("Event", StringType(), True),
            StructField("White", StringType(), True),
            StructField("Black", StringType(), True),
            StructField("Result", StringType(), True),
            StructField("UTCDate", StringType(), True),
            StructField("UTCTime", StringType(), True),
            StructField("WhiteElo", IntegerType(), True),
            StructField("BlackElo", IntegerType(), True),
            StructField("WhiteRatingDiff", IntegerType(), True),
            StructField("BlackRatingDiff", IntegerType(), True),
            StructField("ECO", StringType(), True),
            StructField("Opening", StringType(), True),
            StructField("TimeControl", StringType(), True),
            StructField("Termination", StringType(), True)
        ])
        # NOTE(review): with an explicit schema and Spark's default
        # enforceSchema=true, CSV columns are matched by position rather than
        # by header name -- confirm the file's column order matches.
        return self.spark.read.csv(file_path, header=True, schema=schema, mode=mode)

    @classmethod
    def _fill_missing_ratings(cls, games):
        """Replace nulls in the rating columns with 0."""
        # NOTE(review): a filled WhiteElo of 0 is later categorised as
        # "Beginner", indistinguishable from a genuinely low rating --
        # confirm that is intended.
        return games.na.fill({name: 0 for name in cls.RATING_COLUMNS})

    @staticmethod
    def _add_derived_columns(games):
        """Add StrengthCategory, OpeningFamily, Date and TimeFormat columns."""
        return (
            games
            # Bucketed by White's rating only.
            .withColumn("StrengthCategory",
                        when(col("WhiteElo") >= 2400, "Master")
                        .when(col("WhiteElo") >= 2000, "Expert")
                        .when(col("WhiteElo") >= 1600, "Club")
                        .otherwise("Beginner"))
            # Text before the first ":" (the whole name when there is none).
            .withColumn("OpeningFamily",
                        regexp_extract(col("Opening"), "^[^:]+", 0))
            .withColumn("Date",
                        to_date(col("UTCDate"), "yyyy.MM.dd"))
            # NOTE(review): every value containing "+" is labelled
            # "Increment", including a zero increment such as "300+0" if the
            # data uses that format -- confirm against real TimeControl values.
            .withColumn("TimeFormat",
                        when(col("TimeControl").contains("+"), "Increment")
                        .when(col("TimeControl").contains("|"), "Tournament")
                        .otherwise("Standard"))
        )

    def stop(self):
        """Stop the Spark session and drop the reference to the loaded data.

        Because getOrCreate() may have returned a session shared with other
        code in the same process, this stops that shared session too.
        """
        self.df = None
        self.spark.stop()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Always release Spark; never suppress the exception.
        self.stop()
        return False
def main(input_path=None, output_path="chess_analysis_results"):
    """Build the chess analysis pipeline and return it.

    Args:
        input_path: CSV of games to analyse. When None, the CHESS_GAMES_CSV
            environment variable is used, falling back to the original
            hard-coded local path.
        output_path: destination for results. Currently unused, because no
            stage that writes output exists yet (see the TODO below).

    Returns:
        The ChessPatternAnalysis instance, so the notebook can keep using it
        and its Spark session.

    Raises:
        Exception: any failure while building the pipeline is reported with a
        traceback and then re-raised.
    """
    if input_path is None:
        # Avoid depending on one machine's absolute path: allow an override.
        input_path = os.environ.get(
            "CHESS_GAMES_CSV",
            r"C:\Users\Sai Kiran\Downloads\chess_games.csv\chess_games.csv",
        )

    try:
        pipeline = ChessPatternAnalysis()

        # TODO: ChessPatternAnalysis defines no run_pipeline(); the only
        # available stage is process_data(). Enable it once an output stage
        # that uses output_path exists:
        # pipeline.process_data(input_path)

    except Exception:
        print("\nPipeline execution failed!")
        traceback.print_exc()
        raise

    return pipeline



# Under a Jupyter/IPython kernel __name__ is "__main__", so executing this cell
# calls main() -- and therefore creates the SparkSession -- immediately.
# NOTE(review): in top-to-bottom order this runs before the JAVA_HOME and
# findspark cells further down; on "Restart & Run All" those settings are not
# yet in place when Spark starts.
if __name__ == "__main__":
    main()

In [2]:
import os
os.environ["JAVA_HOME"] = r"C:\java"  # Update this path
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["JAVA_HOME"], "bin")

In [4]:

# findspark.init() is expected to locate the local Spark installation
# (e.g. via SPARK_HOME) and make `pyspark` importable from this kernel.
# NOTE(review): it needs to run before any `pyspark` import and before the
# SparkSession is created; in top-to-bottom order the pipeline cell above
# (which imports pyspark and calls main()) runs first -- consider moving this
# cell and the JAVA_HOME cell to the top of the notebook.
import findspark
findspark.init()