In [1]:
import os,sys

# os.environ['PYSPARK_DRIVER_PYTHON_OPTS']= "notebook"
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StructType,IntegerType,FloatType,BooleanType,StringType
from pyspark.sql.functions import rand, count, isnull, when, col
conf = SparkConf().setMaster("local[*]").setAppName("My App")
sc = SparkContext.getOrCreate(conf = conf)
sc._conf.set('spark.executor.memory','15g')\
    .set('spark.driver.memory','15g')\
        .set('spark.driver.maxResultsSize','0')
spark=SparkSession.builder\
    .appName('myApp')\
        .config("spark.driver.memory", "15g")\
            .getOrCreate()

In [3]:
def load_data(files,schema):
    df=spark.read.csv(files,header=True
                  ,schema=schema)
    return df

def load_record_linkage_data():
    schema = StructType() \
      .add("id_1",IntegerType(),True) \
      .add("id_2",IntegerType(),True) \
      .add("cmp_fname_c1",FloatType(),True) \
      .add("cmp_fname_c2",FloatType(),True) \
      .add("cmp_lname_c1",FloatType(),True) \
      .add("cmp_lname_c2",FloatType(),True) \
      .add("cmp_sex",IntegerType(),True) \
      .add("cmp_bd",IntegerType(),True) \
      .add("cmp_bm",IntegerType(),True) \
      .add("cmp_by",IntegerType(),True) \
      .add("cmp_plz",IntegerType(),True) \
      .add("is_match",BooleanType(),False)
    files=[f'./data/block_{id}.csv' for id in range(1,11)]
    return load_data(files,schema=schema)

In [4]:
df=load_record_linkage_data()

