In [1]:
# Assembly jar (with bundled dependencies) of the data-repair plugin; the
# filename suggests a Scala 2.12 / Spark 3.2 build.
package_jar = ('../target/'
               'spark-data-repair-plugin_2.12_spark3.2_0.1.0-EXPERIMENTAL-with-dependencies.jar')

In [2]:
import numpy as np
import pandas as pd
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql import functions as f

# Start (or reuse) a Hive-enabled Spark session with the repair plugin jar on the classpath.
# NOTE: `spark.driver.memory` only takes effect when getOrCreate() launches a new
# driver JVM; it is not applied to a session already running in this kernel.
spark = SparkSession.builder \
    .config('spark.jars', package_jar) \
    .config('spark.driver.memory', '8g') \
    .enableHiveSupport() \
    .getOrCreate()

# Suppresses user warning messages in Python
import warnings
warnings.simplefilter("ignore", UserWarning)

# For setting up Python root logger
import logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Suppresses logger messages in JVM
spark.sparkContext.setLogLevel("ERROR")

NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of assembly.
21/11/17 00:07:08 WARN Utils: Your hostname, maropus-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.3.4 instead (on interface en0)
21/11/17 00:07:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/11/17 00:07:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
from repair.api import Scavenger

# Confirm which plugin build the session actually loaded.
Scavenger().version()

'0.1.0-spark3.2-EXPERIMENTAL'

In [4]:
# Register the raw (dirty) hospital CSV as a temp view and show its inferred schema.
(spark.read
    .option("header", True)
    .csv("../testdata/hospital.csv")
    .createOrReplaceTempView("hospital"))
spark.table('hospital').printSchema()

root
 |-- tid: string (nullable = true)
 |-- ProviderNumber: string (nullable = true)
 |-- HospitalName: string (nullable = true)
 |-- Address1: string (nullable = true)
 |-- Address2: string (nullable = true)
 |-- Address3: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCode: string (nullable = true)
 |-- CountyName: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- HospitalType: string (nullable = true)
 |-- HospitalOwner: string (nullable = true)
 |-- EmergencyService: string (nullable = true)
 |-- Condition: string (nullable = true)
 |-- MeasureCode: string (nullable = true)
 |-- MeasureName: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Sample: string (nullable = true)
 |-- Stateavg: string (nullable = true)



In [5]:
from repair.misc import RepairMisc

# Per-attribute distinct/null counts for the hospital view.
(RepairMisc()
    .option("table_name", "hospital")
    .describe()
    .selectExpr('attrName', 'distinctCnt', 'nullCnt')
    .show())

[Stage 1:>                                                          (0 + 1) / 1]

+----------------+-----------+-------+
|        attrName|distinctCnt|nullCnt|
+----------------+-----------+-------+
|EmergencyService|          6|      0|
|    HospitalName|         68|      0|
|      CountyName|         65|      0|
|          Sample|        355|     60|
|   HospitalOwner|         28|      0|
|         ZipCode|         67|      0|
|     MeasureName|         63|      0|
|             tid|       1000|      0|
|  ProviderNumber|         71|      0|
|    HospitalType|         13|      0|
|     PhoneNumber|         72|      0|
|     MeasureCode|         56|      0|
|           Score|         71|    167|
|       Condition|         28|      0|
|        Address1|         78|      0|
|            City|         72|      0|
|        Address3|          0|   1000|
|           State|          4|      0|
|        Address2|          0|   1000|
|        Stateavg|         74|      0|
+----------------+-----------+-------+



                                                                                

