# Data Fusion Engineer/ Data Engineer - Value Models : March 2019

In [1]:
import random
import re
from decimal import Decimal
import time

import requests
import urllib
import bs4
from bs4 import BeautifulSoup
import pandas as pd
import geocoder
from sqlalchemy import create_engine


In [2]:
class Tools:
    headers = ({'User-Agent':
            'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36'})
    
    def get_url_from_soup(property_soup: bs4.BeautifulSoup) -> str:
        """Return the property's page url from the beautifulsoup object."""
        property_url = property_soup.find('a', href=True)['href']
        return property_url
        
    def create_html_soup(url: str) -> bs4.BeautifulSoup:
        """Create a beautifulsoup object from the given web page url."""
        response = requests.get(url, headers=Tools.headers, proxies=Tools.get_proxy())
        html_soup = BeautifulSoup(response.text, 'html.parser')
        return html_soup
    
    def get_proxy_list() -> dict:
        """Open source tool from Github.
        See https://proxy.rudnkh.me/txt
        """
        resource_url = "https://proxy.rudnkh.me/txt"
        proxy_list = requests.get(resource_url).text.split('\n')[:-1]
        return proxy_list
        
    def get_proxy() -> dict:
        """Return a proxy dictionary for web scraping.
        """
        proxy_list = Tools.get_proxy_list()
        while True:
            try:
                proxy_dict = {"https:": "http://" + random.choice(proxy_list)}
                proxy_handler = urllib.request.ProxyHandler(proxy_dict)
                opener = urllib.request.build_opener(proxy_handler)
                urllib.request.install_opener(opener)
                req=urllib.request.Request("http://www.google.com")  # change the URL to test here
                sock=urllib.request.urlopen(req)
                return proxy_dict
            except IOError:
                print("Connection error! (Check proxy)")
                continue
        

In [3]:
class HouseProperty:
    def __init__(self, url: str):
        self.url = url
        self.id = self.get_id()
        self.html_soup = self.get_property_soup()
        self.location = str()
        self.price = float()
        self.featured_date = self.get_featured_date()
        self.set_price_and_location()
        self.lat = float()
        self.lon = float()
        self.geo_coding()
        
    def get_id(self) -> str:
        return re.search(r'\d{8}', self.url).group(0)
    
    def get_property_soup(self) -> bs4.BeautifulSoup:
        return Tools.create_html_soup(self.url)
    
    def set_price_and_location(self) -> None:
        p_tag_results = self.html_soup.find_all('p')
        price_pattern = '(?:[\$]{1}[,\d]+.?\d*)'
        price_found = False
        location_found = False
        for result in p_tag_results:
            if price_found & location_found:
                return
            result_text = result.text
            if 'Price:' in result_text:
                price_str = re.search(price_pattern, result_text).group(0)
                self.price = float(Decimal(re.sub(r'[^\d.]', '', price_str)))
                price_found = True
            if 'Location:' in result_text:
                self.location = result_text.split('Location: ')[1]
                location_found = True
            
    def geo_coding(self):
        HERE_APP_ID = 'Replace with your own HERE APP ID.'
        HERE_APP_CODE = 'Replace with your own HERE APP CODE.'
        if len(self.location) != 0:
            g = geocoder.here(self.location, app_id=HERE_APP_ID,
                           app_code=HERE_APP_CODE)
            self.lat = g.json['lat']
            self.lon = g.json['lng']
    
    def get_featured_date(self) -> str:
        print(f"THE URL FOR NEXT IS {self.url}")
        match = re.search(r'\d{4}/\d{1,2}/\d{1,2}', self.url)
        return match.group(0)
    
    def print_attributes(self):
        print(f"The id is {self.id},  the price is \
                      {self.price}, the location is {self.location}, and the featured date is \
                      {self.featured_date}, the URL is {self.url}")

