# Venmo Customer Churn



### Setting Up Spark and Loading the Dataset

In [None]:
# Mount Google Drive at /content/drive (Colab only); later cells read their input files from a shared drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Environment setup for Colab: install a JDK, download and unpack Spark 3.1.2, install Python helpers.
# NOTE(review): $PYSPARK and $SPARKNLP are not set anywhere in this notebook, so the echo below
# presumably prints empty version strings - confirm or drop the variables.
!echo "setup Colab for PySpark $PYSPARK and Spark NLP $SPARKNLP"
# Headless OpenJDK 8; output is discarded to keep the cell quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Spark 3.1.2 (Hadoop 3.2 build) from the Apache archive, unpacked into the current directory.
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
# findspark is used below to locate the unpacked Spark; spark-nlp is pinned to 4.4.0.
!pip install -q findspark
!pip install spark-nlp==4.4.0

In [None]:
import os

# Tell findspark / PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
})

In [None]:

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col,datediff,when, lit,
                                   regexp_replace,max,min, year,concat,month,lag,
                                   coalesce,array_contains,length,udf,size,split,
                                   explode, arrays_zip, mean, collect_list,concat_ws,avg)


import random
import pandas as pd
#Spark ML and SQL
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col
#Spark NLP
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.annotator import *
from sparknlp.common import RegexRule
from sparknlp.base import DocumentAssembler, Finisher
from pyspark.sql.types import StringType
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, IntegerType

In [None]:
# Create the Spark session through Spark NLP's start() helper.
spark = sparknlp.start()
# Eager evaluation lets a bare DataFrame expression at the end of a cell render as a formatted table.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)


In [None]:
# Display the session object to confirm Spark started.
spark

In [None]:
# Load the Venmo transaction sample from the shared drive.
# NOTE(review): the path is relative, so this presumably relies on the working directory being /content - confirm.
venmo = spark.read.parquet("drive/Shareddrives/Venmo_Project/VenmoSample.snappy.parquet")

### Creation of Churn Columns

In [None]:
# Build a zero-padded "YYYY-MM" key per transaction so plain string comparison orders months correctly.
txn_year = year(col("datetime")).cast(StringType())
txn_month = month(col("datetime"))
txn_month_padded = when(
    txn_month < 10, concat(lit("0"), txn_month.cast(StringType()))
).otherwise(txn_month.cast(StringType()))
venmo = venmo.withColumn("year_month", concat(txn_year, lit("-"), txn_month_padded))

# Earliest and latest active month per user1.
# `min` / `max` here are the pyspark.sql.functions aggregates imported above, not the Python builtins.
window_y_var = Window.partitionBy("user1")
latest_month = max(col("year_month")).over(window_y_var)
earliest_month = min(col("year_month")).over(window_y_var)
venmo = (
    venmo
    .withColumn("max_year_month", latest_month)
    .withColumn("min_year_month", earliest_month)
)

# Keep only users whose first transaction month is before 2014-06.
venmo = venmo.filter(col("min_year_month") < "2014-06")


In [None]:
# Label a user as churned (1) when their last active month is before 2014-06, otherwise 0.
venmo = venmo.withColumn("churn", when(col("max_year_month") < "2014-06", 1).otherwise(0))

In [None]:
# Cache the labelled transactions; the text-preparation and aggregation cells below all start from this frame.
venmo = venmo.cache()

### Text Data Preparation



In [None]:
# Split each transaction description into three views: emoji only, punctuation only, plain text only.

# Remove every character that is NOT inside the listed emoji / symbol Unicode ranges.
emoji_pattern = u'[^\U0001F300-\U0001F64F\U0001F680-\U0001F6FF\u2600-\u26FF\u2700-\u27BF]'
venmo = venmo.withColumn("emoji_only",regexp_replace(col('description'), emoji_pattern, ''))

