# bootstrap_puzzles_03_word_probabilities

- load silver.word_states to df
- drop batch_id col (not relevant for this stage)
- split df into:
    - df_truth (df.filter(F.col("label").isNotNull()))
    - df_model (df.filter(F.col("label").isNull()))
- for df_truth:
	- rename col: label -> probability
	- set source col: F.lit("truth")
	- set model_version col: F.lit(None).cast("int")
	- drop embedding and frequency cols
	- keep cols: word, letter_set, last_seen_on (alongside the new probability, source, and model_version cols)
- for df_model:
	- pass to inference() function to get probability col: df_model = inference(df_model)
	- set source col: F.lit("model")
	- set model_version col: F.when(F.lit(True), 1) in order to make the column nullable (a plain F.lit(1) is non-nullable)
	- AFTER this, drop embedding, frequency, and label cols
	- keep cols: word, letter_set, last_seen_on (alongside the new probability, source, and model_version cols)
- final_df = df_truth.union(df_model)
- add a batch_id: "bootstrap_puzzles_1"
- Open question: save as a repartitioned table, or at least partition by `source`? Partitioning by `source` gives only 2 partitions of very unequal size.
- Without embeddings, this should not be too memory-intensive to save (115k simple rows).

In [None]:
# Shared notebook setup. Presumably this defines the `spark` session used below,
# since this notebook never creates one itself; confirm in 00_setup.ipynb.
%run "./00_setup.ipynb"

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import FloatType, IntegerType
import pandas as pd
import numpy as np
import joblib

from src.constants import TRAINED_MODELS_PATH
from src.fileutils import get_local_path
from src.sparkdbutils import create_db, write_to_table_replace_where

In [None]:
# Source (silver) and target (gold) table coordinates for this stage.
# TODO: Parameterize
_SOURCE_DB_NAME, _SOURCE_TABLE_NAME = "silver", "word_states"
_TARGET_DB_NAME, _TARGET_TABLE_NAME = "gold", "word_probabilities"

In [None]:
# Read the full source table (all rows, all columns) from the silver layer
_source_table = f"{_SOURCE_DB_NAME}.{_SOURCE_TABLE_NAME}"
df = spark.sql(f"SELECT * FROM {_source_table}")

In [None]:
# Drop the source batch_id column. It identifies the silver batch, which is not
# relevant to this job; a fresh batch_id is stamped onto final_df further down.
df = df.drop("batch_id")

In [None]:
# Partition the words by whether silver.word_states carries a label for them.
#
# df_truth: words that have been explicitly accepted or implicitly rejected.
#   Their probability is 0.0 or 1.0, derived from their `label`.
#
# df_model: words that exist in bronze.words but have never come up in a
#   puzzle yet (no label). The model assigns their probability.
has_label = F.col("label").isNotNull()
df_truth = df.where(has_label)
df_model = df.where(~has_label)

In [None]:
# Turn labeled words into ground-truth rows; their label *is* the probability.
#
# - Cast to float so the column matches the FloatType probabilities produced
#   by the model UDF. This assumes `label` is numeric or boolean (0/1);
#   TODO confirm against the silver.word_states schema.
# - Rebuild `probability` with withColumn and then drop `label`, instead of
#   renaming in place. That places `probability` after the remaining original
#   columns, followed by source and model_version, which is the same order
#   df_model ends up with. A positional union then lines up too.
df_truth = (
    df_truth
    .withColumn("probability", F.col("label").cast("float"))
    .drop("label")
    .withColumn("source", F.lit("truth"))
    .withColumn("model_version", F.lit(None).cast("int"))
    .drop("embedding", "frequency")
)

In [None]:
# Load the pretrained bundle (an SVD and a classifier) and broadcast each
# piece so the executors running the UDF below can read them.
# NOTE: joblib files are pickles; only load model files from a trusted source.
model_path = get_local_path(f"{TRAINED_MODELS_PATH}/model_v1.joblib")

with open(model_path, "rb") as model_file:
    model = joblib.load(model_file)

svd, clf = model["svd"], model["clf"]

_spark_context = spark.sparkContext
broadcast_svd = _spark_context.broadcast(svd)
broadcast_clf = _spark_context.broadcast(clf)

In [None]:
def setup_worker_imports():
    """
    Make the project root importable in the current process.

    Inserts the parent of the current working directory at the front of
    ``sys.path``, at most once, so that ``src.*`` modules can be imported
    inside UDF worker processes.

    NOTE(review): this assumes the worker's working directory is one level
    below the project root, as the notebook directory is on the driver.
    Confirm this for the cluster this runs on.
    """
    import sys
    from pathlib import Path

    # Path.cwd() replaces os.getcwd(): the visible cells never import `os`,
    # so the old call only worked if the setup notebook happened to provide it.
    project_root = str(Path.cwd().parent)
    if project_root not in sys.path:
        sys.path.insert(0, project_root)

