In [None]:
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support import expected_conditions as EC
import time

class BlinkParser():
    CHROME_PATH = "/Users/ik/bin/chromedriver"
    DIRECTORY_URL = 'https://locations.blinkfitness.com/index.html'
    
    branch_status_dict = {
        0: ['Open Now - Closes at'],
        1: ['Opening Soon'], 
        2: ['Temporarily Closed', 'Closed'],
        3: ['Closed - Opens at']}

    def __init__(self):
        self.driver = None
        self.branch_directory_urls = []
        self.branch_info = []
        
    def load_chromedriver(self, path):
        chrome_options = Options()
        chrome_options.page_load_strategy = 'eager'
        return webdriver.Chrome(path, options=chrome_options)
    
    def parse(self):
        self.driver = self.load_chromedriver(BlinkParser.CHROME_PATH)
        self.driver.get(BlinkParser.DIRECTORY_URL)
        wait = WebDriverWait(self.driver, 10)
        
        branch_links = wait.until(lambda d: d.find_elements_by_tag_name('a'))
        
        # not including virginia beach since VA does not have standard directory like all other states
        self.branch_directory_urls = self.find_hrefs(
            list_a_tags=branch_links,
            url_starts_with='https://locations.blinkfitness.com/',
            url_does_not_include=['index','search','virginia-beach'])
        
        # if urls not parsed yet, then sleep and retry after 1 second
        while len(self.branch_directory_urls) == 0:
            time.sleep(1)
            
        self.parse_branch_info()
        
        self.driver.quit()
        

    def find_hrefs(self, list_a_tags, url_starts_with, url_does_not_include):
        list_urls = []

        for link in list_a_tags:
            url_string = link.get_attribute('href')

            if url_string.startswith(url_starts_with):
                if not any(x in url_string for x in url_does_not_include):
                    list_urls.append(url_string)
        return list_urls        
    
    def parse_branch_info(self):
        for url in self.branch_directory_urls:
            self.driver.get(url)
    
            # parse elements containing individual branch information
            wait = WebDriverWait(self.driver, 10)
            cities = wait.until(lambda d: d.find_elements_by_class_name('Directory-cityContainer'))
            
            for city in cities:
                branches = city.find_elements_by_class_name('Directory-listTeaser')
                
                for branch in branches:
                    temp_branch = {
                        'state': url[-2:].upper(),
                        'city': city.find_element_by_class_name('Directory-cityName').text,
                        'street': branch.find_element_by_class_name('Teaser-address').text,
                        'title': branch.find_element_by_class_name('Teaser-title').text,
                        'phone': branch.find_element_by_class_name('Teaser-phone').text,
                        'url': branch.find_element_by_class_name('Teaser-titleLink').get_attribute('href')}
                    
                    self.branch_info.append(temp_branch)

    def get_urls(self):
        return [branch['url'] for _,branch in enumerate(self.branch_info)]
    
    @staticmethod
    def status_to_code(status):    
        for status_code, status_text_list in BlinkParser.branch_status_dict.items():
            if status in status_text_list:
                return status_code
        return None
    
    def get_status_code(self, url):
        self.driver.get(url)
        wait = WebDriverWait(self.driver, 3)
        
        try:
            status = wait.until(lambda d: d.find_element_by_class_name('Hours-statusText')).text 
        except:
            # for branches that have no current status (usually branches that have not been opened yet)
            status = wait.until(lambda d: d.find_element_by_class_name('Core-openingDate')).text
    
        return BlinkParser.status_to_code(status)
        
    def parse_capacity(self):
        # load new driver in case connection refused from too many requests from initial parse
        self.driver = self.load_chromedriver(BlinkParser.CHROME_PATH)
        
        urls = self.get_urls()
        capacities = []
        
        for url in urls:
            status_code = self.get_status_code(url)
            
            # status code corresponds to dictionary at beginning of class (0 = branch is open)
            # find_elements used with walrus operator to avoid error for certain webpages where Core-capacityStatus shows up for unopened branches (it shouldn't)
            if not status_code and len(cap_element := self.driver.find_elements_by_class_name('Core-capacityStatus')) > 0:
                capacity = cap_element[0].text
            else:
                capacity = None
            
            capacities.append({
                'title': self.driver.find_element_by_class_name('LocationName-geo').text,
                'current_time': time.localtime(time.time()),
                'status_code': status_code,
                'capacity': capacity 
            })
            
        self.driver.quit()
        return capacities

In [5]:
from flask import Flask, render_template, jsonify
from flask_sqlalchemy import SQLAlchemy 

app = Flask(__name__)

app.config['ENV'] = 'development'
#app.config['SECRET_KEY'] = ''

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///blink.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# database setup
db = SQLAlchemy(app)

class Branch(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(30), unique=True, nullable=False)
    phone = db.Column(db.String(12), unique=True, nullable=False)
    url = db.Column(db.String(100), nullable=False)
    address = db.relationship('Address', backref='branch', lazy=True, uselist=False)
    
    def __repr__(self):
        return f"{self.title.title()}"
    
class Address(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    branch_id = db.Column(db.Integer, db.ForeignKey('branch.id'), nullable=False)
    state = db.Column(db.String(2), nullable=False) # TODO: make this limited to state abbreviations
    city = db.Column(db.String(30), nullable=False)
    street = db.Column(db.String(100), unique=True, nullable=False)
    
    def __repr__(self):
        return f"{self.street}, {self.city}, {self.state}"
    
class Capacity(db.Model):
    id = db.Column(db.Integer,primary_key=True)
    branch_id = db.Column(db.Integer, db.ForeignKey('branch.id'), nullable=False)
    capacity = db.Column(db.String(100))
    status_code = db.Column(db.Integer)

In [6]:
def refresh_tables():
    db.drop_all()
    db.create_all()

def main(parser):
    refresh_tables()
    
    for branch in parser.branch_info:
        branch_address = Address(
            state = branch['state'],
            city = branch['city'],
            street = branch['street']
        )
        
        new_branch = Branch(
            title = branch['title'],
            address = branch_address,
            phone = branch['phone'],
            url = branch['url']
        )
        
        db.session.add(new_branch)
        db.session.commit()
        
    return

def capacity(parser):
    capacities = parser.parse_capacity()
    
    for cap in capacities:   
        blink_branch_id = Branch.query.filter(Branch.title == cap['title']).first().id
        
        # raise error if no valid branch 
        if not blink_branch_id:
            print(cap, 'does not exist in database')
            raise NameError('No valid blink branch id for capacity reading')
        
        print(blink_branch_id, cap['status_code'], cap['capacity'])
        
        new_capacity = Capacity(
            branch_id = blink_branch_id,
            status_code = cap['status_code'],
            capacity = cap['capacity']
        )
        
        db.session.add(new_capacity)
        db.session.commit()
    
    return    


In [None]:
parser = BlinkParser()
parser.parse()

In [None]:
refresh_tables()
main(parser)

In [34]:
capacity(parser)

NameError: name 'parser' is not defined

In [None]:
db.session.query(Capacity).filter(Capacity.capacity != None).join(Branch, Capacity.branch_id == Branch.id).all()

In [None]:
all_capacities = Capacity.query.all()
print(len(all_capacities),'\n')

for caps in all_capacities:
    print(caps.branch_id, caps.capacity, caps.status_code)

In [21]:
caps = db.session.query(Capacity, Branch).join(Branch).filter(Capacity.capacity != None).all()

In [33]:
for cap in caps:
    print(f"{cap.Branch.title} ({cap.Branch.address.state}): ~{cap.Capacity.capacity[-8:-5]}")

Jacksonville (FL): ~25%
Miramar (FL): ~25%
Midway (IL): ~50%
Merrionette Park (IL): ~50%
Bridgeport (IL): ~50%
Oak Lawn (IL): ~50%
Evanston (IL): ~50%
Beverly (MA): ~50%
Medford (MA): ~50%
Redford (MI): ~25%
Warren (MI): ~25%
Paramus (NJ): ~75%
Lodi (NJ): ~50%
Journal Square (NJ): ~75%
Parsippany (NJ): ~50%
Union (NJ): ~50%
Clifton (NJ): ~75%
Linden (NJ): ~25%
South Orange (NJ): ~50%
Irvington (NJ): ~50%
Passaic (NJ): ~50%
Perth Amboy (NJ): ~75%
East Orange (NJ): ~50%
Plainfield (NJ): ~50%
Ironbound (NJ): ~50%
Nutley (NJ): ~75%
Willingboro (NJ): ~25%
Parkchester (NY): ~50%
St Ann's (NY): ~50%
Concourse Village (NY): ~50%
Fordham (NY): ~50%
Gun Hill (NY): ~50%
Mt. Eden (NY): ~50%
Riverdale (NY): ~75%
In The Hub (NY): ~CAP
Hunts Point (NY): ~50%
East Tremont (NY): ~50%
Claremont (NY): ~25%
Bedford Park (NY): ~50%
Junction (NY): ~50%
Flatbush (NY): ~75%
Crown Heights (NY): ~50%
Sheepshead Bay (NY): ~CAP
Gates (NY): ~75%
Bed-Stuy (NY): ~VAI
Boerum Hill (NY): ~50%
Canarsie (NY): ~50%
Coney 