#### 1. Installing the required Python packages

In [0]:
!python -m pip install --upgrade pip

In [0]:
!pip install -r requirements38.txt

In [0]:
!pip install azure-storage-file-datalake
!pip install adlfs
!pip install fsspec

#### 2. Importing the required Python packages

In [0]:
import ast
import json
import os
import pickle
import re
import sys
import time
import traceback
from collections import Counter
from datetime import datetime

import numpy as np
import pandas as pd

from sherlock.functional import extract_features_to_csv
from sherlock.helpers import download_data
from sherlock.features.paragraph_vectors import initialise_pretrained_model, initialise_nltk
from sherlock.features.preprocessing import (
    convert_string_lists_to_lists,
    extract_features,
    load_parquet_values,
    prepare_feature_extraction,
)
from sherlock.features.word_embeddings import initialise_word_embeddings

#### 3. Loading the required model objects for the feature creation

In [0]:
# Directory handed to sherlock's initialise_pretrained_model as `path`.
# NOTE(review): the appendix loads "/dbfs/par_vec_trained_model_400.pkl", so the
# paragraph-vector model presumably lives under this directory — confirm.
par_vec_path = '/dbfs/'

# prepare_feature_extraction()  # disabled; NOTE(review): confirm it is no longer needed
# Initialise sherlock's feature-extraction resources before any extraction runs.
initialise_word_embeddings()
# dim=400 matches the "_400" suffix of the model file loaded in the appendix.
initialise_pretrained_model(path=par_vec_path,dim=400)
initialise_nltk()

#### 4. Connecting to ADLS (Azure Data Lake Storage) to read the files used for feature creation

In [0]:
import os, uuid, sys
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
from azure.storage.blob import BlobServiceClient

In [0]:
def initialize_storage_account(storage_account_name, storage_account_key):
    """Create the ADLS Gen2 client and store it in the global ``service_client``.

    Args:
        storage_account_name: Azure storage account name.
        storage_account_key: Credential passed to ``DataLakeServiceClient``.

    Returns:
        The created ``DataLakeServiceClient``, or ``None`` if construction failed.
        The error is printed rather than raised. The global ``service_client`` is
        always assigned, so helpers such as ``list_directory_contents`` see ``None``
        instead of failing with a confusing NameError.
    """
    global service_client

    account_url = f"https://{storage_account_name}.dfs.core.windows.net"
    try:
        service_client = DataLakeServiceClient(account_url=account_url, credential=storage_account_key)
    except Exception as e:
        print(f"Could not create DataLakeServiceClient for {account_url}: {e}")
        service_client = None
    return service_client

In [0]:
def list_directory_contents(container_name, my_dir):
    """Return the ``name`` of every path yielded by ``get_paths(path=my_dir)``.

    Uses the global ``service_client`` created by ``initialize_storage_account``.
    Errors are printed, not raised. The names collected before the failure are
    returned; the list is empty if the failure happens up front.
    """
    names = []
    try:
        fs_client = service_client.get_file_system_client(file_system=container_name)
        for entry in fs_client.get_paths(path=my_dir):
            names.append(entry.name)
    except Exception as err:
        print(err)
    return names

#### 5. Credentials for connecting to Azure Data Lake Storage

In [0]:
storage_account = "legoaistorage"
# SECURITY: never hard-code the account key. It ends up in version control and in
# every exported copy of the notebook. Any key previously committed here must be
# treated as leaked and rotated. Supply the key through the environment instead
# (on Databricks, e.g. populated from a secret scope).
storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not storage_account_key:
    raise RuntimeError(
        "AZURE_STORAGE_ACCOUNT_KEY is not set; provide the storage account key "
        "through the environment or a secret scope before running this notebook."
    )
initialize_storage_account(storage_account,storage_account_key)

In [0]:
### Spark Configuration
# Let Spark read abfss:// paths on this account using the account key, and enable
# Arrow-based transfer for Spark <-> pandas conversions such as toPandas().
account_key_conf = f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"
spark.conf.set(account_key_conf, storage_account_key)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

#### 6. Parsing and extracting file information

In [0]:
# Connection string consumed by BlobServiceClient.from_connection_string below.
connect_str = f'DefaultEndpointsProtocol=https;AccountName={storage_account};AccountKey={storage_account_key}'

