-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathemployee_database.py
86 lines (67 loc) · 2.96 KB
/
employee_database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""
#
# employee_database.py contains the python program to define the database operations for the application
# This file is for defining access methods of database and operations on database
# Author Sajal Debnath <sdebnath@vmware.com><debnathsajal@gmail.com>
#
"""
# Importing the Modules and Libraries
#
import asyncio
import motor.motor_asyncio
from bson.objectid import ObjectId
MONGO_DETAILS = "mongodb://localhost:27017" # Asssuming mongodb is running in local server. Change for a remote DB server
#MONGO_DETAILS = "mongo-db-url"
client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_DETAILS) # Defining the database access client
database = client.employees_DB # Connecting to the employee_DB database. Change the database name for connecting to another database
employee_collection = database.get_collection("employees") # Getting the employees collection. Change the collection name to connect to another
# helpers
# Changing the data to python dictionary. Note, the changing of the id as string from bson ObjectId (mongodb)
def employee_helper(employee) -> dict:
return {
"id": str(employee["_id"]),
"emp_id": employee["emp_id"],
"first_name": employee["first_name"],
"last_name": employee["last_name"],
"email": employee["email"],
"ph_no": employee["ph_no"],
"home_addr": employee["home_addr"],
"st_addr": employee["st_addr"],
"gender": employee["gender"],
"job_type": employee["job_type"],
}
# Retrieve all employees present in the database
async def retrieve_employees():
employees = []
async for employee in employee_collection.find():
employees.append(employee_helper(employee))
return employees
# Add a new employee into to the database
async def add_employee(employee_data: dict) -> dict:
employee = await employee_collection.insert_one(employee_data)
new_employee = await employee_collection.find_one({"_id": employee.inserted_id})
return employee_helper(new_employee)
# Retrieve an employee with a matching employee ID
async def retrieve_employee(emp_id: int) -> dict:
employee = await employee_collection.find_one({"emp_id": emp_id})
if employee:
return employee_helper(employee)
# Update an employee with a matching employee ID
async def update_employee(emp_id: int, data: dict):
# Return false if an empty request body is sent.
if len(data) < 1:
return False
employee = await employee_collection.find_one({"emp_id": emp_id})
if employee:
updated_employee = await employee_collection.update_one(
{"emp_id": emp_id}, {"$set": data}
)
if updated_employee:
return True
return False
# Delete an employee from the database
async def delete_employee(emp_id: int):
employee = await employee_collection.find_one({"emp_id": emp_id }) # ObjectId(id)})
if employee:
await employee_collection.delete_one({"emp_id": emp_id }) #ObjectId(id)})
return True