In [None]:
!pip install pyspark graphframes

In [None]:
!export PYSPARK_SUBMIT_ARGS='--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12'

In [None]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import tensorflow_hub as hub
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql import Window as w
from graphframes import GraphFrame
from pyspark.ml.linalg import DenseVector, SparseVector
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer, CountVectorizer, StopWordsRemover, NGram, Normalizer, VectorAssembler, Word2Vec, Word2VecModel, PCA
from pyspark.ml.clustering import LDA
from pyspark.ml.linalg import VectorUDT, Vectors

In [None]:
spark = (
    SparkSession.builder
    .config('spark.executor.memory', '4g')
    .config('spark.app.name', 'Spark Updated Conf')
    .config('spark.executor.cores', '2')
    .config('spark.cores.max', '2')
    .config('spark.driver.memory','8g')
    .getOrCreate()
)

# 0. Load data

In [None]:
df = spark.read.csv("X2.csv", header=True)
df.toPandas()

# 0. Data cleaning

In [None]:
# Set everything to lowercase
for c in df.columns:
    df = df.withColumn(c, f.lower(f.col(c)))

# Extract brand or infer from title
df = df.drop('ssd_capacity')
df = df.withColumn('brand', f.regexp_extract('brand', "^(\w+)", 0))
computer_brands = ['lenovo', 'acer', 'hp', 'dell', 'asus', 'samsung', 'huawei', 'surface', 'apple']
computer_brands_pattern = '({})'.format('|'.join(computer_brands))
df = df.withColumn('brand', f.when( f.regexp_extract('title', computer_brands_pattern, 0)!='', f.regexp_extract('title', computer_brands_pattern, 0))\
                   .otherwise(df.brand))
#exctract cpu_brand and infer type if intel
cpu_brands = ['intel', 'apple', 'amd', 'nvidia', 'arm']
cpu_pattern = '({})'.format('|'.join(cpu_brands))
df = df.withColumn('cpu_model',f.regexp_extract('cpu_model', '(i\d|pentium|celeron|a\d)', 0))
df = df.withColumn('cpu_model', f.when( (f.regexp_extract('cpu_brand','(intel|amd)', 0 )!='') & f.isnull(df.cpu_model) ,\
                                        f.regexp_extract('cpu_brand', '(i\d|pentium|celeron|a\d)', 0))\
                   .otherwise(df.cpu_model))
df = df.withColumn('cpu_brand', f.when(f.regexp_extract('cpu_brand', cpu_pattern, 0) != '', f.regexp_extract('cpu_brand', cpu_pattern, 1))\
                                       .otherwise(f.regexp_extract('title', cpu_pattern, 0)))
df = df.withColumn('weight', f.when(df.weight.contains('pounds') | df.weight.contains('lbs'),
                                    (f.regexp_extract('weight', '(\d+.?\d)', 0).cast(t.DoubleType()))).otherwise(
                                    f.round(f.regexp_extract('weight', '(\d+.?\d)', 0).cast(t.DoubleType())*2.20462,1)
                        )
                    )


In [None]:
from more_itertools import intersperse

def merge_columns(df, column_names, output):
    df = df.withColumn(output, f.concat_ws(" ", *column_names))
    return df.drop(*column_names)

ddf = df.drop("ram_frequency")
ddf = merge_columns(ddf, ["cpu_brand", "cpu_model", "cpu_frequency", "cpu_type"], "cpu")
ddf = merge_columns(ddf, ["ram_capacity", "ram_type"], "ram")
ddf.toPandas()

# 1. Blocking
Blocking will be done feeding a TF-IDF matrix to an LDA model and extracting
keywords from the title matching them to topics.

In [None]:
@f.udf(returnType=t.ArrayType(t.StringType()))
def filter_alnum(arr):
    return [t for t in arr if t.isalnum() and len(t) > 2]

