In [24]:
# OPERATING SYSTEM STUFF
import os
import io
import gc

# DATA SCIENCE
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

# API STUFF
import xlrd
import requests
import json

# SQL
from sqlalchemy import create_engine, text
from sqlalchemy.exc import ProgrammingError # ProgrammingError catches SQL write exceptions
from sqlalchemy import String, Integer, Float, Boolean
from sqlalchemy import create_engine, MetaData, Table, select
from sqlalchemy.sql import and_
from sqlalchemy import create_engine, MetaData, Table, select

# GEOCODING
from geopy.geocoders import GoogleV3

# CONFIGURATION FILES
import config
pd.set_option('display.float_format', '{:.6f}'.format)

In [25]:
# FUNCTION DECLARATIONS

def geolocate(row):
    if not row['GEOCODING ERR']:  # If GEOCODING ERR is False, run the geocoding API
        address = ', '.join([row['ADDRESS'],
                             #row['NEIGHBORHOOD'],
                             row['BOROUGH']]) + ', New York City'
        response = requests.get(f'https://maps.googleapis.com/maps/api/geocode/json?address={address}&key={config.GOOGLE_API_KEY}')
        res = response.json()
        if res['results']:
            location = res['results'][0]
            if location.get('partial_match'):  # Check for partial match
                row['GEOCODING ERR'] = True
                row['LATITUDE'] = None
                row['LONGITUDE'] = None
            else:
                row['LATITUDE'] = location['geometry']['location']['lat']
                row['LONGITUDE'] = location['geometry']['location']['lng']
        else:
            row['GEOCODING ERR'] = True  # Update GEOCODING ERR to True if geolocation failed
            row['LATITUDE'] = None
            row['LONGITUDE'] = None
    return row

def print_sql_table(engine, table_name):
    """
    This function retrieves and prints all the rows from a SQL table.

    Parameters:
    engine (sqlalchemy.engine.Engine): SQLAlchemy engine instance.
    table_name (str): Name of the table in the SQL database to be printed.

    Returns:
    None. The function prints the rows of the SQL table.
    """
    metadata = MetaData()

    # Reflect the table
    table = Table(table_name, metadata, autoload_with=engine)

    # Connect to the engine
    with engine.connect() as connection:
        # Select all rows from the table
        stmt = select([table])
        result = connection.execute(stmt)

        # Print the rows
        for row in result:
            print(row)

def upload_dataframe_to_sql(engine, table_name, df):
    """
    This function uploads a pandas DataFrame to a SQL table, either inserting new rows or updating existing ones.
    The DataFrame must contain the following columns: 'BOROUGH CODE', 'BOROUGH', 'ADDRESS', 'LATITUDE', 'LONGITUDE', 'GEOCODING ERR'.
    
    Rows from the DataFrame are compared to existing rows in the SQL table based on 'BOROUGH CODE', 'BOROUGH', and 'ADDRESS'.
    If a matching row is found in the SQL table, it is updated with the data from the DataFrame. If no matching row is found, a new row is inserted.

    Parameters:
    engine (sqlalchemy.engine.Engine): SQLAlchemy engine instance.
    table_name (str): Name of the table in the SQL database to which the DataFrame will be uploaded.
    df (pandas.DataFrame): DataFrame to be uploaded. Must contain the required columns.

    Returns:
    None. The function operates in-place on the SQL table.
    """
    metadata = MetaData()

    # Reflect the table
    table = Table(table_name, metadata, autoload_with=engine)

    # Replace NaN values with None
    df = df.where(pd.notnull(df), None)

    # Connect to the engine
    with engine.connect() as connection:
        for i, row in df.iterrows():
            # Check if the row exists
            stmt = select([table]).where(
                and_(
                    table.c['BOROUGH CODE'] == row['BOROUGH CODE'],
                    table.c['BOROUGH'] == row['BOROUGH'],
                    table.c['ADDRESS'] == row['ADDRESS']
                )
            )
            result = connection.execute(stmt).fetchone()
            
            # If the row exists, update it
            if result:
                update_values = {
                    'BOROUGH CODE': row['BOROUGH CODE'],
                    'BOROUGH': row['BOROUGH'],
                    'ADDRESS': row['ADDRESS'],
                    'LATITUDE': row['LATITUDE'],
                    'LONGITUDE': row['LONGITUDE'],
                    'GEOCODING ERR': row['GEOCODING ERR']
                }
                stmt = (
                    table.update().
                    where(
                        and_(
                            table.c['BOROUGH CODE'] == row['BOROUGH CODE'],
                            table.c['BOROUGH'] == row['BOROUGH'],
                            table.c['ADDRESS'] == row['ADDRESS']
                        )
                    ).
                    values(**update_values)
                )
            # If the row doesn't exist, insert it
            else:
                stmt = table.insert().values(**row.to_dict())

            # Execute the statement
            connection.execute(stmt)

