# Source Code Analysis

## Initialization

In [None]:
import os
import sys

def add_path(path):
    """Put ``path`` at the front of ``sys.path`` unless it is already listed.

    :param path: directory that should become importable.
    """
    if path not in sys.path:
        # One entry at the front is enough; the extra trailing copy the
        # previous version appended only duplicated the entry.
        sys.path.insert(0, path)
# Make the conda site-packages and the notebook-local ./lib directory importable.
add_path('/home/jjian03/anaconda3/lib/python3.7/site-packages')
add_path(os.path.abspath('.') + '/lib')


### Load Data

In [None]:
from lib.Repository import *
from lib.Utility import *
from lib.modeling import *
from lib.preprocessing import *
from lib.preprocessing.HTMLParser import html_parser
from lib.viz import *

### Spark environment setup

In [None]:
def load_dataset(spark, path, name):
    """Read a Parquet dataset and register it as a temporary SQL view.

    :param spark: session used to read the data.
    :param path: location of the Parquet data.
    :param name: view name that SQL queries will refer to.
    :return: the loaded DataFrame. The previous version returned the ``None``
        result of the deprecated ``registerTempTable`` call.
    """
    df = spark.read.parquet(path)
    df.createOrReplaceTempView(name)
    return df

def shape(df):
    """Print and return ``(row_count, column_count)`` for a DataFrame.

    :param df: object exposing ``count()`` and ``columns``.
    :return: the same tuple that is printed, so callers can embed it in their
        own messages (the previous version returned ``None``).
    """
    dims = (df.count(), len(df.columns))
    print(dims)
    return dims

try:
    # A session may already exist (e.g. when this cell is re-run).
    print(spark.version)
except NameError:
    from pyspark.sql import SparkSession

    # No session in scope yet: build one, then register the datasets.
    spark = (
        SparkSession.builder
        .config('spark.app.name', 'Tobit regression')
        .config('spark.dynamicAllocation.enabled', 'true')
        .config('spark.dynamicAllocation.maxExecutors', '50')
        .config('spark.dynamicAllocation.executorIdleTimeout', '30s')
        .config('spark.driver.maxResultSize', '8g')
        .config('spark.driver.memory', '50g')
        .config('spark.executor.memory', '10g')
        .config('spark.task.maxFailures', '3')
        .config('spark.yarn.am.memory', '50g')
        .config('spark.yarn.max.executor.failures', '3')
        .config('spark.kryoserializer.buffer.max', '1024m')
        .config('spark.yarn.executor.memoryOverhead', '50g')
        .config('spark.executorEnv.PYTHON_EGG_CACHE', '/home/jjian03/cache')
        .getOrCreate()
    )
    sc = spark.sparkContext
    spark_sql = SQLContext(sc)
    print(spark.version)

    # Temporary tables referenced by the SQL in the following cells.
    load_dataset(spark, '/user/jjian03/WebResourceQuality.parquet', 'web_resource_quality')
    load_dataset(spark, '/user/jjian03/WebResourceQuality_pmid.parquet', 'web_resource_quality_pmid')
    load_dataset(spark, '/datasets/MAG_20200403/MAG_Azure_Parquet/mag_parquet/Papers.parquet', 'Paper')
    load_dataset(spark, '/user/lliang06/icon/MAG_publication_features.parquet', 'mag')


### Fetch the raw data

In [None]:
# Fraction of the joined rows kept by the sample() call below.
fract = 0.003

# Join the scraped web resources to their papers (via DOI) and to the MAG
# publication features, keep rows with a '0'/'1' label and a known
# first_appear, exclude doi.org URLs, then shuffle and sample without
# replacement.
# NOTE(review): `seed` and `fn` are not defined in the visible cells —
# presumably provided by the `lib` star imports; confirm.
raw_data = spark_sql.sql(f'''
        SELECT wr.id
            , wr.url
            , wr.actual_scrape_url
            , wr.first_appear
            , wr.first_available_timestamp
            , wr.last_available_timestamp
            , wr.header
            , wr.html_text
            , wr.comment
            , wr.from_waybackmachine
            , wr.http_status_code
            , wr.original_check_failure
            , wr.original_check_error_log
            , wr.terminate_reason
            , wr.terminate_reason_error_log

            , m.paperId
            , m.total_num_of_paper_citing
            , m.total_num_of_author_citing
            , m.total_num_of_affiliation_citing
            , m.total_num_of_journal_citing
            , m.total_num_of_author_self_citation
            , m.total_num_of_affiliation_self_citation
            , m.total_num_of_journal_self_citation
            , m.avg_year
            , m.min_year
            , m.max_year
            , m.median
            , m.num_of_author
            , m.num_of_author_citing
            , m.num_of_affiliation_citing
            , m.num_of_journal_citing
            , m.avg_hindex
            , m.first_author_hindex
            , m.last_author_hindex
            , m.avg_mid_author_hindex
            , m.paper_unique_affiliation

        FROM web_resource_quality wr
        JOIN web_resource_quality_pmid wr_doi ON wr.id = wr_doi.id
        JOIN Paper p ON wr_doi.doi = p.doi
        JOIN mag m ON p.paperId = m.paperId
        WHERE wr.label IS NOT NULL
        AND wr.label IN ('0', '1')
        AND isNaN(wr.label) = false
        AND wr.first_appear IS NOT NULL
        AND isNaN(wr.first_appear) = false
        AND lower(wr.url) NOT LIKE "%doi.org%"
    ''') \
    .orderBy(fn.rand(seed=seed)) \
    .sample(False, fract, seed)


In [None]:
import time
import datetime
start_time = time.time()

# raw_data.printSchema()

# shape() prints the dimensions itself, so only the label is emitted here.
# Interpolating its return value printed "raw_data: None".
print('raw_data: ', end='')
shape(raw_data)

# Derive minutes/seconds arithmetically; splitting str(timedelta) on ':'
# silently dropped the hours component.
minutes, seconds = divmod(time.time() - start_time, 60)
print("--- %02d minutes, %.2f seconds ---" % (minutes, seconds))

### Customized Tobit Regression

In [None]:
import math

from multiprocessing.pool import ThreadPool

from pyspark import keyword_only
from pyspark.ml import Estimator, Model
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.regression import Params, HasRegParam, HasElasticNetParam, HasMaxIter, Param, \
    TypeConverters, HasInputCol, HasRawPredictionCol, HasPredictionCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.sql.functions import rand
import numpy as np
import copy


class HasLogSigma(Params):
    """
    Mixin adding the float param ``logSigma`` together with its setter and getter.
    """

    logSigma = Param(
        Params._dummy(),
        "logSigma",
        "log sigma names.",
        typeConverter=TypeConverters.toFloat,
    )

    def __init__(self):
        super().__init__()

    def setLogSigma(self, value):
        """
        Sets the value of log sigma.
        """
        return self._set(logSigma=value)

    def getLogSigma(self):
        """
        Returns the stored log sigma, falling back to its default value.
        """
        return self.getOrDefault(self.logSigma)


class HasLearningRate(Params):
    """
    Mixin adding the float param ``learningRate`` together with its setter and getter.
    """

    learningRate = Param(
        Params._dummy(),
        "learningRate",
        "Learning Rate of the model.",
        typeConverter=TypeConverters.toFloat,
    )

    def __init__(self):
        super().__init__()

    def setLearningRate(self, value):
        """
        Sets the value of learningRate.
        """
        return self._set(learningRate=value)

    def getLearningRate(self):
        """
        Returns the stored learningRate, falling back to its default value.
        """
        return self.getOrDefault(self.learningRate)


class HasCoefficients(Params):
    """
    Mixin adding the list param ``coefficients`` together with its setter and getter.
    """

    coefficients = Param(
        Params._dummy(),
        "coefficients",
        "Coefficients of the model.",
        typeConverter=TypeConverters.toList,
    )

    def __init__(self):
        super().__init__()

    def setCoefficients(self, value):
        """
        Sets the value of coefficients.
        """
        return self._set(coefficients=value)

    def getCoefficients(self):
        """
        Returns the stored coefficients, falling back to their default value.
        """
        return self.getOrDefault(self.coefficients)


