In [1]:
%load_ext autoreload
%autoreload 2

import os
from pathlib import Path

# Override/set credentials in env var
os.environ['CWD'] = str(Path(os.getcwd()))
# os.environ['CWD'] = os.environ['CWD'].parent

# Base paths
cwd = Path(os.environ['CWD'])
dir_data = cwd / 'data'

# Set mlflow artifacts location
dir_exp_root = Path(cwd / 'experiments')
dir_exp_this = Path(dir_exp_root / 'record_linkage')
dir_exp_mlflow = dir_exp_this / 'mlflow'
dir_exp_mlflow.mkdir(exist_ok=True, parents=True)

import mlflow
mlflow_tracking_uri = (dir_exp_mlflow / 'mlruns').as_uri()
print(f'mlflow URI: {mlflow_tracking_uri}')
mlflow.set_tracking_uri(mlflow_tracking_uri)

import numpy as np
from PIL import Image
import pandas as pd

mlflow URI: file:///root/work/research/record_linkage/experiments/record_linkage/mlflow/mlruns


In [2]:
# !pip3 install pyspark[pandas_on_spark]==3.2.1

# Load fake data

In [3]:
db1 = pd.read_csv('db1.csv')
db2 = pd.read_csv('db2.csv')

Unnamed: 0.1,Unnamed: 0,job,company,ssn,residence,current_location,blood_group,website,username,name,sex,address,mail,birthdate
0,0,Herbalist,Norton-Castillo,049-66-7039,"9727 Todd Unions\nStaceyville, MN 41947","(Decimal('-55.9124455'), Decimal('32.664240'))",AB-,"['https://butler.info/', 'http://johnson.net/']",shannon98,Amanda Arroyo,F,"0751 Samantha Walk Apt. 650\nChenfort, SC 04033",schroedermelissa@yahoo.com,1970-03-09
1,1,Outdoor activities/education manager,Bowman-Jensen,067-35-7522,Unit 0723 Box 5155\nDPO AP 51042,"(Decimal('19.764603'), Decimal('67.662516'))",A-,"['http://www.parker.com/', 'https://www.lowe.i...",nathaniel53,Victoria Brown,F,"787 Alexander Road\nPort Leslieborough, VA 53325",tylermonica@hotmail.com,1926-06-24
2,2,"Chemist, analytical","Wilkerson, Guerrero and Mason",219-24-2883,"878 Charles Mountain\nNorth Loriton, LA 14794","(Decimal('-57.0475775'), Decimal('4.464624'))",AB+,"['https://www.salinas.com/', 'https://kennedy....",uperry,Amy Henry,F,"706 Sarah Lakes Apt. 421\nSouth Jeremy, AR 49313",pdelgado@hotmail.com,2016-03-18
3,3,"Geneticist, molecular",Simmons-Martinez,383-07-2464,"993 Boyd Throughway\nNew Paul, WI 12980","(Decimal('-6.687283'), Decimal('110.936355'))",B+,"['http://wilson-cohen.com/', 'https://riggs-al...",qrobertson,Christopher Curtis,M,"7274 Bird Canyon Suite 720\nValentinechester, ...",patrick79@gmail.com,1966-11-20
4,4,Structural engineer,"Castillo, Clark and Kemp",296-65-4080,"35200 Miller Mountain Apt. 251\nRileyville, MO...","(Decimal('-56.0627945'), Decimal('64.691982'))",B+,['https://baker.com/'],kfitzgerald,Kevin Vargas,M,"1310 Anderson Fork Apt. 598\nBrandonbury, WV 3...",rickytaylor@hotmail.com,1968-07-01
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
999995,999995,"Radiographer, therapeutic",Baker Inc,259-97-2454,"7915 Dalton Lodge\nWest Melissa, ME 06907","(Decimal('45.6559475'), Decimal('74.918465'))",B+,"['http://www.jones-snyder.info/', 'http://www....",daniel20,Frank Brown,M,"935 Carter Shoals Apt. 167\nMartinezport, NC 7...",kenneth41@gmail.com,1972-06-22
999996,999996,"Therapist, nutritional",Wilson Inc,741-53-2960,"192 Smith Village\nBiancabury, SD 44689","(Decimal('66.9223325'), Decimal('-50.131176'))",B+,"['http://www.alexander.net/', 'https://smith.c...",davisphillip,Stephen Cannon,M,"0748 Christina Flats\nEast Robertburgh, ID 15848",amandachan@yahoo.com,1936-07-14
999997,999997,Make,Young and Sons,627-73-2836,"0630 Morgan Plain Suite 156\nCoxborough, ID 14780","(Decimal('-88.3733525'), Decimal('-27.510056'))",AB-,['https://www.gutierrez.com/'],timothybryan,Sharon Rice,F,"42873 Moore Fords Apt. 735\nAshleymouth, CT 46802",weissdeborah@gmail.com,1997-03-04
999998,999998,Restaurant manager,Pearson-Fisher,749-52-4350,"4546 Banks Haven\nLake Christinaview, MT 72487","(Decimal('28.7827685'), Decimal('-32.938448'))",AB-,"['https://www.jones.com/', 'https://www.rodrig...",brookssharon,Amanda Meyers,F,56524 Joseph Stream Apt. 499\nPort Christopher...,pjenkins@hotmail.com,1947-08-16


# OR: Generate fake data

In [2]:
from tqdm import tqdm
from faker import Faker
Faker.seed(288)
fake = Faker(
#     [
#     'it_IT',
#     'en_US',
#     'es_ES',
#     'en_CA'
#     ]
)
import pandas as pd

db1 = pd.DataFrame([fake.profile() for _ in tqdm(list(range(1000000)))])

100%|████████████████████████████████| 1000000/1000000 [18:29<00:00, 901.46it/s]


In [3]:
# db1.to_csv('db1.csv')

In [3]:
# Add current_location
# Remove fields randomly 
# db1.head(5)

In [None]:
# Best config found:
# Min df: 50 
# Max df: 

In [3]:
P_NOISE_CHAR = 0.1
NUM_TOKENS_KEPT_ADDR = 2
ROW_COL_MISSING_OR_SWAPPED = None #'SWAP' # None, 'MISSING'
P_ROW_COL_MISSING_OR_SWAPPED = 1
FRAC_KEPT_ROWS_DB2 = 0.5


TFIDF_ANALYZER = 'char_wb'
TFIDF_NGRAM_LO = 3
TFIDF_NGRAM_HI = 3
TFIDF_MAX_DF = 0.8
TFIDF_MIN_DF = 10
TFIDF_MAX_FEATS = 100000

In [5]:
pd.options.mode.chained_assignment = None
np.random.seed(288)

db2 = db1[['job', 'address', 'name']]

import re
def _split(txt):
    return [x.strip() for x in re.split('-|\s| and |,', txt) if x.strip() != '']

# _split('Wilson, Sanchez and Pearson')

db2['job'] = db2['job'].apply(lambda txt: 
    np.random.choice(_split(txt))
)

db2['name'] = db2['name'].apply(lambda txt: 
    np.random.choice(_split(txt))
)



db2['address'] = db2['address'].apply(lambda txt: 
    ' '.join(np.random.choice(_split(txt), size=min(NUM_TOKENS_KEPT_ADDR, len(_split(txt))), replace=False)) # Keep 2 tokens when possible, otherwise 1
)

import string

def _add_char_noise(txt, p, noise_set_chars=[c for c in string.ascii_lowercase + string.digits]):
    txt_noise = ''
    for c in txt:
        if np.random.rand() < p:
            txt_noise += np.random.choice(noise_set_chars)
        else:
            txt_noise += c
    return txt_noise


