In [1]:
import webdriver
import re
import db_functions
import numpy as np
import logging
from keys import private

from enum import Enum, auto
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains

# Log INFO and above to the file basic.log. Each record is written as a
# "<timestamp> <level>" header line followed by the message on its own line.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s\n%(message)s\n',
    datefmt='%Y-%m-%d %H:%M:%S',
    filename='basic.log'
)

In [11]:
# Initial declarations

class SearchingState(Enum):
    """State of the scraping loop; the loop keeps running while the state is SEARCHING."""
    SEARCHING = auto()
    ERROR = auto()
    COMPLETED = auto()

# Maps each scraped field to the CSS selector that locates it on a listing page.
# The member name is the key used in the scraped listing dict, which in turn is used as the
# column name when the listing is inserted into the database.
class ListingFields(Enum):
	# NOTE(review): `adress` looks misspelled but presumably matches the existing DB column —
	# confirm against the table schema before renaming anything here.
	title = '.main__container__info--title'
	adress = '.info__map-link'
	amenities = '.amenities__list'
	description = '.amenities__base-description'
	price = '.price__item--main'
	condominium_fee = '.price__item.condominium'
	taxes = '.price__item.iptu'
	area = 'span[itemprop="floorSize"]'
	bedrooms = 'span[itemprop="numberOfRooms"]'
	garage_spaces = '.feature__item.text-regular.js-parking-spaces'
	bathrooms = 'span[itemprop="numberOfBathroomsTotal"]'
	floor = 'span[itemprop="floorLevel"]'

# Horizontal tab, used to indent the generated SQL text
TAB_CHAR = '\t'

# Listing fields whose scraped text is reduced to its digits before insertion
numeric_fields = [
	'price', 'condominium_fee', 'taxes', 'area',
	'bedrooms', 'garage_spaces', 'bathrooms', 'floor',
]

# Connection and cursor shared by every DB helper in this notebook
# db, db_cursor = db_functions.connect_to_test_database_mysql_connector()
db, db_cursor = db_functions.connect_to_database_mysql_connector()

In [12]:
def get_amenities_dict() -> dict:
	""" Read the amenity column names of the `amenities` table

	The query excludes the `id` and `house_id` columns.

	Returns:
		dict: column name -> zero-based position, in the order the cursor yields the rows
	"""
	columns_query = """
	SELECT `COLUMN_NAME` 
	FROM `INFORMATION_SCHEMA`.`COLUMNS` 
	WHERE `TABLE_SCHEMA`='imoveis-balneario-camboriu' 
	AND `TABLE_NAME`='amenities'
	AND `COLUMN_NAME` != 'id'
	AND `COLUMN_NAME` != 'house_id';
	"""
	db_cursor.execute(columns_query)
	db.commit()

	positions = {}
	for position, row in enumerate(db_cursor):
		# Each row is a 1-tuple holding the column name
		positions[row[0]] = position

	return positions

In [13]:
def _quote_identifier(name: str) -> str:
	"""Wrap `name` in backticks for use as a MySQL identifier, doubling any embedded backtick."""
	return '`' + name.replace('`', '``') + '`'


def write_amenities_to_db(listings:list[dict], amenities_dict:dict) -> None:
	""" Write the amenities of each listing to the `amenities` table

	One row is inserted per listing: `house_id` plus one boolean per amenity column.
	Amenity names are normalised (lower case, spaces -> underscores, one per line of the
	scraped text). An amenity that has no column yet gets one via ALTER TABLE.
	A listing whose amenities text is the 'NULL' placeholder is stored with every flag False.

	Args:
		listings (list[dict]): listings to insert; each needs the keys `id_external` and `amenities`
		amenities_dict (dict): amenity column names of the table (as returned by get_amenities_dict);
			it is not modified

	Returns:
		None

	Raises:
		KeyError: if a listing's `id_external` is not present in `dados-imoveis`, or if a
			freshly added column is not reported back by get_amenities_dict
	"""
	# Nothing to insert: an INSERT without rows would be invalid SQL
	if not listings:
		return

	# Map external (website) ids to the ids of the rows stored in `dados-imoveis`.
	# Rows are fetched before the commit so no unread result is pending on the connection.
	db_cursor.execute('SELECT `id`, `id_external` FROM `dados-imoveis`')
	id_rows = db_cursor.fetchall()
	db.commit()
	internal_ids = {str(row[1]): row[0] for row in id_rows}

	# Local copy: refreshed below whenever a column is added, without touching the caller's dict
	known_columns = dict(amenities_dict)

	amenities_per_listing = []
	for listing in listings:
		names = listing['amenities'].lower().replace(' ', '_').split('\n')
		if names[0] == 'null':
			# Placeholder used when the listing has no amenities element
			names = []
		# Blank lines cannot become column names
		names = [name for name in names if name]
		amenities_per_listing.append((internal_ids[listing['id_external']], names))

	# Create a column for every amenity the table does not know yet
	for _, names in amenities_per_listing:
		for name in names:
			if name in known_columns:
				continue
			# Identifiers cannot be bound as parameters, so the scraped name is escaped instead
			db_cursor.execute(
				'ALTER TABLE `imoveis-balneario-camboriu`.`amenities` '
				f'ADD COLUMN {_quote_identifier(name)} TINYINT(1) NULL DEFAULT NULL;'
			)
			known_columns = get_amenities_dict()
			if name not in known_columns:
				raise KeyError(name)
			logging.info(f'Added {name} to amenities columns')

	# Flags are resolved by column name, so they stay correct even if the column order
	# changed when the mapping was refreshed. Ids are kept as-is (no string truncation).
	column_names = list(known_columns)
	rows = []
	for house_id, names in amenities_per_listing:
		present = set(names)
		rows.append((house_id, *(name in present for name in column_names)))

	quoted_columns = ', '.join(_quote_identifier(name) for name in ['house_id', *column_names])
	placeholders = ', '.join(['%s'] * (len(column_names) + 1))
	insert_query = f'INSERT INTO `amenities` ({quoted_columns}) VALUES ({placeholders})'

	logging.info(f'Amenities insert ({len(rows)} rows):\n{insert_query}')
	db_cursor.executemany(insert_query, rows)
	db.commit()

