In [193]:
import os
import gzip, pickle
import pandas as pd
import numpy as np
import re
import networkx as nx
import itertools as it
import sys
import random
import pprint
import string

from collections import defaultdict

import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

from logistic_sgd import LogisticRegression
from dA import dA
from SdA import SdA

from sklearn.ensemble import RandomForestClassifier

In [194]:
LOCATION = '../feature_engineering/repo/council_spending_data/data/'   # real estate data
# LOCATION = '../feature_engineering/repo/raw_data/'   # real estate data
#LOCATION = '../data.gov.food+environment/'
#LOCATION = '../data.gov.au/'
#LOCATION = '../data.gov.apple/'

CONNECTION_COUNT = 10   # ?

In [195]:
def char_freq(text, freq=True, lowercase=True, entropy=True):  # extract character counts/frequencies in text (text is a list of strings)
    text = str(text)   # concatenate the elements of text into a single string
    all_chars = string.printable
    
    if lowercase: 
        text = text.lower()
    char_dic = {}
    if text is not '':
        for x in all_chars:
            char_dic[x] = float(text.count(x))/len(text) if freq else text.count(x)
    else:
        for x in all_chars:
            char_dic[x] = 0
        
    # add the information measure (negative entropy) of text, Sum(p(i) log(p(i)), i=1,...,n), where p(i) is the i-th character frequency, 
    # n is the number of possible characters in the "alphabet", i.e., number of possible states
    
    entr = 0
    max_entr = -np.log2(1./len(all_chars))   # maximum entropy, in bits per character, for a text made of all_chars
    if entropy:
        for x in string.printable:
            if freq:
                p_x = char_dic[x]
            else:
                p_x = float(char_dic[x])/len(text)
            if p_x > 0:
                entr += - p_x * np.log2(p_x)   # entropy of text, in bits per character

    char_dic['_entropy_'] = entr/max_entr   # return the normalized entropy of text
    
    return char_dic

class LinkGraph(object):
    
    def __init__(self, data_path, link_list, chunksize=100):   # link_list is a list of linkers (linker nodes) for the graph
        self.graph = nx.Graph()
        self.index = {}
        self.path = data_path

        # now build up the nodes
        self._build(link_list, chunksize)

    def drop_edge(self, src, dst):
        """
            remove a particular edge...
        """
        try:
            self.graph.remove_edge(src, dst)
        except Exception:
            print("Edge", src, "-", dst, " is not in the graph")
            
    def _build(self, link_list, chunksize=100):
        """
            This will build the graph by walking through the
            csv files, and reading the header columns. Using
            the Linker objects in the link_list, connections
            can be made between nodes...
        """
        i = 0
        
        for (dirpath, _, filenames) in os.walk(self.path):
            print("\n\ndirpath = ",dirpath)
            for f in filenames:
                print("\nfile ",f)
                try:
                    path = os.path.join(dirpath, f)
                    df = pd.read_csv(path, chunksize=chunksize).read(chunksize)
                    # df = linker.read(path)
                    
                    for col in df:
                        key = (f, col)
                        # fill in the index...
                        self.index[i] = key

                        for linker in link_list:
                            linker.fn(i, f, df, col)

                        i += 1

                except Exception as e:
                    print("Error: {}".format(e))

        # now add those nodes to the graph self.graph, and connect them accordingly...
        for linker in link_list:
            linker.process_nodes(self.graph)
            

class Linker(object):
    m = defaultdict(list)   # inverted index of the linker, with keys corresponding to the linkers, values corresponding to the lists of col nodes that the linkers connect
    weight = 1

    def fn(self, i, f, df, col):
        """
            fn is a function that will define the link node.
            When the data sets/columns are iterated over, 
            this function will be called to produce a node,
            with
            
            fn(f: Filename, df: DataFrame, col: Column Name)
        """
        self.m[f].append(i)   # append, to the inverse index m[f], the index id uniquely pointing to (f, col) tuple
        return

    def process_nodes(self, g):
        """
            process_map by creating weighted links from the 'key nodes' to the 'value nodes', or links between different 'value nodes'
            'value nodes' contain the column ids
            'key nodes' contain the ids of the linkers that connect the value nodes (data columns)
        """
        w = self.weight
        for k, v in self.m.items():   # self.m.items() are the items of the linker's inverse index m
            # add the links from the linker node k to all 'value' nodes it is connected to...
            g.add_weighted_edges_from((k, y, w) for y in v)

        self.m.clear()

# class PrintLinker(Linker):
#     """
#         This Linker prints
#     """
#     weight = 0.1
#     def fn(self, i, f, df, col):
#         super().fn(i, f, df, col)   # perform the parent's fn method first
#         print(i, f, df, col)
#         return
        
class TableLinker(Linker):
    """
        This Linker matches the filenames e.g. if the column
        is in the same file, they are linked.
        Its inverse index m is of the form {key=file_name: value=[list of nodes (col ids)]}
    """
    weight = 0.1
    def fn(self, i, f, df, col):
        self.m[f].append(i)   # append, to the inverted index m[f], the index i uniquely pointing to (f, col) tuple
        return f