# Keep only the punctuation characters  , . & \ | _ ! -
# Inside a character class `|` is a literal, not alternation, and the previous `|-|` parsed as the
# one-character range '|'..'|', so hyphens were silently stripped. List the characters directly
# and escape the hyphen so it is kept.
punc_pattern = r'[^,.&\\|_!\-]'
venmo = venmo.withColumn("punctuations", regexp_replace("description", punc_pattern, ""))

# Remove everything that is not a word character or whitespace, and remove underscores as well.
textonly_pattern = r'[^\w\s]|_'
venmo = venmo.withColumn("text_only",regexp_replace(col('description'), textonly_pattern, ''))

In [None]:
# Turn "nothing left after stripping" into NULL for each derived text column.
# text_only was previously compared with a single space, which missed the empty string (what the
# regexp_replace above yields for a symbol-only description) and any multi-space remainder.
# Treat every empty or whitespace-only value as missing instead.
venmo = venmo.withColumn("text_only", when(col("text_only").rlike(r"^\s*$"), None).otherwise(col("text_only")))
venmo = venmo.withColumn("punctuations", when(col("punctuations") == "", None).otherwise(col("punctuations")))
venmo = venmo.withColumn("emoji_only", when(col("emoji_only") == "", None).otherwise(col("emoji_only")))

In [None]:
# Character count of each derived text view, added in the same order as before.
for source_col, length_col in (
    ("text_only", "characteronly_length"),
    ("punctuations", "punctuations_length"),
    ("emoji_only", "emoji_length"),
):
    venmo = venmo.withColumn(length_col, length(source_col))

In [None]:
### User Level Aggregation (text only)
# NOTE: the following cell rebuilds venmo_user with the full feature set and replaces this result.
venmo_user = venmo.groupBy("user1") \
              .agg(avg("churn").alias("churn"), \
                   concat_ws(" ",collect_list("text_only")).alias("text_only"))
# Drop users with no usable text. The two checks were previously combined with `|`, which is true
# for every non-null value (including ''), so no row was ever removed; both conditions must hold.
venmo_user = venmo_user.filter(col('text_only').isNotNull() & (col('text_only') != ''))

In [None]:
### User Level Aggregation
# One row per user1: churn label, first transaction timestamp, average lengths of each text view,
# and the space-joined text / punctuation / emoji of all of that user's descriptions.
user_level_aggs = [
    avg("churn").alias("churn"),
    min("datetime").alias("customer_join_Date"),
    avg("characteronly_length").alias("avg_characteronly_length"),
    avg("punctuations_length").alias("avg_punctuations_length"),
    avg("emoji_length").alias("avg_emoji_length"),
    concat_ws(" ", collect_list("punctuations")).alias("punctuations"),
    concat_ws(" ", collect_list("emoji_only")).alias("emoji_only"),
    concat_ws(" ", collect_list("text_only")).alias("text_only"),
]
venmo_user = venmo.groupBy("user1").agg(*user_level_aggs)

### Descriptive Statistics

In [None]:
# Class balance: number of users per churn label.
venmo_user.groupBy('churn').count()

In [None]:
# User counts by churn label and by the year of each user's first transaction.
users_with_join_year = venmo_user.withColumn('join_year', year('customer_join_Date'))
users_with_join_year.groupBy('join_year', 'churn').count()

In [None]:
# Mean of the per-user average lengths, split by churn label.
length_summary_aggs = [
    avg('avg_characteronly_length').alias('character_length'),
    avg('avg_punctuations_length').alias('punctuation_length'),
    avg('avg_emoji_length').alias('emoji_length'),
]
venmo_user.groupBy('churn').agg(*length_summary_aggs)

## Model Development