In [22]:
def display_depgraph(table_name, targets, min_corr_thres=0.90, max_attr_value_num=30, max_domain_size=100):
    """Generate the attribute dependency graph for ``table_name`` and display it inline.

    ``RepairMisc.generateDepGraph`` is pointed at a fresh ``/tmp/<random hex>``
    directory, from which ``depgraph.dot`` is rendered to JPEG with Graphviz ``dot``.

    Args:
        table_name: Spark table/view to analyze.
        targets: comma-separated attribute names (``target_attr_list`` option).
        min_corr_thres: forwarded as the ``min_corr_thres`` option.
        max_attr_value_num: forwarded as the ``max_attr_value_num`` option.
        max_domain_size: forwarded as the ``max_domain_size`` option.

    Raises:
        FileNotFoundError: if the ``dot`` executable is not on PATH.
        subprocess.CalledProcessError: if ``dot`` fails to render the graph.
    """
    import subprocess
    import uuid

    from IPython.display import Image, display
    from repair.misc import RepairMisc

    output_dir = f"/tmp/{uuid.uuid4().hex}"
    opts = {
        "table_name": table_name,
        "path": output_dir,
        "target_attr_list": targets,
        "min_corr_thres": f"{min_corr_thres}",
        "max_attr_value_num": f"{max_attr_value_num}",
        # Forward every tuning parameter, including max_domain_size.
        "max_domain_size": f"{max_domain_size}",
    }
    RepairMisc().options(opts).generateDepGraph()

    dot_path = f"{output_dir}/depgraph.dot"
    jpg_path = f"{output_dir}/depgraph.jpg"
    # check=True surfaces rendering failures instead of later failing to open the image.
    subprocess.run(["dot", "-Tjpg", dot_path, "-o", jpg_path], check=True)
    display(Image(filename=jpg_path))

In [None]:
display_depgraph(table_name='hospital', targets=','.join(['City', 'PhoneNumber', 'ProviderNumber']), max_attr_value_num=30)

In [6]:
# Pull the whole (small) hospital table to the driver for plotting.
pdf = (spark.table('hospital')
       .toPandas())

In [7]:
def to_histogram(X):
    """Build side-by-side Altair bar charts of value frequencies, one per column of ``X``.

    Args:
        X: pandas DataFrame; every column gets its own chart.

    Returns:
        An Altair horizontal concatenation of 300x300 bar charts.
    """
    import altair as alt
    charts = [
        # Chart the frame that was passed in, not the notebook-global ``pdf``.
        alt.Chart(X).mark_bar().encode(
            x=alt.X(col),
            y=alt.Y('count()', axis=alt.Axis(title='freq')),
        ).properties(width=300, height=300)
        for col in X.columns
    ]
    return alt.hconcat(*charts)

In [None]:
to_histogram(pdf.loc[:, pdf.columns != 'tid'])

In [None]:
import math
import numpy as np
from sklearn.cluster import AgglomerativeClustering, AffinityPropagation
from repair.costs import Levenshtein

# Group the distinct City spellings into clusters of similar strings and keep the
# most frequent spelling from each cluster as a candidate clean domain value.
target = 'City'
cost_func = Levenshtein().compute

city_count_rows = spark.table('hospital').groupBy(target).agg(f.expr('count(1) cnt')).collect()
value_counts = {row[0]: row.cnt for row in city_count_rows}
values = np.asarray(list(value_counts.keys()))
# Negated pairwise edit-distance matrix.
# NOTE(review): AgglomerativeClustering runs with its default (euclidean) metric, so
# each row of X is treated as a feature vector rather than as precomputed distances.
X = -1 * np.array([[cost_func(a, b) for a in values] for b in values])

# Alternative: AffinityPropagation(affinity='precomputed', damping=0.5, max_iter=200, convergence_iter=15, random_state=42)
clg = AgglomerativeClustering(n_clusters=35)
clg.fit(X)

domain_values = []
for cid in np.unique(clg.labels_):
    members = np.unique(values[clg.labels_ == cid])
    by_freq = sorted(((v, value_counts[v]) for v in members), key=lambda vc: vc[1], reverse=True)
    domain_values.append(by_freq[0][0])

print(domain_values)

In [None]:
import math
import numpy as np
from sklearn.cluster import AgglomerativeClustering, AffinityPropagation
from repair.costs import Levenshtein