class _LassoTobitParameter(HasCoefficients, HasLogSigma, HasRegParam, HasElasticNetParam, HasMaxIter):
    """
    Params shared by the Tobit estimator and its model: the inherited
    coefficient / log sigma / regularisation / iteration params plus the two
    censoring thresholds declared here.
    """

    leftCensorPoint = Param(
        Params._dummy(),
        "leftCensorPoint",
        "Censored threshold on the left hand side",
        typeConverter=TypeConverters.toFloat,
    )
    rightCensorPoint = Param(
        Params._dummy(),
        "rightCensorPoint",
        "Censored threshold on the right hand side",
        typeConverter=TypeConverters.toFloat,
    )

    # --- left threshold -------------------------------------------------

    def setLeftCensorPoint(self, value):
        """
        Sets the value of :py:attr:`leftCensorPoint`.
        """
        return self._set(leftCensorPoint=value)

    def getLeftCensorPoint(self):
        """
        Returns :py:attr:`leftCensorPoint`, falling back to its default value.
        """
        return self.getOrDefault(self.leftCensorPoint)

    # --- right threshold ------------------------------------------------

    def setRightCensorPoint(self, value):
        """
        Sets the value of :py:attr:`rightCensorPoint`.
        """
        return self._set(rightCensorPoint=value)

    def getRightCensorPoint(self):
        """
        Returns :py:attr:`rightCensorPoint`, falling back to its default value.
        """
        return self.getOrDefault(self.rightCensorPoint)


def _parallelFitTasks(est, train, eva, validation, epm):
    """
    Creates a list of callables which can be called from different threads to fit and evaluate
    an estimator in parallel. Each callable returns an `(index, metric)` pair.

    :param est: Estimator, the estimator to be fit.
    :param train: DataFrame, training data set, used for fitting.
    :param eva: Evaluator, used to compute `metric`
    :param validation: DataFrame, validation data set, used for evaluation.
    :param epm: Sequence of ParamMap, params maps to be used during fitting & evaluation.
    :return: (int, float), an index into `epm` and the associated metric value.
    """
    modelIter = est.fitMultiple(train, epm)

    def singleTask():
        index, model = next(modelIter)
        eva_copy = copy.copy(eva)
        eva_copy.model = model.coefficients
        metric = eva.evaluate(model.transform(validation, epm[index]))
        return index, metric

    return [singleTask] * len(epm)



class TobitCrossValidator(CrossValidator):
    """
    K-fold cross validation that delegates every fit/evaluate round to the
    module-level ``_parallelFitTasks`` (avoids multiple calculation on
    coefficient).
    """

    @keyword_only
    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
                 seed=None, parallelism=1, collectSubModels=False):
        """
        __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
                 seed=None, parallelism=1, collectSubModels=False)
        """
        # keyword_only stores the caller's kwargs on the instance. The parent
        # __init__ is keyword_only as well and overwrites that attribute, so
        # the kwargs must be captured before delegating, otherwise every
        # constructor argument is silently dropped.
        kwargs = self._input_kwargs
        super(TobitCrossValidator, self).__init__()
        self._setDefault(numFolds=3, parallelism=1)
        self._set(**kwargs)

    def _fit(self, dataset):
        """
        Run the k-fold search and refit the best param map on the full dataset.

        :param dataset: DataFrame to split into folds.
        :return: CrossValidatorModel wrapping the best model and the averaged metrics.
        """
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        eva = self.getOrDefault(self.evaluator)
        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        h = 1.0 / nFolds
        # A uniform random column assigns each row to exactly one fold.
        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        metrics = [0.0] * numModels

        pool = ThreadPool(processes=min(self.getParallelism(), numModels))
        try:
            for i in range(nFolds):
                validateLB = i * h
                validateUB = (i + 1) * h
                condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
                validation = df.filter(condition).cache()
                train = df.filter(~condition).cache()
                try:
                    tasks = _parallelFitTasks(est, train, eva, validation, epm)
                    for j, metric in pool.imap_unordered(lambda f: f(), tasks):
                        metrics[j] += (metric / nFolds)
                finally:
                    # Release the cached folds even when a task fails.
                    validation.unpersist()
                    train.unpersist()
        finally:
            # The worker threads were previously never shut down.
            pool.close()
            pool.join()

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)
        bestModel = est.fit(dataset, epm[bestIndex])
        return self._copyValues(CrossValidatorModel(bestModel, metrics))


In [None]:
class LassoTobitRegression(Estimator, HasInputCol, HasRawPredictionCol, HasPredictionCol,
                           HasLearningRate, _LassoTobitParameter,
                           DefaultParamsReadable, DefaultParamsWritable):
    """
    Estimator skeleton for a lasso-penalised Tobit regression.

    The optimisation step is not implemented yet: ``_fit`` validates the
    configured params and returns a model whose coefficients are ``None``.
    """

    @keyword_only
    def __init__(self, inputCol=None, rawPredictionCol=None, predictionCol=None,
                 coefficients: list=None, logSigma: float=None,
                 regParam: float=None, elasticNetParam: float=None, maxIter: int=None,
                 leftCensorPoint: float=None, rightCensorPoint: float=None,
                 learningRate: float = None,
                 ):
        super(LassoTobitRegression, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, rawPredictionCol=None, predictionCol=None,
                  coefficients: list=None, logSigma: float=None,
                  regParam: float = None, elasticNetParam: float = None, maxIter: int = None,
                  leftCensorPoint: float = None, rightCensorPoint: float = None,
                  learningRate: float = None,
                  ):
        """Set the params that were passed explicitly as keyword arguments."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _fit(self, dataset):
        """
        Validate the configuration and build the (still untrained) model.

        :param dataset: training DataFrame; its column count must equal the
            number of initial coefficients.
        :return: LassoTobitRegressionModel carrying this estimator's params.
        :raises AssertionError: if the coefficient and column counts differ.
        """
        # These values are not consumed until the training loop exists; they
        # are still read here so the getters run before the model is built.
        X = self.getInputCol()
        y = self.getRawPredictionCol()
        lbd = self.getRegParam()
        alpha = self.getElasticNetParam()
        maxIter = self.getMaxIter()
        left = self.getLeftCensorPoint()
        right = self.getRightCensorPoint()
        learningRate = self.getLearningRate()

        # initialize with OLS
        coef = self.getCoefficients()
        if len(coef) != len(dataset.schema.names):
            print('Column does not match!')
            print(*coef, sep='\n')
            print('------')
            print(*dataset.schema.names, sep='\n')
            raise AssertionError('Column does not match!')
        logSigma = self.getLogSigma()

        # TODO: implement the gradient update. The previous version had a
        # dangling `gradient =` here, which made the whole cell unparseable.
        print('训练')
        coefficients = None

        return LassoTobitRegressionModel(
            # Was the undefined name `c`.
            inputCol=X, predictionCol=self.getPredictionCol(),
            regParam=self.getRegParam(),
            elasticNetParam=self.getElasticNetParam(),
            maxIter=self.getMaxIter(),
            leftCensorPoint=self.getLeftCensorPoint(),
            rightCensorPoint=self.getRightCensorPoint(),
            coefficients=coefficients,
            logSigma=self.getLogSigma(),
        )


class LassoTobitRegressionModel(Model,
                                HasInputCol, HasRawPredictionCol, HasPredictionCol,
                                _LassoTobitParameter,
                                DefaultParamsReadable, DefaultParamsWritable):
    """
    Model returned by ``LassoTobitRegression``; holds the fitted params.

    Prediction is not implemented yet: ``_transform`` copies the input column.
    """

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None,
                 coefficients: list=None, logSigma: float=None,
                 regParam=None, elasticNetParam=None, maxIter=None,
                 leftCensorPoint=None, rightCensorPoint=None,
                 ):
        super(LassoTobitRegressionModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None,
                  regParam=None, elasticNetParam=None, maxIter=None,
                  leftCensorPoint=None, rightCensorPoint=None,
                  coefficients=None, logSigma=None,
                  ):
        """
        Set the params that were passed explicitly as keyword arguments.

        ``logSigma`` is accepted here because ``__init__`` forwards it; without
        it, constructing the model with a log sigma raised ``TypeError``.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """
        Placeholder transform: adds a column named ``Clever`` that mirrors the
        configured input column.

        :param dataset: DataFrame containing the input column.
        :return: ``dataset`` with the extra ``Clever`` column.
        """
        x = self.getInputCol()
        y = self.getPredictionCol()
        coefficients = self.getCoefficients()

        # TODO: compute the real prediction and write it to column `y`.
        # return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)
        # Use the configured input column rather than a hard-coded 'x'.
        return dataset.withColumn('Clever', fn.col(x))


# Test
from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType


class LassoTobitEvaluator(Evaluator, HasInputCol, HasRawPredictionCol, HasPredictionCol):
    """
    Evaluator intended to score a fitted Tobit model by its log-likelihood.

    The metric itself has not been ported to Spark yet; ``_evaluate``
    documents the target formula and raises ``NotImplementedError``.
    """

    def __init__(self, inputCol='features', predictionCol="prediction", rawPredictionCol="rawPredictionCol",
                 model: Model=None,
                 ):
        # Initialise the Params machinery, then store the column names as
        # param values. Assigning them as plain attributes replaced the Param
        # objects that getInputCol() and the other getters look up.
        super().__init__()
        self._set(inputCol=inputCol, predictionCol=predictionCol, rawPredictionCol=rawPredictionCol)
        self._model = model

    @staticmethod
    def _censored_udf(x):
        # Placeholder for the censoring-indicator UDF.
        pass

    @staticmethod
    def _calculate_loglik():
        # Placeholder; declared static because it takes no arguments.
        pass

    def isLargerBetter(self):
        """A larger log-likelihood means a better fit."""
        return True

    @property
    def model(self):
        """Fitted model whose params the metric reads."""
        # Backed by `_model`; returning `self.model` here recursed forever.
        return self._model

    @model.setter
    def model(self, model):
        self._model = model

    def _evaluate(self, dataset):
        """
        Compute the Tobit log-likelihood of ``dataset`` (not implemented yet).

        Target formula, carried over from the R prototype, with ``xb`` the
        linear predictor, ``sigma`` the scale, ``ll`` / ``ul`` the censoring
        points and ``I`` the censoring indicator:

            uncensored  = sum(log(dnorm((Y - xb) / sigma)) - log(sigma))
                          over rows strictly between ll and ul
            ll_censored = sum(log(1 - pnorm(xb / sigma)))
                          over rows at or below ll
            ul_censored = sum(log(1 - pnorm((ul - xb) / sigma)))
                          over rows at or above ul
            loglik      = ll_censored + uncensored + ul_censored

        The alive resources are considered to be censored and are labeled 1;
        only the right-censored term applies to this data set.

        :param dataset: DataFrame produced by the model's ``transform``.
        :raises ValueError: if no model has been attached.
        :raises NotImplementedError: always, until the formula is ported.
        """
        if self.model is None:
            raise ValueError('LassoTobitEvaluator.model must be set before evaluating')
        raise NotImplementedError('Tobit log-likelihood is not implemented yet')

In [None]:



    

# Tiny in-memory frame used to exercise the estimator end to end.
df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])

lasso_tobit_regressor = (
    LassoTobitRegression()
    .setInputCol("x")
    .setLeftCensorPoint(0)
    .setRightCensorPoint(100)
    .setRegParam(1)
    .setMaxIter(100)
    .setElasticNetParam(1)
)
pipe = Pipeline(stages=[lasso_tobit_regressor])

# Single-point grid: one candidate per param.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lasso_tobit_regressor.leftCensorPoint, [0])
    .addGrid(lasso_tobit_regressor.rightCensorPoint, [100])
    .addGrid(lasso_tobit_regressor.regParam, [1])
    .addGrid(lasso_tobit_regressor.maxIter, [100])
    .addGrid(lasso_tobit_regressor.elasticNetParam, [1])
    .build()
)

evaluator = LassoTobitEvaluator()

crossval = (
    TobitCrossValidator()
    .setEstimator(pipe)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
    .setNumFolds(2)
)
cvModel = crossval.fit(df)
bestModel = cvModel.bestModel
preds = bestModel.transform(df)

preds.show()

# cvModel.transform(df).show()

In [None]:
# Register the pymongo egg with the SparkContext — presumably so that UDFs
# running on the executors can import `bson`.
# NOTE(review): the file name indicates a py2.7 / macOS build; confirm it
# matches the Python used on the executors.
sc.addPyFile('/home/jjian03/lib/pymongo-3.10.1-py2.7-macosx-10.14-intel.egg')


In [None]:
from pyspark.sql import types as t
from bson import ObjectId
import bson


# Two rows sharing one ObjectId, enough to try the UDF out.
df = sc.parallelize([('5ecd87e7150a1889d703ea37', 2.0), ('5ecd87e7150a1889d703ea37', 3.0)]).toDF(["id", "x"])


def _extract_year_udf(oid_str):
    """Return the year encoded in the generation time of an ObjectId string.

    The unused nested helper and the unreachable trailing ``return`` of the
    previous version were removed; the returned value is unchanged.
    """
    return ObjectId(oid_str).generation_time.year


df.withColumn('y', fn.udf(_extract_year_udf, t.StringType())('id')).toPandas()


In [None]:
from pyspark.ml import Pipeline, Transformer, Estimator
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import types as t
from bson import ObjectId


def _extract_year_udf(oid_str):
    """Return the year encoded in the generation time of an ObjectId string.

    The previous version defined a helper for this but returned the id string
    unchanged, which does not match the integer return type the UDF is
    registered with.

    :param oid_str: hexadecimal ObjectId string, or ``None``.
    :return: the year as ``int``, or ``None`` for a missing id.
    """
    if oid_str is None:
        # ObjectId(None) would mint a brand-new id stamped with "now".
        return None
    return ObjectId(oid_str).generation_time.year

@singleton
class LabelBuilder(Transformer):
    """
    Adds ``first_appear_from_id`` (year derived from the ``id`` column) and
    fills missing ``first_appear`` values with it.

    TODO: derive ``label`` as last-appearance year minus first-appearance
    year and drop negative / NaN labels, as the earlier pandas version did.
    """

    def __init__(self):
        # Initialise the Params base class (uid, param maps).
        super().__init__()
        # Column expression, built once, that applies the year UDF to `id`.
        self._extract_year_udf = fn.udf(_extract_year_udf, t.IntegerType())('id')

    @staticmethod
    def _get_year_from_id(oid_str):
        """Return the generation year of an ObjectId string."""
        return ObjectId(oid_str).generation_time.year

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with ``id`` and ``first_appear`` columns.
        :return: ``df`` with ``first_appear_from_id`` added and nulls in
            ``first_appear`` replaced by that value.
        """
        return df.withColumn('first_appear_from_id', self._extract_year_udf) \
            .withColumn('first_appear', fn.coalesce(fn.col('first_appear'), self._extract_year_udf))

# Pipeline with the label-building stage only.
pipe = Pipeline(stages=[LabelBuilder()])

# Fit and apply it to the sample, then show the ids of the first ten rows.
(
    pipe.fit(raw_data)
    .transform(raw_data)
    .limit(10)
    .toPandas()
    .loc[:, 'id']
)

In [None]:
from urllib.parse import urlparse


@singleton
class URLParser(Transformer):
    """
    Splits the ``url`` column into ``scheme``, ``netloc``, ``path`` and
    ``params`` columns using ``urllib.parse.urlparse``.

    NOTE(review): ``params`` is the ``;parameters`` part of the URL, not the
    ``?query`` string — confirm that is what the parameter count should use.
    """

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with a string ``url`` column.
        :return: ``df`` with the four component columns appended.
        """
        # Each component gets its own string UDF that parses the URL
        # directly. The previous version stored the ParseResult in an
        # intermediate column typed as a bare UserDefinedType, and its
        # continuation backslashes were followed by spaces (a syntax error).
        def component_udf(name, blank_as_none=False):
            def extract(url):
                if url is None:
                    return None
                try:
                    value = getattr(urlparse(url), name)
                except ValueError:
                    # urlparse rejects some malformed URLs; treat as missing.
                    return None
                if blank_as_none and value.strip() == '':
                    return None
                return value
            return fn.udf(extract, t.StringType())

        return df.withColumn('scheme', component_udf('scheme')('url')) \
            .withColumn('netloc', component_udf('netloc')('url')) \
            .withColumn('path', component_udf('path')('url')) \
            .withColumn('params', component_udf('params', blank_as_none=True)('url'))

# Pipeline so far: label building, then URL parsing.
pipe = Pipeline(stages=[LabelBuilder(), URLParser()])