from functools import partial

f_add_char_noise = partial(_add_char_noise, p=P_NOISE_CHAR)



db2['job'] = db2['job'].apply(f_add_char_noise)
db2['address'] = db2['address'].apply(f_add_char_noise)
db2['name'] = db2['name'].apply(f_add_char_noise)


_prime = '′'
d_name_map_c1c2 = {k:k+_prime for k in ['name', 'address', 'job']}
db2 = db2.rename(columns=d_name_map_c1c2)

In [7]:
def _random_swap_columns(row, subset, p):
    if np.random.rand() < p:
        c1, c2 = np.random.choice(subset, 2, replace=False)
        aux = row[c1]
        row[c1] = row[c2]
        row[c2] = aux
    return row

In [8]:
def _random_set_empty_column(row, subset, p):
    if np.random.rand() < p:
        c = np.random.choice(subset)
        row[c] = ''
    return row

In [8]:
rows_db2 = []

for _,row in db2.iterrows():
    if ROW_COL_MISSING_OR_SWAPPED == 'SWAP':
        row = _random_swap_columns(row,
                                   subset=list(d_name_map_c1c2.values()), # ['name′', 'address′', 'job′']
                                   p=P_ROW_COL_MISSING_OR_SWAPPED)
    elif ROW_COL_MISSING_OR_SWAPPED == 'MISSING':
        row = _random_set_empty_column(row,
                                       subset=list(d_name_map_c1c2.values()), #  ['name′', 'address′', 'job′']
                                       p=P_ROW_COL_MISSING_OR_SWAPPED)
    elif ROW_COL_MISSING_OR_SWAPPED is None:
        pass
    else:
        raise ValueError()
    rows_db2.append(row)
    
db2 = pd.DataFrame(rows_db2)

In [11]:
with pd.option_context('max.colwidth', None):
    display(db1[['name', 'address', 'job']].head())

Unnamed: 0,name,address,job
0,Amanda Arroyo,"0751 Samantha Walk Apt. 650\nChenfort, SC 04033",Herbalist
1,Victoria Brown,"787 Alexander Road\nPort Leslieborough, VA 53325",Outdoor activities/education manager
2,Amy Henry,"706 Sarah Lakes Apt. 421\nSouth Jeremy, AR 49313","Chemist, analytical"
3,Christopher Curtis,"7274 Bird Canyon Suite 720\nValentinechester, SC 19114","Geneticist, molecular"
4,Kevin Vargas,"1310 Anderson Fork Apt. 598\nBrandonbury, WV 31931",Structural engineer


In [12]:
db2[d_name_map_c1c2.values()].head()

Unnamed: 0,name′,address′,job′
0,Arroyo,0s51 Chenfg4t,Herbulist
1,Victoria,5r325 i87,manager
2,Amy,Joremy 707,analytical
3,Chrictopher,SCcCanyon,Ge9eti9est
4,Kevin,kV 1310,engineer


In [13]:
# missing rows + shuffle

db2 = db2.sample(frac=FRAC_KEPT_ROWS_DB2, random_state=288).reset_index()

In [3]:
# db2.to_csv('db2.csv')


# Spark

In [4]:
from pyspark.sql import SparkSession
# Create PySpark SparkSession

# .master('local[*]'): Set local computations
# .config("spark.driver.memory", "15g"): Set to avoid Java heap OOM
# .config("spark.sql.shuffle.partitions", "2000"): Set 2k instead of default 200 to avoid big partitions

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "1g") \
    .appName('record-linkage') \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/02 14:12:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
sdf1 = spark.createDataFrame(db1).repartition(200)
print(sdf1.rdd.getNumPartitions())
sdf1.printSchema()

22/05/02 14:13:15 WARN TaskSetManager: Stage 0 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 4) / 4]

200
root
 |-- Unnamed: 0: long (nullable = true)
 |-- job: string (nullable = true)
 |-- company: string (nullable = true)
 |-- ssn: string (nullable = true)
 |-- residence: string (nullable = true)
 |-- current_location: string (nullable = true)
 |-- blood_group: string (nullable = true)
 |-- website: string (nullable = true)
 |-- username: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- address: string (nullable = true)
 |-- mail: string (nullable = true)
 |-- birthdate: string (nullable = true)



In [6]:
db2 = db2.rename(columns={
    'job′': 'job',
    'address′': 'address',
    'name′': 'name'
})
db2['index'] = db2['index'].astype(int)

In [7]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
# Create User defined Custom Schema using StructType
db2_schema = StructType([
    StructField("index", IntegerType(), False),
    StructField("job", StringType(), True),
    StructField("address", StringType(), True),
    StructField("name", StringType(), True)
])

sdf2 = spark.createDataFrame(db2[['index','job','address','name']], schema=db2_schema).repartition(200)
print(sdf2.rdd.getNumPartitions())
sdf2.printSchema()

22/05/02 14:13:24 WARN TaskSetManager: Stage 1 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.
[Stage 1:>                                                          (0 + 4) / 4]

200
root
 |-- index: integer (nullable = false)
 |-- job: string (nullable = true)
 |-- address: string (nullable = true)
 |-- name: string (nullable = true)



In [8]:
from pyspark.sql.functions import udf
from pyspark.ml.feature import CountVectorizer, IDF, NGram

def char_tokenize(txt):
    return [c for c in txt]

In [9]:
def fit_char_level_tfidf_pyspark(spark_df, col, ngram_n=3):
    udf_tokenize = udf(char_tokenize, ArrayType(StringType(), False))
    
    ngram = NGram(n=ngram_n)
    spark_df = spark_df.withColumn(f"{col}_tok", udf_tokenize(f"{col}"))

    ngram.setInputCol(f"{col}_tok")
    ngram.setOutputCol(f"{col}_tok_ngrams")

    spark_df = ngram.transform(spark_df)

    count_vec = CountVectorizer(inputCol=f"{col}_tok_ngrams", outputCol=f"{col}_tf_tok_ngrams")
    count_vec = count_vec.fit(spark_df)
    spark_df = count_vec.transform(spark_df)
    
    idf = IDF(inputCol=f"{col}_tf_tok_ngrams", outputCol=f"{col}_tfidf_tok_ngrams")
    idf = idf.fit(spark_df)
    spark_df = idf.transform(spark_df)
    
    return {
        'count_vec': count_vec,
        'idf': idf,
        'spark_df': spark_df,
    }

In [16]:
def transform_w_char_level_tfidf_pyspark(spark_df, col, count_vec, idf, ngram_n=3):
    udf_tokenize = udf(char_tokenize, ArrayType(StringType(), False))
    
    ngram = NGram(n=ngram_n)
    spark_df = spark_df.withColumn(f"{col}_tok", udf_tokenize(f"{col}"))

    ngram.setInputCol(f"{col}_tok")
    ngram.setOutputCol(f"{col}_tok_ngrams")
    spark_df = ngram.transform(spark_df)
    
    count_vec.setInputCol(f"{col}_tok_ngrams")
    count_vec.setOutputCol(f"{col}_tf_tok_ngrams")
    spark_df = count_vec.transform(spark_df)
    
    idf.setInputCol(f"{col}_tf_tok_ngrams")
    idf.setOutputCol(f"{col}_tfidf_tok_ngrams")
    spark_df = idf.transform(spark_df)
    
    return spark_df


# def vstack_pyspark_cols(spark_df1, col1, spark_df2, col2):
#     return spark_df1.withColumnRenamed(col1, 'col').select(['col']).union(spark_df2.withColumnRenamed(col2, 'col').select(['col']))


