In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, DoubleType
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import regexp_replace
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [2]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("xor") \
    .config("spark.executor.memory", '2g') \
    .config('spark.executor.cores', '1') \
    .config('spark.cores.max', '1') \
    .config("spark.driver.memory",'1g') \
    .getOrCreate()

sc = spark.sparkContext

In [3]:
data_schema = StructType([
    StructField('Date',StringType(), False),
    StructField('Customer_Code',DoubleType(), False),
    StructField('Gender',IntegerType(), False),
    StructField('Foreigner_Index',IntegerType(), False),
    StructField('Channel',StringType(), False),
    StructField('Province_Name',StringType(), False),
    StructField('Active',IntegerType(), False),
    StructField('Segmentation',IntegerType(), False),
    StructField('Savings_Account',IntegerType(), False),
    StructField('Guarantees',IntegerType(), False),
    StructField('Current_Accounts',IntegerType(), False),
    StructField('Derivative',IntegerType(), False),
    StructField('Payroll_Account',IntegerType(), False),
    StructField('Junior_Account',IntegerType(), False),
    StructField('More_Particular_Account',IntegerType(), False),
    StructField('Particular_Account',IntegerType(), False),
    StructField('Particular_Plus_Account',IntegerType(), False),
    StructField('Short_Term_Deposits',IntegerType(), False),
    StructField('Medium_Term_Deposits',IntegerType(), False),
    StructField('Long_Term_Deposits',IntegerType(), False),
    StructField('e-Account',IntegerType(), False),
    StructField('Funds',IntegerType(), False),
    StructField('Mortgage',IntegerType(), False),
    StructField('Pensions',IntegerType(), False),
    StructField('Loans',IntegerType(), False),
    StructField('Taxes',IntegerType(), False),
    StructField('Credit_Card',IntegerType(), False),
    StructField('Securities',IntegerType(), False),
    StructField('Home_Account',IntegerType(), False),
    StructField('Payroll',IntegerType(), False),
    StructField('Pensions_two',IntegerType(), False),
    StructField('Direct_Debit',IntegerType(), False),
    StructField('Age_Range',IntegerType(), False),
    StructField('Months_Range',IntegerType(), False),
    StructField('Income_Range',IntegerType(), False)
])

In [4]:
df = spark.read.csv(
    'santander_df_clean.csv', header=True, schema=data_schema
).cache()

## Modeling

In [15]:
df.columns

['Date',
 'Customer_Code',
 'Gender',
 'Months_at_Bank',
 'Foreigner_Index',
 'Channel',
 'Province_Name',
 'Active',
 'Segmentation',
 'Savings_Account',
 'Guarantees',
 'Current_Accounts',
 'Derivative',
 'Payroll_Account',
 'Junior_Account',
 'More_Particular_Account',
 'Particular_Account',
 'Particular_Plus_Account',
 'Short_Term_Deposits',
 'Medium_Term_Deposits',
 'Long_Term_Deposits',
 'e-Account',
 'Funds',
 'Mortgage',
 'Pensions',
 'Loans',
 'Taxes',
 'Credit_Card',
 'Securities',
 'Home_Account',
 'Payroll',
 'Pensions_two',
 'Direct_Debit',
 'Age_Range',
 'Income_Range']

### ALS with prior observations

Construct dataframe....

In [13]:
item = df.select('Savings_Account')
item =  item.withColumn("id1", monotonically_increasing_id())
item.show(3)

+---------------+---+
|Savings_Account|id1|
+---------------+---+
|              0|  0|
|              0|  1|
|              0|  2|
+---------------+---+
only showing top 3 rows



In [14]:
currentdf = (df.select('Customer_Code','Current_Accounts'))
currentdf = currentdf.withColumn("id2", monotonically_increasing_id())
currentitem = item.replace(0,1)
currentdf = currentdf.join(currentitem,col("id1")==col("id2"),"inner").drop("id1","id2")
currentdf = currentdf.withColumnRenamed("Current_Accounts","Rating")
currentdf = currentdf.withColumnRenamed("Savings_Account","Item")

In [15]:
currentdf.count()

10292620

In [16]:
payrolldf = (df.select('Customer_Code','Payroll_Account'))
payrolldf = payrolldf.withColumn("id2", monotonically_increasing_id())
payrollitem = item.replace(0,2)
payrollitem = item.replace(1,2)
payrolldf = payrolldf.join(payrollitem,col("id1")==col("id2"),"inner").drop("id1","id2")
payrolldf = payrolldf.withColumnRenamed("Payroll_Account","Rating")
payrolldf = payrolldf.withColumnRenamed("Savings_Account","Item")
nextdf = currentdf.union(payrolldf)