In [None]:
@singleton
class URLLengthCounter(Transformer):
    """Adds ``url_length``: the number of characters in ``url``."""

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with a string ``url`` column.
        :return: ``df`` with the integer ``url_length`` column (null for a
            null URL; the previous version raised on ``len(None)``).
        """
        url_length = fn.udf(lambda x: None if x is None else len(x), t.IntegerType())
        return df.withColumn('url_length', url_length('url'))

# Pipeline so far: label building, URL parsing, URL length.
pipe = Pipeline(stages=[LabelBuilder(), URLParser(), URLLengthCounter()])

In [None]:
@singleton
class URLDepthCounter(Transformer):
    """Adds ``url_depth``: the number of ``/`` separators in ``path``."""

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with a string ``path`` column.
        :return: ``df`` with the integer ``url_depth`` column.
        """
        return df.withColumn('url_depth', fn.udf(URLDepthCounter._get_depth, t.IntegerType())('path'))

    @staticmethod
    def _get_depth(path):
        """
        Count the slashes in ``path``, ignoring one trailing slash.

        The previous version declared a ``self`` parameter on this static
        method, so the one-argument UDF call failed, and it raised
        ``ValueError`` for paths without any slash.

        :param path: URL path, possibly empty or ``None``.
        :return: depth as ``int``; 0 for an empty path, ``None`` for ``None``.
        """
        if path is None:
            return None
        # A trailing slash does not open a further level.
        trimmed = path[:-1] if path.endswith('/') else path
        return trimmed.count('/')

# Pipeline so far: label building, URL parsing, URL length, URL depth.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
])

In [None]:
@singleton
class HasWWWConverter(Transformer):
    """Adds the boolean column ``has_www``: whether ``netloc`` starts with ``www.``."""

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with a string ``netloc`` column.
        :return: ``df`` with the boolean ``has_www`` column.
        """
        return df.withColumn('has_www', fn.udf(HasWWWConverter._has_www, t.BooleanType())('netloc'))

    @staticmethod
    def _has_www(domain):
        """
        :param domain: network location, or ``None``.
        :return: ``bool`` matching the UDF's declared BooleanType (the
            previous version returned an ``int``); ``None`` for a missing
            domain.
        """
        if domain is None:
            return None
        return domain.startswith('www.')

# Pipeline so far, now including the www-prefix flag.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(),
])


In [None]:
@singleton
class SubdomainLevelCounter(Transformer):
    """Adds ``subdomain_level``: the number of dots in ``netloc``."""

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with a string ``netloc`` column.
        :return: ``df`` with the integer ``subdomain_level`` column.
        """
        return df.withColumn('subdomain_level', fn.udf(SubdomainLevelCounter._get_level, t.IntegerType())('netloc'))

    @staticmethod
    def _get_level(domain):
        """
        Count the dots in ``domain``.

        The previous version declared a ``self`` parameter on this static
        method, so the one-argument UDF call failed.

        :param domain: network location, or ``None``.
        :return: dot count as ``int``, or ``None`` for a missing domain.
        """
        if domain is None:
            return None
        return domain.count('.')

# Pipeline so far, now including the subdomain level.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(), SubdomainLevelCounter(),
])


In [None]:
@singleton
class RequestParameterCounter(Transformer):
    """
    Normalises ``params`` to a stripped string and adds ``param_cnt``, the
    number of ``&``-separated entries in it.
    """

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with a ``params`` column.
        :return: ``df`` with ``params`` blank-filled and ``param_cnt`` added.
        """
        # `params` holds text, so the normalising UDF is declared as a string
        # UDF (it was declared IntegerType before).
        return df.withColumn('params', fn.udf(RequestParameterCounter._default_blank_str, t.StringType())('params')) \
            .withColumn('param_cnt', fn.udf(RequestParameterCounter._count_param, t.IntegerType())('params'))

    @staticmethod
    def _default_blank_str(params):
        """
        :param params: raw parameter string, ``None`` or NaN.
        :return: the stripped string, or ``''`` for a missing value.
        """
        # `params != params` is true only for NaN. The previous check
        # compared the undefined name `x` with `np.nan ==`, which is never
        # true.
        if params is None or params != params:
            return ''
        return params.strip()

    @staticmethod
    def _count_param(params):
        """
        :param params: normalised parameter string.
        :return: 0 for an empty value, otherwise one more than the number of ``&``.
        """
        if not params:
            return 0
        return params.count('&') + 1

# Pipeline so far, now including the request-parameter count.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(), SubdomainLevelCounter(), RequestParameterCounter(),
])


In [None]:
from pyspark.ml.feature import StringIndexer


@singleton
class DomainSuffixBuilder(Estimator):
    """
    Estimator that derives ``suffix`` / ``is_port_access`` from ``netloc`` and
    fits a ``StringIndexer`` on the suffix.

    It is an Estimator only: the previous base list
    ``(Estimator, Transformer, Model)`` has no consistent method resolution
    order. Fitting returns a ``DomainSuffixBuilderModel`` that applies the
    same features plus the fitted indexer.
    """

    def __init__(self):
        super().__init__()
        # Last fitted indexer, kept on the estimator for inspection.
        self._stringIndexerModel = None

    def _fit(self, dataset):
        """
        :param dataset: DataFrame with a string ``netloc`` column.
        :return: fitted ``DomainSuffixBuilderModel``.
        """
        dataset = DomainSuffixBuilder.build_suffix_port_feature(dataset)

        self._stringIndexerModel = StringIndexer(
                inputCol="suffix", outputCol="suffix_idx",
                handleInvalid="error", stringOrderType="frequencyDesc") \
            .fit(dataset)

        return DomainSuffixBuilderModel(self._stringIndexerModel)

    @staticmethod
    def build_suffix_port_feature(dataset):
        """
        Add ``suffix`` (text after the last dot of ``netloc``, port removed)
        and ``is_port_access`` (whether that text carried a ``:port`` part).

        Only rows missing one of these two derived columns are dropped; the
        previous unrestricted ``dropna()`` removed any row with a null in any
        column.
        """
        return dataset.withColumn('suffix', fn.udf(DomainSuffixBuilder._get_url_suffix, t.StringType())('netloc')) \
            .withColumn('is_port_access', fn.udf(DomainSuffixBuilder._is_port_access, t.BooleanType())('suffix')) \
            .withColumn('suffix', fn.udf(DomainSuffixBuilder._clean_url_suffix, t.StringType())('suffix')) \
            .dropna(subset=['suffix', 'is_port_access'])

    @staticmethod
    def _get_url_suffix(url):
        """Return the text after the last dot, or ``None`` if there is none."""
        if url is None or '.' not in url:
            return None
        return url[url.rindex('.') + 1:]

    @staticmethod
    def _is_port_access(suffix):
        """
        Return whether ``suffix`` has more than one non-blank ``:``-separated
        token. A ``bool`` is returned to match the declared BooleanType (the
        previous version returned an ``int``).
        """
        if suffix is None:
            return None
        tokens = [token for token in suffix.split(':') if token.strip() != '']
        return len(tokens) > 1

    @staticmethod
    def _clean_url_suffix(url):
        """Return the part of the suffix before the first ``:``."""
        if url is None:
            return None
        return url.split(':')[0]


class DomainSuffixBuilderModel(Model):
    """Model produced by ``DomainSuffixBuilder``: suffix features plus the fitted indexer."""

    def __init__(self, string_indexer_model):
        super().__init__()
        self._stringIndexerModel = string_indexer_model

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with a string ``netloc`` column.
        :return: ``df`` with ``suffix``, ``is_port_access`` and ``suffix_idx``,
            without rows where any of the three is null.
        """
        # The previous version read the undefined name `dataset` here.
        df = DomainSuffixBuilder.build_suffix_port_feature(df)
        return self._stringIndexerModel.transform(df).na.drop(subset=['is_port_access', 'suffix', 'suffix_idx'])

# Pipeline so far, now including the domain-suffix features.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(), SubdomainLevelCounter(), RequestParameterCounter(),
    DomainSuffixBuilder(),
])