In [None]:
# Perform stratified sampling: 70% of each churn class goes to train, the rest to test.
venmo_user =  venmo_user.fillna(0)
venmo_train = venmo_user.sampleBy('churn', fractions={0: 0.7, 1: 0.7}, seed=10)
# Build the hold-out set with an anti-join on the user key rather than subtract().
# subtract() is a set difference over every column, so it has to compare the long collect_list
# text columns; if those differ when the plan is recomputed, a training user can reappear in the
# test set. Matching on user1 only is cheaper and does not depend on the text columns being equal.
# NOTE(review): venmo_user is not cached, so sampleBy itself may be re-evaluated - consider
# persisting venmo_user before splitting if strict reproducibility matters.
venmo_test = venmo_user.join(venmo_train.select("user1"), on="user1", how="left_anti")

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Pack the three average-length columns into one "features" vector column for both splits.
length_feature_cols = [
    "avg_characteronly_length",
    "avg_punctuations_length",
    "avg_emoji_length",
]
assembler = VectorAssembler(inputCols=length_feature_cols, outputCol="features")
training_data, testing_data = (
    assembler.transform(subset) for subset in (venmo_train, venmo_test)
)

In [None]:
# Class balance of the training split.
training_data.groupBy('churn').count()

In [None]:
# Class balance of the test split.
testing_data.groupBy('churn').count()

### Logistic Regression with Count Variables

In [None]:
from pyspark.ml.regression import GeneralizedLinearRegression

# Binomial-family GLM on the assembled "features" vector with churn as the label.
glr = GeneralizedLinearRegression(
    family="binomial",
    maxIter=100,
    featuresCol='features',
    labelCol='churn',
)

# Fit on the training split.
model = glr.fit(training_data)

# Training summary of the fitted model.
summary = model.summary
print(summary)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Score AUC on the model's continuous `prediction` column. Scoring the thresholded 0/1
# prediction_label (as before) collapses the ROC curve to a single operating point, so the
# reported number was not the model's AUC.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="churn")
predictions = model.transform(training_data)
# prediction_label is still derived because the crosstab / precision / recall cells use hard labels.
threshold = 0.5
predictions = predictions.withColumn('prediction_label', when(predictions['prediction'] >= threshold, 1).otherwise(0))
predictions = predictions.withColumn('prediction_label', col('prediction_label').cast('double'))
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

print("Train AUC: {:.4f}".format(auc))

In [None]:
# Hold-out evaluation of the GLM.
predictions = model.transform(testing_data)
# Hard labels at a 0.5 cut-off, used by the crosstab / precision / recall cells below.
threshold = 0.5
predictions = predictions.withColumn('prediction_label', when(predictions['prediction'] >= threshold, 1).otherwise(0))
predictions = predictions.withColumn('prediction_label', col('prediction_label').cast('double'))
# Compute AUC on the continuous `prediction` score; the evaluator's column is overridden here
# because scoring the 0/1 prediction_label reduces the ROC curve to a single point.
auc = evaluator.evaluate(
    predictions,
    {evaluator.rawPredictionCol: "prediction", evaluator.metricName: "areaUnderROC"},
)

print("Test AUC: {:.4f}".format(auc))

In [None]:
# Confusion table: actual churn (rows) against the thresholded prediction (columns).
predictions.crosstab('churn', 'prediction_label').show()

In [None]:
# Precision / recall for the churn = 1 class, computed from the current `predictions` frame.
# The previous version hard-coded counts copied from one crosstab run, which goes stale as soon
# as the data, split, or model changes.
true_pos = predictions.filter((col('churn') == 1) & (col('prediction_label') == 1)).count()
predicted_pos = predictions.filter(col('prediction_label') == 1).count()
actual_pos = predictions.filter(col('churn') == 1).count()
# Guard against an empty denominator (e.g. the model never predicts churn).
print("Precision: ", true_pos / predicted_pos if predicted_pos else float('nan'))
print("Recall: ", true_pos / actual_pos if actual_pos else float('nan'))

### LIWC Variables Inclusion

In [None]:
# Attach the per-user LIWC scores to both splits.
venmo_liwc = spark.read.csv("drive/Shareddrives/Venmo_Project/data/liwc_results.csv", header=True, inferSchema=True)
# Join on the column *name*: an expression join (a.user1 == b.user1) keeps both user1 columns,
# which makes later references ambiguous and gives toPandas() duplicate column names.
# Users without a LIWC row keep NULLs from the left join, which fillna(0) turns into 0 for numeric columns.
# NOTE: re-running this cell joins the LIWC columns a second time; re-create the splits first.
venmo_train = venmo_train.join(venmo_liwc, on="user1", how="left").fillna(0)
venmo_test = venmo_test.join(venmo_liwc, on="user1", how="left").fillna(0)

