-
Notifications
You must be signed in to change notification settings - Fork 41
/
sqliteUtils.py
196 lines (173 loc) · 6.78 KB
/
sqliteUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# -*- coding: iso-8859-1 -*-
#Copyright (C) Fiz Vazquez vud1@sindominio.net
# vud1@grupoikusnet.com
# Jakinbidea & Grupo Ikusnet Developer
#This program is free software; you can redistribute it and/or
#modify it under the terms of the GNU General Public License
#as published by the Free Software Foundation; either version 2
#of the License, or (at your option) any later version.
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU General Public License for more details.
#You should have received a copy of the GNU General Public License
#along with this program; if not, write to the Free Software
#Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import logging
import sys, traceback, commands
import datetime
try:
from sqlite3 import dbapi2 as sqlite
except ImportError:
logging.error('Not able to find sqlite2 module (new in python 2.5)')
from pysqlite2 import dbapi2 as sqlite
logging.info('Using pysqlite2 module to access DB. Think about upgrading to python 2.5!')
class Sql:
def __init__(self,host=None, ddbb = None, user = None, password = None, configuration = None):
self.db = None
if ddbb == 'memory':
self.ddbb = ':memory:'
self.url = 'sqlite://'
else:
confdir = configuration.confdir
self.ddbb = "%s/pytrainer.ddbb" %confdir
self.url = "sqlite:///" + self.ddbb
def get_connection_url(self):
return self.url
def connect(self):
#si devolvemos 1 ha ido todo con exito
self.db = sqlite.connect(self.ddbb)
return (True, "OK")
#probamos si estan las tablas creadas, y sino.. las creamos
'''try:
self.select("records","id_record","1=1 limit 0,1")
except:
self.createTables()
return 1'''
def disconnect(self):
self.db.close()
def createDDBB(self):
pass
def getTableList(self):
tmpList = self.select("sqlite_master","name", "type IN ('table','view') AND name NOT LIKE 'sqlite_%' ORDER BY name")
# The instruction above returns a list of tuples, going for a simple list
newList = []
for entry in tmpList:
newList.append(entry[0])
return newList
def createTableDefault(self,tableName,columns):
'''22.11.2009 - dgranda
Creates a new table in database given name and column name and data types. New in version 1.7.0
args:
tableName - string with name of the table
columns - dictionary containing column names and data types coming from definition
returns: none'''
logging.debug('>>')
logging.info('Creating '+str(tableName)+' table with default values')
logging.debug('Columns definition: '+str(columns))
cur = self.db.cursor()
sql = 'CREATE TABLE %s (' %(tableName)
for entry in columns:
sql += '%s %s,' %(entry,columns[entry])
# Removing trailing comma
sql = sql.rstrip(',')
sql = sql+");"
logging.debug('SQL sentence: '+str(sql))
cur.execute(sql)
logging.debug('<<')
def insert(self,table, cells, values):
logging.debug('>>')
cur = self.db.cursor()
val = values
count = 0
string = ""
for i in val:
if count>0:
string+=","
string+= self._to_sql_value(i)
count = count+1
sql = "insert into %s (%s) values (%s)" %(table,cells,string)
logging.debug('SQL sentence: '+str(sql))
cur.execute(sql)
self.db.commit()
logging.debug('<<')
def _to_sql_value(self, value):
logging.debug('>>')
logging.debug('Value: %s | type: %s ' %(value,type(value)))
if value == None:
return "null"
elif type(value) in [str, unicode]:
return "\"" + value + "\""
elif type(value) == datetime.datetime:
return value.strftime("\"%Y-%m-%d %H:%M:%S%z\"")
elif type(value) == datetime.date:
return value.strftime("\"%Y-%m-%d\"")
else:
return str(value)
logging.debug('<<')
def freeExec(self,sql):
cur = self.db.cursor()
cur.execute(sql)
retorno = []
for row in cur:
retorno.append(row)
self.db.commit()
return retorno
def delete(self,table,condition):
cur = self.db.cursor()
sql = "delete from %s where %s" %(table,condition)
cur.execute(sql)
self.db.commit()
def update(self,table,cells,values, condition):
cur = self.db.cursor()
cells = cells.split(",")
count = 0
string = ""
for val in values:
if count>0:
string+=","
string += """%s=%s """ %(cells[count], self._to_sql_value(values[count]))
count = count+1
string +=" where %s" %condition
sql = "update %s set %s" %(table,string)
cur.execute(sql)
self.db.commit()
def select(self,table,cells,condition, mod=None):
cur = self.db.cursor()
sql = "select %s from %s" %(cells,table)
if condition is not None:
sql = "%s where %s" % (sql, condition)
if mod is not None:
sql = "%s %s" % (sql, mod)
'''if condition != None:
sql = "select %s from %s where %s" %(cells,table,condition)
else:
sql = "select %s from %s " %(cells,table)'''
cur.execute(sql)
retorno = []
for row in cur:
retorno.append(row)
return retorno
def retrieveTableInfo(self,tableName):
cur = self.db.cursor()
sql = "PRAGMA table_info(%s);" %tableName
cur.execute(sql)
tableInfo = []
for row in cur:
tableInfo.append(row)
return tableInfo
def addColumn(self,tableName,columnName,dataType):
sql = "alter table %s add %s %s" %(tableName,columnName,dataType)
logging.debug("Trying SQL: %s" % sql)
try:
self.freeExec(sql)
except:
logging.error('Not able to add/change column '+columnName+' to table '+tableName)
traceback.print_exc()
def createDatabaseBackup(self):
logging.info("Creating compressed copy of current DB")
logging.debug('Database path: '+str(self.ddbb))
result = commands.getstatusoutput('gzip -c '+self.ddbb+' > '+self.ddbb+'_`date +%Y%m%d_%H%M`.gz')
if result[0] != 0:
raise Exception, "Copying current database does not work, error #"+str(result[0])
logging.info('Database backup successfully created')