In [None]:
@singleton
class IncorrectDomainUrlCleaner(Transformer):
    """
    Remove the Incorrect Domains
    TLD ranges from 2 to 63

    Ref: https://en.wikipedia.org/wiki/Domain_Name_System#cite_ref-rfc1034_1-2
    """
    def __init__(self):
        # Local import: `re` is not imported in the visible part of the file.
        import re

        super().__init__()
        self._regex = re.compile(r'^[a-zA-Z]{2,63}$', re.I)

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with a string ``suffix`` column.
        :return: only the rows whose suffix consists of 2 to 63 letters.
        """
        return df.withColumn('is_correct', fn.udf(self._is_correct, t.BooleanType())('suffix')) \
            .filter(fn.col('is_correct') == True) \
            .drop('is_correct')

    def _is_correct(self, domain_suffix):
        """
        :param domain_suffix: suffix to validate, or ``None``.
        :return: ``True`` when the suffix matches the pattern; ``False``
            otherwise, including for ``None`` (which used to raise).
        """
        if domain_suffix is None:
            return False
        return bool(self._regex.match(domain_suffix))

# Pipeline so far, now including the invalid-suffix filter.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(), SubdomainLevelCounter(), RequestParameterCounter(),
    DomainSuffixBuilder(), IncorrectDomainUrlCleaner(),
])


In [None]:
@singleton
class ColumnRenamer(Transformer):
    """
    Renames columns according to an ``{old_name: new_name}`` mapping; names
    missing from the DataFrame are ignored.

    NOTE(review): the class is wrapped in ``@singleton`` yet takes a
    constructor argument — confirm a second instantiation with a different
    mapping behaves as intended.
    """

    def __init__(self, mapping):
        # Initialise the Params base class (uid, param maps).
        super().__init__()
        self._mapping = mapping

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame whose columns may be renamed.
        :return: ``df`` with every mapped column that was present renamed.
        """
        # Decide which renames apply against the incoming schema, before any
        # rename has changed it.
        present = set(df.schema.names)
        for old, new in self._mapping.items():
            if old in present:
                df = df.withColumnRenamed(old, new)

        return df

# Pipeline so far, now renaming `scheme` to `protocol_type`.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(), SubdomainLevelCounter(), RequestParameterCounter(),
    DomainSuffixBuilder(), IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
])


In [None]:
@singleton
class BinaryNAEncoder(Transformer):
    """
    For each configured column present in the DataFrame, adds
    ``has_<column>``: 1 when the value is present, 0 when it is null or NaN.
    """

    def __init__(self, columns):
        # Initialise the Params base class (uid, param maps).
        super().__init__()
        self._columns = columns

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: input DataFrame.
        :return: ``df`` with one ``has_<column>`` integer column per
            configured column that exists in it.
        """
        existing_columns = [col_name for col_name in self._columns if col_name in df.schema.names]

        for col_name in existing_columns:
            df = df.withColumn(f'has_{col_name}', fn.udf(BinaryNAEncoder._encode, t.IntegerType())(col_name))

        # The previous version returned the undefined name `result`.
        return df

    @staticmethod
    def _encode(x):
        """
        :param x: cell value.
        :return: 0 for ``None`` or NaN, otherwise 1.
        """
        # `x != x` holds only for NaN; membership in `[np.nan, None]` missed
        # NaN values that are not the very same object as `np.nan`.
        if x is None or x != x:
            return 0
        return 1

# Pipeline so far, now flagging whether `content_type` is present.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(), SubdomainLevelCounter(), RequestParameterCounter(),
    DomainSuffixBuilder(), IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}), BinaryNAEncoder(['content_type']),
])


In [None]:
@singleton
class EmptyHTMLFilter(Transformer):
    """Drops the rows whose ``html_text`` is null or an empty string."""

    def __init__(self):
        # Initialise the Params base class (uid, param maps).
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with an ``html_text`` column.
        :return: the rows that have a non-empty ``html_text``.
        """
        return df.na.drop(subset=['html_text']).filter("html_text != ''")
    
# Pipeline so far, now dropping rows without HTML.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(), SubdomainLevelCounter(), RequestParameterCounter(),
    DomainSuffixBuilder(), IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}), BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
])


In [None]:
@singleton
class SourceCodeByteCounter(Transformer):
    """Adds ``code_size``: the length of ``html_text`` as reported by ``len``."""

    def __init__(self):
        # Initialise the Params base class (uid, param maps).
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with an ``html_text`` column.
        :return: ``df`` with the integer ``code_size`` column.
        """
        return df.withColumn('code_size', fn.udf(SourceCodeByteCounter._count_code_length, t.IntegerType())('html_text'))

    @staticmethod
    def _count_code_length(x):
        """
        :param x: HTML text, ``None`` or NaN.
        :return: ``len(x)``, or 0 for a missing value.
        """
        # `x != x` holds only for NaN; membership in `[np.nan, None]` missed
        # NaN values that are not the very same object as `np.nan`.
        if x is None or x != x:
            return 0
        return len(x)

# Pipeline so far, now including the source-code size.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(), SubdomainLevelCounter(), RequestParameterCounter(),
    DomainSuffixBuilder(), IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}), BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(), SourceCodeByteCounter(),
])


In [None]:
@singleton
class HTML5Justifier(Transformer):
    """Adds ``is_html5``: 1 when ``html_text`` starts with ``<!doctype html>``."""

    def __init__(self):
        # Initialise the Params base class (uid, param maps).
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with an ``html_text`` column.
        :return: ``df`` with the integer ``is_html5`` column.
        """
        return df.withColumn('is_html5', fn.udf(HTML5Justifier._is_html5, t.IntegerType())('html_text'))

    @staticmethod
    def _is_html5(x):
        """
        :param x: HTML text, ``None`` or NaN.
        :return: 1 if the text, with line breaks removed, surrounding
            whitespace stripped and lower-cased, starts with the HTML5
            doctype; otherwise 0.
        """
        # `x != x` holds only for NaN; membership in `[np.nan, None]` missed
        # NaN values that are not the very same object as `np.nan`.
        if x is None or x != x:
            return 0
        normalised = x.replace('\n', '').replace('\r', '').strip().lower()
        return 1 if normalised.startswith('<!doctype html>') else 0

# Pipeline so far, now including the HTML5 doctype flag.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(), SubdomainLevelCounter(), RequestParameterCounter(),
    DomainSuffixBuilder(), IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}), BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(), SourceCodeByteCounter(), HTML5Justifier(),
])


In [None]:
@singleton
class BeautifulSoupParser(Transformer):
    """
    Adds ``bs_obj``: ``html_text`` parsed with BeautifulSoup's ``html.parser``.

    NOTE(review): the UDF is declared with a bare ``UserDefinedType()``,
    which presumably cannot describe a BeautifulSoup object to Spark. Confirm
    this column can be materialised; otherwise the stages that read
    ``bs_obj`` should parse ``html_text`` themselves.
    """

    def __init__(self):
        # Initialise the Params base class (uid, param maps).
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with an ``html_text`` column.
        :return: ``df`` with the ``bs_obj`` column.
        """
        return df.withColumn('bs_obj', fn.udf(
            lambda html_doc: BeautifulSoupParser._safe_create_parser(html_doc), t.UserDefinedType()
        )('html_text'))

    @staticmethod
    def _safe_create_parser(html_doc):
        """
        :param html_doc: HTML text to parse.
        :return: the parsed document, or an empty document when parsing fails.
        """
        try:
            return BeautifulSoup(html_doc, 'html.parser')
        except Exception:
            # Best-effort fallback. `Exception` rather than a bare `except`
            # so KeyboardInterrupt / SystemExit are not swallowed.
            return BeautifulSoup('', 'html.parser')

# Pipeline so far, now including the BeautifulSoup parsing stage.
pipe = Pipeline(stages=[
    LabelBuilder(), URLParser(), URLLengthCounter(), URLDepthCounter(),
    HasWWWConverter(), SubdomainLevelCounter(), RequestParameterCounter(),
    DomainSuffixBuilder(), IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}), BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(), SourceCodeByteCounter(), HTML5Justifier(),
    BeautifulSoupParser(),
])


In [None]:
@singleton
class SourceTitleLengthParser(Transformer):
    """Adds ``title_length``: the length of the parsed document's title string."""

    def __init__(self):
        # Initialise the Params base class (uid, param maps).
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        :param df: DataFrame with a ``bs_obj`` column.
        :return: ``df`` with the integer ``title_length`` column.
        """
        return df.withColumn('title_length', fn.udf(SourceTitleLengthParser._get_title_length, t.IntegerType())('bs_obj'))

    @staticmethod
    def _get_title_length(soup):
        """
        Title Length
        :param soup: parsed document exposing ``title``; may be ``None``.
        :return: length of the title string, 0 when there is no title.
        """
        if soup is None or not soup.title:
            return 0
        # `.string` can be None even when a title tag exists.
        return len(soup.title.string or '')

# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with
# SourceTitleLengthParser(). Stages are given in the order they appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
])


In [None]:
@singleton
class SourceInternalJSLibCounter(Transformer):
    """Transformer that adds an ``internal_js_cnt`` column computed from ``bs_obj``."""

    def __init__(self):
        pass

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Append the ``internal_js_cnt`` column.
        :param df: input DataFrame; the UDF is applied to its ``bs_obj`` column.
        :return: ``df`` with an extra integer column ``internal_js_cnt``.
        """
        counter_udf = fn.udf(SourceInternalJSLibCounter._count_internal_js_lib, t.IntegerType())
        return df.withColumn('internal_js_cnt', counter_udf('bs_obj'))

    @staticmethod
    def _count_internal_js_lib(soup):
        """
        Number of internal JS files.
        :param soup: parsed document.
        :return: count of ``<script>`` tags carrying a ``src`` attribute whose
                 value does not start with ``http`` (case-sensitive).
        """
        script_tags = soup.findAll('script', {"src": True})
        return sum(1 for tag in script_tags if not tag['src'].startswith('http'))

# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with
# SourceInternalJSLibCounter(). Stages are given in the order they appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
])


In [None]:
@singleton
class SourceExternalJSLibCounter(Transformer):
    """Transformer that adds an ``external_js_cnt`` column computed from ``bs_obj``."""

    def __init__(self):
        pass

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Append the ``external_js_cnt`` column.
        :param df: input DataFrame; the UDF is applied to its ``bs_obj`` column.
        :return: ``df`` with an extra integer column ``external_js_cnt``.
        """
        counter_udf = fn.udf(SourceExternalJSLibCounter._count_external_js_lib, t.IntegerType())
        return df.withColumn('external_js_cnt', counter_udf('bs_obj'))

    @staticmethod
    def _count_external_js_lib(soup):
        """
        Number of external JS files.
        :param soup: parsed document.
        :return: count of ``<script>`` tags carrying a ``src`` attribute whose
                 value starts with ``http`` (case-sensitive).
        """
        script_tags = soup.findAll('script', {"src": True})
        return sum(1 for tag in script_tags if tag['src'].startswith('http'))

# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with
# SourceExternalJSLibCounter(). Stages are given in the order they appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
])


In [None]:
@singleton
class SourceCharsetParser(Transformer):
    """Transformer that adds a ``charset`` column computed from ``bs_obj``."""

    def __init__(self):
        pass

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Append the ``charset`` column.
        :param df: input DataFrame; the UDF is applied to its ``bs_obj`` column.
        :return: ``df`` with an extra string column ``charset``.
        """
        charset_udf = fn.udf(SourceCharsetParser._get_charset, t.StringType())
        return df.withColumn('charset', charset_udf('bs_obj'))

    @staticmethod
    def _get_charset(soup):
        """
        Declared charset of the document.
        :param soup: parsed document.
        :return: the ``charset`` attribute of the first ``<meta charset=...>``
                 tag, lower-cased with single and double quotes removed;
                 ``''`` when no such tag exists.
        """
        meta_tags = soup.findAll('meta', {"charset": True})
        if not meta_tags:
            return ''
        declared = meta_tags[0]['charset']
        return declared.lower().replace('\'', '').replace('"', '')

# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with
# SourceCharsetParser(). Stages are given in the order they appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
    SourceCharsetParser(),
])


In [None]:
@singleton
class SourceIFrameChecker(Transformer):
    """Transformer that adds a boolean ``has_iframe`` column computed from ``bs_obj``."""

    def __init__(self):
        pass

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Append the ``has_iframe`` column.
        :param df: input DataFrame; the UDF is applied to its ``bs_obj`` column.
        :return: ``df`` with an extra boolean column ``has_iframe``.
        """
        has_iframe_udf = fn.udf(SourceIFrameChecker._has_iframe, t.BooleanType())
        return df.withColumn('has_iframe', has_iframe_udf('bs_obj'))

    @staticmethod
    def _has_iframe(soup):
        """
        Whether the document contains an iframe.
        :param soup: parsed document.
        :return: True when at least one ``<iframe>`` tag is found, else False.
        """
        # The previous ``int(0 == len(sources))`` was inverted (1 meant "no
        # iframe") and returned an int although the UDF is declared as
        # BooleanType; return a real bool with the polarity the name promises.
        return len(soup.findAll('iframe')) > 0

# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with
# SourceIFrameChecker(). Stages are given in the order they appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
    SourceCharsetParser(),
    SourceIFrameChecker(),
])


In [None]:
@singleton
class SourceHyperlinkCounter(Transformer):
    """Transformer that adds a ``hyperlink_cnt`` column computed from ``bs_obj``."""

    def __init__(self):
        pass

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Append the ``hyperlink_cnt`` column.
        :param df: input DataFrame; the UDF is applied to its ``bs_obj`` column.
        :return: ``df`` with an extra integer column ``hyperlink_cnt``.
        """
        # This used to write into 'has_iframe' as BooleanType (copy-paste from
        # SourceIFrameChecker), which overwrote the iframe flag and never
        # produced the 'hyperlink_cnt' column that FeaturePicker asks for.
        hyperlink_udf = fn.udf(SourceHyperlinkCounter._count_hyperlink, t.IntegerType())
        return df.withColumn('hyperlink_cnt', hyperlink_udf('bs_obj'))

    @staticmethod
    def _count_hyperlink(soup):
        """
        Number of hyperlinks.
        :param soup: parsed document.
        :return: count of ``<a>`` tags that have an ``href`` attribute whose
                 lower-cased value starts with ``http``.
        """
        anchors = soup.findAll('a')
        return sum(
            1 for anchor in anchors
            if anchor.has_attr('href') and anchor['href'].lower().startswith('http')
        )


# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with
# SourceHyperlinkCounter(). Stages are given in the order they appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
    SourceCharsetParser(),
    SourceIFrameChecker(),
    SourceHyperlinkCounter(),
])


In [None]:
@singleton
class FeatureValueMapper(Transformer):
    """Transformer that re-codes the values of one column through a lookup dict."""

    def __init__(self, column_name, mapping):
        """
        :param column_name: name of the column whose values are re-coded.
        :param mapping: dict of ``{old_value: new_value}``.
        """
        self._column_name = column_name
        self._mapping = mapping

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Re-code ``column_name`` according to ``mapping``.

        Same-type mappings (str->str, number->number, bool->bool) go through
        ``DataFrame.replace`` and leave unmapped values untouched. A mixed-type
        mapping such as ``{'http': 1, 'https': 0}`` is rejected by ``replace``,
        so it is applied as a ``when`` chain instead; in that case values
        absent from the mapping (including nulls) become null.
        :param df: input DataFrame.
        :return: DataFrame with the re-coded column.
        """
        try:
            return df.replace(to_replace=self._mapping, subset=[self._column_name])
        except ValueError:
            # ``replace`` validates its arguments eagerly and raises ValueError
            # on mixed-type mappings; an empty mapping cannot be the cause, so
            # re-raise in that case rather than build an empty expression.
            if not self._mapping:
                raise

        column = fn.col(self._column_name)
        recoded = None
        for old_value, new_value in self._mapping.items():
            condition = column == old_value
            replacement = fn.lit(new_value)
            if recoded is None:
                recoded = fn.when(condition, replacement)
            else:
                recoded = recoded.when(condition, replacement)
        return df.withColumn(self._column_name, recoded)

# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with a
# FeatureValueMapper on 'protocol_type' ('http' -> 1, 'https' -> 0).
# Stages are given in the order they appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
    SourceCharsetParser(),
    SourceIFrameChecker(),
    SourceHyperlinkCounter(),
    FeatureValueMapper('protocol_type', {
        'http': 1,
        'https':0,
    }),
])


In [None]:
@singleton
class NanToZeroConverter(Transformer):
    """Transformer that fills missing values with 0 in a configured set of columns."""

    def __init__(self, columns):
        """
        :param columns: names of the columns to fill.
        """
        self._columns = columns

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Apply ``fillna(0)`` to the configured columns that exist in ``df``.
        :param df: input DataFrame.
        :return: DataFrame after ``fillna(0, subset=...)``; configured names
                 missing from ``df`` are ignored.
        """
        available = set(df.schema.names)
        targets = [name for name in self._columns if name in available]
        return df.fillna(0, subset=targets)
    
# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with a
# NanToZeroConverter over the citation / author / h-index columns listed below.
# Stages are given in the order they appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
    SourceCharsetParser(),
    SourceIFrameChecker(),
    SourceHyperlinkCounter(),
    FeatureValueMapper('protocol_type', {
        'http': 1,
        'https':0,
    }),
    NanToZeroConverter([
        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',
    ]),
])


