In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
import sys
# Add the attached dataset folder to the import path so modules stored there
# can be imported later in the notebook.
sys.path.append( "/kaggle/input/agency-lab-functions" )

# Input location of the competition essays and output locations used below.
path_essays = '/kaggle/input/learning-agency-lab-automated-essay-scoring-2/'
path_models = '/kaggle/working/models/'
path_files  = '/kaggle/working/files/'

# Create each working folder independently (relative to the current working
# directory). `exist_ok=True` makes the cell safe to re-run and guarantees
# that an already existing 'models' folder does not prevent 'files' and
# 'checkpoints' from being created.
for folder in ('models', 'files', 'checkpoints'):
    os.makedirs(folder, exist_ok=True)

# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))
#!ls /kaggle/input
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
%%time
#Installing and choosing java 8. (also worked with java 11)
try:
    os.mkdir('check_java')
    !cp -r /kaggle/input/java-8/jdk1.8.0_401 /kaggle/working
    !chmod -R 777 /kaggle/working/jdk1.8.0_401
    !update-alternatives --install /usr/bin/java java /kaggle/working/jdk1.8.0_401/bin/java 100
    !update-alternatives --install /usr/bin/javac javac /kaggle/working/jdk1.8.0_401/bin/javac 100
    !sudo update-alternatives --set java /kaggle/working/jdk1.8.0_401/bin/java
    os.mkdir('check_java')
except:
    print(f'Java already installed.')

In [None]:
%%time
try:
    os.mkdir('check_pyspark')
    !cp -r /kaggle/input/pyspark/pyspark-3.5.1 /kaggle/working
    !cp /kaggle/input/spark-nlp/spark_nlp-5.3.3-py2.py3-none-any.whl /kaggle/working
    !cp /kaggle/input/tumb-py/py4j-0.10.9.7-py2.py3-none-any.whl /kaggle/working

    !pip install --no-index py4j-0.10.9.7-py2.py3-none-any.whl pyspark-3.5.1/. --target=/kaggle/working --find-links=file:///kaggle/working
    !pip install spark_nlp-5.3.3-py2.py3-none-any.whl --target=/kaggle/working

    !cp /kaggle/input/spark-nlp-assembly/spark-nlp-assembly-5.3.3.jar /kaggle/working/pyspark/jars

    !chmod -R 777 /kaggle/working/pyspark/./bin/
    os.mkdir('check_pyspark')
except:
    print('pyspark already installed')

In [None]:
import time
import shutil
import pyspark
import subprocess
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import sparknlp
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import Tokenizer as Tknzr, Bucketizer, StopWordsRemover
from pyspark.ml.linalg import VectorUDT, Vectors
from pyspark.ml.functions import vector_to_array, array_to_vector
from pyspark.ml.feature import VectorAssembler, MaxAbsScaler, MinMaxScaler
from pyspark.ml.regression import LinearRegression, LinearRegressionModel, RandomForestRegressor,\
                                    RandomForestRegressionModel, GBTRegressor, GBTRegressionModel
from pyspark.ml.classification import LogisticRegression, OneVsRest, OneVsRestModel,\
                                        LogisticRegressionModel,\
                                        RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
import lalab_functions as lab
import similarity_measures_2 as sims 
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))
from xgboost.spark import SparkXGBClassifier, SparkXGBClassifierModel
from functools import reduce

from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator

import seaborn as sns
import matplotlib.pyplot as plt
from collections import Counter