# Cluster distinct (City, ZipCode) pairs by summed edit distance and print each
# cluster's most frequent pair followed by the other members.
target = ['City', 'ZipCode']

# One Levenshtein instance for all O(n^2) comparisons instead of one per call.
_levenshtein = Levenshtein().compute

def cost_func(x, y):
    """Sum of per-attribute Levenshtein distances between two equal-length value tuples."""
    return sum(_levenshtein(v1, v2) for v1, v2 in zip(x, y))

value_counts = {(r[0], r[1]): r.cnt for r in spark.table('hospital').groupBy(target).agg(f.expr('count(1) cnt')).collect()}
values = np.asarray(list(value_counts.keys()))
# NOTE(review): with the default (euclidean) metric each row of X is used as a feature
# vector, not as a precomputed distance matrix; confirm this is intended.
X = -1 * np.array([[cost_func(v1, v2) for v1 in values] for v2 in values])

# Alternatives tried:
#   AffinityPropagation(affinity='precomputed', damping=0.5, max_iter=200, convergence_iter=15, random_state=42)
#   AgglomerativeClustering(n_clusters=70)
clg = AgglomerativeClustering(n_clusters=None, distance_threshold=20)
clg.fit(X)

for cid in np.unique(clg.labels_):
    members = values[clg.labels_ == cid]
    grouped_values = sorted((((a, b), value_counts[(a, b)]) for a, b in members), key=lambda x: x[1], reverse=True)
    print('**{}** => {}'.format(grouped_values[0], grouped_values[1:]))

In [7]:
# Attributes considered for repair (excludes tid, the all-null Address2/Address3, and Sample).
target_columns = [
    'ProviderNumber', 'HospitalName', 'Address1', 'City', 'State', 'ZipCode',
    'CountyName', 'PhoneNumber', 'HospitalType', 'HospitalOwner', 'EmergencyService',
    'Condition', 'MeasureCode', 'MeasureName', 'Score', 'Stateavg',
]

In [8]:
target = "HospitalName"  # the single attribute repaired in the rest of this notebook

In [9]:
from repair.detectors import DomainValues
from repair.model import RepairModel

# Detect error cells in `target` with the DomainValues detector
# (see repair.detectors for the meaning of autofill / min_count_thres).
error_detectors = [
    DomainValues(target, autofill=True, min_count_thres=12),
]

m = (RepairModel()
     .setTableName('hospital')
     .setRowId('tid')
     .setErrorDetectors(error_detectors)
     .setTargets(target_columns))
# NOTE: `_do_error_detection` is a private API. It also rebinds `target_columns`
# (presumably to the repairable columns it logs), which later cells consume;
# re-running this cell therefore starts from the rebound list.
detected_error_cells_df, target_columns, pairwise_stats, distinct_stats = m._do_error_detection('hospital', [])

2021-11-17 00:07:31.805 INFO model: Elapsed time (name: error detection) is 6.109451770782471(s)
2021-11-17 00:07:33.241 INFO model: Repairable target columns are HospitalName in noisy columns (HospitalName)
2021-11-17 00:07:33.243 INFO model: [Error Detection Phase] Analyzing cell domains to fix error cells...
2021-11-17 00:07:57.873 INFO model: Elapsed time (name: cell domain analysis) is 24.629590034484863(s)
2021-11-17 00:08:01.613 INFO model: [Error Detection Phase] 21 noisy cells fixed and 24 error cells remaining...


In [10]:
# Ground-truth error cells, summarized per attribute.
# NOTE(review): this path is under ../bin/testdata while the other CSVs use ../testdata.
(spark.read
    .option("header", True)
    .csv("../bin/testdata/hospital_error_cells.csv")
    .createOrReplaceTempView("hospital_error_cells"))
spark.table('hospital_error_cells').groupBy('attribute').count().show()