In [None]:
@singleton
class FeaturePicker(Transformer):
    """Transformer that keeps only a configured set of columns."""

    def __init__(self, features):
        """
        :param features: names of the columns to keep.
        """
        self._features = features

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Select the configured columns that exist in ``df``.
        :param df: input DataFrame.
        :return: DataFrame restricted to the requested columns, in the order
                 they were requested; names missing from ``df`` are ignored.
        """
        # ``__init__`` stores the list as ``_features``; reading the never-set
        # ``self._columns`` here raised AttributeError.
        existing_columns = [col_name for col_name in self._features if col_name in df.schema.names]

        return df.select(*existing_columns)

# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with a
# FeaturePicker over the URL / page features, the citation features and 'label'.
# Stages are given in the order they appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
    SourceCharsetParser(),
    SourceIFrameChecker(),
    SourceHyperlinkCounter(),
    FeatureValueMapper('protocol_type', {
        'http': 1,
        'https':0,
    }),
    NanToZeroConverter([
        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',
    ]),
    FeaturePicker([

        'protocol_type',
        'url_depth',
        'has_www',
        'subdomain_level',
        'param_cnt',
        'suffix_idx',
        'is_port_access',
        'code_size',
        'title_length',
        'internal_js_cnt',
        'external_js_cnt',
        'charset',
        'is_html5',
        'has_iframe',
        'hyperlink_cnt',
        'first_appear',

        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',

        'label',
    ]),
])


In [None]:
@singleton
class DummySuffixDescritizer(Transformer):
    """Transformer that replaces the ``suffix`` column with one 0/1 column per distinct suffix."""

    def __init__(self, features):
        """
        :param features: list of suffix names.
        """
        # NOTE(review): ``_features`` is stored but not read by ``_transform``,
        # which derives the categories from the data -- confirm whether the
        # dummy columns were meant to be restricted to this list.
        self._features = features

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        One-hot encode ``suffix``.
        :param df: input DataFrame; must contain ``id`` and ``suffix`` columns.
        :return: ``df`` inner-joined on ``id`` with the dummy columns, with
                 ``suffix`` dropped. Each dummy column is named after its
                 suffix value and holds 1 where the row's suffix equals it,
                 otherwise 0.
        """
        # Collecting the distinct values triggers a Spark job on the driver.
        categories = df.select("suffix").distinct().rdd.flatMap(lambda x: x).collect()

        # A null suffix cannot be used as a column alias, so it gets no dummy
        # column; such rows simply carry 0 in every dummy column.
        exprs = [
            fn.when(fn.col("suffix") == category, 1).otherwise(0).alias(category)
            for category in categories
            if category is not None
        ]
        # Fixes the former ``fn.('suffix')`` syntax error and the undefined
        # ``F`` alias (the functions module is referenced as ``fn`` here).
        dummy_df = df.select(fn.col('id'), fn.col('suffix')).select('id', *exprs)

        return df.join(dummy_df, 'id', 'inner').drop('suffix')

# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with
# DummySuffixDescritizer([...]). Stages are given in the order they appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
    SourceCharsetParser(),
    SourceIFrameChecker(),
    SourceHyperlinkCounter(),
    FeatureValueMapper('protocol_type', {
        'http': 1,
        'https':0,
    }),
    NanToZeroConverter([
        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',
    ]),
    FeaturePicker([

        'protocol_type',
        'url_depth',
        'has_www',
        'subdomain_level',
        'param_cnt',
        'suffix_idx',
        'is_port_access',
        'code_size',
        'title_length',
        'internal_js_cnt',
        'external_js_cnt',
        'charset',
        'is_html5',
        'has_iframe',
        'hyperlink_cnt',
        'first_appear',

        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',

        'label',
    ]),
    # NOTE(review): the FeaturePicker list above names neither 'id' nor
    # 'suffix', yet DummySuffixDescritizer._transform selects both --
    # confirm those columns are still present at this stage.
    DummySuffixDescritizer(['int', 'org', 'gov', 'in', 'eu', 'cn', 'kr', 'en']),
])


In [None]:
@singleton
class FeatureRemover(Transformer):
    """Transformer that drops a configured set of columns."""

    def __init__(self, features):
        """
        :param features: names of the columns to drop.
        """
        self._features = features

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Drop the configured columns that exist in ``df``.
        :param df: input DataFrame.
        :return: DataFrame without the requested columns; names missing from
                 ``df`` are ignored.
        """
        # ``__init__`` stores the list as ``_features``; reading the never-set
        # ``self._columns`` here raised AttributeError.
        removed_features = [col_name for col_name in self._features if col_name in df.schema.names]
        return df.drop(*removed_features)

# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with a
# FeatureRemover for 'is_port_access'. Stages are given in the order they
# appear in the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
    SourceCharsetParser(),
    SourceIFrameChecker(),
    SourceHyperlinkCounter(),
    FeatureValueMapper('protocol_type', {
        'http': 1,
        'https':0,
    }),
    NanToZeroConverter([
        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',
    ]),
    FeaturePicker([

        'protocol_type',
        'url_depth',
        'has_www',
        'subdomain_level',
        'param_cnt',
        'suffix_idx',
        'is_port_access',
        'code_size',
        'title_length',
        'internal_js_cnt',
        'external_js_cnt',
        'charset',
        'is_html5',
        'has_iframe',
        'hyperlink_cnt',
        'first_appear',

        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',

        'label',
    ]),
    # NOTE(review): the FeaturePicker list above names neither 'id' nor
    # 'suffix', yet DummySuffixDescritizer._transform selects both --
    # confirm those columns are still present at this stage.
    DummySuffixDescritizer(['int', 'org', 'gov', 'in', 'eu', 'cn', 'kr', 'en']),
    FeatureRemover([
        'is_port_access',
    ]),
])


In [None]:
# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with a
# StringIndexer on 'charset'. Stages are given in the order they appear in
# the list.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
    SourceCharsetParser(),
    SourceIFrameChecker(),
    SourceHyperlinkCounter(),
    FeatureValueMapper('protocol_type', {
        'http': 1,
        'https':0,
    }),
    NanToZeroConverter([
        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',
    ]),
    FeaturePicker([

        'protocol_type',
        'url_depth',
        'has_www',
        'subdomain_level',
        'param_cnt',
        'suffix_idx',
        'is_port_access',
        'code_size',
        'title_length',
        'internal_js_cnt',
        'external_js_cnt',
        'charset',
        'is_html5',
        'has_iframe',
        'hyperlink_cnt',
        'first_appear',

        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',

        'label',
    ]),
    # NOTE(review): the FeaturePicker list above names neither 'id' nor
    # 'suffix', yet DummySuffixDescritizer._transform selects both --
    # confirm those columns are still present at this stage.
    DummySuffixDescritizer(['int', 'org', 'gov', 'in', 'eu', 'cn', 'kr', 'en']),
    FeatureRemover([
        'is_port_access',
    ]),
    # NOTE(review): inputCol and outputCol are both "charset"; if this is
    # pyspark.ml's StringIndexer it presumably rejects an output column that
    # already exists -- verify, and use a distinct output name if so.
    StringIndexer(
                inputCol="charset", outputCol="charset", 
                handleInvalid="error", stringOrderType="frequencyDesc"),
])


In [None]:
@singleton
class CustomizedStandardizer(Transformer):
    """
    Placeholder standardizer: ``_transform`` currently returns its input unchanged.
    """

    def __init__(self, norm='l2'):
        """
        :param norm: accepted but not read anywhere in this class.
        """
        # NOTE(review): (name, estimator) tuples passed positionally look like
        # the scikit-learn Pipeline call style -- confirm which ``Pipeline``
        # is in scope at this point.
        steps = [
            ('standard_scaler', preprocessing.StandardScaler()),
        ]
        self._pipe = Pipeline(steps)
        self._columns = None

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Build a per-column distinct-count aggregation, then return ``df`` as is.
        :param df: input DataFrame.
        :return: the same ``df`` object, unmodified.
        """
        distinct_counts = [
            fn.countDistinct(fn.col(name)).alias(name)
            for name in df.schema.names
        ]
        # The aggregation is built but its result is not used yet.
        df_unique = df.agg(*distinct_counts)

        return df

    
# class CustomizedStandardizer(BaseEstimator, TransformerMixin):
#     """
#     Add Sklearn Build-in Function
#     """
#     def __init__(self, norm='l2'):
#         self._pipe = Pipeline([
#             ('standard_scaler', preprocessing.StandardScaler()),

#         ])
#         self._columns = None

#     @property
#     def columns(self):
#         return self._columns

#     def fit(self,x,y=None):
#         return self

#     def transform(self,x,y=None):
#         result = x

