In [None]:
import os

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, trim, when


In [None]:
# One Spark session shared by the whole notebook.
# NOTE(review): no master is set here; if this runs in local mode the executors live
# inside the driver JVM and spark.executor.memory likely has no effect — confirm.
# Memory settings are typically only honored when getOrCreate() launches a new JVM,
# not when a session already exists in the kernel.
spark = (
    SparkSession.builder
    .appName("Regression")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)

24/12/05 12:43:10 WARN Utils: Your hostname, chun-B650-AORUS-ELITE-AX resolves to a loopback address: 127.0.0.1; using 192.168.1.227 instead (on interface enp12s0)
24/12/05 12:43:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/05 12:43:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Joined loan dataset. Override the location with the JOIN_DF_PATH environment
# variable instead of editing this machine-specific default path.
JOIN_DF_PATH = os.environ.get(
    "JOIN_DF_PATH", "/home/chun/Downloads/cs-267/project/join_df.csv"
)

join_df = spark.read.csv(
    JOIN_DF_PATH,
    header=True,       # first line holds the column names
    inferSchema=True,  # infer column types from the data (False would read every column as string)
    multiLine=True,    # allow quoted fields that span several lines
    quote='"',         # fields are quoted with double quotes
    escape='"',        # a doubled quote ("") inside a quoted field is a literal quote
    sep=",",           # comma-delimited
)

In [None]:
# Inspect the inferred schema: the string columns are used as categorical features
# further down, and the double columns as continuous features.
join_df.printSchema()

root
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- avg_cur_bal: integer (nullable = true)
 |-- Tot_cur_bal: integer (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- funded_amount: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- pymnt_plan: boolean (nullable = true)
 |-- type: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- description: string (nullable = true)



In [None]:
# Every string-typed column is treated as a categorical feature.
# (The loop variable is `name`, not `col`, so it is not confused with the imported col().)
categorical_variables = [name for name, dtype in join_df.dtypes if dtype == "string"]
categorical_variables

['emp_title',
 'emp_length',
 'home_ownership',
 'verification_status',
 'addr_state',
 'loan_status',
 'state',
 'term',
 'grade',
 'type',
 'purpose',
 'description']

In [None]:

# Drop columns that carry no information: a column with exactly one distinct value
# (as counted by distinct().count()) cannot help the model.
# NOTE(review): this runs one Spark job per column; fine for ~20 columns, but slow on
# very wide frames.
single_value_columns = [
    column
    for column in join_df.columns
    if join_df.select(column).distinct().count() == 1
]

# Keep the categorical list in sync with the drop. A single-valued column need not
# be a string column (e.g. a constant boolean or numeric column), so filter the list
# rather than calling list.index(), which raises ValueError for such columns.
categorical_variables = [
    name for name in categorical_variables if name not in single_value_columns
]

print(single_value_columns)
join_df = join_df.drop(*single_value_columns)


[]


In [None]:
# Replace nulls in every categorical (string) column with the literal "N/A", so
# missing values become their own category for StringIndexer below.
join_df = join_df.fillna("N/A", subset=categorical_variables)

In [None]:
# Build the categorical pipeline:
#   1. one StringIndexer per categorical column;
#   2. one OneHotEncoder over all of the indices;
#   3. one VectorAssembler producing a single "categorical-features" vector.
# handleInvalid="keep" maps categories that were not seen when fitting on train_df
# to an extra index instead of dropping the row. With "skip", every test row holding
# any unseen value in any categorical column was silently removed from evaluation.
# NOTE(review): emp_title and description look like free text. One-hot encoding them
# likely explains the very wide feature vectors (~34 MiB task binaries in the warnings
# further down). Consider dropping them or using a text featurizer — confirm.
indexers = [
    StringIndexer(inputCol=column, outputCol=f"{column}-index", handleInvalid="keep")
    for column in categorical_variables
]
encoder = OneHotEncoder(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=[f"{indexer.getOutputCol()}-encoded" for indexer in indexers],
)
assembler = VectorAssembler(
    inputCols=encoder.getOutputCols(),
    outputCol="categorical-features",
    handleInvalid="skip",
)
pipeline = Pipeline(stages=indexers + [encoder, assembler])

# Fixed seed so the 80/20 train/test split is reproducible across runs.
train_df, test_df = join_df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Fit the indexers/encoder/assembler on the training split only, then apply the
# same fitted stages to both splits so test data is encoded with train vocabularies.
pip = pipeline.fit(train_df)
train_df, test_df = [pip.transform(split) for split in (train_df, test_df)]

[Stage 122:>                                                        (0 + 1) / 1]                                                                                

In [None]:
# Numeric features: only columns Spark typed as double are used.
# NOTE(review): this filter excludes the integer columns from the schema
# (avg_cur_bal, Tot_cur_bal, loan_amount, funded_amount) — is that intentional?
# funded_amount is the target and loan_amount may leak it, but avg_cur_bal and
# Tot_cur_bal look like legitimate features.
continuous_variables = [name for name, dtype in join_df.dtypes if dtype == 'double']
feature_columns = ['categorical-features'] + continuous_variables

# handleInvalid="skip": rows with invalid (e.g. null) inputs are dropped instead of
# failing the job, which also shrinks the test set.
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol='features',
    handleInvalid="skip",
)
train_df, test_df = [assembler.transform(frame) for frame in (train_df, test_df)]

