In [0]:
# Zingg model location: artifacts for this model live under <zinggDir>/<modelId>
zinggDir = "/models"
modelId = "databricksdemo"

# training-data folders in the form handed to dbutils.fs (note the trailing slash)
MARKED_DIR = f"{zinggDir}/{modelId}/trainingData/marked/"
UNMARKED_DIR = f"{zinggDir}/{modelId}/trainingData/unmarked/"

# the same two folders with a /dbfs prefix, the form used for the pandas parquet reads/writes
MARKED_DIR_DBFS = f"/dbfs{MARKED_DIR}"
UNMARKED_DIR_DBFS = f"/dbfs{UNMARKED_DIR}"


import pandas as pd
import numpy as np
 
import time
import uuid
 
from tabulate import tabulate

def cleanModel():
    '''
    Recursively delete the training-data folders of the model: first the
    marked folder (MARKED_DIR), then the unmarked folder (UNMARKED_DIR).
    Returns nothing.
    '''
    for training_dir in (MARKED_DIR, UNMARKED_DIR):
        dbutils.fs.rm(training_dir, recurse=True)

# retrieve candidate pairs
def get_candidate_pairs(unmarked_dir=None, marked_dir=None):
  '''
  Retrieve the candidate pairs that still need labeling.

  The unmarked folder (where the Zingg findTrainingData job deposits candidate
  pairs) is compared with the marked folder (where labeled pairs are persisted)
  so that no previously labeled pair is returned.

  Args:
    unmarked_dir: folder to read unmarked pairs from; defaults to UNMARKED_DIR_DBFS.
    marked_dir: folder to read labeled pairs from; defaults to MARKED_DIR_DBFS.

  Returns:
    A pandas DataFrame holding the unmarked rows whose z_cluster value does not
    occur in the marked data. When the unmarked folder cannot be read or holds
    no rows, the result is an empty frame with a single z_cluster column.
  '''
  if unmarked_dir is None:
    unmarked_dir = UNMARKED_DIR_DBFS
  if marked_dir is None:
    marked_dir = MARKED_DIR_DBFS

  def _read_pairs(folder):
    # Best-effort read of one folder: a folder that is missing, unreadable or
    # has no rows is treated as "no pairs". Only Exception is caught so that
    # KeyboardInterrupt / SystemExit still propagate.
    try:
      pairs_pd = pd.read_parquet(folder, engine='pyarrow')
    except Exception:
      return pd.DataFrame({'z_cluster': []})
    if pairs_pd.shape[0] == 0:
      return pd.DataFrame({'z_cluster': []})
    return pairs_pd

  unmarked_pd = _read_pairs(unmarked_dir)
  marked_pd = _read_pairs(marked_dir)

  # keep unmarked rows not already present in marked; return an independent
  # copy because assign_label() later writes labels into this frame in place
  already_labeled = unmarked_pd['z_cluster'].isin(marked_pd['z_cluster'])
  return unmarked_pd[~already_labeled].copy()

# assign label to candidate pair
def assign_label(candidate_pairs_pd, z_cluster, label):
  '''
  Write `label` into the z_isMatch column of every row of candidate_pairs_pd
  whose z_cluster equals the given z_cluster. The frame is modified in place
  and nothing is returned.

  Label codes used by this notebook:
     0 - not matched
     1 - matched
     2 - uncertain
  '''
  rows_in_cluster = candidate_pairs_pd['z_cluster'] == z_cluster
  candidate_pairs_pd.loc[rows_in_cluster, 'z_isMatch'] = label
  return None
 