#         df_unique = pd.DataFrame()
#         for col_name in result.drop('label', axis=1).columns:
#             df_unique[col_name] = [len(result[col_name].unique())]

#         df_unique.index = ['unique count']
#         df_unique = df_unique.T.squeeze()

#         binary_columns = df_unique[df_unique < 3].index.tolist()
#         numeric_columns = x.drop([*binary_columns, 'label'], axis=1).select_dtypes(include=np.number).columns.tolist()
#         other_columns = x.drop([*binary_columns, *numeric_columns, 'label'], axis=1).columns.tolist()
#         label = x.label.tolist()
#         label = np.array([label]).T

#         result = label
#         if len(binary_columns) > 0:
#             result = np.append(result, x[binary_columns], axis=1)
#         if len(numeric_columns) > 0:
#             numeric_result = self._pipe.fit_transform(x[numeric_columns])
#             result = np.append(result, numeric_result, axis=1)
#         if len(other_columns) > 0:
#             result = np.append(result, x[other_columns], axis=1)

#         result = pd.DataFrame(result, columns= ['label', *binary_columns, *numeric_columns, *other_columns])
#         self._columns = [*binary_columns, *numeric_columns, *other_columns, 'label']

# #         result.loc[:, 'label'] = x.label-1970
#         return result[self._columns]

# Rebind the module-level ``pipe`` to a Pipeline whose stage list ends with a
# StringIndexer on 'charset'. Stages are given in the order they appear in
# the list; CustomizedStandardizer is not among them.
pipe  = Pipeline(stages=[
    LabelBuilder(),
    URLParser(),
    URLLengthCounter(),
    URLDepthCounter(),
    HasWWWConverter(),
    SubdomainLevelCounter(),
    RequestParameterCounter(),
    DomainSuffixBuilder(),
    IncorrectDomainUrlCleaner(),
    ColumnRenamer({'scheme': 'protocol_type'}),
    BinaryNAEncoder(['content_type']),
    EmptyHTMLFilter(),
    SourceCodeByteCounter(),
    HTML5Justifier(),
    BeautifulSoupParser(),
    SourceTitleLengthParser(),
    SourceInternalJSLibCounter(),
    SourceExternalJSLibCounter(),
    SourceCharsetParser(),
    SourceIFrameChecker(),
    SourceHyperlinkCounter(),
    FeatureValueMapper('protocol_type', {
        'http': 1,
        'https':0,
    }),
    NanToZeroConverter([
        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',
    ]),
    FeaturePicker([

        'protocol_type',
        'url_depth',
        'has_www',
        'subdomain_level',
        'param_cnt',
        'suffix_idx',
        'is_port_access',
        'code_size',
        'title_length',
        'internal_js_cnt',
        'external_js_cnt',
        'charset',
        'is_html5',
        'has_iframe',
        'hyperlink_cnt',
        'first_appear',

        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation',

        'label',
    ]),
    # NOTE(review): the FeaturePicker list above names neither 'id' nor
    # 'suffix', yet DummySuffixDescritizer._transform selects both --
    # confirm those columns are still present at this stage.
    DummySuffixDescritizer(['int', 'org', 'gov', 'in', 'eu', 'cn', 'kr', 'en']),
    FeatureRemover([
        'is_port_access',
    ]),
    # NOTE(review): inputCol and outputCol are both "charset"; if this is
    # pyspark.ml's StringIndexer it presumably rejects an output column that
    # already exists -- verify, and use a distinct output name if so.
    StringIndexer(
                inputCol="charset", outputCol="charset", 
                handleInvalid="error", stringOrderType="frequencyDesc"),
])


In [None]:
# Take at most 2000 rows of ``raw_data``, convert them to pandas and write
# them to 'tmp_spark.json' (index-oriented JSON) as a small local sample.
raw_data.limit(2000).toPandas().to_json('tmp_spark.json', orient='index')

In [None]:
# Read 'tmp_spark.json' back with pandas (index-oriented JSON)...
raw_data = pd.read_json('tmp_spark.json', orient='index')

# ...and rebind ``raw_data`` to a Spark DataFrame built from that pandas frame.
raw_data = spark.createDataFrame(raw_data)

# Print (row count, column count); ``shape`` triggers a Spark ``count()``.
shape(raw_data)

In [None]:
# Collect ``raw_data`` into a pandas DataFrame; the result is not assigned,
# so presumably this is here for the notebook to display it.
raw_data.toPandas()

### Train Test Split

#### 6th Edition - Combine suffix dummy with MAG

In [None]:
from feature_engine import categorical_encoders


# Rebind ``pipe`` to a pipeline built from named (step_name, transformer)
# pairs, then fit/transform the raw data and export it to CSV.
# NOTE(review): the steps are passed positionally as tuples, unlike the
# ``Pipeline(stages=[...])`` calls earlier in this file -- presumably a
# different ``Pipeline`` class is expected here; verify which one is in scope.
pipe = Pipeline([
    ('label_builder', TobitLabelBuilder()),
    ('url_parser', URLParser()),
    ('url_length_counter', URLLengthCounter()),
    ('url_depth_counter', URLDepthCounter()),
    ('has_www_converter', HasWWWConverter()),
    ('subdomain_level_counter', SubdomainLevelCounter()),
    ('request_parameter_counter', RequestParameterCounter()),
    ('domain_suffix_builder', DomainSuffixBuilder()),
    ('incorrect_domain_url_cleaner', IncorrectDomainUrlCleaner()),
    ('column_renamer', ColumnRenamer({'scheme': 'protocol_type'})),
    ('binary_na_encoder', BinaryNAEncoder(['content_type'])),
    ('html_parser', html_parser),
    ('binary_feature_converter', FeatureValueMapper('protocol_type', {
                                        'http': 1,
                                        'https':0,
                                        })),

    ('nan_to_Zero_converter', NanToZeroConverter([
        'total_num_of_paper_citing',
        'total_num_of_author_citing',
        'total_num_of_affiliation_citing',
        'total_num_of_journal_citing',
        'total_num_of_author_self_citation',
        'total_num_of_affiliation_self_citation',
        'total_num_of_journal_self_citation',
        'avg_year',
        'min_year',
        'max_year',
        'median',
        'num_of_author',
        'num_of_author_citing',
        'num_of_affiliation_citing',
        'num_of_journal_citing',
        'avg_hindex',
        'first_author_hindex',
        'last_author_hindex',
        'avg_mid_author_hindex',
        'paper_unique_affiliation'
    ])),
    
    ('feature_picker', FeaturePicker([
                                        'protocol_type',
                                        'url_depth',
                                        'has_www',
                                        'subdomain_level',
                                        'param_cnt',
                                        'suffix_idx',
                                        'is_port_access',
                                        'code_size',
                                        'title_length',
                                        'internal_js_cnt',
                                        'external_js_cnt',
                                        'charset',
                                        'is_html5',
                                        'has_iframe',
                                        'hyperlink_cnt',
                                        'first_appear',

                                        'total_num_of_paper_citing',
                                        'total_num_of_author_citing',
                                        'total_num_of_affiliation_citing',
                                        'total_num_of_journal_citing',
                                        'total_num_of_author_self_citation',
                                        'total_num_of_affiliation_self_citation',
                                        'total_num_of_journal_self_citation',
                                        'avg_year',
                                        'min_year',
                                        'max_year',
                                        'median',
                                        'num_of_author',
                                        'num_of_author_citing',
                                        'num_of_affiliation_citing',
                                        'num_of_journal_citing',
                                        'avg_hindex',
                                        'first_author_hindex',
                                        'last_author_hindex',
                                        'avg_mid_author_hindex',
                                        'paper_unique_affiliation',

                                        'label',
                                       ])),
    # NOTE(review): called with no arguments here, whereas the
    # DummySuffixDescritizer class defined earlier in this file requires a
    # ``features`` argument -- confirm which definition is in scope.
    ('dummy_suffix_descritizer', DummySuffixDescritizer()),

    ('feature_remover', FeatureRemover([
                                        'is_port_access',
                                       ])),
    ('frequency_indexer', categorical_encoders.CountFrequencyCategoricalEncoder(
        encoding_method='frequency',
        variables=['charset'])),
    ('standard_scaler', TobitCustomizedStandardizer(norm='l2')),

])

# Fit and apply the pipeline to ``DataSource().raw_data`` and write the
# result to 'untrunc_data_cleaned.csv'.
pipe.fit_transform(DataSource().raw_data).to_csv('untrunc_data_cleaned.csv')
