In [3]:
from utils.account_status import AccountStatus

account_status = AccountStatus('accounts.csv')

row = account_status.get_account_info_by_plate('Y0V029')

print(row)

plate='Y0V029' ch_code='01RBA2404-4' endo_date='2024-04-03' client='ROB AUTO' car_model='2022 MITSUBISHI XPANDER CROSS AT' plate_number_normalized='Y0V029'


In [3]:
from utils.is_similar_plate import is_similar_plate

plate1 = "ABC1234"
plate2 = "ABC123"

is_similar_plate(plate1, plate2)

True

In [8]:
from lark.token_manager import TokenManager
import os
from dotenv import load_dotenv

load_dotenv()

chopper_app_id = os.getenv('CHOPPER_APP_ID')
chopper_app_secret = os.getenv('CHOPPER_APP_SECRET')

token_manager = TokenManager(app_id=chopper_app_id, app_secret=chopper_app_secret)

tenant_token = token_manager.get_tenant_access_token_sync()

In [10]:
from utils.get_group_members import get_group_members

members = get_group_members(tenant_token.tenant_access_token)

In [35]:
import requests
import json
from pydantic import BaseModel

class SendMessageDataBodyField(BaseModel):
    content: str


class SendMessageSenderField(BaseModel):
    id: str
    id_type: str
    sender_type: str
    tenant_key: str


class SendMessageDataField(BaseModel):
    body: SendMessageDataBodyField
    chat_id: str
    create_time: str
    deleted: bool
    message_id: str
    msg_type: str
    sender: SendMessageSenderField
    update_time: str
    updated: bool


class SendMessageResponse(BaseModel):
    code: int
    data: SendMessageDataField
    msg: str
    

def send_message_to_gc(tenant_token: str, message: str):
    url = "https://open.larksuite.com/open-apis/im/v1/messages"

    params = {
        "receive_id_type": "chat_id"
    }

    headers = {
        "Authorization": f"Bearer {tenant_token}"
    }

    message = {
        "receive_id": "oc_b61235e79bac3d17d9107433e8267154",
        "msg_type": "text",
        "content": json.dumps({
            "text": "test notification for buzz. kindly disregard tenk u"
        })
    }

    response = requests.post(
        url=url, 
        json=message, 
        headers=headers, 
        params=params
    )

    json_response = response.json()

    pretty_json = json.dumps(json_response, indent=4)

    print(pretty_json)

    return SendMessageResponse(**json_response)

message = send_message_to_gc(
    tenant_token=tenant_token.tenant_access_token, 
    message=""
)


    

{
    "code": 0,
    "data": {
        "body": {
            "content": "{\"text\":\"test notification for buzz. kindly disregard tenk u\"}"
        },
        "chat_id": "oc_b61235e79bac3d17d9107433e8267154",
        "create_time": "1733122720395",
        "deleted": false,
        "message_id": "om_9a515da038aa4cc956f79ff2f89de8eb",
        "msg_type": "text",
        "sender": {
            "id": "cli_a6889e23ee38500a",
            "id_type": "app_id",
            "sender_type": "app",
            "tenant_key": "12f9cd33134f1759"
        },
        "update_time": "1733122720395",
        "updated": false
    },
    "msg": "success"
}


In [None]:
from typing import List

print('chat_id', message.data.message_id)

def buzz_this_message(
    tenant_token: str,
    message_id: str,
    group_members_union_id: List[str]
):
    formatted_url = "https://open.larksuite.com/open-apis/im/v1/messages/{message_id}/urgent_app".format(message_id=message_id)

    headers = {
        "Authorization": f"Bearer {tenant_token}"
    }

    params = {
        "user_id_type": "union_id"
    }

    body = {
        "user_id_list": group_members_union_id
    }
    
    response = requests.patch(
        url=formatted_url, 
        json=body, 
        headers=headers, 
        params=params
    )


members = get_group_members(
    tenant_access_token=tenant_token.tenant_access_token,
)

members = [member.member_id for member in members.data.items]

buzz_this_message(
    tenant_token=tenant_token.tenant_access_token,
    message_id=message.data.message_id,
    group_members_union_id=members
)

chat_id om_0bb2f940bc3383fde466424fc7d1dd31
https://open.larksuite.com/open-apis/im/v1/messages/om_0bb2f940bc3383fde466424fc7d1dd31/urgent_app
b'{"code":0,"data":{"invalid_user_id_list":[]},"msg":"success"}'


In [31]:
import requests
import os
from typing import Literal, Optional
from pydantic import BaseModel


class UploadImageMessageDataField(BaseModel):
    image_key: str


class UploadImageMessageResponse(BaseModel):
    code: int
    msg: str
    data: Optional[UploadImageMessageDataField] = None


MAX_SIZE_MB = 10

def upload_image_gc(
    tenant_token: str,
    image_path: str,
    image_type: Literal['avatar', 'message'] = 'message'
):
    url = "https://open.larksuite.com/open-apis/im/v1/images"

    # Check if file exists
    if not os.path.isfile(image_path):
        raise FileNotFoundError(f"The file '{image_path}' does not exist.")

    # Check file size
    file_size = os.path.getsize(image_path)
    if file_size > MAX_SIZE_MB * 1024 * 1024:
        raise ValueError(f"Image size exceeds {MAX_SIZE_MB}MB limit.")

     # Prepare headers
    headers = {
        "Authorization": f"Bearer {tenant_token}",
    }

    # Prepare multipart/form-data
    with open(image_path, 'rb') as image_file:
        files = {
            "image": (os.path.basename(image_path), image_file, "application/octet-stream"),
        }

        data = {
            "image_type": image_type
        }

        try:
            response = requests.post(
                url, 
                headers=headers, 
                files=files, 
                data=data
            )

            return UploadImageMessageResponse(**response.json())
        except requests.exceptions.RequestException as e:
            raise Exception(f"HTTP request failed: {e}")

In [33]:
print(upload_image_gc(
    tenant_token=tenant_token.tenant_access_token,
    image_path='uploads/6cf052b8-6409-4955-bb96-28ead346238e-image.jpg'
))

code=0 msg='success' data=UploadImageMessageDataField(image_key='img_v3_02h6_693a541a-8315-4991-987a-28052f989a9h')


In [None]:
from models.notification import Notification

row = account_status.get_similar_accounts_by_plate('DB4658')

notification_payload = Notification(
    plate_number=row[0].plate,
    accounts=row,
    file_path='test',
    is_similar=False
)



AttributeError: 'list' object has no attribute 'plate'

In [6]:
from utils.message_builder import message_builder


message_payload = message_builder(
    image_key='img_v3_02h6_693a541a-8315-4991-987a-28052f989a9h',
    data=notification_payload
)

In [None]:
from utils.send_message_to_gc import send_message_to_gc
from utils.get_group_members import get_group_members
from utils.buzz_this_message import buzz_this_message

message_response = send_message_to_gc(
    tenant_token=tenant_token.tenant_access_token,
    content=message_payload
)

members = get_group_members(
    tenant_access_token=tenant_token.tenant_access_token
)

members = [member.member_id for member in members.data.items]

buzz_this_message(
    tenant_token=tenant_token.tenant_access_token,
    message_id=message_response.data.message_id,
    group_members_union_id=members
)