Junior_Accountdf = (df.select('Customer_Code','Junior_Account'))
Junior_Accountdf = Junior_Accountdf.withColumn("id2", monotonically_increasing_id())
Junior_Accountitem = item.replace(0,3)
Junior_Accountitem = item.replace(1,3)
Junior_Accountdf = Junior_Accountdf.join(Junior_Accountitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Junior_Accountdf = Junior_Accountdf.withColumnRenamed("Junior_Account","Rating")
Junior_Accountdf = Junior_Accountdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Junior_Accountdf)

More_Particular_Accountdf = (df.select('Customer_Code','More_Particular_Account'))
More_Particular_Accountdf = More_Particular_Accountdf.withColumn("id2", monotonically_increasing_id())
More_Particular_Accountitem = item.replace(0,4)
More_Particular_Accountitem = item.replace(1,4)
More_Particular_Accountdf = More_Particular_Accountdf.join(More_Particular_Accountitem,col("id1")==col("id2"),
                                                           "inner").drop("id1","id2")
More_Particular_Accountdf = More_Particular_Accountdf.withColumnRenamed("More_Particular_Account","Rating")
More_Particular_Accountdf = More_Particular_Accountdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(More_Particular_Accountdf)

Particular_Accountdf = (df.select('Customer_Code','Particular_Account'))
Particular_Accountdf = Particular_Accountdf.withColumn("id2", monotonically_increasing_id())
Particular_Accountitem = item.replace(0,5)
Particular_Accountitem = item.replace(1,5)
Particular_Accountdf = Particular_Accountdf.join(Particular_Accountitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Particular_Accountdf = Particular_Accountdf.withColumnRenamed("Particular_Account","Rating")
Particular_Accountdf = Particular_Accountdf.withColumnRenamed("Particular_Account","Item")
nextdf = nextdf.union(Particular_Accountdf)

In [17]:
nextdf.show(3)

+-------------+------+----+
|Customer_Code|Rating|Item|
+-------------+------+----+
|      16355.0|     1|   1|
|      16355.0|     1|   1|
|      24957.0|     1|   1|
+-------------+------+----+
only showing top 3 rows



In [19]:
Particular_Plus_Accountdf = (df.select('Customer_Code','Particular_Plus_Account'))
Particular_Plus_Accountdf = Particular_Plus_Accountdf.withColumn("id2", monotonically_increasing_id())
Particular_Plus_Accountitem = item.replace(0,6)
Particular_Plus_Accountitem = item.replace(1,6)
Particular_Plus_Accountdf = Particular_Plus_Accountdf.join(Particular_Plus_Accountitem,col("id1")==col("id2"),
                                                           "inner").drop("id1","id2")
Particular_Plus_Accountdf = Particular_Plus_Accountdf.withColumnRenamed("Particular_Plus_Account","Rating")
Particular_Plus_Accountdf = Particular_Plus_Accountdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Particular_Plus_Accountdf)

Medium_Term_Depositsdf = (df.select('Customer_Code','Medium_Term_Deposits'))
Medium_Term_Depositsdf = Medium_Term_Depositsdf.withColumn("id2", monotonically_increasing_id())
Medium_Term_Depositsitem = item.replace(0,7)
Medium_Term_Depositsitem = item.replace(1,7)
Medium_Term_Depositsdf = Medium_Term_Depositsdf.join(Medium_Term_Depositsitem,col("id1")==col("id2"),
                                                     "inner").drop("id1","id2")
Medium_Term_Depositsdf = Medium_Term_Depositsdf.withColumnRenamed("Medium_Term_Deposits","Rating")
Medium_Term_Depositsdf = Medium_Term_Depositsdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Medium_Term_Depositsdf)

Long_Term_Depositsdf = (df.select('Customer_Code','Long_Term_Deposits'))
Long_Term_Depositsdf = Long_Term_Depositsdf.withColumn("id2", monotonically_increasing_id())
Long_Term_Depositsitem = item.replace(0,8)
Long_Term_Depositsitem = item.replace(1,8)
Long_Term_Depositsdf = Long_Term_Depositsdf.join(Long_Term_Depositsitem,col("id1")==col("id2"),
                                                 "inner").drop("id1","id2")
Long_Term_Depositsdf = Long_Term_Depositsdf.withColumnRenamed("Long_Term_Deposits","Rating")
Long_Term_Depositsdf = Long_Term_Depositsdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Long_Term_Depositsdf)