In [None]:
# Model inputs: the three average-length features plus LIWC result columns.
# Line breaks are for readability only; list order is the order used when the vector is assembled.
features_list = [
    'Dic', 'avg_characteronly_length', 'avg_punctuations_length', 'avg_emoji_length', 'WC', 'BigWords',
    'pronoun', 'ppron', 'i', 'we', 'you', 'shehe', 'they', 'ipron',
    'det', 'article', 'number', 'prep', 'auxverb', 'adverb', 'conj', 'negate', 'verb', 'adj', 'quantity',
    'Drives', 'affiliation', 'achieve', 'power',
    'Cognition', 'allnone', 'cogproc', 'insight', 'cause', 'discrep', 'tentat', 'certitude', 'differ', 'memory',
    'Affect', 'tone_pos', 'tone_neg', 'emotion', 'emo_pos', 'emo_neg', 'emo_anx', 'emo_anger', 'emo_sad', 'swear',
    'Social', 'socbehav', 'prosocial', 'polite', 'conflict', 'moral', 'comm', 'socrefs', 'family', 'friend',
    'Culture', 'politic', 'ethnicity', 'tech',
    'Lifestyle', 'leisure', 'home', 'work', 'money', 'relig',
    'Physical', 'health', 'illness', 'wellness', 'mental', 'substances', 'sexual', 'food', 'death',
    'need', 'want', 'acquire', 'lack', 'fulfill', 'fatigue', 'reward', 'risk', 'curiosity', 'allure',
    'Perception', 'attention', 'motion', 'space', 'visual', 'auditory', 'feeling',
    'time', 'focuspast', 'focuspresent', 'focusfuture',
    'Conversation', 'netspeak', 'assent', 'nonflu', 'filler',
]

In [None]:
# Rebuild the "features" vector for both splits, this time from every column in features_list.
assembler = VectorAssembler(inputCols=features_list, outputCol="features")
training_data, testing_data = (
    assembler.transform(subset) for subset in (venmo_train, venmo_test)
)

In [None]:
# IMPORT
import numpy as np
from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier


# BUILD THE MODEL
# Spark random forest on the assembled "features" vector; the seed is fixed for repeatable fits.
rf = RandomForestClassifier(
    labelCol="churn",
    numTrees=1000,
    maxDepth=5,
    subsamplingRate=0.4,
    seed=42,
)
model = rf.fit(training_data)

# FEATURE IMPORTANCES are plotted in the next cell.

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Read the stored entries of the importance vector (index into features_list, importance value).
importance_vector = model.featureImportances
non_zero_indices = list(importance_vector.indices)
non_zero_importances = list(importance_vector.values)
non_zero_features = [features_list[index] for index in non_zero_indices]

# Order everything by ascending importance; on a horizontal bar chart this puts the most
# important feature at the top.
sorted_indices = np.argsort(non_zero_importances)
non_zero_indices = [non_zero_indices[i] for i in sorted_indices]
non_zero_importances = [non_zero_importances[i] for i in sorted_indices]
non_zero_features = [non_zero_features[i] for i in sorted_indices]

# Tall figure so every feature label stays readable.
fig, ax = plt.subplots(figsize=(10, 24))

# One horizontal bar per feature, labelled with the feature name.
bar_positions = np.arange(len(non_zero_features))
ax.barh(bar_positions, non_zero_importances, align='center')
ax.set_yticks(bar_positions)
ax.set_yticklabels(non_zero_features)

# Axis labels and title.
ax.set_xlabel('Importance')
ax.set_ylabel('Features')
ax.set_title('Feature Importances')