class NameLinker(Linker):
    """
        This Linker matches the column names.
        Its inverse index m is of the form {key=column_name: value=[list of nodes (col ids)]}
    """
    weight = 0.2
    def fn(self, i, f, df, col):
        short_name = re.sub('[^a-z ]', '', col.lower().strip())  # col names are lowercased, and stripped of all non-alpha characters
        self.m[short_name].append(i)   # append, to the inverted index m[short_name], the index i uniquely pointing to (f, col) tuple
        return

    
class DataLinker(Linker):
    """
        This Linker links the columns with intersecting data (with exact matches between some rows in those columns).
    """
    weight = 1.0
    cutoff = 10   # maximum number of nodes sharing the same data key, to be considered as meaningfully connected
    scale = 0.75
    m = defaultdict(set)
    
    def fn(self, i, f, df, col):
        """
            comes in for every column...
        """
        for value in df[col]:  # loop over all values in the column col
            self.m[str(value)].add(i)  # add i to the inverted index's list corresponding to key=str(value)
        return
    
    def process_nodes(self, g):  
        """
            process_map
        """
        print("Unique data points:::", len(self.m))   # length of the list of unique linker node keys (unique data rows in all imported chunks of all columns)

        w = self.weight
        for k, v in self.m.items():
            # add the link...
            if len(v) < self.cutoff:
                connections = it.combinations(v, r=2)   # all unordered pairs of nodes in v
                w_connections = []
                for x, y in connections:
                    if x in g.edge and y in g.edge[x]:
                        g[x][y]['weight'] *= self.scale  # if x-y edge already exists in g (i.e., x-y are already connected by some other data key(s)), update (reduce by 25%) its weight
                    else:
                        g.add_edge(x, y, weight=w)  # ((x, y, w) for x, y in connections)
                #g.add_weighted_edges_from(w_connections)

        self.m.clear()
        
        
class SemanticLinker(Linker):
    """
    This linker links the columns of the same semantic data type
    """
        
    weight = 0.4
    m = defaultdict(list)
    
    def __init__(self, model_path):   # model_path is the path of the pre-trained semantic classifier
        def load_model(file_name):  #load the hyperparameters and the model 
            save_file = open(file_name, 'rb')
            hp = pickle.load(save_file, encoding='bytes')      #load the hyperparameters
            model = pickle.load(save_file, encoding='bytes')   #load the model
            targets_dict = pickle.load(save_file, encoding='bytes') #load the targets_dict - the dict translating integer labels to attributes
            save_file.close()
            return (hp, model, targets_dict)
        
        self.model_path = model_path
        print("Loading semantic classifier from",self.model_path)
        self.hp, self.classifier, self.targets_dict = load_model(self.model_path)  #load the best mlp model
        print('Loaded semantic classifier:\n', self.classifier,'\n')
        self.attributes = list(self.targets_dict.values())  # list of all known semantic types (attributes)
        
    def generate_examples(self,df,col,verbose=False):  #generate a matrix of examples from records, with number of records per example and number of examples per bucket defined in hp 
        if verbose: 
            print ('records_per_example = %i' %self.hp['records_per_example'])
            print ('examples_per_bucket = %i' %self.hp['examples_per_bucket'])
        if self.hp['entropy']:
            all_chars = list(string.printable) + ['_entropy_']
        else:
            all_chars = list(string.printable)
        # define the number of examples to generate:
        n_examples = self.hp['examples_per_bucket']

        examples = pd.DataFrame(index=range(n_examples), columns=['bucket_id']+['attribute']+['label']+all_chars)
        examples['bucket_id'][:] = col    # bucket_id is the column name
        examples['attribute'][:] = np.nan    # the attribute is not known
        examples['label'][:] = np.nan    # the label is not known
        for row in range(self.hp['examples_per_bucket']):
            if len(df[col])>0:
                sample = np.random.choice(df[col], size=self.hp['records_per_example'], replace=True)
            else:
                sample = ''   # empty string
            freq_vec = char_freq(sample, freq=self.hp['freq'], lowercase=self.hp['lowercase'], entropy=self.hp['entropy'])
            for char in all_chars:
                examples.set_value(row, char, freq_vec[char])

        return examples
    
    def predict_semantic_type(self, df, col):
        """
        Given a df and col, predict the col's semantic type, using self.classifier
        """
        
        # Generate examples from df[col]:
        examples = self.generate_examples(df,col)
        if len(examples.index)==0:
            print ('predict_semantic_type: no examples to label, stopping.')
            return
        
        # predict soft labels (labels' posteriors) of examples:
        x = examples.iloc[:,3:].as_matrix()  # feature vectors of the examples
        if str(self.classifier.__class__) == "<class 'sklearn.ensemble.forest.RandomForestClassifier'>":
            y_score = self.classifier.predict_proba(x)
        else:
            print('predict_semantic_type: this classifier type',self.classifier.__class__ ,'is not yet implemented, stopping.')
            return 
        
        y_score_mean = np.mean(y_score, axis=0)  # take a mean of each column in y_score - i.e., mean soft label of all examples
        sem_type = self.targets_dict[np.argmax(y_score_mean)]  # semantic type with the highest posterior wins
        print("column",col,": semantic type is",sem_type)
        return sem_type
    
    def fn(self, i, f, df, col):
        sem_type = self.predict_semantic_type(df, col)  # predict semantic type of the column col of the dataframe df, using the loaded classifier
        self.m['sem_type: ' + sem_type].append(i)   # append, to the inverse index m[sem_type], the index id uniquely pointing to (f, col) tuple
        return