In [8]:
class Parser:
    def __init__(self, page_url: str, nr_of_properties_required: int):
        self.page_url = page_url
        self.nr_of_properties_required = nr_of_properties_required
        self.current_page_nr = 4
        self.property_containers = None
        self.house_property_list = list()
        self.df = None
        self.create_df()
        
    def retrieve_property_containers(self):
        html_soup = Tools.create_html_soup(self.page_url)
        # Property details under 'c-compact-river__entry' class
        self.property_containers = html_soup.find_all('div', class_="c-compact-river__entry") 
        return self.property_containers
    
    def create_df(self):
        self.df = pd.DataFrame(columns=['property_id',
                     'location',
                     'price',
                     'date',
                     'url',
                     'lat',
                     'lon'])
        
        self.df = self.df.astype( dtype={'property_id' : str, 
                         'location': str, # City, State -- TO BE DONE
                         'price': float,
                         'date': str,
                         'url': str,
                         'lat': float,
                         'lon': float})
        self.df.set_index('property_id')
        
    def add_property_to_df(self, house_property: HouseProperty):
        self.df = self.df.append({'property_id': house_property.id, 
                                  'price': house_property.price,
                                  'date': house_property.featured_date,
                                  'url': house_property.url,
                                  'location': house_property.location,
                                  'lat': house_property.lat,
                                  'lon': house_property.lon
                                  },
                                 ignore_index=True,
                                verify_integrity=True)
            
    def df_to_db(self):
        """Write records stored in the DataFrame to the PostgreSQL database, create table if not exits,
        append to or overwritten."""
        engine = create_engine('postgresql://123:123@yourhost/yourtable', 
                               echo=False)
        table_name = 'property'
        self.df.to_sql(table_name, engine, index=False, if_exists='replace')
        sql_str = F"""SELECT * FROM {table_name};"""
        engine.execute(sql_str).fetchall()
        
    def scrape_property(self):
        property_counter = 1
        while property_counter <= self.nr_of_properties_required:
            self.page_url = CURBED_URL + f'/{self.current_page_nr}'
            print(f"The current web page is {self.page_url}!!!!!")
            property_containers = self.retrieve_property_containers()
            for property_soup in property_containers:
                url = Tools.get_url_from_soup(property_soup)
                # If the current property cannot be parsed correctly, skip it.
                house_property = HouseProperty(url)
                if parser.df['property_id'].str.contains(house_property.id).any():
                    print("Property has been scraped already, thus skipped!")
                    continue
                self.add_property_to_df(house_property)
                house_property.print_attributes()
                time.sleep(0.5)
                if property_counter > self.nr_of_properties_required:
                    break
                print(property_counter)
                property_counter += 1
            self.current_page_nr += 1
        

In [9]:
CURBED_URL = "https://www.curbed.com/house-of-the-day/archives"
PROPERTY_SCRAPPED_REQUIRED = 10

parser = Parser(CURBED_URL, PROPERTY_SCRAPPED_REQUIRED)
parser.scrape_property()
parser.df_to_db()
    


The current web page is https://www.curbed.com/house-of-the-day/archives/4!!!!!
THE URL FOR NEXT IS https://www.curbed.com/2018/10/30/18041912/house-for-sale-colonial-new-hampshire
The id is 18041912,  the price is                       775000.0, the location is Plainfield, New Hampshire , and the featured date is                       2018/10/30, the URL is https://www.curbed.com/2018/10/30/18041912/house-for-sale-colonial-new-hampshire
1
THE URL FOR NEXT IS https://www.curbed.com/2018/10/29/18037456/midcentury-modern-house-for-sale-los-angeles-quincy-jones
The id is 18037456,  the price is                       2395000.0, the location is Los Angeles, California, and the featured date is                       2018/10/29, the URL is https://www.curbed.com/2018/10/29/18037456/midcentury-modern-house-for-sale-los-angeles-quincy-jones
2
THE URL FOR NEXT IS https://www.curbed.com/2018/10/26/18026644/house-for-sale-key-west-florida-pool
The id is 18026644,  the price is                     

ProgrammingError: (psycopg2.ProgrammingError) column "date" cannot be cast automatically to type date
HINT:  You might need to specify "USING date::date".

[SQL: ALTER TABLE property
                              ALTER COLUMN date TYPE DATE;]
