In [1]:
import os
import bs4
import re
from urllib.request import urlopen

import numpy as np
import pandas as pd

In [2]:
# create folder structure
fp = os.path.join('.', 'data', '1_wiki-raw')
if not os.path.isdir(fp):
    os.makedirs(fp)

In [3]:
# create dictionary of cities and wiki urls
city_urls = {
    'new_york': 'https://en.wikipedia.org/wiki/List_of_Michelin_starred_restaurants_in_New_York_City',
    'chicago': 'https://en.wikipedia.org/wiki/List_of_Michelin_starred_restaurants_in_Chicago',
    'san_francisco': 'https://en.wikipedia.org/wiki/List_of_Michelin_starred_restaurants_in_San_Francisco_Bay_Area',
    'washington_dc': 'https://en.wikipedia.org/wiki/List_of_Michelin_starred_restaurants_in_Washington,_D.C.',
#     'los_angeles': 'https://en.wikipedia.org/wiki/List_of_Michelin_starred_restaurants_in_Los_Angeles',
}

In [4]:
# save lists of US Michelin-starred restaurants to disk
for city_name in city_urls:
    url = city_urls[city_name]
    html = urlopen(url)
    page_content = html.read()
    export_fp = os.path.join(fp, city_name + '_page_content.html')
    with open(export_fp, 'wb') as fid:
         fid.write(page_content)

In [5]:
def wiki_table_to_df(table, city_name):
    '''
    Takes a Wikipedia HTML table of a city's Michelin starred restaurants by year 
    and converts the table to a dataframe, with a city_name column denoting
    the name of the city
    '''
    col_list = ['city_name']
    year_regex = re.compile(r'^\d{4}') # matches first 4 digits for text that starts with 4 digits

    # get list of column names and years from table
    for col_header in table.find_all('th'):
        header_text = col_header.text.strip('\n')
        mo = year_regex.search(header_text)
        if mo is None:
            col_list.append(header_text)
        else:
            col_list.append(mo.group())
    
    # get list of years (only applicable for Washington DC)
    for cell in table.find_all('td'):
        cell_text = cell.text
        mo = year_regex.search(cell_text)
        if mo is None:
            pass
        else:
            col_list.append(mo.group())
    
    # get number of rows
    row_num = len([row for row in table.find_all('tr')])
    
    # create a DataFrame to store info
    df = pd.DataFrame(columns=col_list, index=range(0, row_num-1)) 

    # get the text and star rating for each cell
    row_counter = 0
    for row in table.find_all('tr')[1:]:
        column_counter = 1
        columns = row.find_all('td')
        for column in columns:
            if column.get('colspan'):
                width = int(column.get('colspan'))
                for i in range(0,width):
                    df.iat[row_counter,column_counter] = 'Closed'
                    column_counter+=1
            else:
                stars = [img['alt'] for img in column.find_all('img')] # star rating is captured in alt text
                cell_text = column.get_text().strip('\n')
                df.iat[row_counter,column_counter] = cell_text + ''.join(stars).strip('\n')
                column_counter += 1
        row_counter += 1
    
    # fill in the city_name
    df['city_name'] = city_name.replace('_', ' ')
    
    return df

In [6]:
# create dataframes from wiki tables
dfs = []
for city_name in city_urls:
    with open(os.path.join(fp, city_name + '_page_content.html')) as html:
        soup = bs4.BeautifulSoup(html)
        table = soup.find_all(name='table')
        if city_name == 'san_francisco': # sf has two separate tables that need to be merged
            sf1 = wiki_table_to_df(table[0], city_name)
            sf2 = wiki_table_to_df(table[1], city_name)
            sf_merged = pd.merge(sf2, sf1, how='outer', on=['city_name', 'Name'])
            sf_merged.insert(2, 'Neighborhood/City', sf_merged['Neighborhood/City_x'].combine_first(sf_merged['Neighborhood/City_y']))
            sf_merged.drop(['Neighborhood/City_x', 'Neighborhood/City_y'], axis=1, inplace=True)
            dfs.append(sf_merged)
        else:
            dfs.append(wiki_table_to_df(table[0], city_name))

In [7]:
# clean values of closed restaurants, once a restaurant closes, assume it remains closed
def impute_closed(df):
    '''
    Takes a dataframe with restaurant star ratings by year and imputes:
        (1) 'Closed' for years after a restaurant has been marked as Closed
        (2) NaN for cells that are empty strings
    '''
    closed_regex = re.compile(r'(?i)closed')
    row_counter = 0
    for row in range(0, df.shape[0]):
        column_counter = 0
        for column in range(0, df.shape[1]):
            cell_text = str(df.iat[row_counter, column_counter])
            left_cell_text = str(df.iat[row_counter, max(column_counter-1, 0)])
            # if cell contains closed then change cell value to 'Closed'
            if closed_regex.search(cell_text) is not None:
                df.iat[row_counter, column_counter] = 'Closed'
            elif left_cell_text == 'Closed':
                df.iat[row_counter, column_counter] = 'Closed'
            # change cells with empty strings to nan
            elif not(cell_text):
                df.iat[row_counter, column_counter] = np.nan
            column_counter += 1
        row_counter +=1    
    return df
    
for i in range(0, len(dfs)):
    dfs[i] = impute_closed(dfs[i])
    dfs[i].replace([np.nan, '1 star', '2 stars', '3 stars'], [0, 1, 2, 3], inplace=True)
    dfs[i].drop(dfs[i].columns[2], axis=1, inplace=True)
    dfs[i] = dfs[i].melt(id_vars=['city_name', 'Name'], var_name='year')

In [8]:
def union_dfs(df_list):
    '''
    Takes a list of dataframes and returns the union of the dataframes
    '''
    final_df = pd.DataFrame()
    for df in df_list:
        final_df = final_df.append(df)
    return final_df

In [9]:
# union city dataframes, sort values, rename columns
star_df = union_dfs(df_list=dfs).sort_values(['city_name', 'Name', 'year']).reset_index(drop=True)
star_df.columns = ['city_name', 'rest_name', 'year', 'stars']

In [10]:
# get unique city and restaurant name combinations for web scraping
rest_df = star_df[['city_name', 'rest_name']].drop_duplicates()

In [11]:
# save interim data to disk
export_folder = os.path.join('.', 'data', '2_wiki-interim')
if not os.path.isdir(export_folder):
    os.mkdir(export_folder)

star_df.to_csv(os.path.join(export_folder, 'stars.csv'), index=False)
rest_df.to_csv(os.path.join(export_folder, 'restaurants.csv'), index=False)