In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [0]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import DataFrame
import numpy as np


def correlation_matrix(df: DataFrame, feature_columns: list, output_column: str) -> np.ndarray:
    """
    generates the Pearson correlation coefficients matrix for feature ranking
    """

    vec_assembler = VectorAssembler(inputCols=feature_columns + [output_column], outputCol="all_columns")
    df_vectorized = vec_assembler.transform(df)
    cor_matrix = Correlation.corr(df_vectorized, "all_columns").collect()[0][0].toArray()
    return cor_matrix


def feature_ranker(df: DataFrame, feature_columns: list, output_column: str, must_include_features: list = None):
    """
    ranks features based on their correlation with the output and their inter-correlations
    """

    cor_matrix = abs(correlation_matrix(df, feature_columns, output_column))

    if must_include_features:
        must_include_feature_ids = [i for i in range(len(feature_columns)) \
                                    if feature_columns[i] in must_include_features]
        must_include_feature_scores = cor_matrix[must_include_feature_ids, -1]
        must_include_features_ids = [must_include_feature_ids[i] for i in np.argsort(must_include_feature_scores)[::-1]]
        ranked_feature_ids = must_include_feature_ids
        remaining_feature_ids = [i for i in range(len(feature_columns)) if i not in must_include_features_ids]
        remaining_feature_scores = []
        for feature_id in remaining_feature_ids:
            remaining_feature_scores.append(cor_matrix[feature_id, -1] - max([cor_matrix[feature_id, i] for i in
                                                                              ranked_feature_ids]))
    else:
        ranked_feature_ids = []
        remaining_feature_ids = range(len(feature_columns))
        remaining_feature_scores = cor_matrix[remaining_feature_ids, -1]

    while len(ranked_feature_ids) < len(feature_columns):
        best_feature_arg = np.argmax(remaining_feature_scores)
        best_feature_id = remaining_feature_ids[best_feature_arg]
        remaining_feature_ids = [remaining_feature_ids[i] for i in range(len(remaining_feature_ids)) if i !=
                                 best_feature_arg]
        ranked_feature_ids.append(best_feature_id)
        remaining_feature_scores = []
        for feature_id in remaining_feature_ids:
            remaining_feature_scores.append(cor_matrix[feature_id, -1] - max([cor_matrix[feature_id, i] for i in
                                                                              ranked_feature_ids]))
    ranked_features = [feature_columns[i] for i in ranked_feature_ids]
    return ranked_features

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import DataFrame
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler


def feature_selector(df: DataFrame, ranked_features: list, output_column: str, estimator_obj=RandomForestRegressor,
                     feature_inclusion_increments: int = 1, train_test_split_ratio: list = None, cv: int = -1,
                     evaluation_metric: str = 'r2'):
    """
    Trains the estimator at multiple steps, with features progressively added to the input list based on their ranks
    :param df: the input dataset with features and output as columns
    :param ranked_features: the output of the feature ranking algorithm or a manually selected ranking scheme
    :param output_column: the name of the output column in the dataset
    :param estimator_obj: the training model object
    :param train_test_split_ratio: the default for train_test_split_ratio is [0.66, 0.33]
    :param cv: if left as default (c = -1), changes nothing. If selected as a value > 1, it enforces cross validation
                and overrides the train-test-splitting
    :param feature_inclusion_increments:
    :param evaluation_metric: evaluation metric to return for predictions on test set - "rmse": root mean
            squared error - "mse": mean squared error - "r2" (default): coefficient of determination -
            "mae": mean absolute error
    """

    if train_test_split_ratio is None:
        train_test_split_ratio = [0.66, 0.33]

    feature_count_list = list(range(1, len(ranked_features), feature_inclusion_increments)) + [len(ranked_features)]

    estimator_features_col = 'features'
    while estimator_features_col in df.columns:
        estimator_features_col += '_'
    estimator_prediction_col = 'prediction'
    while estimator_prediction_col in df.columns:
        estimator_prediction_col += '_'
    estimator_obj.setFeaturesCol(estimator_features_col)
    estimator_obj.setPredictionCol(estimator_prediction_col)
    estimator_obj.setLabelCol(output_column)

    evaluator = RegressionEvaluator(
        labelCol=output_column, predictionCol=estimator_prediction_col, metricName=evaluation_metric)

    scores = []
    if cv <= 1:
        df_train, df_test = df.randomSplit(train_test_split_ratio)
        for feature_count in feature_count_list:
            input_features = ranked_features[0: feature_count]
            assembler = VectorAssembler(
                inputCols=input_features,
                outputCol=estimator_features_col)
            df_train = assembler.transform(df_train)
            fit_model = estimator_obj.fit(df_train)
            df_test = assembler.transform(df_test)
            df_test = fit_model.transform(df_test)
            score = evaluator.evaluate(df_test)
            scores.append((feature_count, score))
            df_train = df_train.drop(estimator_features_col)
            df_test = df_test.drop(estimator_features_col, estimator_prediction_col)
    else:
        for feature_count in feature_count_list:
            input_features = ranked_features[0: feature_count]
            assembler = VectorAssembler(
                inputCols=input_features,
                outputCol=estimator_features_col)
            df = assembler.transform(df)
            grid = ParamGridBuilder().addGrid(estimator_obj.featuresCol, [estimator_obj.getFeaturesCol()]).build()
            crossval = CrossValidator(estimator=estimator_obj,
                                      evaluator=evaluator,
                                      numFolds=cv,
                                      estimatorParamMaps=grid)
            fit_crossval = crossval.fit(df)
            scores.append((feature_count, fit_crossval.avgMetrics[0]))
            df = df.drop(estimator_features_col)

    return scores

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from sklearn.datasets import load_boston



from pyspark.sql import SparkSession


# %% Loading the Boston Dataset as a sample dataset and creating the spark dataframes
def test_boston_dataset(spark_session: SparkSession):
    boston = load_boston()
    feature_names = boston.feature_names.tolist()
    output_name = 'outcome'
    boston_columns = feature_names + [output_name]
    X = boston.data.tolist()
    y = boston.target.tolist()
    Xy = [(i + [j]) for (i, j) in zip(X, y)]
    boston_df = spark_session.createDataFrame(Xy, boston_columns)
    print(feature_names)
    must_include_features = []
    # must_include_features = ['TAX', 'INDUS']

    # %% Ranking features
    ranked_features = feature_ranker(df=boston_df,
                                     feature_columns=feature_names,
                                     output_column=output_name,
                                     must_include_features=must_include_features)
    print(ranked_features)
    # %% Feature selection
    scores = feature_selector(df=boston_df,
                              ranked_features=ranked_features,
                              output_column=output_name,
                              estimator_obj=RandomForestRegressor(),
                              feature_inclusion_increments=1,
                              train_test_split_ratio=[0.66, 0.33],
                              cv=-1,
                              evaluation_metric='r2')
    return scores

In [0]:
score = test_boston_dataset(spark_session=spark)

['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']
['LSTAT', 'PTRATIO', 'RM', 'CHAS', 'B', 'ZN', 'CRIM', 'TAX', 'AGE', 'INDUS', 'NOX', 'DIS', 'RAD']


In [0]:
print(score)

[(1, 0.6321032889048015), (2, 0.7146604090385802), (3, 0.7821895502958525), (4, 0.7874075309489468), (5, 0.788983942181348), (6, 0.7911891834273723), (7, 0.835318890990556), (8, 0.8220140654821992), (9, 0.7945915217044365), (10, 0.8258071729108915), (11, 0.824370214198534), (12, 0.8293450908069684), (13, 0.8443275163505498)]


In [0]:
top_score = sorted(score, key=lambda item: item[1], reverse=True)[:7]

In [0]:
index=[]
for i in top_score:
    index.append(i[0])

print(index)

[13, 7, 12, 10, 11, 8, 9]


In [0]:
top_columns=[]
for i in index:
    top_columns.append(feature_names[i-1])
    
print(top_columns)

