
MySQL storage that works, but is a bit messy yet.

Note: it is a pure MySQL storage, not a Django backend!
commit 076c036ce782b93867e8b047abeb4654406e167a 1 parent 386242b
Sergey Vasilyev authored

Showing 2 changed files with 213 additions and 52 deletions.

  1. +135 −21 lib/daal/storages/mysql.py
  2. +78 −31 lib/daal/storages/mysql.sql
156 lib/daal/storages/mysql.py
... ... @@ -1,4 +1,6 @@
1 1 # coding: utf-8
  2 +#TODO: explicit micro-transactions (we have commits but no begins now), preferably using "with".
  3 +
2 4 from ..item import Item
3 5 from ._base import Storage, StorageID
4 6 from ._base import StorageExpectationError, StorageItemAbsentError, StorageUniquenessError
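
The TODO above hints at "with"-based micro-transactions. A minimal sketch of what such a helper could look like — this is an assumption, not part of the commit, and transaction() is a hypothetical name:

from contextlib import contextmanager

@contextmanager
def transaction(connection):
    # An explicit BEGIN marks the transaction start; MySQLdb otherwise
    # opens one implicitly on the first statement.
    connection.cursor().execute("BEGIN")
    try:
        yield connection
        connection.commit()
    except Exception:
        connection.rollback()
        raise

The methods below could then replace their bare commit() calls with "with transaction(self.connection): ...", getting rollback on error for free.
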
@@ -16,10 +18,14 @@ class MysqlStorage(Storage):
16 18 * Others to come.
17 19 """
18 20
19   - def __init__(self, name):
  21 + def __init__(self, hostname, username, password, database, name):
20 22 super(MysqlStorage, self).__init__()
21   - self.connected = False
22 23 self.connection = None
  24 + self.connected = False
  25 + self.hostname = hostname
  26 + self.username = username
  27 + self.password = password
  28 + self.database = database
23 29 self.name = name
24 30
25 31 def store(self, id, value, expect=None, unique=None):
@@ -62,11 +68,9 @@ def fetch(self, id):
62 68 otherwise id is treated as a sequence of ids and all of them are fetched.
63 69 Actual fetch goes in batches of 20 items per request (SimpleDB limitation).
64 70 """
65   -
66 71 self._connect()
67 72
68   - values = dict(StorageID(id))
69   - where = '(%s)' % self._id_to_where(id)
  73 + where, values = self._ids_to_sql([id])
70 74 query = "SELECT * FROM `%s` WHERE %s" % (self.name, where) #!!! escape table name
71 75
72 76 cursor = self.connection.cursor(MySQLdb.cursors.DictCursor)
@@ -91,11 +95,11 @@ def mfetch(self, ids):
91 95
92 96 self._connect()
93 97
94   - where = 'OR'.join(['(%s)' % self._id_to_where(id) for id in ids])
  98 + where, values = self._ids_to_sql(ids)
95 99 query = "SELECT * FROM `%s` WHERE %s" % (self.name, where) #!!! escape table name
96 100
97 101 cursor = self.connection.cursor(MySQLdb.cursors.DictCursor)
98   - cursor.execute(query)
  102 + cursor.execute(query, values)
99 103 rows = cursor.fetchall()
100 104 if len(rows) < 1:
101 105 raise StorageItemAbsentError("The items '%s' are not found." % (ids,))
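
Editorial aside: both SELECTs above still interpolate self.name unescaped (hence the #!!! markers). The _metaescape() helper added further down in this commit looks intended for exactly this; a sketch of how it could be applied — an assumption, since the commit itself leaves the queries as-is:

# Hypothetical application of the new _metaescape() helper to the table name;
# data values still go through parameter binding, never string interpolation.
query = "SELECT * FROM %s WHERE %s" % (self._metaescape(self.name), where)
cursor.execute(query, values)
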
@@ -151,7 +155,7 @@ def try_create(self, factory):
151 155 item.update(pk)
152 156
153 157 # Ensure the item is absent using an attribute that always exists.
154   -# pk_where = '(%s)' % self._id_to_where(item['id'])
  158 +# pk_where, pk_values = self._ids_to_sql([item['id']])
155 159
156 160 # Store the values to the physical storage if necessary.
157 161 assignments = ','.join(['%s=%%(%s)s' % (field, field) for field in item.keys()])
@@ -171,6 +175,7 @@ def try_update(self, id, fn, field=None):
171 175 """
172 176
173 177 self._connect()
  178 + pk = dict(StorageID(id))
174 179
175 180 # Try to fetch the item's values from the physical storage.
176 181 # Fallback to empty list of values if the item does not exist.
@@ -178,19 +183,24 @@ def try_update(self, id, fn, field=None):
178 183 item = self.fetch(id)
179 184 except StorageItemAbsentError, e:
180 185 item = Item() # what if it is of another type???
  186 + item.update(pk)
181 187
182 188 # Ensure the item is absent using an attribute that always exists.
183   - pk_where = '(%s)' % self._id_to_where(id)
  189 +# pk_where, pk_values = self._ids_to_sql([id]) # is it still used???
184 190
185 191 # Get the values to be updated. Store them to the physical storage if necessary.
186 192 changes = fn(item)
187 193 changes.update(dict(StorageID(id)))
  194 + values = dict(item, **changes)
  195 + #!!! separate and make it obvious which fields are for the pk and which are for the values; merge them only for the query.
188 196
189   - # Store the values to the physical storage if necessary.
  197 + # Build an SQL query that INSERTs or UPDATEs depending on whether the item exists.
190 198 assignments = ','.join(['%s=%%(%s)s' % (field, field) for field in changes.keys()])
191   - query = "REPLACE INTO %s SET %s" % (self.name, assignments)
  199 + query = "INSERT INTO %s SET %s ON DUPLICATE KEY UPDATE %s" % (self.name, assignments, assignments)
  200 +
  201 + # Store the values to the physical storage if necessary.
192 202 cursor = self.connection.cursor(MySQLdb.cursors.DictCursor)
193   - cursor.execute(query, changes)
  203 + cursor.execute(query, values)
194 204 self.connection.commit()
195 205
196 206 # Return
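
A note on the REPLACE → INSERT ... ON DUPLICATE KEY UPDATE switch above: REPLACE INTO deletes and re-inserts the row, silently resetting any columns missing from the assignment list, while the upsert form updates in place. A sketch of the query string this code builds, with hypothetical field values:

# Hypothetical values; dict key order may vary in Python 2.
values = {'host': 'example.com', 'id': 'abc', 'url': 'http://example.org/'}
assignments = ','.join(['%s=%%(%s)s' % (field, field) for field in values.keys()])
query = "INSERT INTO urls SET %s ON DUPLICATE KEY UPDATE %s" % (assignments, assignments)
# -> INSERT INTO urls SET host=%(host)s,id=%(id)s,url=%(url)s
#    ON DUPLICATE KEY UPDATE host=%(host)s,id=%(id)s,url=%(url)s
# cursor.execute(query, values) then binds every placeholder from the same dict.
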
@@ -213,25 +223,129 @@ def try_replace(self, id, fn, field=None):
213 223 raise # just to make it very obvious that we pass it through.
214 224
215 225 # Ensure the item is absent using an attribute that always exists.
216   - pk_where = '(%s)' % self._id_to_where(id)
  226 + pk_where, pk_values = self._ids_to_sql([id])
217 227
218 228 # Get the values to be updated. Store them to the physical storage if necessary.
219 229 changes = fn(item)
  230 + #!!!TODO: I guess there is an error here, since pk_values are not merged into changes. But it seems we do not use replace() at all.
220 231
221   - # Store the values to the physical storage if necessary.
  232 + # Build SQL query to update an item if it exists.
222 233 assignments = ','.join(['%s=%%(%s)s' % (field, field) for field in changes.keys()])
223 234 query = "UPDATE %s SET %s WHERE %s" % (self.name, assignments, pk_where)
  235 +
  236 + # Store the values to the physical storage if necessary.
224 237 cursor = self.connection.cursor(MySQLdb.cursors.DictCursor)
225 238 cursor.execute(query, changes)
  239 + #!!!todo: raise StorageItemAbsentError() if affected_rows == 0
226 240 self.connection.commit()
227 241
228 242 # Return
229 243 return changes # re-fetch?
230 244
231   - def _id_to_where(self, id):
  245 + def append(self, id, value, retries=1):
  246 + #NB: since we use SQL row locks, there is no need for retries.
  247 + #NB: value field must be declared as NOT NULL DEFAULT ''.
  248 + #TODO: we can remove that requirement for the DEFAULT value, but we would have to rewrite all these dict manipulations.
  249 +
  250 + value_field = 'value'
  251 + self._connect()
  252 +
  253 + # Execute the query and acquire a row lock on the row.
232 254 pk = dict(StorageID(id))
233   - where = ' AND '.join(["%s = %%(%s)s" % (field, field) for field in pk.keys()])
234   - return where
  255 + values = dict(pk, value=value)
  256 + assignments = ','.join(['%s=%%(%s)s' % (field, field) for field in pk.keys()]
  257 + + ['%s=concat(%s, %%(%s)s)' % (value_field, value_field, value_field)])
  258 + query = "INSERT INTO %s SET %s ON DUPLICATE KEY UPDATE %s" % (self.name, assignments, assignments)
  259 + cursor = self.connection.cursor(MySQLdb.cursors.DictCursor)
  260 + cursor.execute(query, values)
  261 +
  262 + # Fetch the value while it is locked - no one can change it between our update and our commit.
  263 + value = self.fetch(id)[value_field]
  264 +
  265 + # Commit and release the row lock.
  266 + self.connection.commit()
  267 +
  268 + # Return
  269 + return value
  270 +
  271 + def prepend(self, id, value, retries=1):
  272 + raise NotImplementedError()#!!!todo later
  273 +
  274 + def increment(self, id, step, retries=1):
  275 + #NB: since we use SQL row locks, there is no need for retries.
  276 + #NB: value field must be declared as NOT NULL DEFAULT 0.
  277 + #TODO: we can remove that requirement for the DEFAULT value, but we would have to rewrite all these dict manipulations.
  278 +
  279 + value_field = 'value'
  280 + self._connect()
  281 +
  282 + pk = dict(StorageID(id))
  283 + assignments = ','.join(['%s=%%(%s)s' % (field, field) for field in pk.keys()] + ['%s=%s+(%d)' % (value_field, value_field, int(step))])
  284 + query = "INSERT INTO %s SET %s ON DUPLICATE KEY UPDATE %s" % (self.name, assignments, assignments)
  285 +
  286 + # Execute the query and acquire a row lock on the counter.
  287 + cursor = self.connection.cursor(MySQLdb.cursors.DictCursor)
  288 + cursor.execute(query, pk)
  289 +
  290 + # Fetch the value while it is locked - no one can change it between our update and our commit.
  291 + value = self.fetch(id)[value_field]
  292 +
  293 + # Commit and release the row lock.
  294 + self.connection.commit()
  295 +
  296 + # Return
  297 + return value
  298 +
  299 + def decrement(self, id, step, retries=1):
  300 + # No special support or optimizations for decrement operation.
  301 + return self.increment(id, -step, retries=retries)
  302 +
  303 + def _metaescape(self, name):
  304 + """
  305 + Escapes field and table names (and any other SQL entities) for use in queries.
  306 + Do not escape data values with this! Use parameter binding instead!
  307 + """
  308 + return '`%s`' % unicode(name).replace('`', '``')
  309 +
  310 + def _ids_to_sql(self, ids):
  311 + """
  312 + Converts a list of storage IDs to an SQL WHERE clause and a dict of values for binding.
  313 + The list of ids must be non-empty (it is better to catch this situation at a higher level).
  314 +
  315 + The returned clause is built as short as possible in this implementation.
  316 + It resolves conflicts between same-named fields holding different values
  317 + by using automatically generated reference names instead of bare field names.
  318 +
  319 + It also reduces the number of bound values by re-using the same reference for the same value.
  320 + This is especially useful when there are dozens of ids (100+), most of which
  321 + have fields with equal or repeating values.
  322 +
  323 + E.g., for the ids {a=10,b=20} and {a=10,b=30}, it returns this WHERE clause and these values:
  324 + clause = ((a=%(a0)s AND b=%(b0)s) OR (a=%(a0)s AND b=%(b1)s))
  325 + values = {a0: 10, b0: 20, b1: 30}
  326 + Note that the %(a0)s parameter is used in both expressions, since its value is the same.
  327 + """
  328 + wheres = []
  329 + values = []
  330 + references = {} # [field][value] -> key in values
  331 + for index, id in enumerate(ids):
  332 + # Build per-id expressions.
  333 + id_wheres = []
  334 + id_values = []
  335 + for field, value in dict(StorageID(id)).items():
  336 + # First, check if a reference for this field&value pair exists already, and create one if it does not yet.
  337 + reference = references.setdefault(field, {}).setdefault(value, '%s%s' % (field, index))
  338 +
  339 + # Then, add the field&value expression to lists for query building.
  340 + id_wheres.append((field, reference))
  341 + id_values.append((reference, value))
  342 +
  343 + # Merge per-id expressions into the global lists of expressions.
  344 + wheres.append(id_wheres)
  345 + values.extend(id_values)
  346 + clause = '(%s)' % 'OR'.join(['(%s)' % ' AND '.join(['%s=%%(%s)s' % (field, reference) for field, reference in id_wheres]) for id_wheres in wheres])
  347 + values = dict(values)
  348 + return clause, values
235 349
236 350 def _connect(self):
237 351 """
@@ -239,9 +353,9 @@ def _connect(self):
239 353 If already connected, does nothing.
240 354 """
241 355 if not self.connected:
242   - self.connection = MySQLdb.connect(host = "localhost",
243   - user = "root",
244   - passwd = "",
245   - db = "shortener")
  356 + self.connection = MySQLdb.connect(host = self.hostname,
  357 + user = self.username,
  358 + passwd = self.password,
  359 + db = self.database)
246 360 self.connected = True
247 361 return self
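
With the hard-coded credentials replaced by constructor parameters, wiring the storage up now looks roughly like this — the concrete values are placeholders, not taken from the commit:

# Hypothetical wiring; credentials and names are placeholders.
storage = MysqlStorage(hostname='localhost', username='root',
                       password='', database='shortener', name='urls')
# _connect() is called lazily by each operation, so no explicit connect step is needed.
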
109 lib/daal/storages/mysql.sql
... ... @@ -1,35 +1,82 @@
1 1 SET NAMES utf8;
  2 +USE `shortener`;
2 3
3   -DROP TABLE IF EXISTS urls;
4   -CREATE TABLE urls (
5   - `id` varchar(100) not null,
6   - `host` varchar(100) not null,
7   - `code` varchar(100) not null,
8   - `url` longtext not null default '',
9   - `created_ts` float default null,
10   - `remote_addr` varchar(100) default null,
11   - `remote_port` varchar(10) default null,
12   - primary key(`host`, `id`)
13   -);
  4 +/*
  5 + * Main storage for forward resolution of shortened urls to original targets, plus meta info.
  6 + * The code field is stored just for convenience and for compatibility of the algorithms with other storages,
  7 + * which change the id field automatically.
  8 + */
  9 +DROP TABLE IF EXISTS `urls`;
  10 +CREATE TABLE `urls` (
  11 + `host` varchar(100) not null,
  12 + `id` varchar(100) not null,
  13 + `code` varchar(100) not null,
  14 + `url` longtext not null,
  15 + `created_ts` integer unsigned default null,
  16 + `remote_addr` varchar(45) default null, -- beware of IPv6, see http://stackoverflow.com/questions/166132/maximum-length-of-the-textual-representation-of-an-ipv6-address
  17 + `remote_port` smallint default null,
  18 + primary key (`host`, `id`)
  19 +) engine=innodb;
14 20
15   -DROP TABLE IF EXISTS last_urls;
16   -CREATE TABLE last_urls (
17   - `id` varchar(100) not null,
18   - `host` varchar(100) not null,
19   - `code` varchar(100) not null,
20   - `url` longtext not null default '',
21   - `created_ts` float default null,
22   - `remote_addr` varchar(100) default null,
23   - `remote_port` varchar(10) default null,
24   - `timestamp` integer unsigned not null,
25   - index(`timestamp`),
26   - primary key(`host`, `id`)
27   -);
  21 +/*
  22 + * Same structure as the urls table, and the same items are stored.
  23 + * But this table is cleaned regularly to keep the list short and quick to sort.
  24 + * If LatestTargetsDimension is refactored to key-value storages, this table will be removed.
  25 + */
  26 +DROP TABLE IF EXISTS `last_urls`;
  27 +CREATE TABLE `last_urls` like `urls`;
  28 +ALTER TABLE `last_urls`
  29 + ADD COLUMN `timestamp` integer unsigned not null,
  30 + ADD INDEX (`timestamp`);
28 31
29   -DROP TABLE IF EXISTS sequences;
30   -CREATE TABLE sequences (
31   - `id` varchar(100) not null,
32   - `host` varchar(100) not null,
33   - `value` varchar(100) not null,
34   - primary key(`host`, `id`)
35   -);
  32 +/*
  33 + * Used by generators to store their state (current value).
  34 + * The value field has no default, unlike the one defined for counters, since it is not
  35 + * a storage-level numeric counter, but a generator-level textual one.
  36 + */
  37 +DROP TABLE IF EXISTS `sequences`;
  38 +CREATE TABLE `sequences` (
  39 + `host` varchar(100) not null,
  40 + `id` varchar(100) not null,
  41 + `value` varchar(100) not null,
  42 + primary key (`host`, `id`)
  43 +) engine=innodb;
  44 +
  45 +/*
  46 + * Used by PopularDomainsDimension to store per-domain counters.
  47 + * Value field must have a default of 0 as defined for counters (see mysql.py).
  48 + */
  49 +DROP TABLE IF EXISTS `popular_domain_counters`;
  50 +CREATE TABLE `popular_domain_counters` (
  51 + `host` varchar(100) not null,
  52 + `time_shard` integer unsigned not null,
  53 + `domain` varchar(100) not null, -- why only 100 chars? domains can be much longer!
  54 + `value` integer unsigned not null default 0,
  55 + primary key (`host`, `time_shard`, `domain`)
  56 +) engine=innodb;
  57 +
  58 +/*
  59 + * Used by PopularDomainsDimension to store number of domains in each of the grid levels.
  60 + * Value field must have a default of 0 as defined for counters (see mysql.py).
  61 + */
  62 +DROP TABLE IF EXISTS `popular_grid_level_counters`;
  63 +CREATE TABLE `popular_grid_level_counters` (
  64 + `host` varchar(100) not null,
  65 + `time_shard` integer unsigned not null,
  66 + `grid_level` tinyint unsigned not null, -- having more than 255 grid levels is really a bad idea, even 25 is too many.
  67 + `value` integer unsigned not null default 0,
  68 + primary key (`host`, `time_shard`, `grid_level`)
  69 +) engine=innodb;
  70 +
  71 +/*
  72 + * Used by PopularDomainsDimension to store list of domains for each of the grid levels.
  73 + * Value field must have a default of '' as defined for accumulators (see mysql.py).
  74 + */
  75 +DROP TABLE IF EXISTS `popular_grid_level_domains`;
  76 +CREATE TABLE `popular_grid_level_domains` (
  77 + `host` varchar(100) not null,
  78 + `time_shard` integer unsigned not null,
  79 + `grid_level` tinyint unsigned not null, -- having more than 255 grid levels is really a bad idea, even 25 is too many.
  80 + `value` longtext not null default '',
  81 + primary key (`host`, `time_shard`, `grid_level`)
  82 +) engine=innodb;
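
These counter and accumulator tables pair with the new increment()/append() methods in mysql.py. A rough usage sketch; the dict-style id is an assumption about what StorageID accepts, which this commit does not show:

# Hypothetical usage; field values are illustrative, and the id format is assumed.
counters = MysqlStorage('localhost', 'root', '', 'shortener',
                        name='popular_domain_counters')
value = counters.increment({'host': 'h1', 'time_shard': 12345, 'domain': 'example.com'}, 1)
# increment() upserts the row, holds the row lock while re-fetching, then commits,
# so the returned value is the post-increment counter even under concurrent writers.
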
