-
Notifications
You must be signed in to change notification settings - Fork 2
/
dbManager.py
100 lines (88 loc) · 3.46 KB
/
dbManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
import json
import pymysql
class dbManager(object):
def __init__(self, config):
self.host = config['host']
self.user = config['user']
self.passwd = config['password']
self.db = config['database']
self.port= config['port']
self.charset = "utf8mb4"
self.cursorclass = pymysql.cursors.DictCursor
self.connection=None
self.cursor=None
def connect(self):
if self.connection is None:
self.connection= pymysql.connect(host=self.host,
user=self.user,
passwd=self.passwd,
port=self.port,
db=self.db,
charset=self.charset,
cursorclass=self.cursorclass,
autocommit=True)
self.cursor=self.connection.cursor()
def close(self):
if self.cursor:
self.cursor.close()
self.cursor = None
if self.connection:
self.connection.close()
self.connection = None
def insert_company(self, item):
self.connect()
try:
field_list=list()
param_list=list()
for key in item:
#if type(book_attr[key])==list and len(book_attr[key])==0:
# continue
if key=='group':
field_list.append('`{}`'.format(key))
else:
field_list.append(key)
param_list.append('%('+key+')s')
insert_sql="insert into factory_group ("+",".join(field_list)+") VALUES ("+ ",".join(param_list)+")"
sql = self.cursor.mogrify(insert_sql, item)
self.cursor.execute(insert_sql, item)
except pymysql.InternalError as e:
print(item)
raise
except pymysql.IntegrityError as e:
if e.args[0]=='1062':
print("duplicate key")
finally:
self.close()
def query_parent(self, company_name):
self.connect()
try:
query_sql="select * from company_info where company_name like '%%{company_name}%%'".format(company_name=company_name)
self.cursor.execute(query_sql)
parent_info=self.cursor.fetchone()
return parent_info
except Exception as e:
raise
finally:
self.close()
def query_fine_record_by_taxcode(self, taxcode):
self.connect()
try:
query_sql="select factory_fine.facility_name, factory_fine.penalty_money, factory_corp.registration_no, factory_fine.fine_or_note from factory_fine, factory_corp where factory_corp.corp_id=%(taxcode)s and factory_corp.registration_no=factory_fine.registration_no"
#print("{}, {}".format(query_sql, taxcode))
self.cursor.execute(query_sql, {"taxcode":taxcode})
fine_record_list=self.cursor.fetchall()
penalty_money=0
has_fine=False
if len(fine_record_list):
has_fine=True
for record in fine_record_list:
if record['fine_or_note']:
penalty_money+=int(record['penalty_money'])
#else:
# print(record)
return has_fine, penalty_money, len(fine_record_list)
except Exception as e:
raise
finally:
self.close()