In [268]:
# Stop a session left over from a previous run so the builder in the next cell
# starts from a clean state. On a fresh kernel `spark` does not exist yet, and a
# bare `spark.stop()` would raise NameError and break Restart & Run All.
if globals().get("spark") is not None:
    spark.stop()

In [269]:
import pyspark.sql.functions as fn
import pyspark.sql.types as tp
from pyspark.sql import SparkSession, DataFrame

# Local Spark session for this notebook. Each setting below is applied to the
# builder in order, then getOrCreate() returns the session.
# NOTE(review): with a `local[2]` master the spark.executor.* and
# spark.dynamicAllocation.* settings are presumably not used by Spark — confirm
# before relying on them.
spark_settings = {
    "spark.driver.bindAddress": "localhost",
    "spark.driver.port": "8080",
    "spark.driver.memory": "2g",
    "spark.driver.host": "localhost",
    "spark.executor.memory": "3g",
    "spark.executor.cores": "5",
    "spark.dynamicAllocation.enabled": "true",
    "spark.default.parallelism": "2",
    "spark.shuffle.io.retryWait": "2000ms",
    "spark.shuffle.io.maxRetries": "2",
}

session_builder = SparkSession.builder.master("local[2]")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

In [279]:
def getDfWithSuffixedColumns(df, suffix):
    '''
    Return ``df`` with every column renamed to ``<name><suffix>``.

    Column order is unchanged; only the names change.
    '''
    renamed_columns = []
    for column_name in df.columns:
        renamed_columns.append(fn.col(column_name).alias(f"{column_name}{suffix}"))
    return df.select(renamed_columns)

def mergeDatasetOnKey(suffix, df1, df2):
    '''
    Inner-join ``df1`` with a copy of ``df2`` whose columns all carry ``suffix``.

    Rows match where ``df1["key<suffix>"]`` equals the renamed ``df2`` column
    ``pkey<suffix>``. Because the join is inner, rows of ``df1`` without a
    matching ``df2`` record are dropped.
    '''
    suffixed_df2 = getDfWithSuffixedColumns(df2, suffix=suffix)
    left_key = df1[f"key{suffix}"]
    right_key = suffixed_df2[f"pkey{suffix}"]
    return df1.join(suffixed_df2, left_key == right_key, "inner")

def dropColumns(training, db):
    '''
    Drop the columns that are not used downstream from both input DataFrames.

    Returns:
        A ``(training, db)`` tuple: ``training`` without ``id`` and ``partition``,
        and ``db`` without the columns listed in ``db_unused`` below.
    '''
    training_unused = ("id", "partition")
    db_unused = (
        "_c0",
        "paddress",
        "ppublisher",
        "pseries",
        "pbooktitlefull_id",
        "pjournalfull_id",
        "peditor",
        "pbooktitle_id",
        "partition",
    )
    cleaned_training = training.drop(*training_unused)
    cleaned_db = db.drop(*db_unused)
    return cleaned_training, cleaned_db


def jaccard_similarity(a, b):
    '''
    Jaccard similarity len(A & B) / len(A | B) of the distinct elements of ``a`` and ``b``.

    ``a`` and ``b`` may be any iterables. A plain string is treated as its set of
    characters.

    Returns:
        A float in [0, 1]. Returns ``0.0`` when both inputs are empty, where the
        ratio is undefined and would otherwise raise ZeroDivisionError. Returns
        ``None`` when either input is ``None`` (e.g. a null column value passed in
        through a Spark UDF), so the missing value propagates as null instead of
        raising TypeError inside the job.
    '''
    if a is None or b is None:
        return None
    set_a = set(a)
    set_b = set(b)
    union_size = len(set_a | set_b)
    if union_size == 0:
        return 0.0
    # True division: always a float, matching the original float(...) / len(...).
    return len(set_a & set_b) / union_size



In [288]:
# Raw inputs: publication records ("!"-separated) and the labelled key pairs.
raw_db = spark.read.csv("data/db/db.csv", sep="!", header=True)
raw_train = spark.read.csv("data/train.csv", header=True)

train_df, df = dropColumns(raw_train, raw_db)