In [None]:
%%time
conf = SparkConf() \
    .setAppName("YourAppName") \
    .setMaster("local[*]") \
    .set("spark.driver.memory", "10g") \
    .set("spark.executor.memory", "15g") \
    .set("spark.executor.cores", "8") \
    .set("spark.executor.instances", "1") \
    .set("spark.default.parallelism", "8") \
    .set("spark.sql.shuffle.partitions", "4") \
    .set("spark.memory.fraction", "0.8") \
    .set("spark.memory.storageFraction", "0.40")\
    .set("spark.jars", 
         "/kaggle/working/pyspark/jars/spark-nlp-assembly-5.3.3.jar"
        )\
    .set("spark.executorEnv.JAVA_HOME", "/kaggle/working/jdk1.8.0_401")\
    .set("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

#     .set("spark.emptyArrowBatchCheck", True)

#Enabling spark.databricks.pyspark.emptyArrowBatchCheck prevents 
#a NoSuchElementException error from occurring when the Arrow batch size is 0.
#"spark.sql.execution.arrow.pyspark.fallback.enabled", "true"
   
spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()

sc = spark.sparkContext
# Get the number of cores
num_cores = sc.defaultParallelism
# Get the number of executors
num_executors = sc._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("Number of cores:", num_cores)
print("Number of executors:", num_executors)
# Get the SparkConf object
conf = spark.sparkContext.getConf()
# Get the memory configurations for driver and executors
driver_memory = conf.get("spark.driver.memory", "Not set")
executor_memory = conf.get("spark.executor.memory", "Not set")
print("Driver memory:", driver_memory)
print("Executor memory:", executor_memory)
# Set the checkpoint directory
spark.sparkContext.setCheckpointDir('checkpoints')

In [None]:
# Spark NLP pipeline applied to the "full_text" column of the essays:
# document -> sentences, document embeddings, tokens -> normalized tokens
# -> tokens without stop words -> lemmas -> part-of-speech tags.

# Wrap the raw text into Spark NLP's "document" annotation.
document = (
    DocumentAssembler()
    .setInputCol("full_text")
    .setOutputCol("document")
)

# Sentence annotations; not consumed by the later stages defined here, but
# kept as an output column of the fitted pipeline.
sentenceDetector = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

# Universal Sentence Encoder loaded from a local dataset; it reads the whole
# "document" annotation and writes to "sentence_embeddings".
use = (
    UniversalSentenceEncoder.load('/kaggle/input/sentence-encoder')
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
    .setSplitPattern("\\b\\w+\\b")  # author's note: split by word boundaries
)

# Lowercase tokens and strip characters that are neither word characters nor
# whitespace (author's note: numbers are retained as tokens).
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normalized_tokens")
    .setLowercase(True)
    .setCleanupPatterns(["[^\\w\\s]"])
)

stopWords = (
    StopWordsCleaner()
    .setInputCols(["normalized_tokens"])
    .setOutputCol("cleanTokens")
    .setCaseSensitive(False)
)

lemmatizer = (
    LemmatizerModel.load('/kaggle/input/spacylemmatizer')
    .setInputCols(["cleanTokens"])
    .setOutputCol("lemmas")
)

postagger = (
    PerceptronModel.load("/kaggle/input/pos-tagger")
    .setInputCols("document", "lemmas")
    .setOutputCol("pos")
)

use_pos_pipeline = Pipeline(stages=[
    document,
    sentenceDetector,
    use,
    tokenizer,
    normalizer,
    stopWords,
    lemmatizer,
    postagger,
])

# Fit on a one-row frame with an empty text: the fit is only needed to obtain
# a PipelineModel that can be used with .transform().
empty_data = spark.createDataFrame([[""]]).toDF("full_text")
usePosModel = use_pos_pipeline.fit(empty_data)

In [None]:
#loading models to process word embeddings
def _load_model_from_matching_folder(base_path, folder_names, folder_label,
                                     model_word, model_cls):
    """Load the first model found in the sub-folder whose name contains a label.

    Args:
        base_path: Directory (with trailing '/') that contains the model folders.
        folder_names: Folder names inside `base_path`, as returned by
            `lab.list_folders_in_folder(base_path)`.
        folder_label: Substring identifying the wanted folder; the first
            folder name containing it is used.
        model_word: Passed through to `lab.load_models_in_folder`
            (presumably a model-name filter -- confirm in lalab_functions).
        model_cls: Model class passed through to `lab.load_models_in_folder`.

    Returns:
        The first element of what `lab.load_models_in_folder` returns.

    Raises:
        FileNotFoundError: If no folder name contains `folder_label`
            (previously this surfaced as a bare IndexError).
    """
    matching = [name for name in folder_names if folder_label in name]
    if not matching:
        raise FileNotFoundError(
            f'No folder containing {folder_label!r} found in {base_path!r}. '
            f'Available folders: {folder_names}'
        )
    model_dir = base_path + matching[0] + '/'
    return lab.load_models_in_folder(model_dir, model_cls, model_word)[0]


model_class = SparkXGBClassifierModel