In [0]:
### Getting the data and other required information from each source
container_name = 'datascience-dataset'
# All path names under Source_Data (may include directories; JSON files are selected next).
data_content = list_directory_contents(container_name, my_dir='Source_Data')

#### 7. Iterating through the data and collecting metadata for all JSON files

In [0]:
### Subset data which is of json file format
json_data_content = [content for content in data_content if content.endswith('.json')]

json_meta = []
for cont in json_data_content:
    parts = cont.split('/')
    # source = 2nd path segment; location / reponame = the two segments above the file.
    # A path with fewer than 3 segments cannot supply all fields (parts[-3] would
    # raise IndexError), so report it and skip it.
    if len(parts) < 3:
        print('Skipping JSON file with unexpected path layout:', cont)
        continue
    json_meta.append([parts[1], parts[-3], parts[-2], parts[-1], cont])

# A freshly built DataFrame already has a clean RangeIndex.
json_meta_df = pd.DataFrame(json_meta, columns=['source', 'location', 'reponame', 'filename', 'filepath'])

In [0]:
# Restrict processing to one source. Alternative filters used during development:
#   json_meta_df['filepath'].str.contains('medium')
#   (json_meta_df['source'] == 'dipanjan') & (json_meta_df['reponame'] == 'Basketball_men')
json_meta_df = json_meta_df.loc[json_meta_df['source'].eq('swastik')].reset_index(drop=True)
json_meta_df.shape

In [0]:
def data_conversion(filename, filepath,
                    base_path="abfss://datascience-dataset@legoaistorage.dfs.core.windows.net/"):
    """Load one data file into a pandas DataFrame, choosing the reader by extension.

    Args:
        filename: File name; only its extension (case-insensitive) selects the reader.
        filepath: Path of the file relative to ``base_path``.
        base_path: abfss:// prefix of the container holding the data.

    Returns:
        A pandas DataFrame in every case. json/parquet/csv/txt are read with Spark
        and converted with ``toPandas()``, because the caller uses pandas-only APIs
        (``.shape``, column assignment). Unsupported or missing extensions give an
        empty frame with columns id/table_name/column_name/values.
    """
    # '' when the name has no dot (rsplit(...)[1] would raise IndexError).
    file_extension = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
    full_filepath = base_path + filepath

    ## Read the data based on extension
    if file_extension == 'json':
        spark_df = spark.read.option("multiline", "true").json(full_filepath)
    elif file_extension == 'parquet':
        spark_df = spark.read.parquet(full_filepath)
    elif file_extension == 'csv':
        spark_df = spark.read.format("csv").option("mode", "PERMISSIVE").load(full_filepath)
    elif file_extension == 'txt':
        spark_df = spark.read.text(full_filepath)
    else:
        return pd.DataFrame(columns=['id', 'table_name', 'column_name', 'values'])

    return spark_df.toPandas()

