In [1]:
# Set up environment
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [3]:
# Read the file
df = spark.read.csv("HW4 - training data.csv", header=True, inferSchema=True, sep=';')

In [4]:
df.printSchema()

root
 |-- age: double (nullable = true)
 |-- male: double (nullable = true)
 |-- friend_cnt: double (nullable = true)
 |-- avg_friend_age: double (nullable = true)
 |-- avg_friend_male: double (nullable = true)
 |-- friend_country_cnt: double (nullable = true)
 |-- subscriber_friend_cnt: double (nullable = true)
 |-- songsListened: double (nullable = true)
 |-- lovedTracks: double (nullable = true)
 |-- posts: double (nullable = true)
 |-- playlists: double (nullable = true)
 |-- shouts: double (nullable = true)
 |-- delta_friend_cnt: double (nullable = true)
 |-- delta_avg_friend_age: double (nullable = true)
 |-- delta_avg_friend_male: double (nullable = true)
 |-- delta_friend_country_cnt: double (nullable = true)
 |-- delta_subscriber_friend_cnt: double (nullable = true)
 |-- delta_songsListened: double (nullable = true)
 |-- delta_lovedTracks: double (nullable = true)
 |-- delta_posts: double (nullable = true)
 |-- delta_playlists: double (nullable = true)
 |-- delta_shouts: doubl

In [5]:
df.show(5)

+----+----+----------+--------------+---------------+------------------+---------------------+-------------+-----------+-----+---------+------+----------------+--------------------+---------------------+------------------------+---------------------------+-------------------+-----------------+-----------+---------------+------------+------+------------+------------------+-------+-------+
| age|male|friend_cnt|avg_friend_age|avg_friend_male|friend_country_cnt|subscriber_friend_cnt|songsListened|lovedTracks|posts|playlists|shouts|delta_friend_cnt|delta_avg_friend_age|delta_avg_friend_male|delta_friend_country_cnt|delta_subscriber_friend_cnt|delta_songsListened|delta_lovedTracks|delta_posts|delta_playlists|delta_shouts|tenure|good_country|delta_good_country|adopter|user_id|
+----+----+----------+--------------+---------------+------------------+---------------------+-------------+-----------+-----+---------+------+----------------+--------------------+---------------------+---------------

In [6]:
# Show the proportion of each class
class_cnt = df.groupBy('adopter').count()
total_cnt = df.count()
class_cnt.withColumn('Proportion', class_cnt['Count']/total_cnt).show()
# Super imbalanced dataset

+-------+-----+--------------------+
|adopter|count|          Proportion|
+-------+-----+--------------------+
|      1| 1540|0.017766087538358597|
|      0|85142|  0.9822339124616414|
+-------+-----+--------------------+



In [6]:
# Firstly deal with the imbalanced dataset
from pyspark.sql.functions import col
df_major = df.filter(col('adopter') == 0)
df_minor = df.filter(col('adopter') == 1)
cnt_major = df_major.count()
cnt_minor = df_minor.count()

# Oversample the minor class, but not to 1:1; oversample to 1:3
df_minor_oversampled = df_minor.sample(True, cnt_major/(cnt_minor*3), seed = 42)
df_balanced = df_major.union(df_minor_oversampled)

# Check the distribution again
class_cnt = df_balanced.groupBy('adopter').count()
total_cnt = df_balanced.count()
class_cnt.withColumn('Proportion', class_cnt['Count']/total_cnt).show()

+-------+-----+-------------------+
|adopter|count|         Proportion|
+-------+-----+-------------------+
|      0|85142| 0.7489158830824985|
|      1|28545|0.25108411691750154|
+-------+-----+-------------------+



In [7]:
# replace the original df
df = df_balanced
# Drop the user id
df = df.drop('user_id')

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [9]:
# Split the dataset
df_train, df_test = df.randomSplit([0.7, 0.3], seed = 42)

In [10]:
# Create an assembler
features = [col for col in df.columns if col != "adopter"]
assembler = VectorAssembler(inputCols = features, outputCol = "features")

In [11]:
# Create a scaler
scaler = StandardScaler(inputCol = "features", outputCol = "scaled_features")

In [12]:
# Evaluator: f1
f1 = MulticlassClassificationEvaluator(labelCol="adopter", predictionCol="prediction", metricName="f1")

In [13]:
# Model 1. Decision Tree
dt = DecisionTreeClassifier(featuresCol = "scaled_features", labelCol = "adopter", seed = 42)
pl_dt = Pipeline(stages = [assembler, scaler, dt])
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [3, 5, 10]) \
    .addGrid(dt.minInstancesPerNode, [5, 10, 20]) \
    .addGrid(dt.impurity, ['gini', 'entropy']) \
    .build()
cv = CrossValidator(estimator = pl_dt, estimatorParamMaps = paramGrid, evaluator = f1, numFolds = 3, seed = 42)
cv_model = cv.fit(df_train)

In [14]:
dt_best = cv_model.bestModel
dt_best.stages[-1]

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_89a2322d6839, depth=10, numNodes=787, numClasses=2, numFeatures=25

In [15]:
pred = dt_best.transform(df_test)
f1_dt = f1.evaluate(pred)
f1_dt

0.8415140584946538

In [18]:
print(f"Best Max Depth: {dt_best.stages[-1].getMaxDepth()}")
print(f"Best Min Instances Per Node: {dt_best.stages[-1].getMinInstancesPerNode()}")
print(f"Best Impurity: {dt_best.stages[-1].getImpurity()}")

Best Max Depth: 10
Best Min Instances Per Node: 5
Best Impurity: gini