def vstack_pyspark_list_cols(spark_df1, list_cols1, spark_df2, list_cols2):
    list_cols = []
    for col in list_cols1:
        list_cols.append(spark_df1.select(col).withColumnRenamed(col, 'col'))
    for col in list_cols2:
        list_cols.append(spark_df2.select(col).withColumnRenamed(col, 'col'))
        
    spark_df_aux = list_cols[0]
    for spark_df_col in list_cols[1:]:
        spark_df_aux = spark_df_aux.union(spark_df_col)
        
    return spark_df_aux

In [12]:
sdf_out = vstack_pyspark_list_cols(spark_df1=sdf1, list_cols1=['job', 'address', 'name'], spark_df2=sdf2, list_cols2=['job', 'address', 'name'])

In [13]:
d_ret_fit_tfidf = fit_char_level_tfidf_pyspark(sdf_out, 'col')

22/05/02 14:13:26 WARN TaskSetManager: Stage 2 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
22/05/02 14:13:29 WARN TaskSetManager: Stage 3 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
22/05/02 14:13:32 WARN TaskSetManager: Stage 4 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
22/05/02 14:13:34 WARN TaskSetManager: Stage 5 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.
22/05/02 14:13:35 WARN TaskSetManager: Stage 6 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.
22/05/02 14:13:35 WARN TaskSetManager: Stage 7 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.
22/05/02 14:14:58 WARN TaskSetManager: Stage 24 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
22/05/02

In [17]:
sdf2

DataFrame[index: int, job: string, address: string, name: string]

In [35]:
sdf2 = transform_w_char_level_tfidf_pyspark(
    spark_df=sdf2,
    col='job',
    count_vec=d_ret_fit_tfidf['count_vec'],
    idf=d_ret_fit_tfidf['idf'],
    ngram_n=3
)
sdf2.count()

22/05/02 15:04:54 WARN TaskSetManager: Stage 50 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.


500000

In [40]:
sdf1

DataFrame[Unnamed: 0: bigint, job: string, company: string, ssn: string, residence: string, current_location: string, blood_group: string, website: string, username: string, name: string, sex: string, address: string, mail: string, birthdate: string, job_tok: array<string>, job_tok_ngrams: array<string>, job_tf_tok_ngrams: vector, job_tfidf_tok_ngrams: vector]

In [38]:
sdf1 = transform_w_char_level_tfidf_pyspark(
    spark_df=sdf1,
    col='job',
    count_vec=d_ret_fit_tfidf['count_vec'],
    idf=d_ret_fit_tfidf['idf'],
    ngram_n=3
)
sdf1.count()

In [39]:
sdf1

DataFrame[Unnamed: 0: bigint, job: string, company: string, ssn: string, residence: string, current_location: string, blood_group: string, website: string, username: string, name: string, sex: string, address: string, mail: string, birthdate: string, job_tok: array<string>, job_tok_ngrams: array<string>, job_tf_tok_ngrams: vector, job_tfidf_tok_ngrams: vector]

In [41]:
# cosine similarity of two TF-IDF vectors can also be calculated as the dot product of two L2 normalized TF-IDF vectors
# https://stackoverflow.com/a/46764347/7928119
from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol="job_tfidf_tok_ngrams", outputCol="job_tfidf_tok_ngrams_l2norm")
sdf1 = normalizer.transform(sdf1)
sdf1.count()

22/05/02 15:15:43 WARN TaskSetManager: Stage 62 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

1000000

In [44]:
sdf2 = normalizer.transform(sdf2)
sdf2.count()

22/05/02 15:19:57 WARN TaskSetManager: Stage 71 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.


500000

In [59]:
from pyspark.mllib.linalg.distributed import Matrix

In [72]:
from pyspark.mllib.linalg.distributed import RowMatrix
# Column of sparse vectors to sparse mattrix
# https://stackoverflow.com/a/47675305/7928119
v1 = sdf1.select("job_tfidf_tok_ngrams_l2norm").rdd.map(lambda row: row.job_tfidf_tok_ngrams_l2norm)
m1 = RowMatrix(v1)

22/05/02 15:54:45 WARN TaskSetManager: Stage 92 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
[Stage 92:>                                                         (0 + 4) / 4]

In [81]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
ac = sdf1.select("job_tfidf_tok_ngrams_l2norm")
mata = IndexedRowMatrix(ac.rdd.map(lambda row: IndexedRow(row)))
ma = mata.toBlockMatrix(100,100)

22/05/02 16:35:07 WARN TaskSetManager: Stage 106 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
22/05/02 16:35:09 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
22/05/02 16:35:09 ERROR Executor: Exception in task 0.0 in stage 108.0 (TID 5992)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/lib/python3.7/dist-packages/pyspark/rdd.py", line 1562, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/loca

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 108.0 failed 1 times, most recent failure: Lost task 0.0 in stage 108.0 (TID 5992) (7a13ec49fbba executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/lib/python3.7/dist-packages/pyspark/rdd.py", line 1562, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_38517/3345332766.py", line 3, in <lambda>
TypeError: __init__() missing 1 required positional argument: 'vector'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/lib/python3.7/dist-packages/pyspark/rdd.py", line 1562, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_38517/3345332766.py", line 3, in <lambda>
TypeError: __init__() missing 1 required positional argument: 'vector'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [None]:


ac = offer_row.select('a')
bc = offer_row.select('a')
mata = IndexedRowMatrix(ac.rdd.map(lambda row: IndexedRow(*row)))
matb = IndexedRowMatrix(bc.rdd.map(lambda row: IndexedRow(*row)))

ma = mata.toBlockMatrix(100,100)
mb = matb.toBlockMatrix(100,100)

ans = ma.multiply(mb.transpose())

In [77]:
m1.numRows()

22/05/02 15:59:09 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
22/05/02 15:59:09 ERROR Executor: Exception in task 2.0 in stage 96.0 (TID 5970)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 67, in _convert_t

Py4JJavaError: An error occurred while calling o827.numRows.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 96.0 failed 1 times, most recent failure: Lost task 2.0 in stage 96.0 (TID 5970) (7a13ec49fbba executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 67, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.ml.linalg.SparseVector'> into Vector

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1889)
	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1253)
	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1253)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.numRows(RowMatrix.scala:76)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 67, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.ml.linalg.SparseVector'> into Vector

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1889)
	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1253)
	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1253)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [71]:
# sdf1.select('job_tfidf_tok_ngrams_l2norm').show(5,False)

In [51]:
v2 = sdf2.select("job_tfidf_tok_ngrams_l2norm").rdd.map(lambda row: row.features)
m2 = RowMatrix(v2)

22/05/02 15:35:34 WARN TaskSetManager: Stage 78 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.


In [58]:
m1.numRows()

22/05/02 15:38:19 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
22/05/02 15:38:20 ERROR Executor: Exception in task 2.0 in stage 82.0 (TID 5941)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1573, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'features' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertool

Py4JJavaError: An error occurred while calling o619.numRows.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 82.0 failed 1 times, most recent failure: Lost task 2.0 in stage 82.0 (TID 5941) (7a13ec49fbba executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1573, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'features' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_38517/1488069231.py", line 4, in <lambda>
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1578, in __getattr__
    raise AttributeError(item)
AttributeError: features

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1889)
	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1253)
	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1253)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.numRows(RowMatrix.scala:76)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1573, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'features' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_38517/1488069231.py", line 4, in <lambda>
  File "/usr/local/lib/python3.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1578, in __getattr__
    raise AttributeError(item)