In [196]:
%%time
# Create the graph
g = LinkGraph(LOCATION,[
        TableLinker(),
        NameLinker(),
#         DataLinker(),
        SemanticLinker(model_path="rf_model.pkl")
    ],
    100000 # chunksize
)

print(
    "all nodes (columns + linkers):", len(g.graph.nodes()),
    ", column nodes:", len(g.index),
    ", edges:", len(g.graph.edges())
)
print("done")

Loading semantic classifier from rf_model.pkl
Loaded semantic classifier:
 RandomForestClassifier(bootstrap=True, compute_importances=None,
            criterion='gini', max_depth=None, max_features='auto',
            max_leaf_nodes=None, min_density=None, min_samples_leaf=1,
            min_samples_split=2, n_estimators=500, n_jobs=4,
            oob_score=False, random_state=1234567, verbose=0) 



dirpath =  ../feature_engineering/repo/council_spending_data/data/

file  .DS_Store
Error: No columns to parse from file

file  schema_10__council__royal-borough-of-greenwich__april_2012_payments_over_500_csv.csv
column Supplier : semantic type is expense_supplier_name
column Payment Date : semantic type is expense_payment_date
column Amount : semantic type is expense_amount_paid

file  schema_10__council__royal-borough-of-greenwich__april_2012_payments_over_and_pound500_csv.csv
column Supplier : semantic type is expense_supplier_name
column Payment Date : semantic type is expense_payment

In [192]:
print("CLUSTERS: ")
cc = nx.connected_components(g.graph)   # generator of lists of nodes for all connected components in the graph
i = 0
for x in cc:
    i += 1
    if len(x) < 10:
        print("cluster size (nodes):", len(x), x)
    else:
        print("cluster size (nodes):", len(x), '...')
print("Total clusters: ", i)   # clusters are defined as connected components of the graph

lengths = []
path_hops = []
nodes = list(g.index.keys())   # list of all nodes (their indices) in the graph
for _ in range(CONNECTION_COUNT):
    print()
    src = random.choice(nodes)   # randomly choose a source node
    dst = random.choice(nodes)   # randomly choose a destination node

    print("")
    print("Attempting to connect '{} ({})' to '{} ({})'".format(
        g.index[src][1],
        src,
        g.index[dst][1],
        dst)
    )
    print("==========")
    try:
        path = nx.dijkstra_path(g.graph, src, dst)
        path_length = nx.dijkstra_path_length(g.graph, src, dst)
        lengths.append(path_length)
        path_hops.append(len(path)-1)

        print("path:", path)
        print("Path Length:", path_length)
        print("")

        for node in path:
            if node in g.index:
                print("NODE:", node, ":", g.index[node])
                if node != path[-1]:
                    print(" | ")
            else:
                print("BY: ", node)
                print(" | ")

    except Exception as e:
        print(e)

print("===========")
print("")
print("MIN path length:", min(lengths))
print("MAX path length:", max(lengths))
print("AVG path length:", sum(lengths) / len(lengths))
print("MIN path hops:", min(path_hops))
print("MAX path hops:", max(path_hops))
print("AVG path hops:", sum(path_hops) / len(path_hops))


CLUSTERS: 
cluster size (nodes): 4972 ...
Total clusters:  1


Attempting to connect 'Authority (2719)' to 'BVACOP (2810)'
path: [2719, 'schema_32__council__hampshire-county-council__payments-may2010.csv', 2722, 'expense type', 2808, 'schema_34__council__buckinghamshire-county-council__aug10.csv', 2810]
Path Length: 0.6

NODE: 2719 : ('schema_32__council__hampshire-county-council__payments-may2010.csv', 'Authority')
 | 
BY:  schema_32__council__hampshire-county-council__payments-may2010.csv
 | 
NODE: 2722 : ('schema_32__council__hampshire-county-council__payments-may2010.csv', 'Expense Type')
 | 
BY:  expense type
 | 
NODE: 2808 : ('schema_34__council__buckinghamshire-county-council__aug10.csv', 'Expense type')
 | 
BY:  schema_34__council__buckinghamshire-county-council__aug10.csv
 | 
NODE: 2810 : ('schema_34__council__buckinghamshire-county-council__aug10.csv', 'BVACOP')


Attempting to connect 'Body Name (398)' to 'SPEND (2796)'
path: [398, 'schema_15__council__cheltenham-borough-cou