In [5]:
df.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: float (nullable = true)
 |-- cmp_fname_c2: float (nullable = true)
 |-- cmp_lname_c1: float (nullable = true)
 |-- cmp_lname_c2: float (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- is_match: boolean (nullable = true)



In [6]:
df.show(10)

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|14055|94934|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|33948|34740|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|  946|71870|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|64880|71676|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|25739|45991|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|  

## تبدیل به داده‌های عددی

In [7]:
from pyspark.sql.functions import when, lit


def convert_label_binary(input_df):
    temp = input_df.withColumn('label',
                             when(input_df['is_match']==True,
                                  lit(1)).otherwise(0)
                                  ) 
    return temp

In [8]:
numerical_df = convert_label_binary(df).drop('is_match')

In [9]:
numerical_df.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: float (nullable = true)
 |-- cmp_fname_c2: float (nullable = true)
 |-- cmp_lname_c1: float (nullable = true)
 |-- cmp_lname_c2: float (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- label: integer (nullable = false)



In [10]:
numerical_df.show(10)

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+-----+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|label|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+-----+
| 3148| 8326|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|14055|94934|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|33948|34740|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|  946|71870|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|64880|71676|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|25739|45991|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|62415|93584|      

# Missing Values

In [11]:
numerical_df.select([count(when(isnull(column), column)).alias(column) for column in numerical_df.columns]).show()

+----+----+------------+------------+------------+------------+-------+------+------+------+-------+-----+
|id_1|id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|label|
+----+----+------------+------------+------------+------------+-------+------+------+------+-------+-----+
|   0|   0|        1007|     5645434|           0|     5746668|      0|   795|   795|   795|  12843|    0|
+----+----+------------+------------+------------+------------+-------+------+------+------+-------+-----+



### Drop Missing Values


In [15]:
no_id_numerical_df = numerical_df.drop('id_1','id_2')

In [16]:
no_id_numerical_df.show(10)

+------------+------------+------------+------------+-------+------+------+------+-------+-----+
|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|label|
+------------+------------+------------+------------+-------+------+------+------+-------+-----+
|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    1|
|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      0|    1|
|         1.0|        null|   

In [20]:
no_id_numerical_df.count()

5749132

In [21]:
no_id_numerical_df.na.drop(how='all').count()

5749132

In [33]:
no_id_numerical_df.na.drop(how='any',thresh=2).count()

5749132

### Fill the Missing Values

In [22]:
from pyspark.ml.feature import Imputer


def fill_missing_values(input_df):
    # for float variables
    miss_df=input_df.drop('id_1','id_2')
    miss_df=miss_df.replace('?',None)
    float_cols=[
    'cmp_fname_c1', 
    'cmp_fname_c2', 
    'cmp_lname_c1', 
    'cmp_lname_c2', 
    ]
    float_imputer = Imputer(
        inputCols=float_cols,
        outputCols=[f"{col}_imp" for col in float_cols]
    ).setStrategy('mean')

    # for binary variables
    binary_cols=[
        'cmp_sex', 
        'cmp_bd', 
        'cmp_bm', 
        'cmp_by',
        'cmp_plz',
    ]
    binary_imputer = Imputer(
        inputCols=binary_cols,
        outputCols=[f"{col}_imp" for col in binary_cols]
    ).setStrategy('mode')
    imputed_df=float_imputer.fit(miss_df).transform(miss_df)
    output_df=binary_imputer.fit(imputed_df).transform(imputed_df)
    output_df=output_df.select([x for x in output_df.columns if '_imp' in x or x=='is_match'])
    return output_df


def preprocessing_df(input_df):
    output_df = convert_label_binary(fill_missing_values(input_df))
    return output_df.drop('is_match')

In [23]:
prep_df=preprocessing_df(df)


In [28]:
prep_df.show(10)

+----------------+----------------+----------------+----------------+-----------+----------+----------+----------+-----------+-----+
|cmp_fname_c1_imp|cmp_fname_c2_imp|cmp_lname_c1_imp|cmp_lname_c2_imp|cmp_sex_imp|cmp_bd_imp|cmp_bm_imp|cmp_by_imp|cmp_plz_imp|label|
+----------------+----------------+----------------+----------------+-----------+----------+----------+----------+-----------+-----+
|             1.0|       0.9000177|             1.0|      0.31841284|          1|         1|         1|         1|          1|    1|
|             1.0|       0.9000177|             1.0|      0.31841284|          1|         1|         1|         1|          1|    1|
|             1.0|       0.9000177|             1.0|      0.31841284|          1|         1|         1|         1|          1|    1|
|             1.0|       0.9000177|             1.0|      0.31841284|          1|         1|         1|         1|          1|    1|
|             1.0|       0.9000177|             1.0|      0.31841284|

In [29]:
prep_df.select([count(when(isnull(column), column)).alias(column) for column in prep_df.columns]).show()

+----------------+----------------+----------------+----------------+-----------+----------+----------+----------+-----------+-----+
|cmp_fname_c1_imp|cmp_fname_c2_imp|cmp_lname_c1_imp|cmp_lname_c2_imp|cmp_sex_imp|cmp_bd_imp|cmp_bm_imp|cmp_by_imp|cmp_plz_imp|label|
+----------------+----------------+----------------+----------------+-----------+----------+----------+----------+-----------+-----+
|               0|               0|               0|               0|          0|         0|         0|         0|          0|    0|
+----------------+----------------+----------------+----------------+-----------+----------+----------+----------+-----------+-----+



In [30]:
from pyspark.ml.feature import VectorAssembler
def feature_engineering(input_df,feature_list,label_name):
    assembler = VectorAssembler(inputCols=feature_list,
                             outputCol='features')
    assembled_df = assembler.transform(input_df)
    output_df=assembled_df.select('features', label_name)
    return output_df

In [31]:
input_features=list(set(prep_df.columns) - set(['label','is_match']))
input_features

['cmp_bm_imp',
 'cmp_by_imp',
 'cmp_bd_imp',
 'cmp_fname_c1_imp',
 'cmp_fname_c2_imp',
 'cmp_lname_c2_imp',
 'cmp_sex_imp',
 'cmp_lname_c1_imp',
 'cmp_plz_imp']

In [32]:

assembled_df = feature_engineering(prep_df,input_features,'label')
assembled_df.show(10, truncate=False)

+-------------------------------------------------------------------+-----+
|features                                                           |label|
+-------------------------------------------------------------------+-----+
|[1.0,1.0,1.0,1.0,0.9000176787376404,0.3184128403663635,1.0,1.0,1.0]|1    |
|[1.0,1.0,1.0,1.0,0.9000176787376404,0.3184128403663635,1.0,1.0,1.0]|1    |
|[1.0,1.0,1.0,1.0,0.9000176787376404,0.3184128403663635,1.0,1.0,1.0]|1    |
|[1.0,1.0,1.0,1.0,0.9000176787376404,0.3184128403663635,1.0,1.0,1.0]|1    |
|[1.0,1.0,1.0,1.0,0.9000176787376404,0.3184128403663635,1.0,1.0,1.0]|1    |
|[1.0,1.0,1.0,1.0,0.9000176787376404,0.3184128403663635,1.0,1.0,1.0]|1    |
|[1.0,1.0,1.0,1.0,0.9000176787376404,0.3184128403663635,1.0,1.0,0.0]|1    |
|[1.0,1.0,1.0,1.0,0.9000176787376404,0.3184128403663635,1.0,1.0,1.0]|1    |
|[1.0,1.0,1.0,1.0,0.9000176787376404,0.3184128403663635,1.0,1.0,1.0]|1    |
|[1.0,1.0,1.0,1.0,0.9000176787376404,0.3184128403663635,1.0,1.0,1.0]|1    |
+-----------

In [None]:
def test_train_split(input_df,train_size=0.7):
    train, test = assembled_df.randomSplit([train_size,1 - train_size], seed=42)
    return train,test

In [None]:
train, test = test_train_split(assembled_df,0.7)
train.count()

### ML Models

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier,LogisticRegression
from pyspark.ml import Pipeline

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
import numpy as np
def evaluate_from_scratch(pred,model_name='Logistic Regression'):
    pred.groupBy('label', 'prediction').count().show()

    # Calculate the elements of the confusion matrix
    TN = pred.filter('prediction = 0 AND label = prediction').count()
    TP = pred.filter('prediction = 1 AND label = prediction').count()
    FN = pred.filter('prediction = 0 AND label = 1').count()
    FP = pred.filter('prediction = 1 AND label = 0').count()

    # Accuracy measures the proportion of correct predictions
    accuracy = (TN + TP) / (TN + TP + FN + FP)
    recall = (TP) / (TP+FN)
    precision= (TP) / (TP+FP)
    f1=2*(precision*recall)/(precision+recall)
    print(f'EVALUATION SUMMARY for {model_name}:')
    print(f" accuracy:{accuracy}")
    print(f" precision:{precision}")
    print(f" recall:{recall}")
    print(f" f1-score:{f1}")

def evaluate_from_spark(predictions,model_name='Logistic Regression'):
    eval = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="label")
    eval2= MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")
    AUC  = eval.evaluate(predictions)
    ACC  = eval2.evaluate(predictions, {eval2.metricName:"accuracy"})
    PREC  = eval2.evaluate(predictions, {eval2.metricName:"weightedPrecision"})
    REC  = eval2.evaluate(predictions, {eval2.metricName:"weightedRecall"})
    F1  = eval2.evaluate(predictions, {eval2.metricName:"f1"})
    WeightedFMeasure=eval2.evaluate(predictions, {eval2.metricName:"weightedFMeasure"})
    print(f"{model_name} Performance Measure")
    print(" Accuracy = %0.8f" % ACC)
    print(" Weighted Precision = %0.8f" % PREC)
    print(" Weighted Recall = %0.8f" % REC)
    print(" F1 = %0.8f" % F1)
    print(" Weighted F Measure = %0.8f" % WeightedFMeasure)

    print(" AUC = %.8f" % AUC)
    print(" ROC curve:")
    PredAndLabels           = predictions.select("probability", "label")
    PredAndLabels_collect   = PredAndLabels.collect()
    PredAndLabels_list      = [(float(i[0][0]), 1.0-float(i[1])) for i in PredAndLabels_collect]
    PredAndLabels           = sc.parallelize(PredAndLabels_list)
    fpr = dict()                                                        # FPR: False Positive Rate
    tpr = dict()                                                        # TPR: True Positive Rate
    roc_auc = dict()
    
    y_test = [i[1] for i in PredAndLabels_list]
    y_score = [i[0] for i in PredAndLabels_list]
    
    fpr, tpr, _ = roc_curve(y_test, y_score)
    roc_auc = auc(fpr, tpr)
    
    plt.figure(figsize=(5,4))
    plt.plot(fpr, tpr, label='ROC curve (area = %0.8f)' % roc_auc)
    plt.plot([0, 1], [0, 1], 'r--')
    # plt.xlim([0.0, 1.0])
    # plt.ylim([0.0, 1.05])
    plt.yticks(np.arange(0,1.03,0.1))
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'ROC Curve - {model_name}')
    plt.legend(loc="lower right")
    plt.show()

    