# --- One model per score, stored under 'xgb_target_<score>' -----------------
scores = [1,2,3,4,5]
binary_path = '/kaggle/input/use-xgb-5binaries-imp-feats-600-estimators/'\
        +'use_xgb_super_binaries_5score_improved_features_600_estimators/'
binary_folders = lab.list_folders_in_folder(binary_path)
binary_models = {}
for score in scores:
    label = 'xgb_target_'+str(score)
    binary_models[score] = _load_model_from_matching_folder(
        binary_path, binary_folders, label, label, model_class)

# --- One "separator" model per pair of scores (low, high) -------------------
separators_keys = [
    (1,2), (1,3), (1,4), (1,5),
    (2,3), (2,4), (2,5),
    (3,4), (3,5),
    (4,5)
]

sep_path = '/kaggle/input/use-xgb-psep-5scores-'\
            +'impfeat-300-estimators/use_xgb_pure_sep_5scores_improved_features_300_estimators/'
# Alternative separator model sets tried by the author (not used):
# sep_path = '/kaggle/input/use-xgb-psep-imp-feats-1000-est-30-depth/'\
#                 +'use_xgb_pure_sep_5scores_improved_features_1000_estimators_30_depth/'
# sep_path = '/kaggle/input/use-xgb-psep-optimized-imp-feats/use_xgb_psep_optimized_by_accuracy/'
sep_folders = lab.list_folders_in_folder(sep_path)
print(sep_folders)
sep_models = {}
for key in separators_keys:
    pair = '_'.join(str(p) for p in key)          # e.g. '1_2'
    sep_models[key] = _load_model_from_matching_folder(
        sep_path, sep_folders, 'psep_for_'+pair, 'psep_'+pair, model_class)
    print(f'key: {key} model: {sep_models[key]}')

# --- One "qtern" model per pair of scores ------------------------------------
qtern_path = '/kaggle/input/use-xgb-qterns-1000-estimators-30-depth/'\
        +'use_xgb_super_qtsep_5scores_improved_feats_1000_estimators+30_depth/'
qtern_folders = lab.list_folders_in_folder(qtern_path)
print(qtern_folders)
qtern_models = {}
for key in separators_keys:
    pair = ''.join(str(p) for p in key)           # e.g. '12'
    qtern_models[key] = _load_model_from_matching_folder(
        qtern_path, qtern_folders, 'qsep_for_'+pair, 'qtsep_0-'+pair, model_class)
    print(f'key: {key} model: {qtern_models[key]}')

# Stand-alone models loaded directly from their folders. `sep_56` is loaded
# here but only referenced by disabled (commented-out) code in the evaluation
# cell.
sep_56 = model_class.load('/kaggle/input/use-xgb-psep-56-'\
                +'imp-feats-acc075/use_xgb_sep_56_imp_feats_acc075')
use_xgb_general = model_class.load('/kaggle/input/use-xgb-general-'\
                                   +'from-combined-models/xgb_general_from_combined_models')

print('models loaded')

In [None]:
%%time
print('Evaluating the essays')
t0 = time.time()
essays = spark.createDataFrame(pd.read_csv(path_essays+'test.csv'))\
            .withColumn('batches', (10*F.rand()).cast(IntegerType()))\
            .cache()
# essays = df_test\
#                 .withColumn('score_5', F.when(F.col('score')==6,5)\
#                           .otherwise(F.col('score')))\
#                 .withColumnRenamed('score','original_score')\
#             .withColumn('batches', (10*F.rand()).cast(IntegerType()))

batches = essays.select('batches').distinct().toPandas()['batches'].tolist()
print(f'batches: {len(batches)}')
counter = 0

