## IMPORTS

In [0]:
#installing py2neo driver for building connection with sandbox
!pip install py2neo

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-cc014753-1b90-4e73-af85-23b1adc57170/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
import py2neo
from py2neo import Graph
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains, map_from_entries, collect_list, struct
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql import functions as f

In [0]:
# Obtain (or reuse) the Spark session that the cells below read and write through.
session_builder = SparkSession.builder.appName('PySpark Read CSV')
spark = session_builder.getOrCreate()

## LOADING HOMELOAN_CSV

In [0]:
# Read the home-loan CSV: first row is the header, fields are comma-separated.
# No schema inference is requested, so numeric columns are cast in the next cell.
home_loan_reader = spark.read.options(header='True', delimiter=',')
df_home_loan = home_loan_reader.csv("/FileStore/tables/home_loan_data_draft_latest.csv")
#display(df_home_loan)

In [0]:
# Cast the numeric home-loan columns to the types used downstream.
# Each entry is (column name, target Spark type); columns are replaced in place.
home_loan_casts = [
    ("tenure", IntegerType()),
    ("orig_amt", FloatType()),
    ("rate", FloatType()),
    ("ltv", IntegerType()),
    ("sch_amt", FloatType()),
    ("amount_owed", FloatType()),
]
for column_name, target_type in home_loan_casts:
    df_home_loan = df_home_loan.withColumn(column_name, df_home_loan[column_name].cast(target_type))
df_home_loan

Out[40]: DataFrame[customer_id: string, home_loan_id: string, tenure: int, orig_amt: float, rate: float, dpd: string, ltv: int, soi_cohort_employee: string, soi_cohort_online_heavy: string, soi_cohort_workplace: string, soi_cohort_millitary_banking: string, soi_cohort_student: string, orig_dt: string, maturity_dt: string, sch_amt: float, amount_owed: float, loan_status: string]

## LOADING CUSTOMER_CSV

In [0]:
# Read the customer CSV: first row is the header, fields are comma-separated.
# No schema inference is requested, so numeric columns are cast in the next cell.
customer_reader = spark.read.options(header='True', delimiter=',')
df_customer = customer_reader.csv("/FileStore/tables/customer_23_12_latest.csv")
#display(df_customer)

In [0]:
# Cast the numeric customer columns to the types used downstream.
# Each entry is (column name, target Spark type); columns are replaced in place.
# NOTE(review): casting "zip" to an integer drops any leading zeros — confirm
# that no zip code in the data starts with 0, or keep it as a string.
customer_casts = [
    ("age", IntegerType()),
    ("fico_scores", IntegerType()),
    ("wallet_share", FloatType()),
    ("income_estimate", IntegerType()),
    ("dti_ratio", FloatType()),
    ("average_time_spent(minutes)", IntegerType()),
    ("zip", IntegerType()),
]
for column_name, target_type in customer_casts:
    df_customer = df_customer.withColumn(column_name, df_customer[column_name].cast(target_type))
df_customer

Out[42]: DataFrame[customer_id: string, age: int, gender: string, fico_scores: int, wallet_share: float, income_estimate: int, dti_ratio: float, average_time_spent(minutes): int, last_login: string, phone: string, ssn: string, action_performed: string, fname: string, lname: string, email: string, zip: int]

## Merging Customer and HomeLoan Data

In [0]:
# Left join: every customer row is kept, matched to its home-loan rows by customer_id.
# The loan-side customer_id is dropped so the result has a single customer_id column.
same_customer = df_customer["customer_id"] == df_home_loan["customer_id"]
customer_with_loans = df_customer.join(df_home_loan, same_customer, 'left')
merged_customer_home_loan = customer_with_loans.drop(df_home_loan.customer_id)
#display(merged_customer_home_loan)

## Making Target Variable - Home Loan Defaulters

