In [1]:
import os, sys, pandas as pd, numpy as np, string, json, math
from pandas.io.json import json_normalize
from random import randrange, shuffle
from elasticsearch import Elasticsearch

In [2]:
# Root folder for the notebook's data, relative to the notebook location.
DATA_FOLDER = "../data/"

# Report files are read from the outputs sub-folder of DATA_FOLDER.
REPORTS_FOLDER = DATA_FOLDER + "outputs/synthetic_reports/"

In [3]:
# Every table below is a CSV read from the outputs sub-folder of DATA_FOLDER.
_outputs_dir = DATA_FOLDER + "outputs/"

metrics_catalog_df = pd.read_csv(_outputs_dir + "metrics_catalog_synthetic_v1_19feb20.csv")

col_met_mapping = pd.read_csv(_outputs_dir + "col_met_mapping_v1_19feb20.csv")

owners_df = pd.read_csv(_outputs_dir + "rep_owner_mapping_v1_19feb20.csv")

users_df = pd.read_csv(_outputs_dir + "rep_users_mapping_v1_19feb20.csv")

In [4]:
class ElasticSearchIndexer():
    """
    Builds a metric-report mapping table from the report files and indexes
    it into Elasticsearch.

    Steps performed by ``run_index``:

    1. Search and fetch all/new reports from sharepoint
       (for now: list the files in ``REPORTS_FOLDER``)
    2. Fetch/create updated metric-report mapping
    3. Prepare indexing data
    4. Connect/Fetch connecter object to ElasticSearch
    5. Run index
    """

    # Folder whose files are listed and read as reports.
    REPORTS_FOLDER = "../data/outputs/synthetic_reports/"

    OUTPUT_FOLDER = "../data/outputs/"

    # CSV holding the column-name -> metric-name mapping (read in
    # fetch_metric_report_map; expected columns there: "column_names", "metric_names").
    COL_MET_MAP_FNAME = "{}col_met_mapping_v1_19feb20.csv".format(OUTPUT_FOLDER)

    def __init__(self,
                 es_connector_obj,
                 es_index_name = "test",
                 es_doctype = "metric_report_mapping"
                 ):
        """
        Parameters
        ----------
        es_connector_obj : Elasticsearch client object used for every ES call.
        es_index_name : name of the index that is created (if missing) and written to.
        es_doctype : value emitted under the "_doc" key of every bulk action.
        """

        self.ES_CONNECTOR = es_connector_obj

        self.ES_INDEX_NAME = es_index_name

        self.ES_DOCTYPE = es_doctype

    def run_index(self):
        """
        Main function
        Runs all individual components in order and stores each intermediate
        result on the instance (report_names, met_rep_map, index_data).
        """

        self.report_names = self.fetch_report_names()

        self.met_rep_map = self.fetch_metric_report_map()

        self.index_data = self.prepare_indexing_data()

        self.ES_CONNECTOR = self.connect_es()

        self.index()

        print("Indexing complete")

    def fetch_report_names(self):
        """
        To be updated after connecting to sharepoint

        Just reads filenames from directory for now.

        Returns the list of entries found in ``REPORTS_FOLDER``.
        """
        print("fetching report names.....")

        return os.listdir(self.REPORTS_FOLDER)

    def _get_rep_colnames(self):
        """
        Fetches report and its column names from reports folder.

        To be updated/replaced with relevant functionality from SharePoint API

        Returns a DataFrame with one row per (report, column) pair in the
        columns "colname" and "repname". When the folder is empty, an empty
        DataFrame with columns ["repname", "colname"] is returned.
        """

        report_names = os.listdir(self.REPORTS_FOLDER)

        per_report_frames = []

        for rep in report_names:

            rep_path = os.path.join(self.REPORTS_FOLDER, rep)

            # nrows=0 reads only the header row, which is all that is needed here.
            colnames = pd.read_excel(rep_path, nrows=0).columns.tolist()

            report_df = pd.DataFrame({"colname": colnames})

            report_df["repname"] = rep

            per_report_frames.append(report_df)

        if not per_report_frames:
            return pd.DataFrame(columns=["repname", "colname"])

        # One concat instead of growing a frame inside the loop. The reversed
        # listing order is deliberate: row order feeds the document ids that
        # _mapping_df_generator derives from the frame index.
        return pd.concat(per_report_frames[::-1])

    def fetch_metric_report_map(self):
        """
        To be updated after connecting to sharepoint

        For now creates the metric report map data from col-metric mapping file

        Report columns are left-joined to the mapping on
        colname == column_names, so columns without a mapped metric are kept
        with a missing "metric_names" value.

        Returns a DataFrame with columns ["metric_names", "repname", "colname"].
        """

        print("fetching/creating metric-report mapping file.....")

        col_met_map = pd.read_csv(self.COL_MET_MAP_FNAME)

        rep_colnames_df = self._get_rep_colnames()

        met_rep_map = rep_colnames_df.merge(col_met_map, left_on="colname", right_on="column_names", how="left")

        met_rep_map = met_rep_map.drop_duplicates(subset=["repname", "metric_names", "colname"])

        return met_rep_map[["metric_names", "repname", "colname"]]

    def prepare_indexing_data(self):
        """
        Preprocessing data to be indexed

        Returns a copy of ``self.met_rep_map`` in which every missing value is
        replaced by the string "<missing>"; ``self.met_rep_map`` is not modified.
        """

        print("pre-processing data to be indexed....")

        # There must not be any missing values while indexing
        return self.met_rep_map.fillna("<missing>")

    def connect_es(self):
        """
        Returns the connector object supplied at construction time; no new
        connection is opened here.
        """

        print("establishing connection to ES.....")

        return self.ES_CONNECTOR

    def _mapping_df_generator(self, df):
        """
        Generator over the metric-report name mapping frame.

        Yields one bulk action dict per row of ``df``; the row's index label
        (as a string) is used as the document id and the row itself as the
        document source.
        """

        for index, document in df.iterrows():

            yield {
                    "_index": self.ES_INDEX_NAME,

                    # NOTE(review): "_doc" does not look like a bulk-helper
                    # metadata key; kept unchanged - confirm whether "_type"
                    # was intended before altering it.
                    "_doc": self.ES_DOCTYPE,

                    "_id" : f"{index}",

                    "_source": document.to_dict(),

                }

        # No explicit `raise StopIteration` here: falling off the end is how a
        # generator finishes, and raising it inside a generator body is turned
        # into RuntimeError on Python 3.7+ (PEP 479).

    def create_index_with_mappings_settings(self):
        """
        Creates an empty index with settings and mappings

        Custom underscore tokenizers has been used: the three text fields are
        analysed at index time with a pattern tokenizer that splits on "_",
        and with the standard analyzer at search time.

        Nothing is done when the index already exists.
        """

        index_map_setts = {
            'mappings': {
                'properties': {
                    'colname': {
                        'type': 'text',
                        "analyzer": "metric_names_analyzer",  # this is the analyzer with underscore tokenizer
                        "search_analyzer": "standard"  # while search time, just use normal tokenizer
                    },
                    'metric_names': {
                        'type': 'text',
                        "analyzer": "metric_names_analyzer",
                        "search_analyzer": "standard"
                    },
                    'repname': {
                        'type': 'text',
                        "analyzer": "metric_names_analyzer",
                        "search_analyzer": "standard"
                    },
                }
            },
            "settings": {
                "analysis": {
                    "analyzer": {
                        "metric_names_analyzer": {  # Analyzer to hold underscore tokenizer
                            "tokenizer": "underscore_tokenizer"
                        }
                    },
                    "tokenizer": {
                        "underscore_tokenizer": {  # Underscore tokenizer is defined here
                            "type": "pattern",
                            "pattern": "_"
                        }
                    }
                }
            }
        }

        if not self.ES_CONNECTOR.indices.exists(index=self.ES_INDEX_NAME):

            #Create index if it doesnt exist
            #Should you delete if an index already exists?? Skipping for now

            print("creating empty index with settings and mappings.....")

            self.ES_CONNECTOR.indices.create(index=self.ES_INDEX_NAME, body=index_map_setts)

    def index(self):
        """
        Indexes data into Elastic Search

        Creates the index when it does not exist yet, bulk-indexes
        ``self.index_data``, refreshes the index and prints its document count.
        """

        # Kept function-local so the class itself needs no extra file-level import.
        from elasticsearch import helpers

        mapping_generator = self._mapping_df_generator(self.index_data)

        self.create_index_with_mappings_settings()# Empty index is created now

        # Announce the bulk load before it starts rather than after it finished.
        print("indexing all data now.....")

        helpers.bulk(self.ES_CONNECTOR, mapping_generator)

        # Keyword arguments: newer client versions no longer accept the index
        # positionally or query parameters through `params=`.
        self.ES_CONNECTOR.indices.refresh(index=self.ES_INDEX_NAME)

        print(self.ES_CONNECTOR.cat.count(index=self.ES_INDEX_NAME, format="json"))
        
        

In [5]:
# Build the indexer around an Elasticsearch client created with default
# arguments; documents are written to the "test2" index.
es_client = Elasticsearch()
esi = ElasticSearchIndexer(es_connector_obj=es_client, es_index_name="test2")

In [6]:
# Run the whole pipeline: list the reports, build the metric-report mapping,
# fill missing values and bulk-index the result into Elasticsearch.
esi.run_index()

fetching report names.....
fetching/creating metric-report mapping file.....
pre-processing data to be indexed....
establishing connection to ES.....
indexing all data now.....
[{'epoch': '1582210261', 'timestamp': '14:51:01', 'count': '2333'}]
Indexing complete
