Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

querying CommonCrawl parquet archive #29

Closed
mhlr opened this issue Mar 20, 2018 · 1 comment
Closed

querying CommonCrawl parquet archive #29

mhlr opened this issue Mar 20, 2018 · 1 comment

Comments

@mhlr
Copy link

mhlr commented Mar 20, 2018

I have set up Athena to point at the CommonCrawl Parquet archive as described in
http://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/
and I am able to run queries against the created table using the Athena web UI.

When I try to access the table from Python using the following code:

from blaze import data
from urllib.parse import quote_plus 
from sqlalchemy.engine import create_engine
from sqlalchemy.sql.expression import select
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Table, MetaData

conn_str = 'awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/'\
           '{schema_name}?s3_staging_dir={s3_staging_dir}'

engine = create_engine(conn_str.format(
    aws_access_key_id=quote_plus(PUBLIC),
    aws_secret_access_key=quote_plus(PRIVATE),
    region_name='us-east-1',
    schema_name='ccindex',
    s3_staging_dir=quote_plus('s3://commoncrawl/cc-index/table/cc-main/warc/')))

db = data(engine)

db.ccindex.ccindex.peek()

I get the following error:


---------------------------------------------------------------------------
OperationalError                          Traceback (most recent call last)
~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1192                         parameters,
-> 1193                         context)
   1194         except BaseException as e:

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/default.py in do_execute(self, cursor, statement, parameters, context)
    506     def do_execute(self, cursor, statement, parameters, context=None):
--> 507         cursor.execute(statement, parameters)
    508 

~/anaconda3/lib/python3.6/site-packages/pyathena/util.py in _wrapper(*args, **kwargs)
     28         with _lock:
---> 29             return wrapped(*args, **kwargs)
     30     return _wrapper

~/anaconda3/lib/python3.6/site-packages/pyathena/cursor.py in execute(self, operation, parameters)
    116         else:
--> 117             raise OperationalError(query_execution.state_change_reason)
    118 

OperationalError: Access denied when writing output to url: s3://commoncrawl/cc-index/table/cc-main/warc/3766a047-e63c-4531-b0f7-85a1d7035f9a.csv . Please ensure you are allowed to access the S3 bucket. If you are encrypting query results with KMS key, please ensure you are allowed to access your KMS key

The above exception was the direct cause of the following exception:

OperationalError                          Traceback (most recent call last)
<ipython-input-10-6ce5eec8364d> in <module>()
----> 1 import codecs, os, ast;__pyfile = codecs.open('''/tmp/py3278WRZ''', encoding='''utf-8''');__code = __pyfile.read().encode('''utf-8''');__pyfile.close();os.remove('''/tmp/py3278WRZ''');__block = ast.parse(__code, '''/home/dm/athna.py''', mode='exec');__last = __block.body[-1];__isexpr = isinstance(__last,ast.Expr);__block.body.pop() if __isexpr else None;exec(compile(__block, '''/home/dm/athna.py''', mode='exec'));eval(compile(ast.Expression(__last.value), '''/home/dm/athna.py''', mode='eval')) if __isexpr else None

~/athna.py in <module>()
     24     s3_staging_dir=quote_plus('s3://commoncrawl/cc-index/table/cc-main/warc/')))
     25 
---> 26 db = data(engine)
     27 
     28 db.peek()

~/anaconda3/lib/python3.6/site-packages/blaze/interactive.py in data(data_source, dshape, name, fields, schema, **kwargs)
    161         data_source = tuple(data_source)
    162     if not dshape:
--> 163         dshape = discover(data_source)
    164         types = None
    165         if isinstance(dshape.measure, Tuple) and fields:

~/anaconda3/lib/python3.6/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    162             self._cache[types] = func
    163         try:
--> 164             return func(*args, **kwargs)
    165 
    166         except MDNotImplementedError:

~/anaconda3/lib/python3.6/site-packages/odo/backends/sql.py in discover(engine)
    291 @dispatch(sa.engine.base.Engine)
    292 def discover(engine):
--> 293     return discover(metadata_of_engine(engine))
    294 
    295 

~/anaconda3/lib/python3.6/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    162             self._cache[types] = func
    163         try:
--> 164             return func(*args, **kwargs)
    165 
    166         except MDNotImplementedError:

~/anaconda3/lib/python3.6/site-packages/odo/backends/sql.py in discover(metadata)
    297 def discover(metadata):
    298     try:
--> 299         metadata.reflect(views=metadata.bind.dialect.supports_views)
    300     except NotImplementedError:
    301         metadata.reflect()

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/sql/schema.py in reflect(self, bind, schema, views, only, extend_existing, autoload_replace, **dialect_kwargs)
   3924 
   3925             available = util.OrderedSet(
-> 3926                 bind.engine.table_names(schema, connection=conn))
   3927             if views:
   3928                 available.update(

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in table_names(self, schema, connection)
   2137             if not schema:
   2138                 schema = self.dialect.default_schema_name
-> 2139             return self.dialect.get_table_names(conn, schema)
   2140 
   2141     def has_table(self, table_name, schema=None):

<string> in get_table_names(self, connection, schema, **kw)

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/reflection.py in cache(fn, self, con, *args, **kw)
     40     info_cache = kw.get('info_cache', None)
     41     if info_cache is None:
---> 42         return fn(self, con, *args, **kw)
     43     key = (
     44         fn.__name__,

~/anaconda3/lib/python3.6/site-packages/pyathena/sqlalchemy_athena.py in get_table_names(self, connection, schema, **kw)
    122                 WHERE table_schema = '{schema}'
    123                 """.format(schema=schema)
--> 124         return [row.table_name for row in connection.execute(query).fetchall()]
    125 
    126     def has_table(self, connection, table_name, schema=None):

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in execute(self, object, *multiparams, **params)
    940         """
    941         if isinstance(object, util.string_types[0]):
--> 942             return self._execute_text(object, multiparams, params)
    943         try:
    944             meth = object._execute_on_connection

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _execute_text(self, statement, multiparams, params)
   1102             statement,
   1103             parameters,
-> 1104             statement, parameters
   1105         )
   1106         if self._has_events or self.engine._has_events:

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1198                 parameters,
   1199                 cursor,
-> 1200                 context)
   1201 
   1202         if self._has_events or self.engine._has_events:

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1411                 util.raise_from_cause(
   1412                     sqlalchemy_exception,
-> 1413                     exc_info
   1414                 )
   1415             else:

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/util/compat.py in raise_from_cause(exception, exc_info)
    201     exc_type, exc_value, exc_tb = exc_info
    202     cause = exc_value if exc_value is not exception else None
--> 203     reraise(type(exception), exception, tb=exc_tb, cause=cause)
    204 
    205 if py3k:

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/util/compat.py in reraise(tp, value, tb, cause)
    184             value.__cause__ = cause
    185         if value.__traceback__ is not tb:
--> 186             raise value.with_traceback(tb)
    187         raise value
    188 

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1191                         statement,
   1192                         parameters,
-> 1193                         context)
   1194         except BaseException as e:
   1195             self._handle_dbapi_exception(

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/default.py in do_execute(self, cursor, statement, parameters, context)
    505 
    506     def do_execute(self, cursor, statement, parameters, context=None):
--> 507         cursor.execute(statement, parameters)
    508 
    509     def do_execute_no_params(self, cursor, statement, context=None):

~/anaconda3/lib/python3.6/site-packages/pyathena/util.py in _wrapper(*args, **kwargs)
     27     def _wrapper(*args, **kwargs):
     28         with _lock:
---> 29             return wrapped(*args, **kwargs)
     30     return _wrapper
     31 

~/anaconda3/lib/python3.6/site-packages/pyathena/cursor.py in execute(self, operation, parameters)
    115                 self.retry_max_delay, self.retry_exponential_base)
    116         else:
--> 117             raise OperationalError(query_execution.state_change_reason)
    118 
    119     def executemany(self, operation, seq_of_parameters):

OperationalError: (pyathena.error.OperationalError) Access denied when writing output to url: s3://commoncrawl/cc-index/table/cc-main/warc/3766a047-e63c-4531-b0f7-85a1d7035f9a.csv . Please ensure you are allowed to access the S3 bucket. If you are encrypting query results with KMS key, please ensure you are allowed to access your KMS key [SQL: "\n                SELECT table_name\n                FROM information_schema.tables\n                WHERE table_schema = 'ccindex'\n                "] (Background on this error at: http://sqlalche.me/e/e3q8)
@mhlr mhlr closed this as completed Mar 20, 2018
@mhlr
Copy link
Author

mhlr commented Mar 20, 2018

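I realized that s3_staging_dir should be my own tmp bucket (one I have write access to), since Athena writes the query output there; it should not be the commoncrawl bucket, which I am not allowed to write to.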

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant