Skip to content
This repository has been archived by the owner on Mar 5, 2021. It is now read-only.

Commit

Permalink
Add functions for creating directories, dimensions, etc.
Browse files Browse the repository at this point in the history
Refactor unit tests to use above functions.
  • Loading branch information
tv42 committed Oct 11, 2007
1 parent 769c184 commit e7a693a
Show file tree
Hide file tree
Showing 4 changed files with 382 additions and 162 deletions.
110 changes: 109 additions & 1 deletion snakepit/create.py
@@ -1,6 +1,6 @@
import sqlalchemy as sq

from snakepit import hive
from snakepit import hive, directory

def create_hive(hive_uri):
"""
Expand All @@ -21,3 +21,111 @@ def create_hive(hive_uri):
table.tometadata(hive_metadata)
hive_metadata.create_all()
return hive_metadata


def create_primary_index(
directory_uri,
dimension_name,
):
"""
Create a primary index for C{dimension_name} at C{directory_uri}.
"""
directory_metadata = sq.MetaData()
directory_metadata.bind = sq.create_engine(
directory_uri,
strategy='threadlocal',
)
table = directory.dynamic_table(
table=directory.metadata.tables['hive_primary_DIMENSION'],
metadata=directory_metadata,
name='hive_primary_%s' % dimension_name,
)
directory_metadata.create_all()
return directory_metadata


class DimensionExistsError(Exception):
"""Dimension exists already"""

def __str__(self):
return ': '.join([self.__doc__]+list(self.args))

def create_dimension(hive_metadata, dimension_name, directory_uri):
"""
Create a dimension with C{dimension_name} at C{hive_metadata},
where the directory index is stored at C{directory_uri}.
Directory index must be set up before calling this function.
@raise DimensionExistsError: a dimension with that name exists
already in this hive
"""
t = hive_metadata.tables['partition_dimension_metadata']
try:
r = t.insert().execute(
name=dimension_name,
index_uri=directory_uri,
db_type=0, # TODO
)
except sq.exceptions.SQLError, e:
# sqlalchemy 0.3.x is hiding details and not providing a
# db-independent abstraction for what the error actually
# was, so we need to resort to kludges

# only catch sq.exceptions.IntegrityError when it's safe
# to depend on sqlalchemy 0.4

# http://www.sqlalchemy.org/trac/ticket/706
if 'IntegrityError' in unicode(e):
raise DimensionExistsError(repr(dimension_name))
else:
raise

(dimension_id,) = r.last_inserted_ids()
r.close()
return dimension_id

class NodeExistsError(Exception):
"""Node exists already"""

def __str__(self):
return ': '.join([self.__doc__]+list(self.args))

def create_node(hive_metadata, dimension_id, node_name, node_uri):
"""
Create a node with in dimension having C{dimension_id} with
C{node_name} at C{hive_metadata}, where the records are stored at
C{node_uri}.
Node must be set up before calling this function.
@raise NoSuchDimensionError: no such dimension found
@raise NodeExistsError: a node with that name exists
already in this hive
"""
t = hive_metadata.tables['node_metadata']
try:
r = t.insert().execute(
partition_dimension_id=dimension_id,
name=node_name,
uri=node_uri,
read_only=False, # TODO
)
except sq.exceptions.SQLError, e:
# sqlalchemy 0.3.x is hiding details and not providing a
# db-independent abstraction for what the error actually
# was, so we need to resort to kludges

# only catch sq.exceptions.IntegrityError when it's safe
# to depend on sqlalchemy 0.4

# http://www.sqlalchemy.org/trac/ticket/706
if 'IntegrityError' in unicode(e):
raise NodeExistsError(repr(node_name))
else:
raise

(node_id,) = r.last_inserted_ids()
r.close()
return node_id
8 changes: 7 additions & 1 deletion snakepit/hive.py
Expand Up @@ -22,13 +22,19 @@
sq.Column('uri', sq.String(255), nullable=False),
# this is int in HiveConfigurationSchema.java
sq.Column('read_only', sq.Boolean),

# TODO what's the scope of the name? I guess you could
# say name is hostname, and same machine can serve two
# dimensions -- then name isn't unique alone, only in
# combination with partition_dimension_id
sq.UniqueConstraint('partition_dimension_id', 'name'),
)

partition_dimension_metadata = sq.Table(
'partition_dimension_metadata',
metadata,
sq.Column('id', sq.Integer, primary_key=True),
sq.Column('name', sq.String(64), nullable=False),
sq.Column('name', sq.String(64), nullable=False, unique=True),
sq.Column('index_uri', sq.String(255), nullable=False),
# TODO wth is this used?
sq.Column('db_type', sq.String(64), nullable=False),
Expand Down

0 comments on commit e7a693a

Please sign in to comment.