In [14]:
def _null_if_placeholder(value):
	"""Turn the scraper placeholders 'NULL' and '0' into None so they are stored as SQL NULL."""
	return None if value in ('NULL', '0') else value


def write_listings_to_db(listings: list[dict], amenities_dict: dict) -> None:
	""" Write listings to Database

	Listings without a price ('NULL' or '0') or without a title ('NULL') are discarded.
	The remaining ones are inserted into `dados-imoveis` with a parameterised query, and
	are then handed to write_amenities_to_db. The input list is not modified.

	Args:
		listings (list[dict]): all listings; every dict is expected to have the same keys,
			which are used as the column names
		amenities_dict (dict): all amenities types

	Returns:
		None
	"""
	price_key = ListingFields.price.name
	title_key = ListingFields.title.name

	# Build a new list instead of deleting while iterating, which skips the element
	# that follows each deleted one
	valid_listings = [
		listing for listing in listings
		if listing[price_key] not in ('NULL', '0') and listing[title_key] != 'NULL'
	]

	if not valid_listings:
		logging.info('Listings insert skipped: 0 listings')
		return None

	# Column names come from the first listing's keys
	field_names = list(valid_listings[0])
	column_sql = ', '.join(f'`{name}`' for name in field_names)
	placeholders = ', '.join(['%s'] * len(field_names))
	insert_query = f'INSERT INTO `dados-imoveis` ({column_sql}) VALUES ({placeholders})'

	# Values are bound as parameters, so quotes/newlines in scraped text cannot break the SQL
	rows = [
		tuple(_null_if_placeholder(listing.get(name, 'NULL')) for name in field_names)
		for listing in valid_listings
	]

	logging.info(f'Listings insert ({len(rows)} rows):\n{insert_query}')
	db_cursor.executemany(insert_query, rows)
	db.commit()

	write_amenities_to_db(valid_listings, amenities_dict)
	return None

In [15]:
def get_existing_ids() -> list[int]:
	""" Read the external ids already stored in `dados-imoveis`

	The scraping loop uses them to avoid scraping a listing twice.

	Returns:
		list[int]: first column of every row yielded by the cursor
	"""
	db_cursor.execute('SELECT `id_external` FROM `dados-imoveis`')
	db.commit()

	stored_ids = []
	for row in db_cursor:
		stored_ids.append(row[0])

	return stored_ids

In [16]:
# JavaScript injected into the results page via execute_script.
# It registers a MutationObserver on document.body that hides (display: none) any newly added
# element matching #ins-lead-collection-popup-with-image-adaptive or #ins-frameless-overlay,
# i.e. the newsletter popup and its overlay.
javascript_hide_newsletter = """
var observer = new MutationObserver(function(mutations) {
    mutations.forEach(function(mutation) {
        if (mutation.addedNodes && mutation.addedNodes.length > 0) {
            for (var i = 0; i < mutation.addedNodes.length; i++) {
                var node = mutation.addedNodes[i];
                if (node.nodeType === Node.ELEMENT_NODE && (node.matches('#ins-lead-collection-popup-with-image-adaptive') || node.matches('#ins-frameless-overlay'))) {
                    node.style.display = 'none';
                }
            }
        }
    });
});

observer.observe(document.body, { childList: true, subtree: true });
"""

In [17]:
# Open the results page, dismiss the cookie banner, wait for the results list and scroll to the bottom
chrome_driver = webdriver.WebDriver(False)
chrome_driver.open_webpage(private.SCRAPPING_WEBSITE)

# find_elements returns an empty list (instead of raising) when the banner is absent.
# The find_element(s)_by_* helpers were removed in Selenium 4.3, hence the By locators.
cookie_banner = chrome_driver.driver.find_elements(By.ID, 'cookie-notifier-cta')
if cookie_banner:
	cookie_banner[0].click()