In [26]:
os.environ['SQLALCHEMY_WARN_20'] = '0'
os.environ['SQLALCHEMY_SILENCE_UBER_WARNING'] = '1'

# Database params & credentials
username = 'root'
password = 'rootpassword'
hostname = 'db'
database_name = 'database_1'

# Create database connection
engine = create_engine(f'mysql+pymysql://{username}:{password}@{hostname}')

try:
    with engine.connect() as connection:
        connection.execute(text(f'CREATE DATABASE {database_name};'))
except ProgrammingError:
    pass

engine = create_engine(f'mysql+pymysql://{username}:{password}@{hostname}/{database_name}')

# Show databases
# with engine.connect() as connection:
#     result = connection.execute(text("SHOW DATABASES;"))
#     databases = [row[0] for row in result]
#     print(databases)

In [27]:
# Generate mapping between NYC data and Zillow categories.

# Show all the building classes
# This won't work unless the DataFrame 'combined' has been set up
# sorted_building_classes = sorted(
#     combined["BUILDING CLASS CATEGORY"].unique(),
#     key=lambda x: int(x.split(" ")[0])
# )

# Define the mappings
mapping = {
    "Single-family home": ['01 ONE FAMILY DWELLINGS'],
    "Multi-family home": [
        '03 THREE FAMILY DWELLINGS',
        '07 RENTALS - WALKUP APARTMENTS',
        '08 RENTALS - ELEVATOR APARTMENTS',
        '14 RENTALS - 4-10 UNIT'
    ],
    "Apartment": [
        '07 RENTALS - WALKUP APARTMENTS',
        '08 RENTALS - ELEVATOR APARTMENTS',
        '09 COOPS - WALKUP APARTMENTS',
        '10 COOPS - ELEVATOR APARTMENTS'
    ],
    "Condo": [
        '04 TAX CLASS 1 CONDOS',
        '12 CONDOS - WALKUP APARTMENTS',
        '13 CONDOS - ELEVATOR APARTMENTS',
        '15 CONDOS - 2-10 UNIT RESIDENTIAL',
        '16 CONDOS - 2-10 UNIT WITH COMMERCIAL UNIT'
    ],
    "Co-op": [
        '09 COOPS - WALKUP APARTMENTS',
        '10 COOPS - ELEVATOR APARTMENTS',
        '17 CONDO COOPS'
    ],
    "Duplex": ['02 TWO FAMILY DWELLINGS'],
    "Townhouse": [
        '01 ONE FAMILY DWELLINGS',
        '02 TWO FAMILY DWELLINGS'
    ],
    "Brownstone": [
        '01 ONE FAMILY DWELLINGS',
        '02 TWO FAMILY DWELLINGS'
    ],
    "Row house": [
        '01 ONE FAMILY DWELLINGS',
        '02 TWO FAMILY DWELLINGS'
    ],
}

