Skip to content

Commit

Permalink
Location import: Load each location as a separate background job; better user experience if doing tons of them.
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkp committed Apr 20, 2012
1 parent 559da68 commit 303df0f
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 102 deletions.
107 changes: 60 additions & 47 deletions ebpub/ebpub/db/bin/import_locations.py
Expand Up @@ -49,6 +49,7 @@ def populate_ni_loc(location):
cursor = connection.cursor()
# In case the location is not new...
NewsItemLocation.objects.filter(location=location).delete()
old_niloc_count = NewsItemLocation.objects.count()
i = 0
batch_size = 400
while i < ni_count:
Expand All @@ -64,6 +65,8 @@ def populate_ni_loc(location):
""", (i, i + batch_size, location.id))
connection._commit()
i += batch_size
new_count = NewsItemLocation.objects.count()
logger.info("New: %d NewsItemLocations" % (new_count - old_niloc_count))

class LocationImporter(object):
def __init__(self, layer, location_type, source='UNKNOWN', filter_bounds=False, verbose=False):
Expand All @@ -81,61 +84,71 @@ def __init__(self, layer, location_type, source='UNKNOWN', filter_bounds=False,
from ebpub.utils.geodjango import get_default_bounds
self.bounds = get_default_bounds()

def create_location(self, name, location_type, geom, display_order=0):
    """Create (or update) a single Location from one shapefile feature.

    ``location_type`` may be a LocationType instance or its integer pk;
    accepting a raw int lets this be called from background jobs whose
    arguments must be JSON-serializable.
    ``geom`` may be an OGR geometry (anything with a ``.geos`` attribute)
    or a GEOS geometry; it is normalized to a valid WGS84 (SRID 4326)
    geometry before saving.

    Returns True if a new Location was created, False if an existing one
    matched, or None if should_create_location() rejected the feature.
    """
    source = self.source
    # Normalize to a GEOS geometry in WGS84 (SRID 4326).
    if hasattr(geom, 'geos'):
        geom = geom.geos
    if geom.srid is None:
        geom.srid = 4326
    elif geom.srid != 4326:
        geom = geom.transform(4326, True)
    geom = ensure_valid(geom, name)
    geom = flatten_geomcollection(geom)
    if not isinstance(location_type, int):
        location_type = location_type.id
    kwargs = dict(
        name=name,
        slug=slugify(name),
        location=geom,
        location_type_id=location_type,
        city=self.metro_name,
        source=source,
        is_public=True,
    )
    if not self.should_create_location(kwargs):
        return
    kwargs['defaults'] = {
        'creation_date': self.now,
        'last_mod_date': self.now,
        'display_order': display_order,
        'normalized_name': normalize(name),
        # Area is computed after projecting to EPSG:3395 — presumably to
        # get meters rather than degrees; TODO confirm choice of SRID.
        'area': geom.transform(3395, True).area,
    }
    try:
        loc, created = Location.objects.get_or_create(**kwargs)
    except IntegrityError:
        # Usually this means two towns with the same slug.
        # Try to fix that by appending a counter to the slug.
        slug = kwargs['slug']
        existing = Location.objects.filter(slug=slug).count()
        if existing:
            slug = slugify('%s-%s' % (slug, existing + 1))
            logger.info("Munged slug %s to %s to make it unique" % (kwargs['slug'], slug))
            kwargs['slug'] = slug
            loc, created = Location.objects.get_or_create(**kwargs)
        else:
            raise

    logger.info('%s %s %s' % (created and 'Created' or 'Already had', self.location_type.name, loc))
    logger.info('Populating newsitem locations ... ')
    populate_ni_loc(loc)
    logger.info('done.\n')

    return created

def save(self, name_field):
    """Import every feature in self.layer as a Location.

    Features are sorted by their ``name_field`` value so that
    display_order follows alphabetical order.

    Returns a (num_created, num_updated) tuple. Note that features
    skipped by should_create_location() (create_location returns None)
    are counted in num_updated.
    """
    num_created = 0
    num_updated = 0
    features = sorted(self.layer, key=lambda f: f.get(name_field))
    for i, feature in enumerate(features):
        name = feature.get(name_field)
        location_type = self.get_location_type(feature)
        created = self.create_location(name, location_type, feature.geom,
                                       display_order=i)
        if created:
            num_created += 1
        else:
            num_updated += 1

    return (num_created, num_updated)

def should_create_location(self, fields):
Expand Down
63 changes: 22 additions & 41 deletions ebpub/ebpub/db/bin/import_zips.py
Expand Up @@ -23,9 +23,7 @@
"""