"""Returns the df with tokenized columns with stopwords removed"""
def tokenize(df, string_cols):
    output = df
    stopW = ['softwarecity', 'amazon', 'com', 'pc', 'windows', 'computers', 'computer', 'accessories', 'laptop', 'notebook', 'kg', 'inch', 'processor', 'memory','gb', 'ram', 'hdd', 'ssd', 'cpu', 'display', 'hz', 'ghz', 'tb','rpm', 'slot', 'slots', 'mhz', 'cache', 'ram', 'ddram', 'dram', 'hd']
    for c in string_cols:
        output = output.withColumn('temp', f.coalesce(f.col(c), f.lower(c), f.lit('')))
        tokenizer = RegexTokenizer(inputCol='temp', outputCol=c+"_rawtokens", pattern = "\\W")
        remover = StopWordsRemover(inputCol=c+"_rawtokens", outputCol=c+"_tokens", stopWords=stopW)

        output = tokenizer.transform(output)
        output = remover.transform(output).drop(c+"_rawtokens")
        output = output.withColumn(c+'_tokens', f.array_distinct(filter_alnum(f.col(c+"_tokens"))))
    # output has c+tokens columns
    return output.drop("temp")

def generate_blocking_keys(df, token_cols, min_freq=1):
    """Pipeline:
            1 - CountVectorizer -> TF
            2 - IDF
            3 - LDA
    """
    df = df.withColumn('tokens', f.array_distinct(f.concat(*token_cols)))
    cv = CountVectorizer(inputCol='tokens', outputCol="raw_features")
    cvmodel = cv.fit(df)
    df_vect = cvmodel.transform(df)

    idf = IDF(inputCol="raw_features", outputCol="features", minDocFreq=min_freq)
    idfModel = idf.fit(df_vect)
    df_idf= idfModel.transform(df_vect)

    normalizer = Normalizer(p=2.0, inputCol='features', outputCol='tfidf')
    output = normalizer.transform(df_idf)

    lda = LDA(k=5, maxIter=1000, featuresCol='tfidf')
    lda_model = lda.fit(output)
    vocab = cvmodel.vocabulary
    #returns words for each topic term
    @f.udf(returnType=t.ArrayType(t.StringType()))
    def get_words(token_list):
        return [vocab[token_id] for token_id in token_list]

    #create list of topic keywords
    # i.e topic 1 -> acer, anspire, intel
    topics = lda_model.describeTopics(3).withColumn('topicWords', get_words(f.col('termIndices'))).collect()
    list_of_topics = []
    for r in topics:
        topicW = r.__getitem__('topicWords')
        for w in topicW:
            list_of_topics.append(w)

    #returns list of 3 'hashtags' i.e keywords for topic
    #from tokens: title, brand, cpu_brand
    @f.udf(returnType=t.ArrayType(t.StringType()))
    def get_key(words):
        l = [w for w in words if w in list_of_topics]
        l = list(set(l))
        l.sort()
        return l[:3]
    output = output.withColumn("blocking_key", get_key(f.col("tokens")))
    return output

"""Use universal sentence encoder from tensorflow_hub"""
MODEL = None
def get_model_magic():
  global MODEL
  if MODEL is None:
      MODEL = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
  return MODEL

@f.udf(returnType=VectorUDT())
def encode_sentence(x):
  model = get_model_magic()
  emb = model([x]).numpy()[0]
  return Vectors.dense(emb)

In [None]:
columns = ['title', 'cpu', 'ram']
#Generate Blocking Keys

blocking_df = tokenize(ddf, columns)
#for c in columns:
    #blocking_df = blocking_df.withColumn(c+'_encoding', encode_sentence(f.coalesce(f.col(c), f.lit(''))))
blocking_df = generate_blocking_keys(blocking_df, [c+'_tokens' for c in columns])

#Add encoding on title
blocking_df = blocking_df.withColumn('title_encoding', encode_sentence(f.coalesce(f.col('title'), f.lit(''))))