# The results list carries both classes, so it is addressed with a compound CSS selector
WebDriverWait(chrome_driver.driver, 30).until(
	EC.presence_of_element_located((By.CSS_SELECTOR, '.results__list.js-results'))
)

# Install the observer that hides the newsletter popup when it is added to the page
chrome_driver.driver.execute_script(javascript_hide_newsletter)

# Scroll to the bottom so every listing card gets loaded
chrome_driver.scroll_to_bottom()

# Hack: consume the first click on the page before the scraping starts.
# NOTE(review): that first click apparently opens an extra tab — confirm. The tab is closed
# only if it actually appeared, so a missing tab no longer raises IndexError.
chrome_driver.driver.find_element(By.TAG_NAME, 'body').click()
chrome_driver.random_sleep()

open_windows = chrome_driver.driver.window_handles
if len(open_windows) > 1:
	chrome_driver.driver.switch_to.window(open_windows[1])
	chrome_driver.driver.close()
chrome_driver.driver.switch_to.window(chrome_driver.driver.window_handles[0])

In [None]:
# Scraping loop: each iteration scrapes one results page and stores it in the database
MAX_PAGES = 100  # the website serves at most 100 result pages
currentState = SearchingState.SEARCHING

while currentState == SearchingState.SEARCHING:
	# Re-read on every page so listings stored by the previous iteration are skipped as well.
	# A set keeps the per-card membership test cheap.
	existing_ids_in_db = set(get_existing_ids())
	# Fetched once per page and passed to the insertion functions
	amenities_dict = get_amenities_dict()

	listing_cards = chrome_driver.driver.find_elements(By.CSS_SELECTOR, '.card-container.js-listing-card')
	all_listings = []

	for curr_listing in listing_cards:
		listing_id = curr_listing.get_attribute('data-id')

		# A card without a numeric data-id can neither be de-duplicated nor stored
		try:
			numeric_id = int(listing_id)
		except (TypeError, ValueError):
			logging.warning(f'Skipping listing card with unusable data-id: {listing_id!r}')
			continue

		# Skip to next if already scraped
		if numeric_id in existing_ids_in_db:
			continue

		# Accessing this property scrolls the card into view; then wait until it is clickable
		curr_listing.location_once_scrolled_into_view
		WebDriverWait(chrome_driver.driver, 30).until(
			EC.element_to_be_clickable((By.CSS_SELECTOR, f'[data-id="{listing_id}"]'))
		)

		# The listing is expected to open in a second tab
		curr_listing.click()
		chrome_driver.random_sleep()

		open_windows = chrome_driver.driver.window_handles
		if len(open_windows) < 2:
			logging.warning(f'Listing {listing_id} did not open in a new tab; skipped')
			continue
		chrome_driver.driver.switch_to.window(open_windows[1])

		# Extracting information from listing
		listing = {'id_external': listing_id}

		for i_field in ListingFields:
			matches = chrome_driver.driver.find_elements(By.CSS_SELECTOR, i_field.value)
			# Missing elements are stored as the 'NULL' placeholder
			listing[i_field.name] = matches[0].text if matches else 'NULL'

		# Removing the icon text from the address
		# NOTE(review): the pattern is not anchored, so it also removes "pin" inside a word — confirm where the icon text sits
		listing['adress'] = re.sub(pattern='pin[\n]*', string=listing['adress'], repl='')

		# Numerical fields keep only their digits; the leading '0' makes an empty result read as '0'
		for i_field in numeric_fields:
			listing[i_field] = '0' + re.sub(r'\D', '', listing[i_field])

		# Append to the page batch, close the listing tab and go back to the results tab
		all_listings.append(listing)
		chrome_driver.random_sleep(long=True)
		chrome_driver.driver.close()
		chrome_driver.driver.switch_to.window(chrome_driver.driver.window_handles[0])

	# Dumps whole page into Database
	write_listings_to_db(listings=all_listings, amenities_dict=amenities_dict)

	# The list item right after the active page button holds the "next page" button
	page_buttons = chrome_driver.driver.find_elements(By.CSS_SELECTOR, 'li:has(button.pagination__button--active) + li')

	# The active page number is only needed (and only guaranteed to exist) when there is a next button.
	# It is compared as an integer: the former comparison with the string '100' was always true.
	has_next_page = False
	if page_buttons:
		current_page = int(chrome_driver.driver.find_element(By.CSS_SELECTOR, '.pagination__button--active').text)
		has_next_page = current_page < MAX_PAGES

	if has_next_page:
		page_buttons[0].location_once_scrolled_into_view
		chrome_driver.random_sleep()
		page_buttons[0].click()

		# Wait for listings
		WebDriverWait(chrome_driver.driver, 30).until(
			EC.presence_of_element_located((By.CSS_SELECTOR, '.results__list.js-results'))
		)

		# Scroll to bottom to load all the listings
		chrome_driver.scroll_to_bottom()

		chrome_driver.random_sleep(long=True)
	else:
		# No next page (or page cap reached): scraping is completed
		currentState = SearchingState.COMPLETED