In [0]:
def meta_information_check(data_df, filename, reponame):
    """Normalise a raw column-values frame into the schema the feature pipeline expects.

    Ensures the columns ``id``, ``column_name``, ``table_name``, ``values``,
    ``repo_name`` and ``master_id`` exist. Every ``values`` entry is converted to a
    list, and ``master_id`` (``repo$$##$$table$$##$$column``) must be unique.

    Args:
        data_df: pandas DataFrame read from one source file. It must contain one of
            ``values`` / ``value`` / ``column_values``. It is not modified.
        filename: Source file name; its stem becomes ``table_name`` if that column is missing.
        reponame: Repository name. For ``'swastik'`` the per-row ``dataset_name``
            column is used as ``repo_name`` instead.

    Returns:
        A new DataFrame with the columns listed above.

    Raises:
        AssertionError: if ``master_id`` is not unique.
        KeyError: if no values column exists (or ``dataset_name`` is missing for 'swastik').
    """
    # Work on a copy so the caller's frame is not mutated in place.
    data_df = data_df.copy()

    ### ID creation for each df
    if 'id' in data_df.columns:
        # Re-number when the supplied ids are not unique.
        if data_df['id'].nunique() != data_df.shape[0]:
            data_df['id'] = list(range(len(data_df)))
    elif 'column_id' in data_df.columns:
        data_df['id'] = data_df['column_id']
    else:
        data_df = data_df.reset_index(drop=True).reset_index()
        data_df = data_df.rename(columns={'index': 'id'})

    #### Mandatory column checks
    if 'column_name' not in data_df.columns:
        if 'type' in data_df.columns:
            data_df['column_name'] = data_df['type']
        else:
            data_df['column_name'] = ''  # TODO: confirm the right fallback for a missing column name

    if 'table_name' not in data_df.columns:
        data_df['table_name'] = filename.rsplit('.', 1)[0]

    data_df = data_df.rename(columns={'column_values': 'values', 'value': 'values'})

    # Normalise every entry to a list. A comprehension (unlike DataFrame.apply(axis=1))
    # also works on an empty frame.
    # NOTE(review): list() on a plain string splits it into characters — confirm
    # values never arrive as serialized strings.
    data_df['values'] = [list(val) for val in data_df['values']]
    value_types = {type(val) for val in data_df['values']}
    print('Unique Type of values:', list(value_types))
    assert value_types <= {list}, f"non-list values found: {value_types}"

    if reponame == 'swastik':
        data_df['repo_name'] = data_df['dataset_name']
    else:
        data_df['repo_name'] = reponame

    sep = '$$##$$'
    data_df['master_id'] = [
        repo + sep + table + sep + column
        for repo, table, column in zip(data_df['repo_name'], data_df['table_name'], data_df['column_name'])
    ]
    assert data_df['master_id'].nunique() == data_df.shape[0]

    return data_df

#### 8. Storing the Information to Azure Blob Storage

In [0]:
### Blob storage based configurations
# NOTE(review): Azure container names cannot contain '/'. This presumably works only
# because "/Features" becomes part of every blob URL path — confirm it is intended.
features_container = 'datascience-dataset/Features'
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client = blob_service_client.get_container_client(features_container)

In [0]:
def save_df_to_blob(df, output_file, client=None, overwrite=False):
    """Upload ``df`` as UTF-8 CSV text (without the index) to blob storage.

    Args:
        df: pandas DataFrame to upload.
        output_file: Blob name, relative to the container client.
        client: Container client to upload through. Defaults to the global
            ``container_client``, looked up at call time because later cells
            re-point it at other folders.
        overwrite: Forwarded to ``upload_blob``. With the default ``False`` the
            upload fails if the blob already exists (e.g. when the notebook is re-run).

    Returns:
        1 on success, 0 on failure. The error is printed rather than raised, so a
        loop over many files keeps going.
    """
    try:
        target = container_client if client is None else client
        output = df.to_csv(index=False, encoding="utf-8")

        # Instantiate a new BlobClient and upload the CSV text as a block blob.
        blob_client = target.get_blob_client(output_file)
        blob_client.upload_blob(output, blob_type="BlockBlob", overwrite=overwrite)

        return 1

    except Exception as e:
        # A bare `except:` used to hide every failure (and swallowed KeyboardInterrupt).
        print(f"Upload of {output_file!r} failed: {e!r}")
        return 0

In [0]:
# Number of json_meta_df rows to process; set to len(json_meta_df) for a full run.
MAX_FILES = 1

for row in range(min(MAX_FILES, len(json_meta_df))):

    status = 0
    try:

        print('Feature Creation Started!!')
        ## Required variables
        reponame, filename, filepath = json_meta_df[['reponame', 'filename', 'filepath']].iloc[row].tolist()

        ### Extracting data from json
        json_df = data_conversion(filename, filepath)

        # Nothing to process for empty inputs (the "Completed" message is skipped too).
        if json_df.shape[0] == 0:
            continue

        ### Extracting meta data
        meta_json_df = meta_information_check(json_df, filename, reponame)
        print('Meta Data Row Count: ', meta_json_df.shape)

        ### Extracting features from data (currently disabled)
        # df = extract_features_to_csv(meta_json_df)
        # alphanumeric_feats = df[df['master_id'].isin(meta_json_df['master_id'])]

        ### Saving the output to the folder (currently disabled)
        # full_filepath = filename.replace('.json','.csv').replace('.parquet','.csv').replace('.xlsx','.csv').replace('.txt','.csv').replace('.csv','_feats.csv')
        # status = save_df_to_blob(df,full_filepath)

    except Exception:
        # The formatted traceback already contains the exception message.
        print(traceback.format_exc())

    print('Feature Creation Completed!!')
    print(row, status)