In [None]:
# Target column "label": funded_amount turned into a class index for LogisticRegression.
# The label indexer is fit on the training split ONLY, and that one fitted model is
# reused for the test split. Fitting a separate indexer on test_df would produce a
# different value->index mapping, so test labels would not match what the model learned.
# handleInvalid="skip": test rows whose funded_amount never occurs in train_df are dropped.
# NOTE(review): StringIndexer orders indices by value frequency by default, so "label"
# is a frequency rank, not a dollar amount.
indexer = StringIndexer(inputCol='funded_amount', outputCol='label', handleInvalid="skip")
label_indexer_model = indexer.fit(train_df)
train_df = label_indexer_model.transform(train_df)
test_df = label_indexer_model.transform(test_df)



In [None]:
# Number of columns after all feature and label stages have been appended.
len(train_df.schema.fields)

47

In [None]:
# Multinomial logistic regression over the indexed funded_amount classes.
# NOTE(review): the notebook later scores this with regression metrics (RMSE/MAE), but
# LogisticRegression treats each distinct funded_amount as an unordered class. For
# predicting a dollar amount, a regressor (e.g. LinearRegression) trained on
# funded_amount cast to double is likely what was intended — confirm.
lr = LogisticRegression(labelCol='label', featuresCol='features')
model = lr.fit(dataset=train_df)
# A previously saved model lives at dbfs:/lrmodel-1732953611.7421615/. To reuse it,
# assign the result of LogisticRegressionModel.load(path). load() returns a new model,
# so calling model.load(...) on an existing instance would not update `model`.

24/12/05 12:43:25 WARN DAGScheduler: Broadcasting large task binary with size 1548.3 KiB
24/12/05 12:43:26 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/05 12:43:26 WARN DAGScheduler: Broadcasting large task binary with size 1549.1 KiB
24/12/05 12:43:27 WARN DAGScheduler: Broadcasting large task binary with size 1549.1 KiB
24/12/05 12:43:28 WARN DAGScheduler: Broadcasting large task binary with size 1549.1 KiB
24/12/05 12:43:29 WARN DAGScheduler: Broadcasting large task binary with size 1549.1 KiB
24/12/05 12:43:29 WARN DAGScheduler: Broadcasting large task binary with size 1549.1 KiB
24/12/05 12:43:30 WARN DAGScheduler: Broadcasting large task binary with size 1549.1 KiB
24/12/05 12:43:30 WARN DAGScheduler: Broadcasting large task binary with size 1549.1 KiB
24/12/05 12:43:31 WARN DAGScheduler: Broadcasting large task binary with size 1549.1 KiB
24/12/05 12:43:32 WARN DAGScheduler: Broadcasting large task binary with size 1549.1 KiB
24/

In [None]:
# train_df.write.json("/data/train_df.json", mode="overwrite")
# test_df.write.json("/data/test_df.json",  mode="overwrite")
# import time
# model.save(f"/lrmodel-{time.time()}")



In [None]:
# Score the held-out split with the fitted model.
pred = model.transform(test_df)
# Show only the columns needed to eyeball predictions. The full frame carries ~50
# columns, including wide feature vectors, which makes show() output unreadable.
pred.select("funded_amount", "label", "prediction").show()

24/12/05 12:45:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/12/05 12:45:20 WARN DAGScheduler: Broadcasting large task binary with size 34.0 MiB


+--------------------+----------+--------------+----------+-------------------+----------+-----------+-----------+-----------+-----------+-----+-------------+----------+--------+-----------+-----+----------+----------+------------------+--------------------+---------------+----------------+--------------------+-------------------------+----------------+-----------------+-----------+----------+-----------+----------+-------------+-----------------+-----------------------+------------------------+----------------------------+---------------------------------+------------------------+-------------------------+-------------------+------------------+-------------------+------------------+---------------------+-------------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|           emp_title|emp_length|home_ownership|annual_inc|verification_status|addr_state|avg_cur_bal|Tot_cur_bal|loan_status|loan_amount|state|funded_amount|

In [None]:
# Error metrics for the test-split predictions.
# NOTE(review): "label" and "prediction" are StringIndexer class indices of
# funded_amount (frequency ranks), not dollar amounts, so these RMSE/MAE values are
# not dollar errors. Map predictions back to amounts (e.g. with IndexToString) or
# switch to a regressor before interpreting them.
# NOTE(review): each evaluate() call recomputes `pred` (note the repeated broadcast
# warnings); consider pred.cache() if this is slow.
_regression_eval_cols = {"labelCol": "label", "predictionCol": "prediction"}
evaluator = RegressionEvaluator(metricName="rmse", **_regression_eval_cols)
mae_evaluator = RegressionEvaluator(metricName="mae", **_regression_eval_cols)

rmse = evaluator.evaluate(pred)
print(f"Root Mean Squared Error (RMSE): {rmse}")

mae = mae_evaluator.evaluate(pred)
print(f"Mean Absolute Error (MAE): {mae}")

24/12/05 12:45:45 WARN DAGScheduler: Broadcasting large task binary with size 33.5 MiB


Root Mean Squared Error (RMSE): 124.54504181965842
Mean Absolute Error (MAE): 59.45116279069767


24/12/05 12:45:45 WARN DAGScheduler: Broadcasting large task binary with size 33.5 MiB