blocking_df.toPandas()

In [None]:
blocking_df.groupby('blocking_key').count().show()

# 2. Candidate pairs generation and match likelihood

In [None]:
"""
This cell output a candidates dataframe that has
instance_ids pairs that makes sense to compare, i.e each
entity will be paired with another entity from the same block
"""

cols_to_keep = ["instance_id", "title_encoding"]
# Filter blocks to only keep ones bigger than one
pairs = (
    blocking_df
    .select(*cols_to_keep, 'blocking_key')
    .groupby('blocking_key').agg(f.count('instance_id').alias('size'), f.collect_set('instance_id').alias('id'))\
    .filter(f.col('size') > 1).select('blocking_key',f.explode('id').alias('id'))
)

left = pairs.withColumnRenamed('id', 'src')
right = pairs.withColumnRenamed('id', 'dst')
#candidates based on matching of blocking_key (i.e inside the block)
candidates = left.join(right, ['blocking_key'], 'inner')\
    .filter(f.col('src') < f.col('dst'))\
    .select('src', 'dst').distinct()
node = blocking_df.select(f.col('instance_id').alias('id'), 'title_encoding').drop('instance_id')

In [None]:
"""
@f.udf(returnType=t.DoubleType())
def dot(x, y):
  if x is not None and y is not None:
    return float(x.dot(y))
  else:
    return 0

def null_safe_levenshtein_sim(c1, c2):
  output = f.when(f.col(c1).isNull() | f.col(c2).isNull(), 0)\
            .otherwise(1 - f.levenshtein(c1, c2) / f.greatest(f.length(c1), f.length(c2)))
  return output

def null_safe_num_sim(c1, c2):
  output = f.when(f.col(c1).isNull() | f.col(c2).isNull(), 0)\
            .when((f.col(c1) == 0) & (f.col(c2) == 0), 1)\
            .when((f.col(c1) == 0) | (f.col(c2) == 0), 0)\
            .otherwise(1 - f.abs(f.col(c1) - f.col(c2)) / f.greatest(c1, c2))
  return output

def null_safe_token_overlap(c1, c2):
  # is the overlap a significant part of the shorter string
  output = f.when(f.col(c1).isNull() | f.col(c2).isNull(), 0)\
            .when((f.size(f.array_distinct(c1)) == 0) | (f.size(f.array_distinct(c2)) == 0), 0)\
            .otherwise(f.size(f.array_intersect(c1, c2)) / f.least(f.size(f.array_distinct(c1)), f.size(f.array_distinct(c1))))
  return output

def calc_sim(df, candidates):
    metrics = []
    for c in columns[:2]:
        if '_encoding' not in c:
            candidates = candidates.withColumn(c+'_lev', null_safe_levenshtein_sim(df.filter(df.id == candidates.src).select(c),df.filter(df.id == candidates.dst).select(c)))
            metrics.append(c+'_lev')
        else:
            metrics.append(c+'_sim')
            candidates = candidates.withColumn(c+'_sim', dot(df.filter(df.id == candidates.src).select(c), df.filter(df.id == candidates.dst).select(c)))
    candidates = candidates.withColumn('tfidf_sim', dot(df.filter(df.id == candidates.src).select('tfidf'),df.filter(df.id == candidates.dst).select('tfidf')))
    candidates = candidates.withColumn('token_sim', dot(df.filter(df.id == candidates.src).select('tokens_swRemoved'), df.filter(df.id == candidates.dst).select('tokens_swRemoved')))
    candidates = candidates.withColumn('weight_sim', dot(df.filter(df.id == candidates.src).select('weight'),df.filter(df.id == candidates.dst).select('weight')))
    metrics.append('tfidf_sim')
    metrics.append('token_sim')
    metrics.append('weigth_sim')
    def sum_distance(distances):
        return sum(d for d in distances)
    udf_sum = f.udf(sum_distance, t.DoubleType())
    candidates = candidates.withColumn('sum_sim', udf_sum([f.col(c) for c in metrics]))
    udf_norm = f.udf(lambda d : d / len(metrics))
    candidates = candidates.withColumn('overall_sim', udf_norm(f.col('sum_sim'))).drop(f.col('sum_sim'))
    return candidates

distance_df = calc_sim(node, candidates)
"""

