In [1]:
from pyspark.sql import *
from pyspark import SparkConf

from pyspark.sql import DataFrame
from pyspark.sql.functions import rand
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Tokenizer, CountVectorizer, StringIndexer
from pyspark.ml.classification import LogisticRegression, LinearSVC, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
class Selector(Transformer):
    """Pipeline stage that projects a DataFrame down to a fixed list of columns.

    Parameters
    ----------
    outputCols : list of str, optional
        Names of the columns to keep, in the order they should appear.
        When omitted, ``['id', 'features', 'label']`` is used.
    """

    def __init__(self, outputCols=None):
        # Run the base-class initialiser so the instance carries the state
        # pyspark sets up for every Transformer; the original skipped this call.
        super().__init__()
        # Use a None sentinel plus a per-instance copy: a list literal as the
        # default argument is created once and would be shared (and mutable)
        # across every Selector constructed without arguments.
        if outputCols is None:
            outputCols = ['id', 'features', 'label']
        self.outputCols = list(outputCols)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` restricted to the columns listed in ``self.outputCols``."""
        return df.select(*self.outputCols)

In [3]:
def base_features_gen_pipeline(input_descript_col="descript", input_category_col="category", output_feature_col="features", output_label_col="label"):
    """Assemble the (unfitted) base feature-generation pipeline.

    Stages, in order:
      1. ``Tokenizer``       -- ``input_descript_col`` -> ``"words"``
      2. ``CountVectorizer`` -- ``"words"`` -> ``output_feature_col`` (bag of word counts)
      3. ``StringIndexer``   -- ``input_category_col`` -> ``output_label_col``
      4. ``Selector``        -- keeps only ``'id'``, ``output_feature_col`` and
         ``output_label_col``

    Parameters
    ----------
    input_descript_col : str
        Column fed to the tokenizer.
    input_category_col : str
        Column fed to the label indexer.
    output_feature_col : str
        Column that receives the count vectors.
    output_label_col : str
        Column that receives the indexed label.

    Returns
    -------
    Pipeline
        The pipeline built from the four stages above.
    """
    # Intermediate column shared by the tokenizer (output) and the vectorizer (input).
    tokens_col = "words"

    stages = [
        # white-space tokenizer
        Tokenizer(inputCol=input_descript_col, outputCol=tokens_col),
        # bag-of-words counts
        CountVectorizer(inputCol=tokens_col, outputCol=output_feature_col),
        # label indexer
        StringIndexer(inputCol=input_category_col, outputCol=output_label_col),
        # drop everything except id / features / label
        Selector(outputCols=['id', output_feature_col, output_label_col]),
    ]

    return Pipeline(stages=stages)

In [4]:
def gen_meta_features(training_df, nb_0, nb_1, nb_2, svm_0, svm_1, svm_2):
    print(training_df.printSchema())
    lr_f1 = []
    nb_f1 = []
    svm_f1 = []
    print(training_df.groupBy('group').count().show())
    for i in range(5):
        condition = training_df['group'] == i