AttributeError: features

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1889)
	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1253)
	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1253)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [52]:
M = m1.multiply(m2.transpose())

AttributeError: 'RowMatrix' object has no attribute 'transpose'

In [19]:
sdf2.count()

22/05/02 15:01:34 WARN TaskSetManager: Stage 38 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.


500000

In [15]:
d_ret_fit_tfidf

{'count_vec': CountVectorizerModel: uid=CountVectorizer_9fa2f37b5f18, vocabularySize=110028,
 'idf': IDFModel: uid=IDF_7354e917a504, numDocs=4500000, numFeatures=110028,
 'spark_df': DataFrame[col: string, col_tok: array<string>, col_tok_ngrams: array<string>, col_tf_tok_ngrams: vector, col_tfidf_tok_ngrams: vector]}

In [18]:
sdf_aux.rdd.getNumPartitions()

22/05/02 13:26:34 WARN TaskSetManager: Stage 2 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
[Stage 2:>                                                          (0 + 4) / 4]

200

In [9]:
cols_db1
cols_db2

In [10]:
sdf_all = vstack_pyspark_cols(sdf1, 'job', sdf2, 'job')

In [23]:
d_ret_tfidf = fit_char_level_tfidf_pyspark(spark_df=sdf_all, col='col')

22/04/28 15:51:28 WARN TaskSetManager: Stage 18 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
22/04/28 15:51:31 WARN TaskSetManager: Stage 19 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.
22/04/28 15:51:49 WARN TaskSetManager: Stage 28 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
22/04/28 15:51:51 WARN TaskSetManager: Stage 29 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [25]:
d_ret_tfidf

{'count_vec': CountVectorizerModel: uid=CountVectorizer_21f95c44fd33, vocabularySize=48436,
 'idf': IDFModel: uid=IDF_ec338830e747, numDocs=1500000, numFeatures=48436,
 'spark_df': DataFrame[col: string, col_tok: array<string>, col_tok_ngrams: array<string>, col_tf_tok_ngrams: vector, col_tfidf_tok_ngrams: vector]}

In [24]:
d_ret_tfidf['idf'].transform

<bound method Transformer.transform of IDFModel: uid=IDF_ec338830e747, numDocs=1500000, numFeatures=48436>

In [22]:
sdf1.take(2)