# Render the chart.
plt.show()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Score AUC on the classifier's `rawPrediction` vector. The thresholded prediction_label used
# before is a hard 0/1 class, which collapses the ROC curve to one operating point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="churn")
predictions = model.transform(training_data)
# prediction_label is kept as a double for the crosstab / precision / recall cells.
threshold = 0.5
predictions = predictions.withColumn('prediction_label', when(predictions['prediction'] >= threshold, 1).otherwise(0))
predictions = predictions.withColumn('prediction_label', col('prediction_label').cast('double'))
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

print("Train AUC: {:.4f}".format(auc))

In [None]:
# Hold-out evaluation of the random forest.
predictions = model.transform(testing_data)
# Hard labels as a double column, used by the crosstab / precision / recall cells below.
threshold = 0.5
predictions = predictions.withColumn('prediction_label', when(predictions['prediction'] >= threshold, 1).otherwise(0))
predictions = predictions.withColumn('prediction_label', col('prediction_label').cast('double'))
# Compute AUC on the `rawPrediction` vector; the evaluator's column is overridden here because
# scoring the 0/1 prediction_label reduces the ROC curve to a single point.
auc = evaluator.evaluate(
    predictions,
    {evaluator.rawPredictionCol: "rawPrediction", evaluator.metricName: "areaUnderROC"},
)

print("Test AUC: {:.4f}".format(auc))

In [None]:
# Confusion table for the random forest: actual churn (rows) against predicted label (columns).
predictions.crosstab('churn', 'prediction_label').show()

In [None]:
# Precision / recall for the churn = 1 class, computed from the current `predictions` frame.
# The previous version hard-coded counts copied from one crosstab run, which goes stale as soon
# as the data, split, or model changes.
true_pos = predictions.filter((col('churn') == 1) & (col('prediction_label') == 1)).count()
predicted_pos = predictions.filter(col('prediction_label') == 1).count()
actual_pos = predictions.filter(col('churn') == 1).count()
# Guard against an empty denominator (e.g. the model never predicts churn).
print("Precision: ", true_pos / predicted_pos if predicted_pos else float('nan'))
print("Recall: ", true_pos / actual_pos if actual_pos else float('nan'))

### Model Development in Pandas (the sample is small enough to fit in memory)

In [None]:
!pip install shap
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, roc_auc_score
import shap
import matplotlib.pyplot as plt

In [None]:
# Collect both splits to the driver as pandas DataFrames for scikit-learn.
df_train = venmo_train.toPandas()
df_test = venmo_test.toPandas()

In [None]:
# scikit-learn random forest on the collected training frame.
# `RandomForestClassifier` here is the sklearn class imported at the top of this section,
# which shadows the pyspark class of the same name used earlier.
rf_params = dict(
    n_estimators=1000,
    criterion='log_loss',
    max_depth=10,
    min_samples_split=50,
    min_samples_leaf=25,
    max_features='sqrt',
    max_leaf_nodes=None,
    min_impurity_decrease=0.0,
    bootstrap=True,
    oob_score=True,
    n_jobs=-1,
    random_state=1000,
    verbose=3,
    warm_start=False,
)
rf = RandomForestClassifier(**rf_params)
rf.fit(df_train[features_list], df_train['churn'])

In [None]:
# Out-of-bag score of the fitted forest (available because it was built with oob_score=True).
rf.oob_score_

In [None]:
# Hold-out precision and recall of the sklearn forest.
test_features, test_labels = df_test[features_list], df_test['churn']
y_pred = rf.predict(test_features)
precision, recall = precision_score(test_labels, y_pred), recall_score(test_labels, y_pred)
print(precision)
print(recall)

In [None]:
from sklearn.metrics import accuracy_score
# Overall hold-out accuracy of the sklearn forest, using the predictions from the previous cell.
accuracy_score(df_test['churn'], y_pred)

In [None]:
import matplotlib.pyplot as plt
# Rank features by importance, highest first.
feature_importance = rf.feature_importances_
sorted_indices = np.argsort(feature_importance)[::-1]
sorted_importance = feature_importance[sorted_indices]
# Reorder the labels with the same permutation as the values. The labels were previously the
# unsorted features_list, so each bar was drawn under the wrong feature name.
sorted_features = [features_list[i] for i in sorted_indices]

