# Sudoku Quality Test

In [None]:
import numpy as np
import pandas as pd
from pyspark.sql.functions import lit, monotonically_increasing_id
from pyspark.sql.session import SparkSession
import sys
sys.path.append("..")
from helpers.data_prep_and_print import print_df

In [None]:
# Build (or reuse) a Spark session that runs locally with 8g of driver memory.
session_builder = SparkSession.builder.master("local[*]")
session_builder = session_builder.appName("Sudoku Quality Test")
session_builder = session_builder.config("spark.driver.memory", "8g")
spark = session_builder.getOrCreate()

# Turn on Arrow for the Spark <-> pandas conversions used further down.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [None]:
# Location of the input data.
path = "../data/"
input_file = path + "sudoku_1_mio_sequenced_columns.csv"

# Read the comma-separated file (first line is the header) into a DataFrame
# and let Spark infer the column types.
csv_reader = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ",")
)
df = csv_reader.csv(input_file)
df.printSchema()

# For quick experiments, work on a subset instead:
#   df = df.limit(10)
# or alternatively:
#   df = df.sample(0.00001, 5)
print_df(df, 10)

## All cells are filled => no Null values

In [None]:
# Discard every row that has a null in any column.
df = df.na.drop(how="any")

## All numbers must be between 0 and 9 for the quizzes 

In [None]:
# The first 81 columns hold the quiz cells; each one must lie in 0..9.
quiz_columns = df.columns[:81]
query_conditions = [
    f"{col_name} >= 0 AND {col_name} <= 9" for col_name in quiz_columns
]
# A row is kept only if every quiz cell passes its range check.
df = df.where(" AND ".join(query_conditions))

## Solutions must contain numbers between 1 and 9

In [None]:
# The columns from position 81 onwards hold the solution cells; each one must
# lie in 1..9.
solution_columns = df.columns[81:]
query_conditions = [
    f"{col_name} > 0 AND {col_name} <=9" for col_name in solution_columns
]
# A row is kept only if every solution cell passes its range check.
df = df.where(" AND ".join(query_conditions))

## All quiz cells != 0 must be identical to the corresponding solution cell

In [None]:
# Build the consistency filter: every given clue (quiz cell != 0) must equal
# the solution cell at the same position.
# The first half of the columns is compared against the second half, so
# column i is paired with column i + num_cols.
num_cols = len(df.columns) // 2
query_conditions = [
    f"({quiz_col} == 0 OR {quiz_col} == {solution_col})"
    for quiz_col, solution_col in zip(
        df.columns[:num_cols], df.columns[num_cols:2 * num_cols]
    )
]
# Every position has to pass, hence AND. Joining "clue is set and matches"
# terms with OR would keep any row in which a single clue happens to match,
# even if other clues contradict the solution.
df = df.where(" AND ".join(query_conditions))
print_df(df,10)

## Checking Solutions: Checking all Rows and Columns

In [None]:
# Bring the solution columns (everything from position 81 onwards) into pandas
# so that each solution can be checked row by row.
solution_columns = df.columns[81:]
df_only_solutions = df[solution_columns].toPandas()

def is_in_range_and_unique(values, min_val, max_val):
    """Check that ``values`` has no duplicates and spans ``min_val``..``max_val``.

    Args:
        values: Sequence of hashable, mutually comparable items.
        min_val: Value the smallest element must be equal to.
        max_val: Value the largest element must be equal to.

    Returns:
        True if all elements are distinct, the smallest one equals ``min_val``
        and the largest one equals ``max_val``. False otherwise, including for
        an empty sequence.
    """
    if len(values) == 0:
        # min()/max() raise ValueError on an empty sequence; an empty group
        # can never be a valid one.
        return False
    if len(values) != len(set(values)):
        # At least one value occurs more than once.
        return False
    return min(values) == min_val and max(values) == max_val

def check_unique_num_in_row_and_col(row_as_series):
    """Validate the board rows and board columns of one flattened solution.

    Consecutive runs of nine values are checked as board rows, and every
    ninth value (starting at offsets 0..8) as board columns. Each group must
    pass is_in_range_and_unique(..., 1, 9).

    Input: pd.Series, Output: boolean
    """
    cell_count = row_as_series.size

    # Board rows: positions [0..8], [9..17], ...
    rows_ok = all(
        is_in_range_and_unique(row_as_series.iloc[first:first + 9].to_list(), 1, 9)
        for first in range(0, cell_count, 9)
    )
    if not rows_ok:
        return False

    # Board columns: positions offset, offset + 9, offset + 18, ...
    return all(
        is_in_range_and_unique(row_as_series.iloc[offset::9].to_list(), 1, 9)
        for offset in range(9)
    )

# Tag every Spark row with an ID so the pandas check result can be joined back.
df = df.withColumn("ID", monotonically_increasing_id())

# Run the row/column check in pandas and attach the IDs by pandas index.
# NOTE(review): this index-based join assumes the two separate toPandas()
# calls return the rows in the same order - confirm; no explicit sort is
# applied to either of them.
row_col_check = df_only_solutions.apply(check_unique_num_in_row_and_col, axis=1)
ids_pdf = df.select("ID").toPandas()
check_with_ids = row_col_check.to_frame("test_row").join(ids_pdf)
result_df = spark.createDataFrame(check_with_ids)

# Keep only the rows that passed, then remove the helper columns again.
df_joined = df.join(result_df, ["ID"]).where("test_row == True")
df = df_joined.drop("ID", "test_row")
print_df(df, 10)

## Checking Solutions: Checking all Squares

In [None]:
# check square
# there are nine 3x3 squares 
# the first square has the indexes (S1A, S1B, S1C, 
#                                   S2A, S2B, S2C,
#                                   S3A, S3B, S3C)

def get_square_definition():
    """Return the cell labels of the nine 3x3 squares of a 9x9 sudoku.

    The result is a list of nine lists. Each inner list holds the nine labels
    ("S" + row number 1-9 + column letter A-I) of one square, e.g. the first
    one is S1A, S1B, S1C, S2A, ..., S3C. Squares are ordered left to right,
    then top to bottom.
    """
    column_letters = "ABCDEFGHI"
    squares = []
    for top_row in range(1, 10, 3):
        for left_col in range(0, 9, 3):
            squares.append([
                f"S{row_num}{letter}"
                for row_num in range(top_row, top_row + 3)
                for letter in column_letters[left_col:left_col + 3]
            ])
    return squares

def check_squares(row_as_series:pd.Series) -> bool:
    """Validate every 3x3 square of one solution.

    The cells of each square are looked up by the labels returned from
    get_square_definition() and must pass is_in_range_and_unique(..., 1, 9).

    Input = pd.Series, Output = True/False
    """
    return all(
        is_in_range_and_unique(row_as_series.get(square_cells).to_list(), 1, 9)
        for square_cells in get_square_definition()
    )
    
# Collect the remaining rows into pandas and isolate the solution columns.
data = df.toPandas()
solution_column_names = data.columns[81:]
df_only_solutions = data[solution_column_names]

# Flag each row with the outcome of the 3x3 square check, keep the rows that
# passed and remove the helper column again.
data["test_row"] = df_only_solutions.apply(check_squares, axis=1)
passed = data.loc[data["test_row"] == True]
data = passed.drop(columns=["test_row"])

# Hand the cleaned data back to Spark.
df = spark.createDataFrame(data)
print_df(df, 10)

In [None]:
# Stop the Spark session created at the top of the notebook.
spark.stop()