In [None]:
# Model 2. Random Forest
rf = RandomForestClassifier(featuresCol = "scaled_features", labelCol = "adopter", seed = 42)
pl_rf = Pipeline(stages = [assembler, scaler, rf])
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 20]) \
    .addGrid(rf.minInstancesPerNode, [5, 10]) \
    .build()
cv = CrossValidator(estimator = pl_rf, estimatorParamMaps = paramGrid, evaluator = f1, numFolds = 3, seed = 42)
cv_model = cv.fit(df_train)

In [None]:
rf_best = cv_model.bestModel
rf_best.stages[-1]

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_eff2b40387b9, depth=5, numNodes=41, numClasses=2, numFeatures=25

In [None]:
pred = rf_best.transform(df_test)
f1_rf = f1.evaluate(pred)
f1_rf

0.7573694324732099

In [None]:
# Model 3. GBT
gbt = GBTClassifier(featuresCol = "scaled_features", labelCol = "adopter", seed = 42)
pl_gbt = Pipeline(stages = [assembler, scaler, gbt])
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 10, 20]) \
    .addGrid(gbt.minInstancesPerNode, [1, 5]) \
    .addGrid(gbt.stepSize, [0.1, 0.3, 0.5]) \
    .build()
cv = CrossValidator(estimator = pl_gbt, estimatorParamMaps = paramGrid, evaluator = f1, numFolds = 3, seed = 42)
cv_model = cv.fit(df_train)

In [None]:
gbt_best = cv_model.bestModel
gbt_best.stages[-1]

In [None]:
pred = gbt_best.transform(df_test)
precision_gbt = f1.evaluate(pred)
precision_gbt

In [30]:
# Model 4. Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

lr = LogisticRegression(featuresCol="scaled_features", labelCol="adopter")

class ProbabilityThreshold(Transformer, Params):
    threshold = Param(Params._dummy(), "threshold", "threshold for binary classification")
    outputCol = Param(Params._dummy(), "outputCol", "output column name")

    def __init__(self, threshold=0.3, outputCol="custom_prediction"):
        super(ProbabilityThreshold, self).__init__()
        self._set(threshold=threshold, outputCol=outputCol)

    def _transform(self, dataset):
        threshold = self.getThreshold()
        
        def apply_threshold(probability):
            return float(probability[1] > threshold)

        apply_threshold_udf = udf(apply_threshold, DoubleType())
        return dataset.withColumn(self.getOutputCol(), apply_threshold_udf(dataset.probability))

    def getThreshold(self):
        return self.getOrDefault(self.threshold)

    def setThreshold(self, value):
        return self._set(threshold=value)

    def getOutputCol(self):
        return self.getOrDefault(self.outputCol)

    def setOutputCol(self, value):
        return self._set(outputCol=value)

In [31]:
# Add the custom transformer to the pipeline
th = ProbabilityThreshold(threshold=0.3, outputCol="custom_prediction")

pl_lr = Pipeline(stages=[assembler, scaler, lr, th])

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

In [34]:
cv = CrossValidator(estimator=pl_lr, estimatorParamMaps=paramGrid, evaluator=f1, numFolds=3, seed=42)

cv_model = cv.fit(df_train)
lr_best = cv_model.bestModel
lr_best.stages[-1]

ProbabilityThreshold_cebf95b4701d

In [36]:
pred = lr_best.transform(df_test)
f1_lr = f1.evaluate(pred)
f1_lr

0.7099224185317523

### Output

In [None]:
df2 = spark.read.csv("HW4 - test data.csv", header=True, inferSchema=True, sep=';')
user_id = df2.select("user_id")
df2 = df2.drop('user_id')

In [None]:
best = DecisionTreeClassifier(featuresCol = "scaled_features", labelCol = "adopter", seed = 42, maxDepth = 10, minInstancesPerNode = 5, impurity = "gini")
pl = Pipeline(stages = [assembler, scaler, best])
model = lr.fit(df_train)
pred = model.transform(df2)

In [None]:
import pandas as pd
prediction = pred.select('prediction').toPandas()

In [3]:
import pandas as pd
result = pd.read_csv('result2.csv')
result.head()

Unnamed: 0,user_id,prediction(adopter)
0,5,False
1,41,False
2,77,False
3,99,False
4,106,True


In [None]:
user_id = user_id.toPandas()

In [None]:
result = pd.concat([user_id, prediction], axis=1)

In [None]:
result.rename(columns={'prediction': 'prediction(adopter)'}, inplace=True)

In [4]:
result['user_id'] = result['user_id'].astype(str)
result['prediction(adopter)'] = result['prediction(adopter)'].astype(bool)

In [5]:
result

Unnamed: 0,user_id,prediction(adopter)
0,5,False
1,41,False
2,77,False
3,99,False
4,106,True
...,...,...
86676,1708912,False
86677,1708924,False
86678,1708946,False
86679,1708972,False


In [None]:
result.dtypes

user_id                object
prediction(adopter)      bool
dtype: object

In [None]:
result.to_csv('result.csv', index=False)

In [None]:
result = pd.read_csv('result2.csv')

In [6]:
result['user_id'] = result['user_id'].astype(float)
result['prediction(adopter)'] = result['prediction(adopter)'].astype(bool)

In [None]:
result.dtypes

user_id                float64
prediction(adopter)       bool
dtype: object

In [None]:
result

Unnamed: 0,user_id,prediction(adopter)
0,5.0,False
1,41.0,True
2,77.0,True
3,99.0,True
4,106.0,True
...,...,...
86676,1708912.0,True
86677,1708924.0,False
86678,1708946.0,True
86679,1708972.0,True


In [7]:
result.to_csv('result4.csv', index=False)

In [None]:
df2 = pd.read_csv('result4.csv')

In [None]:
df2.dtypes

user_id                float64
prediction(adopter)       bool
dtype: object