plt.figure(figsize=(14, 6))
plt.bar(range(len(sorted_importance)), sorted_importance, tick_label=sorted_features)
plt.xticks(rotation=90)
plt.xlabel('Features')
plt.ylabel('Importance')
plt.title('Feature Importances')
plt.tight_layout()
plt.show()

In [None]:
# explainer = shap.TreeExplainer(rf)
# shap_values = explainer.shap_values(df_test[features_list])

In [None]:
# shap.summary_plot(shap_values, df_test[features_list])

In [None]:
# shap.summary_plot(shap_values[1], df_test[features_list])

In [None]:
# shap.summary_plot(shap_values[0], df_test[features_list])

### Word Embeddings and Model Development

In [None]:
# Spark NLP Pipeline
# Three pipelines, one per text view; each ends in a Finisher that emits a plain array column.

### Text Tokens
# text_only -> document -> lower-cased document -> sentences -> tokens -> lemmas -> text_tokens
# NOTE(review): no output column is set on the assemblers; the next stage reads "document",
# presumably the assembler's default output name - confirm.
document_assembler = DocumentAssembler().setInputCol("text_only")

documentNormalizer = (
    DocumentNormalizer()
    .setInputCols("document")
    .setOutputCol("normalizedDocument")
    .setLowercase(True)
)

sentence_detector = (
    SentenceDetector()
    .setInputCols(["normalizedDocument"])
    .setOutputCol("sentence")
)

tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

# pretrained() is called without arguments, so whichever model the library resolves by default is used.
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(["token"])
    .setOutputCol("token_lemma")
)

finisher = (
    Finisher()
    .setInputCols(["token_lemma"])
    .setOutputCols(["text_tokens"])
    .setIncludeMetadata(False)
    .setOutputAsArray(True)
)

text_stages = [
    document_assembler,
    documentNormalizer,
    sentence_detector,
    tokenizer,
    lemmatizer,
    finisher,
]
nlpPipeline = Pipeline(stages=text_stages)

### Emoji Tokens
# emoji_only -> document -> sentences -> tokens -> emoji_tokens
document_assembler_emoji = DocumentAssembler().setInputCol("emoji_only")

sentence_detector_emoji = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

tokenizer_emoji = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

finisher_emoji = (
    Finisher()
    .setInputCols(["token"])
    .setOutputCols(["emoji_tokens"])
    .setIncludeMetadata(False)
    .setOutputAsArray(True)
)

emoji_stages = [
    document_assembler_emoji,
    sentence_detector_emoji,
    tokenizer_emoji,
    finisher_emoji,
]
nlpPipeline_emoji = Pipeline(stages=emoji_stages)

### punctuations
# punctuations -> document -> sentences -> tokens -> punctuation_tokens
document_assembler_punc = DocumentAssembler().setInputCol("punctuations")

sentence_detector_punc = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

tokenizer_punc = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

finisher_punc = (
    Finisher()
    .setInputCols(["token"])
    .setOutputCols(["punctuation_tokens"])
    .setIncludeMetadata(False)
    .setOutputAsArray(True)
)

punc_stages = [
    document_assembler_punc,
    sentence_detector_punc,
    tokenizer_punc,
    finisher_punc,
]
nlpPipeline_punc = Pipeline(stages=punc_stages)



In [None]:
# Run the text, emoji and punctuation pipelines in turn; each adds its token-array column to venmo_user.
for token_pipeline in (nlpPipeline, nlpPipeline_emoji, nlpPipeline_punc):
    venmo_user = token_pipeline.fit(venmo_user).transform(venmo_user)