In [None]:
"""
Read label.csv and expand it trough transitivity
"""
labels = (
    spark.read.csv("Y2.csv", header=True)
    .withColumnRenamed('left_instance_id', 'lid')
    .withColumnRenamed('right_instance_id', 'rid')
)

In [None]:
"""
Reuse this cell to join a <left_id,right_id> with node to extract features
"""
#label_df = labels.join(candidates.withColumnRenamed('src','lid').withColumnRenamed('dst','rid'), ['lid','rid'], 'inner')
label_df = labels.join(node.alias("node_1"), labels.lid == node.id, 'inner').drop('id')
for c in cols_to_keep[1:]:
    label_df = label_df.withColumnRenamed(c, 'l_'+c)

label_df = label_df.alias('one').join(node.alias("node_2"), label_df.rid == node.id, 'inner').drop('id')
for c in cols_to_keep[1:]:
    label_df = label_df.withColumnRenamed(c, 'r_'+c)
print(label_df.columns)

matching_pairs = candidates.join(node.alias("node_1"), candidates.src == node.id, 'inner').drop('id')
for c in cols_to_keep[1:]:
    matching_pairs = matching_pairs.withColumnRenamed(c, 'l_'+c)

matching_pairs = matching_pairs.alias('one').join(node.alias("node_2"), matching_pairs.dst == node.id, 'inner').drop('id')
for c in cols_to_keep[1:]:
    matching_pairs = matching_pairs.withColumnRenamed(c, 'r_'+c)

matching_pairs.columns

In [None]:
@f.udf(returnType=VectorUDT())
def toList(row):
    l = []
    for v in row:
        for n in v:
            l.append(float(n))
    return Vectors.dense(l)

label_df = label_df.withColumn('features', toList(f.array('l_title_encoding', 'r_title_encoding')))\
    .drop('l_title_encoding', 'r_title_encoding')
label_df = label_df.withColumn('label', f.col('label').cast(t.IntegerType()))
label_df = label_df.withColumn('weights', f.when(f.col('label')==0, 0.66).otherwise(1.0))

matching_pairs = matching_pairs.withColumn('features', toList(f.array('l_title_encoding', 'r_title_encoding')))\
    .drop('l_title_encoding', 'r_title_encoding')

matching_pairs = matching_pairs.withColumnRenamed('src', 'lid').withColumnRenamed('dst','rid')

# 3. Machine Learning Magic Bitch

In [None]:
from pyspark.ml.classification import LinearSVC, LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
model = LinearSVC(featuresCol='features', labelCol='label', weightCol='weights',maxIter=100)
param_grid = ParamGridBuilder().addGrid(model.regParam, [0.5, 0.4, 0.3, 0.2, 0.1]).build()
cvs = CrossValidator(estimator=model,
                           estimatorParamMaps=param_grid,
                           evaluator=BinaryClassificationEvaluator(),#(rawPredictionCol='prediction', labelCol='label'),\
                           numFolds=4)

In [None]:
training_set, test_set = label_df.randomSplit([0.8, 0.2])

In [None]:
#grid_search, hyperpar tuning...
estimator = cvs.fit(training_set)

In [None]:
prediction = estimator.transform(test_set).select('lid','rid','label','prediction')

In [None]:
prediction.groupby("label").count().toPandas()

In [None]:
estimator.save("model.model")

In [None]:
!zip -r model.zip model.model

In [None]:
accuracy = prediction.filter(f.col('label')==f.col('prediction').cast(t.IntegerType())).count() / prediction.count()
print("Accuracy: ", accuracy)