#### 9. Iterating through the feature files and combining them

In [0]:
### Getting the data and other required information from each source
container_name = 'datascience-dataset'
# Path names under the Features folder (the destination configured for feature uploads above).
data_content = list_directory_contents(container_name, my_dir='Features')

In [0]:
# Number of feature files to combine; set to None to combine all of them.
MAX_FEATURE_FILES = 10
feature_files = data_content[:MAX_FEATURE_FILES]

feature_frames = []
for i, filename in enumerate(feature_files):
    full_filepath = "abfss://datascience-dataset@legoaistorage.dfs.core.windows.net/" + filename
    feats_csv = spark.read.format("csv").option("header", "false").load(full_filepath)
    feats_pandas = feats_csv.toPandas()
    # header=false: the header row arrives as data and is promoted to column names.
    feats_pandas.columns = feats_pandas.iloc[0]
    feats_pandas = feats_pandas.drop(feats_pandas.index[0])
    print(filename)
    base_name = filename.split('/')[1]
    feats_pandas['file_name'] = base_name.replace('.csv', '')
    # NOTE(review): json_meta_df was filtered to a single source earlier, so this
    # lookup raises IndexError for feature files produced from other sources.
    feats_pandas['repo_name'] = json_meta_df[json_meta_df['filename'] == base_name.replace('_feats.csv', '.json')]['reponame'].tolist()[0]
    assert feats_pandas.shape[0] == feats_csv.count() - 1
    feature_frames.append(feats_pandas)
    # Files left in this batch, counting the current one.
    print(len(feature_files) - i)

# Concatenate once: growing a frame with concat inside the loop is quadratic.
features_df = pd.concat(feature_frames) if feature_frames else pd.DataFrame()

In [0]:
# Unique column key repo$$##$$table$$##$$column. A comprehension replaces the
# row-wise DataFrame.apply: it is faster and still yields a list on an empty frame.
features_df['master_id'] = [
    repo + '$$##$$' + table + '$$##$$' + column
    for repo, table, column in zip(features_df['repo_name'], features_df['table_name'], features_df['column_name'])
]

In [0]:
def generate_id(id, name):
    """Namespace a column id by the feature file it came from.

    - ``web_data_common_feats``: ``'WDC_' + str(id)``.
    - ``sherlock_data_feats``: ids already starting with ``'ID_'`` are kept;
      any other id becomes ``'SH_' + str(id)``.
    - any other file: the id is returned unchanged.
    """
    if name == 'web_data_common_feats':
        return 'WDC_' + str(id)
    if name != 'sherlock_data_feats' or id.startswith('ID_'):
        return id
    return 'SH_' + str(id)

In [0]:
# Prefix each id according to its feature file (see generate_id). A comprehension
# replaces row-wise apply: it is faster and well-defined on an empty frame.
features_df['column_id'] = [
    generate_id(row_id, file_name)
    for row_id, file_name in zip(features_df['id'], features_df['file_name'])
]

In [0]:
### Additional new features
samp_pop_cols = ['uniq_values_sample','uniq_values_population','table_sample','table_population']
# Spark read the CSVs without schema inference (strings); cast these counts to float32.
features_df = features_df.astype(dict.fromkeys(samp_pop_cols, "float32"))

In [0]:
### Additional new features
def _safe_ratio(numerator, denominator):
    """Element-wise numerator / denominator, with 0 wherever the denominator is not > 0.

    Computed in float64, matching Python float division of the float32 inputs.
    NaN denominators also yield 0 because ``NaN > 0`` is False.
    """
    num = numerator.astype('float64')
    den = denominator.astype('float64')
    return (num / den).where(den > 0, 0.0)

# Vectorised replacement for a row-wise apply.
features_df['uniq_samp_pop_ratio'] = _safe_ratio(features_df['uniq_values_sample'], features_df['uniq_values_population'])
features_df['samp_pop_ratio'] = _safe_ratio(features_df['table_sample'], features_df['table_population'])

