<a href="https://colab.research.google.com/github/silversteinaustin/Machine-Learning-Credit-Score-Analysis/blob/main/2024_03_06_DBFS_Example_3_5_(2).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [26]:
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split

from sklearn import preprocessing

from sklearn.linear_model import LogisticRegression
from sklearn import metrics
from sklearn.metrics import roc_auc_score # AUC ROC
from sklearn.metrics import average_precision_score # AUC PRC
from sklearn.metrics import confusion_matrix

from sklearn.ensemble import RandomForestClassifier


from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

In [38]:

from pyspark.sql import SparkSession
import pyspark.pandas as ps

# Create SparkSession
spark = SparkSession.builder \
    .appName("YourAppName") \
    .getOrCreate()


In [29]:
# File location and type
file_location = "/content/super_clean_df.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
loan_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(loan_df)

DataFrame[Income: int, Age: int, Experience: int, Married/Single: string, House_Ownership: string, Car_Ownership: string, Profession: string, CITY: string, STATE: string, CURRENT_JOB_YRS: int, CURRENT_HOUSE_YRS: int, Risk_Flag: int]

In [4]:
loan_df

DataFrame[Id: int, Income: int, Age: int, Experience: int, Married/Single: string, House_Ownership: string, Car_Ownership: string, Profession: string, CITY: string, STATE: string, CURRENT_JOB_YRS: int, CURRENT_HOUSE_YRS: int, Risk_Flag: int]

In [5]:
#dropped the unnecessary data from out dataframe

clean_df = loan_df.drop('ID')

In [6]:
display(clean_df)

DataFrame[Income: int, Age: int, Experience: int, Married/Single: string, House_Ownership: string, Car_Ownership: string, Profession: string, CITY: string, STATE: string, CURRENT_JOB_YRS: int, CURRENT_HOUSE_YRS: int, Risk_Flag: int]

In [7]:
clean_df.count()

252000

In [8]:
super_clean_df = clean_df.dropna()
super_clean_df.count()

252000

In [9]:
column_list = super_clean_df.columns
print(column_list)

['Income', 'Age', 'Experience', 'Married/Single', 'House_Ownership', 'Car_Ownership', 'Profession', 'CITY', 'STATE', 'CURRENT_JOB_YRS', 'CURRENT_HOUSE_YRS', 'Risk_Flag']


In [10]:
# Check the unique values in the House_Ownership column
unique_house_ownership_values = super_clean_df.select('House_Ownership').distinct().collect()

# Print the unique values
for row in unique_house_ownership_values:
    print(row['House_Ownership'])


norent_noown
rented
owned


In [11]:
# Define custom mapping for House_Ownership column
house_ownership_mapping = {"norent_noown": 0, "rented": 1, "owned": 2}


In [30]:
import numpy as np
from pyspark.sql.functions import col

# Extract the 'Risk_Flag' column from the DataFrame and convert it to a numpy array
y = np.array(super_clean_df.select("Risk_Flag").collect())

# Drop the 'Risk_Flag' column from the DataFrame to get the feature matrix
X = super_clean_df.drop("Risk_Flag")

# Display the first 5 elements of y
print(y[:5])


[[0]
 [0]
 [0]
 [1]
 [1]]


In [13]:
column_list = super_clean_df.columns
print(column_list)

['Income', 'Age', 'Experience', 'Married/Single', 'House_Ownership', 'Car_Ownership', 'Profession', 'CITY', 'STATE', 'CURRENT_JOB_YRS', 'CURRENT_HOUSE_YRS', 'Risk_Flag']


In [35]:
type(super_clean_df)

In [14]:
X.show()


+-------+---+----------+--------------+---------------+-------------+--------------------+-------------------+--------------+---------------+-----------------+
| Income|Age|Experience|Married/Single|House_Ownership|Car_Ownership|          Profession|               CITY|         STATE|CURRENT_JOB_YRS|CURRENT_HOUSE_YRS|
+-------+---+----------+--------------+---------------+-------------+--------------------+-------------------+--------------+---------------+-----------------+
|1303834| 23|         3|        single|         rented|           no| Mechanical_engineer|               Rewa|Madhya_Pradesh|              3|               13|
|7574516| 40|        10|        single|         rented|           no|  Software_Developer|           Parbhani|   Maharashtra|              9|               13|
|3991815| 66|         4|       married|         rented|           no|    Technical_writer|          Alappuzha|        Kerala|              4|               10|
|6256451| 41|         2|        single| 

In [15]:
y[:5]

array([[0],
       [0],
       [0],
       [1],
       [1]])

In [42]:
pd_test = X.to_pandas()
X_dummies = pd.get_dummies(pd_test)
X_dummies

AttributeError: 'DataFrame' object has no attribute 'to_pandas'

In [17]:
# from pyspark.ml.feature import OneHotEncoder, StringIndexer
# from pyspark.ml import Pipeline

# # List of categorical columns
# categorical_cols = ['Married/Single', 'House_Ownership', 'Car_Ownership', 'Profession', 'CITY', 'STATE']

# # Index categorical columns
# indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep") for col in categorical_cols]

# # One-hot encode indexed categorical columns
# encoders = [OneHotEncoder(inputCol=col+"_index", outputCol=col+"_encoded") for col in categorical_cols]

