-
Notifications
You must be signed in to change notification settings - Fork 1
/
my_mssql_utility.py
161 lines (144 loc) · 6.62 KB
/
my_mssql_utility.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# -*- coding: utf-8 -*-
# author: ylkjick532428@gmail.com
import os
import pypyodbc
import re
import sys
import platform
plat_str = platform.system()
#=========================================================================
# MsSqlUtil : mssql数据库的工具类
#=========================================================================
class MsSqlUtil(object):
def __init__(self, db):
self.db = db
self.db_conn = None
self.db_cursor = None
self.get_db_conn()
self.get_db_cursor()
#=========================================================================
# set_db : 修改数据库配置
#=========================================================================
def set_db(self, db):
self.db = db
#=========================================================================
# get_db_conn : 获得数据库连接
#=========================================================================
def get_db_conn(self):
db = self.db
plat_str = platform.system()
if plat_str == "Windows":
connection_string ='Driver={SQL Server Native Client 10.0};Server=%s;Database=%s;Uid=%s;Pwd=%s;PORT=%s;' % (self.db["HOST"], self.db["NAME"], self.db["USER"], self.db["PASSWORD"], self.db["PORT"])
self.db_conn = pypyodbc.connect(connection_string)
else:
connection_string ='DSN=MySQLServerDatabase;Server=%s;Database=%s;Uid=%s;Pwd=%s;PORT=%s;' % (self.db["HOST"], self.db["NAME"], self.db["USER"], self.db["PASSWORD"], self.db["PORT"])
self.db_conn = pypyodbc.connect(connection_string)
return self.db_conn
#=========================================================================
# get_db_cursor : 获得数据库光标
#=========================================================================
def get_db_cursor(self):
try:
if self.db_conn:
self.db_cursor = self.db_conn.cursor()
except:
self.db_cursor = self.get_db_conn().cursor()
return self.db_cursor
#=========================================================================
# query : 执行查询语句, fields为查询自动的数组
#=========================================================================
def query(self, comm, parameters=[], fileds=[]):
if plat_str != "Windows":
return self.query_fileds(comm, parameters, fileds)
results = []
self.db_cursor.execute(comm, parameters)
query_results = self.db_cursor.fetchmany(10)
columns = [d[0].lower() for d in self.db_cursor.description]
while query_results:
result = [dict(zip(columns, record)) for record in query_results]
results.extend(result)
query_results = self.db_cursor.fetchmany(10)
return results
#===========================================================================
# match_fileds
#===========================================================================
def match_fileds(self, text):
values = []
for tmp in text.split(","):
tmp_col = tmp.split(" as ")
if len(tmp_col) == 2:
values.append(tmp_col[1].strip())
else:
values.append(tmp_col[0].strip())
return values
#=========================================================================
# query : 执行查询语句, fields为查询自动的数组
#=========================================================================
def query_fileds(self, comm, parameters=[], fileds=[]):
comm = comm.lower()
select_pattern = re.compile("select (.*) from")
match = select_pattern.search(comm)
if match:
text = str(match.group(1))
fileds = self.match_fileds(text)
results = []
self.db_cursor.execute(comm, parameters)
query_results = self.db_cursor.fetchmany(10)
while query_results:
for record in query_results:
tmp = {}
for i in range(0, len(fileds)):
tmp_key = fileds[i]
tmp[tmp_key] = record[i]
results.append(tmp)
query_results = self.db_cursor.fetchmany(10)
return results
#=========================================================================
# update : 执行update语句
#=========================================================================
def update(self, comm, parameters=[]):
self.db_cursor.execute(comm, parameters)
self.db_conn.commit()
#=========================================================================
# insert_many:
# sql_str:"insert into python_modules(module_name, file_path) values(:1, :2)"
# records: [(1, 1), (2,2)]
#=========================================================================
def insert_many(self, sql_str, records):
# sql_str:"insert into python_modules(module_name, file_path) values(:1, :2)"
# records: [(1, 1), (2,2)]
self.db_cursor.prepare(sql_str)
self.db_cursor.executemany(None, records)
self.db_conn.commit()
#=========================================================================
# escape_string :
#=========================================================================
def escape_string(self, str):
str = str.replace("'", "")
str = str.replace('"', "")
#=========================================================================
# close_db_connection : 关闭数据库连接
#=========================================================================
def close_db_conn(self):
try:
self.db_conn.close()
print('close db connection success')
except:
print('close db connection failure')
#=========================================================================
# close : 关闭数据库连接
#=========================================================================
def close(self):
self.close_db_conn()
#=========================================================================
# test : 数据库测试
#=========================================================================
def test(self):
sql = 'select id, name, type, poi_x,poi_y from pps_poi_data'
self.db_cursor.execute(sql)
result = self.db_cursor.fetchall()
for i in result:
print(i[1])
if __name__ == "__main__":
db_config_local={'USER': 'USER', 'PASSWORD': 'PASSWORD', 'NAME': 'NAME', 'HOST': 'HOST', 'PORT': 50000}
sqlserver = MsSqlUtil(self.db_config_local)