@pandas_udf(returnType=FloatType())
def predict_probability(frequencies: pd.Series, embeddings: pd.Series) -> pd.Series:
    """
    Predict, per row, the probability of a positive classification.

    Each row's feature vector is log10(frequency + 1) followed by the
    SVD-reduced embedding. The vector is scored with the broadcast
    pretrained classifier.

    Parameters
    ----------
    frequencies : pd.Series
        A batch of values from the `frequency` column.
    embeddings : pd.Series
        A batch of values from the `embedding` column. Each element is
        array-like, and all elements have the same length.

    Returns
    -------
    pd.Series
        float32 probabilities of the positive class (column 1 of
        ``predict_proba``), one per input row and in the same order.
        The caller decides the column name via ``withColumn``; this
        function does not modify any DataFrame.
    """
    # An empty batch has nothing to score. Return early because
    # np.array([]) is 1-D and svd.transform would reject it.
    if embeddings.empty:
        return pd.Series([], dtype=np.float32)

    # UDF functions run in separate processes with their own namespaces,
    # so the classes of the SVD and classifier must be importable there
    # before the broadcast objects are unpickled.
    setup_worker_imports()

    from src.models.HybridFrequencyBinaryClassifier import HybridFrequencyBinaryClassifier  # noqa: F401
    from sklearn.decomposition import TruncatedSVD  # noqa: F401

    # Get the model pieces from the broadcast variables
    svd = broadcast_svd.value
    clf = broadcast_clf.value

    # Stack the per-row embeddings into one (rows, dims) matrix
    embedding_matrix = np.array(embeddings.tolist())

    # Reduce embedding dimensionality with the pretrained SVD
    # (768 -> 50 per the original notes; depends on how the SVD was trained)
    reduced_embeddings = svd.transform(embedding_matrix)

    # log10(frequency + 1) as a single feature column
    freq_array = np.log10(frequencies.to_numpy() + 1).reshape(-1, 1)

    # Frequency first, then the reduced embedding components
    features = np.concatenate([freq_array, reduced_embeddings], axis=1)

    # Probability of the positive class
    probabilities = clf.predict_proba(features)[:, 1]

    # Cast explicitly to the declared FloatType (float32) instead of
    # relying on Arrow to downcast float64 during conversion.
    return pd.Series(probabilities.astype(np.float32))

In [None]:
# Score every unlabeled word with the pretrained model via the pandas UDF
probability_col = predict_probability(F.col("frequency"), F.col("embedding"))
df_model = df_model.withColumn("probability", probability_col)

In [None]:
# Tag the model-scored rows, then drop the feature and label columns that
# do not belong in the output. F.when(F.lit(True), 1) evaluates to 1 but,
# unlike F.lit(1), yields a nullable column, like df_truth's null model_version.
df_model = (
    df_model
    .withColumn("source", F.lit("model"))
    .withColumn("model_version", F.when(F.lit(True), 1))
    .drop("embedding", "frequency", "label")
)

In [None]:
# Recombine the two data frames, matching columns by name rather than by
# position. df_truth and df_model are built by different sequences of
# rename/withColumn/drop calls, so their column order is not guaranteed to
# match, and a positional union() could silently pair the wrong columns.
# unionByName also fails loudly if the two column sets ever diverge.
final_df = df_truth.unionByName(df_model)

In [None]:
# Stamp every output row with this job's batch identifier
BATCH_ID = "bootstrap_puzzles_1"
batch_id_col = F.lit(BATCH_ID)
final_df = final_df.withColumn("batch_id", batch_id_col)

In [None]:
# Create the target (gold) database if it doesn't exist yet.
# (Behavior when it already exists is defined by src.sparkdbutils.create_db.)
create_db(spark, _TARGET_DB_NAME)

In [None]:
# Write to the table (this has the effect of creating it).
# The replace-where predicate presumably limits the overwrite to rows of this
# batch (see src.sparkdbutils.write_to_table_replace_where). Reusing BATCH_ID,
# instead of repeating the literal, keeps the predicate in sync with the
# batch_id value stamped onto final_df.
replace_where_dict = {"batch_id": BATCH_ID}
write_to_table_replace_where(
    spark,
    final_df,
    _TARGET_DB_NAME,
    _TARGET_TABLE_NAME,
    replace_where_dict,
)

In [None]:
# Read the freshly written table back as a sanity check:
# total row count, then the schema, then a sample of rows.
df2 = spark.sql(f"SELECT * FROM {_TARGET_DB_NAME}.{_TARGET_TABLE_NAME}")
row_count = df2.count()
print(row_count)
df2.printSchema()
df2.show()