# # Pipeline of indexers and encoders
# pipeline = Pipeline(stages=indexers + encoders)

# # Fit and transform the pipeline to create dummy variables
# X_encoded = pipeline.fit(X).transform(X)

# # Drop the original categorical columns
# X_encoded = X_encoded.drop(*[col+"_index" for col in categorical_cols])

# # Show the resulting DataFrame with dummy variables
# X_encoded.show()


In [39]:
X=ps.get_dummies(X)

AttributeError: 'DataFrame' object has no attribute '_internal'

In [33]:
display(X)

Unnamed: 0,"DataFrame[Income: int, Age: int, Experience: int, Married/Single: string, House_Ownership: string, Car_Ownership: string, Profession: string, CITY: string, STATE: string, CURRENT_JOB_YRS: int, CURRENT_HOUSE_YRS: int]"
0,1


In [23]:
from google.colab import files

In [25]:
# Save DataFrame to CSV
super_clean_df.toPandas().to_csv('super_clean_df.csv', index=False)


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='Risk_Flag')

# Train the model on the training data
lr_model = lr.fit(train_data)

# Make predictions on the testing data
predictions = lr_model.transform(test_data)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol='Risk_Flag')
accuracy = evaluator.evaluate(predictions)

# Print the accuracy
print("Accuracy:", accuracy)







In [None]:
# Assuming X_dummies is a pandas DataFrame containing your feature columns

# Convert X_dummies to a NumPy array
X_array = X_dummies.values









In [None]:
# Assuming y is a PySpark Column

# Import necessary libraries
import numpy as np

# Collect the values from the PySpark Column and convert them to a NumPy array
y_array = np.array(y.collect())




In [None]:
from pyspark.sql.functions import col

# Create a new DataFrame X by selecting all columns except 'Risk_Flag'
X = super_clean_df.select([col for col in super_clean_df.columns if col != 'Risk_Flag'])

# Show the first few rows of the new DataFrame X
X.show(5)


In [None]:
from pyspark.ml.feature import StringIndexer

# Define the columns to be encoded
categorical_cols = ['Married/Single', 'House_Ownership', 'Car_Ownership']

# Apply StringIndexer to each categorical column
indexed_df = super_clean_df
for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col+"_index").fit(indexed_df)
    indexed_df = indexer.transform(indexed_df)

# Display the encoded DataFrame
display(indexed_df)


#We'll use StringIndexer to encode the categorical columns.









In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

# Define features by combining encoded categorical columns and numerical columns
feature_cols = [col + "_index" for col in categorical_cols] + ['Income', 'Age', 'Experience', 'CURRENT_JOB_YRS', 'CURRENT_HOUSE_YRS']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Define DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Risk_Flag", featuresCol="features")

# Create a Pipeline
pipeline = Pipeline(stages=[assembler, dt])

# Fit the pipeline to the data
model = pipeline.fit(indexed_df)

# predictions = model.transform(test_data)


In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline

# Assuming you have already defined train_data and test_data

# Define features by combining encoded categorical columns and numerical columns
feature_cols = [col + "_index" for col in categorical_cols] + ['Income', 'Age', 'Experience', 'CURRENT_JOB_YRS', 'CURRENT_HOUSE_YRS']

# Assemble features with a different output column name
assembler = VectorAssembler(inputCols=feature_cols, outputCol="assembled_features")

# Define StandardScaler
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features", withStd=True, withMean=False)

# Define the pipeline
pipeline = Pipeline(stages=[assembler, scaler])

# Fit the pipeline on the training data and transform both training and testing data
pipeline_model = pipeline.fit(train_data)
train_data_scaled = pipeline_model.transform(train_data)
test_data_scaled = pipeline_model.transform(test_data)








In [None]:
train_data_scaled.printSchema()


In [None]:
# Display the DataFrame schema
train_data_scaled.printSchema()

# Show some sample rows
train_data_scaled.show(5)


In [None]:
# Splitting into Train and Test sets
X_train, X_test, y_train, y_test = train_test_split(X_dummies, y, random_state=78)


In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Define feature columns
feature_cols = [col + "_index" for col in categorical_cols] + ['Income', 'Age', 'Experience', 'CURRENT_JOB_YRS', 'CURRENT_HOUSE_YRS']

# Create a VectorAssembler with a different output column name
assembler = VectorAssembler(inputCols=feature_cols, outputCol="assembled_features")

# Import the Random Forest classifier
from pyspark.ml.classification import RandomForestClassifier

# Instantiate the Random Forest classifier
rf = RandomForestClassifier(labelCol="Risk_Flag", featuresCol="assembled_features", numTrees=100)

# Create a pipeline
pipeline = Pipeline(stages=[assembler, rf])

# Fit the pipeline to the training data
rf_model = pipeline.fit(train_data_scaled)

# Optionally, make predictions on test data
predictions = rf_model.transform(test_data_scaled)

# Optionally, evaluate the model performance
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="Risk_Flag")
auc = evaluator.evaluate(predictions)
print("AUC:", auc)










In [None]:
# Create a view or table

temp_table_name = "Training_Data__1__csv"

df.createOrReplaceTempView(temp_table_name)

In [None]:
%sql

/* Query the created temp table in a SQL cell */

select * from `Training_Data__1__csv`

In [None]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "Training_Data__1__csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)