In [1]:
from typing import Tuple

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import IntegerType

In [2]:
def vector_assembler() -> VectorAssembler:
    """Create the assembler that packs the model inputs into a ``features`` vector.

    Returns:
        A ``VectorAssembler`` reading the eight input columns listed below and
        writing them to a single vector column named ``features``.
    """
    input_columns = [
        'age',
        'sex_index',
        'married_index',
        'salary',
        'successfully_credit_completed',
        'credit_completed_amount',
        'active_credits',
        'active_credits_amount',
    ]
    return VectorAssembler(inputCols=input_columns, outputCol="features")

In [3]:
def model_params(rf):
    """Build the hyperparameter grid searched for the given estimator.

    Args:
        rf: estimator exposing ``maxDepth`` and ``maxBins`` params.

    Returns:
        The list of param maps produced by ``ParamGridBuilder.build()``:
        every combination of the four ``maxDepth`` and three ``maxBins``
        candidates below (12 maps).
    """
    # NOTE(review): the run saved in this notebook selected maxDepth=5 and
    # maxBins=4, i.e. the upper end of both ranges -- consider widening them.
    grid = ParamGridBuilder()
    grid = grid.addGrid(rf.maxDepth, [2, 3, 4, 5])
    grid = grid.addGrid(rf.maxBins, [2, 3, 4])
    return grid.build()

In [4]:
def prepare_data(df: DataFrame, assembler: VectorAssembler) -> DataFrame:
    """Derive the indexed columns the model needs and assemble the feature vector.

    Steps:
      1. ``married`` is cast to an integer column ``married_index``.
      2. ``sex`` is encoded into ``sex_index`` with a ``StringIndexer``.
      3. ``assembler`` is applied to add its output vector column.

    Args:
        df: input frame containing at least ``married``, ``sex`` and every
            other column the assembler reads.
        assembler: transformer applied last (see ``vector_assembler``).

    Returns:
        A new DataFrame with ``married_index``, ``sex_index`` and the
        assembler's output column appended.
    """
    df = df.withColumn('married_index', df.married.cast(IntegerType()))

    # The indexer is fitted on whichever frame is passed in, so this function
    # is called separately for the train and test sets. With the default
    # frequency-based ordering the same label could receive different codes in
    # the two frames whenever their label frequencies differ. Alphabetical
    # ordering makes the label -> code mapping independent of frequencies.
    # Caveat: the mapping is still derived per frame, so it only matches across
    # frames that contain the same set of `sex` labels.
    sex_indexer = StringIndexer(
        inputCol='sex',
        outputCol="sex_index",
        stringOrderType='alphabetAsc',
    )
    df = sex_indexer.fit(df).transform(df)

    return assembler.transform(df)

In [5]:
def build_random_forest() -> RandomForestRegressor:
    """Create the (unfitted) random-forest regressor.

    Returns:
        A ``RandomForestRegressor`` that reads inputs from the ``features``
        column and uses ``credit_amount`` as the label.
    """
    return RandomForestRegressor(
        labelCol="credit_amount",
        featuresCol="features",
    )

In [6]:
def build_evaluator() -> RegressionEvaluator:
    """Create the evaluator used both for model selection and final scoring.

    Returns:
        A ``RegressionEvaluator`` computing RMSE between the ``prediction``
        column and the ``credit_amount`` label.
    """
    return RegressionEvaluator(
        labelCol="credit_amount",
        predictionCol="prediction",
        metricName="rmse",
    )

In [7]:
def build_tvs(rand_forest, evaluator, model_params) -> TrainValidationSplit:
    """Wire the estimator, parameter grid and evaluator into a train/validation search.

    Args:
        rand_forest: estimator to tune.
        evaluator: metric used to compare the candidate param maps.
        model_params: param maps to try (e.g. the output of ``model_params(rf)``;
            note this argument shadows the module-level function of the same name
            inside this body).

    Returns:
        A ``TrainValidationSplit`` configured with ``trainRatio=0.8``.
    """
    search_config = dict(
        estimator=rand_forest,
        estimatorParamMaps=model_params,
        evaluator=evaluator,
        trainRatio=0.8,
    )
    return TrainValidationSplit(**search_config)

In [8]:
def train_model(train_df: DataFrame, test_df: DataFrame) -> Tuple[RandomForestRegressionModel, float]:
    """Tune a random-forest regressor on ``train_df`` and score it on ``test_df``.

    Both frames are run through ``prepare_data`` with one shared assembler.
    A ``TrainValidationSplit`` search over ``model_params`` is fitted on the
    prepared training frame, and the selected model is evaluated (RMSE) on the
    prepared test frame. The RMSE and the chosen ``maxDepth`` / ``maxBins`` are
    printed.

    Args:
        train_df: raw training frame; must contain the columns read by
            ``prepare_data`` plus the ``credit_amount`` label.
        test_df: raw hold-out frame with the same columns.

    Returns:
        ``(best_model, rmse)``: the model selected by the search and its RMSE
        on the test frame.
    """
    assembler = vector_assembler()
    train_pdf = prepare_data(train_df, assembler)
    test_pdf = prepare_data(test_df, assembler)

    rf = build_random_forest()
    evaluator = build_evaluator()
    tvs = build_tvs(rf, evaluator, model_params(rf))

    best = tvs.fit(train_pdf).bestModel
    rmse = evaluator.evaluate(best.transform(test_pdf))

    print(f"RMSE: {rmse}")
    # Read the selected hyperparameters through the model's public getters
    # rather than the private `_java_obj` handle (getters exist on fitted
    # tree models in PySpark 3.0+).
    print(f'Model maxDepth: {best.getMaxDepth()}')
    print(f'Model maxBins: {best.getMaxBins()}')
    return best, rmse

if __name__ == "__main__":
    # Entry point: obtain (or reuse) a Spark session, load both parquet
    # splits from the working directory and run the full training routine.
    # Note: inside a notebook kernel `__name__` is "__main__" as well, so this
    # block also executes when the cell is run.
    spark = (
        SparkSession.builder
        .appName('PySparkMLJob')
        .getOrCreate()
    )
    train_df, test_df = (
        spark.read.parquet(path) for path in ("train.parquet", "test.parquet")
    )
    train_model(train_df, test_df)

In [9]:
# Step-by-step walkthrough of train_model(): obtain (or reuse) the Spark
# session and load the raw train/test splits.
spark = (
    SparkSession.builder
    .appName('PySparkMLJob')
    .getOrCreate()
)
train_df = spark.read.parquet("train.parquet")
test_df = spark.read.parquet("test.parquet")
# The `married_index` / `sex_index` columns are not added here; they are
# derived inside prepare_data().

22/09/18 23:19:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/18 23:19:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

In [10]:
# Walkthrough: build one assembler and run both raw frames through
# prepare_data() with it (train first, then test).
assembler = vector_assembler()
train_pdf, test_pdf = (
    prepare_data(raw_frame, assembler) for raw_frame in (train_df, test_df)
)

                                                                                

In [14]:
# Sanity check: the prepared frame should now carry the columns added by
# prepare_data() (`married_index`, `sex_index`, `features`) next to the raw ones.
train_pdf.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- married: boolean (nullable = true)
 |-- salary: integer (nullable = true)
 |-- successfully_credit_completed: integer (nullable = true)
 |-- credit_completed_amount: integer (nullable = true)
 |-- active_credits: integer (nullable = true)
 |-- active_credits_amount: integer (nullable = true)
 |-- credit_amount: integer (nullable = true)
 |-- married_index: integer (nullable = true)
 |-- sex_index: double (nullable = false)
 |-- features: vector (nullable = true)



In [15]:
# Walkthrough: create the estimator and the RMSE evaluator, then combine them
# with the hyperparameter grid into the train/validation search.
rf = build_random_forest()
evaluator = build_evaluator()
tvs = build_tvs(
    rand_forest=rf,
    evaluator=evaluator,
    model_params=model_params(rf),
)

In [17]:
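# Disabled scratch step, kept for reference: fits the bare forest directly,
# bypassing the TrainValidationSplit search built above.
# rf.fit(train_pdf)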

In [18]:
# Run the train/validation search over every param map in the grid on the
# prepared training frame; `models` is the fitted search result.
models = tvs.fit(train_pdf)

                                                                                

In [19]:
# Keep the model the search selected according to the evaluator's metric.
best = models.bestModel

In [20]:
# Score the prepared hold-out frame with the selected model.
predictions = best.transform(test_pdf)

In [23]:
# Evaluate the hold-out predictions with the same RMSE evaluator used for
# model selection and report the hyperparameters of the selected model.
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
# Use the model's public param getters instead of reaching into the private
# `_java_obj` handle (getters exist on fitted tree models in PySpark 3.0+).
print(f'Model maxDepth: {best.getMaxDepth()}')
print(f'Model maxBins: {best.getMaxBins()}')

RMSE: 517555.32255035394
Model maxDepth: 5
Model maxBins: 4


                                                                                