# Flatten the mapping dictionary to create a dataframe
mapping_list = [(k, v) for k, vals in mapping.items() for v in vals]
mapping_df = pd.DataFrame(
    mapping_list,
    columns=['ZILLOW CATEGORY', 'BUILDING CLASS CATEGORY']
)

In [28]:
mapping_df.to_sql('cat_map', con=engine, index=False, if_exists='replace')

#Show tables
# with engine.connect() as connection:
#     result = connection.execute(text("SHOW TABLES;"))
#     tables = [row[0] for row in result]
#     print(tables)

#Show columns from table
with engine.connect() as connection:
    result = connection.execute(text("SHOW COLUMNS FROM geocodes;"))
    tables = [(row[0],row[1]) for row in result]
    print(tables)

[('BOROUGH CODE', 'int(11)'), ('BOROUGH', 'varchar(25)'), ('NEIGHBORHOOD', 'varchar(100)'), ('ADDRESS', 'varchar(255)'), ('LATITUDE', 'float'), ('LONGITUDE', 'float'), ('GEOCODING ERR', 'tinyint(1)')]


In [29]:
# URL Schema
# [Manhattan, Bronx, Brooklyn, Queens, Staten Island]

dataURLs = [
    'https://www.nyc.gov/assets/finance/downloads/pdf/rolling_sales/'
    'rollingsales_manhattan.xlsx',
    'https://www.nyc.gov/assets/finance/downloads/pdf/rolling_sales/'
    'rollingsales_bronx.xlsx',
    'https://www.nyc.gov/assets/finance/downloads/pdf/rolling_sales/'
    'rollingsales_brooklyn.xlsx',
    'https://www.nyc.gov/assets/finance/downloads/pdf/rolling_sales/'
    'rollingsales_queens.xlsx',
    'https://www.nyc.gov/assets/finance/downloads/pdf/rolling_sales/'
    'rollingsales_statenisland.xlsx'
]
# Create an empty array that will hold our NYC Housing DataFrames
data = []

# Pull data from the NYC website
for url in dataURLs:
    # Read Excel file and skip the first 4 rows
    df = pd.read_excel(url, skiprows=4, engine="openpyxl")
    data.append(df)

In [30]:
# Combine the dataframes from the nyc housing website
combined = pd.concat(data, ignore_index=True)

# Add borough names

# Rename the 'BOROUGH' column to 'BOROUGH CODE'
combined = combined.rename(columns={'BOROUGH': 'BOROUGH CODE'})
# Define the mapping for borough codes to borough names
borough_mapping = {1: 'MANHATTAN', 2: 'BRONX', 3: 'BROOKLYN', 4: 'QUEENS', 5: 'STATEN ISLAND'}
# Create a new 'BOROUGH' column based on 'BOROUGH CODE'
borough = combined['BOROUGH CODE'].map(borough_mapping)
# Insert the new 'BOROUGH' column into the DataFrame right after the 'BOROUGH CODE' column
combined.insert(loc=1, column='BOROUGH', value=borough)

# Remove bad rows
combined = combined[~combined['ADDRESS'].str.contains('N/A')]

In [31]:
# Write the contents of `combined` to the `sales` SQL table...
combined.to_sql('sales', con=engine, index=False, if_exists='replace')

In [32]:
# CREATE GEOCODING TABLE & COPY TO SQL TABLE

# Table name in the SQL database
geocodes_sql_table = 'geocodes'

data_types_df = {
    'BOROUGH CODE': int,
    'BOROUGH': str,
    'NEIGHBORHOOD': str,
    'ADDRESS': str,
    'LATITUDE': float,
    'LONGITUDE': float,
    'GEOCODING ERR': bool  # New column
}

data_types_sqlalchemy = {
    'BOROUGH CODE': Integer,
    'BOROUGH': String(25),
    'NEIGHBORHOOD': String(100),
    'ADDRESS': String(255),
    'LATITUDE': Float,
    'LONGITUDE': Float,
    'GEOCODING ERR': Boolean  # New column
}