# Attach the db record for key2 (columns suffixed "2") first, then for key1
# (suffixed "1"). Both joins are inner, so pairs whose key is missing from the db
# are dropped.
with_key2 = mergeDatasetOnKey("2", train_df, df)
train_df = mergeDatasetOnKey("1", with_key2, df)

[Row(key1='conf/semweb/PaolucciKPS02', key2='conf/woa/SacileMPB00', label='False', pyear2='2000.0', pid2='778642', pkey2='conf/woa/SacileMPB00', ptype_id2='4', pjournal_id2='0', clean_author2='Roberto Sacile|Ernesto Montaldo|Massimo Paolucci 0002|Antonio Boccalatte', clean_title2='Intelligent agents applied to manufacturing: the MAKE-IT approach.', pyear1='2002.0', pid1='683591', pkey1='conf/semweb/PaolucciKPS02', ptype_id1='1', pjournal_id1='0', clean_author1='Massimo Paolucci|Takahiro Kawamura|Terry R. Payne|Katia P. Sycara', clean_title1='Semantic Matching of Web Services Capabilities.'),
 Row(key1='conf/cases/AkgulLM01', key2='conf/date/AkgulM01', label='True', pyear2='2001.0', pid2='166440', pkey2='conf/date/AkgulM01', ptype_id2='4', pjournal_id2='0', clean_author2='Bilge Saglam Akgul|Vincent John Mooney III', clean_title2='System-on-a-chip processor synchronization support in hardware.', pyear1='2001.0', pid1='91031', pkey1='conf/cases/AkgulLM01', ptype_id1='4', pjournal_id1='0',

In [297]:
# Spark UDF wrapper around jaccard_similarity. That function returns a Python
# float, so declare DoubleType directly instead of StringType. This avoids a
# float -> string -> double round-trip, and it uses the imported `tp` types module
# instead of relying on pyspark.sql.functions happening to re-export StringType.
# NOTE(review): keep jaccard_similarity returning float (not int); PySpark UDFs may
# yield null when the Python value's type does not match the declared type.
jaccard_udf = fn.udf(jaccard_similarity, tp.DoubleType())

def addColumns(training_df):
    '''
    Return ``training_df`` with the pairwise features used by the model.

    Added columns:
      - jaccard_author, jaccard_title, jaccard_key: ``jaccard_udf`` applied to the
        corresponding ``*1`` / ``*2`` column pair, cast to double.
      - diff_year: ``pyear2 - pyear1`` as a double.

    NOTE(review): the text columns reach ``jaccard_udf`` as whole strings, so the
    ``set()`` inside ``jaccard_similarity`` compares characters, not author names
    or title words. If token-level similarity was intended, split the strings
    first (e.g. authors on "|", titles on whitespace) — confirm intent.
    '''
    similarity_features = [
        ("jaccard_author", "clean_author1", "clean_author2"),
        ("jaccard_title", "clean_title1", "clean_title2"),
        ("jaccard_key", "key1", "key2"),
    ]
    result_df = training_df
    for feature_name, left_col, right_col in similarity_features:
        result_df = result_df.withColumn(
            feature_name, jaccard_udf(left_col, right_col).cast("Double"))

    # The year columns come from a CSV read without a schema, so they are strings.
    # Cast explicitly instead of relying on Spark's implicit string coercion in
    # arithmetic, which may behave differently when ANSI mode is enabled.
    year1 = fn.col("pyear1").cast("double")
    year2 = fn.col("pyear2").cast("double")
    return result_df.withColumn("diff_year", year2 - year1)

# Add the similarity / year-difference feature columns to the joined pairs.
train_df = addColumns(training_df=train_df)

[Row(key1='conf/semweb/PaolucciKPS02', key2='conf/woa/SacileMPB00', label='False', pyear2='2000.0', pid2='778642', pkey2='conf/woa/SacileMPB00', ptype_id2='4', pjournal_id2='0', clean_author2='Roberto Sacile|Ernesto Montaldo|Massimo Paolucci 0002|Antonio Boccalatte', clean_title2='Intelligent agents applied to manufacturing: the MAKE-IT approach.', pyear1='2002.0', pid1='683591', pkey1='conf/semweb/PaolucciKPS02', ptype_id1='1', pjournal_id1='0', clean_author1='Massimo Paolucci|Takahiro Kawamura|Terry R. Payne|Katia P. Sycara', clean_title1='Semantic Matching of Web Services Capabilities.', jaccard_author=0.5625, jaccard_title=0.5625, jaccard_key=0.6190476190476191, diff_year=-2.0),
 Row(key1='conf/cases/AkgulLM01', key2='conf/date/AkgulM01', label='True', pyear2='2001.0', pid2='166440', pkey2='conf/date/AkgulM01', ptype_id2='4', pjournal_id2='0', clean_author2='Bilge Saglam Akgul|Vincent John Mooney III', clean_title2='System-on-a-chip processor synchronization support in hardware.', 

## Feature Prep

In [305]:
from pyspark.ml.feature import VectorAssembler

# Numeric feature columns produced by addColumns, in the order they are packed
# into the feature vector.
required_features = ['jaccard_author', 'jaccard_title', 'jaccard_key', 'diff_year']

assembler = VectorAssembler(inputCols=required_features, outputCol='features')
assembled = assembler.transform(train_df)

# `label` is read from CSV as the strings 'True' / 'False'; going through boolean
# yields the 0/1 integer labels used for training below.
transformed_data = assembled.withColumn(
    'label', assembled['label'].cast('boolean').cast('integer'))

transformed_data.take(2)

[Row(key1='conf/semweb/PaolucciKPS02', key2='conf/woa/SacileMPB00', label=0, pyear2='2000.0', pid2='778642', pkey2='conf/woa/SacileMPB00', ptype_id2='4', pjournal_id2='0', clean_author2='Roberto Sacile|Ernesto Montaldo|Massimo Paolucci 0002|Antonio Boccalatte', clean_title2='Intelligent agents applied to manufacturing: the MAKE-IT approach.', pyear1='2002.0', pid1='683591', pkey1='conf/semweb/PaolucciKPS02', ptype_id1='1', pjournal_id1='0', clean_author1='Massimo Paolucci|Takahiro Kawamura|Terry R. Payne|Katia P. Sycara', clean_title1='Semantic Matching of Web Services Capabilities.', jaccard_author=0.5625, jaccard_title=0.5625, jaccard_key=0.6190476190476191, diff_year=-2.0, features=DenseVector([0.5625, 0.5625, 0.619, -2.0])),
 Row(key1='conf/cases/AkgulLM01', key2='conf/date/AkgulM01', label=1, pyear2='2001.0', pid2='166440', pkey2='conf/date/AkgulM01', ptype_id2='4', pjournal_id2='0', clean_author2='Bilge Saglam Akgul|Vincent John Mooney III', clean_title2='System-on-a-chip process

## Training

In [317]:
# 90/10 split with a fixed seed. NOTE(review): randomSplit is only repeatable if
# the input partitioning is also the same between runs.
train, test = transformed_data.randomSplit([0.9, 0.1], seed=1000)

test_with_label = test
test_without_label = test.drop("label")

In [331]:
from pyspark.ml.classification import RandomForestClassifier


# Random forests sample rows and features at random. Pin the seed (same value as
# the train/test split) so the fitted model and its accuracy repeat when the
# notebook is re-run on the same split.
rf = RandomForestClassifier(labelCol='label',
                            featuresCol='features',
                            maxDepth=8,
                            seed=1000)
model = rf.fit(train)
rf_predictions = model.transform(test_with_label)

In [335]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import GBTClassifier

# Logistic-regression baseline on the same label and feature vector as the
# random forest. 'features' is the default featuresCol, spelled out here.
reg = LogisticRegression(labelCol='label', featuresCol='features')
reg_model = reg.fit(train)
reg_predictions = reg_model.transform(test_with_label)

## Evaluation

In [336]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of each model on the same held-out split.
multi_evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
for caption, predictions in (('Random Forest classifier Accuracy:', rf_predictions),
                             ('Logistic Regression Accuracy:', reg_predictions)):
    print(caption, multi_evaluator.evaluate(predictions))

Random Forest classifier Accuracy: 0.7843137254901961
Logistic Regression Accuracy: 0.7855392156862745
