In [1]:
from search_run.ranking.ranking import Ranking
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType


# Obtain the SparkSession used by every cell below (getOrCreate returns the
# already-running session if there is one, otherwise it builds a new one).
spark = SparkSession.builder.getOrCreate()

## Entries

In [None]:
# Load the ranked entries; the code below relies on `.items()`, i.e. a
# mapping of key -> entry content whose iteration order is the ranking order.
entries = Ranking().load_entries()

# One record per entry. `position` is the 1-based rank of the entry in the
# mapping's iteration order; `content` is kept as its string form.
entries = [
    {"key": key, "content": f"{content}", "position": position}
    for position, (key, content) in enumerate(entries.items(), start=1)
]

# Build the DataFrame straight from (key, position) tuples with an explicit
# schema. Only these two columns are needed downstream, and going through
# typed tuples means the free-text content never has to be serialized and
# re-parsed, so no entry can be lost as an unparsable ("corrupt") record.
# str() keeps the join column a STRING even if a key is not already a str.
entries_df = spark.createDataFrame(
    [(str(entry["key"]), entry["position"]) for entry in entries],
    schema='`key` STRING, `position` LONG',
)
entries_df.show()

## Commands performed

In [None]:
# Load the history of performed commands and give it an explicit all-string schema.
dataset = Ranking().load_commands_performed_df()
schema = '`key` STRING,  `generated_date` STRING, `uuid` STRING, `given_input` STRING'
original_df = spark.createDataFrame(dataset, schema=schema)

# Derive the length of the typed input, discard rows whose input is the
# literal string "NaN", and keep only the columns used further down.
performed_df = (
    original_df
    .withColumn("input_lenght", F.length("given_input"))
    .filter('given_input != "NaN"')
    .drop('generated_date', 'uuid')
)

# Quick sanity numbers: all loaded rows vs. rows that carry a non-empty input.
total_entries = original_df.count()
entries_with_input = performed_df.filter("input_lenght > 0").count()
stats = {
    "total_entries": total_entries,
    "entries_with_input": entries_with_input,
}
print(stats)

performed_df.show()


## Final Dataset

In [None]:

# Attach each performed command to the position of its entry, then keep only
# the commands whose key was found among the entries (position is set).
df = (
    performed_df
    .join(entries_df, on='key', how='left')
    .filter('position is not null')
)

# Preview the joined data, shortest inputs first.
df.orderBy('input_lenght').show()

In [5]:
import numpy as np

# Collect feature and target in ONE action. Two separate collect() calls
# re-run the join, and Spark gives no guarantee that both runs return rows in
# the same order, so X[i] and Y[i] could otherwise belong to different rows.
training_rows = df.select('position', 'input_lenght').collect()

# Both arrays keep the (n_samples, 1) layout: one single-element row per sample.
X = np.array([[row['position']] for row in training_rows])      # feature: entry position
Y = np.array([[row['input_lenght']] for row in training_rows])  # target: length of the typed input

## Train model

In [6]:

from sklearn.ensemble import RandomForestRegressor

# Shallow random forest; random_state is fixed so re-runs give the same model.
regr = RandomForestRegressor(max_depth=2, random_state=0)

# scikit-learn expects a 1-D target of shape (n_samples,). Y is built as a
# column vector, so flatten it here instead of relying on the implicit
# conversion (which emits a DataConversionWarning).
model = regr.fit(X, Y.ravel())

  model = regr.fit(X, Y)


## Predict ranking

In [8]:
def evaluate_model(x):
    """Return the fitted model's prediction for a single value as a Python float.

    `x` is wrapped into a 1x1 array (one sample, one feature) before being
    passed to the module-level `model`; the first element of the returned
    predictions is converted with float().
    """
    single_sample = np.array([[x]])
    predictions = model.predict(single_sample)
    return float(predictions[0])

# Expose the Python scoring function to Spark as a UDF returning a float column.
model_udf = udf(evaluate_model, FloatType())

# Score every entry from its position; `key` is dropped so the result only
# holds the position and its predicted score.
position_col = F.col('position')
result_df = (
    entries_df
    .drop('key')
    .withColumn("predicted_score", model_udf(position_col))
)

# Show the scores starting from the highest position.
result_df.orderBy(position_col.desc()).show()

+--------+---------------+
|position|predicted_score|
+--------+---------------+
|    3859|       88.29857|
|    3858|       88.29857|
|    3857|       88.29857|
|    3856|       88.29857|
|    3855|       88.29857|
|    3854|       88.29857|
|    3853|       88.29857|
|    3852|       88.29857|
|    3851|       88.29857|
|    3850|       88.29857|
|    3849|       88.29857|
|    3848|       88.29857|
|    3847|       88.29857|
|    3846|       88.29857|
|    3845|       88.29857|
|    3844|       88.29857|
|    3843|       88.29857|
|    3842|       88.29857|
|    3841|       88.29857|
|    3840|       88.29857|
+--------+---------------+
only showing top 20 rows