try:
    # Template Pandas DataFrame that we use to build the SQL database
    geocodes = pd.DataFrame(columns=data_types_df)
    # Create the table & confirm
    geocodes.to_sql(geocodes_sql_table, engine, # if_exists='replace',
                index=False, dtype=data_types_sqlalchemy)
    print(f"Table '{geocodes_sql_table}' created in database '{database_name}'.")
except ValueError:
    print(f"Table '{geocodes_sql_table}' already exists in database '{database_name}'.")


Table 'geocodes' already exists in database 'database_1'.


In [33]:
# Create a table of geographic information from `combined`
geocodingTable = combined[['BOROUGH CODE', 'BOROUGH', 'NEIGHBORHOOD', 'ADDRESS']].copy()
geocodingTable['LATITUDE'] = None
geocodingTable['LONGITUDE'] = None
geocodingTable['GEOCODING ERR'] = False

In [42]:
# Query the SQL database for the geocodes table
query = f"SELECT * FROM {geocodes_sql_table}"

# Execute the query and load the result into a DataFrame
df_queryResponse = pd.read_sql_query(query, engine)
#df_queryResponse = geocodingTable[:-5].copy() # Fictional, remove in prod

In [43]:
# Find any rows we downloaded from NYC that aren't already in the database
# with a mask
mask = geocodingTable[['BOROUGH CODE', 'BOROUGH', 'NEIGHBORHOOD', 'ADDRESS']].isin(
    df_queryResponse[['BOROUGH CODE', 'BOROUGH', 'NEIGHBORHOOD', 'ADDRESS']]
)
matching_rows = mask.all(axis=1)
missing_rows = geocodingTable[~matching_rows]

# Create a new row with a fictional address
new_row = pd.DataFrame([[5, 'STATEN ISLAND', 'WOODROW', '123 FAKE STREET', None, None, False]],
                       columns=missing_rows.columns)
# Append the new row to the dataframe
missing_rows = missing_rows.append(new_row, ignore_index=True)

In [44]:
# Print the updated dataframe
print(missing_rows.to_string())

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [45]:
# DANGEROUS! HIGH API USAGE!
# Apply geolocator function to all rows in updated_df DataFrame
# missing_rows = missing_rows.apply(geolocate, axis=1)

# Print the updated dataframe
print(missing_rows.to_string())

KeyboardInterrupt: 

In [None]:
engine = create_engine(f'mysql+pymysql://{username}:{password}@{hostname}/{database_name}')
upload_dataframe_to_sql(engine, 'geocodes', missing_rows)

In [None]:
# Create the SQLAlchemy engine
engine = create_engine(f'mysql+pymysql://{username}:{password}@{hostname}/{database_name}')
# Call the function to print the SQL table
print_sql_table(engine, 'geocodes')

Unnamed: 0,BOROUGH CODE,BOROUGH,NEIGHBORHOOD,ADDRESS,LATITUDE,LONGITUDE,GEOCODING ERR
0,1,MANHATTAN,ALPHABET CITY,347 EAST 4TH STREET,,,False
1,1,MANHATTAN,ALPHABET CITY,19 AVENUE D,,,False
2,1,MANHATTAN,ALPHABET CITY,110 AVENUE C,,,False
3,1,MANHATTAN,ALPHABET CITY,326 EAST 4TH STREET,,,False
4,1,MANHATTAN,ALPHABET CITY,328 EAST 4TH STREET,,,False
...,...,...,...,...,...,...,...
83288,5,STATEN ISLAND,WOODROW,104 GLADWIN STREET,,,False
83289,5,STATEN ISLAND,WOODROW,96 LENEVAR AVENUE,,,False
83290,5,STATEN ISLAND,WOODROW,401 BLOOMINGDALE ROAD,,,False
83291,5,STATEN ISLAND,WOODROW,3120 ARTHUR KILL ROAD,,,False
