Copyright (c) Microsoft Corporation. 
Licensed under the MIT license. 

## Opportunity Identification
This notebook computes similarity scores between pairs of parts. It combines a cosine-based measure on the numeric features with a Jaccard-based measure on the categorical features to create the similarity score.

1. Define variables
2. Load dataset
3. Process dataset to generate column of features
4. Compute the similarity score for each pair of parts
5. Store resulting dataset with similarity scores



In [3]:
from pyspark.sql import functions as F
import pandas as pd
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix, RowMatrix
from pyspark.mllib.util import MLUtils


from pyspark.sql.types import DoubleType, FloatType, ArrayType
from scipy.spatial import distance
import numpy as np


## Define column names of interest


In [4]:
# Name of the identifier column of the dataset
col_id = "ID"

# Name of the column holding the part price that is compared between parts
target = 'ListPrice'

# Every column that is read from the source table
col_to_keep = [col_id, target] + [
    'ProductByRegionID',
    'ProductNumber',
    'Name',
    'StandardCost',
    'Color',
    'Size',
    'SizeUnitMeasureCode',
    'Weight',
    'WeightUnitMeasureCode',
    'DaysToManufacture',
    'ProductCategory',
    'Region',
    'Country',
]

# Categorical columns (string-indexed before being assembled into a vector)
col_encoding = [
    'Color',
    'SizeUnitMeasureCode',
    'WeightUnitMeasureCode',
    'Region',
    'Country',
]

# Numeric columns (assembled into a vector as they are)
col_int = [
    'StandardCost',
    'Size',
    'Weight',
    'DaysToManufacture',
]

## Load dataset
Select the columns of interest, then drop duplicate rows and rows with missing values.


In [5]:
# Uncomment to refresh Spark's cached view of the table before reading it:
# spark.sql('REFRESH TABLE default.CleanComparator')

# Read the source table from the Spark catalog
source_table = 'default.CleanComparator'
df = spark.read.table(source_table)


In [6]:
# Keep only the columns of interest, then discard duplicate rows and any
# row that contains a missing value
df = (
    df.select(*col_to_keep)
    .drop_duplicates()
    .dropna()
)


## Process dataset
In order to apply our metrics to the features of the parts, we create two columns: one containing the categorical values and one containing the continuous values.


In [7]:
# Numeric features: pack the continuous columns into one vector column
assembler_int = VectorAssembler(inputCols=col_int, outputCol="features_int")

# Categorical features: every string column gets an indexed twin named
# '<column>_idx'; the indexed columns are then packed into one vector column
idx_cols = [name + '_idx' for name in col_encoding]
indexer = [
    StringIndexer(inputCol=name, outputCol=idx_name)
    for name, idx_name in zip(col_encoding, idx_cols)
]
assembler_str = VectorAssembler(inputCols=idx_cols, outputCol="features_cat")
# Chain the indexers and the categorical assembler into a single pipeline
assembler_cat = Pipeline(stages=indexer + [assembler_str])

# Add the numeric vector first, then fit the categorical pipeline on the raw
# dataset and use it to add the categorical vector
df_with_numeric = assembler_int.transform(df)
df_features = assembler_cat.fit(df).transform(df_with_numeric)


def _vector_to_list(vec):
    """Turn a Vector value into a plain Python list."""
    return vec.toArray().tolist()


# UDF replacing a Vector column with an array<float> column
to_array = F.udf(_vector_to_list, ArrayType(FloatType()))
for vector_col in ('features_int', 'features_cat'):
    df_features = df_features.withColumn(vector_col, to_array(vector_col))

## Compute similarity Score

The steps are the following:

1. Cross join the dataset with itself (Cartesian product) to generate all possible pairs of parts
2. Filter out pairs that don't belong to the same category, and keep only pairs where the first part is more expensive than the second
3. Keep each pair only once, since (P1, P2) and (P2, P1) describe the same two parts
4. Apply a UDF on the feature columns to compute the cosine-based and Jaccard-based scores and combine them
5. Rescale the similarity score to be between 0 and 1, 1 being the highest similarity

In [8]:

# Build the two sides of the self-join. The right-hand side ("B") gets
# renamed columns so that the joined frame has no duplicated column names.
col_table = [
    F.col(col_id).alias(col_id+'_A'),
    F.col('ProductCategory'),
    F.col('features_int'),
    F.col('features_cat'),
    F.col(target)
    ]
col_mapped_table = [
    F.col(col_id).alias(col_id+'_B'),
    F.col('ProductCategory').alias('PC'),
    F.col('features_int').alias('ft_int'),
    F.col('features_cat').alias('ft_cat'),
    F.col(target).alias(target+'_B'),
    ]

# Cross join (Cartesian product): pair every part with every part
df_crossjoin = (
    df_features.select(*col_table).crossJoin(
    df_features.select(*col_mapped_table))
    )