+----------------+-----+
|       attribute|count|
+----------------+-----+
|     MeasureCode|   29|
|  ProviderNumber|   28|
|    HospitalName|   24|
|      CountyName|   39|
|           State|   26|
|   HospitalOwner|   27|
|          Sample|   31|
|     PhoneNumber|   34|
|     MeasureName|   36|
|         ZipCode|   30|
|           Score|   23|
|    HospitalType|   32|
|EmergencyService|   27|
|       Condition|   32|
|        Stateavg|   27|
|        Address1|   31|
|            City|   33|
+----------------+-----+



In [11]:
from repair.model import RepairModel

m = (RepairModel()
     .setTableName('hospital')
     .setRowId('tid')
     .setDiscreteThreshold(100))
# To use the ground-truth error cells instead of the detected ones:
#   error_cells_df = spark.table('hospital_error_cells')
error_cells_df = detected_error_cells_df
repair_base_df = m._prepare_repair_base_cells('hospital', error_cells_df, target_columns)
pdf = repair_base_df.toPandas()

                                                                                

In [None]:
to_histogram(pdf.loc[:, pdf.columns != 'tid'])

In [12]:
# Rows whose target is missing are the cells to predict (keep `tid` to join results
# back); rows with a known target form the training set. `pdf` itself is left
# untouched so this cell can be re-run safely.
has_target = pdf[target].notna()
X_test = pdf[~has_target].drop([target], axis=1).reset_index(drop=True)
pdf_train = pdf[has_target]
X = pdf_train.drop(['tid', target], axis=1).reset_index(drop=True)
y = pdf_train[target].reset_index(drop=True)

In [13]:
import category_encoders as ce

# Ordinal-encode the training features, then apply the same mapping to the test rows
# (handle_unknown='impute' for categories not seen during fit) and re-attach `tid`.
encoder = ce.OrdinalEncoder(handle_unknown='impute')
X = encoder.fit_transform(X)
encoded_test = se_out = encoder.transform(X_test[X.columns]).copy(deep=True)
X_test = pd.concat([X_test[['tid']], encoded_test], axis=1)

In [None]:
def to_splom(pdf, target, cols=None):
    """Scatter-plot matrix over ``cols``, each point colored by the nominal ``target``.

    Args:
        pdf: DataFrame with numeric columns to plot plus the ``target`` column.
        target: column used for point color.
        cols: columns for both matrix axes; defaults to every column except ``target``.

    Returns:
        A repeated Altair chart with 150x150 panels.
    """
    import altair as alt
    if cols is None:
        cols = [c for c in pdf.columns if c != target]
    panel = (alt.Chart(pdf)
             .mark_circle()
             .encode(alt.X(alt.repeat("column"), type='quantitative'),
                     alt.Y(alt.repeat("row"), type='quantitative'),
                     color=f'{target}:N')
             .properties(width=150, height=150))
    return panel.repeat(row=cols, column=cols)

In [None]:
cols = ['ProviderNumber', 'HospitalName', 'Address1', 'City', 'State', 'ZipCode', 'CountyName', 'PhoneNumber', 'HospitalType', 'HospitalOwner', 'EmergencyService', 'Condition', 'MeasureCode', 'MeasureName', 'Score', 'Sample', 'Stateavg']
# Integer-code the class labels (order of first appearance) so they plot as numbers.
label_codes = {label: code for code, label in enumerate(y.unique())}
_y = y.replace(label_codes)
to_splom(pd.concat([X, _y], axis=1), target, cols=cols)

In [None]:
def compute_mi_between_features(pdf):
    """Score every unordered pair of columns by MIC (maximal information coefficient).

    Args:
        pdf: DataFrame of numeric feature columns.

    Returns:
        List of ``((col_a, col_b), mic)`` sorted by ``mic``, highest first.
    """
    from minepy import MINE
    import itertools

    mine = MINE(alpha=0.6, c=15, est="mic_approx")

    def _score(pair):
        left, right = pair
        mine.compute_score(pdf[left], pdf[right])
        return pair, mine.mic()

    scored = [_score(pair) for pair in itertools.combinations(pdf.columns, 2)]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored

In [None]:
cols = [c for c in ('ProviderNumber', 'HospitalName', 'Address1', 'City', 'State', 'ZipCode', 'CountyName', 'PhoneNumber', 'HospitalType', 'HospitalOwner', 'EmergencyService', 'Condition', 'MeasureCode', 'MeasureName', 'Score', 'Sample', 'Stateavg')
        if c != target]
mi_between_features = compute_mi_between_features(X[cols])
print(mi_between_features[:3])

In [17]:
def compute_mi(X, y, target_name=None):
    """Rank each feature in ``X`` by its MIC against the labels ``y``.

    Args:
        X: DataFrame of numerically encoded features.
        y: label values aligned row-wise with ``X``.
        target_name: label placed in the result tuples. Defaults to ``y.name`` when
            ``y`` is a named Series, otherwise to the notebook-global ``target``.

    Returns:
        List of ``((target_name, feature), mic)`` sorted by ``mic``, highest first.
    """
    from minepy import MINE
    if target_name is None:
        target_name = getattr(y, 'name', None)
        if target_name is None:
            # Backward-compatible fallback for unnamed label vectors.
            target_name = target

    results = []
    mine = MINE(alpha=0.6, c=15, est="mic_approx")
    for c in X.columns:
        mine.compute_score(y, X[c])
        results.append(((target_name, c), mine.mic()))

    return sorted(results, key=lambda x: x[1], reverse=True)

In [None]:
cols = [c for c in ('ProviderNumber', 'HospitalName', 'Address1', 'City', 'State', 'ZipCode', 'CountyName', 'PhoneNumber', 'HospitalType', 'HospitalOwner', 'EmergencyService', 'Condition', 'MeasureCode', 'MeasureName', 'Score', 'Sample', 'Stateavg')
        if c != target]
mi_against_target = compute_mi(X[cols], y)
# Keep the six features with the highest MIC against the target.
selected_features_with_mi = [feature for (_, feature), _mic in mi_against_target][:6]

In [None]:
def select_features_with_bruta(X, y, is_discrete=True, random_state=None):
    """Select features with Boruta over a shallow random forest.

    Prints the forest's score with all features and with the selected subset.
    Both scores are computed on the training data itself (``rf.score(X, y)``),
    so they measure fit, not generalization.

    Args:
        X: DataFrame of features without missing values.
        y: labels (classification) or targets (regression), aligned with ``X``.
        is_discrete: use RandomForestClassifier if True, else RandomForestRegressor.
        random_state: seed for the scoring forests (``None`` = sklearn default,
            i.e. non-deterministic). Boruta itself is always seeded with 42.

    Returns:
        ``pandas.Index`` of the column names Boruta confirmed (may be empty).
    """
    from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
    from boruta import BorutaPy

    model_class = RandomForestClassifier if is_discrete else RandomForestRegressor

    def _new_forest():
        # Fresh, unfitted estimator so no fit state carries between stages.
        return model_class(n_jobs=-1, max_depth=5, random_state=random_state)

    rf = _new_forest()
    rf.fit(X, y)
    print('SCORE with ALL Features: %1.2f' % rf.score(X, y))

    fs = BorutaPy(_new_forest(), n_estimators='auto', random_state=42, perc=80, two_step=False, max_iter=500)
    fs.fit(X.values, y.values)

    selected = X.columns[fs.support_]
    if len(selected) == 0:
        # A forest cannot be fit on zero columns; report and return the empty selection.
        print('Boruta selected no features')
        return selected

    X_selected = X[selected]
    rf = _new_forest()
    rf.fit(X_selected, y)
    print('SCORE with selected Features: %1.2f' % rf.score(X_selected, y))

    return selected

