Decouple property computation from aggregation [Resolves #125] (#140)
A lasting problem has been the fragility of large aggregation jobs, which tend to require coordinating CPU- or memory-intensive tasks. Traditionally we have only saved the aggregated results, which prevents us from reusing computation for different aggregations, or from reusing a half-done list of computed properties, among many other problems.

Here we introduce the skills_ml.job_postings.computed_properties module, which is split into two parts: computers (computing properties on collections) and aggregators (aggregating them).

The computers save data indexed by job posting to daily-partitioned storage on S3. Results are cached per job posting id, so if a run over a large collection fails partway through, the computation can be reused.
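In outline, the per-posting caching behaves something like the sketch below (illustrative only: the function and argument names are hypothetical, not the skills-ml internals, and the real implementation additionally partitions its S3 cache by day):

```python
# Illustrative sketch of per-posting caching; names here are hypothetical.
def compute_with_cache(compute_func, job_postings, cache):
    """Compute a property for each posting, skipping postings already cached.

    If a previous run died partway through, the cached results are reused.
    """
    for posting in job_postings:
        posting_id = posting['id']
        if posting_id in cache:
            # Already computed in an earlier (possibly failed) run
            continue
        cache[posting_id] = compute_func(posting)
    return cache
```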

In addition, I'm trying to get the interface between Airflow and skills-ml right. I want to put as little code in Airflow as possible, in large part because it's very tough to unit test anything in Airflow. Just the same, we do want to be able to configure aggregations in Airflow: if we decide we now want a new tabular dataset that mixes properties together in a different way, or that increases the number of top skills present, that should be a simple change in Airflow. The result is a bit of a complex dance between the JobPostingPropertyComputers and their aggregators: the computers define compatible aggregate functions that work with their output (enforced by unit test), and the aggregators choose which of those functions to use for a particular aggregation (a decision that should live in Airflow).
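A rough sketch of that contract (the class and test names below are illustrative, not the exact skills-ml API; only the `posting_id_present` property name and `numpy.sum` come from the example script in this commit):

```python
# Illustrative sketch of the computer/aggregator contract; the class and test
# names are hypothetical, not the skills-ml API.
import numpy


class PostingCountComputer:
    """Computes a trivial per-posting property: the constant 1."""
    property_name = 'posting_id_present'
    # The computer advertises which aggregate functions work with its output...
    compatible_aggregate_functions = [numpy.sum]

    def compute(self, job_posting):
        return 1


def test_compatible_aggregate_functions():
    # ...and a unit test enforces that every advertised function actually works.
    computer = PostingCountComputer()
    values = [computer.compute({}), computer.compute({})]
    for func in computer.compatible_aggregate_functions:
        assert func(values) == 2


# Airflow-side configuration then only picks which advertised functions to use:
aggregate_functions = {'posting_id_present': [numpy.sum]}
```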

To wrap up, there is now an example of a basic computation and aggregation task, runnable without any dependencies, that is basically a mini version of the Data@Work Research Hub.

There was also a moto problem with the introduction of boto3, which I was able to fix by converting the geocoder and CBSA finder to the new S3BackedJsonDict class. They had each implemented their own caching using boto2; by switching them over I was able to remove a bunch of custom code and tests, as well as fix Travis.
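For reference, the conversion amounts to treating the cache as a dict-like object backed by S3. A minimal sketch, based on how the changed code below uses it (a `path` constructor argument, normal dict access, and a `save()` method); the bucket path and payload here are made up:

```python
from skills_utils.s3 import S3BackedJsonDict

# Construct the cache against an S3 path; in memory it behaves like a plain dict.
cache = S3BackedJsonDict(path='some-bucket/geocodes.json')

# Reads and writes use ordinary dict semantics...
if 'Flushing, NY' not in cache:
    cache['Flushing, NY'] = {'status': 'OK'}  # hypothetical geocode payload

# ...and save() persists the JSON back to S3, replacing the hand-rolled boto2 key handling.
cache.save()
```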
thcrock committed Mar 22, 2018
1 parent e22e566 commit 54fca66
Showing 15 changed files with 1,168 additions and 203 deletions.
13 changes: 8 additions & 5 deletions docs/pydocmd.yml
@@ -23,11 +23,14 @@ generate:
skills_ml.evaluation.representativeness_calculators.geo_occupation+]
- skills_ml.job_postings.md: [skills_ml.job_postings.aggregate+, skills_ml.job_postings.aggregate.dataset_transform+,
skills_ml.job_postings.aggregate.field_values+, skills_ml.job_postings.aggregate.geo+,
skills_ml.job_postings.aggregate.soc_code+, skills_ml.job_postings.aggregate.title+,
skills_ml.job_postings.common_schema+, skills_ml.job_postings.corpora+, skills_ml.job_postings.corpora.basic+,
skills_ml.job_postings.geography_queriers+, skills_ml.job_postings.geography_queriers.cbsa+,
skills_ml.job_postings.geography_queriers.cbsa_from_geocode+, skills_ml.job_postings.raw+,
skills_ml.job_postings.raw.usajobs+, skills_ml.job_postings.raw.virginia+, skills_ml.job_postings.sample+]
skills_ml.job_postings.aggregate.pandas+, skills_ml.job_postings.aggregate.soc_code+,
skills_ml.job_postings.aggregate.title+, skills_ml.job_postings.common_schema+,
skills_ml.job_postings.computed_properties+, skills_ml.job_postings.computed_properties.aggregators+,
skills_ml.job_postings.computed_properties.computers+, skills_ml.job_postings.corpora+,
skills_ml.job_postings.corpora.basic+, skills_ml.job_postings.geography_queriers+,
skills_ml.job_postings.geography_queriers.cbsa+, skills_ml.job_postings.geography_queriers.cbsa_from_geocode+,
skills_ml.job_postings.raw+, skills_ml.job_postings.raw.usajobs+, skills_ml.job_postings.raw.virginia+,
skills_ml.job_postings.sample+]
gens_dir: _build/pydocmd
loader: pydocmd.loader.PythonLoader
markdown_extensions:
14 changes: 14 additions & 0 deletions docs/sources/examples.md
@@ -32,3 +32,17 @@ To showcase the interface of training a word2vec embedding model in an online ba
- A list of quarters for creating the corpus from job posting data
- A trainer object that specifies parameters such as the source, S3 path, batch size, and model type
- The train method takes whatever arguments `gensim.models.word2vec.Word2Vec` or `gensim.models.doc2vec.Doc2Vec` accept

## [Compute and Aggregate Properties of Job Postings as a Tabular Dataset](https://github.com/workforce-data-initiative/skills-ml/blob/master/examples/ComputeAndAggregateJobPostingProperties.py)

To show job posting property computation and aggregation,
we calculate job posting counts by cleaned title, and upload
the resulting CSV to S3.

This is essentially a mini version of the Data@Work Research Hub.

To enable this example to be run with as few dependencies as possible, we use:

- a fake local s3 instance
- a sample of the Virginia Tech open job postings dataset
- only title cleaning and job counting.
106 changes: 106 additions & 0 deletions examples/ComputeAndAggregateJobPostingProperties.py
@@ -0,0 +1,106 @@
"""Computing and aggregating job posting properties
To show job posting property computation and aggregation,
we calculate job posting counts by cleaned title, and upload
the resulting CSV to S3.
This is essentially a mini version of the Data@Work Research Hub.
To enable this example to be run with as few dependencies as possible, we use:
- a fake local s3 instance
- a sample of the Virginia Tech open job postings dataset
- only title cleaning and job counting.
To make this example a little more interesting, one could incorporate more classes
from the skills_ml.job_postings.computed_properties.computers module, such as skill extractors or geocoders.
"""
import json
import logging
import urllib.request

from skills_ml.job_postings.computed_properties.computers import\
TitleCleanPhaseOne, PostingIdPresent
from skills_ml.job_postings.computed_properties.aggregators import\
aggregate_properties_for_quarter

from moto import mock_s3
import boto3
import s3fs
import unicodecsv as csv
import numpy

logging.basicConfig(level=logging.INFO)

VT_DATASET_URL = 'http://opendata.cs.vt.edu/dataset/ab0abac3-2293-4c9d-8d80-22d450254389/resource/9a810771-d6c9-43a8-93bd-144678cbdd4a/download/openjobs-jobpostings.mar-2016.json'


logging.info('Downloading sample Virginia Tech open jobs file')
response = urllib.request.urlopen(VT_DATASET_URL)
string = response.read().decode('utf-8')
logging.info('Download complete')
lines = string.split('\n')
logging.info('Found %s job posting lines', len(lines))

with mock_s3():
    client = boto3.resource('s3')
    client.create_bucket(Bucket='test-bucket')
    computed_properties_path = 's3://test-bucket/computed_properties'
    job_postings = []

    for line in lines:
        try:
            job_postings.append(json.loads(line))
        except ValueError:
            # Some rows in the dataset are not valid, just skip them
            logging.warning('Could not decode JSON')
            continue

    # Create properties. In this example, we are going to both compute and aggregate,
    # but this is not necessary! Computation and aggregation are entirely decoupled,
    # so it's entirely valid to just compute a bunch of properties and then later
    # figure out how you want to aggregate them.
    # We are only introducing the 'grouping' and 'aggregate' semantics this early
    # so as to avoid defining these properties twice in the same script.

    # Create properties to be grouped on. In this case, we want to group on cleaned job title
    grouping_properties = [
        TitleCleanPhaseOne(path=computed_properties_path),
    ]
    # Create properties to aggregate for each group
    aggregate_properties = [
        PostingIdPresent(path=computed_properties_path),
    ]

    # Regardless of their role in the final dataset, we need to compute
    # all properties from the dataset. Since the computed properties
    # partition their S3 caches by day, for optimum performance one
    # could parallelize each property's computation by a day's worth of postings.
    # But to keep it simple for this example, we just run them in a loop.
    for cp in grouping_properties + aggregate_properties:
        logging.info('Computing property %s for %s job postings', cp, len(job_postings))
        cp.compute_on_collection(job_postings)

    # Now that the time-consuming computation is done, we aggregate,
    # choosing an aggregate function for each aggregate column.
    # Here, the 'posting id present' property just emits the number 1,
    # so numpy.sum gives us a count of job postings.
    # Many other properties, like skill counts, will commonly use
    # an aggregate function like 'most common'.
    # A selection is available in skills_ml.algorithms.aggregators.pandas
    logging.info('Aggregating properties')
    aggregate_path = aggregate_properties_for_quarter(
        quarter='2016Q1',
        grouping_properties=grouping_properties,
        aggregate_properties=aggregate_properties,
        aggregate_functions={'posting_id_present': [numpy.sum]},
        aggregations_path='s3://test-bucket/aggregated_properties',
        aggregation_name='title_state_counts'
    )

    s3 = s3fs.S3FileSystem()
    logging.info('Logging all rows in aggregate file')
    with s3.open(aggregate_path, 'rb') as f:
        reader = csv.reader(f)
        for row in reader:
            logging.info(row)
2 changes: 1 addition & 1 deletion sample_job_listing.json
@@ -1 +1 @@
{"incentiveCompensation": "", "experienceRequirements": "Here are some experience and requirements", "baseSalary": {"maxValue": 0.0, "@type": "MonetaryAmount", "minValue": 0.0}, "description": "We are looking for a person to fill this job", "title": "Bilingual (Italian) Customer Service Rep (Work from Home)", "employmentType": "Full-Time", "industry": "Call Center / SSO / BPO, Consulting, Sales - Marketing", "occupationalCategory": "", "qualifications": "Here are some qualifications", "educationRequirements": "Not Specified", "skills": "Customer Service, Consultant, Entry Level", "validThrough": "2014-02-05T00:00:00", "jobLocation": {"@type": "Place", "address": {"addressLocality": "Salisbury", "addressRegion": "PA", "@type": "PostalAddress"}}, "@context": "http://schema.org", "alternateName": "Customer Service Representative", "datePosted": "2013-03-07", "@type": "JobPosting"}
{"incentiveCompensation": "", "experienceRequirements": "Here are some experience and requirements", "baseSalary": {"maxValue": 0.0, "@type": "MonetaryAmount", "minValue": 0.0}, "description": "We are looking for a person to fill this job", "title": "Bilingual (Italian) Customer Service Rep (Work from Home)", "employmentType": "Full-Time", "industry": "Call Center / SSO / BPO, Consulting, Sales - Marketing", "occupationalCategory": "", "qualifications": "Here are some qualifications", "educationRequirements": "Not Specified", "skills": "Customer Service, Consultant, Entry Level", "validThrough": "2014-02-05T00:00:00", "jobLocation": {"@type": "Place", "address": {"addressLocality": "Salisbury", "addressRegion": "PA", "@type": "PostalAddress"}}, "@context": "http://schema.org", "alternateName": "Customer Service Representative", "datePosted": "2013-03-07", "@type": "JobPosting", "id": "1"}
47 changes: 4 additions & 43 deletions skills_ml/algorithms/geocoders/__init__.py
@@ -6,7 +6,7 @@
import boto
import traceback

from skills_utils.s3 import split_s3_path
from skills_utils.s3 import split_s3_path, S3BackedJsonDict



@@ -22,41 +22,14 @@ class S3CachedGeocoder(object):
"""
def __init__(
self,
s3_conn,
cache_s3_path,
geocode_func=geocoder.osm,
sleep_time=1,
):
self.s3_conn = s3_conn
self.cache_s3_path = cache_s3_path
self.geocode_func = geocode_func
self.sleep_time = sleep_time
self.cache = None

@property
def _key(self):
bucket_name, path = split_s3_path(self.cache_s3_path)
return boto.s3.key.Key(
bucket=self.s3_conn.get_bucket(bucket_name),
name=path
)

def _load(self):
try:
cache_json = json.loads(self._key.get_contents_as_string().decode('utf-8'))
self.cache = cache_json
# results for a new file can be None instead of empty dict,
# so explicitly handle that case
if not self.cache:
self.cache = {}

except boto.exception.S3ResponseError as e:
logging.warning(
'Geocoder cachefile load failed with exception %s,' +
'will overwrite', e
)
self.cache = {}
assert isinstance(self.cache, dict)
self.cache = S3BackedJsonDict(path=self.cache_s3_path)

def retrieve_from_cache(self, search_strings):
"""Retrieve a saved geocode result from the cache if it exists
@@ -69,8 +42,6 @@ def retrieve_from_cache(self, search_strings):
job_posting (string) A job posting in schema.org/JobPosting json form
Returns: (string) The geocoding result, or None if none is available
"""
if not self.cache:
self._load()
cache_results = [
self.cache.get(search_string, None)
for search_string in search_strings
@@ -93,8 +64,6 @@ def geocode(self, search_string):
search_string (string) A search query to send to the geocoder
Returns: (string) The geocoding result
"""
if not self.cache:
self._load()
if search_string not in self.cache:
logging.info('%s not found in cache, geocoding', search_string)
self.cache[search_string] = self.geocode_func(search_string).json
@@ -103,7 +72,7 @@ def geocode(self, search_string):

def save(self):
"""Save the geocoding cache to S3"""
self._key.set_contents_from_string(json.dumps(self.cache))
self.cache.save()
logging.info(
'Successfully saved geocoding cache to %s',
self.cache_s3_path
@@ -115,8 +84,6 @@ def all_cached_geocodes(self):
Returns: (dict) search strings mapping to their (dict) geocoded results
"""
if not self.cache:
self._load()
return self.cache

def geocode_search_strings_and_save(self, search_strings, save_every=100000):
Expand All @@ -128,17 +95,11 @@ def geocode_search_strings_and_save(self, search_strings, save_every=100000):
Defaults to every 100000 job postings
"""
processed = 0
skipped = 0
try:
for i, search_string in enumerate(search_strings):
self.geocode(search_string)
processed += 1
if i % save_every == 0:
logging.info(
'Geocoding update: %s total, %s cache size',
i,
len(self.cache.keys())
)
self.save()

except Exception:
logging.error('Quitting geocoding due to %s', traceback.format_exc())
72 changes: 10 additions & 62 deletions skills_ml/algorithms/geocoders/cbsa.py
@@ -4,12 +4,12 @@
import logging

import boto

import s3fs
import shapely.geometry
import fiona

from skills_ml.datasets.cbsa_shapefile import download_shapefile
from skills_utils.s3 import split_s3_path
from skills_utils.s3 import split_s3_path, S3BackedJsonDict

Match = namedtuple('Match', ['index', 'area'])

@@ -24,7 +24,7 @@ class S3CachedCBSAFinder(object):
provides S3 caching. A minimal call looks like
```python
cbsa_finder = S3CachedCBSAFinder(s3_conn=..., cache_s3_path='some-bucket/cbsas.json')
cbsa_finder = S3CachedCBSAFinder(cache_s3_path='some-bucket/cbsas.json')
cbsa_finder.find_all_cbsas_and_save({
"Flushing, NY": { 'bbox': ['southwest': [..., ...], 'northeast': [...,...] }
"Houston, TX": { 'bbox': ['southwest': [..., ...], 'northeast': [...,...] }
Expand All @@ -40,8 +40,7 @@ class S3CachedCBSAFinder(object):
only one copy of `find_all_cbsas_and_save` at a time to avoid overwriting
the S3 cache file.
Args:
s3_conn (boto.s3.connection) an s3 connection
Args:
cache_s3_path (string) path (including bucket) to the json cache on s3
shapefile_name (string) local path to a CBSA shapefile to use
optional, will download TIGER 2015 shapefile if absent
@@ -51,62 +50,26 @@
"""
def __init__(
self,
s3_conn,
cache_s3_path,
shapefile_name=None,
cache_dir=None
):
self.s3_conn = s3_conn
self.cache_s3_path = cache_s3_path
self.shapes = []
self.properties = []
self.cache = None
self.cache_original_size = 0
self.shapefile_name = shapefile_name or download_shapefile(cache_dir or 'tmp')

@property
def _key(self):
bucket_name, path = split_s3_path(self.cache_s3_path)
return boto.s3.key.Key(
bucket=self.s3_conn.get_bucket(bucket_name),
name=path
)

def _load_cache(self):
"""Load the result cache into memory"""
try:
cache_json = self._key.get_contents_as_string().decode('utf-8')
self.cache = json.loads(cache_json)
if not self.cache:
self.cache = {}
self.cache_original_size = len(cache_json)
except boto.exception.S3ResponseError as e:
logging.warning(
'CBSA finder cachefile load failed with exception %s,' +
'will overwrite', e
)
self.cache = {}
self.cache = S3BackedJsonDict(path=self.cache_s3_path)
self.cache_dir = cache_dir
self.shapefile_name = shapefile_name

def _load_shapefile(self):
"""Load the CBSA Shapefile into memory"""
if not self.shapefile_name:
download_shapefile(self.cache_dir or 'tmp')
with fiona.collection(self.shapefile_name) as input:
for row in input:
self.shapes.append(shapely.geometry.shape(row['geometry']))
self.properties.append(row['properties'])

try:
cache_json = self._key.get_contents_as_string().decode('utf-8')
self.cache = json.loads(cache_json)
if not self.cache:
self.cache = {}
self.cache_original_size = len(cache_json)
except boto.exception.S3ResponseError as e:
logging.warning(
'CBSA finder cachefile load failed with exception %s,' +
'will overwrite', e
)
self.cache = {}

def query(self, geocode_result):
"""Find the geographically closest CBSA to the given geocode result
@@ -172,27 +135,12 @@ def find_all_cbsas_and_save(self, geocode_results):

def save(self):
"""Save the cbsa finding cache to S3"""
cache_json = json.dumps(self.cache)
if len(cache_json) >= self.cache_original_size:
new_cache_json = json.dumps(self.cache)
self._key.set_contents_from_string(new_cache_json)
logging.info(
'Successfully saved cbsa finding cache to %s',
self.cache_s3_path
)
else:
logging.error(
'New cache size: %s smaller than existing cache size: %s, aborting',
len(cache_json),
self.cache_original_size
)
self.cache.save()

@property
def all_cached_cbsa_results(self):
"""Return the contents of the cache
Returns: (dict) search strings mapping to their (tuple) results
"""
if not self.cache:
self._load_cache()
return self.cache