from django.contrib.gis.geos import MultiPolygon
from ebpub.db.models import Location, LocationType
from ebpub.utils.geodjango import ensure_valid
from ebpub.utils.geodjango import flatten_geomcollection
from ebpub.db.models import LocationType
from ebpub.db.bin import import_locations
import sys

Expand All @@ -42,22 +40,22 @@ def __init__(self, layer, name_field, source='UNKNOWN', filter_bounds=False, ver
)
self.name_field = name_field
super(ZipImporter, self).__init__(layer, location_type, source, filter_bounds, verbose)
self.zipcodes = {}
self.zipcode_geoms = {}
self.collapse_zip_codes()

def collapse_zip_codes(self):
# The ESRI ZIP Code layer breaks ZIP Codes up along county
# boundaries, so we need to collapse them first before
# proceeding.

if len(self.zipcodes) > 0:
if len(self.zipcode_geoms) > 0:
return

for feature in self.layer:
zipcode = feature.get(self.name_field)
geom = feature.geom.transform(4326, True).geos
if zipcode not in self.zipcodes:
self.zipcodes[zipcode] = geom
if zipcode not in self.zipcode_geoms:
self.zipcode_geoms[zipcode] = geom
else:
# If it's a MultiPolygon geom we're adding to our
# existing geom, we need to "unroll" it into its
Expand All @@ -66,51 +64,30 @@ def collapse_zip_codes(self):
subgeoms = list(geom)
else:
subgeoms = [geom]
existing_geom = self.zipcodes[zipcode]
existing_geom = self.zipcode_geoms[zipcode]
if not isinstance(existing_geom, MultiPolygon):
new_geom = MultiPolygon([existing_geom])
new_geom.extend(subgeoms)
self.zipcodes[zipcode] = new_geom
self.zipcode_geoms[zipcode] = new_geom
else:
existing_geom.extend(subgeoms)

def create_location(self, zipcode, geom, display_order=0):
verbose = self.verbose
source = self.source
geom = ensure_valid(geom, self.location_type.slug)
geom = flatten_geomcollection(geom)
geom.srid = 4326
kwargs = dict(
name = zipcode,
normalized_name = zipcode,
slug = zipcode,
location_type = self.location_type,
location = geom,
city = self.metro_name,
source = source,
is_public = True,
)
if not self.should_create_location(kwargs):
return
kwargs['defaults'] = {'display_order': display_order,
'last_mod_date': self.now,
'creation_date': self.now,
'area': kwargs['location'].transform(3395, True).area,
}
zipcode_obj, created = Location.objects.get_or_create(**kwargs)
if verbose:
print >> sys.stderr, '%s ZIP Code %s ' % (created and 'Created' or 'Already had', zipcode_obj.name)
return created

def import_zip(self, zipcode):
self.create_location(zipcode, self.zipcodes[zipcode])

def import_zip(self, zipcode, display_order=0):
    """Import a single ZIP code Location from the collapsed shapefile data.

    Returns False if the zipcode isn't present in the shapefile data;
    otherwise delegates to create_location() and returns its result.
    """
    if zipcode not in self.zipcode_geoms:
        import_locations.logger.info("Zipcode %s not found in shapefile" % zipcode)
        return False
    return self.create_location(zipcode, self.location_type,
                                geom=self.zipcode_geoms[zipcode],
                                display_order=display_order)

def save(self):
num_created = 0
num_updated = 0
sorted_zipcodes = sorted(self.zipcodes.iteritems(), key=lambda x: int(x[0]))
sorted_zipcodes = sorted(self.zipcode_geoms.iteritems(), key=lambda x: int(x[0]))
for i, (zipcode, geom) in enumerate(sorted_zipcodes):
created = self.create_location(zipcode, geom, display_order=i)
created = self.create_location(zipcode, self.location_type, geom=geom,
display_order=i)
if created:
num_created += 1
else:
Expand All @@ -124,6 +101,10 @@ def parse_args(optparser, argv):
optparser.add_option('-n', '--name-field', dest='name_field', default='ZCTA5CE',
help='field that contains the zipcode\'s name')
opts, args = optparser.parse_args(argv)
if not opts.verbose:
import logging
logging.basicConfig()
import_locations.logger.setLevel(logging.WARN)