In [None]:
# Boruta's forests get a NaN-free copy: missing encoded values become the sentinel -255.0.
selected_features_with_bruta = select_features_with_bruta(
    X.fillna(-255.0),
    y,
    is_discrete=True,
)

In [14]:
# Train on every encoded feature. To restrict to the MIC-ranked subset instead:
#   X_selected = X[selected_features_with_mi]
X_selected = X

In [None]:
def to_tsne(X, y, target, nsample=100, perplexity=50, n_iter=10000):
    """Embed a random sample of ``X`` in 2-D with t-SNE and plot it colored by ``y``.

    Args:
        X: feature DataFrame.
        y: labels aligned with ``X``'s index.
        target: name given to the label column in the plotted frame.
        nsample: rows to sample after dropping rows with missing values; clamped
            to the number of rows available.
        perplexity: t-SNE perplexity; clamped to ``sample_size - 1`` because
            scikit-learn rejects a perplexity that is not below the sample count.
        n_iter: t-SNE iteration budget (passed as ``max_iter`` on scikit-learn
            versions that renamed ``n_iter``).

    Returns:
        An interactive Altair scatter chart.

    Raises:
        ValueError: if fewer than two complete rows are available.
    """
    import inspect

    import altair as alt
    import pandas as pd
    # One of non-linear embedding in sklearn
    from sklearn.manifold import TSNE

    _pdf = X.copy(deep=True)
    _pdf[target] = y
    _pdf = _pdf.dropna()

    # DataFrame.sample(n=...) raises if asked for more rows than exist.
    n = min(nsample, len(_pdf))
    if n < 2:
        raise ValueError(f'to_tsne needs at least 2 complete rows, got {n}')
    perplexity = min(perplexity, n - 1)

    _pdf_sampled = _pdf.sample(n=n, random_state=42)
    _X = _pdf_sampled[_pdf_sampled.columns[_pdf_sampled.columns != target]]
    _y = _pdf_sampled[target]

    iter_kw = 'max_iter' if 'max_iter' in inspect.signature(TSNE).parameters else 'n_iter'
    tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity, **{iter_kw: n_iter})
    _tf = tsne.fit_transform(_X)
    print('KL divergence: {}'.format(tsne.kl_divergence_))

    # Positional construction: row i of the embedding belongs to the i-th sampled label.
    embedded = pd.DataFrame({'tSNE-X': _tf[:, 0], 'tSNE-Y': _tf[:, 1], target: _y.to_numpy()})
    chart = alt.Chart(embedded).mark_point().encode(x='tSNE-X', y='tSNE-Y', color=f'{target}:N').properties(width=600, height=400).interactive()
    return chart

In [None]:
to_tsne(
    X_selected,
    y,
    target=target,
    nsample=100,
    perplexity=100,
    n_iter=250,
)

In [15]:
import json

from repair import train

# Tune (hyperopt options passed through `opts`) and fit a classifier for `target`.
params = {'hp.timeout': '3600', 'hp.no_progress_loss': '30'}
(clf, score), _ = train.build_model(X_selected, y, is_discrete=True, num_class=len(y.unique()), n_jobs=-1, opts=params)
print(f'Score: {score}')

# Score every cell to repair and keep the top_k most probable classes per row.
top_k = 3
classes = clf.classes_.tolist()
probs = clf.predict_proba(X_test[X_selected.columns])
pmf = [json.dumps({"classes": classes, "probs": p.tolist()}) for p in probs]
# Positional construction: the i-th pmf belongs to the i-th X_test row.
df = spark.createDataFrame(pd.DataFrame({'tid': X_test['tid'].to_numpy(), 'pmf': pmf}))
df = df.selectExpr('tid', 'from_json(pmf, "classes array<string>, probs array<double>") pmf')
# Struct field `0` is the class, field `1` its probability.
df = df.selectExpr('tid', 'arrays_zip(pmf.classes, pmf.probs) pmf')
# Sort by probability, highest first. The comparator returns 0 on ties: array_sort
# expects a consistent negative/zero/positive comparator, and answering -1 for both
# orderings of equal elements violates that contract.
df = df.selectExpr('tid', f'slice(array_sort(pmf, (left, right) -> case when left.`1` < right.`1` then 1 when left.`1` > right.`1` then -1 else 0 end), 1, {top_k}) top_k_pmf')
df = df.selectExpr('tid', f'top_k_pmf[0].`0` `{target}`', 'top_k_pmf')
predicted = df.toPandas()

