-
Notifications
You must be signed in to change notification settings - Fork 1
/
mblog_api.py
121 lines (107 loc) · 4.23 KB
/
mblog_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# -*- coding: utf-8 -*-
import os
import urllib3
urllib3.disable_warnings()
import sqlite3
import requests
from uuid import uuid1
from config import TELEGRAM_TOKEN, MBlog_Backend_URL, MBlog_TOKEN, Visibility, proxies
from my_logger import logger, log
# 初始化sqlite3数据库
db = sqlite3.connect("data.db")
cursor = db.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
if not tables:
cursor.execute("create table tg_token(id int auto_increment primary key,chat_id varchar(32) UNIQUE, mblog_backend varchar(128), mblog_token varchar(512), visit varchar(12), create_time timestamp default CURRENT_TIMESTAMP)")
db.commit()
base_url = MBlog_Backend_URL + "/api"
@log
def send_message(chat_id, text):
BASE_URL = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
data = {"chat_id": chat_id, "text": text}
requests.post(f"{BASE_URL}/sendMessage", data=data, proxies=proxies, verify=False)
@log
def get_headers(chat_id):
cursor.execute(f"select mblog_backend,mblog_token,visit from tg_token where chat_id='{chat_id}'")
result = cursor.fetchone()
if result:
mblog_backend, mblog_token,visit = result
headers = {"token": mblog_token}
else:
mblog_backend = ""
headers = {}
visit = ""
return mblog_backend, headers, visit
@log
def download(url):
r = requests.get(url, proxies=proxies)
filename = uuid1().hex + "." + url.split(".")[-1]
with open(filename, "wb") as f:
f.write(r.content)
return filename
@log
def handle_file(file_url, mblog_backend="", headers={}):
file_list = []
if file_url:
try:
filename = download(file_url)
public_id = upload(filename, mblog_backend, headers)
file_list.append(public_id)
os.remove(filename)
except Exception as e:
print(e)
return file_list
@log
def get_memo(memo_id, mblog_backend="", headers={}):
url = f"{mblog_backend}/api/memo/{memo_id}"
r = requests.post(url, headers=headers, proxies=proxies, verify=False)
data = r.json()["data"]
publicIds = [i["publicId"] for i in data["resources"]]
return {"visibility": data["visibility"], "publicIds": publicIds, "id": memo_id,
"content": data["content"], "priority": 0, "enableComment": data["enableComment"]}
@log
def post_memo(content, file_list=[], mblog_backend="", headers={}, visit=""):
url = f"{mblog_backend}/api/memo/save"
logger.info(f"post_memo url: {url}")
r = requests.post(url, headers=headers,
json={"content":content, "visibility": visit, "publicIds":file_list},
proxies=proxies, verify=False)
logger.info(f"post_memo result: {r.text}")
return r.json()["data"]
@log
def update_memo(memo_id, file_url="", chat_id=""):
if chat_id:
mblog_backend, headers = get_headers(chat_id)
else:
mblog_backend = MBlog_Backend_URL
headers = {"token": MBlog_TOKEN}
if not headers:
send_message(chat_id, "账号未绑定,请输入/start进行绑定。")
return
memo = get_memo(memo_id, mblog_backend, headers)
file_list = handle_file(file_url, mblog_backend, headers)
if file_list:
memo["publicIds"].extend(file_list)
r = requests.post(f"{mblog_backend}/api/memo/update", headers=headers, json=memo, proxies=proxies, verify=False)
@log
def upload(file_path, mblog_backend="", headers={}):
with open(file_path, "rb") as f:
r = requests.post(f"{mblog_backend}/api/resource/upload", headers=headers, files={"files": f}, proxies=proxies, verify=False)
return r.json()["data"][0]["publicId"]
@log
def insert(text, file_url="", chat_id=""):
if chat_id:
mblog_backend, headers, visit = get_headers(chat_id)
else:
mblog_backend = MBlog_Backend_URL
headers = {"token": MBlog_TOKEN}
visit = Visibility
if not headers:
send_message(chat_id, "账号未绑定,请输入/start进行绑定。")
return '',''
file_list = handle_file(file_url, mblog_backend, headers)
memo_id = post_memo(text, file_list, mblog_backend, headers, visit)
return mblog_backend,memo_id
if __name__ == "__main__":
post_memo("测试", mblog_backend=MBlog_Backend_URL)