['LSTAT', 'AGE', 'B', 'TAX', 'PTRATIO', 'DIS', 'RAD']


In [0]:
boston = load_boston()
feature_names = boston.feature_names.tolist()
output_name = 'outcome'
boston_columns = feature_names + [output_name]
X = boston.data.tolist()
y = boston.target.tolist()
Xy = [(i + [j]) for (i, j) in zip(X, y)]
boston_df = spark.createDataFrame(Xy, boston_columns)

In [0]:
top_columns.append(output_name)
print(top_columns)

['LSTAT', 'AGE', 'B', 'TAX', 'PTRATIO', 'DIS', 'RAD', 'outcome']


In [0]:
boston_df = boston_df.selectExpr(top_columns)

In [0]:
boston_df.show(2)

+-----+----+-----+-----+-------+------+---+-------+
|LSTAT| AGE|    B|  TAX|PTRATIO|   DIS|RAD|outcome|
+-----+----+-----+-----+-------+------+---+-------+
| 4.98|65.2|396.9|296.0|   15.3|  4.09|1.0|   24.0|
| 9.14|78.9|396.9|242.0|   17.8|4.9671|2.0|   21.6|
+-----+----+-----+-----+-------+------+---+-------+
only showing top 2 rows



In [0]:
train, test = boston_df.randomSplit([0.7, 0.3], seed=7)

In [0]:
train.show(2)

+-----+----+------+-----+-------+------+---+-------+
|LSTAT| AGE|     B|  TAX|PTRATIO|   DIS|RAD|outcome|
+-----+----+------+-----+-------+------+---+-------+
| 1.98|15.8|395.62|252.0|   18.3|5.4011|3.0|   34.9|
| 2.94|45.8|394.63|222.0|   18.7|6.0622|3.0|   33.4|
+-----+----+------+-----+-------+------+---+-------+
only showing top 2 rows



In [0]:
catCols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numCols = [
    x for (x, dataType) in train.dtypes if ((dataType == "double"))
]
print(numCols)
print(catCols)

['LSTAT', 'AGE', 'B', 'TAX', 'PTRATIO', 'DIS', 'RAD', 'outcome']
[]


In [0]:
from pyspark.ml.feature import VectorAssembler
assemblerInput = [x for x in numCols]
vector_assembler = VectorAssembler(
    inputCols=assemblerInput, outputCol="VectorAssembler_features"
)

In [0]:
stages = [vector_assembler]

In [0]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)
pp_df = model.transform(test)

CPU times: user 5.09 ms, sys: 1.25 ms, total: 6.34 ms
Wall time: 191 ms


In [0]:
pp_df

Out[108]: DataFrame[LSTAT: double, AGE: double, B: double, TAX: double, PTRATIO: double, DIS: double, RAD: double, outcome: double, VectorAssembler_features: vector]

In [0]:
from pyspark.ml.regression import LinearRegression

data = pp_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("outcome").alias("label"),
)

In [0]:
data.show(5, truncate=False)

+---------------------------------------------+-----+
|features                                     |label|
+---------------------------------------------+-----+
|[3.95,40.5,392.9,256.0,15.1,8.3248,5.0,31.6] |31.6 |
|[4.81,21.9,395.93,226.0,17.9,8.6966,5.0,35.4]|35.4 |
|[4.98,65.2,396.9,296.0,15.3,4.09,1.0,24.0]   |24.0 |
|[5.28,21.1,396.9,243.0,16.8,6.8147,4.0,25.0] |25.0 |
|[5.33,54.2,396.9,222.0,18.7,6.0622,3.0,36.2] |36.2 |
+---------------------------------------------+-----+
only showing top 5 rows



In [0]:
%%time
model = LinearRegression().fit(data)

CPU times: user 13.8 ms, sys: 5.84 ms, total: 19.6 ms
Wall time: 931 ms


In [0]:
print(model.summary.meanAbsoluteError)

8.071321389024888e-15


In [0]:
print(model.summary.rootMeanSquaredError)

1.0704760961584674e-14
