In [None]:
import math
import os
from glob import glob
from os import path

import numpy as np

In [None]:
# Run the repository's compile script (presumably this builds the egg that the
# following cell looks up under ../dist -- TODO confirm).
!bash ../compile_library.sh

In [None]:
# Locate the packaged library egg that is later shipped to the Spark executors.
egg_pattern = os.path.join('..', 'dist', '*.egg')
egg_candidates = glob(egg_pattern)
if not egg_candidates:
    # Fail with an actionable message instead of a bare IndexError.
    raise FileNotFoundError(
        "No egg file matches {!r}; run the compile step first.".format(egg_pattern))
egg_file = egg_candidates[0]
egg_file

In [None]:
# findspark has to be initialised before anything is imported from pyspark.
import findspark
findspark.init()

# Imported in this cell (instead of relying on the later imports cell) so the
# notebook also works when executed top to bottom on a fresh kernel.
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("NameMatching_Notebook")
         # Optional cluster tuning, left disabled:
         # .config('spark.dynamicAllocation.enabled', False)
         # .config('spark.executor.instances', 4)
         # .config('spark.executor.cores', 13)
         # .config('spark.executor.memory', '14g')
         .config('spark.executorEnv.PYTHON_EGG_CACHE', '/tmp')
         .config('spark.driver.memory', '7g')
         .getOrCreate())
sc = spark.sparkContext
sc.setLogLevel("INFO")

# Make the compiled egg importable on the executors.
sc.addPyFile(egg_file)

In [None]:
import sparse_dot_topn.sparse_dot_topn as ct # this is the cython code module

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import RegexTokenizer
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf
from pyspark.sql.types import ArrayType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import LongType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

from scipy.sparse import csr_matrix

In [None]:
def chunk_dot_limit(A, B, ntop,
                    threshold=0, start_row=0, upper_triangular=False):
    """Compute the limited product of a row chunk ``A`` with ``B``.

    The multiplication itself is delegated to the Cython/C++ kernel
    ``ct.sparse_dot_topn``; this wrapper only prepares the CSR buffers and
    the preallocated output arrays.

    Parameters
    ----------
    A : scipy sparse matrix
        Left operand (one chunk of rows). Converted to CSR.
    B : scipy sparse matrix
        Right operand. Converted to CSR.
    ntop : int
        Forwarded to the kernel; the output buffers hold at most ``ntop``
        entries per row of ``A``.
    threshold : float
        Forwarded to the kernel (presumably the minimum value an entry needs
        in order to be reported -- confirm against the kernel).
    start_row : int
        Forwarded to the kernel; also used here as the offset of this chunk
        when bounding the number of upper-triangular cells.
    upper_triangular : bool
        Forwarded to the kernel as an int flag; when true the output buffers
        are additionally bounded by the number of cells above the diagonal.

    Returns
    -------
    generator of (int, int, float)
        One ``(row, col, value)`` tuple for each of the ``nnz`` entries the
        kernel reports.
    """
    # The kernel reads raw CSR buffers, so both operands must be CSR
    # (tocsr() is a no-op for matrices that already are).
    A = A.tocsr()
    B = B.tocsr()

    M = A.shape[0]
    N = B.shape[1]

    idx_dtype = np.int32

    if upper_triangular:
        # massive memory reduction
        # max number of possible non-zero elements above the diagonal; the
        # product is always even, so integer division is exact and avoids
        # going through a float.
        upper_cells = M * (2 * (N - start_row) - M - 1) // 2
        nnz_max = min(upper_cells, M * ntop)
    else:
        nnz_max = M * ntop

    # arrays will be filled by reference inside the kernel
    rows = np.empty(nnz_max, dtype=idx_dtype)
    cols = np.empty(nnz_max, dtype=idx_dtype)
    data = np.empty(nnz_max, dtype=A.dtype)

    # C++ wrapped with Cython implementation.
    # The return value is the number of entries actually written; it is used
    # below to slice the preallocated arrays.
    nnz = ct.sparse_dot_topn(
        M, N,
        np.asarray(A.indptr, dtype=idx_dtype),
        np.asarray(A.indices, dtype=idx_dtype),
        A.data,
        np.asarray(B.indptr, dtype=idx_dtype),
        np.asarray(B.indices, dtype=idx_dtype),
        B.data,
        ntop,
        threshold,
        rows, cols, data, start_row, int(upper_triangular))

    # Convert numpy scalars to plain Python types for downstream consumers.
    return ((int(i), int(j), float(v)) for i, j, v in
            zip(rows[:nnz], cols[:nnz], data[:nnz]))

In [None]:
class NameVectorizer(object):
    """Spark ML pipeline that turns the ``name`` column into ``name_vector``.

    The stages are: tokenizer -> n-grams -> term counts -> IDF weighting ->
    p=2 normalisation. ``n_gram`` is the n-gram length, ``min_df`` and
    ``vocab_size`` are passed to the CountVectorizer as ``minDF`` and
    ``vocabSize``.
    """

    def __init__(self, n_gram, min_df, vocab_size):
        self.n_gram = n_gram
        self.min_df = min_df
        self.vocab_size = vocab_size
        self.__create_pipeline()

    def __create_pipeline(self):
        """Assemble the five stages and store them as ``self.pipeline``."""
        stages = [
            # Empty pattern: presumably splits the string into single
            # characters -- confirm against RegexTokenizer behaviour.
            RegexTokenizer(inputCol="name",
                           outputCol="tokens",
                           pattern=""),
            NGram(inputCol="tokens",
                  outputCol="n_grams",
                  n=self.n_gram),
            CountVectorizer(inputCol='n_grams',
                            outputCol='term_frequency',
                            minTF=1.0,
                            minDF=self.min_df,
                            vocabSize=self.vocab_size,
                            binary=False),
            IDF(inputCol="term_frequency",
                outputCol="tfidf_vector"),
            Normalizer(inputCol="tfidf_vector",
                       outputCol="name_vector",
                       p=2),
        ]
        self.pipeline = Pipeline(stages=stages)

    def fit_transform(self, df):
        """Fit the pipeline on ``df`` and return ``df`` transformed by it."""
        fitted_model = self.pipeline.fit(df)
        return fitted_model.transform(df)


def unpack_vector(sparse, min_value=0.05):
    """Turn a sparse vector into ``(index, value)`` tuples.

    Only entries whose value is strictly greater than ``min_value`` are
    kept; smaller weights are dropped.

    Parameters
    ----------
    sparse : object with ``indices`` and ``values``
        The two sequences are iterated in parallel.
    min_value : float, optional
        Exclusive lower bound for a value to be kept (default 0.05).

    Returns
    -------
    generator of (int, float)
        ``(index, value)`` pairs converted to plain Python types.
    """
    return ((int(index), float(value)) for index, value in
            zip(sparse.indices, sparse.values) if value > min_value)


# Row layout (dummy_id, id, name); every field is declared non-nullable.
schema = (StructType()
          .add("dummy_id", LongType(), False)
          .add("id", StringType(), False)
          .add("name", StringType(), False))

# Return type of the unpack_vector UDF: an array of (ngram_index, value) structs.
ngram_schema = ArrayType(
    StructType()
    .add("ngram_index", IntegerType(), False)
    .add("value", FloatType(), False)
)

# Layout of the (i, j, SIMILARITY) triples used for the similarity DataFrame.
similarity_schema = (StructType()
                     .add("i", IntegerType(), False)
                     .add("j", IntegerType(), False)
                     .add("SIMILARITY", FloatType(), False))

# Spark UDF wrapper around unpack_vector.
udf_unpack_vector = sf.udf(unpack_vector, ngram_schema)

In [None]:
# Azure Data Lake locations of the operator data.
remote_input_file = 'adl://ulohubdldevne.azuredatalakestore.net/data/parquet/OPERATORS.parquet'
remote_output_file = 'adl://ulohubdldevne.azuredatalakestore.net/data/parquet/OPERATORS_MATCHED.parquet'

# Local copies of the same data.
local_input_file = '../../data/operators.parquet'
local_output_file = '../../data/operators_matched.parquet'

# Explicit switch instead of silently overwriting the remote paths; the
# default keeps the local files, as before.
use_remote_storage = False

input_file = remote_input_file if use_remote_storage else local_input_file
output_file = remote_output_file if use_remote_storage else local_output_file

# save_output: write the matches to output_file instead of only displaying them.
save_output = False
# mode: save mode handed to the parquet writer.
mode = 'overwrite'

In [None]:
# Matching parameters.
ntop = 10         # handed to chunk_dot_limit as ``ntop``
threshold = 0.8   # handed to chunk_dot_limit as ``threshold``
fraction = 1e-3   # sampling fraction applied when the input is read