(Background on this error at: http://sqlalche.me/e/f405)

In [10]:
parser.df


Unnamed: 0,property_id,location,price,date,url,lat,lon
0,18041912,"Plainfield, New Hampshire",775000.0,2018/10/30,https://www.curbed.com/2018/10/30/18041912/hou...,43.5335,-72.35683
1,18037456,"Los Angeles, California",2395000.0,2018/10/29,https://www.curbed.com/2018/10/29/18037456/mid...,34.05349,-118.24532
2,18026644,"Key West, Florida",2799000.0,2018/10/26,https://www.curbed.com/2018/10/26/18026644/hou...,24.5545,-81.80023
3,18022130,"Bedford Hills, New York",899999.0,2018/10/25,https://www.curbed.com/2018/10/25/18022130/hou...,41.23781,-73.69831
4,18018190,"Lafayette, California",4895000.0,2018/10/24,https://www.curbed.com/2018/10/24/18018190/hou...,37.89077,-122.12813
5,18013532,"Winston-Salem, North Carolina",765000.0,2018/10/23,https://www.curbed.com/2018/10/23/18013532/hou...,36.10002,-80.24116
6,18008538,"New Marlborough, Massachusetts",2100000.0,2018/10/22,https://www.curbed.com/2018/10/22/18008538/his...,42.1108,-73.26504
7,17998714,"Lyme, Connecticut",798000.0,2018/10/19,https://www.curbed.com/2018/10/19/17998714/hou...,41.42464,-72.33475
8,17995004,"Memphis, Tennessee",1795000.0,2018/10/18,https://www.curbed.com/2018/10/18/17995004/mid...,35.14977,-90.04926
9,17989666,"Charleston, South Carolina",4095000.0,2018/10/17,https://www.curbed.com/2018/10/17/17989666/his...,32.78115,-79.93161


## Step 3: Enrich the dataset

### 3.1 Enrich table using Zillow dataset

In [19]:
import psycopg2

In [40]:
conn = psycopg2.connect("host=your db host here dbname=your db table here user=username here password=password here")

#### Create table "zip_to_zcta":

In [92]:
cur = conn.cursor()
cur.execute("""
CREATE TABLE zip_to_ztca(
enc_zip text,
city_name text,
state_abbr text,
zcta text)
""")
conn.commit()

#### Write data from csv file to zip_to_zcta table:

In [93]:
table_zip_to_ztca = 'zip_to_ztca'
cur = conn.cursor()
with open('data/zip_to_ztca.csv', 'r') as f:
    next(f) # Skip the header row.
    cur.copy_from(f, table_zip_to_ztca, sep=',')
    conn.commit()

#### Create table "Zip_MedianListingPrice_TopTier"

Idea: since in this excel file, there are **113 columns** *'year-month'* data when I fetched the csv file online. It is not wise to copy all of them into the database.   
Thus, some Python logic is applied here to only select *'year-month'* columns which are within the **scraped** date boundaries, plus the 5 months *'year-month'* columns prior to the oldest property scraped date, because new column **‘rolling_6_month_median_list_price’** requires that. 

E.g. If properties scraped are between *2018-08* -- *2018-10*, only columns '2018-03', '2018-04', '2018-05', '2018-06', '2018-07', '2018-08', '2018-09', '2018-10' will be copies to the database.

Python implementation to select 'year-month' columns to be copied into database from csv file.

In [15]:
# Find the date boundary from the scraped properties
from datetime import datetime
from dateutil.relativedelta import relativedelta

parser.df['date'] = pd.to_datetime(parser.df['date'])

# Furthest date(YYYY-MM) of the property scraped
furthest_date = parser.df['date'].min()
furthest_date = datetime(year=furthest_date.year, month=furthest_date.month, day=1)

# 5 months prior date(YYYY-MM) string to the furthest 
five_prior_date = furthest_date - relativedelta(months=5)
five_prior_date_str = f"{five_prior_date.year}-{five_prior_date.month:02d}"

# Latest date(YYYY-MM) string of the property scraped
latest_date = parser.df['date'].max()
latest_date = datetime(year=latest_date.year, month=latest_date.month, day=1)
latest_date_str = furthest_date_str = f"{latest_date.year}-{latest_date.month:02d}"

# Get index of 5 months prior furthest YYYY-MM column and the latest YYYY-MM column respectively
df_zip_mlp = pd.read_csv('data/Zip_MedianListingPrice_TopTier.csv', encoding = "ISO-8859-1")
cols = list(df_zip_mlp.columns)
idx_5_prior_month_col = cols.index(five_prior_date_str)
idx_latest_month_col = cols.index(latest_date_str)

# Get the subset of the dataframe including only relevant columns from the csv file - 'Zip_MedianListingPrice_TopTier'.
required_cols_index_list = list(range(4)) + list(range(idx_5_prior_month_col, idx_latest_month_col + 1))
df_zip_mlp_subset = df_zip_mlp.iloc[:, required_cols_index_list]

# Replace US states abbreviations with full name
states_dict = {
        'AK': 'Alaska',
        'AL': 'Alabama',
        'AR': 'Arkansas',
        'AS': 'American Samoa',
        'AZ': 'Arizona',
        'CA': 'California',
        'CO': 'Colorado',
        'CT': 'Connecticut',
        'DC': 'District of Columbia',
        'DE': 'Delaware',
        'FL': 'Florida',
        'GA': 'Georgia',
        'GU': 'Guam',
        'HI': 'Hawaii',
        'IA': 'Iowa',
        'ID': 'Idaho',
        'IL': 'Illinois',
        'IN': 'Indiana',
        'KS': 'Kansas',
        'KY': 'Kentucky',
        'LA': 'Louisiana',
        'MA': 'Massachusetts',
        'MD': 'Maryland',
        'ME': 'Maine',
        'MI': 'Michigan',
        'MN': 'Minnesota',
        'MO': 'Missouri',
        'MP': 'Northern Mariana Islands',
        'MS': 'Mississippi',
        'MT': 'Montana',
        'NA': 'National',
        'NC': 'North Carolina',
        'ND': 'North Dakota',
        'NE': 'Nebraska',
        'NH': 'New Hampshire',
        'NJ': 'New Jersey',
        'NM': 'New Mexico',
        'NV': 'Nevada',
        'NY': 'New York',
        'OH': 'Ohio',
        'OK': 'Oklahoma',
        'OR': 'Oregon',
        'PA': 'Pennsylvania',
        'PR': 'Puerto Rico',
        'RI': 'Rhode Island',
        'SC': 'South Carolina',
        'SD': 'South Dakota',
        'TN': 'Tennessee',
        'TX': 'Texas',
        'UT': 'Utah',
        'VA': 'Virginia',
        'VI': 'Virgin Islands',
        'VT': 'Vermont',
        'WA': 'Washington',
        'WI': 'Wisconsin',
        'WV': 'West Virginia',
        'WY': 'Wyoming'
}
df_zip_mlp_subset['State'].replace(states_dict, inplace=True)
# 

# Read the selected necessary columns into the database from csv file.


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  self._update_inplace(new_data)


In [16]:
# Change column datat type of RegionName, City, State and Metro into TEXT type (OBJECT type in Pandas).
df_zip_mlp_subset = df_zip_mlp_subset.astype({'RegionName': str, 'City': str, 'State': str, 'Metro': str})

In [41]:
engine = create_engine('postgresql://123:123@yourhost/your_table', 
                               echo=False)
table_name = 'median_listing_price'
df_zip_mlp_subset.to_sql(table_name, engine, index=False, if_exists='replace')
engine.execute(F"SELECT * FROM {table_name}").close()

#### Create a dataframe by joinging Property table and the Median_Listing_Price table using SQL

In [66]:
import pandas.io.sql as sqlio

conn = psycopg2.connect("host=your db host here dbname=your db table here user=username here password=password here")
sql = """SELECT mlp.property_id, to_char(prop.date, 'YYYY-MM') as date, mlp.* FROM property_bk prop left join median_listing_price mlp on prop.location = concat(mlp."City", ', ', mlp."State");"""
df_property_median_joined = sqlio.read_sql_query(sql, conn)
conn = None
    


In [67]:
df_property_median_joined

Unnamed: 0,property_id,date,index,property_id.1,location,price,date.1,url,lat,lon,...,City,State,Metro,2018-05,2018-06,2018-07,2018-08,2018-09,2018-10,median_list_price
0,,2018-10,,,,,,,,,...,,,,,,,,,,
1,,2018-10,,,,,,,,,...,,,,,,,,,,
2,18013532.0,2018-10,5.0,18013532.0,"Winston-Salem, North Carolina",765000.0,2018/10/23,https://www.curbed.com/2018/10/23/18013532/hou...,36.10002,-80.24116,...,Winston-Salem,North Carolina,Winston-Salem,359500.0,359900.0,379900.0,399000.0,424950.0,410000.0,410000.0
3,18013532.0,2018-10,5.0,18013532.0,"Winston-Salem, North Carolina",765000.0,2018/10/23,https://www.curbed.com/2018/10/23/18013532/hou...,36.10002,-80.24116,...,Winston-Salem,North Carolina,Winston-Salem,310450.0,299500.0,279900.0,257500.0,266500.0,257250.0,257250.0
4,18013532.0,2018-10,5.0,18013532.0,"Winston-Salem, North Carolina",765000.0,2018/10/23,https://www.curbed.com/2018/10/23/18013532/hou...,36.10002,-80.24116,...,Winston-Salem,North Carolina,Winston-Salem,279950.0,260000.0,239500.0,237900.0,239700.0,247400.0,247400.0
5,18013532.0,2018-10,5.0,18013532.0,"Winston-Salem, North Carolina",765000.0,2018/10/23,https://www.curbed.com/2018/10/23/18013532/hou...,36.10002,-80.24116,...,Winston-Salem,North Carolina,Winston-Salem,412450.0,405000.0,397000.0,382450.0,382450.0,409000.0,409000.0
6,18013532.0,2018-10,5.0,18013532.0,"Winston-Salem, North Carolina",765000.0,2018/10/23,https://www.curbed.com/2018/10/23/18013532/hou...,36.10002,-80.24116,...,Winston-Salem,North Carolina,Winston-Salem,282549.0,316450.0,279884.5,279900.0,273426.5,269900.0,269900.0
7,18013532.0,2018-10,5.0,18013532.0,"Winston-Salem, North Carolina",765000.0,2018/10/23,https://www.curbed.com/2018/10/23/18013532/hou...,36.10002,-80.24116,...,Winston-Salem,North Carolina,Winston-Salem,228490.0,226995.0,231490.0,229445.0,228990.0,229445.0,229445.0
8,18013532.0,2018-10,5.0,18013532.0,"Winston-Salem, North Carolina",765000.0,2018/10/23,https://www.curbed.com/2018/10/23/18013532/hou...,36.10002,-80.24116,...,Winston-Salem,North Carolina,Winston-Salem,241700.0,239900.0,239900.0,239900.0,249900.0,249900.0,249900.0
9,,2018-10,,,,,,,,,...,,,,,,,,,,


In [69]:
df_property_median_joined_cols = list(df_property_median_joined.columns)
df_property_median_joined_cols['date'] = df_property_median_joined_cols['date'].astype('|S80')
for index, row in df_property_median_joined.iterrows():
    year_month = row['date'][:7].replace('/', '-')
    print(year_month)
    year_month_idx = df_property_median_joined_cols.index(year_month)
    df_property_median_joined.loc[index,'median_list_price'] = row[year_month]
    rolling_3_month_cols = df_property_median_joined_cols[year_month_idx -2 : year_month_idx + 1]
    df_property_median_joined.loc[index,'rolling_3_month_median_list_price'] = row[rolling_3_month_cols].mean(1)
    rolling_6_month_cols = df_property_median_joined_cols[year_month_idx -5 : year_month_idx + 1]
    df_property_median_joined.loc[index,'rolling_6_month_median_list_price'] = row[rolling_6_month_cols].mean(1)
    
    

TypeError: list indices must be integers or slices, not str

In [60]:
for index, row in df_property_median_joined.iterrows():
    print(row['date'][:7].replace('/', '-'))

date    2018/10/30
date          None
Name: 0, dtype: object
date    2018/10/29
date    2018/10/29
Name: 1, dtype: object
date    2018/10/29
date    2018/10/29
Name: 2, dtype: object
date    2018/10/29
date    2018/10/29
Name: 3, dtype: object
date    2018/10/29
date    2018/10/29
Name: 4, dtype: object
date    2018/10/29
date    2018/10/29
Name: 5, dtype: object
date    2018/10/29
date    2018/10/29
Name: 6, dtype: object
date    2018/10/29
date    2018/10/29
Name: 7, dtype: object
date    2018/10/29
date    2018/10/29
Name: 8, dtype: object
date    2018/10/29
date    2018/10/29
Name: 9, dtype: object
date    2018/10/29
date    2018/10/29
Name: 10, dtype: object
date    2018/10/29
date    2018/10/29
Name: 11, dtype: object
date    2018/10/29
date    2018/10/29
Name: 12, dtype: object
date    2018/10/29
date    2018/10/29
Name: 13, dtype: object
date    2018/10/29
date    2018/10/29
Name: 14, dtype: object
date    2018/10/29
date    2018/10/29
Name: 15, dtype: object
date    2018/10/29

In [46]:
df_property_median_joined = df_property_median_joined.groupby('property_id').mean(skipna=True)

In [47]:
median_price_temp_table = 'media_price_temp_table'
df_property_median_joined.to_sql(median_price_temp_table, engine, index=False, if_exists='replace')
engine.execute(f"SELECT * FROM {media_price_temp_table}").close()

### 3.2 Enrich the Property table with Whole Foods Locations data set.

#### Create the table whole_foods_locations

In [69]:
# Command to rollback when sth wrong happens
curs = conn.cursor()
curs.execute("ROLLBACK")
conn.commit()

In [53]:
table_whole_foods = 'whole_foods_locations'
cur = conn.cursor()
cur.execute(f"""
CREATE TABLE {table_whole_foods}(
store_address text,
city text,
state text,
latitude decimal,
longitude decimal)
""")
conn.commit()

#### Wirte the data from csv file to the database

In [70]:
cur = conn.cursor()
copy_sql = f"""
           COPY {table_whole_foods} FROM stdin WITH CSV HEADER
           DELIMITER as ','
           """
with open('data/Whole_Foods_locations.csv', 'r') as f:
#     next(f) # Skip the header row.
    cur.copy_expert(sql=copy_sql, file=f)
    conn.commit()
    cur.close()

#### Create a geo spatial view of Property table  
*A brief explanation for the nested SQL query following*:
1. Add column 'foods_location_count' to the table *Property* 
2. SQL query the well-known CTE(Common Table Expressions) to create temporary spatial tables for both Property and Whole_Foods_Locations. 
3. SQL to do spatial analysis, counting food locations within 25 KM for each property
4. JOIN the spatial analysis result from last step into the original Property table


In [73]:
property_geom = "property_geom"
foods_geom = "foods_geom"
nr_of_whole_foods_stores_nearby = "number_of_whole_foods_stores_nearby"
sql_food_counts = 
f"""
ALTER TABLE property_bk ADD {nr_of_whole_foods_stores_nearby} INT DEFAULT 0;

UPDATE property_bk as prop
SET {nr_of_whole_foods_stores_nearby} = COALESCE (prop_foods_count.foods_location_count , 0)
FROM 
(
WITH {property_geom} AS (
	SELECT *, ST_SetSRID(ST_MakePoint(prop.lon, prop.lat), 4326) as geom
	FROM property_bk as prop),
{foods_geom} AS ( 
	SELECT *, ST_SetSRID(ST_MakePoint(wfl.longitude, wfl.latitude), 4326) as geom
	FROM whole_foods_locations as wfl)
SELECT prop.property_id, count(wfl.*) as foods_location_count FROM {property_geom} prop, {foods_geom} wfl 
	WHERE ST_DWithin(prop.geom::geography, wfl.geom::geography, 25000) GROUP BY prop.property_id) prop_foods_count
WHERE prop.property_id = prop_foods_count.property_id;"""


In [75]:
conn = psycopg2.connect("host=your db host here dbname=your db table here user=username here password=password here")
cur = conn.cursor()
cur.execute(sql_food_counts)
conn.commit()
cur.close()