# Clustering and Scoring Job Relocation Opportunities - ETL Scripts

Austin Rainwater

---

For this notebook, I have created some libraries to assist in building an Extract/Transform/Load (ETL) data pipeline. The pipeline uses asyncio so that multiple requests to different endpoints can be in flight at the same time. This allows the full Extract/Transform/Load process to run concurrently, instead of running each step synchronously and waiting for its results.

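The pipeline is built from subclasses of an abstract `PipelineStep` class. Each step must define an asynchronous `process_batch(self, batch)` method; the steps in this notebook write it as an async generator that yields its output records.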

In [1]:
!pip install --quiet --upgrade sqlalchemy==1.3.22 pymysql==0.9.3 PyYAML aiohttp aiomysql==0.0.21

import asyncio
import re
import xml.etree.ElementTree as xml
from abc import ABC
from warnings import warn

import aiohttp
import numpy as np
import pandas as pd
import sqlalchemy as sa
import yaml
from aiomysql.sa import create_engine
from sqlalchemy import (
    Table,
    Column,
    MetaData,
    String,
    Numeric,
    Integer
)

from etl import PipelineStep, DataPipeline

# Load settings from the local secrets.yaml file. The 'db_prod' entry is later
# unpacked as keyword arguments when the database engine is created.
with open('secrets.yaml', 'r') as secrets_file:
    secrets = yaml.safe_load(secrets_file)

## PipelineStep Subclasses

These are some PipelineStep subclasses I plan to use multiple times because I will be accessing specific APIs.

In [2]:
class WikipediaPipelineStep(PipelineStep, ABC):
    """Base class for pipeline steps that query the English Wikipedia API.

    Each instance opens its own ``aiohttp.ClientSession`` in ``start()`` and
    schedules it to be closed in ``stop()``. Subclasses call ``make_request``
    to perform a GET request against ``wikipedia_url``.
    """

    # Incremented on the concrete class of the step each time it sends a request.
    request_counter = 0
    wikipedia_url = 'https://en.wikipedia.org/w/api.php'
    wikipedia_header = {"User-Agent":
          'datascience jupyter notebook/0.2 '
          '(https://github.com/pacorain/datascience-certification-final-project; '
          'Austin Rainwater, paco@heckin.io)'}
    max_batch_size = 1
    async_batches = True

    def start(self):
        """Open the HTTP session, then start the step."""
        self.session = aiohttp.ClientSession()
        super().start()

    def stop(self):
        """Stop the step and schedule the HTTP session to be closed."""
        super().stop()
        loop = asyncio.get_running_loop()
        # The event loop only keeps weak references to tasks; hold a strong
        # reference so the close cannot be garbage-collected before it runs.
        self._close_task = loop.create_task(self.session.close())

    async def make_request(self, params):
        """Send a GET request to the Wikipedia API and return the decoded JSON.

        Parameters
        ----------
        params : dict
            Query-string parameters for the API call.

        Returns
        -------
        The JSON-decoded response body.

        Raises
        ------
        RuntimeError
            If the HTTP status is outside the 2xx range.
        """
        async with self.session.get(self.wikipedia_url, params=params, headers=self.wikipedia_header) as request:
            self.__class__.request_counter += 1
            # Explicit check instead of ``assert`` so it still runs under ``python -O``.
            if not 200 <= request.status <= 299:
                raise RuntimeError(
                    f"Wikipedia request failed with HTTP status {request.status}: {params}"
                )
            response = await request.json()
            if params.get('action') == 'query' and 'continue' in response:
                # Continuation is not implemented: only the first page of
                # results is returned to the caller.
                warn("A Continue was issued but not handled.")
            return response

In [3]:
class FoursquarePipelineStep(PipelineStep, ABC):
    """Placeholder base class; nothing is implemented yet.

    Presumably intended for steps that call the Foursquare API -- TODO confirm.
    """

In [4]:
class DatabasePipelineStep(PipelineStep, ABC):
    """Base class for pipeline steps that handle data for this project's database.

    Batches are not processed concurrently (``async_batches = False``): the
    target is a simple MariaDB instance on a single disk, so running several
    queries at once is unlikely to be faster and may even be slower.
    """

    # Connection engine stored on the class; None until one is assigned.
    engine = None

    max_batch_size = 500
    async_batches = False

    def __init__(self):
        super().__init__()
        

# Create the aiomysql engine on the currently running event loop, using the
# 'db_prod' settings, and store it on the class for the database steps to use.
loop = asyncio.get_running_loop()
DatabasePipelineStep.engine = await create_engine(**secrets['db_prod'], loop=loop)




# Defining the actual pipeline steps

Now, I can use the code I've written to create a data pipeline. To test, I will start with Fort Wayne, IN like I did in the previous document.

In [5]:
class NormalizeCityNames(WikipediaPipelineStep):
    """First step: take incoming city names and normalize them according to the titles of
    their pages on Wikipedia.

    Several names are sent in one query, joined with ``|``. Names that have no
    Wikipedia page are reported with a warning and dropped.
    """

    # Kept just under 50, the number of pages Wikipedia allows per query.
    # NOTE(review): the sibling step classes in this file configure batching
    # through ``max_batch_size``; confirm that ``batch_size`` is the attribute
    # the pipeline actually reads.
    batch_size = 49

    async def process_batch(self, city_names):
        """Yield the Wikipedia page title for each name in ``city_names``.

        Redirect targets are yielded first, followed by the names that were
        already page titles, so the output order may differ from the input order.

        Raises
        ------
        RuntimeError
            If the API response has no ``'query'`` entry.
        """
        params = {
            "action": "query",
            "format": "json",
            "redirects": 1,
            "titles": "|".join(city_names)
        }
        result = await self.make_request(params)
        try:
            response = result['query']
        except KeyError as err:
            raise RuntimeError(f"Response not valid: {result}", city_names) from err
        for redirect in response.get('redirects', ()):
            #TODO: Cache redirects
            yield redirect['to']
        for page in response['pages'].values():
            if "missing" in page:
                warn(f"The city {page['title']} was provided but is not available on Wikipedia, and has been skipped")
                # Actually skip it: without this the missing title was still yielded below.
                continue
            if page['title'] in city_names:
                # Original name was not redirected; yield the original name
                yield page['title']

In [6]:
# Try the normalization step on its own with a single city name.
pipeline = DataPipeline(NormalizeCityNames(), data=['Fort Wayne'])
await pipeline.run()

['Fort Wayne, Indiana']

In [7]:
class ParseTree(WikipediaPipelineStep):
    """
    Pair every incoming city name with the parse tree of its Wikipedia page.
    """
    async def process_batch(self, normalized_city_names):
        # One "parse" request per page title.
        for page_title in normalized_city_names:
            payload = await self.make_request({
                "action": "parse",
                "format": "json",
                "redirects": 1,
                "prop": "parsetree",
                "page": page_title
            })
            tree_xml = payload['parse']['parsetree']['*']
            # Canonicalize with strip_text=True before parsing, so surrounding
            # whitespace is removed from the text content.
            stripped_xml = xml.canonicalize(tree_xml, strip_text=True)
            yield (page_title, xml.fromstring(stripped_xml))

In [8]:
# Chain normalization and parse-tree retrieval for a single city name.
pipeline = DataPipeline(NormalizeCityNames(), ParseTree(), data=['Fort Wayne'])
await pipeline.run()

[('Fort Wayne, Indiana', <Element 'root' at 0x7fbb86fa3db0>)]

In [29]:
class GetCitiesFromCounties(WikipediaPipelineStep):
    """
    Figure out which of a city's navboxes is for the county, and use that to expand each
    city into the cities in its county.

    Leaves city and tree attached so that the tree for the original city is not obtained twice.
    """

    # Matches one wiki-markup list entry of the form "* [[Target|Label]]".
    # Compiled once for the class instead of on every parse_cities() call.
    _listed_city = re.compile(r"""
        ^\* \ *       # Line starts with "*" plus any number of spaces
        \[{2}         # Start of link "[["
            ([^\|]+)  # First part of link (between "[[" and "|"). This is the part that gets captured.
            \|        # Separator "|"
            [^\|]+    # Second part of link (between "|" and "]]")
        \]{2}‡?       # End of link "]]" plus optional ‡ character
        \ *$          # End with any number of spaces
    """, re.VERBOSE + re.MULTILINE)

    async def process_batch(self, city_parsetrees):
        """Expand each ``(city, tree)`` pair into one record per city in its county.

        Yields ``(original_city, tree, city, state, seat)`` tuples. A city for
        which no county navigation box is found is skipped with a warning.
        """
        for original_city, tree in city_parsetrees:
            seat = None
            cities = None
            state = None
            for template in self.get_navbox_templates(tree):
                raw_response, template_page = await self.get_template_page(template)
                county_root = template_page.find(".//template[title='US county navigation box']")
                state_root = template_page.find(".//template[title='US state navigation box']")
                if county_root is not None:
                    seat = await self.get_seat(county_root)
                    cities = await self.parse_cities(raw_response)
                elif state_root is not None:
                    state = await self.get_state(state_root)
                if seat and state:
                    # Both navboxes were found; no need to fetch further templates.
                    break
            if cities is None:
                # No county navbox among the templates; iterating None would raise TypeError.
                warn(f"No county navigation box was found for {original_city}; it has been skipped")
                continue
            for city in cities:
                yield (original_city, tree, city, state, seat)

    def get_navbox_templates(self, wiki_page_tree):
        """Finds the topic navigation boxes on the wiki page (usually at the bottom)"""
        navboxes = wiki_page_tree.findall(".//template[title='Navboxes']/part[name='list']/value/template/title")
        return [f'Template:{elem.text}' for elem in navboxes]

    async def get_template_page(self, template):
        """Gets the template that defines the navigation box.

        Returns
        -------
        tuple[str, xml.etree.ElementTree.Element]
            The first element is the raw XML returned by the WikiPedia API.
            The second element is the stripped and parsed XML data.
        """
        params = {
            "action": "parse",
            "format": "json",
            "redirects": 1,
            "prop": "parsetree",
            "page": template
        }
        raw_response = (await self.make_request(params))['parse']['parsetree']['*']
        response = xml.canonicalize(raw_response, strip_text=True)
        return raw_response, xml.fromstring(response)

    async def get_seat(self, root):
        """Gets the seat (e.g. metropolis) for a specified area from the navbox.

        The seat named in the navbox is looked up through the query API (with
        redirects resolved) and the resulting page title is returned.

        Raises
        ------
        RuntimeError
            If Wikipedia reports the seat's page as missing.
        """
        seat_name = root.find(".//part[name='seat']/value").text
        params = {
            "action": "query",
            "format": "json",
            "redirects": "1",
            "titles": seat_name
        }
        response = await self.make_request(params)
        page = list(response['query']['pages'].values())[0]
        # Explicit check instead of ``assert`` so it still runs under ``python -O``.
        if 'missing' in page:
            raise RuntimeError(f"The county seat {seat_name} is not available on Wikipedia")
        return page['title']

    async def parse_cities(self, raw_response_txt):
        """Using the raw XML, gets the cities from the navbox because they will be formatted as a list."""
        return self._listed_city.findall(raw_response_txt)

    async def get_state(self, root):
        """Gets the canonical state name for a city from the navbox"""
        return root.find(".//part[name='template_name']/value").text

In [30]:
# Run the first three steps for one city and show the first three results.
pipeline = DataPipeline(NormalizeCityNames(), ParseTree(), GetCitiesFromCounties(), data=['Fort Wayne'])
await pipeline.run()
pipeline.results[:3]

[('Fort Wayne, Indiana',
  <Element 'root' at 0x7fbb7ab28c20>,
  'Fort Wayne, Indiana',
  'Indiana',
  'Fort Wayne, Indiana'),
 ('Fort Wayne, Indiana',
  <Element 'root' at 0x7fbb7ab28c20>,
  'New Haven, Indiana',
  'Indiana',
  'Fort Wayne, Indiana'),
 ('Fort Wayne, Indiana',
  <Element 'root' at 0x7fbb7ab28c20>,
  'Woodburn, Indiana',
  'Indiana',
  'Fort Wayne, Indiana')]

In [31]:
# Instances of the first-pass normalize and parse steps.
normalize = NormalizeCityNames()
parse = ParseTree()

class SecondNormalizeStep(NormalizeCityNames):
    """NormalizeCityNames variant whose records also carry a state and a county seat.

    Incoming records are ``(city, state, seat)`` tuples. The city names are
    normalized by the parent class and yielded back as
    ``(normalized_city, state, seat)``.
    """
    max_batch_size = 49

    def __init__(self, first_step):
        super().__init__()
        # Share the cache object of the step this one is built from
        # (presumably so names it already handled count as duplicates here).
        self._duplicate_cache = first_step._duplicate_cache

    async def process_batch(self, batch):
        cities, states, seats = zip(*batch)
        position = 0
        # NOTE(review): state and seat are paired with the parent's output purely
        # by position. The parent yields redirect targets before non-redirected
        # titles, so positions may not line up with the input -- verify.
        async for normalized_city in super().process_batch(cities):
            record = (normalized_city, states[position], seats[position])
            position += 1
            yield record

    def is_duplicate(self, record):
        # Duplicate detection is delegated to the parent using only the city name.
        return super().is_duplicate(record[0])
        
    
class SecondParseTreeStep(ParseTree):
    """Subclass of ParseTree, which is modified to accept and yield back the city's county seat.

    Incoming records are ``(city, state, seat)`` tuples; outgoing records are
    ``(city, state, seat, tree)`` tuples.
    """
    def __init__(self, first_step):
        super().__init__()
        # Share the cache object of the step this one is built from.
        self._duplicate_cache = first_step._duplicate_cache

    async def process_batch(self, batch):
        """Attach the parse tree to each record, keeping its own state and seat."""
        norm_cities, states, seats = zip(*batch)
        i = 0
        # ParseTree.process_batch yields exactly one result per input city, in
        # input order, so position i identifies the matching state and seat.
        async for city_name, tree in super().process_batch(norm_cities):
            yield city_name, states[i], seats[i], tree
            # Advance to the next record; without this every city received
            # the state and seat of the first record in the batch.
            i += 1

    def is_duplicate(self, record):
        """Check for duplicates using only the city name (first field)."""
        city = record[0]
        return super().is_duplicate(city)
        

class NewCitySplit(PipelineStep):
    # Raw docstring: the diagram contains backslashes, which a normal string
    # would treat as line continuations or invalid escape sequences.
    r"""Runs the same processing on new cities in the county.

    It's designed specifically not to process the original city by creating a "split":

        incoming city matches original?
          |          \
         yes          no
          |            \
          |             |
          |             v
          |         normalize
          |             |
          |             v
          |           parse
          |             |
          v             v
         yield        yield
           \            /
            \          /
             \        /
             |       |
             v       v
         NewCitySplit.outputs

    """
    def __init__(self, normalize_step, parse_step):
        """Build the inner normalize -> parse chain from the first-pass steps.

        Parameters
        ----------
        normalize_step : the first-pass normalize step; the inner normalize step is built from it
        parse_step : the first-pass parse step; the inner parse step is built from it
        """
        super().__init__()
        self.normalize_step = SecondNormalizeStep(normalize_step)
        self.parse_step = SecondParseTreeStep(parse_step)
        self.normalize_step.attach(self.parse_step)
        # The inner parse step uses this step's outputs as its own.
        self.parse_step.outputs = self.outputs

    async def process_batch(self, batch):
        """Yield the original city directly; send every other city to the inner steps."""
        for original_city, tree, city, state, seat in batch:
            if city == original_city:
                # Already normalized and parsed earlier in the pipeline.
                yield city, state, seat, tree
            else:
                self.normalize_step.put((city, state, seat))
        # Don't mark this task as "complete" until the inner pipeline steps are done
        await self.normalize_step.join()
        await self.parse_step.join()

    def start(self):
        """Start this step and both inner steps."""
        super().start()
        self.normalize_step.start()
        self.parse_step.start()

    def stop(self):
        """Stop this step and both inner steps."""
        super().stop()
        self.normalize_step.stop()
        self.parse_step.stop()
                
        

In [32]:
# The same normalize/parse instances are used as pipeline steps and are also
# handed to NewCitySplit, which builds its inner steps from them.
normalize = NormalizeCityNames()
parse = ParseTree()

# Run the four-step Wikipedia pipeline for one city and show the first five results.
pipeline = DataPipeline(normalize, parse, GetCitiesFromCounties(), NewCitySplit(normalize, parse), data=['Fort Wayne, IN'])
wikipedia_results = await pipeline.run()
wikipedia_results[:5]

[('Fort Wayne, Indiana',
  'Indiana',
  'Fort Wayne, Indiana',
  <Element 'root' at 0x7fbb7aabeb30>),
 ('Aboite, Indiana',
  'Indiana',
  'Fort Wayne, Indiana',
  <Element 'root' at 0x7fbb7a4ecae0>),
 ('Aboite Township, Allen County, Indiana',
  'Indiana',
  'Fort Wayne, Indiana',
  <Element 'root' at 0x7fbb7a49ab80>),
 ('Academie, Indiana',
  'Indiana',
  'Fort Wayne, Indiana',
  <Element 'root' at 0x7fbb7a4a4b30>),
 ('Adams Township, Allen County, Indiana',
  'Indiana',
  'Fort Wayne, Indiana',
  <Element 'root' at 0x7fbb7a454040>)]

In [37]:
class WikiParsing(PipelineStep):
    """
    Extracts data from XML ElementTree to put into database or use with FourSquare API.

    Methods
    -------
    put(record):
        Adds a record. The record should be a Tuple of the following elements:

            1. city (string): The canonical name of the city; e.g. `'New Haven, Indiana'`
            2. state (string): The name of the state; e.g. `'Indiana'`
            3. seat (string): The county seat; e.g. `'Fort Wayne, Indiana'`
            4. tree (xml.etree.ElementTree.Element): The parsed XML from Wikipedia for the city
    """
    async_batches = True
    # NOTE(review): other step classes in this file configure batching through
    # ``max_batch_size``; confirm that ``batch_size`` is the attribute the
    # pipeline actually reads.
    batch_size = None

    async def process_batch(self, cities):
        """Turn each ``(city, state, seat, tree)`` record into database-ready data.

        Yields ``(city, city_record, weather_table, population_history)`` tuples.
        The weather table is only extracted when the city is its own county
        seat; otherwise that element is None.
        """
        for city, state, seat, tree in cities:
            latitude, longitude = self.parse_settlement_coordinates(tree)
            city_record = {
                'city_name': city,
                'metro_name': seat,
                'state_name': state,
                'center_latitude': latitude,
                'center_longitude': longitude,
                'area_val': float(self.infobox_value(tree, "area_total_sq_mi")),
                'city_population': int(self.infobox_value(tree, "population_est")),
                'population_density': float(self.infobox_value(tree, "population_density_sq_mi"))
            }
            weather_table = self.get_weather_table(tree) if city == seat else None
            yield (city, city_record, weather_table, self.get_population_history(tree, city))

    def is_duplicate(self, record):
        """Check for duplicates using only the city name (first field)."""
        city = record[0]
        return super().is_duplicate(city)

    @staticmethod
    def parse_settlement_coordinates(wiki_data):
        """Return ``(latitude, longitude)`` in decimal degrees from the page's coord template.

        Expects the first eight coord parts to be degrees, minutes, seconds and
        hemisphere for latitude and then for longitude; fewer parts raise ValueError.
        """
        coords = wiki_data.findall(".//part[name='coordinates']/value/template[title='coord']/part/value")
        # Convert from DMS (degrees, minutes, seconds) to Decimal
        lat_deg, lat_min, lat_sec, lat_pole, lng_deg, lng_min, lng_sec, lng_pole = [x.text for x in coords[:8]]
        lat_sign = 1 if lat_pole == 'N' else -1
        lng_sign = 1 if lng_pole == 'E' else -1
        latitude = sum([float(lat_deg), float(lat_min)/60.0, float(lat_sec)/3600.0]) * lat_sign
        longitude = sum([float(lng_deg), float(lng_min)/60.0, float(lng_sec)/3600.0]) * lng_sign
        return latitude, longitude

    def infobox_value(self, tree, part_name):
        """Return the text of the ``part_name`` entry of the page's 'Infobox settlement' template."""
        template = tree.find(".//template[title='Infobox settlement']")
        return template.find(f"./part[name='{part_name}']/value").text

    def get_weather_table(self, tree):
        """Return the page's 'Weather box' as a DataFrame, or None if the page has none.

        Rows are the twelve months plus 'year'; columns are the statistics
        named in ``stat_names``. Statistics absent from the box are NaN.
        """
        weather_box = tree.find(".//template[title='Weather box']")
        if weather_box is None:
            return None
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec', 'year']
        # Weather box part suffix -> column name in the resulting table.
        stat_names = {
            'record high F': 'record_high',
            'avg record high F': 'avg_record_high',
            'high F': 'avg_high',
            'low F': 'avg_low',
            'avg record low F': 'avg_record_low',
            'record low F': 'record_low',
            'precipitation inch': 'avg_precip',
            'snow inch': 'avg_snow',
            'precipitation days': 'precip_days',
            'snow days': 'snow_days',
            'sun': 'sunshine_hours',
            'percentsun': 'daily_sunshine'
        }

        series_list = []
        for month in months:
            data = {}
            for stat, column in stat_names.items():
                elem = weather_box.find(f".//part[name='{month} {stat}']/value")
                if elem is None:
                    val = np.nan
                else:
                    val = float(elem.text.replace('−', '-')) # The dashes in the data are not standard dashes for some reason
                data[column] = val
            series_list.append(pd.Series(data, name=month))
        return pd.DataFrame(series_list)

    def get_population_history(self, tree, city_name=None):
        """Return the page's census population history, or None if it has none.

        Parameters
        ----------
        tree : parsed XML of the city's Wikipedia page
        city_name : label used for the 'city' level of the result's index

        Returns
        -------
        pandas.Series named 'census_population', indexed by (city, year), or
        None when the page has no 'US Census population' template.
        """
        census = tree.find(".//template[title='US Census population']")
        if census is None:
            return None

        data = []
        index = []

        for part in census.findall("part"):
            year = part.find("name").text
            # Only four-digit numeric part names are census years.
            if year.isnumeric() and len(year) == 4:
                data.append(int(part.find("value").text))
                index.append(year)

        return pd.Series(
            data,
            index=pd.MultiIndex.from_arrays(
                [[city_name] * len(index), index], names=['city', 'year']
            ),
            name='census_population'
        )

In [None]:
class WikipediaDatabaseStep(DatabasePipelineStep):
    async def process_batch(self, batch):
        citites, city_records, weather_tables, population_tables = zip(*batch)
        for city in citites:
            yield city
        await self.insert_citites(city_records)
        await self.insert_weather_data(citites, weather_tables)
        await self.insert_population_history(citites, population_tables)
        
    meta = MetaData()
        
    city_table = Table("citites", self.meta,
        Column('city_name', String(50), primary_key=True, comment='City Name'),
        Column('metro_name', String(50), comment='Metropolitan Area Name'),
        Column('state_name', String(25), nullable=False, comment='State Name'),
        Column('center_latitude', Numeric(10, 6), nullable=False, comment='Latitude of City center'),
        Column('center_longitude', Numeric(10, 6), nullable=False, comment='Longitude of City center'),
        Column('area_val', Numeric(10, 4), nullable=False, comment='Area of city in square miles'),
        Column('city_population', Integer, nullable=False, comment='Total population of city'),
        Column('population_density', Numeric(10, 4), comment='Population Density per square mile')
    )
    
    weather_table = Table("metro_weather_data")
        
    async def insert_cities(self, city_records):
        async with self.engine.acquire() as conn:
            await conn.execute(
                self.city_table.insert(),
                city_records
            )
    
    async def insert_weather_data(self, cities, weather_tables):
        dfs = dict(cities, weather_tables)