In [0]:
# DPD is "days past due": rows whose dpd equals 'DPD_0' are labelled 0, all other rows 1.
# NOTE(review): a null dpd (e.g. a customer with no matching loan after the left
# join) also falls into the `otherwise(1)` branch — confirm this is intended.
is_not_past_due = merged_customer_home_loan.dpd == 'DPD_0'
default_flag = when(is_not_past_due, 0).otherwise(1)
merged_customer_home_loan = merged_customer_home_loan.withColumn("home_loan_defaulters", default_flag)
#display(merged_customer_home_loan)

## Feature Engineering - Scaling Features for GDS Algorithms

In [0]:
# Min-max scale each numeric feature independently.
# MinMaxScaler works on vector columns, so every feature is first wrapped in a
# one-element vector ("<name>_vec") and then scaled into "<name>_scaled".
columns_to_scale = ["age", "fico_scores", "wallet_share","income_estimate","dti_ratio","tenure","orig_amt","rate","sch_amt"]
assemblers = []
scalers = []
for feature in columns_to_scale:
    assemblers.append(VectorAssembler(inputCols=[feature], outputCol=feature + "_vec"))
    scalers.append(MinMaxScaler(inputCol=feature + "_vec", outputCol=feature + "_scaled"))
pipeline = Pipeline(stages=assemblers + scalers)
features_to_scale = merged_customer_home_loan.select(columns_to_scale)
scalerModel = pipeline.fit(features_to_scale)
vectorized_scaled_data = scalerModel.transform(features_to_scale)
#display(vectorized_scaled_data)

In [0]:
# Collect the names of the scaler output columns (those containing '_scaled').
scaled_columns = []
for column_name in vectorized_scaled_data.columns:
    if '_scaled' in column_name:
        scaled_columns.append(column_name)
scaled_columns

Out[46]: ['age_scaled',
 'fico_scores_scaled',
 'wallet_share_scaled',
 'income_estimate_scaled',
 'dti_ratio_scaled',
 'tenure_scaled',
 'orig_amt_scaled',
 'rate_scaled',
 'sch_amt_scaled']

## Changing vectorized columns into single value columnar format

In [0]:
def _first_vector_entry(vec):
    """Return element 0 of `vec` as a Python float."""
    return float(vec[0])


# UDF that unwraps a one-element vector column into a plain float column.
firstelement = f.udf(_first_vector_entry, FloatType())
scaled_data = vectorized_scaled_data.select(
    [firstelement(name).alias(name) for name in scaled_columns]
)
#display(scaled_data)

## Merging Scaled Features with Customer_Home_Loan merged data

In [0]:
# Attach the scaled features to the merged customer/home-loan rows.
#
# This used to tag `merged_customer_home_loan` and `scaled_data` separately with
# monotonically_increasing_id() and join on that id. Those ids are only
# guaranteed to be unique and increasing; they depend on partitioning, so the
# same logical row is not guaranteed to get the same id in two different
# DataFrames, and features could be attached to the wrong customer.
# Running the fitted pipeline on the full merged frame keeps every row together
# with its own features and needs no join.
scaled_with_vectors = scalerModel.transform(merged_customer_home_loan)

# Unwrap each one-element "<name>_scaled" vector into a plain float column.
for scaled_name in scaled_columns:
    scaled_with_vectors = scaled_with_vectors.withColumn(scaled_name, firstelement(scaled_name))

# Drop the intermediate "<name>_vec" columns produced by the assemblers, leaving
# the original merged columns followed by the scaled feature columns.
intermediate_vector_columns = [name + "_vec" for name in columns_to_scale]
merged_customer_home_loan_scaled_data = scaled_with_vectors.drop(*intermediate_vector_columns)
#display(merged_customer_home_loan_scaled_data)

## Mapping Gender categorical values to 0 and 1

In [0]:
# Encode gender numerically: 'M' becomes 0, every other value becomes 1.
gender_flag = when(merged_customer_home_loan_scaled_data.gender == 'M', 0).otherwise(1)
merged_customer_home_loan_scaled_data = merged_customer_home_loan_scaled_data.withColumn(
    "gender_mapped (M:0,F:1)", gender_flag
)
#display(merged_customer_home_loan_scaled_data)