e_Accountdf = (df.select('Customer_Code','e-Account'))
e_Accountdf = e_Accountdf.withColumn("id2", monotonically_increasing_id())
e_Accountitem = item.replace(0,9)
e_Accountitem = item.replace(1,9)
e_Accountdf = e_Accountdf.join(e_Accountitem,col("id1")==col("id2"),"inner").drop("id1","id2")
e_Accountdf = e_Accountdf.withColumnRenamed("e-Account","Rating")
e_Accountdf = e_Accountdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(e_Accountdf)

In [21]:
Fundsdf = (df.select('Customer_Code','Funds'))
Fundsdf = Fundsdf.withColumn("id2", monotonically_increasing_id())
Fundsitem = item.replace(0,10)
Fundsitem = item.replace(1,10)
Fundsdf = Fundsdf.join(Fundsitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Fundsdf = Fundsdf.withColumnRenamed("Funds","Rating")
Fundsdf = Fundsdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Fundsdf)

Mortgagedf = (df.select('Customer_Code','Mortgage'))
Mortgagedf = Mortgagedf.withColumn("id2", monotonically_increasing_id())
Mortgageitem = item.replace(0,11)
Mortgageitem = item.replace(1,11)
Mortgagedf = Mortgagedf.join(Mortgageitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Mortgagedf = Mortgagedf.withColumnRenamed("Mortgage","Rating")
Mortgagedf = Mortgagedf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Mortgagedf)

Pensionsdf = (df.select('Customer_Code','Pensions'))
Pensionsdf = Pensionsdf.withColumn("id2", monotonically_increasing_id())
Pensionsitem = item.replace(0,12)
Pensionsitem = item.replace(1,12)
Pensionsdf = Pensionsdf.join(Pensionsitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Pensionsdf = Pensionsdf.withColumnRenamed("Pensions","Rating")
Pensionsdf = Pensionsdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Pensionsdf)

Loansdf = (df.select('Customer_Code','Loans'))
Loansdf = Loansdf.withColumn("id2", monotonically_increasing_id())
Loansitem = item.replace(0,13)
Loansitem = item.replace(1,13)
Loansdf = Loansdf.join(Loansitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Loansdf = Loansdf.withColumnRenamed("Loans","Rating")
Loansdf = Loansdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Loansdf)

In [23]:
Taxesdf = (df.select('Customer_Code','Taxes'))
Taxesdf = Taxesdf.withColumn("id2", monotonically_increasing_id())
Taxesitem = item.replace(0,14)
Taxesitem = item.replace(1,14)
Taxesdf = Taxesdf.join(Taxesitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Taxesdf = Taxesdf.withColumnRenamed("Taxes","Rating")
Taxesdf = Taxesdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Taxesdf)

Credit_Carddf = (df.select('Customer_Code','Credit_Card'))
Credit_Carddf = Credit_Carddf.withColumn("id2", monotonically_increasing_id())
Credit_Carditem = item.replace(0,15)
Credit_Carditem = item.replace(1,15)
Credit_Carddf = Credit_Carddf.join(Credit_Carditem,col("id1")==col("id2"),"inner").drop("id1","id2")
Credit_Carddf = Credit_Carddf.withColumnRenamed("Credit_Card","Rating")
Credit_Carddf = Credit_Carddf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Credit_Carddf)

Securitiesdf = (df.select('Customer_Code','Securities'))
Securitiesdf = Securitiesdf.withColumn("id2", monotonically_increasing_id())
Securitiesitem = item.replace(0,16)
Securitiesitem = item.replace(1,16)
Securitiesdf = Securitiesdf.join(Securitiesitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Securitiesdf = Securitiesdf.withColumnRenamed("Securities","Rating")
Securitiesdf = Securitiesdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Securitiesdf)

Home_Accountdf = (df.select('Customer_Code','Home_Account'))
Home_Accountdf = Home_Accountdf.withColumn("id2", monotonically_increasing_id())
Home_Accountitem = item.replace(0,17)
Home_Accountitem = item.replace(1,17)
Home_Accountdf = Home_Accountdf.join(Home_Accountitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Home_Accountdf = Home_Accountdf.withColumnRenamed("Home_Account","Rating")
Home_Accountdf = Home_Accountdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Home_Accountdf)

In [25]:
Payrolldf = (df.select('Customer_Code','Payroll'))
Payrolldf = Payrolldf.withColumn("id2", monotonically_increasing_id())
Payrollitem = item.replace(0,18)
Payrollitem = item.replace(1,18)
Payrolldf = Payrolldf.join(Payrollitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Payrolldf = Payrolldf.withColumnRenamed("Payroll","Rating")
Payrolldf = Payrolldf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Payrolldf)