In [None]:
# Characters to strip from the combined name string. The literal is spliced
# into a regex character class below, which is why backslash and caret are
# escaped.
# NOTE(review): inside a character class "+-/" reads as a range
# (U+002B..U+002F), so "," and "." appear to be removed as well -- confirm
# that this is intended.
drop_chars = "\\\\!#%&()*+-/:;<=>?@\\^|~\u00A8\u00A9\u00AA\u00AC\u00AD\u00AF\u00B0\u00B1\u00B2\u00B3\u00B6\u00B8\u00B9\u00BA\u00BB\u00BC\u00BD\u00BE\u2013\u2014\u2022\u2026\u20AC\u2121\u2122\u2196\u2197\u247F\u250A\u2543\u2605\u2606\u3001\u3002\u300C\u300D\u300E\u300F\u3010\u3011\uFE36\uFF01\uFF06\uFF08\uFF09\uFF1A\uFF1B\uFF1F{}\u00AE\u00F7\u02F1\u02F3\u02F5\u02F6\u02F9\u02FB\u02FC\u02FD\u1BFC\u1BFD\u2260\u2264\u2DE2\u2DF2\uEC66\uEC7C\uEC7E\uED2B\uED34\uED3A\uEDAB\uEDFC\uEE3B\uEEA3\uEF61\uEFA2\uEFB0\uEFB5\uEFEA\uEFED\uFDAB\uFFB7\u007F\u24D2\u2560\u2623\u263A\u2661\u2665\u266A\u2764\uE2B1\uFF0D"
# Wrap the characters in [...] so the pattern matches any single one of them.
regex = "[{}]".format(drop_chars)

In [None]:
# Report the driver memory setting of the running context (public accessor
# instead of the private ``sc._conf``).
print("Driver Memory: ", sc.getConf().get('spark.driver.memory'))

# Rows are numbered per country in ascending ``id`` order.
w = Window.partitionBy('COUNTRY_CODE').orderBy(sf.asc('id'))

all_operators = (
    spark
    .read.parquet(input_file)
    # work on a random sample (without replacement) of the input
    .sample(False, fraction)
    .na.drop(subset=['NAME_CLEANSED'])
    # create unique ID
    .withColumn('id', sf.concat_ws('~',
                                   sf.col('COUNTRY_CODE'),
                                   sf.col('SOURCE'),
                                   sf.col('REF_OPERATOR_ID')))
    .fillna('')
    # create the string column that is matched on
    .withColumn('name',
                sf.concat_ws(' ',
                             sf.col('NAME_CLEANSED'),
                             sf.col('CITY_CLEANSED'),
                             sf.col('STREET_CLEANSED'),
                             sf.col('ZIP_CODE_CLEANSED')))
    # strip the unwanted characters, then collapse runs of whitespace
    .withColumn('name', sf.regexp_replace('name', regex, ''))
    # raw string: '\s' is not a valid Python escape sequence
    .withColumn('name', sf.trim(sf.regexp_replace('name', r'\s+', ' ')))
    # zero-based row index within each country
    .withColumn('name_index', sf.row_number().over(w) - 1)
    .select('name_index', 'id', 'name', 'COUNTRY_CODE')
)
all_operators.persist()

In [None]:
# How often each (country, name) combination occurs, most frequent first.
all_opr_count = (
    all_operators
    .groupby('COUNTRY_CODE', 'name')
    .count()
    .sort('count', ascending=False)
)
all_opr_count.show(200, truncate=False)

In [None]:
# Number of operators per country; show the ten smallest countries.
opr_count = (
    all_operators
    .groupby('COUNTRY_CODE')
    .count()
)
(opr_count
 .sort('count', ascending=True)
 .show(10))

In [None]:
# Country codes that have more than 100 operators, collected to the driver.
country_codes = (
    opr_count
    .where(opr_count['count'] > 100)
    .select('COUNTRY_CODE')
    .distinct()
    .rdd
    .map(lambda row: row[0])
    .collect()
)

print(country_codes)

In [None]:
# Only the first write may use ``mode``: with mode='overwrite' every later
# iteration would otherwise replace what earlier countries stored under
# output_file. Subsequent writes therefore append.
write_mode = mode

