In [1]:
from utils.imports import *
from utils.start_spark import spark
! start http://localhost:4040/jobs/

Uma forma de transformar a target é, em vês da planeada "dataframe de sessões", teriamos uma "dataframe de perguntas", onde em vês de estar uma dataframe com cada linah uma sessão, estar 18 das mesmas linhas, com a diferença nas questões. Ou seja, a nossa chave primária seria o conjunto da das colunas `session_id` e `question` (categórica ou inteira), em vês de apenas ter como chave primária `session_id`.

| session_id | q1 | q2 | ... | q18 | feature1 | feature2 | ... |
|------------|----|----|-----|-----|----------|----------|-----|
| 1          | 0  | 1  | ... | 1   | 423      | 0        | ... |
| 2          | 1  | 0  | ... | 1   | 231      | 1        | ... |
| 3          | 1  | 1  | ... | 1   | 345      | 1        | ... |

para

| session_id | question | answer | feature1 | feature2 | ... |
|------------|----------|--------|----------|----------|-----|
| 1          | 1        | 0      | 423      | 0        | ... |
| 1          | 2        | 1      | 423      | 0        | ... |
| ...        | ...      | ...    | ...      | ...      | ... |
| 1          | 18       | 1      | 423      | 0        | ... |
| 2          | 1        | 1      | 231      | 1        |     |
| ...        | ...      | ...    | ...      | ...      | ... |

Esta opção iria facilitar o processo de previsão, pois podemos usar métodos simples como regressão logística, usando `question` como uma feature também.

Uma segunda opção seria usar classificação *multi-label*, sendo que temos vários `targets`, mas estes só têm uma carnidalidade de 2. Isto iria aumentar a complexidade do projeto. Estamos também limitados pela utilização de MLLIB do spark.

A terceira opção seria treinar 18 modelos, e criar um algoritmo para escolher um modelos na precisão. Essencialmente, este é o método de *binary relevance*, sendo um dos métodos da segunda opção; no entanto, estamos a notar como opção porque não parece que o MLLIB tenha qualquer opção de *binary relevance*.

Nota: da forma como os dados estão agora formatados, essencialmente é o mesmo problema se tivéssemos em vês de uma matriz de 0s e 1s, uma coluna com um array das classes onde a sessão teve certo. 

Decidimos usar *binary relevance*, ou seja, vamos treinar um modelo para cada pergunta.

Para análise das métricas, vamos usar `MultilabelClassificationEvaluator` do MLlib.

In [2]:
doing_features = False
training = True

In [4]:
if 'stages' in sys.modules: del sys.modules['stages']
from stages import *

pipeline_no_ML = Pipeline(stages = [
    the_transformer(
        add_id,
        (elapsed_to_diff, "elapsed_time", "elapsed_diff_ms"),
        (negative_to_0, "elapsed_diff_ms"),
        (elapsed_to_hours, "elapsed_time", "elapsed_time_h"),
        agg1,
        agg2,
        typeOfText,
    ),
    VectorAssembler(inputCols = ["inv_total_time_h", "inv_total_time_h_0-4", "inv_total_time_h_5-12"], outputCol = "inv_time_stand_per_group"),
    StandardScaler(inputCol = "inv_time_stand_per_group", outputCol = "inv_time_standed_per_group", withMean = True),
    StringIndexer(inputCol = "type_of_script", outputCol = "index_of_type_of_script"),
    OneHotEncoder(inputCol = "index_of_type_of_script", outputCol="dummies_of_type_of_script"), #sparse vector
])

In [5]:
if doing_features:
    train = spark.read.parquet(".\\data\\df_train\\")
    test = spark.read.parquet(".\\data\\df_test\\")

    transformer = pipeline_no_ML.fit(train)
    trans_train = transformer.transform(train)
    # pivoted {
    from files.dfs import train_labels as labels
    if labels._isRead == False: labels.read(spark)
    splited = labels.df \
        .select(
            split("session_id", "_").alias("both"),
            "correct"
        ).select(
            col("both")[0].alias("session_id"),
            col("both")[1].alias("question"),
            col("correct").alias("isCorrect")
        )
    pivoted = splited \
        .groupby("session_id") \
        .pivot("question") \
        .agg(first("isCorrect"))
    # }
    # add pivoted
    trans_joined = trans_train.join(pivoted, ["session_id"], "left")
    trans_joined.drop(trans_joined.session_id)
    trans_joined.write.mode("overwrite").parquet(r"data\trans_train")

    trans_test = transformer.transform(test)
    trans_test.drop(trans_test.session_id)
    trans_test.write.mode("overwrite").parquet(r"data\trans_test")

    trans_train = trans_joined

In [6]:
if not doing_features:
    trans_train = spark.read.parquet(r"data\trans_train")
    trans_test = spark.read.parquet(r"data\trans_train")

In [7]:
features_ass = VectorAssembler(inputCols = [
 'max_index',
 'obs_opcional',
 'obs_no_in',
 'notebook_opens',
 'notebook_explorer',
 'fullscreen',
 'hq',
 'music',
 'avg_elapsed_diff_ms_cutscene',
 'avg_elapsed_diff_ms_person',
 'avg_elapsed_diff_ms_navigate',
 'inv_time_standed_per_group',
 'dummies_of_type_of_script'
    ], outputCol="features")

In [28]:
def BinRel(model, features, needsProb = False,  **kwargs) -> Pipeline:
    stages = [features]
    for i in range(1, 19):
        (modelParams := {
            "labelCol": f"q{i}",
            "predictionCol":f"q{i}_pred",
            "rawPredictionCol": f"q{i}_pred_raw" if needsProb else None,
            "probabilityCol": f"q{i}_pred_prob" if needsProb else None
        }).update(kwargs)
        model(**{a: b for a, b in modelParams.items() if b is not None})
        stages.append(model)
    return Pipeline(stages = stages)

def train_predict(pipeline, train, test, write = True, parquetFile = None):
    if parquetFile is None and write is True:
        raise ValueError("parquet name not provided")
    fit = pipeline.fit(train) # train must be transformed already
    prev = fit.transform(test)
    if write: prev.write.mode("overwrite").parquet(f".\\data\\{parquetFile}")
    return (fit, prev)

In [30]:
Lr_Pipeline = BinRel(LogisticRegression, features_ass, 
                     needsProb = True,
                     standardization = False
)
Svm_Pilenine = BinRel(LinearSVC, features_ass, standardization = False)
DT_Pipeline = BinRel(DecisionTreeClassifier, features_ass, seed = 1)
RF_Pipeline = BinRel(RandomForestClassifier, features_ass, seed = 1)

In [None]:
# TODO: train & evaluation