In [0]:
# Keep only the rows whose target flag marks them as defaulters.
is_defaulter = merged_customer_home_loan_scaled_data["home_loan_defaulters"] == 1
defaulters_data = merged_customer_home_loan_scaled_data.filter(is_defaulter)
#display(defaulters_data)

In [0]:
# Give the id column a label-specific name; it is used as the node key when
# the Defaulters nodes are written to Neo4j.
old_key, new_key = "customer_id", "customer_id_defaulters"
defaulters_data = defaulters_data.withColumnRenamed(old_key, new_key)
#display(defaulters_data)

In [0]:
# Keep only the rows whose target flag marks them as non-defaulters.
is_non_defaulter = merged_customer_home_loan_scaled_data["home_loan_defaulters"] == 0
non_defaulters_data = merged_customer_home_loan_scaled_data.filter(is_non_defaulter)
#display(non_defaulters_data)

In [0]:
# Give the id column a label-specific name; it is used as the node key when
# the Non Defaulters nodes are written to Neo4j.
old_key, new_key = "customer_id", "customer_id_non_defaulters"
non_defaulters_data = non_defaulters_data.withColumnRenamed(old_key, new_key)
#display(non_defaulters_data)

## INGESTING DATA INTO NEO4J CLOUD INSTANCE

In [0]:
# use your bolt url, not this one, throughout the notebook
# use your password, not this one, throughout the notebook
# NOTE(review): credentials are hardcoded here; prefer a secret store or environment variables.
# Writes one node labelled "Defaulters" per row, keyed by customer_id_defaulters.
defaulters_writer = (
    defaulters_data.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("authentication.type", "basic")
    .option("url", "bolt://3.88.169.237:7687")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "sentries-ratio-platter")
    .option("labels", "Defaulters")
    .option("node.keys", "customer_id_defaulters")
)
defaulters_writer.save()

In [0]:
# Writes one node labelled "Non Defaulters" per row, keyed by customer_id_non_defaulters.
# NOTE(review): credentials are hardcoded here; prefer a secret store or environment variables.
non_defaulters_writer = (
    non_defaulters_data.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("authentication.type", "basic")
    .option("url", "bolt://3.88.169.237:7687")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "sentries-ratio-platter")
    .option("labels", "Non Defaulters")
    .option("node.keys", "customer_id_non_defaulters")
)
non_defaulters_writer.save()

### Building connection using credentials for graph projection

In [0]:
# py2neo connection used to run the GDS graph-projection procedures.
# NOTE(review): credentials are hardcoded here; prefer a secret store or environment variables.
neo4j_auth = ("neo4j", "sentries-ratio-platter")
graph = Graph("bolt://3.88.169.237:7687", auth=neo4j_auth)

#### Graph projection is already created. This cell needs to run only when creating a new graph projection. To debug or to create a graph with different name - remove %md from line 1
# Cypher that projects both node labels, each with the nine scaled feature
# properties, and all relationship types ('*') into the named GDS graph.
knn_projection_cypher = """
    CALL gds.graph.project(
    'myGraphVyome141_KNN_graph',
    {
        Defaulters: {
            properties: ['age_scaled',
 'fico_scores_scaled',
 'wallet_share_scaled',
 'income_estimate_scaled',
 'dti_ratio_scaled',
 'tenure_scaled',
 'orig_amt_scaled',
 'rate_scaled',
 'sch_amt_scaled']
        },
        
        `Non Defaulters`: {
            properties: ['age_scaled',
 'fico_scores_scaled',
 'wallet_share_scaled',
 'income_estimate_scaled',
 'dti_ratio_scaled',
 'tenure_scaled',
 'orig_amt_scaled',
 'rate_scaled',
 'sch_amt_scaled']
        }
    },
    '*'
)YIELD
  graphName,
  nodeProjection,
  nodeCount,
  relationshipProjection,
  relationshipCount;

"""
# Run the projection inside an explicit transaction and commit it.
tx = graph.begin()
tx.evaluate(knn_projection_cypher)
graph.commit(tx)