# persist labels to marked folder
def save_labels(candidate_pairs_pd):
  '''
  Save the given pairs to the marked folder as a new snappy-compressed
  parquet file named markedRecords_<microsecond timestamp>.parquet.

  Args:
    candidate_pairs_pd: pandas DataFrame of pairs to persist; written without
      its index.

  Returns:
    None. Every call writes an additional file; existing files are untouched.
  '''

  # make dir if not exists
  dbutils.fs.mkdirs(MARKED_DIR)
  # NOTE(review): the listing result is unused; presumably this only checks
  # that the folder is listable after creation - confirm before removing.
  dbutils.fs.ls(MARKED_DIR)

  # integer microseconds: float division here would put a fractional part
  # (and therefore an extra '.') into the file name
  file_name = 'markedRecords_' + str(time.time_ns() // 1000) + '.parquet'

  # MARKED_DIR_DBFS may already end in '/', so normalise before joining
  target_path = MARKED_DIR_DBFS.rstrip('/') + '/' + file_name

  # save labeled data to file
  candidate_pairs_pd.to_parquet(
    target_path,
    compression='snappy',
    index=False # do not include index
    )

  return
 
 
def count_labeled_pairs(marked_dir=None):
  '''
  Count the labeled pairs in the marked folder.

  Args:
    marked_dir: folder to read labeled pairs from; defaults to MARKED_DIR_DBFS.

  Returns:
    A tuple (n_positive, n_negative, n_total) of distinct z_cluster counts:
    clusters with z_isMatch == 1, clusters with z_isMatch == 0, and all
    clusters. All three are 0 when the folder cannot be read.
  '''
  if marked_dir is None:
    marked_dir = MARKED_DIR_DBFS

  # fallback frame carries both columns used below, so an unreadable or
  # missing folder yields zero counts instead of a KeyError on z_isMatch
  marked_pd = pd.DataFrame({'z_cluster': [], 'z_isMatch': []})

  # read marked pairs (best effort: keep the empty fallback on failure, but
  # let KeyboardInterrupt / SystemExit propagate)
  try:
    marked_pd = pd.read_parquet(
        marked_dir,
        engine='pyarrow'
         )
  except Exception:
    pass

  clusters = marked_pd['z_cluster']
  n_total = len(np.unique(clusters))
  n_positive = len(np.unique(clusters[marked_pd['z_isMatch'] == 1]))
  n_negative = len(np.unique(clusters[marked_pd['z_isMatch'] == 0]))

  return n_positive, n_negative, n_total

# text shown in the 'label' widget -> numeric code written to z_isMatch
available_labels = {'No Match': 0, 'Match': 1, 'Uncertain': 2}

# dropdown widget named 'label', initially set to 'Uncertain'
dbutils.widgets.dropdown(
    'label',
    'Uncertain',
    available_labels.keys(),
    'Is this pair a match?'
    )




In [0]:

from zingg.client import *
from zingg.pipes import *

# Build the Zingg Arguments object that the phase cells pass to ZinggWithSpark.
args = Arguments()
# Field definitions: one per column, all declared as string with MatchType.FUZZY.
# Note that the input schema further down also has an `id` column, which gets
# no FieldDefinition here.
fname = FieldDefinition("fname", "string", MatchType.FUZZY)
lname = FieldDefinition("lname", "string", MatchType.FUZZY)
stNo = FieldDefinition("stNo", "string", MatchType.FUZZY)
add1 = FieldDefinition("add1","string", MatchType.FUZZY)
add2 = FieldDefinition("add2", "string", MatchType.FUZZY)
city = FieldDefinition("city", "string", MatchType.FUZZY)
areacode = FieldDefinition("areacode", "string", MatchType.FUZZY)
state = FieldDefinition("state", "string", MatchType.FUZZY)
dob = FieldDefinition("dob", "string", MatchType.FUZZY)
ssn = FieldDefinition("ssn", "string", MatchType.FUZZY)

fieldDefs = [fname, lname, stNo, add1, add2, city, areacode, state, dob, ssn]

args.setFieldDefinition(fieldDefs)
# Set the model id and the Zingg directory: the same two values the
# marked/unmarked training-data paths of this notebook are built from.
args.setModelId(modelId)
args.setZinggDir(zinggDir)
args.setNumPartitions(4)
args.setLabelDataSampleSize(0.5)

# Read the dataset into inputPipe and set it up in 'args'.
# The lines below should not be required if you are reading from an in-memory
# dataset; in that case, replace df with the input df.
schema = "id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn  string"
inputPipe = CsvPipe("testFebrl", "/FileStore/test.csv", schema)

args.setData(inputPipe)

# Set the output pipe in 'args' (CSV pipe pointing at /tmp/febrlOutput).
outputPipe = CsvPipe("resultFebrl", "/tmp/febrlOutput")

args.setOutput(outputPipe)



In [0]:
# select the Zingg phase to run: findTrainingData
options = ClientOptions([ClientOptions.PHASE, "findTrainingData"])

# execute that phase with the arguments assembled in `args`
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

In [0]:
# get candidate pairs
candidate_pairs_pd = get_candidate_pairs()

# Reset the per-cycle state before branching so the labeling cell always finds
# these names defined (fresh kernel) and never reuses a cluster id left over
# from a previous cycle.
z_clusters = []      # pairs (as identified by z_cluster) to label
last_z_cluster = ''  # last reviewed cluster: none yet

# if no candidate pairs, run job and wait
if candidate_pairs_pd.shape[0] == 0:
  print('No unlabeled candidate pairs found.  Run findTraining job ...')
else:
  # get list of pairs (as identified by z_cluster) to label
  z_clusters = list(np.unique(candidate_pairs_pd['z_cluster']))

  # print candidate pair stats
  print('{0} candidate pairs found for labeling'.format(len(z_clusters)))

In [0]:

# get current label setting (which is from last cluster)
last_label = available_labels[dbutils.widgets.get('label')]

# assign label to last cluster
if last_z_cluster != '':
  assign_label(candidate_pairs_pd, last_z_cluster, last_label)

# Get next cluster to label: the first row whose z_isMatch is still -1 and
# that does not belong to the cluster just labeled. '' means nothing is left.
# The two "nothing left" situations (no z_isMatch column because the candidate
# frame is the empty fallback, or no remaining -1 rows) are checked explicitly
# instead of swallowing every exception.
z_cluster = ''
if 'z_isMatch' in candidate_pairs_pd.columns:
  pending_pd = candidate_pairs_pd[
    (candidate_pairs_pd['z_isMatch'] == -1) & (candidate_pairs_pd['z_cluster'] != last_z_cluster)
    ]
  if not pending_pd.empty:
    z_cluster = pending_pd['z_cluster'].iloc[0]

# present the next pair
if z_cluster != '':
  print('IS THIS PAIR A MATCH?')
  print(f"Current widget setting will label this as '{dbutils.widgets.get('label')}'.")
  print('Change widget value if different label required.\n')
  print(
    tabulate(
      candidate_pairs_pd[candidate_pairs_pd['z_cluster']==z_cluster],
      headers = 'keys',
      tablefmt = 'psql'
      )
    )
else:
  print('All candidate pairs have been labeled.\n')

# hold last items for assignment in next run
last_z_cluster = z_cluster

# if no more to label
if last_z_cluster == '':

  # save labels; skipped when there were no candidate pairs at all, so that no
  # empty parquet file is written into the marked folder
  if not candidate_pairs_pd.empty:
    save_labels(candidate_pairs_pd)

  # count labels accumulated
  n_pos, n_neg, n_tot = count_labeled_pairs()
  print(f'You have accumulated {n_pos} pairs labeled as positive matches.')
  print("If you need more pairs to label, re-run the cell titled 'Get Data (Run Once Per Cycle).'")

In [0]:
# persist the current state of the candidate frame to the marked folder
# (each run of this cell writes one more timestamped parquet file)
save_labels(candidate_pairs_pd)

# report how many labeled pairs have been accumulated so far
n_pos, n_neg, n_tot = count_labeled_pairs()
print(f'You have accumulated {n_pos} pairs labeled as positive matches.')
print("If you need more pairs to label, re-run the cell titled 'Get Data (Run Once Per Cycle).'")

In [0]:
# select the Zingg phase to run: trainMatch
options = ClientOptions([ClientOptions.PHASE, "trainMatch"])

# execute that phase with the arguments assembled in `args`
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()