# Prepare, merge and ingest data

This interactive notebook (see Task #2 in the workflow) performs the following steps:
- import metadata and semantics (in CSV),
- merge metadata and semantics into one dataframe based on filenames,
- subset the enriched metadata to simulate good-quality metadata,
- ingest the result into an Elasticsearch index using the Bulk API.

Prerequisites:
1. Elasticsearch is installed. In this experiment, Elasticsearch (version 7.12.0) was deployed through Docker (version 4.4.2).
2. The metadata and semantics to be merged are available as CSV files. In our experiment:
   - metadata in JSON was converted to CSV using Kibana, an Elastic Stack tool;
   - column semantics were extracted (see Task #1) and exported as a CSV file.
3. The experiment was conducted with Python 3.9.7.

![Workflow for the experimental setup in leveraging column semantics for data discovery.](image/workflowv2.png "Workflow")

## 1. Prepare

In [1]:
import pandas as pd
import os

### Read files containing metadata and semantics information

In [2]:
root_folder = "./data/"
metadata_df = pd.read_csv(os.path.join(root_folder, "metadata_datagov_simple.csv"))
enrich_df = pd.read_csv(os.path.join(root_folder, "enriched_all.csv")) 
display(metadata_df.head(1))
display(enrich_df.head(1))

Unnamed: 0,id,title,description,attribution,data_fields.Publisher,data_fields.tags,data.data_organization,data.data_filename
0,000201bb-1991-4165-aef5-b4f3510dc744,IDEA Part C Section 618 Table 2 Program Settin...,IDEA Part C Section 618 Table 2 Program Settin...,IDEA Part C Section 618 Table 2 Program Settin...,Office of Special Education Programs,"[""early-intervention-services"",""education-of-i...",Department of Education,31a14619b98cf1e7897bea04bf356bc2fe47d28afadeb6...


Unnamed: 0,data_filename,colSemantics,colSemantics_s10,colSemantics_s20,colSemantics_s30,colSemantics_s40,colSemantics_s50,colSemantics_s60,colSemantics_s70,colSemantics_s80,colSemantics_s90,colSemantics_s95,colSemantics_s98,colSemantics_s99,colNames,colScores,colComplete,colTypes,colLen
0,7cdef5be079976d589563498ba1801d9317588a120c481...,"['code', 'year']","['code', 'year']","['code', 'year']","['code', 'year']","['code', 'year']","['code', 'year']",['year'],['year'],[],[],[],[],[],"['IncomeCategory', ' IncomeCategoryDesc']","[0.5207, 0.7909]","[1.0, 1.0]","[dtype('O'), dtype('O')]","[3.5, 23.25]"


#### Checkpoint

In [3]:
# CHECK
metadata_df["data.data_filename"][2]

'["16c959754401f6c9db9839cf2943c4d151224114ec4f8da9271340af0b2b7a07.text.csv","a979e9bf61914df8d36afe25d40d1cd7bbcc42918776242242d144fd892bed0e.application.rdf+xml","c5deeeea9486a9fcda8c28df00850238ee0d862c71d29e105ca5e420b8350abe.application.json","95337fc401c26b3e2749c0d96ec30b62eaccf1501089e5c411b3a99a69d3a05b.text.xml"]'
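
The checkpoint shows that `data.data_filename` holds a JSON-encoded list stored as a single string, which is why the matching step later searches it as text. A minimal sketch of decoding such a cell, assuming every non-empty cell is valid JSON like the sample above; `parse_filenames` is a hypothetical helper and the filenames are shortened, made-up stand-ins:

```python
import json

def parse_filenames(cell):
    """Decode a JSON-encoded list of filenames; return [] for missing values."""
    if not isinstance(cell, str) or not cell.strip():
        return []
    return json.loads(cell)

# Shortened, made-up filenames that follow the pattern seen in the checkpoint.
sample = '["16c9.text.csv","a979.application.rdf+xml","c5de.application.json"]'
files = parse_filenames(sample)

# Keep only the CSV data files.
csv_files = [name for name in files if name.endswith(".text.csv")]
print(csv_files)
```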

## 2. Merge
Merge metadata and semantics through a cross-join, then keep only the pairs whose filenames match.

### Cross-join all metadata and semantics

In [4]:
metadata_df['join'] = 1
enrich_df['join'] = 1

metadata_df = metadata_df.merge(enrich_df, on='join').drop('join', axis=1)
enrich_df.drop('join', axis=1, inplace=True)

display(metadata_df.head(1))

Unnamed: 0,id,title,description,attribution,data_fields.Publisher,data_fields.tags,data.data_organization,data.data_filename,data_filename,colSemantics,...,colSemantics_s80,colSemantics_s90,colSemantics_s95,colSemantics_s98,colSemantics_s99,colNames,colScores,colComplete,colTypes,colLen
0,000201bb-1991-4165-aef5-b4f3510dc744,IDEA Part C Section 618 Table 2 Program Settin...,IDEA Part C Section 618 Table 2 Program Settin...,IDEA Part C Section 618 Table 2 Program Settin...,Office of Special Education Programs,"[""early-intervention-services"",""education-of-i...",Department of Education,31a14619b98cf1e7897bea04bf356bc2fe47d28afadeb6...,7cdef5be079976d589563498ba1801d9317588a120c481...,"['code', 'year']",...,[],[],[],[],[],"['IncomeCategory', ' IncomeCategoryDesc']","[0.5207, 0.7909]","[1.0, 1.0]","[dtype('O'), dtype('O')]","[3.5, 23.25]"
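
The constant `join` column is one way to build a cross-join. In pandas 1.2 and later, `merge` also accepts `how="cross"`, which avoids the helper column. A minimal sketch on toy frames (the column values are made up); note that either approach produces `len(left) * len(right)` rows, so memory use grows quickly:

```python
import pandas as pd

left = pd.DataFrame({"id": ["a", "b"]})
right = pd.DataFrame({"data_filename": ["f1.csv", "f2.csv", "f3.csv"]})

# Requires pandas >= 1.2: pairs every row of `left` with every row of `right`.
crossed = left.merge(right, how="cross")
print(len(crossed))
```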


### On the cross-join table, find matching dataset filenames

In [5]:
metadata_df['match'] = metadata_df.apply(lambda x: x['data.data_filename'].find(x.data_filename), axis=1).ge(0)
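
The matching cell relies on `str.find`, which returns `-1` when the substring is absent, so `.ge(0)` marks the rows where the semantics filename occurs somewhere in the metadata's filename list. A small illustration on made-up rows:

```python
import pandas as pd

pairs = pd.DataFrame({
    "data.data_filename": ['["aaa.text.csv","bbb.text.xml"]', '["ccc.text.csv"]'],
    "data_filename": ["aaa.text.csv", "aaa.text.csv"],
})

# True when the right-hand filename appears inside the left-hand list string.
pairs["match"] = pairs.apply(
    lambda row: row["data.data_filename"].find(row["data_filename"]) >= 0, axis=1
)
print(pairs["match"].tolist())
```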

#### Checkpoint: export the matched rows of the cross-join table for a visual check (see "check_cross_join.csv")

In [6]:
# CHECK for matching csv
metadata_df[metadata_df.match == True][['data.data_filename', 'data_filename']].to_csv(os.path.join(root_folder, "check_cross_join.csv"))


### Drop the rows with non-matching dataset filenames

In [7]:
# Keep the matching rows and drop the helper column without mutating a slice in place
metadata_df = metadata_df[metadata_df['match']].drop('match', axis=1)
metadata_df.head(1)

Unnamed: 0,id,title,description,attribution,data_fields.Publisher,data_fields.tags,data.data_organization,data.data_filename,data_filename,colSemantics,...,colSemantics_s80,colSemantics_s90,colSemantics_s95,colSemantics_s98,colSemantics_s99,colNames,colScores,colComplete,colTypes,colLen
957,000201bb-1991-4165-aef5-b4f3510dc744,IDEA Part C Section 618 Table 2 Program Settin...,IDEA Part C Section 618 Table 2 Program Settin...,IDEA Part C Section 618 Table 2 Program Settin...,Office of Special Education Programs,"[""early-intervention-services"",""education-of-i...",Department of Education,31a14619b98cf1e7897bea04bf356bc2fe47d28afadeb6...,31a14619b98cf1e7897bea04bf356bc2fe47d28afadeb6...,"['year', 'state', 'area', 'rank', 'area', 'ran...",...,"['area', 'rank', 'area', 'area', 'area']","['area', 'area', 'area', 'area']",[],[],[],"['Year', 'State', 'Setting', 'Birth to 1', ' 1...","[0.4587, 0.6141, 0.434, 0.4079, 0.9014, 0.8384...","[1.0, 0.9828, 0.9794, 0.9794, 0.9794, 0.9794, ...","[dtype('O'), dtype('O'), dtype('O'), dtype('O'...","[4.3162, 10.0344, 11.2268, 2.0893, 2.4502, 2.6..."
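
The cross-join followed by a filter is simple to follow, but it materializes every metadata/semantics pair. An alternative sketch, not the notebook's method, expands the JSON list into one row per filename and uses an ordinary inner join. It assumes each `data_filename` in the semantics table equals one list entry exactly (the notebook itself only requires a substring match); all values below are made up:

```python
import json
import pandas as pd

meta = pd.DataFrame({
    "id": ["m1", "m2"],
    "data.data_filename": ['["aaa.text.csv","bbb.text.xml"]', '["ccc.text.csv"]'],
})
enrich = pd.DataFrame({
    "data_filename": ["aaa.text.csv", "ccc.text.csv", "zzz.text.csv"],
    "colSemantics": ["['year']", "['code']", "['area']"],
})

# One row per (metadata record, filename), then an exact-match inner join.
exploded = meta.assign(
    data_filename=meta["data.data_filename"].map(json.loads)
).explode("data_filename")
matched = exploded.merge(enrich, on="data_filename", how="inner")
print(matched[["id", "data_filename", "colSemantics"]])
```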


#### Checkpoint: rows sharing the same metadata ID across different data files
Some metadata records reference multiple data files, so the same metadata ID appears in several rows. We want one record per metadata ID, so these rows are joined in the next step.

In [8]:
# CHECK (before joining)
metadata_df.groupby("id").size().reset_index(name='counts').groupby("counts").size() #check unique rows

counts
1     2121
2       38
3        8
4       12
5        6
6        4
7        6
8        2
9        1
12       1
13       1
23       1
28       1
29       3
30       3
31       1
41       1
51       1
66       1
dtype: int64
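
The table reads as "number of metadata IDs (right) that appear in a given number of rows (left)". The same distribution can be sketched more compactly with `value_counts`; the IDs below are made up:

```python
import pandas as pd

rows = pd.DataFrame({"id": ["a", "b", "b", "c", "c", "c", "d"]})

rows_per_id = rows.groupby("id").size()  # a: 1, b: 2, c: 3, d: 1
# How many IDs occur once, twice, three times, ...
distribution = rows_per_id.value_counts().sort_index()
print(distribution)
```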

### Group rows with same metadata ID by joining the textual information

In [9]:
metadata_df = metadata_df.astype(str)
metadata_by_id_df = metadata_df.groupby('id').agg({'title' : 'first', 
                               'description' : 'first', 
                               'attribution' : 'first',
                               'data_fields.Publisher' : 'first',
                               'data_fields.tags' : 'first',
                               'data.data_organization' : 'first',
                               'data.data_filename' : 'first',
                               'colSemantics' : ' '.join,
                               'colSemantics_s10' : ' '.join, 'colSemantics_s20' : ' '.join, 'colSemantics_s30' : ' '.join,
                               'colSemantics_s40' : ' '.join, 'colSemantics_s50' : ' '.join, 'colSemantics_s60' : ' '.join,
                               'colSemantics_s70' : ' '.join, 'colSemantics_s80' : ' '.join, 'colSemantics_s90' : ' '.join,
                               'colSemantics_s95' : ' '.join, 'colSemantics_s98' : ' '.join, 'colSemantics_s99' : ' '.join,
                               'colNames' : ' '.join,
                               'colScores' : ' '.join,
                               'colTypes' : ' '.join,
                               'colComplete' : ' '.join,
                               'colLen' : ' '.join}) 
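
The aggregation keeps the first value for fields that are identical within one metadata ID and space-joins the per-file fields. A minimal sketch of the same pattern with the specification built programmatically, on a made-up frame with fewer columns; `first_cols` and `joined_cols` are illustrative names:

```python
import pandas as pd

toy = pd.DataFrame({
    "id": ["m1", "m1", "m2"],
    "title": ["T1", "T1", "T2"],
    "colSemantics": ["['year']", "['code']", "['area']"],
    "colNames": ["['Year']", "['Code']", "['Area']"],
})

first_cols = ["title"]                      # identical across rows of one id
joined_cols = ["colSemantics", "colNames"]  # differ per data file

agg_spec = {col: "first" for col in first_cols}
agg_spec.update({col: " ".join for col in joined_cols})

by_id = toy.groupby("id").agg(agg_spec)
print(by_id.loc["m1", "colSemantics"])
```

Building the dictionary from two lists keeps the thirteen `colSemantics_s*` entries out of the cell body and makes it harder to forget a column.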


#### Checkpoint

In [10]:
# CHECK (after joining)
metadata_by_id_df.groupby("id").size().reset_index(name='counts').groupby("counts").size() #check unique rows


counts
1    2212
dtype: int64

In [11]:
# Standardize field names
metadata_by_id_df.rename({'data_fields.Publisher': 'publisher', 
                          'data_fields.tags': 'tags',
                         'data.data_organization': 'data_organization', 
                         'data.data_filename': 'filename',
                         'colSemantics': 'col_semantics',
                         'colSemantics_s10' : 'col_semantics_s10',
                         'colSemantics_s20' : 'col_semantics_s20', 
                         'colSemantics_s30' : 'col_semantics_s30', 
                         'colSemantics_s40' : 'col_semantics_s40',
                         'colSemantics_s50' : 'col_semantics_s50', 
                         'colSemantics_s60' : 'col_semantics_s60', 
                         'colSemantics_s70' : 'col_semantics_s70',
                         'colSemantics_s80' : 'col_semantics_s80', 
                         'colSemantics_s90' : 'col_semantics_s90', 
                         'colSemantics_s95' : 'col_semantics_s95',
                         'colSemantics_s98' : 'col_semantics_s98', 
                         'colSemantics_s99' : 'col_semantics_s99',
                         'colNames': 'col_names',
                         'colTypes': 'col_types',
                         'colLen': 'col_len',
                         'colScores': 'col_scores',
                         'colComplete': 'col_completeness'}, axis=1, inplace=True)

metadata_by_id_df.columns

Index(['title', 'description', 'attribution', 'publisher', 'tags',
       'data_organization', 'filename', 'col_semantics', 'col_semantics_s10',
       'col_semantics_s20', 'col_semantics_s30', 'col_semantics_s40',
       'col_semantics_s50', 'col_semantics_s60', 'col_semantics_s70',
       'col_semantics_s80', 'col_semantics_s90', 'col_semantics_s95',
       'col_semantics_s98', 'col_semantics_s99', 'col_names', 'col_scores',
       'col_types', 'col_completeness', 'col_len'],
      dtype='object')

### Clean up by removing rows with empty semantics

In [12]:
# CHECK
len(metadata_by_id_df)

2212

In [13]:
metadata_by_id_df = metadata_by_id_df[metadata_by_id_df['col_semantics'] != '[]']
len(metadata_by_id_df)

2006
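
The filter compares against the exact string `'[]'`. Because the grouping step joins per-file values with spaces, a record with several data files and no semantics would presumably appear as `'[] []'` and pass that filter; whether such records occur in this dataset is not shown here. A hedged sketch of a stricter check on made-up rows:

```python
import pandas as pd

toy = pd.DataFrame(
    {"col_semantics": ["['year']", "[]", "[] []", "[] ['code']"]},
    index=["m1", "m2", "m3", "m4"],
)

# True when the field consists only of empty lists separated by whitespace.
only_empty = toy["col_semantics"].str.fullmatch(r"(\[\]\s*)+")
kept = toy[~only_empty]
print(kept.index.tolist())
```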

## 3. Subset Metadata (skip if using original metadata)
To simulate good-quality metadata, we took the subset of records whose description length or attribution length is above the first quartile (25th percentile) of the population, i.e. in the second quartile or higher.

In [40]:
metadata_by_id_df_good = metadata_by_id_df.copy()  # copy so the length columns are not added to the original

In [41]:
metadata_by_id_df_good['len_desc'] = metadata_by_id_df_good['description'].str.len()
metadata_by_id_df_good['len_attr'] = metadata_by_id_df_good['attribution'].str.len()

### Calculate population quantiles for the lengths of the description and attribution fields

In [42]:
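# Select the numeric length columns explicitly; recent pandas versions raise on non-numeric columns
print(metadata_by_id_df_good[['len_desc', 'len_attr']].quantile([0.25, 0.5, 0.75]))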

      len_desc  len_attr
0.25    126.25     175.0
0.50    299.50     189.0
0.75    691.00     212.0


In [43]:
# Thresholds are the first-quartile (0.25) values from the table above, rounded down
metadata_by_id_df_good = metadata_by_id_df_good[(metadata_by_id_df_good['len_desc'] > 126) | 
                                                (metadata_by_id_df_good['len_attr'] > 175)]
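
The cell hard-codes 126 and 175, which correspond to the first-quartile values printed above. A sketch of deriving the thresholds from the data instead, so the filter follows the population if the input changes; the lengths below are made up, and the exact quantile (for example 126.25 rather than 126) could shift borderline rows:

```python
import pandas as pd

toy = pd.DataFrame({
    "len_desc": [50, 120, 300, 700],
    "len_attr": [170, 175, 189, 212],
})

# First-quartile threshold for each length column.
q1 = toy[["len_desc", "len_attr"]].quantile(0.25)

# Keep rows that exceed the threshold on either column.
good = toy[(toy["len_desc"] > q1["len_desc"]) | (toy["len_attr"] > q1["len_attr"])]
print(len(toy), len(good))
```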


#### Checkpoint: number of rows before and after taking the subset

In [44]:
len(metadata_by_id_df)

2006

In [45]:
len(metadata_by_id_df_good)

1864

## 4. Ingest Data

### Specify the index name to use in Elasticsearch

In [14]:
INDEX_NAME = 'datagov-all-csv-scores' # option for good quality subset: 'datagov-good-csv-scores'

### Specify fields to ingest

In [15]:
# 'col_names' is listed once; a duplicate entry would create duplicate columns
col_list = ['title', 'description', 'attribution', 'publisher', 'tags',
            'data_organization', 'filename', 'col_semantics',
            'col_semantics_s10', 'col_semantics_s20', 'col_semantics_s30', 'col_semantics_s40',
            'col_semantics_s50', 'col_semantics_s60', 'col_semantics_s70', 'col_semantics_s80',
            'col_semantics_s90', 'col_semantics_s95', 'col_semantics_s98', 'col_semantics_s99',
            'col_names', 'col_types', 'col_len', 'col_scores', 'col_completeness']


### Specify the dataframe to ingest
- for original metadata, use: metadata_by_id_df
- for subset of metadata, use: metadata_by_id_df_good

In [16]:
metadata_df = metadata_by_id_df[col_list].copy() # for the subset, use "metadata_by_id_df_good[col_list].copy()"

### Set up Elasticsearch

In [18]:
from elasticsearch import Elasticsearch
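es_client = Elasticsearch(
    "http://localhost:9200",            # explicit scheme; adjust host and port to your deployment
    http_auth=("elastic", "changeme"),  # credentials used in this experiment; replace with your own
)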

In [19]:
from elasticsearch.client import IndicesClient
es_index_client = IndicesClient(es_client)

In [20]:
es_client.indices.create(index = INDEX_NAME, ignore=400)

{'acknowledged': True,
 'shards_acknowledged': True,
 'index': 'datagov-all-csv-scores-test'}

### Convert metadata to a list of dictionaries for ingestion

In [21]:
metadata_df['id'] = metadata_df.index
metadata_dict_df = metadata_df.to_dict('records')
metadata_dict_df[0]

{'title': 'IDEA Part C Section 618 Table 2 Program Settings, 2012',
 'description': 'IDEA Part C Section 618 Table 2 Program Settings, 2012 (IDEA Part C Table 2 2012), is a study that is part of the IDEA Part C program; program data is available since 2006 at . IDEA Part C Table 2 2012 is a cross-sectional universe report that collected counts of infants and toddlers with disabilities in each state who (a) are served under IDEA Part C, (b) are served in different program settings, and (c) exit Part C because of program completion or for other reasons. States submitted the report in spreadsheet form by mail or electronically by e-mail. Key statistics produced from IDEA Part C Table 2 2012 provide insights on the demographics of children served under IDEA Part C, service settings, referral to preschool or other services, and personnel providing early intervention services.',
 'attribution': 'IDEA Part C Section 618 Table 2 Program Settings, 2012 (https://catalog.data.gov/dataset/000201bb
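
After the grouping step the metadata ID lives in the dataframe index, which is why the cell copies it into an `id` column before calling `to_dict('records')`. A sketch of the same conversion with `reset_index`, which returns a new frame rather than assigning into an existing one; the rows are made up:

```python
import pandas as pd

toy = pd.DataFrame(
    {"title": ["T1", "T2"], "col_semantics": ["['year']", "['code']"]},
    index=pd.Index(["m1", "m2"], name="id"),
)

# reset_index turns the `id` index into a regular column without mutating `toy`.
records = toy.reset_index().to_dict("records")
print(records[0])
```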

### Define a function that turns the records into a generator of bulk actions

In [22]:
def generator(records):
    """
        Create a generator of bulk actions for ingestion into Elasticsearch
        Input: 
            records: list of dictionaries (one per metadata record) to be ingested
        Output: a generator yielding one bulk action per record
    """
    for line in records:
        yield {
            '_index' : INDEX_NAME,
            '_type' : '_doc',
            '_id': line.get('id', None),
            '_source': {
                'title': line.get('title',''),
                'description': line.get('description',''),
                'attribution': line.get('attribution',''),
                'publisher': line.get('publisher',''),
                'tags': line.get('tags',''),
                'data_organization': line.get('data_organization',''),
                'filename': line.get('filename',''),
                'col_semantics': line.get('col_semantics',''),
                'col_semantics_s10': line.get('col_semantics_s10',''),
                'col_semantics_s20': line.get('col_semantics_s20',''),
                'col_semantics_s30': line.get('col_semantics_s30',''),
                'col_semantics_s40': line.get('col_semantics_s40',''),
                'col_semantics_s50': line.get('col_semantics_s50',''),
                'col_semantics_s60': line.get('col_semantics_s60',''),
                'col_semantics_s70': line.get('col_semantics_s70',''),
                'col_semantics_s80': line.get('col_semantics_s80',''),
                'col_semantics_s90': line.get('col_semantics_s90',''),
                'col_semantics_s95': line.get('col_semantics_s95',''),
                'col_semantics_s98': line.get('col_semantics_s98',''),
                'col_semantics_s99': line.get('col_semantics_s99',''),
                'col_names': line.get('col_names',''),
                'col_types': line.get('col_types',''),
                'col_len': line.get('col_len',''),
                'col_scores': line.get('col_scores',''),
                'col_completeness': line.get('col_completeness','')
            }
        }    
        
metadata_gen = generator(metadata_dict_df)
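
Every `_source` entry above follows the same `line.get(field, '')` pattern, so the action can also be built from a list of field names. A compact sketch under that assumption; `make_actions` and `SOURCE_FIELDS` are hypothetical names, the field list is shortened for illustration, and `_type` is left out on the assumption that the default `_doc` type applies:

```python
SOURCE_FIELDS = ["title", "description", "col_semantics"]  # shortened for illustration

def make_actions(records, index_name, fields=SOURCE_FIELDS):
    """Yield one bulk action per record, copying `fields` into `_source`."""
    for record in records:
        yield {
            "_index": index_name,
            "_id": record.get("id"),
            # Missing fields default to an empty string, as in the cell above.
            "_source": {field: record.get(field, "") for field in fields},
        }

demo = [{"id": "m1", "title": "T1", "col_semantics": "['year']"}]
actions = list(make_actions(demo, "demo-index"))
print(actions[0])
```

Passing the full `col_list` as `fields` would reproduce the document layout of the explicit version.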

#### Checkpoint

In [23]:
# CHECK
next(metadata_gen)

{'_index': 'datagov-all-csv-scores-test',
 '_type': '_doc',
 '_id': '000201bb-1991-4165-aef5-b4f3510dc744',
 '_source': {'title': 'IDEA Part C Section 618 Table 2 Program Settings, 2012',
  'description': 'IDEA Part C Section 618 Table 2 Program Settings, 2012 (IDEA Part C Table 2 2012), is a study that is part of the IDEA Part C program; program data is available since 2006 at . IDEA Part C Table 2 2012 is a cross-sectional universe report that collected counts of infants and toddlers with disabilities in each state who (a) are served under IDEA Part C, (b) are served in different program settings, and (c) exit Part C because of program completion or for other reasons. States submitted the report in spreadsheet form by mail or electronically by e-mail. Key statistics produced from IDEA Part C Table 2 2012 provide insights on the demographics of children served under IDEA Part C, service settings, referral to preschool or other services, and personnel providing early intervention servi

### Start ingestion using the Bulk API and the generator

In [24]:
from elasticsearch import helpers

try:
    res = helpers.bulk(es_client, generator(metadata_dict_df))
    print ('working')
    
except Exception as e:
    print(e)




working
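
Printing `working` confirms only that no exception was raised. `helpers.bulk` returns a tuple of the number of successful actions and the errors, which gives a more informative check. A small sketch that formats such a tuple; `summarize_bulk_result` is a hypothetical helper, and the tuple passed in is a stand-in rather than a real ingestion result:

```python
def summarize_bulk_result(result):
    """Format the (success_count, errors) tuple returned by helpers.bulk."""
    success_count, errors = result
    # `errors` is a list of failed items, or a plain count when stats_only=True.
    failed = errors if isinstance(errors, int) else len(errors)
    return f"indexed={success_count} failed={failed}"

# Stand-in value; in the notebook this would be the `res` variable.
print(summarize_bulk_result((3, [])))
```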