In [None]:
# Perform stratified sampling: 70% of each churn class goes to train, the rest to test.
venmo_user =  venmo_user.fillna(0)
venmo_train = venmo_user.sampleBy('churn', fractions={0: 0.7, 1: 0.7}, seed=10)
# Build the hold-out set with an anti-join on the user key rather than subtract().
# subtract() is a set difference over every column, which here includes the token arrays and the
# long collect_list text; if those differ when the plan is recomputed, a training user can
# reappear in the test set. Matching on user1 only is cheaper and does not depend on them.
# NOTE(review): venmo_user is not cached, so sampleBy itself may be re-evaluated - consider
# persisting venmo_user before splitting if strict reproducibility matters.
venmo_test = venmo_user.join(venmo_train.select("user1"), on="user1", how="left_anti")

In [None]:
from pyspark.ml.feature import HashingTF, IDF

In [None]:
# Stage 1: hash the lemma tokens into a term-frequency vector ("countfeatures").
hashingTF = HashingTF(inputCol="text_tokens", outputCol="countfeatures")

# Stage 2: IDF re-weighting into "tf_idf_features"; minDocFreq=5 is used to remove sparse terms.
idf = IDF(inputCol="countfeatures", outputCol="tf_idf_features", minDocFreq=5)

tf_stages = [hashingTF, idf]
nlp_pipeline_tf = Pipeline(stages=tf_stages)

# Fit on the training split only.
nlp_model_tf = nlp_pipeline_tf.fit(venmo_train)

In [None]:
# Add the countfeatures and tf_idf_features columns to both splits using the model fitted on the training split.
venmo_train = nlp_model_tf.transform(venmo_train)
venmo_test = nlp_model_tf.transform(venmo_test)

### Count Features and Modelling

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator


# Wrap the hashed term-count vector as the "features" column for both splits.
assembler = VectorAssembler(inputCols=["countfeatures"], outputCol="features")
training_data = assembler.transform(venmo_train)
testing_data = assembler.transform(venmo_test)

lr = LogisticRegression(featuresCol="features", labelCol="churn")
model = lr.fit(training_data)

# AUC has to be scored on the model's continuous output. The `prediction` column used before is
# the hard 0/1 class, which collapses the ROC curve to a single operating point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="churn")

# Predictions on the training data.
predictions = model.transform(training_data)
predictions.show(1,0)

auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

print("Train AUC: {:.4f}".format(auc))

# Predictions on the testing data.
predictions = model.transform(testing_data)
predictions.show(1,0)

auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

print("Test AUC: {:.4f}".format(auc))

### TF-IDF Features and Model Development

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler


# Same model as the count-feature section, but fed the TF-IDF weighted vector.
assembler = VectorAssembler(
    inputCols=["tf_idf_features"],
    outputCol="features",
)
training_data = assembler.transform(venmo_train)


lr = LogisticRegression(
    featuresCol="features",
    labelCol="churn",
)
model = lr.fit(training_data)

In [None]:
# AUC has to be scored on the model's continuous output. The `prediction` column used before is
# the hard 0/1 class, which collapses the ROC curve to a single operating point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="churn")

# Predictions on the training data.
predictions = model.transform(training_data)
predictions.show(1,0)

auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

print("Train AUC: {:.4f}".format(auc))

# Assemble the TF-IDF feature vector for the testing data.
testing_data = assembler.transform(venmo_test)

# Predictions on the testing data.
predictions = model.transform(testing_data)
predictions.show(1,0)

auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

print("Test AUC: {:.4f}".format(auc))

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Re-score the most recent `predictions` frame (the test predictions from the previous cell).
# Use the continuous rawPrediction output; the hard 0/1 `prediction` column does not give a real AUC.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="churn")
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print("AUC: {:.4f}".format(auc))

In [None]:
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.sql.functions import split

# Toy three-document corpus used to inspect what the tokenising and TF-IDF pipelines produce.
data = [
    ("example",),
    ("yet",),
    ("example another yet example",),
]
df = spark.createDataFrame(data, ["text_only"])

# Tokenise and lemmatise, then hash and IDF-weight. Both pipelines are re-fitted on the toy frame.
df = nlpPipeline.fit(df).transform(df)
df = nlp_pipeline_tf.fit(df).transform(df)

# Show all three rows without truncating the columns.
df.show(3,0)