In [2]:
# import logging
# import logging.config

# logging.config.fileConfig(fname='logging.conf', disable_existing_loggers=False)
# logger = logging.getLogger('pyspark')

In [1]:
import os
import sys

# Start (or reuse) a local Spark session.
# Point both the driver and the Python workers at the interpreter running this
# kernel so they cannot pick up a different Python installation.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import pyspark
# Use the builder rather than calling the SparkSession constructor directly:
# `getOrCreate()` hands back the existing session when this cell is re-run
# instead of building another one on top of the same context.
spark = pyspark.sql.SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [2]:
# Infer the column types once so that the full read can reuse them.
# Schema inference runs while `load()` resolves the file, so a `.limit(n)` on the
# returned DataFrame neither shortens that scan nor changes `.schema`; the schema
# is therefore taken straight from the loaded frame.
# NOTE(review): if inference becomes too slow, the CSV reader's `samplingRatio`
# option is the supported way to infer from a fraction of the rows - confirm it
# yields the same types before switching.
staticSchema = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("escape", "\"")
    .option("multiLine", "true")
    .load("input/train.csv")
    .schema
)

In [34]:
# Load the whole CSV using the pre-computed `staticSchema` (no second inference
# pass) and persist the result so later steps can reuse it.
df = (
    spark.read
    .format("csv")
    .schema(staticSchema)
    .option("header", "true")
    .option("escape", "\"")
    .option("multiLine", "true")
    .load("input/train.csv")
    .persist()
)

In [22]:
# Repartition the data in case the file becomes too large to handle.
# Target partition count = executors * cores * replicationFactor.
NUM_EXECUTORS = 5
CORES_PER_EXECUTOR = 4
REPLICATION_FACTOR = 2
df = df.repartition(NUM_EXECUTORS * CORES_PER_EXECUTOR * REPLICATION_FACTOR)

In [35]:
# Extract the (type, index) suffix pair of every raw feature column, e.g.
# "feature_type_1_0" -> ("1", "0").
# Only names of the exact form feature_type_<type>_<index> are accepted. A plain
# substring test on "feature" would also pick up derived columns such as
# "features_type_1_stand_0" or "max_feature_type_1_index" when this cell is
# re-run after the cells below, producing column names that do not exist.
cols_numbering = [
    (parts[-2], parts[-1])
    for parts in (name.split('_') for name in df.columns if name.startswith('feature_type_'))
    if len(parts) == 4
]

In [36]:
from pyspark.sql.functions import stddev, mean, col
from pyspark.sql import DataFrame
def standardization(df: DataFrame, input_column: str, output_column: str) -> DataFrame:
    """Append a standardised (z-score) version of ``input_column`` to ``df``.

    The new column holds ``(input_column - mean) / stddev``, where the mean and
    the standard deviation (Spark's ``stddev`` aggregate) are computed over all
    rows of ``df``.

    Args:
        df: DataFrame containing ``input_column``.
        input_column: name of the numeric column to standardise.
        output_column: name of the column that receives the standardised values.

    Returns:
        A DataFrame with all columns of ``df`` plus ``output_column``.
    """
    mean_column = f"mean_{input_column}"
    stddev_column = f"stddev_{input_column}"

    # Single-row frame holding the column-wide statistics.
    stats = df.select(
        mean(input_column).alias(mean_column),
        stddev(input_column).alias(stddev_column),
    )

    # Cross-joining the one-row statistics frame attaches the statistics to every
    # row of `df`; the helper columns are dropped once the result is computed.
    # The numerator must use `input_column` - a fixed column name here would
    # standardise the same feature on every call.
    return (
        stats.crossJoin(df)
        .withColumn(output_column, (col(input_column) - col(mean_column)) / col(stddev_column))
        .drop(mean_column, stddev_column)
    )

In [32]:
from pyspark.sql.functions import udf, array, greatest, when, lit, expr
from pyspark.sql.types import IntegerType
def max_feature_value(df: DataFrame, columns: list) -> DataFrame:
    """Add the row-wise maximum of ``columns`` and the name of the column holding it.

    Two columns are added to ``df``:

    * ``max`` - the greatest value among ``columns`` for the row.
    * ``max_feature_type_1_index`` - the name of the first column in ``columns``
      whose value equals ``max``; null when no column matches (for example when
      every value in the row is null).

    Args:
        df: DataFrame containing every column named in ``columns``.
        columns: names of the columns to compare; must not be empty.

    Returns:
        ``df`` with the two columns described above added.

    Raises:
        ValueError: if ``columns`` is empty.
    """
    # Imported locally so the function does not depend on another cell's import list.
    from pyspark.sql.functions import coalesce

    if not columns:
        raise ValueError("columns must contain at least one column name")

    # `greatest` refuses fewer than two arguments; a single column is its own maximum.
    row_max = greatest(*[col(c) for c in columns]) if len(columns) > 1 else col(columns[0])

    # Each `when` yields the column name on a match and null otherwise, so
    # `coalesce` returns the first matching name directly - no intermediate
    # array, filter and index lookup needed.
    first_match = coalesce(*[when(col(c) == col('max'), lit(c)) for c in columns])

    return df\
      .withColumn("max", row_max)\
      .withColumn('max_feature_type_1_index', first_match)

In [37]:
# Add one standardised column per raw feature column.
# NOTE(review): the output prefix is "features_type" (plural) while the inputs
# use "feature_type" - confirm this naming is intended.
for i, j in cols_numbering:
    input_column = f"feature_type_{i}_{j}"
    output_column = f"features_type_{i}_stand_{j}"
    df = standardization(df, input_column, output_column)

In [33]:
# Rebuild the raw feature column names from their suffix pairs, then add the
# row-wise maximum and the name of the column that holds it.
input_columns = [f"feature_type_{i}_{j}" for i, j in cols_numbering]
df = max_feature_value(df, columns=input_columns)

In [29]:
# Persist the final frame and write it out as CSV with a header row,
# overwriting whatever already exists at the target path.
(
    df.persist()
    .write
    .format("csv")
    .option("header", True)
    .mode('overwrite')
    .save("output/something.csv")
)