if len(args) != 1:
optparser.error('must give path to shapefile')
Expand Down
33 changes: 25 additions & 8 deletions obadmin/obadmin/admin/tasks.py
Expand Up @@ -19,6 +19,7 @@

from background_task import background
from django.conf import settings
from django.contrib.gis.geos import GEOSGeometry
from ebdata.retrieval.retrievers import Retriever
from tempfile import mkdtemp
from zipfile import ZipFile
Expand Down Expand Up @@ -135,20 +136,36 @@ def import_zip_from_shapefile(filename, zipcode):
logger.exception('Zipcode import failed')
return

def import_locations_from_shapefile(shapefile, layer_number, location_type,
                                    name_field, filter_bounds):
    """Queue one import_location() background job per feature in a shapefile.

    Not run in the background itself, since this should just quickly
    enqueue a bunch of import_location() jobs.
    """
    try:
        layer = layer_from_shapefile(shapefile, layer_number)
        # Sort by name so display_order follows alphabetical order.
        features = sorted(layer, key=lambda f: f.get(name_field))
        for i, feature in enumerate(features):
            name = feature.get(name_field)
            # Geometry is passed as WKT because background task args
            # must be JSON-serializable.
            import_location(shapefile, layer_number, location_type, name,
                            feature.geom.wkt, filter_bounds, display_order=i)
    except Exception:
        # Was a bare `except:`, which would also swallow SystemExit and
        # KeyboardInterrupt; narrowed, but still best-effort: log and return.
        logger.exception("Location import failed")
    # Unfortunately, since everything runs asynchronously,
    # we don't know when the shapefile is safe to delete.
    # At least it's in $TMPDIR :(


@background
def import_location(shapefile, layer_number, location_type, name, wkt,
                    filter_bounds, display_order):
    """Background job: import one named location from a shapefile layer.

    The geometry arrives as WKT because background functions need all
    their args to be JSON-serializable.
    """
    try:
        layer = layer_from_shapefile(shapefile, layer_number)
        importer = LocationImporter(layer, location_type,
                                    filter_bounds=filter_bounds)
        geom = GEOSGeometry(wkt)
        importer.create_location(name, location_type, geom,
                                 display_order=display_order)
    except Exception:
        # Was a bare `except:`, which would also swallow SystemExit and
        # KeyboardInterrupt; narrowed. Log the failure for this one
        # location and let the other queued jobs proceed.
        logger.exception("Location import of %s failed" % name)


@background
Expand Down
26 changes: 23 additions & 3 deletions obadmin/obadmin/admin/templates/obadmin/jobs_status.html
@@ -1,6 +1,26 @@
<style type="text/css">
  /* Background-jobs list: highlight counts that are non-zero. */
  .background-jobs .running {
    color: #00BB00;
  }
  .background-jobs .pending {
    color: #92B720;
  }
  .background-jobs {
    list-style-type: none;
  }
</style>

<h3>Current background jobs:</h3>
<ol class="background-jobs">
{% for job_info in counts %}
  <li>
    <b>{{ job_info.label }}</b>:
    <span {% if job_info.running_count > 0 %} class="running"{% endif %}>
      {{ job_info.running_count }} running
    </span>,
    <span {% if job_info.pending_count > 0 %} class="pending"{% endif %}>
      {{ job_info.pending_count }} pending
    </span>
  </li>
{% endfor %}
</ol>
6 changes: 3 additions & 3 deletions obadmin/obadmin/admin/views.py
Expand Up @@ -196,10 +196,10 @@ def jobs_status(request, appname, modelname):
counts = [
{'label': u'Download state shapefile',
'task': u'obadmin.admin.tasks.download_state_shapefile' },
{'label': u'Import ZIP codes',
{'label': u'Import ZIP code',
'task': u'obadmin.admin.tasks.import_zip_from_shapefile'},
{'label': 'Import Locations',
'task': u'obadmin.admin.tasks.import_locations_from_shapefile'},
{'label': 'Import Location',
'task': u'obadmin.admin.tasks.import_location'},
]
elif appname == 'streets':
# Don't bother discriminating further based on modelname;
Expand Down

0 comments on commit 303df0f

Please sign in to comment.