# Wordle Solver

In [1]:
from wordle.utils import Wordle

%load_ext autoreload
%autoreload 2

In [2]:
# Build the solver object that the remaining Wordle cells reuse
wordle = Wordle()

# Clues gathered from previous guesses, passed to the solver in this order:
# green letters + their positions, yellow letters + their positions, and
# the letters ruled out entirely.
# NOTE(review): positions look 1-based ("de" at [1, 2] yields 'decal') — confirm
# against Wordle.find_words.
green_letters, green_positions = "de", [1, 2]
yellow_letters, yellow_positions = "cael", [1, 3, 5, 3]
bad_letters = "rnft"

# Query the solver for the candidate words matching the clues and show them
words = wordle.find_words(
    green_letters, green_positions, yellow_letters, yellow_positions, bad_letters
)
print(words)

['decal']


In [3]:
# Repetitive letters: tabulate the letters found in the candidate words returned
# by the previous cell (the printed table has "Letters" and "Count" columns)
letters_df = wordle.repetitive_letters(wordle_list=words)
# Print without the index column to keep the table compact
print(letters_df.to_string(index=False))

Letters  Count
      D      1
      E      1
      C      1
      A      1
      L      1


#### Choosing next word

In [4]:
# Clues for picking the next guess: only a set of yellow letters is supplied,
# with no positions, no green letters and no excluded letters.
green_letters, green_positions = "", []
yellow_letters, yellow_positions = "ldtf", []
bad_letters = ""

# Search for words matching the clues.
# NOTE(review): answer_word_list=False presumably switches the search to a
# word list other than the answer list — confirm in Wordle.find_words.
words = wordle.find_words(
    green_letters,
    green_positions,
    yellow_letters,
    yellow_positions,
    bad_letters,
    answer_word_list=False,
)
print(words)

['delft']


In [5]:
# Load the stored scoreboard (the output shows Names / Games_Won columns)
df = wordle.load_data()
# Bare expression on the last line so the notebook renders the table
df

Unnamed: 0,Names,Games_Won
0,Murilo,255
1,Barbara,106
2,Draw,257


In [6]:
# Update score
# NOTE(review): compared with the load_data() output, one Games_Won value is
# higher after this call, so score() appears to modify the scoreboard —
# re-running the cell presumably counts another game; confirm before re-running.
score_df = wordle.score()
# Bare expression on the last line so the notebook renders the table
score_df

Unnamed: 0,Names,Games_Won
0,Murilo,255
1,Barbara,107
2,Draw,257


In [7]:
# Print results as plain text, dropping the index column
print(score_df.to_string(index=False))

  Names  Games_Won
 Murilo        255
Barbara        107
   Draw        257


In [8]:
# help(wordle.reset_score)

#### Set custom score

In [9]:
# help(wordle.set_score)

In [10]:
# # Set custom score
# df = wordle.set_score(m_score=253, b_score=105, draw_score=252)
# df

#### Reset score

In [11]:
## Reset score
# wordle.reset_score()

In [None]:
from plantclef.utils import get_spark
from pyspark.sql import functions as F

# Obtain the Spark session from the project helper; later cells use it to read
# the parquet dataset
spark = get_spark()
# Render the session object in the notebook output
display(spark)

In [None]:
# Bucket and relative location of the PlantCLEF 2024 test parquet files
gcs_path = "gs://dsgt-clef-plantclef-2024"
test_data_path = "data/parquet_files/PlantCLEF2024_test"

# Full path to the test dataset
test_path = "/".join((gcs_path, test_data_path))

# Read the parquet dataset and preview five rows, clipping long values at 50 chars
test_df = spark.read.parquet(test_path)
test_df.show(n=5, truncate=50)

In [None]:
import io

import numpy as np
import timm
import torch
from PIL import Image
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType, MapType, StringType
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from plantclef.model_setup import setup_pretrained_model
from pyspark.sql import DataFrame
from pyspark.ml.functions import vector_to_array
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

