-
Notifications
You must be signed in to change notification settings - Fork 0
/
FirebaseService.py
53 lines (43 loc) · 1.39 KB
/
FirebaseService.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import logging
import os
import uuid
from datetime import datetime
from typing import TypedDict, Union, Optional

import pytz
from dotenv import load_dotenv
from firebase import firebase
# Load variables from a .env file into the process environment.
load_dotenv()

# The database URL is mandatory: abort at import time when it is unset or empty.
FIREBASE_DATABASE_URL = os.getenv("FIREBASE_DATABASE_URL")
if FIREBASE_DATABASE_URL is None or FIREBASE_DATABASE_URL == "":
    raise Exception("Firebase database URL not found in .env file")

# Module-wide Firebase client shared by the helper functions in this file.
firebase_app = firebase.FirebaseApplication(FIREBASE_DATABASE_URL, None)
class _FirebaseMessageRequired(TypedDict):
    # Keys that every stored chat message must carry.
    isSentByBot: bool
    text: str
    sentAt: str  # ISO 8601
    id: Union[str, int]


class FirebaseMessage(_FirebaseMessageRequired, total=False):
    """Shape of a chat message record written to Firebase.

    The four inherited keys are required. ``callback`` is declared as a
    non-required key so that messages built with or without it both
    type-check.
    """

    callback: Optional[str]
def email_to_key(email: str):
    """Return ``email`` with every ``.`` swapped for ``_``.

    The result is used by this module as a path segment under ``/chats``.
    """
    pieces = email.split('.')
    return '_'.join(pieces)
def save_token(user_id, token):
    """Write ``token`` under the ``token`` name at ``/userTokens/<user_id>``.

    Returns whatever the Firebase client's ``put`` call returns.
    """
    user_path = f'/userTokens/{user_id}'
    return firebase_app.put(user_path, 'token', token)
def get_token(user_id):
    """Read the value stored under ``token`` at ``/userTokens/<user_id>``.

    Returns the stored value when it is truthy; otherwise an empty dict.
    """
    stored = firebase_app.get(f'/userTokens/{user_id}', 'token')
    if stored:
        return stored
    # Missing or falsy value: callers receive an empty dict rather than None.
    return {}
def write_chat_message(user_email: str, message: str, callback_str: Optional[str] = None) -> bool:
    """Post a bot-authored chat message to the user's chat node in Firebase.

    Args:
        user_email: Address of the user; converted with ``email_to_key`` to
            build the ``/chats/<key>`` path.
        message: Text of the message to store.
        callback_str: Value stored under the ``callback`` key of the record;
            defaults to ``None``.

    Returns:
        ``True`` when the Firebase client's ``post`` call returns something
        other than ``None``, ``False`` otherwise.
    """
    chat_key = email_to_key(user_email)
    message_id = str(uuid.uuid4())
    # Timestamp is taken in the America/New_York zone and serialized as ISO 8601.
    sent_at = datetime.now(tz=pytz.timezone("America/New_York")).isoformat()

    payload: FirebaseMessage = {
        "id": message_id,
        "text": message,
        "isSentByBot": True,
        "sentAt": sent_at,
        "callback": callback_str,
    }

    result = firebase_app.post(f"/chats/{chat_key}", payload)
    # Debug-level log instead of print(); the email and message text are
    # deliberately left out so user data is not dumped to stdout.
    logging.getLogger(__name__).debug(
        "Posted chat message %s to Firebase (response: %r)", message_id, result
    )
    return result is not None