In [55]:
pip install pandas





[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [56]:
pip install elasticsearch




[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip





In [57]:
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
es.info().body

{'name': '6b25b00d1398',
 'cluster_name': 'docker-cluster',
 'cluster_uuid': 'OQA0xXHlSkGzK1ZGptRGEQ',
 'version': {'number': '8.7.0',
  'build_flavor': 'default',
  'build_type': 'docker',
  'build_hash': '09520b59b6bc1057340b55750186466ea715e30e',
  'build_date': '2023-03-27T16:31:09.816451435Z',
  'build_snapshot': False,
  'lucene_version': '9.5.0',
  'minimum_wire_compatibility_version': '7.17.0',
  'minimum_index_compatibility_version': '7.0.0'},
 'tagline': 'You Know, for Search'}

In [58]:
import pandas as pd

df = (
    pd.read_csv("Employee Sample Data 1.csv",encoding="latin")
    .dropna()
    .reset_index()
)

In [59]:
mappings = {
    "properties": {
        "employee_id": {"type": "keyword"},
        "full_name": {"type": "text", "analyzer": "standard"},
        "job_title": {"type": "text", "analyzer": "standard"},
        "department": {"type": "text", "analyzer": "standard"},
        "business_unit": {"type": "text", "analyzer": "standard"},
        "gender": {"type": "keyword"},
        "ethnicity": {"type": "keyword"},
        "age": {"type": "integer"},
        "hire_date": {"type": "date", "format": "M/d/yyyy||MM/dd/yyyy"},
        "annual_salary": {"type": "text"},
        "bonus_percent": {"type": "text"},
        "country": {"type": "keyword"},
        "city": {"type": "keyword"},
        "exit_date": {"type": "date", "format": "M/d/yyyy", "null_value": None}
    }
}

In [60]:
if not es.indices.exists(index="employee_data"):
    es.indices.create(index="employee_data", mappings=mappings)
for i, row in df.iterrows():
    doc = {
            "id": row["Employee ID"],
            "Full Name": row["Full Name"],
            "Job Title": row["Job Title"],
            "Department": row["Department"],
            "Business Unit": row["Business Unit"],
            "Gender": row["Gender"],
            "Ethnicity": row["Ethnicity"],
            "Age": row["Age"],
            "Hire Date": row["Hire Date"],
            "Annual Salary": row["Annual Salary"],
            "Bonus": row["Bonus %"],
            "Country": row["Country"],
            "City": row["City"]
        }
    if pd.notnull(row["Exit Date"]):
        try:
            exit_date = pd.to_datetime(row["Exit Date"], format="%m/%d/%Y")
            doc["Exit Date"] = exit_date.strftime('%Y-%m-%d')
        except ValueError:
            pass
    es.index(index="employee_data", id=i, document=doc)
es.indices.refresh(index="employee_data")
es.cat.count(index="employee_data", format="json")



ListApiResponse([{'epoch': '1727973219', 'timestamp': '16:33:39', 'count': '102'}])

In [61]:
resp = es.search(
    index="employee_data",
    query={
        "bool": {
            "must": {
                "match_phrase": {
                    "Job Title": "Network Administrator"
                }
            },
            "filter": {
                "bool": {
                    "must": {
                        "match": {"City": "Phoenix"}
                    },
                    "must_not": {
                        "match_phrase": {"Department": "Human Resources"}
                    }
                }
            }
        }
    }
)

In [62]:
for hit in resp['hits']['hits']:
    print(hit["_score"])

5.643914


In [63]:
def createCollection(p_collection_name):
    mappings = {
        "properties": {
            "employee_id": {"type": "keyword"},
            "full_name": {"type": "text", "analyzer": "standard"},
            "job_title": {"type": "text", "analyzer": "standard"},
            "department": {"type": "text", "analyzer": "standard"},
            "business_unit": {"type": "text", "analyzer": "standard"},
            "gender": {"type": "keyword"},
            "ethnicity": {"type": "keyword"},
            "age": {"type": "integer"},
            "hire_date": {"type": "date", "format": "M/d/yyyy||MM/dd/yyyy"},
            "annual_salary": {"type": "text"},
            "bonus_percent": {"type": "text"},
            "country": {"type": "keyword"},
            "city": {"type": "keyword"},
            "exit_date": {"type": "date", "format": "M/d/yyyy", "null_value": None}
        }
    }
    if not es.indices.exists(index=p_collection_name):
        es.indices.create(index=p_collection_name, mappings=mappings)
        print(f"Collection '{p_collection_name}' created successfully.")
    else:
        print(f"Collection '{p_collection_name}' already exists.")

In [64]:
def searchByColumn(p_collection_name, p_column_name, p_column_value):
    query = {
        "match": {
            p_column_name: p_column_value
        }
    }
    resp = es.search(index=p_collection_name, query=query)
    print(f"Search results for {p_column_name}='{p_column_value}':")

#Print the raw response to debug if needed
    print(resp)

    if resp['hits']['total']['value'] > 0:
        for hit in resp['hits']['hits']:
            print(hit["_source"])
    else:
        print(f"No records found for {p_column_name}='{p_column_value}' in collection '{p_collection_name}'")

In [65]:
def indexData(p_collection_name, p_exclude_column):
    for i, row in df.iterrows():
        doc = {
            "employee_id": row["Employee ID"],
            "full_name": row["Full Name"],
            "job_title": row["Job Title"],
            "department": row["Department"],
            "business_unit": row["Business Unit"],
            "gender": row["Gender"],
            "ethnicity": row["Ethnicity"],
            "age": row["Age"],
            "hire_date": row["Hire Date"],
            "annual_salary": row["Annual Salary"],
            "bonus_percent": row["Bonus %"],
            "country": row["Country"],
            "city": row["City"]
        }

        # Handle Exit Date if not null
        if pd.notnull(row["Exit Date"]):
            try:
                exit_date = pd.to_datetime(row["Exit Date"], format="%m/%d/%Y")
                doc["exit_date"] = exit_date.strftime('%Y-%m-%d')
            except ValueError:
                pass

        # Ensure age is numeric and handle missing values
        try:
            doc["age"] = int(row["Age"])
        except (ValueError, TypeError):
            doc["age"] = None  # Provide a default value if necessary

        # Handle hire date parsing errors
        try:
            doc["hire_date"] = pd.to_datetime(row["Hire Date"], format="%m/%d/%Y").strftime('%Y-%m-%d')
        except ValueError:
            doc["hire_date"] = None  # Handle invalid date format

        # Exclude the specified column
        if p_exclude_column in doc:
            del doc[p_exclude_column]

        # Print the document being indexed
        print(f"Indexing document {i}: {doc}")

        # Index the document into Elasticsearch
        try:
            es.index(index=p_collection_name, id=i, document=doc)
        except Exception as e:
            pass

    # Refresh the Elasticsearch index after indexing the data
    es.indices.refresh(index=p_collection_name)


In [66]:
def searchByColumn(p_collection_name, p_column_name, p_column_value):
    query = {
        "match": {
            p_column_name: p_column_value
        }
    }

    # Execute the search with the constructed query
    resp = es.search(index=p_collection_name, size=10, query=query)

    # Loop through and print each hit
    for hit in resp['hits']['hits']:
        print(hit["_source"])


In [67]:
def getEmpCount(p_collection_name):
    count = es.cat.count(index=p_collection_name, format="json")
    print(f"Total employees in collection '{p_collection_name}':", count[0]["count"])

In [68]:
def delEmpById(p_collection_name, p_employee_id):
    es.delete_by_query(
        index=p_collection_name,
        body={
            "query": {
                "match": {"employee_id": p_employee_id}
            }
        }
    )
    print(f"Employee with ID '{p_employee_id}' deleted from collection '{p_collection_name}'.")

In [69]:
def getDepFacet(p_collection_name):
    # Query to aggregate by department using the .keyword field
    query = {
        "size": 0,  # No documents returned, just the aggregation
        "aggs": {
            "departments": {
                "terms": {
                    "field": "department.keyword",  # Use keyword field for exact aggregation
                    "size": 10  # Top 10 departments
                }
            }
        }
    }

    # Perform the search with aggregation
    try:
        resp = es.search(index=p_collection_name, body=query)

        # Check if aggregations are present
        if 'aggregations' in resp:
            print("Department Aggregation Results:")
            for bucket in resp['aggregations']['departments']['buckets']:
                print(f"Department: {bucket['key']}, Count: {bucket['doc_count']}")
        else:
            print(f"No aggregation results found for collection: {p_collection_name}")
    except Exception as e:
        print(f"Error retrieving department aggregation: {e}")

    # Print the index mapping for the collection
    print(es.indices.get_mapping(index=p_collection_name))


In [70]:
v_nameCollection = 'hash_kowshi'
v_phoneCollection = 'hash_9483'  # Replace '1234' with the last four digits of your phone


In [71]:
# Create collections
createCollection(v_nameCollection)
createCollection(v_phoneCollection)

# Get employee count
getEmpCount(v_nameCollection)


Collection 'hash_kowshi' already exists.
Collection 'hash_9483' already exists.
Total employees in collection 'hash_kowshi': 0


In [72]:
indexData(v_nameCollection, 'Department')
indexData(v_phoneCollection, 'Gender')

Indexing document 0: {'employee_id': 'E02007', 'full_name': 'Ezra Vu', 'job_title': 'Network Administrator', 'department': 'IT', 'business_unit': 'Manufacturing', 'gender': 'Male', 'ethnicity': 'Asian', 'age': 62, 'hire_date': '2004-04-22', 'annual_salary': '$66,227 ', 'bonus_percent': '0%', 'country': 'United States', 'city': 'Phoenix', 'exit_date': '2014-02-14'}
Indexing document 1: {'employee_id': 'E02023', 'full_name': 'Lillian Lewis', 'job_title': 'Technical Architect', 'department': 'IT', 'business_unit': 'Research & Development', 'gender': 'Female', 'ethnicity': 'Black', 'age': 43, 'hire_date': '2013-08-14', 'annual_salary': '$83,323 ', 'bonus_percent': '0%', 'country': 'United States', 'city': 'Phoenix', 'exit_date': '2019-03-31'}
Indexing document 2: {'employee_id': 'E02038', 'full_name': 'Amelia Dominguez', 'job_title': 'Sr. Manager', 'department': 'Accounting', 'business_unit': 'Corporate', 'gender': 'Female', 'ethnicity': 'Latino', 'age': 31, 'hire_date': '2015-09-23', 'ann

In [73]:
delEmpById(v_nameCollection, 'E02003')

# Get employee count after deletion
getEmpCount(v_nameCollection)

Employee with ID 'E02003' deleted from collection 'hash_kowshi'.
Total employees in collection 'hash_kowshi': 0


In [74]:
searchByColumn(v_nameCollection, 'Department', 'IT')
searchByColumn(v_nameCollection, 'Gender', 'Male')
searchByColumn(v_phoneCollection, 'Department', 'IT')

In [75]:
getDepFacet(v_nameCollection)
getDepFacet(v_phoneCollection)

Department Aggregation Results:
{'hash_kowshi': {'mappings': {'properties': {'age': {'type': 'integer'}, 'annual_salary': {'type': 'text'}, 'bonus_percent': {'type': 'text'}, 'business_unit': {'type': 'text', 'analyzer': 'standard'}, 'city': {'type': 'keyword'}, 'country': {'type': 'keyword'}, 'department': {'type': 'text', 'analyzer': 'standard'}, 'employee_id': {'type': 'keyword'}, 'ethnicity': {'type': 'keyword'}, 'exit_date': {'type': 'date', 'format': 'M/d/yyyy'}, 'full_name': {'type': 'text', 'analyzer': 'standard'}, 'gender': {'type': 'keyword'}, 'hire_date': {'type': 'date', 'format': 'M/d/yyyy||MM/dd/yyyy'}, 'job_title': {'type': 'text', 'analyzer': 'standard'}}}}}
Department Aggregation Results:
{'hash_9483': {'mappings': {'properties': {'age': {'type': 'integer'}, 'annual_salary': {'type': 'text'}, 'bonus_percent': {'type': 'text'}, 'business_unit': {'type': 'text', 'analyzer': 'standard'}, 'city': {'type': 'keyword'}, 'country': {'type': 'keyword'}, 'department': {'type': '