for country_code in country_codes:
    # Operators of the current country only.
    operators = (
        all_operators
        .filter(sf.col('COUNTRY_CODE') == country_code)
        .drop('COUNTRY_CODE')
        .repartition('id')
        .sort('id', ascending=True)
    )

    # Normalised TF-IDF vectors over the n-grams of the name string.
    sparse_names = (
        NameVectorizer(n_gram=2, min_df=2, vocab_size=1500)
        .fit_transform(operators)
        .select(['name_index', 'name_vector'])
    )

    # One row per (name_index, ngram_index, value) entry kept by unpack_vector.
    match_pairs = (
        sparse_names
        .withColumn('explode', sf.explode(udf_unpack_vector(sf.col('name_vector'))))
        .withColumn('ngram_index', sf.col('explode').getItem('ngram_index'))
        .withColumn('value', sf.col('explode').getItem('value'))
        .select('name_index', 'ngram_index', 'value')
    )

    df = match_pairs.toPandas()
    if df.empty:
        # Without any entry the matrix shape below cannot be derived.
        print("No n-gram entries, skipping country code:", country_code)
        continue
    df = df.astype({'name_index': np.int32,
                    'ngram_index': np.int32,
                    'value': np.float64})

    # Sparse (names x n-grams) matrix on the driver.
    csr_names_vs_ngrams = csr_matrix(
        (df.value.values, (df.name_index.values, df.ngram_index.values)),
        shape=(df.name_index.max() + 1, df.ngram_index.max() + 1),
        dtype=np.float64)
    del df

    # The transposed matrix is the right operand of every chunk product.
    csr_rdd_transpose = spark.sparkContext.broadcast(csr_names_vs_ngrams.transpose())

    # Split the rows into chunks of roughly 1000 rows; each chunk carries the
    # index of its first row.
    n_chunks = max(1, math.floor(csr_names_vs_ngrams.shape[0] / 1000))
    chunk_size = math.ceil(csr_names_vs_ngrams.shape[0] / n_chunks)
    n_chunks = math.ceil(csr_names_vs_ngrams.shape[0] / chunk_size)
    chunks = [(csr_names_vs_ngrams[
               (i * chunk_size): min((i + 1) * chunk_size, csr_names_vs_ngrams.shape[0])], i * chunk_size)
              for i in range(n_chunks)]

    chunks_rdd = spark.sparkContext.parallelize(chunks, numSlices=len(chunks))

    del csr_names_vs_ngrams

    similarity = chunks_rdd.flatMap(
        lambda x: chunk_dot_limit(x[0], csr_rdd_transpose.value,
                                  ntop=ntop,
                                  threshold=threshold,
                                  start_row=x[1],
                                  upper_triangular=True)
    )

    similarity = similarity.toDF(similarity_schema)

    grouping_window = (
        Window
        .partitionBy('j')
        .orderBy(sf.asc('i')))

    # for every j keep only the row with the smallest i
    grp_sim = (
        similarity
        .withColumn("rn", sf.row_number().over(grouping_window))
        .filter(sf.col("rn") == 1)
        .drop('rn')
    )

    # keep only rows whose j never occurs as an i, i.e. drop targets that
    # are themselves used as a group id
    grouped_similarity = grp_sim.join(
        grp_sim.select('j').subtract(grp_sim.select('i')),
        on='j', how='inner'
    )

    # Attach id and name of both sides of every pair.
    matches = (
        grouped_similarity
        .join(operators, grouped_similarity['i'] == operators['name_index'],
              how='left').drop('name_index')
        .selectExpr('i', 'j', 'id as SOURCE_ID',
                    'SIMILARITY', 'name as SOURCE_NAME')
        .join(operators, grouped_similarity['j'] == operators['name_index'],
              how='left').drop('name_index')
        .withColumn('COUNTRY_CODE', sf.lit(country_code))
        .selectExpr('COUNTRY_CODE', 'SOURCE_ID', 'id as TARGET_ID',
                    'SIMILARITY', 'SOURCE_NAME', 'name as TARGET_NAME')
    )

    if save_output:
        (matches
         .coalesce(20)
         .write
         .partitionBy('COUNTRY_CODE')
         .parquet(output_file, mode=write_mode))
        # everything after the first successful write is added to it
        write_mode = 'append'
    else:
        matches.persist()
        n_matches = matches.count()

        print('\n\nNr. Similarities:\t', n_matches)
        print('Threshold:\t', threshold)
        print('NTop:\t', ntop)
        (matches
         .select('SOURCE_ID', 'TARGET_ID',
                 'SIMILARITY', 'SOURCE_NAME', 'TARGET_NAME')
         .sort('SIMILARITY', ascending=True)
         .show(50, truncate=False))

        (matches
         .groupBy(['SOURCE_ID', 'SOURCE_NAME'])
         .count()
         .sort('count', ascending=False).show(50, truncate=False))

        matches.describe('SIMILARITY').show()

        # release the cached result before moving on to the next country
        matches.unpersist()

    # Drop the executor copies of this country's matrix; unpersist (unlike
    # destroy) still allows the value to be re-sent if it is needed again.
    csr_rdd_transpose.unpersist()
    print("Done, country code:", country_code)