In [None]:
class PretrainedDinoV2(
    Transformer,
    HasInputCol,
    HasOutputCol,
    DefaultParamsReadable,
    DefaultParamsWritable,
):
    """Spark ML transformer that classifies images with a timm DINOv2 checkpoint.

    For every row, the encoded image bytes in the input column are decoded,
    run through the model, and the ``top_k`` class probabilities (in percent)
    are written to the output column. The output column has type
    ``array<map<string, float>>`` and holds a single map of
    ``{class name: probability}`` per row; the map is empty when the image
    could not be processed.
    """

    def __init__(
        self,
        pretrained_path: str,
        input_col: str = "input",
        output_col: str = "output",
        model_name: str = "vit_base_patch14_reg4_dinov2.lvd142m",
        batch_size: int = 8,
        top_k: int = 20,
    ):
        """Build the model and its preprocessing pipeline.

        Args:
            pretrained_path: Checkpoint file handed to ``timm.create_model``.
            input_col: Column containing the encoded image bytes.
            output_col: Column that receives the predictions.
            model_name: timm architecture name.
            batch_size: Stored on the instance; not used by this class itself.
            top_k: Number of highest-probability classes kept per image.
        """
        super().__init__()
        self._setDefault(inputCol=input_col, outputCol=output_col)
        self.model_name = model_name
        self.batch_size = batch_size
        self.top_k = top_k
        self.pretrained_path = pretrained_path
        self.num_classes = 7806  # total number of plant species
        self.local_directory = "/mnt/data/models/pretrained_models"
        self.class_mapping_file = f"{self.local_directory}/class_mapping.txt"
        # Model: weights come from the checkpoint, not from timm's hub
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = timm.create_model(
            self.model_name,
            pretrained=False,
            num_classes=self.num_classes,
            checkpoint_path=self.pretrained_path,
        )
        self.model.to(self.device)
        self.model.eval()
        # Data transform matching the model's expected input
        self.data_config = timm.data.resolve_model_data_config(self.model)
        self.transforms = timm.data.create_transform(
            **self.data_config, is_training=False
        )
        # Select the configured output column rather than a hard-coded name so
        # the pipeline also works when output_col is not "dino_logits"
        self.sql_statement = f"SELECT image_name, {output_col} FROM __THIS__"

    def _load_class_mapping(self):
        """Return ``{line index: stripped line}`` for the class mapping file."""
        with open(self.class_mapping_file, encoding="utf-8") as f:
            class_index_to_class_name = {i: line.strip() for i, line in enumerate(f)}
        return class_index_to_class_name

    def _make_predict_fn(self):
        """Return a batch prediction function closing over the model.

        The returned callable takes an iterable of encoded images (bytes-like)
        and returns a list with one ``{class name: probability}`` dict per
        input, in input order. Inputs that are not bytes-like, or that fail
        during decoding or inference, produce an empty dict.
        """
        self.cid_to_spid = self._load_class_mapping()
        # torch.topk raises when k exceeds the number of classes
        k = min(self.top_k, self.num_classes)

        def predict(inputs) -> list:
            batch_results = []
            for item in inputs:
                # Spark hands BinaryType values to Python UDFs as bytearray,
                # so accept every bytes-like type, not only `bytes`
                if not isinstance(item, (bytes, bytearray, memoryview)):
                    print("Error: Input is not bytes.")
                    batch_results.append({})
                    continue

                try:
                    # Force three channels so grayscale/RGBA images do not
                    # break the normalisation step of the transform
                    image = Image.open(io.BytesIO(item)).convert("RGB")
                    batch_input = self.transforms(image).unsqueeze(0).to(self.device)

                    with torch.no_grad():
                        outputs = self.model(batch_input)
                        probabilities = torch.softmax(outputs, dim=1) * 100
                        top_probs, top_indices = torch.topk(probabilities, k=k)

                    top_probs = top_probs.cpu().numpy().flatten()
                    top_indices = top_indices.cpu().numpy().flatten()

                    # Convert top indices and probabilities to a dictionary
                    result = {
                        self.cid_to_spid.get(int(index), "Unknown"): float(prob)
                        for index, prob in zip(top_indices, top_probs)
                    }
                    batch_results.append(result)

                except Exception as e:
                    # Best effort: one bad image must not fail the whole job
                    print(f"Failed to process input due to: {str(e)}")
                    batch_results.append({})

            # Plain list: it is what Spark expects for an ArrayType result
            return batch_results

        return predict

    def _transform(self, df):
        """Add the output column holding the predictions for the input column."""
        print(f"df schema: {df.schema}")
        predict_fn = self._make_predict_fn()
        # F.udf calls its function once per row with a single value, so wrap
        # that value in a one-element batch; the one-element list returned
        # matches the declared array<map<string, float>> type
        predict_udf = F.udf(
            lambda data: predict_fn([data]),
            ArrayType(MapType(StringType(), FloatType())),
        )
        return df.withColumn(self.getOutputCol(), predict_udf(df[self.getInputCol()]))

    def transform(self, df) -> DataFrame:
        """Run ``_transform`` and make sure feature columns are array-typed."""
        transformed = self._transform(df)

        for c in self.feature_columns:
            # check if the feature is a vector and convert it to an array
            if "array" in transformed.schema[c].simpleString():
                continue
            transformed = transformed.withColumn(c, vector_to_array(F.col(c)))
        return transformed

    @property
    def feature_columns(self) -> list:
        """Columns produced by this transformer (the configured output column)."""
        return [self.getOutputCol()]

    def pipeline(self):
        """Return a Pipeline of this transformer followed by the SQL projection."""
        return Pipeline(stages=[self, SQLTransformer(statement=self.sql_statement)])

    def run(self, df: DataFrame) -> DataFrame:
        """Fit the pipeline on ``df`` and return the transformed DataFrame."""
        model = self.pipeline().fit(df)
        transformed = model.transform(df)

        return transformed

In [None]:
# Project helper returning the checkpoint path that PretrainedDinoV2 passes to
# timm.create_model as checkpoint_path
pretrained_path = setup_pretrained_model()
# Read encoded image bytes from the "data" column and write predictions to
# "dino_logits" (the column name the class's SQL statement selects)
pretrained_dino = PretrainedDinoV2(
    pretrained_path=pretrained_path,
    input_col="data",
    output_col="dino_logits",
)

In [None]:
# Build the pipeline on the test DataFrame; the SQL stage keeps image_name and
# dino_logits (Spark evaluates the UDF lazily, when an action is triggered)
transformed_df = pretrained_dino.run(df=test_df)