Pensions_twodf = (df.select('Customer_Code','Pensions_two'))
Pensions_twodf = Pensions_twodf.withColumn("id2", monotonically_increasing_id())
Pensions_twoitem = item.replace(0,19)
Pensions_twoitem = item.replace(1,19)
Pensions_twodf = Pensions_twodf.join(Pensions_twoitem,col("id1")==col("id2"),"inner").drop("id1","id2")
Pensions_twodf = Pensions_twodf.withColumnRenamed("Pensions_two","Rating")
Pensions_twodf = Pensions_twodf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Pensions_twodf)

Direct_Debitdf = (df.select('Customer_Code','Direct_Debit'))
Direct_Debitdf = Direct_Debitdf.withColumn("id2", monotonically_increasing_id())
Direct_Debititem = item.replace(0,20)
Direct_Debititem = item.replace(1,20)
Direct_Debitdf = Direct_Debitdf.join(Direct_Debititem,col("id1")==col("id2"),"inner").drop("id1","id2")
Direct_Debitdf = Direct_Debitdf.withColumnRenamed("Direct_Debit","Rating")
Direct_Debitdf = Direct_Debitdf.withColumnRenamed("Savings_Account","Item")
nextdf = nextdf.union(Direct_Debitdf)

In [26]:
nextdf.count()

205852400

In [27]:
nextdf.show(3)

+-------------+------+----+
|Customer_Code|Rating|Item|
+-------------+------+----+
|      16355.0|     1|   1|
|      16355.0|     1|   1|
|      24957.0|     1|   1|
+-------------+------+----+
only showing top 3 rows



In [28]:
sampledf = nextdf.sample(False, .05, 42)

In [29]:
(training, test) = sampledf.randomSplit([0.8, 0.2])

In [30]:
als = ALS(rank=5, maxIter=5, regParam=0.01, userCol="Customer_Code", itemCol="Item", ratingCol="Rating", 
          coldStartStrategy="drop")

In [31]:
model = als.fit(training)

In [32]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating",
                                predictionCol="prediction")

In [33]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.2529278867368078


In [34]:
# Generate top 10 product recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each product
productRecs = model.recommendForAllItems(10)

In [35]:
userRecs.show(3)

+-------------+--------------------+
|Customer_Code|     recommendations|
+-------------+--------------------+
|        15957|[{1, 0.9110956}, ...|
|        16339|[{1, 0.99861526},...|
|        16574|[{0, 0.0}, {10, 0...|
+-------------+--------------------+
only showing top 3 rows



In [36]:
productRecs.show(3)

+----+--------------------+
|Item|     recommendations|
+----+--------------------+
|  12|[{110563, 1.32121...|
|   1|[{118302, 4.48251...|
|  13|[{15890, 0.0}, {1...|
+----+--------------------+
only showing top 3 rows



In [3]:
logistic_data_schema = StructType([
    StructField('Customer_Code',DoubleType(), False),
    StructField('Item',IntegerType(), False),
    StructField('Rating',DoubleType(), False)
])

In [4]:
logisticdf = spark.read.csv(
    'santander_df_logisticpred.csv', header=True, schema=logistic_data_schema
).cache()

In [5]:
sampledf = logisticdf.sample(False, .05, 42)
(training, test) = sampledf.randomSplit([0.8, 0.2])
als = ALS(rank=5, maxIter=5, regParam=0.01, userCol="Customer_Code", itemCol="Item", ratingCol="Rating", 
          coldStartStrategy="drop")

In [6]:
model = als.fit(training)

In [7]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating",
                                predictionCol="prediction")

In [8]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.19726707719539566


In [9]:
# Generate top 10 product recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each product
productRecs = model.recommendForAllItems(10)

In [10]:
userRecs.show(3)

+-------------+--------------------+
|Customer_Code|     recommendations|
+-------------+--------------------+
|        15957|[{1, 0.49662533},...|
|        16339|[{1, 0.9918904}, ...|
|        16574|[{1, 0.99593943},...|
+-------------+--------------------+
only showing top 3 rows



In [11]:
productRecs.show(3)

+----+--------------------+
|Item|     recommendations|
+----+--------------------+
|  12|[{15890, 0.0}, {1...|
|   1|[{159680, 0.99864...|
|  13|[{15890, 0.0}, {1...|
+----+--------------------+
only showing top 3 rows