#### Using Graph K Nearest Neighbours Algorithm to determine Potential Home Loan Defaulters by finding similarity scores between Defaulters and Non Defaulters

In [0]:
# Filtered KNN: for every Defaulters node, stream its top 3 most similar
# Non Defaulters nodes (by the scaled feature properties) with the similarity score.
query = """
      CALL gds.alpha.knn.filtered.stream('myGraphVyome141_KNN_graph', {
    topK: 3,
    nodeProperties: ['age_scaled',
 'fico_scores_scaled',
 'wallet_share_scaled',
 'income_estimate_scaled',
 'dti_ratio_scaled',
 'tenure_scaled',
 'orig_amt_scaled',
 'rate_scaled',
 'sch_amt_scaled'],
   sourceNodeFilter: 'Defaulters',
   targetNodeFilter: 'Non Defaulters',
    // The following parameters are set to produce a deterministic result
    randomSeed: 1337,
    concurrency: 1,
    sampleRate: 0.75,
    deltaThreshold: 0.0
})
YIELD node1, node2, similarity

RETURN gds.util.asNode(node1).customer_id_defaulters AS customer_id_defaulters, gds.util.asNode(node2).customer_id_non_defaulters AS customer_id_non_defaulters, similarity
      
"""

# Run the query through the Neo4j Spark connector in a single partition.
# NOTE(review): credentials are hardcoded here; prefer a secret store or environment variables.
similarity_read_options = {
    "url": "bolt://3.88.169.237:7687",
    "authentication.type": "basic",
    "authentication.basic.username": "neo4j",
    "authentication.basic.password": "sentries-ratio-platter",
    "query": query,
    "partitions": "1",
}
similarity_reader = spark.read.format("org.neo4j.spark.DataSource").options(**similarity_read_options)
df_similarity = similarity_reader.load()

display(df_similarity)

customer_id_defaulters,customer_id_non_defaulters,similarity
CUST499509,CUST301488,0.8737530120382685
CUST499509,CUST164089,0.8557881718836708
CUST499509,CUST290194,0.8467684320120017
CUST594764,CUST807344,0.8748010506101452
CUST594764,CUST854589,0.8743478520087958
CUST594764,CUST469240,0.867096278435157
CUST305070,CUST864555,0.8988718973704601
CUST305070,CUST238356,0.8631531063939064
CUST305070,CUST807344,0.8567458447249418
CUST768887,CUST973003,0.9149361284716828


#### Writing Similarity scores to neo4j sandbox

In [0]:
# Persist each similarity row as a SIMILAR relationship from the Defaulters node
# to the Non Defaulters node, matched by their key columns, carrying the
# similarity score as a relationship property.
# NOTE(review): credentials are hardcoded here; prefer a secret store or environment variables.
similar_writer = (
    df_similarity.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("authentication.type", "basic")
    .option("url", "bolt://3.88.169.237:7687")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "sentries-ratio-platter")
    .option("relationship", "SIMILAR")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", "Defaulters")
    .option("relationship.source.node.keys", "customer_id_defaulters")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.target.labels", "Non Defaulters")
    .option("relationship.target.node.keys", "customer_id_non_defaulters")
    .option("relationship.properties", "similarity")
    .option("relationship.target.save.mode", "overwrite")
)
similar_writer.save()

#### Graph projection for degree centrality algorithm

##### Graph projection is already created. This cell needs to run only when creating a new graph projection. To debug or to create a graph with different name - remove %md from line 1

# Cypher that projects both node labels and the SIMILAR relationships
# (REVERSE orientation, with the similarity property) into the named GDS graph.
degree_projection_cypher = """CALL gds.graph.project(
  'myGraph_Vyome_Degree_Projection',
  ['Defaulters','Non Defaulters'],
  {
    SIMILAR: {
      orientation: 'REVERSE',
      properties: ['similarity']
    }
  }
)
"""
# Run the projection inside an explicit transaction and commit it.
degree = graph.begin()
degree.evaluate(degree_projection_cypher)
graph.commit(degree)