In [0]:
### Blob storage based configurations
# Re-point the global container_client (used by save_df_to_blob) at Model_Data.
model_data_container = 'datascience-dataset/Model_Data'
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client = blob_service_client.get_container_client(model_data_container)
filepath = "model_feats_data.csv"
save_df_to_blob(features_df, filepath)

#### 10. Evaluating Execution Performance

In [0]:
# Parse execution_time (read from CSV as text) into timedeltas and derive float seconds.
features_df = features_df.reset_index(drop=True)
features_df['execution_time'] = pd.to_timedelta(features_df['execution_time'])
features_df['execution_seconds'] = features_df['execution_time'] / np.timedelta64(1, 's')

In [0]:
# Integer row counts for the per-dataset averages below.
features_df = features_df.astype({'table_sample': int})

In [0]:
# Per-dataset execution summary. Named aggregation with the built-in 'mean'
# replaces np.mean callables, which pandas deprecates in agg.
exec_info = (
    features_df.groupby('dataset_name')
    .agg(
        total_table=('table_name', 'nunique'),
        total_column=('master_id', 'nunique'),
        average_rows=('table_sample', 'mean'),
        execution_time_seconds=('execution_seconds', 'mean'),
    )
    .reset_index()
)
exec_info['average_rows'] = exec_info['average_rows'].round(2)
exec_info['execution_time_seconds'] = exec_info['execution_time_seconds'].round(2)

In [0]:
# Slowest datasets first, with a fresh 0..n-1 index.
exec_info = exec_info.sort_values('execution_time_seconds', ascending=False, ignore_index=True)

In [0]:
### Blob storage based configurations
# Re-point the global container_client (used by save_df_to_blob) at Model_Data.
model_data_container = 'datascience-dataset/Model_Data'
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client = blob_service_client.get_container_client(model_data_container)
filepath = "feature_execution_data.csv"
save_df_to_blob(exec_info, filepath)

#### Appendix

In [0]:
import nltk
from nltk.corpus import stopwords
from gensim.models.doc2vec import Doc2Vec, TaggedDocument

STOPWORDS_ENGLISH = stopwords.words("english")
# Set view for O(1) membership tests in tokenise (the list is kept for compatibility).
_STOPWORDS_ENGLISH_SET = frozenset(STOPWORDS_ENGLISH)

def tokenise(values):
    """Lower-case, strip punctuation, tokenise and stop-word-filter column values.

    Args:
        values: Iterable of strings (the values of one column).

    Returns:
        List of NLTK word tokens, with English stop words removed.
    """
    joined = " ".join(s for s in values if len(s) >= 1)

    # stopwords need apostrophe, so keep it alongside alphanumerics and whitespace
    filtered = "".join(
        e for e in joined if e.isalnum() or e.isspace() or e == "'"
    ).lower()

    return [
        word
        for word in nltk.word_tokenize(filtered)
        if len(word) >= 1 and word not in _STOPWORDS_ENGLISH_SET
    ]

In [0]:
# Paragraph-vector model file (presumably the 400-dim model initialised earlier), loaded for ad-hoc inspection.
filename = "/dbfs/par_vec_trained_model_400.pkl"
model = Doc2Vec.load(filename)

In [0]:
# Infer the paragraph vector of one sample column. Reseed the model's RNG first
# so the inferred vector is reproducible.
# NOTE(review): meta_json_df exists only if the feature-creation loop ran in this session.
SAMPLE_COLUMN_ID = 'nov2019_deliveries_wide_runs_11'
model.random.seed(13)
col_values = meta_json_df.loc[meta_json_df['id'] == SAMPLE_COLUMN_ID, 'values'].tolist()[0]
tokens = tokenise(col_values)
# `epochs` is the supported keyword; the old `steps` alias was removed in gensim 4.
inferred = model.infer_vector(tokens, epochs=20, alpha=0.025)