2021-11-17 00:09:06.357 INFO train: hyperopt: #eval=34/100000000


Score: 0.9851851851851853


In [16]:
# Join predictions with the ground-truth values for `target` and flag exact matches.
(spark.read
    .option("header", True)
    .csv("../testdata/hospital_clean.csv")
    .createOrReplaceTempView("hospital_clean"))
pdf_clean = (spark.table('hospital_clean')
             .where(f'attribute = "{target}"')
             .selectExpr('tid', 'correct_val')
             .toPandas())
result = (predicted
          .merge(pdf_clean, on='tid')
          .assign(is_correct=lambda d: d[target] == d['correct_val']))
pd.set_option("display.max_colwidth", 300)
result

Unnamed: 0,tid,HospitalName,top_k_pmf,correct_val,is_correct
0,448,alaska regional hospital,"[(alaska regional hospital, 0.965180010867711), (yukon kuskokwim delta reg hospital, 0.00188181880242369), (russellville hospital, 0.001581647942678674)]",alaska regional hospital,True
1,987,st vincents blount,"[(st vincents blount, 0.969608307938321), (cherokee medical center, 0.0016293588339227668), (mizell memorial hospital, 0.0013591107793142734)]",st vincents blount,True
2,938,georgiana hospital,"[(georgiana hospital, 0.9694830632908333), (community hospital inc, 0.0016168767492821924), (andalusia regional hospital, 0.001216973480009243)]",georgiana hospital,True
3,710,community hospital inc,"[(community hospital inc, 0.9402108705919754), (georgiana hospital, 0.005719194439756884), (chilton medical center, 0.003832305896522176)]",community hospital inc,True
4,903,riverview regional medical center,"[(riverview regional medical center, 0.979489111620467), (gadsden regional medical center, 0.0017159366568619103), (alaska regional hospital, 0.00106431275327909)]",riverview regional medical center,True
5,730,cullman regional medical center,"[(cullman regional medical center, 0.9496663019772545), (russellville hospital, 0.0036578050644101292), (east alabama medical center and snf, 0.0027762741813686003)]",cullman regional medical center,True
6,171,marshall medical center north,"[(marshall medical center north, 0.9728405884352309), (marshall medical center south, 0.001388365573704062), (dale medical center, 0.0008807263805157)]",marshall medical center north,True
7,697,community hospital inc,"[(community hospital inc, 0.9546905503014285), (georgiana hospital, 0.004913753531693198), (yukon kuskokwim delta reg hospital, 0.004209044963471981)]",community hospital inc,True
8,560,jackson hospital & clinic inc,"[(jackson hospital & clinic inc, 0.966840336703911), (callahan eye foundation hospital, 0.0016637125774095693), (baptist medical center south, 0.0016492286554056453)]",jackson hospital & clinic inc,True
9,921,riverview regional medical center,"[(riverview regional medical center, 0.9689162907437554), (gadsden regional medical center, 0.002374200567450267), (andalusia regional hospital, 0.0023201718190947835)]",riverview regional medical center,True


In [17]:
# Fraction of predicted cells whose value matches the ground truth.
if len(result) == 0:
    # Avoid ZeroDivisionError when no prediction joined to a ground-truth row.
    print('Accuracy: n/a (no predictions matched ground-truth rows by tid)')
else:
    print('Accuracy: {}'.format(int(result['is_correct'].sum()) / len(result)))

Accuracy: 1.0
