In [1]:
import findspark
findspark.init()

In [27]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.types import IntegerType, DoubleType

In [4]:
spark = SparkSession.builder.appName("train-test").getOrCreate()

In [10]:
df = spark.read.load("parquet/*.parquet", sep = ",", inferSchma = "True")

In [13]:
df.show(5)


+------------+-------+--------------------+-------------+-------+---------+----------------+------------------+---+----+--------------+---+---------------------+--------------------+------------+-------------+----------+----------------+--------------+---+--------------------+------------+------------------------+-------------------+------------------+-----------------+----------------+------------------+
|          id|channel|              seller|interest_rate|balance|loan_term|origination_date|first_payment_date|ltv|cltv|borrower_count|dti|borrower_credit_score|first_time_homebuyer|loan_purpose|property_type|unit_count|occupancy_status|property_state|zip|insurance_percentage|product_type|co_borrower_credit_score|first_payment_month|first_payment_year|origination_month|origination_year|foreclosure_status|
+------------+-------+--------------------+-------------+-------+---------+----------------+------------------+---+----+--------------+---+---------------------+--------------------+

In [14]:
df.describe()

DataFrame[summary: string, id: string, channel: string, seller: string, interest_rate: string, balance: string, loan_term: string, origination_date: string, first_payment_date: string, ltv: string, cltv: string, borrower_count: string, dti: string, borrower_credit_score: string, first_time_homebuyer: string, loan_purpose: string, property_type: string, unit_count: string, occupancy_status: string, property_state: string, zip: string, insurance_percentage: string, product_type: string, co_borrower_credit_score: string, first_payment_month: string, first_payment_year: string, origination_month: string, origination_year: string, foreclosure_status: string]

In [15]:
df.columns

['id',
 'channel',
 'seller',
 'interest_rate',
 'balance',
 'loan_term',
 'origination_date',
 'first_payment_date',
 'ltv',
 'cltv',
 'borrower_count',
 'dti',
 'borrower_credit_score',
 'first_time_homebuyer',
 'loan_purpose',
 'property_type',
 'unit_count',
 'occupancy_status',
 'property_state',
 'zip',
 'insurance_percentage',
 'product_type',
 'co_borrower_credit_score',
 'first_payment_month',
 'first_payment_year',
 'origination_month',
 'origination_year',
 'foreclosure_status']

In [16]:
my_cols = df.select(
"channel",
"seller",
"first_time_homebuyer",
"loan_purpose",
"property_type",
"occupancy_status",
"property_state",
"foreclosure_status"
)

In [22]:
my_cols_final = my_cols.withColumn("foreclosure_Status_int",df["foreclosure_status"].cast(IntegerType())).drop("foreclosure_status")

In [23]:
my_cols_final.show(5)

+-------+--------------------+--------------------+------------+-------------+----------------+--------------+----------------------+
|channel|              seller|first_time_homebuyer|loan_purpose|property_type|occupancy_status|property_state|foreclosure_Status_int|
+-------+--------------------+--------------------+------------+-------------+----------------+--------------+----------------------+
|      R|               OTHER|                   N|           C|           SF|               I|            CA|                     1|
|      R|SUNTRUST MORTGAGE...|                   N|           P|           SF|               P|            MD|                     1|
|      C|PHH MORTGAGE CORP...|                   N|           R|           SF|               P|            CA|                     0|
|      C|               OTHER|                   N|           R|           CO|               P|            MA|                     0|
|      C|FLAGSTAR CAPITAL ...|                   N|           

In [28]:
channel_indexer = StringIndexer(inputCol = "channel", outputCol= "ChannelIndex")
channel_encoder = OneHotEncoder(inputCol = "ChannelIndex", outputCol = "ChannelVec")

In [29]:
seller_indexer = StringIndexer(inputCol = "seller", outputCol = "SellerIndex")
seller_encoder = OneHotEncoder(inputCol = "SellerIndex", outputCol = "SellerVec")

In [30]:
first_time_homebuyer_indexer = StringIndexer(inputCol = "first_time_homebuyer", outputCol = "FirstTimeHomebuyerIndex")
first_time_homebuyer_encoder = OneHotEncoder(inputCol = "FirstTimeHomebuyerIndex", outputCol = "FirstTimeHomebuyerVec")

In [31]:
loan_purpose_indexer = StringIndexer(inputCol = "loan_purpose", outputCol = "LoanPurposeIndex")
loan_purpose_encoder = OneHotEncoder(inputCol = "LoanPurposeIndex", outputCol = "LoanPurposeVec")

In [32]:
property_type_indexer = StringIndexer(inputCol = "property_type", outputCol = "PropertyTypeIndex")
property_type_encoder = OneHotEncoder(inputCol = "PropertyTypeIndex", outputCol = "PropertyTypeVec")

In [33]:
occupancy_status_indexer = StringIndexer(inputCol = "occupancy_status", outputCol = "OccupancyStatusIndex")
occupancy_status_encoder = OneHotEncoder(inputCol = "OccupancyStatusIndex", outputCol = "OccupancyStatusVec")

In [35]:
property_state_indexer = StringIndexer(inputCol = "property_state", outputCol = "PropertyStateIndex")
property_state_encoder = OneHotEncoder(inputCol = "PropertyStateIndex", outputCol = "PropertyStateVec")

In [36]:
assembler = VectorAssembler(inputCols = ["ChannelVec","SellerVec","FirstTimeHomebuyerVec","LoanPurposeVec","PropertyTypeVec",
                                        "OccupancyStatusVec","PropertyStateVec"], outputCol = "features")

In [38]:
log_reg = LogisticRegression(featuresCol="features",labelCol="foreclosure_status")

In [45]:
pipeline = Pipeline(stages=[channel_indexer,seller_indexer,first_time_homebuyer_indexer,loan_purpose_indexer,property_type_indexer,
                          occupancy_status_indexer, property_state_indexer, channel_encoder, seller_encoder,
                          first_time_homebuyer_encoder, loan_purpose_encoder, occupancy_status_encoder, property_type_encoder,
                          assembler, log_reg])

In [47]:
train_data, test_data = my_cols_final.randomSplit([0.7,0.3])

In [48]:
fit_model = pipeline.fit(train_data)

IllegalArgumentException: 'Field "ChannelVec" does not exist.\nAvailable fields: channel, seller, first_time_homebuyer, loan_purpose, property_type, occupancy_status, property_state, foreclosure_Status_int, channel_index, seller_index, first_time_homebuyer_index, loan_purpose_index, property_type_index, occupancy_status_index, PropertyStateIndex, channelVec, sellerVec, firsttimehomebuyerVec, loanpurposeVec, occupancystatusVec, propertytypeVec'