In [0]:
def alphaAndNumericMatch(value):
    """Classify ``str(value)`` as 'alphanumeric', 'numeric', 'alpha' or 'others'.

    Counts ASCII letters, digits and the special characters matched below:
    - at least one digit plus a letter or special character -> 'alphanumeric'
    - digits without letters/special characters -> 'numeric'
    - letters without digits -> 'alpha'
    - anything else -> 'others'
    """
    value = str(value)
    # Raw strings: '\d' in a normal string literal is an invalid escape
    # (DeprecationWarning, SyntaxWarning since Python 3.12).
    charCount = len(re.findall(r'[a-zA-Z]', value))
    numCount = len(re.findall(r'\d', value))
    # NOTE(review): inside the character class "+-/" is a *range* ('+'..'/'), so
    # ',', '-' and '.' also count as special characters. As a result "1.5" and "-3"
    # are classified as 'alphanumeric'. Confirm this is intended.
    specialCharCount = len(re.findall(r"[!#&'()*+-/:;<=>?@[\]^_`{|}~]", value))

    if (charCount > 0 or specialCharCount > 0) and numCount > 0:
        return 'alphanumeric'
    elif numCount > 0:
        return 'numeric'
    elif charCount > 0:
        return 'alpha'
    else:
        return 'others'


#### Appendix

In [0]:
import re
from collections import Counter

In [0]:
# Placeholders treated as missing by additional_processing. Each token appears
# bare, then wrapped in double quotes, then wrapped in single quotes.
_missing_tokens = ['#na', '#n/a', 'na', 'n/a', 'none', 'nan', 'blank', 'blanks', 'nil', 'n.a.', 'n.a']
ignoreList = (
    _missing_tokens
    + ['"' + tok + '"' for tok in _missing_tokens]
    + ["'" + tok + "'" for tok in _missing_tokens]
)

In [0]:
### Reading the features data for the modelling purpose
prefix_path = "abfss://datascience-dataset@legoaistorage.dfs.core.windows.net/"
filepath = 'Model_Data/model_feats_data.csv'
filepath_upd = f"{prefix_path}{filepath}"
feats_data_spark = spark.read.format("csv").option("header", "false").load(filepath_upd)
# header=false: promote the first row to column names, then drop it from the data.
feats_data = feats_data_spark.toPandas()
header_row = feats_data.iloc[0]
feats_data.columns = header_row
feats_data = feats_data.drop(feats_data.index[0])

In [0]:
# Requires the NLTK "words" corpus; if missing, download it once with:
#   import nltk; nltk.download('words')

from nltk.corpus import words

# Lower-cased dictionary words longer than two characters (used by lexical_matching).
words_list = [value.lower() for value in words.words() if len(value) > 2]

In [0]:
def normalise_whitespace(data):
    """Strip a string and collapse each run of 2+ whitespace characters into one space.

    Non-string inputs are returned unchanged.
    """
    if not isinstance(data, str):
        return data
    return re.sub(r"\s{2,}", " ", data.strip())

In [0]:
def normalise_string_whitespace(col_values, dataset_name, table_name, column_name):
    """Whitespace-normalise each value, then drop values equal to the dataset/table/column name.

    Args:
        col_values: Iterable of raw column values (any type).
        dataset_name, table_name, column_name: Names compared case-insensitively
            with ``str(value)`` after normalisation.

    Returns:
        List of normalised values with the name matches removed.
    """
    normalized_values = [normalise_whitespace(val) for val in col_values]

    ### Removing the table and column name from values. This runs after normalisation,
    ### so padded copies of the names are caught too; it reuses the shared helper
    ### instead of duplicating its filter.
    return remove_table_column_name(normalized_values, dataset_name, table_name, column_name)

In [0]:
#### Remove non-ASCII characters from the data
def removeASCII(strs):
    """Return ``str(strs)`` with every non-ASCII character (code point >= 128) dropped.

    Despite its name, this keeps the ASCII characters and removes everything else.
    """
    return str(strs).encode('ascii', 'ignore').decode('ascii')

In [0]:
def remove_table_column_name(values, dataset_name, table_name, column_name):
    """Drop values whose string form equals (case-insensitively) the dataset, table or column name.

    Args:
        values: Iterable of values of any type, compared via ``str(val).lower()``.
        dataset_name, table_name, column_name: Strings to exclude.

    Returns:
        The remaining values, as a list in their original order.
    """
    values = list(values)
    if not values:
        # As before, the names are not touched when there is nothing to filter.
        return []
    # Build the lookup once instead of re-creating the lower-cased list for every value.
    excluded = {dataset_name.lower(), table_name.lower(), column_name.lower()}
    return [val for val in values if str(val).lower() not in excluded]