# Keep only pairs from the same category in which part A is strictly more
# expensive than part B, i.e. B is the cheaper alternative for A.
#
# The strict price comparison already guarantees that a pair is kept in a
# single orientation (and never paired with itself), so no additional
# ordering on the IDs is applied: filtering on ID_A < ID_B on top of the
# price condition would silently discard every valid pair in which the more
# expensive part happens to have the larger ID.
df_filter = df_crossjoin.filter(
    (F.col('ProductCategory') == F.col('PC'))
    & (F.col(target) > F.col(target+'_B'))
)

# Define similarity score function
def similarity_score(a, b, c, d):
    """Return the weighted similarity score of one pair of parts.

    The score is ``0.3 * cosine_similarity(a, b) + 0.7 * jaccard_similarity(c, d)``
    rounded to two decimals, so a higher value always means "more similar".

    Args:
        a: Numeric feature values of the first part (sequence of numbers).
        b: Numeric feature values of the second part, same length as ``a``.
        c: Categorical feature values of the first part.
        d: Categorical feature values of the second part, same length as ``c``.

    Returns:
        The combined score as a Python ``float``.
    """
    # ``np.float`` was removed from NumPy; the builtin ``float`` is equivalent
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)

    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    if norm_product == 0:
        # The cosine is undefined when a vector is all zeros. Treat two
        # identical vectors as fully similar and anything else as not
        # similar, instead of propagating NaN into the score.
        cosine_similarity = 1.0 if np.array_equal(a, b) else 0.0
    else:
        cosine_similarity = np.dot(a, b) / norm_product

    # scipy returns the Jaccard *dissimilarity*; flip it into a similarity.
    # NOTE(review): scipy documents this metric for boolean vectors, while the
    # caller passes category indices -- confirm this is the intended measure.
    jaccard_similarity = 1 - distance.jaccard(c, d)

    return float(np.round(cosine_similarity * 0.3 + jaccard_similarity * 0.7, 2))

# Wrap the scoring function as a Spark UDF that produces a float column
udf_similarity = F.udf(similarity_score, FloatType())

# Score every candidate pair from its numeric and categorical feature arrays
# NOTE(review): df_similarity feeds both the aggregation and the join below,
# so the Python UDF may be evaluated twice; consider caching if this is slow.
df_similarity = df_filter.withColumn(
    'SimilarityScore',
    udf_similarity(
        F.col('features_int'), F.col('ft_int'),
        F.col('features_cat'), F.col('ft_cat'),
    ),
)

# Min-max scale the score within each product category
# 1) smallest and largest score per category
df_min_max = df_similarity.groupby('ProductCategory').agg(
    F.max('SimilarityScore').alias('max_score'),
    F.min('SimilarityScore').alias('min_score'),
)
df_similarity = df_similarity.join(df_min_max, on='ProductCategory')

# 2) (score - min) / (max - min), rounded to two decimals.
# When every pair of a category has the same score (e.g. a single pair) the
# range is zero and the scaled score is undefined: it is left as NULL
# explicitly instead of relying on an implicit division by zero, which
# raises an error when Spark runs in ANSI mode.
score_range = F.col('max_score') - F.col('min_score')
df_similarity = df_similarity.withColumn(
    'ScaledSimilarityScore',
    F.when(
        score_range != 0,
        F.round((F.col('SimilarityScore') - F.col('min_score')) / score_range, 2),
    ),
)

# Keep only the columns of the final result, with business-friendly names
df_final = df_similarity.select(
    'ProductCategory',
    F.col(col_id+'_A').alias(col_id),
    F.col(col_id+'_B').alias('Opportunity_'+col_id),
    target,
    F.col(target+'_B').alias('Opportunity_'+target),
    F.col('ScaledSimilarityScore'),
    F.col('SimilarityScore')
    )

In [9]:
# Preview the final result in the notebook output
# (`display` is not imported in this file; presumably provided by the notebook runtime)
display(df_final)

## Store results
Store the results in a Spark table and, optionally, in a SQL pool table. For the latter, if the table already exists, you need to delete it first.


In [10]:
# Legacy switch that lets Spark create a managed table even when the table's
# location already contains files; presumably set so that the saveAsTable
# call in the next cell does not fail on a leftover directory.
# NOTE(review): the Spark migration notes announced the removal of this flag
# in Spark 3.0 -- confirm it still has an effect on the cluster's version.
spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")

In [11]:
# Persist the result as a Spark table, replacing any existing contents
opportunities_writer = df_final.write.mode('overwrite')
opportunities_writer.saveAsTable('default.Opportunities')

Optional: use the cells below if the results should also be saved to a dedicated SQL pool.

In [12]:
#df.createOrReplaceTempView("df")

In [13]:
# %%spark
# val df = spark.sqlContext.sql("select * from df")
# df.write.synapsesql("comparator.dbo.PartNumbers", Constants.INTERNAL)