#### How many defaulters a non-defaulter shares similarities with is determined by Degree Centrality Score.

In [0]:
# Stream the degree-centrality score of every node that carries a
# customer_id_non_defaulters property.
query = """CALL gds.degree.stream('myGraph_Vyome_Degree_Projection')
YIELD nodeId, score
where gds.util.asNode(nodeId).customer_id_non_defaulters is not null
RETURN gds.util.asNode(nodeId).customer_id_non_defaulters AS customer_id_non_defaulters, score AS similar_defaulter_score
"""
# Read from the same Neo4j instance the rest of the notebook uses. This cell
# previously pointed at a different bolt URL and password than the one the
# nodes, the SIMILAR relationships and the graph projection are written to,
# so it would query an instance that does not hold this notebook's data.
# NOTE(review): credentials are hardcoded here; prefer a secret store or environment variables.
degree_results = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("url", "bolt://3.88.169.237:7687")
    .option("authentication.type", "basic")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "sentries-ratio-platter")
    .option("query", query)
    .option("partitions", "1")
    .load()
)

# Highest score first.
degree_results = degree_results.orderBy('similar_defaulter_score',ascending = False)
display(degree_results)


customer_id_non_defaulters,similar_defaulter_score
CUST807344,4.0
CUST469240,3.0
CUST854589,2.0
CUST238356,1.0
CUST254353,1.0
CUST301488,1.0
CUST164089,1.0
CUST675159,1.0
CUST599158,1.0
CUST660250,1.0


#### Graph Projection for Weighted Eigenvector Centrality Algorithm

##### Graph projection is already created. This cell needs to run only when creating a new graph projection. To debug or to create a graph with different name - remove %md from line 1
# Cypher that projects both node labels and the SIMILAR relationships, with
# their similarity property, into the named GDS graph.
eigen_projection_cypher = """CALL gds.graph.project(
  'myGraph_Vyome_Eigen_Projection',
  ['Defaulters','Non Defaulters'],
  'SIMILAR',
  {
    relationshipProperties: 'similarity'
  }
)
"""
# Run the projection inside an explicit transaction and commit it.
eigen = graph.begin()
eigen.evaluate(eigen_projection_cypher)
graph.commit(eigen)

#### We compute Eigenvector centrality, weighted by the similarity score, so that the degree of similarity between a defaulter and a non-defaulter is taken into consideration as well

In [0]:
# Stream the similarity-weighted, L2-normalised eigenvector centrality score
# (multiplied by 100) of every node that carries a customer_id_non_defaulters property.
query = """CALL gds.eigenvector.stream('myGraph_Vyome_Eigen_Projection',{relationshipWeightProperty : 'similarity',scaler :'L2Norm'})
YIELD nodeId, score
where gds.util.asNode(nodeId).customer_id_non_defaulters is not null
RETURN gds.util.asNode(nodeId).customer_id_non_defaulters AS customer_id_non_defaulters, score * 100 as eigen_similar_defaulter_score
"""
# Read from the same Neo4j instance the rest of the notebook uses. This cell
# previously pointed at a different bolt URL and password than the one the
# nodes, the SIMILAR relationships and the graph projection are written to,
# so it would query an instance that does not hold this notebook's data.
# NOTE(review): credentials are hardcoded here; prefer a secret store or environment variables.
eigen_results = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("url", "bolt://3.88.169.237:7687")
    .option("authentication.type", "basic")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "sentries-ratio-platter")
    .option("query", query)
    .option("partitions", "1")
    .load()
)

# Highest score first.
eigen_results = eigen_results.orderBy('eigen_similar_defaulter_score',ascending = False)
display(eigen_results)


customer_id_non_defaulters,eigen_similar_defaulter_score
CUST807344,53.46651630700174
CUST469240,40.60327476480497
CUST854589,27.68744043606206
CUST973003,15.272817302773229
CUST864555,15.21061373159109
CUST747228,15.185455659620455
CUST648137,15.074831667723751
CUST301488,15.052090120438816
CUST649539,15.006923846743543
CUST235925,14.991808377150436