In [0]:
def additional_processing(value):
    """Clean one value: null-like placeholders become '', anything else is tidied.

    ``None``, pandas nulls, and values whose lower-cased string form is in
    ``ignoreList`` map to ''. Other values are stringified, non-breaking spaces
    become spaces, surrounding whitespace is stripped, and non-ASCII characters
    are removed with ``removeASCII``.
    """
    is_missing = value is None or pd.isnull(value) or str(value).lower() in ignoreList
    if is_missing:
        return ''
    cleaned = str(value).replace('\xa0', ' ').strip()
    return removeASCII(cleaned)

In [0]:
### Create alphanumeric features based on the dictionary
# Cast on the selection to get an independent frame. Assigning into a column
# slice of feats_data instead would raise SettingWithCopyWarning.
sub_feats_data = feats_data[['master_id', 'alphaRatio', 'numericRatio']].astype({'alphaRatio': float, 'numericRatio': float})
# NOTE(review): meta_json_df exists only if the feature-creation loop ran in this
# session (hidden state) — confirm which frame is meant to be merged here.
merged_feats_data = pd.merge(meta_json_df, sub_feats_data, on='master_id')

In [0]:
def data_cleaning(col_values, dataset_name, table_name, column_name):
    """Run the value-cleaning pipeline for one column and drop near-empty results.

    Steps:
    1. Remove values equal to the dataset/table/column name.
    2. Normalise whitespace; ``normalise_string_whitespace`` removes name matches again.
    3. Apply ``additional_processing`` to each value.
    4. Keep only cleaned strings longer than one character.
    """
    without_names = remove_table_column_name(col_values, dataset_name, table_name, column_name)
    normalised = normalise_string_whitespace(without_names, dataset_name, table_name, column_name)
    return [
        cleaned
        for cleaned in map(additional_processing, normalised)
        if len(cleaned) > 1
    ]

In [0]:
def lexical_matching(col_values, dataset_name, table_name, column_name):
    """Fraction of cleaned values (length > 2) that occur inside some dictionary word.

    A lower-cased cleaned value counts as a hit if it is a substring of at least
    one lower-cased entry of the global ``words_list``. Hits are weighted by how
    often the value occurs.

    Returns:
        hits / number of cleaned values, or 0 when no value survives cleaning.
    """
    cleaned_data = [
        data
        for data in data_cleaning(col_values, dataset_name, table_name, column_name)
        if len(data) > 2
    ]
    if not cleaned_data:
        return 0

    # Build the lower-cased vocabulary once per call, not once per distinct value.
    vocabulary = {word.lower() for word in words_list}
    words_freq = Counter(cleaned_data)

    matched = 0
    for search_word, freq in words_freq.items():
        needle = search_word.lower()
        # NOTE(review): this checks whether the *value* is a substring of a dictionary
        # word, not whether the value *contains* a word ("apple123" never matches).
        # Confirm the intended direction.
        if any(needle in word for word in vocabulary):  # stops at the first hit
            matched += freq
    return matched / len(cleaned_data)

In [0]:
def alphanum_flag_creation(values, alpha, numeric, dataset_name, table, column):
    """Dictionary flag for a column, based on its alpha/numeric ratios.

    Returns 0 when the column has no alphabetic share, 1 when it has no numeric
    share, and otherwise the ``lexical_matching`` score of its values.
    """
    if alpha != 0 and numeric != 0:
        return lexical_matching(values, dataset_name, table, column)
    return 0 if alpha == 0 else 1

In [0]:
# Per-column dictionary flag (see alphanum_flag_creation).
merged_feats_data['alphanumeric_dict_flag'] = merged_feats_data.apply(
    lambda record: alphanum_flag_creation(
        record['values'],
        record['alphaRatio'],
        record['numericRatio'],
        record['dataset_name'],
        record['table_name'],
        record['column_name'],
    ),
    axis=1,
)

In [0]:
# merged_feats_data['alphanumeric_dict_flag'].value_counts()

In [0]:
#337, 338, 342,352

In [0]:
#