for batch in batches:
    
    tm = time.time()
    
    counter += 1
    
    df2evaluate = essays.filter(F.col('batches')==batch)
    
    df2evaluate = usePosModel.transform(df2evaluate)\
                   .withColumn('p_features', array_to_vector(F.col('sentence_embeddings').embeddings[0]))

    df2evaluate = lab.sentences_and_embeddings(df2evaluate)
    df2evaluate = lab.features_for_modeling(df2evaluate)
    columns_for_assembling = [ 'p_features', 'words_fraction',  
                'nouns_fraction', 'verbs_fraction', 'others_fraction', 
               'uniqwd_fraction', 'toksxsent_fraction','raw_length_fraction']
    df2evaluate = lab.vector_assembler(df2evaluate, columns_for_assembling, 'features')
    
    cols2keep = ['essay_id','features']
    for col in df2evaluate.columns:
        if col not in cols2keep:
            df2evaluate = df2evaluate.drop(col)
    print(f'   Deleting unnecessary columns. Entries: {df2evaluate.count()}')
    bincols = []
    for score in scores:
        model = binary_models[score]
        colname = 'pred_'+str(score)
        df2evaluate = model.transform(df2evaluate)\
                .withColumn(colname,F.col(model.getPredictionCol()).cast(DoubleType()))
        bincols.append(colname)
        cols2keep.append(colname)
        df2evaluate = df2evaluate.select(cols2keep)
    sepcols = []
    for key in separators_keys:
        model = sep_models[key]
        colname = 'pred_'+'_'.join(str(p) for p in key)
        df2evaluate = model.transform(df2evaluate)\
                .withColumn(colname,F.col(model.getPredictionCol()).cast(DoubleType()))\
                .withColumn(colname,F.when(F.col(colname)==1,key[1]).otherwise(key[0]).cast(DoubleType()))
        sepcols.append(colname)
        cols2keep.append(colname)
        df2evaluate = df2evaluate.select(cols2keep)
    
    df2evaluate = df2evaluate.checkpoint()
    
#     df2evaluate = df2evaluate.withColumn('weigthed_prediction', 
#                             lab.weigthed_predictions()(F.array(*sepcols))[0].cast(DoubleType()))
#     cols2keep.append('weigthed_prediction')
    for key in separators_keys:
        colname = 'pred_'+'_'.join(str(p) for p in key)
        df2evaluate = df2evaluate\
                    .withColumn(colname,F.when(F.col(colname)==key[1],1).otherwise(0).cast(DoubleType()))

    qterncols = []
    for key in separators_keys:
        model = qtern_models[key]
        colname = 'pred_0-'+''.join(str(p) for p in key)
        df2evaluate = model.transform(df2evaluate)\
                .withColumn(colname,F.col(model.getPredictionCol()).cast(DoubleType()))
        qterncols.append(colname)
        cols2keep.append(colname)
        df2evaluate = df2evaluate.select(cols2keep) 

    df2evaluate = df2evaluate.withColumn('features_2', array_to_vector(F.array(*(bincols+sepcols+qterncols))))
    
    df2evaluate = use_xgb_general.transform(df2evaluate)\
                    .withColumn('xgb_score', (F.col('xgb_score')+1).cast(IntegerType()))\
                    .withColumnRenamed('xgb_score', 'score')\
                    .select('essay_id', 'score')
        
#     df2evaluate = df2evaluate.withColumn('score', 
#                             lab.weigthed_predictions()(F.array(*sepcols))[0].cast(IntegerType()))\
#                             .select('essay_id', 'score')
    
#     df_56 = df2evaluate.filter(F.col('score')==5)
    
#     df2evaluate = df2evaluate.filter(F.col('score')!=5).select('essay_id', 'score')
    
#     if df_56.count() > 0:
#         df_56 = df_56.select(initial_columns)
#         df_56 = sep_56.transform(df_56)\
#                     .withColumn('score', F.col(sep_56.getPredictionCol()).cast(IntegerType()))\
#                     .withColumn('score', F.when(F.col('score')==1,6).otherwise(5))\
#                     .select('essay_id', 'score')
#         df2evaluate = df2evaluate.union(df_56)
                    
    if counter == 1:   
        df2evaluate.write.mode('overwrite').parquet(path_files+'submission.parquet')
    else:
        df2evaluate.write.mode('append').parquet(path_files+'submission.parquet')
    
    print(f'Evaluation of batch {counter}/{len(batches)} in '\
          +f'{round((time.time()-tm)/60,2)} minutes. '\
          +f'Entries: {df2evaluate.count()}. '\
          +f'Time elapsed: {round((time.time()-t0)/60,2)} minutes.')

In [None]:
%%time
evaluated_essays = spark.read.parquet(path_files+'submission.parquet')
evaluated_essays.toPandas().to_csv('submission.csv', index=False)

folders = lab.list_folders_in_folder('/kaggle/working/')
files = lab.list_files_in_folder('/kaggle/working/')

for fold in folders:
    shutil.rmtree(fold)

for file in files:
    if file != 'submission.csv':
        os.remove(file)