22/04/28 13:48:43 WARN TaskSetManager: Stage 20 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[Row(Unnamed: 0=125372, job='Pharmacologist', company='Herrera PLC', ssn='693-58-1880', residence='89503 Kathy Turnpike Suite 249\nNorth Ronald, WI 83639', current_location="(Decimal('72.3116095'), Decimal('-42.207814'))", blood_group='O-', website="['https://www.arnold-trujillo.com/', 'http://www.castillo-lloyd.com/', 'https://hampton.net/', 'https://marshall.com/']", username='aaronramirez', name='David Thompson', sex='M', address='949 Rice Rapid Apt. 119\nWalkerfurt, WI 34835', mail='xbrandt@gmail.com', birthdate='2004-08-16', job_tok=['P', 'h', 'a', 'r', 'm', 'a', 'c', 'o', 'l', 'o', 'g', 'i', 's', 't'], job_tok_ngrams=['P h a', 'h a r', 'a r m', 'r m a', 'm a c', 'a c o', 'c o l', 'o l o', 'l o g', 'o g i', 'g i s', 'i s t'], tf_job_tok_ngrams=SparseVector(1893, {0: 1.0, 29: 1.0, 32: 1.0, 33: 1.0, 35: 1.0, 131: 1.0, 323: 1.0, 414: 1.0, 617: 1.0, 753: 1.0, 904: 1.0, 1700: 1.0}), tfidf_job_tok_ngrams=SparseVector(1893, {0: 1.3194, 29: 2.5272, 32: 2.5465, 33: 2.5667, 35: 2.6073, 131:

IDF_80b77a8fa561

In [None]:
(inputCol="words", outputCol="features")
      .fit(original_df)
      .transform(original_df)

In [11]:
HashingTF(inputCol="job_tok_ngrams", outputCol="tf_job_tok_ngrams").transform(sdf1)

TypeError: __init__() got an unexpected keyword argument 'inputCol'

In [None]:

# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)

In [83]:
sdf2.take(10)

22/04/28 11:42:52 WARN TaskSetManager: Stage 10 contains a task of very large size (4951 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[Row(index=222252, job='and', address='88787 M4rtibez', name='Lisb'),
 Row(index=94171, job='surveyob', address='Logarland 15323', name='Williams'),
 Row(index=147772, job='Surveyor', address='Ville Mloneybe9g', name='Eriza'),
 Row(index=465693, job='Libracian', address='Vilyages Jogn', name='Perkins'),
 Row(index=318978, job='Airlinv', address='5c287 Shoal', name='Veronica'),
 Row(index=246490, job='Fisheries', address='aa9senberp UT', name='Campbell'),
 Row(index=217914, job='assiss7nt', address='Park Ronals', name='Bass'),
 Row(index=544156, job='safety', address='Loc0 38gm8', name='Charles'),
 Row(index=76224, job='aherapist', address='ewive Chrishopher', name='Edwin'),
 Row(index=377407, job='of', address='Dustiaborough West', name='Mille8')]

In [87]:
db1[['name', 'job', 'company']].iloc[:3]

Unnamed: 0,name,job,company
0,Amanda Arroyo,Herbalist,Norton-Castillo
1,Victoria Brown,Outdoor activities/education manager,Bowman-Jensen
2,Amy Henry,"Chemist, analytical","Wilkerson, Guerrero and Mason"


In [109]:
sdf1.rdd.getNumPartitions()

22/04/28 13:17:25 WARN TaskSetManager: Stage 21 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.

200

NGram_ca03f062ea9b

NGram_57965cb425e8

22/04/28 13:20:20 WARN TaskSetManager: Stage 22 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[Row(Unnamed: 0=125372, job='Pharmacologist', company='Herrera PLC', ssn='693-58-1880', residence='89503 Kathy Turnpike Suite 249\nNorth Ronald, WI 83639', current_location="(Decimal('72.3116095'), Decimal('-42.207814'))", blood_group='O-', website="['https://www.arnold-trujillo.com/', 'http://www.castillo-lloyd.com/', 'https://hampton.net/', 'https://marshall.com/']", username='aaronramirez', name='David Thompson', sex='M', address='949 Rice Rapid Apt. 119\nWalkerfurt, WI 34835', mail='xbrandt@gmail.com', birthdate='2004-08-16', job_tok=['P', 'h', 'a', 'r', 'm', 'a', 'c', 'o', 'l', 'o', 'g', 'i', 's', 't'], job_tok_ngrams=['P h a', 'h a r', 'a r m', 'r m a', 'm a c', 'a c o', 'c o l', 'o l o', 'l o g', 'o g i', 'g i s', 'i s t']),
 Row(Unnamed: 0=179221, job='Associate Professor', company='Lee-Olson', ssn='643-37-8682', residence='08094 Suarez Center Apt. 410\nLisaburgh, UT 76403', current_location="(Decimal('-63.6904745'), Decimal('17.944387'))", blood_group='AB-', website="['http://

In [None]:
rdd.map(lambda x: (x,1))

In [89]:
from pyspark.ml.feature import Tokenizer

In [90]:
df = spark.createDataFrame([("aaa bbbb cc",)], ["text"])
tokenizer = Tokenizer(outputCol="words")
tokenizer.setInputCol("text")
tokenizer.transform(df).head()

Row(text='aaa bbbb cc', words=['aaa', 'bbbb', 'cc'])

In [None]:
hashingTF = HashingTF()

In [43]:
sdf2 = spark.createDataFrame(db2) 

TypeError: field name′: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

In [36]:
sdf1.rdd.getNumPartitions()

4

In [41]:
sdf1.rdd.take(10)

                                                                                

[Row(Unnamed: 0=125372, job='Pharmacologist', company='Herrera PLC', ssn='693-58-1880', residence='89503 Kathy Turnpike Suite 249\nNorth Ronald, WI 83639', current_location="(Decimal('72.3116095'), Decimal('-42.207814'))", blood_group='O-', website="['https://www.arnold-trujillo.com/', 'http://www.castillo-lloyd.com/', 'https://hampton.net/', 'https://marshall.com/']", username='aaronramirez', name='David Thompson', sex='M', address='949 Rice Rapid Apt. 119\nWalkerfurt, WI 34835', mail='xbrandt@gmail.com', birthdate='2004-08-16'),
 Row(Unnamed: 0=179221, job='Associate Professor', company='Lee-Olson', ssn='643-37-8682', residence='08094 Suarez Center Apt. 410\nLisaburgh, UT 76403', current_location="(Decimal('-63.6904745'), Decimal('17.944387'))", blood_group='AB-', website="['http://davis.com/', 'http://www.wood.org/']", username='adamssally', name='Joseph Murphy', sex='M', address='5256 Eric Roads Suite 484\nStaffordton, CT 29815', mail='carla00@hotmail.com', birthdate='1961-04-16'),

In [17]:
sdf1.show(10)

22/04/28 10:54:26 WARN TaskSetManager: Stage 1 contains a task of very large size (82296 KiB). The maximum recommended task size is 1000 KiB.
ERROR:root:KeyboardInterrupt while sending command.                 (0 + 1) / 1]
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [85]:
import pandas as pd

db1 = pd.read_excel(dir_data / 'POS Data Melilla.xlsx').dropna(how='all', axis=1)
db2 = pd.read_excel(dir_data / 'Nielsen data Melilla.xlsx', header=1).dropna(how='all', axis=1)

db1_name = 'POS'
db2_name = 'Nielsen'

In [93]:
len(db2.columns)

37

In [17]:
# Toni: reduce dim

from sklearn.feature_extraction.text import TfidfVectorizer
tfidf = TfidfVectorizer(analyzer=TFIDF_ANALYZER,
                        ngram_range=(TFIDF_NGRAM_LO, TFIDF_NGRAM_HI),
                        max_df=TFIDF_MAX_DF,
                        min_df=TFIDF_MIN_DF,
                        max_features=TFIDF_MAX_FEATS)

In [None]:
X = np.concatenate([
    [str(x) for x in db1.to_numpy().flatten()],
    [str(x) for x in db2.to_numpy().flatten()]
])
tfidf.fit(X)
TFIDF_VOCAB = len(tfidf.vocabulary_)
TFIDF_VOCAB

In [134]:
from sklearn.metrics.pairwise import cosine_similarity
l_d_sim = []
for c1 in db1.columns:
    for c2 in db2.columns:
        # Do not take into account NaN values in these columns
        X1 = tfidf.transform([str(x) for x in db1[c1].values if pd.notna(x)]).astype(np.float32) # cast to np.float32 to avoid mem issues in cosine_sim mat
        X2 = tfidf.transform([str(x) for x in db2[c2].values if pd.notna(x)]).astype(np.float32)
        M = cosine_similarity(X1, X2)
        max_sim_c1c2 = M.max(axis=1).mean()
        max_sim_c2c1 = M.max(axis=0).mean()
        max_sim_mean = np.mean([max_sim_c1c2, max_sim_c2c1])
        l_d_sim.append({
            'c1': c1,
            'c2': c2,
            'sim': max_sim_mean
        })
#         print(f'Similarity index between DB1({c1}) and DB2({c2}): {max_sim_c1c2}')
df_sim = pd.DataFrame(l_d_sim).sort_values('sim', ascending=False)


TO_MATCH = set(db1.columns).union(set(db2.columns))#["name'", "company'", "address'"]
matched = set()
rows_kept = []

for _, row in df_sim.iterrows():
    c1 = row['c1']
    c2 = row['c2']
    if (c1 in matched or c2 in matched)\
    or (c1 not in TO_MATCH and c2 not in TO_MATCH):
        continue
    else:
        matched = matched.union(set([c1, c2]))
        rows_kept.append(row)


df_c1c2 = pd.DataFrame(rows_kept)

# d_c1c2 = df_c1c2.set_index('c1')['c2'].to_dict()
# d_c1c2

In [130]:
db1['RANK NACIONAL']

0        NaN
1      374.0
2        NaN
3     1419.0
4        NaN
5       38.0
6     6419.0
7        NaN
8     6889.0
9        NaN
10    7153.0
11    7287.0
12     744.0
Name: RANK NACIONAL, dtype: float64

In [131]:
db2['CENTROCIAL']

0                    NaN
1                    NaN
2                    NaN
3                    NaN
4                    NaN
5                    NaN
6                    NaN
7                    NaN
8                    NaN
9                    NaN
10    C.C.PARQUE MELILLA
11                   NaN
12                   NaN
Name: CENTROCIAL, dtype: object

In [135]:
df_c1c2

Unnamed: 0,c1,c2,sim
0,IR Store_code,CODIGO,1.0
379,ZIP,COD_POSTAL,1.0
535,SUPERFN,SUPERFN,1.0
494,MUNICIPIO,MUNICIPIO,1.0
150,CHAIN SF,CADENA,1.0
456,PROVINCIA,PROVINCIA,1.0
573,TIPO_NIELSEN_NEW,TIPO_NIELSEN_NEW,1.0
418,CCAA,COMUNIDAD_AUTONOMA,1.0
188,ROTULO,ROTULO,1.0
663,AGR.2019 (P_15),POT_15,1.0


In [136]:
from IPython.core.display import Markdown, HTML
THR_SIM_COL = 0.5
display(Markdown(f'''
### Detected column matches with high matching score between:
 - DB1 (`{db1_name}`) and,
 - DB2 (`{db2_name}`)
### Showing column matches with score `> {THR_SIM_COL}`
'''))

df_c1c2_filtered = df_c1c2.query(f'sim > {THR_SIM_COL}').rename(columns={
    'c1': f'Column in {db1_name}',
    'c2': f'Column in {db2_name}',
    'sim': 'match_score (column)',
})

display(HTML(df_c1c2_filtered.to_html(index=False)))


### Detected column matches with high matching score between:
 - DB1 (`POS`) and,
 - DB2 (`Nielsen`)
### Showing column matches with score `> 0.5`


Column in POS,Column in Nielsen,match_score (column)
IR Store_code,CODIGO,1.0
ZIP,COD_POSTAL,1.0
SUPERFN,SUPERFN,1.0
MUNICIPIO,MUNICIPIO,1.0
CHAIN SF,CADENA,1.0
PROVINCIA,PROVINCIA,1.0
TIPO_NIELSEN_NEW,TIPO_NIELSEN_NEW,1.0
CCAA,COMUNIDAD_AUTONOMA,1.0
ROTULO,ROTULO,1.0
AGR.2019 (P_15),POT_15,1.0


In [137]:
df_c1c2

Unnamed: 0,c1,c2,sim
0,IR Store_code,CODIGO,1.0
379,ZIP,COD_POSTAL,1.0
535,SUPERFN,SUPERFN,1.0
494,MUNICIPIO,MUNICIPIO,1.0
150,CHAIN SF,CADENA,1.0
456,PROVINCIA,PROVINCIA,1.0
573,TIPO_NIELSEN_NEW,TIPO_NIELSEN_NEW,1.0
418,CCAA,COMUNIDAD_AUTONOMA,1.0
188,ROTULO,ROTULO,1.0
663,AGR.2019 (P_15),POT_15,1.0


In [138]:
d_name_map_c1c2 = df_c1c2_filtered[['Column in POS', 'Column in Nielsen']].set_index('Column in POS').to_dict()['Column in Nielsen']

In [151]:
db1[['IR Store_code','CANAL SF','RANK NACIONAL','CODE','CHAIN SF','ADDRESS']]

Unnamed: 0,IR Store_code,CANAL SF,RANK NACIONAL,CODE,CHAIN SF,ADDRESS
0,5200016,CANAL NO SF,,,"MERCADONA, S.A.","CL DE LOS VELEZ, MARQUES SN CLCARLOS V"
1,5200014,SUPER NACIONAL,374.0,SUPER 1-5,GRUPO EROSKI,CL CUARTEL VALENZUELA SN C.C.PARQUE MELILLA
2,5200007,CANAL NO SF,,,"LIDL SUPERMERCADOS, S.A.U.",CL MAANAN BENAISA MIMUM SN
3,5200001,SUPER REGIONAL,1419.0,RESTO SUPER 51-184,"SUPERSOL SPAIN, S.L.",CL GENERAL POLAVIEJA 30
4,5200008,CANAL NO SF,,,GM FOOD IBERICA,CL FERNANDEZ CUEVAS 1
5,5200003,SUPER REGIONAL,38.0,RESTO SUPER,"SUPERSOL SPAIN, S.L.",CL ALCALDE RAFAEL GINEL 12 EDIF. SAN LORENZO
6,5200009,SUPER NACIONAL,6419.0,RESTO SUPER,"DIA, S.A.",CL MARQUES DE MONTEMAR 5
7,5200006,CANAL NO SF,,,"COVIRAN, S.C.A.",CL ALFONSO XIII SN
8,5200015,SUPER NACIONAL,6889.0,RESTO SUPER,"DIA, S.A.",CL GENERAL VILLALBA 48
9,5200004,CANAL NO SF,,,"COVIRAN, S.C.A.",CL LAS MARGARITAS SN NAVE 1


In [139]:
# Prepare same width dataframes with correspondences to extract row vectors
db1_to_match = pd.concat([db1[c] for c in d_name_map_c1c2.keys()], axis=1, ignore_index=True)
db2_to_match = pd.concat([db2[c] for c in d_name_map_c1c2.values()], axis=1, ignore_index=True)
assert db1_to_match.shape[1] == db2_to_match.shape[1]

In [140]:
# Cast all to str
for db in [db1, db2]:
    for c in db.columns:
        db[c] = db[c].astype(str)

# Rows matching

In [141]:
from scipy.sparse import vstack, hstack
def vectorize_df_ordered_cols(df):
    vectorized_tfidf = np.vectorize(lambda x: tfidf.transform([x]))
    db_mat = df.to_numpy()
    X_db = vectorized_tfidf(db_mat)
    X_db = [hstack(row) for row in X_db]
    X_db = vstack(X_db)
    return X_db

In [142]:
# TODO: Change tick() to be the one to have the name, and not tack(): more descriptive
class Clock():
    def __init__(self):
        self.d = {}
        
    def tick(self):
        self.t = time()
        
    def tack(self, name):
        self.d[name] = round(time() - self.t, 2)

In [143]:
from time import time
clock = Clock()

clock.tick()
X_db1 = vectorize_df_ordered_cols(db1[d_name_map_c1c2.keys()]).astype(np.float32)
clock.tack('time_vectorize_db1')

clock.tick()
X_db2 = vectorize_df_ordered_cols(db2[d_name_map_c1c2.values()]).astype(np.float32)
clock.tack('time_vectorize_db2')

In [144]:
from sklearn.metrics.pairwise import cosine_similarity

clock.tick()
M_rowsim = cosine_similarity(X_db1, X_db2)

matching_row_in_db2 = M_rowsim.argmax(axis=1)
matching_row_in_db2_sim = M_rowsim.max(axis=1)
clock.tack('time_matching')

db1['matching_row_in_db2'] = matching_row_in_db2
db1['sim'] = matching_row_in_db2_sim

In [145]:
clock.tick()
list_top5 = []
for row in M_rowsim:
    list_top5.append(row.argsort()[::-1][:5])
    
matching_row_in_db2_top5 = np.array(list_top5)
clock.tack('time_argsort_for_topk')

In [99]:
db1['matching_row_in_db2_top5'] = [row for row in matching_row_in_db2_top5]

In [156]:
db1[['IR Store_code','CANAL SF','RANK NACIONAL','CODE','CHAIN SF','ADDRESS', 'matching_row_in_db2', 'sim']].rename(columns={'sim': 'match_score'})

Unnamed: 0,IR Store_code,CANAL SF,RANK NACIONAL,CODE,CHAIN SF,ADDRESS,matching_row_in_db2,match_score
0,5200016,CANAL NO SF,,,"MERCADONA, S.A.","CL DE LOS VELEZ, MARQUES SN CLCARLOS V",12,0.982882
1,5200014,SUPER NACIONAL,374.0,SUPER 1-5,GRUPO EROSKI,CL CUARTEL VALENZUELA SN C.C.PARQUE MELILLA,10,0.979956
2,5200007,CANAL NO SF,,,"LIDL SUPERMERCADOS, S.A.U.",CL MAANAN BENAISA MIMUM SN,5,0.987224
3,5200001,SUPER REGIONAL,1419.0,RESTO SUPER 51-184,"SUPERSOL SPAIN, S.L.",CL GENERAL POLAVIEJA 30,0,0.979883
4,5200008,CANAL NO SF,,,GM FOOD IBERICA,CL FERNANDEZ CUEVAS 1,6,0.981585
5,5200003,SUPER REGIONAL,38.0,RESTO SUPER,"SUPERSOL SPAIN, S.L.",CL ALCALDE RAFAEL GINEL 12 EDIF. SAN LORENZO,2,0.972946
6,5200009,SUPER NACIONAL,6419.0,RESTO SUPER,"DIA, S.A.",CL MARQUES DE MONTEMAR 5,7,0.991414
7,5200006,CANAL NO SF,,,"COVIRAN, S.C.A.",CL ALFONSO XIII SN,4,0.968788
8,5200015,SUPER NACIONAL,6889.0,RESTO SUPER,"DIA, S.A.",CL GENERAL VILLALBA 48,11,0.985194
9,5200004,CANAL NO SF,,,"COVIRAN, S.C.A.",CL LAS MARGARITAS SN NAVE 1,3,0.982281


In [154]:
db2

Unnamed: 0,CODIGO,MACROCADENAS_NOMBRE,CADENA,ROTULO,ABRV,DIRECCION,NUMERO,DIRECCION1,DIRECCION_COD,COD_POSTAL,...,POT_8,POT_9,POT_10,POT_11,POT_12,POT_13,POT_14,POT_15,POINT_X,POINT_Y
0,5200001,"EUROMADI IBERICA,S.A.","SUPERSOL SPAIN, S.L.",SUPERSOL,CL,GENERAL POLAVIEJA,30,,GENERAL POLAVIEJA 30_005200001,52006,...,7.65,7.62,6.3,6.74,5.06,1.19,9.56,6.28,-2.94225087774823,35.2847260582838
1,5200002,"EUROMADI IBERICA,S.A.","SUPERSOL SPAIN, S.L.",SUPERSOL,CL,MADRID,S/N,EDIF. ZURBARAN,MADRID S/N_005200002,52001,...,3.16,1.82,1.19,1.17,1.01,0.15,2.7,1.52,-2.95355215298406,35.2884665409159
2,5200003,"EUROMADI IBERICA,S.A.","SUPERSOL SPAIN, S.L.",SUPERSOL,CL,ALCALDE RAFAEL GINEL,12,EDIF. SAN LORENZO,ALCALDE RAFAEL GINEL 12_005200003,52001,...,4.4,4.14,3.46,3.6,2.86,0.51,4.79,3.37,-2.93800895948979,35.2868764082751
3,5200004,"EUROMADI IBERICA,S.A.","COVIRAN, S.C.A.",COVIRAN,CL,LAS MARGARITAS,S/N,NAVE 1,LAS MARGARITAS S/N_005200004,52006,...,3.06,0.87,1.88,0.57,1.06,0.04,3.72,1.57,-2.94146830821349,35.2717502054199
4,5200006,"EUROMADI IBERICA,S.A.","COVIRAN, S.C.A.",COVIRAN,CL,ALFONSO XIII,S/N,,ALFONSO XIII S/N_005200006,52006,...,3.3,1.1,2.15,0.64,1.24,0.03,3.99,1.73,-2.93561156317003,35.2933079020288
5,5200007,INDEPENDIENTE,"LIDL SUPERMERCADOS, S.A.U.",LIDL,CL,MAANAN BENAISA MIMUM,S/N,,MAANAN BENAISA MIMUM S/N_005200007,52005,...,6.21,6.85,4.38,5.74,4.05,4.18,6.89,6.46,-2.95687136207697,35.2890075345388
6,5200008,"EUROMADI IBERICA,S.A.",GM FOOD IBERICA,SUMA,CL,FERNANDEZ CUEVAS,1,,FERNANDEZ CUEVAS 1_005200008,52005,...,12.65,3.22,7.53,3.51,2.96,0.02,10.92,4.92,-2.94764269703977,35.2899884593456
7,5200009,INDEPENDIENTE,"DIA, S.A.",DIA MARKET,CL,MARQUES DE MONTEMAR,5,,MARQUES DE MONTEMAR 5_005200009,52006,...,3.34,1.7,1.71,1.35,1.43,0.35,2.8,1.76,-2.94020053413355,35.2852018637255
8,5200011,INDEPENDIENTE,"DIA, S.A.",DIA MAXI,CL,GARCIA CABRELLES,20,,GARCIA CABRELLES 20_005200011,52002,...,2.29,1.68,1.77,0.76,1.48,0.23,2.09,1.57,-2.94400726672052,35.2955264665589
9,5200012,INDEPENDIENTE,"DIA, S.A.",DIA MAXI,CL,CABRERIZAS,17,PLANTA 7-PUERTA B,CABRERIZAS 17_005200012,52203,...,2.23,1.65,1.74,0.75,1.47,0.23,2.05,1.54,-2.94846725093748,35.2966408020948


In [149]:
with pd.option_context('display.max_columns', None):
    display(db1)

Unnamed: 0,IR Store_code,CANAL SF,RANK NACIONAL,CODE,CHAIN SF,ROTULO,BANNER CONTROL,BANNER SF (COMPILADO),NAME STORE (ID),ADDRESS,ZIP,CCAA,PROVINCIA,MUNICIPIO,SUPERFN,TIPO_NIELSEN_NEW,CHANNEL P&G,AGR.2019 (P_15),COBERTURA FY2122,matching_row_in_db2,sim
0,5200016,CANAL NO SF,,,"MERCADONA, S.A.",MERCADONA,MERCADONA,BANNER NO SF,STORE NO SF,"CL DE LOS VELEZ, MARQUES SN CLCARLOS V",52001,MELILLA,MELILLA,MELILLA,1925,SUPER1 800-2499,DISCOUNTER,21.04,NO,12,0.982882
1,5200014,SUPER NACIONAL,374.0,SUPER 1-5,GRUPO EROSKI,EROSKI,EROSKI SUPER,EROSKI SUPER,STORE NO SF,CL CUARTEL VALENZUELA SN C.C.PARQUE MELILLA,52006,MELILLA,MELILLA,MELILLA,4200,HIPER2 2500-6499,SUPER NACIONAL,20.26,NO,10,0.979956
2,5200007,CANAL NO SF,,,"LIDL SUPERMERCADOS, S.A.U.",LIDL,LIDL,BANNER NO SF,STORE NO SF,CL MAANAN BENAISA MIMUM SN,52005,MELILLA,MELILLA,MELILLA,1400,SUPER1 800-2499,DISCOUNTER,6.46,NO,5,0.987224
3,5200001,SUPER REGIONAL,1419.0,RESTO SUPER 51-184,"SUPERSOL SPAIN, S.L.",SUPERSOL,SUPERSOL,SUPERSOL,STORE NO SF,CL GENERAL POLAVIEJA 30,52006,MELILLA,MELILLA,MELILLA,2483,SUPER1 800-2499,SUPER REGIONAL,6.28,NO,0,0.979883
4,5200008,CANAL NO SF,,,GM FOOD IBERICA,SUMA,BANNER NO SF,BANNER NO SF,STORE NO SF,CL FERNANDEZ CUEVAS 1,52005,MELILLA,MELILLA,MELILLA,1480,SUPER1 800-2499,HFS,4.92,NO,6,0.981585
5,5200003,SUPER REGIONAL,38.0,RESTO SUPER,"SUPERSOL SPAIN, S.L.",SUPERSOL,SUPERSOL,SUPERSOL,STORE NO SF,CL ALCALDE RAFAEL GINEL 12 EDIF. SAN LORENZO,52001,MELILLA,MELILLA,MELILLA,1084,SUPER1 800-2499,SUPER REGIONAL,3.37,NO,2,0.972946
6,5200009,SUPER NACIONAL,6419.0,RESTO SUPER,"DIA, S.A.",DIA MARKET,DIA,BANNER NO SF,STORE NO SF,CL MARQUES DE MONTEMAR 5,52006,MELILLA,MELILLA,MELILLA,533,SUPER2 300-799,SUPER NACIONAL,1.76,NO,7,0.991414
7,5200006,CANAL NO SF,,,"COVIRAN, S.C.A.",COVIRAN,BANNER NO SF,BANNER NO SF,STORE NO SF,CL ALFONSO XIII SN,52006,MELILLA,MELILLA,MELILLA,566,SUPER2 300-799,HFS,1.73,NO,4,0.968788
8,5200015,SUPER NACIONAL,6889.0,RESTO SUPER,"DIA, S.A.",DIA MARKET,DIA,BANNER NO SF,STORE NO SF,CL GENERAL VILLALBA 48,52002,MELILLA,MELILLA,MELILLA,481,SUPER2 300-799,SUPER NACIONAL,1.64,NO,11,0.985194
9,5200004,CANAL NO SF,,,"COVIRAN, S.C.A.",COVIRAN,BANNER NO SF,BANNER NO SF,STORE NO SF,CL LAS MARGARITAS SN NAVE 1,52006,MELILLA,MELILLA,MELILLA,466,SUPER2 300-799,HFS,1.57,NO,3,0.982281


In [21]:
# db1[list(d_c1c2.keys())+['matching_row_in_db2']]

In [22]:
# Slide: Not noisy vs correspondent noisy

In [23]:
# Ruido2: Falta algun campo
# Ruido3: Swap campo
# Rendimiento SCANN

In [100]:
df_row_matches = pd.merge(
    left=db1.reset_index()[['index', 'matching_row_in_db2', 'company', 'address', 'name', 'sim', 'matching_row_in_db2_top5']],
    right=db2.reset_index()[['level_0', 'index'] + list(d_name_map_c1c2.values())],
    left_on='index',
    right_on='index'
)

KeyError: "['company', 'address', 'name'] not in index"

# Metrics

In [25]:
def df_to_html_file_for_mlflow(df, path_artifact):
    # Also, write HTML and log 
    from pretty_html_table import build_table
    with pd.option_context("display.precision", 4):
        html_df = build_table(df, index=True, color='grey_light', font_family='Arial', font_size=12)
        with open(path_artifact, 'w') as fb:
            fb.write(html_df)
            
            
dir_artifacts = Path('output') # Can it be temp?
dir_artifacts.mkdir(parents=True, exist_ok=True)

In [26]:
d_metrics = {}

In [27]:
# Top1 acc - no thr
df_row_matches['high_conf'] = df_row_matches['sim'] > 0

df_row_matches['match_correct'] = df_row_matches['matching_row_in_db2'] == df_row_matches['level_0']
df_ct = pd.crosstab(df_row_matches['high_conf'], df_row_matches['match_correct'], normalize='all')
d_metrics['top1 acc'] = df_ct[True][True]

In [28]:
# Top1 acc - thr
THR_CONFIDENCE_QUANTILE = 0.3 # Example of threshold threshold tests → We will not process bottom 30% confident rows
# THR LEAVING OUT % OF SAMPLES
THR = df_row_matches['sim'].quantile(THR_CONFIDENCE_QUANTILE)
df_row_matches['high_conf'] = df_row_matches['sim'] > THR
df_ct = pd.crosstab(df_row_matches['high_conf'], df_row_matches['match_correct'], normalize='index')
d_metrics[f'top1 acc of processed'] = df_ct[True][True]

In [29]:
# norm this by 'all' instead of 'index'
df_ct_for_barplot = pd.crosstab(df_row_matches['high_conf'], df_row_matches['match_correct'], normalize='all', dropna=False)

In [30]:
df_to_html_file_for_mlflow(df_ct.reset_index(), dir_artifacts / 'error_rate_thr_norm_row.html')

In [31]:
df_to_html_file_for_mlflow(df_ct_for_barplot.reset_index(), dir_artifacts / 'error_rate_thr_norm_all.html')

In [32]:
df_ct_for_barplot
d_metrics_thr = {
    'perc_processed_ok': df_ct_for_barplot[True][True],
    
    'perc_processed_ko': df_ct_for_barplot[False][True],
    
    'perc_not_processed': THR_CONFIDENCE_QUANTILE,
}
df_metrics_thr = pd.DataFrame(d_metrics_thr.items())

df_metrics_thr[' '] = ''
df_metrics_thr[1] = (df_metrics_thr[1]*100).apply(lambda x: round(x,2))

import plotly.express as px
fig = px.bar(
    df_metrics_thr,
    x=' ',
    y=1,
    color=0,
    color_discrete_map={
        'perc_not_processed': 'rgba(70,70,70,0.5)',
        'perc_processed_ok': 'rgba(0,200,120,1)',
        'perc_processed_ko': 'red'
    },
    text=1,
    width=400,
    
)

fig.update_layout(
    uniformtext_minsize=12,
    uniformtext_mode='hide',
    legend_title_text='Case',
    xaxis_title="",
    yaxis_title="%",
    title='Percentages of not processed, processed correctly and<br>processed incorrectly when using threshold',
)
fig.update_layout(title_font_size=12)

fig.write_html(dir_artifacts / 'perc_not_proc_proc_ok_ko.html')

In [33]:
# Top3 acc
df_row_matches['match_correct_top3'] = df_row_matches.apply(lambda row: row['level_0'] in row['matching_row_in_db2_top5'][:3], axis=1)
df_ct = pd.crosstab(df_row_matches['high_conf'], df_row_matches['match_correct_top3'], normalize='index')
d_metrics['top3 acc'] = df_ct[True][True]

In [34]:
# Top5 acc
df_row_matches['match_correct_top5'] = df_row_matches.apply(lambda row: row['level_0'] in row['matching_row_in_db2_top5'], axis=1)
df_ct = pd.crosstab(df_row_matches['high_conf'], df_row_matches['match_correct_top5'], normalize='index')
d_metrics['top5 acc'] = df_ct[True][True]

In [35]:
# !pip install scann

In [36]:
# !pip install numpy --upgrade

In [37]:
# # https://github.com/google-research/google-research/blob/master/scann/docs/example.ipynb
# import scann


In [38]:
# searcher = scann.scann_ops_pybind.builder(X_db1, 10, "dot_product").tree(
#     num_leaves=2000, num_leaves_to_search=100, training_sample_size=250000).score_ah(
#     2, anisotropic_quantization_threshold=0.2).reorder(100).build()

In [39]:
sample_rows = db1.sample(7)
with pd.option_context('max.colwidth', None):
    display(sample_rows[['residence']])

Unnamed: 0,residence
3897,"5405 Jessica Grove\nNorth Matthew, MD 73186"
15719,"3205 Bowman Isle Suite 454\nLake David, IN 66600"
17915,"354 Allen Fort Apt. 673\nHayestown, CA 53217"
12783,"92597 Annette Branch\nYoungchester, MO 29107"
6880,"4177 Sarah Pass\nJacksonview, ID 21177"
15785,"PSC 3693, Box 9492\nAPO AE 88261"
2426,"57520 Costa Hill Suite 261\nLake Julieborough, HI 65335"


In [40]:
# # Example libpostal
# from postal.parser import parse_address
# from postal.expand import expand_address
# db1_residences_parsed = sample_rows[['residence']]
# db1_residences_parsed['residence'] = db1_residences_parsed['residence'].apply(
#     lambda txt: dict([(v,k) for k,v in parse_address(txt)]))
# pd.json_normalize(db1_residences_parsed['residence'])

In [41]:
import plotly.express as px
fig = px.histogram(
    df_row_matches['sim'],
    color=df_row_matches['match_correct'],
    barmode='stack'
)
fig.add_vline(x=THR, line_dash='dash')
fig.update_layout(
    legend_title_text='Match correct',
    xaxis_title="Confidence",
    yaxis_title="Num matched elements",
    title='Error rate by confidence interval'
)
fig.write_html(dir_artifacts / 'error_rate_by_conf.html')

In [42]:
with mlflow.start_run():
    for k in [
        'NUM_TOKENS_KEPT_ADDR',
        'P_NOISE_CHAR',
        'ROW_COL_MISSING_OR_SWAPPED',
        'P_ROW_COL_MISSING_OR_SWAPPED',
        'FRAC_KEPT_ROWS_DB2',
        'TFIDF_ANALYZER',
        'TFIDF_NGRAM_LO',
        'TFIDF_NGRAM_HI',
        'TFIDF_MAX_DF',
        'TFIDF_MIN_DF',
        'TFIDF_MAX_FEATS',
        'TFIDF_VOCAB',
        'THR_CONFIDENCE_QUANTILE',
        'THR',
    ]:
        mlflow.log_param(k, locals()[k])
    
    for k, v in d_metrics.items():
        mlflow.log_metric(k, v)
        
    for k, v in clock.d.items():
        mlflow.log_metric(k, v)
    
    mlflow.log_artifacts(dir_artifacts)