def evaluate(predictions,model_name=None):
    print('Evaluate From Scratch:')
    evaluate_from_scratch(predictions,model_name)
    print('\nEvaluate From Spark Library:')
    evaluate_from_spark(predictions,model_name)

In [None]:
# lr=LogisticRegression(featuresCol='features', labelCol='label')
# lr_model = lr.fit(train)
# lr_result = lr_model.transform(test)

In [None]:
lr=LogisticRegression(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(train)
lr_result = model.transform(test)

In [None]:
lr_result.select('label', 'prediction', 'probability').show(3)

In [None]:
evaluate(lr_result)

### Decision Tree

In [None]:
tree = DecisionTreeClassifier()
tree_pipeline = Pipeline(stages=[tree])
tree_model = tree_pipeline.fit(train)
tree_result = tree_model.transform(test)

In [None]:
evaluate(tree_result,model_name='Decision Tree')

### RandomForest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rf_pipeline = Pipeline(stages=[ rf])
rf_model=rf_pipeline.fit(train)
rf_result=rf_model.transform(test)

In [None]:
evaluate(rf_result,model_name='Random Forest')

# Not Refactored yet...


### Cross Validation 

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [None]:
lr_cs = LogisticRegression()

In [None]:
from pyspark.ml.classification import LogisticRegression
lr_cs = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr_cs.regParam, [0.1, 0.01]) \
    .addGrid(lr_cs.fitIntercept, [False, True])\
    .addGrid(lr_cs.elasticNetParam, [0.0, 1.0])\
    .build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr_cs, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(train)
lrprediction=cvModel.transform(test)



print('Accuracy:', evaluator.evaluate(lrprediction))
print('AUC:', BinaryClassificationMetrics(lrprediction['label','prediction'].rdd).areaUnderROC)

In [None]:
evaluate(lrprediction)

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier()
grid = ParamGridBuilder() \
        .addGrid(dt.maxDepth,  [2, 5, 10, 20, 30]) \
        .addGrid(dt.maxBins,  [10, 20, 40, 80, 100]) \
        .build()
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
cv = CrossValidator(estimator=dt, 
                    estimatorParamMaps=grid, 
                    evaluator=dtevaluator,
                    numFolds = 3)
dtModel = cv.fit(train)
dtpredictions = dtModel.transform(test)

print('Accuracy:', dtevaluator.evaluate(dtpredictions))
print('AUC:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderROC)

In [None]:
evaluate(dtpredictions)