refactors postgres functions into new PostgreSQLSource class, cleans up setup.py and requirements (remove docx and biopython), updates to README
kshefchek committed Feb 2, 2016
1 parent 3f0efff commit 6fe840d
Showing 13 changed files with 182 additions and 149 deletions.
21 changes: 15 additions & 6 deletions README.md
@@ -29,20 +29,29 @@ like [Protege](http://protege.stanford.edu/).
* One of the unit tests requires
[owltools](https://code.google.com/p/owltools/wiki/InstallOWLTools) be available on your path. You could modify
the code to skip this, if necessary
* unit tests require nosetests (if on OS X you may need to `sudo pip3 install nose`)
* Running make test requires nosetests (if on OS X you may need to `sudo pip3 install nose`)

* Required external python packages:
* [psycopg2](http://initd.org/psycopg/)
* [rdflib](https://code.google.com/p/rdflib/)
* isodate
* roman
* [python-docx](https://github.com/python-openxml/python-docx)
* pyyaml
* pysftp
* [biopython](https://github.com/biopython/biopython)
* docx


* Optional source specific python packages:
* [psycopg2](http://initd.org/psycopg/)
* [python-docx](https://github.com/python-openxml/python-docx)
* beautifulsoup4
* GitPython
* intermine
* pysftp

Note that Dipper imports source modules dynamically at runtime, so a core set of requirements can be installed first
and source-specific dependencies added as needed. At present this is implemented only with pip requirements
files. For example, to install the dependencies for MGI:

pip3 install -r requirements/core.txt
pip3 install -r requirements/mgi.txt
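
For illustration only, the dynamic loading pattern can be sketched roughly as follows — the naming convention shown here is an assumption for the example, not necessarily Dipper's actual loader:

    import importlib

    def load_source(name):
        # assumes each source lives in dipper.sources.<Name> and exposes
        # a class of the same name, e.g. dipper.sources.MGI.MGI
        module = importlib.import_module('dipper.sources.' + name)
        return getattr(module, name)

    MGI = load_source('MGI')  # importing MGI fails unless psycopg2 is installed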

If you encounter any errors installing these packages using Homebrew, it could be due to [a current known issue in upgrading to pip3](https://github.com/Homebrew/homebrew/issues/25752). In this case, first force-reinstall pip2 (`pip2 install --upgrade --force-reinstall pip`) and then install the package using pip3 (e.g. `pip3 install psycopg2`).

6 changes: 3 additions & 3 deletions dipper/sources/MGI.py
@@ -4,7 +4,7 @@
import logging
import re

from dipper.sources.Source import Source
from dipper.sources.PostgreSQLSource import PostgreSQLSource
from dipper.models.assoc.Association import Assoc
from dipper.models.Dataset import Dataset
from dipper.models.assoc.G2PAssoc import G2PAssoc
@@ -19,7 +19,7 @@
logger = logging.getLogger(__name__)


class MGI(Source):
class MGI(PostgreSQLSource):
"""
This is the [Mouse Genome Informatics](http://www.informatics.jax.org/) resource,
from which we process genotype and phenotype data about laboratory mice.
@@ -127,7 +127,7 @@ class MGI(Source):
}

def __init__(self):
Source.__init__(self, 'mgi')
super().__init__('mgi')
self.namespaces.update(curie_map.get())

# update the dataset object with details about this resource
2 changes: 0 additions & 2 deletions dipper/sources/MPD.py
@@ -2,8 +2,6 @@
import re
import logging
import io
import math
import zipfile
from zipfile import ZipFile
from dipper.models.Provenance import Provenance

149 changes: 149 additions & 0 deletions dipper/sources/PostgreSQLSource.py
@@ -0,0 +1,149 @@
import psycopg2
import logging
import os
from dipper.sources.Source import Source


logger = logging.getLogger(__name__)


class PostgreSQLSource(Source):
"""
Class for interfacing with remote Postgres databases
"""

def __init__(self, name=None):
super().__init__(name)
return

def fetch_from_pgdb(self, tables, cxn, limit=None, force=False):
"""
Will fetch all Postgres tables from the specified database in the cxn connection parameters.
This will save them to a local file named the same as the table, in tab-delimited format, including a header.
:param tables: Names of tables to fetch
:param cxn: database connection details
:param limit: A max row count to fetch for each table
:return: None
"""

con = None
try:
con = psycopg2.connect(host=cxn['host'], database=cxn['database'], port=cxn['port'],
user=cxn['user'], password=cxn['password'])
cur = con.cursor()
for t in tables:
logger.info("Fetching data from table %s", t)
self._getcols(cur, t)
query = ' '.join(("SELECT * FROM", t))
countquery = ' '.join(("SELECT COUNT(*) FROM", t))
if limit is not None:
query = ' '.join((query, "LIMIT", str(limit)))
countquery = ' '.join((countquery, "LIMIT", str(limit)))

outfile = '/'.join((self.rawdir, t))

filerowcount = -1
tablerowcount = -1
if not force:
# check local copy. assume that if the # rows are the same, that the table is the same
# TODO may want to fix this assumption
if os.path.exists(outfile):
# get rows in the file
filerowcount = self.file_len(outfile)
logger.info("rows in local file: %s", filerowcount)

# get rows in the table
# tablerowcount=cur.rowcount
cur.execute(countquery)
tablerowcount = cur.fetchone()[0]

if force or filerowcount < 0 or (filerowcount-1) != tablerowcount: # rowcount-1 because there's a header
if force:
logger.info("Forcing download of %s", t)
else:
logger.info("%s local (%d) different from remote (%d); fetching.", t, filerowcount, tablerowcount)
# download the file
logger.info("COMMAND:%s", query)
outputquery = "COPY ({0}) TO STDOUT WITH DELIMITER AS '\t' CSV HEADER".format(query)
with open(outfile, 'w') as f:
cur.copy_expert(outputquery, f)
else:
logger.info("local data same as remote; reusing.")

finally:
if con:
con.close()
return

def fetch_query_from_pgdb(self, qname, query, con, cxn, limit=None, force=False):
"""
Supply either an already established connection, or connection parameters.
The supplied connection will override any separate cxn parameter
:param qname: The name of the query to save the output to
:param query: The SQL query itself
:param con: The already-established connection
:param cxn: The postgres connection information
:param limit: If you only want a subset of rows from the query
:return:
"""
if con is None and cxn is None:
logger.error("ERROR: you need to supply connection information")
return
if con is None and cxn is not None:
con = psycopg2.connect(host=cxn['host'], database=cxn['database'], port=cxn['port'],
user=cxn['user'], password=cxn['password'])

outfile = '/'.join((self.rawdir, qname))
cur = con.cursor()
countquery = ' '.join(("SELECT COUNT(*) FROM (", query, ") x")) # wrap the query to get the count
if limit is not None:
countquery = ' '.join((countquery, "LIMIT", str(limit)))

# check local copy. assume that if the # rows are the same, that the table is the same
filerowcount = -1
tablerowcount = -1
if not force:
if os.path.exists(outfile):
# get rows in the file
filerowcount = self.file_len(outfile)
logger.info("INFO: rows in local file: %s", filerowcount)

# get rows in the table
# tablerowcount=cur.rowcount
cur.execute(countquery)
tablerowcount = cur.fetchone()[0]

if force or filerowcount < 0 or (filerowcount-1) != tablerowcount: # rowcount-1 because there's a header
if force:
logger.info("Forcing download of %s", qname)
else:
logger.info("%s local (%s) different from remote (%s); fetching.", qname, filerowcount, tablerowcount)
# download the file
logger.debug("COMMAND:%s", query)
outputquery = "COPY ({0}) TO STDOUT WITH DELIMITER AS '\t' CSV HEADER".format(query)
with open(outfile, 'w') as f:
cur.copy_expert(outputquery, f)
# Regenerate row count to check integrity
filerowcount = self.file_len(outfile)
if (filerowcount-1) != tablerowcount:
raise Exception("Download from MGI failed, %s != %s", (filerowcount-1), tablerowcount)
else:
logger.info("local data same as remote; reusing.")

return

# TODO generalize this to a set of utils
def _getcols(self, cur, table):
"""
Will execute a pg query to get the column names for the given table.
:param cur:
:param table:
:return:
"""
query = ' '.join(("SELECT * FROM", table, "LIMIT 0")) # for testing

cur.execute(query)
colnames = [desc[0] for desc in cur.description]
logger.info("COLS (%s): %s", table, colnames)

return
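
To illustrate how a source consumes the new class, here is a minimal, hypothetical sketch — the source name, table names, and connection values below are placeholders for this example, not anything taken from the repository:

    from dipper.sources.PostgreSQLSource import PostgreSQLSource

    class ExampleSource(PostgreSQLSource):  # hypothetical source
        def __init__(self):
            super().__init__('example')

    src = ExampleSource()
    # placeholder connection details; real sources read these from configuration
    cxn = {'host': 'db.example.org', 'database': 'exampledb',
           'port': 5432, 'user': 'reader', 'password': 'secret'}
    # dumps each table to <rawdir>/<table> as tab-delimited text with a header;
    # a later call re-downloads only if local and remote row counts differ
    src.fetch_from_pgdb(['allele', 'genotype'], cxn)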
136 changes: 2 additions & 134 deletions dipper/sources/Source.py
@@ -1,4 +1,4 @@
import psycopg2
#import psycopg2

__author__ = 'nicole'

@@ -230,7 +230,7 @@ def checkIfRemoteIsNewer(self, remote, local, headers):
resp_header = response.getheaders()
size = resp_header.get('Content-length')
last_modified = resp_header.get('last-modified') # check me
except :
except:
resp_header = None
size = 0
last_modified = None
@@ -326,122 +326,6 @@ def fetch_from_url(self, remotefile, localfile, is_dl_forced, headers=None):
logger.info("file created: %s", time.asctime(time.localtime(st[ST_CTIME])))
return

def fetch_from_pgdb(self, tables, cxn, limit=None, force=False):
"""
Will fetch all Postgres tables from the specified database in the cxn connection parameters.
This will save them to a local file named the same as the table, in tab-delimited format, including a header.
:param tables: Names of tables to fetch
:param cxn: database connection details
:param limit: A max row count to fetch for each table
:return: None
"""

con = None
try:
con = psycopg2.connect(host=cxn['host'], database=cxn['database'], port=cxn['port'],
user=cxn['user'], password=cxn['password'])
cur = con.cursor()
for t in tables:
logger.info("Fetching data from table %s", t)
self._getcols(cur, t)
query = ' '.join(("SELECT * FROM", t))
countquery = ' '.join(("SELECT COUNT(*) FROM", t))
if limit is not None:
query = ' '.join((query, "LIMIT", str(limit)))
countquery = ' '.join((countquery, "LIMIT", str(limit)))

outfile = '/'.join((self.rawdir,t))

filerowcount = -1
tablerowcount = -1
if not force:
# check local copy. assume that if the # rows are the same, that the table is the same
# TODO may want to fix this assumption
if os.path.exists(outfile):
# get rows in the file
filerowcount = self.file_len(outfile)
logger.info("rows in local file: %s", filerowcount)

# get rows in the table
# tablerowcount=cur.rowcount
cur.execute(countquery)
tablerowcount = cur.fetchone()[0]

if force or filerowcount < 0 or (filerowcount-1) != tablerowcount: # rowcount-1 because there's a header
if force:
logger.info("Forcing download of %s", t)
else:
logger.info("%s local (%d) different from remote (%d); fetching.", t, filerowcount, tablerowcount)
# download the file
logger.info("COMMAND:%s", query)
outputquery = "COPY ({0}) TO STDOUT WITH DELIMITER AS '\t' CSV HEADER".format(query)
with open(outfile, 'w') as f:
cur.copy_expert(outputquery, f)
else:
logger.info("local data same as remote; reusing.")

finally:
if con:
con.close()
return

def fetch_query_from_pgdb(self, qname, query, con, cxn, limit=None, force=False):
"""
Supply either an already established connection, or connection parameters.
The supplied connection will override any separate cxn parameter
:param qname: The name of the query to save the output to
:param query: The SQL query itself
:param con: The already-established connection
:param cxn: The postgres connection information
:param limit: If you only want a subset of rows from the query
:return:
"""
if con is None and cxn is None:
logger.error("ERROR: you need to supply connection information")
return
if con is None and cxn is not None:
con = psycopg2.connect(host=cxn['host'], database=cxn['database'], port=cxn['port'],
user=cxn['user'], password=cxn['password'])

outfile = '/'.join((self.rawdir, qname))
cur = con.cursor()
countquery = ' '.join(("SELECT COUNT(*) FROM (", query, ") x")) # wrap the query to get the count
if limit is not None:
countquery = ' '.join((countquery, "LIMIT", str(limit)))

# check local copy. assume that if the # rows are the same, that the table is the same
filerowcount = -1
tablerowcount = -1
if not force:
if os.path.exists(outfile):
# get rows in the file
filerowcount = self.file_len(outfile)
logger.info("INFO: rows in local file: %s", filerowcount)

# get rows in the table
# tablerowcount=cur.rowcount
cur.execute(countquery)
tablerowcount = cur.fetchone()[0]

if force or filerowcount < 0 or (filerowcount-1) != tablerowcount: # rowcount-1 because there's a header
if force:
logger.info("Forcing download of %s", qname)
else:
logger.info("%s local (%s) different from remote (%s); fetching.", qname, filerowcount, tablerowcount)
# download the file
logger.debug("COMMAND:%s", query)
outputquery = "COPY ({0}) TO STDOUT WITH DELIMITER AS '\t' CSV HEADER".format(query)
with open(outfile, 'w') as f:
cur.copy_expert(outputquery, f)
# Regenerate row count to check integrity
filerowcount = self.file_len(outfile)
if (filerowcount-1) != tablerowcount:
raise Exception("Download from MGI failed, %s != %s", (filerowcount-1), tablerowcount)
else:
logger.info("local data same as remote; reusing.")

return

def process_xml_table(self, elem, table_name, processing_function, limit):
"""
This is a convenience function to process the elements of an xml document, when the xml is used
@@ -545,22 +429,6 @@ def compare_local_remote_bytes(self, remotefile, localfile):
local_size, remotefile, remote_size)
return is_equal

# TODO generalize this to a set of utils
def _getcols(self, cur, table):
"""
Will execute a pg query to get the column names for the given table.
:param cur:
:param table:
:return:
"""
query = ' '.join(("SELECT * FROM", table, "LIMIT 0")) # for testing

cur.execute(query)
colnames = [desc[0] for desc in cur.description]
logger.info("COLS (%s): %s", table, colnames)

return

def file_len(self, fname):
with open(fname) as f:
l = sum(1 for line in f)
2 changes: 0 additions & 2 deletions requirements.txt
@@ -5,8 +5,6 @@ roman
python-docx
pyyaml
pysftp
biopython
docx
beautifulsoup4
GitPython
intermine
