### Start a Spark session and load the data

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, \
    StructType, StructField, ArrayType

def start_spark():
    """Return a SparkSession for this notebook.

    ``getOrCreate()`` reuses an already-active session when there is one;
    otherwise it starts a new session with the application name 'test_task'.
    """
    return SparkSession.builder.appName('test_task').getOrCreate()


TEST_PATH = '../../jooble/jooble_test_task/data/test.tsv'
TRAIN_PATH = '../../jooble/jooble_test_task/data/train.tsv'

# Both inputs are tab-separated files with a header row (id_job, features).
_TSV_READ_OPTIONS = {'sep': '\t', 'header': True, 'inferSchema': True}

spark = start_spark()
# Read train first, then test.
df_train, df_test = [
    spark.read.csv(path, **_TSV_READ_OPTIONS)
    for path in (TRAIN_PATH, TEST_PATH)
]

df_test.show()

+--------------------+--------------------+
|              id_job|            features|
+--------------------+--------------------+
|-9168029089769934451|2,9954,9999,9974,...|
|-9167993139315005259|2,9254,9999,9695,...|
|-9167993136660569470|2,9251,9999,9605,...|
|-9167993126042826314|2,9148,9998,9470,...|
|-9167914043308884846|2,8973,9753,9347,...|
|-9167902304941037655|2,9755,9999,9940,...|
|-9167765335697991344|2,9064,9998,9646,...|
|-9167720755345085880|2,9593,9999,9774,...|
|-9167649475391355941|2,8680,9998,9551,...|
|-9167624825207691929|2,9995,9999,9995,...|
|-9167624822553256140|2,9995,9999,9995,...|
|-9167624819898820351|2,9995,9999,9995,...|
|-9167624801317769828|2,9864,9999,9964,...|
|-9167574350382882656|2,8261,9998,9389,...|
|-9167531199124987909|2,9606,9999,9683,...|
|-9167531191161680542|2,9512,9998,9659,...|
|-9167465589378705354|2,9762,9992,9920,...|
|-9167456924931511488|2,9906,9999,9956,...|
|-9167191998703983048|2,9391,9999,9528,...|
|-9167147480828500349|2,9072,988

## Compute per-feature means and standard deviations on the train set (`features_metrics` DataFrame)

In [29]:
def get_features_metrics(df):
    """Compute per-feature means and sample standard deviations.

    Intended to be run on the train set. The single-row result is later
    cross-joined onto the test set to z-score it.

    Every column after the first one (the job id) is a comma-separated
    string. Its first element is the feature-set id and the rest are the
    feature values, e.g. ``"2,9954,9999,..."``. The id and the number of
    values are read from the first row and assumed identical for all rows.

    Args:
        df: Spark DataFrame laid out as described above.

    Returns:
        Single-row DataFrame holding, for every feature column, the
        ``features_mean_{id}_{i}`` columns followed by the
        ``features_std_{id}_{i}`` columns.

    Raises:
        ValueError: if ``df`` is empty, so the feature layout cannot be
            inferred.
    """
    select_columns = []
    agg_columns = []

    # One iteration per feature-string column (this dataset has only one).
    for features_column in df.columns[1:]:
        features_arr = F.split(F.col(features_column), ',')

        # Read the feature-set id and the feature count from the first row.
        first_rows = (
            df.select(features_arr.getItem(0).alias('id'),
                      (F.size(features_arr) - 1).alias('len'))
              .take(1)
        )
        if not first_rows:
            raise ValueError(
                f'cannot infer the layout of column {features_column!r}: '
                'the DataFrame is empty')
        features_params = first_rows[0]
        feature_indices = range(features_params.len)

        # Cast explicitly instead of relying on implicit string -> double
        # coercion inside mean()/stddev().
        select_columns.extend(
            features_arr.getItem(i + 1)
            .cast(DoubleType())
            .alias(f'features_{features_params.id}_{i}')
            for i in feature_indices
        )
        # All means first, then all stds (same column order as before).
        agg_columns.extend(
            F.mean(f'features_{features_params.id}_{i}')
             .alias(f'features_mean_{features_params.id}_{i}')
            for i in feature_indices
        )
        agg_columns.extend(
            F.stddev(f'features_{features_params.id}_{i}')
             .alias(f'features_std_{features_params.id}_{i}')
            for i in feature_indices
        )

    # NOTE: this DataFrame is lazy. Every action on a DataFrame derived from
    # it re-runs the aggregation over the train set. For production-sized
    # data, persist it first (e.g. checkpoint() or write it to HDFS/S3).
    return df.select(*select_columns).agg(*agg_columns)


features_metrics = get_features_metrics(df_train)
features_metrics

DataFrame[features_mean_2_0: double, features_mean_2_1: double, features_mean_2_2: double, features_mean_2_3: double, features_mean_2_4: double, features_mean_2_5: double, features_mean_2_6: double, features_mean_2_7: double, features_mean_2_8: double, features_mean_2_9: double, features_mean_2_10: double, features_mean_2_11: double, features_mean_2_12: double, features_mean_2_13: double, features_mean_2_14: double, features_mean_2_15: double, features_mean_2_16: double, features_mean_2_17: double, features_mean_2_18: double, features_mean_2_19: double, features_mean_2_20: double, features_mean_2_21: double, features_mean_2_22: double, features_mean_2_23: double, features_mean_2_24: double, features_mean_2_25: double, features_mean_2_26: double, features_mean_2_27: double, features_mean_2_28: double, features_mean_2_29: double, features_mean_2_30: double, features_mean_2_31: double, features_mean_2_32: double, features_mean_2_33: double, features_mean_2_34: double, features_mean_2_35: 

In [30]:
# Spot-check the first two features' statistics.
features_metrics.select(
    'features_mean_2_0', 'features_mean_2_1',
    'features_std_2_0', 'features_std_2_1',
).show()

+-----------------+-----------------+-----------------+-----------------+
|features_mean_2_0|features_mean_2_1| features_std_2_0| features_std_2_1|
+-----------------+-----------------+-----------------+-----------------+
|9528.702127659575|9867.720901126408|553.3024900510752|560.6386516245801|
+-----------------+-----------------+-----------------+-----------------+



## Cross-join `features_metrics` onto the test set and compute the remaining metrics

In [32]:
def _get_zscored_and_prepare_for_array_metrics(df, features_metrics):
    """Z-score the features of ``df`` and prepare inputs for the arg-max step.

    ``features_metrics`` (means/stds from ``get_features_metrics``) is
    cross-joined onto every row of ``df``. Each feature-string column of
    ``df`` is every column after the first. Its feature-set id ``{id}`` and
    feature count are read from the first row. For each such column the
    result contains:

    * ``feature_{id}_stand``: array of z-scores ``(x_i - mean_i) / std_i``;
      an element is null when ``std_i`` is 0 or null.
    * ``feature_{id}_max``: maximum raw feature value.
    * ``features_means_{id}``: array of the train means (helper column).
    * ``features_{id}``: raw feature values as doubles (helper column).

    Args:
        df: test DataFrame: ``id_job`` followed by feature-string columns.
        features_metrics: DataFrame produced by ``get_features_metrics``.

    Returns:
        DataFrame with ``id_job`` followed by the columns listed above.

    Raises:
        ValueError: if ``df`` is empty, so the feature layout cannot be
            inferred.
    """
    # features_metrics is a global aggregate (one row). The broadcast hint
    # ships it to every executor instead of shuffling ``df``.
    df_joined = df.crossJoin(F.broadcast(features_metrics))

    select_columns = []
    # One iteration per feature-string column (this dataset has only one).
    for features_column in df.columns[1:]:
        features_arr = F.split(F.col(features_column), ',')

        # Read the feature-set id and the feature count from the first row.
        first_rows = (
            df.select(features_arr.getItem(0).alias('id'),
                      (F.size(features_arr) - 1).alias('len'))
              .take(1)
        )
        if not first_rows:
            raise ValueError(
                f'cannot infer the layout of column {features_column!r}: '
                'the DataFrame is empty')
        features_params = first_rows[0]
        features_id = features_params.id
        feature_indices = range(features_params.len)

        # Skip the leading feature-set id (slice() is 1-based) and parse the
        # remaining values as doubles.
        actual_features_arr = F.slice(features_arr,
                                      start=2, length=features_params.len) \
                               .cast(ArrayType(DoubleType()))

        # Z-score array. Division happens only when std != 0. Non-ANSI Spark
        # already returns null for x / 0, but ANSI mode raises instead, so
        # the guard keeps the null result in both modes.
        zscore_normed = F.array(*[
            F.when(
                F.col(f'features_std_{features_id}_{i}') != 0,
                (actual_features_arr.getItem(i)
                 - F.col(f'features_mean_{features_id}_{i}'))
                / F.col(f'features_std_{features_id}_{i}'))
            for i in feature_indices
        ]).alias(f'feature_{features_id}_stand')

        max_feature = F.array_max(actual_features_arr) \
                       .alias(f'feature_{features_id}_max')

        select_columns.extend([
            zscore_normed,
            max_feature,
            # The means and raw values are kept as arrays because the arg-max
            # UDF in _get_array_metrics consumes them.
            F.array(*[
                f'features_mean_{features_id}_{i}'
                for i in feature_indices
            ]).alias(f'features_means_{features_id}'),
            actual_features_arr.alias(f'features_{features_id}'),
        ])

    return df_joined.select(
        'id_job',
        *select_columns,
    )


# The computation is split into two steps: native Spark expressions first,
# then a Python UDF for the arg-max lookup. Python UDFs are slower than
# native Spark SQL functions, but this is far more readable than a
# UDF-free version; since this is not production code, readability wins.

# UDF that calculates max_index of array and abs_mean_diff for max
def _udf_get_array_max_index_and_mean_diff(features, features_means):
    max_index = features.index(max(features))
    max_abs_mean_diff = abs(float(features[max_index])
                            - features_means[max_index])
    return {'max_index': max_index,
            'max_abs_mean_diff': max_abs_mean_diff}
@F.udf(returnType=StructType([
    StructField("max_index", IntegerType(), True),
    StructField("max_abs_mean_diff", DoubleType(), True),
]))
def udf_get_array_max_index_and_mean_diff(inp_cols):
    """Spark UDF wrapper around ``_udf_get_array_max_index_and_mean_diff``.

    Takes one struct column whose two fields are, in order, the feature
    array and the means array. It unpacks them positionally and returns a
    ``struct<max_index: int, max_abs_mean_diff: double>``.
    """
    return _udf_get_array_max_index_and_mean_diff(*inp_cols)


def _get_array_metrics(df, df_test):
    """Add the arg-max metrics of every feature set and drop helper columns.

    ``df`` is the output of ``_get_zscored_and_prepare_for_array_metrics``.
    ``df_test`` is the raw test DataFrame, used only to read each
    feature-set id from its first row. For every feature set ``{id}`` two
    columns are added:

    * ``feature_{id}_index``: 0-based index of the maximum feature value.
    * ``feature_{id}_abs_mean_diff``: ``|max value - train mean|`` at that
      index.

    The helper columns ``features_{id}`` and ``features_means_{id}`` are
    dropped from the result.

    Raises:
        ValueError: if ``df_test`` is empty, so the feature layout cannot be
            inferred.
    """
    select_columns = []
    helper_columns = []
    # One iteration per feature-string column (this dataset has only one).
    for features_column in df_test.columns[1:]:
        features_arr = F.split(F.col(features_column), ',')
        first_rows = (
            df_test
                .select(features_arr.getItem(0).alias('id'))
                .take(1)
        )
        if not first_rows:
            raise ValueError(
                f'cannot infer the layout of column {features_column!r}: '
                'the DataFrame is empty')
        features_id = first_rows[0].id

        values_column = f'features_{features_id}'
        means_column = f'features_means_{features_id}'
        helper_columns += [values_column, means_column]

        # One UDF call returns both metrics as a struct.
        udf_res = udf_get_array_max_index_and_mean_diff(
            F.struct(values_column, means_column))
        select_columns.extend([
            udf_res['max_index']
            .alias(f'feature_{features_id}_index'),
            udf_res['max_abs_mean_diff']
            .alias(f'feature_{features_id}_abs_mean_diff'),
        ])

    # Drop the helper columns by exact name rather than by substring match,
    # so unrelated columns are never removed.
    return df.select('*', *select_columns).drop(*helper_columns)

df = _get_zscored_and_prepare_for_array_metrics(df_test, features_metrics)
df = _get_array_metrics(df, df_test)

# CSV has no array type, so flatten each z-score array column
# (`feature_<id>_stand`) into a comma-separated string of numbers.
for stand_column in [c for c in df.columns if c.endswith('_stand')]:
    df = df.withColumn(stand_column, F.array_join(stand_column, ','))

# header=True so the written files carry the column names.
df.write.csv('output_spark.csv', sep='\t', mode='overwrite', header=True)

df.show()

+--------------------+--------------------+-------------+---------------+-----------------------+
|              id_job|     feature_2_stand|feature_2_max|feature_2_index|feature_2_abs_mean_diff|
+--------------------+--------------------+-------------+---------------+-----------------------+
|-9168029089769934451|0.768653458077092...|      10000.0|            161|     116.24780976220245|
|-9167993139315005259|-0.49647730237797...|      10000.0|            161|     116.24780976220245|
|-9167993136660569470|-0.50189929135135...|      10000.0|            161|     116.24780976220245|
|-9167993126042826314|-0.68805424610403...|      10000.0|            161|     116.24780976220245|
|-9167914043308884846|-1.00433693621779...|       9999.0|            203|       39.7571964956187|
|-9167902304941037655|0.408994856176294...|      10000.0|            161|     116.24780976220245|
|-9167765335697991344|-0.83986993735863...|      10000.0|            161|     116.24780976220245|
|-916772075534508588