In [1]:
import pandas as pd
from sodapy import Socrata
from tqdm import tqdm
import os

import sqlalchemy as sa
from sqlalchemy import Table, MetaData, Column, Integer, text

## Get the data

Get the SF Data via API

In [4]:
## https://data.sfgov.org/resource/ramy-di5m.json - addresses with units
client = Socrata("data.sfgov.org", None)



In [5]:
addresses_with_units = {'api':'ramy-di5m', 'max_rows': 388000}
evictions = {'api':'5cei-gny5', 'max_rows': 43400}
buyout_agreements = {'api':'wmam-7g8d', 'max_rows': 5900}

In [6]:
results_addresses_with_units = client.get(addresses_with_units['api'], limit=addresses_with_units['max_rows'])
results_evictions = client.get(evictions['api'], limit=evictions['max_rows'])
results_buyout_agreements = client.get(buyout_agreements['api'], limit=buyout_agreements['max_rows'])

In [7]:
# Convert to pandas DataFrames
df_addresses_with_units = pd.DataFrame.from_records(results_addresses_with_units)
df_evictions = pd.DataFrame.from_records(results_evictions)
df_buyout_agreements = pd.DataFrame.from_records(results_buyout_agreements)

In [8]:
## make sure we have the same number of rows as is listed in the UI
print(f'Addresses with units dataframe size: {df_addresses_with_units.shape[0]}') 
print(f'Evictions dataframe size: {df_evictions.shape[0]}') 
print(f'Buyout Agreements dataframe size: {df_buyout_agreements.shape[0]}') 

Addresses with units dataframe size: 386975
Evictions dataframe size: 43393
Buyout Agreements dataframe size: 5855


In [9]:
df_addresses_with_units.head()

Unnamed: 0,eas_baseid,eas_subid,eas_fullid,address,address_number,street_name,street_type,cnn,longitude,latitude,...,nhood,:@computed_region_6qbp_sg9q,:@computed_region_qgnn_b9vv,:@computed_region_26cr_cadq,:@computed_region_ajp5_b2md,unit_number,parcel_number,block,lot,address_number_suffix
0,500181,822322,500181-822322-0,10 MARINA BLVD,10,MARINA,BLVD,8701000,-122.43245572,37.80528301,...,Marina,17,4,6,13,,,,,
1,445641,665584,445641-665584-0,1151 TAYLOR ST,1151,TAYLOR,ST,12443000,-122.41300438,37.79321685,...,Nob Hill,16,6,3,21,,,,,
2,446863,667522,446863-667522-0,1317 HYDE ST,1317,HYDE,ST,7129000,-122.41817598,37.79306975,...,Russian Hill,16,6,3,32,,,,,
3,363697,753449,363697-753449-0,3449 22ND ST #3,3449,22ND,ST,1196000,-122.42406182,37.75494435,...,Mission,52,3,5,20,3.0,,,,
4,359019,748103,359019-748103-0,537 14TH ST #2,537,14TH,ST,590000,-122.42512553,37.76783319,...,Mission,37,3,5,20,2.0,,,,


Get the ZipAtlas Data via CSVs, locally

In [10]:
# As dataframes
df_median_age = pd.read_csv('median-age.csv')
df_population_density = pd.read_csv('population-density.csv')
df_median_household_income = pd.read_csv('median-household-income.csv')

## Connect to the Azure SQL Database

In [11]:
# set environ variables like: 
##os.environ['AZURE_MSDS432_USERNAME'] = <username>
##os.environ['AZURE_MSDS432_PASSWORD'] = <password>

In [12]:
## You must have this driver installed on your local machine

username = os.getenv('AZURE_MSDS432_USERNAME')
password = os.getenv('AZURE_MSDS432_PASSWORD')
host = 'mysqlserver-432.database.windows.net'
database = 'mySampleDatabase'
authentication = "SqlPassword"
conn_string = sa.engine.url.URL(
     "mssql+pyodbc",
     username=username,
     password=password,
     host=host,
     port=1433,
     database=database,
     query={"driver": "ODBC Driver 18 for SQL Server", "authentication": authentication}
 )

In [13]:
engine = sa.create_engine(conn_string, pool_timeout=60)
connection = engine.connect()

## Check what tables we have now

In [14]:
check_tables_query = """
SELECt schema_name(t.schema_id) as schema_name,
       t.name as table_name,
       t.create_date,
       t.modify_date
FROM sys.tables t
ORDER BY schema_name,
         table_name;
"""

tables = pd.read_sql_query(check_tables_query, connection)

In [15]:
tables

Unnamed: 0,schema_name,table_name,create_date,modify_date
0,dbo,BuildVersion,2022-05-15 20:04:00.113,2022-05-15 20:04:01.767
1,dbo,ErrorLog,2022-05-15 20:04:00.113,2022-05-15 20:04:01.767
2,dbo,Rel_Building,2022-05-17 22:49:05.513,2022-05-17 22:49:05.513
3,dbo,Rel_BuyoutAgreements,2022-05-17 22:47:28.627,2022-05-17 22:47:28.627
4,dbo,Rel_Demographics,2022-05-17 22:34:37.987,2022-05-17 22:34:37.987
5,dbo,Rel_Eviction_Notices,2022-05-17 21:59:47.223,2022-05-17 21:59:47.223
6,dbo,Rel_Location,2022-05-17 22:25:35.210,2022-05-17 22:47:28.630
7,dbo,SF_Buyout_Agreements_Raw,2022-05-16 00:27:00.917,2022-05-16 00:27:00.943
8,dbo,SF_Eviction_Notices_Raw,2022-05-16 00:41:20.143,2022-05-16 00:41:20.190
9,dbo,Zip_Atlas_Median_Age_Raw,2022-05-16 00:26:38.500,2022-05-16 00:26:38.530


## Upload the raw tables

In [15]:
## Write raw tables to our database: Zip Atlas
engine = sa.create_engine(conn_string, pool_timeout=60)

df_median_age.to_sql('Zip_Atlas_Median_Age_Raw', con=engine, if_exists='replace', index_label='id')
df_population_density.to_sql('Zip_Atlas_Population_Density_Raw', con=engine, if_exists='replace', index_label='id')
df_median_household_income.to_sql('Zip_Atlas_Median_Household_Income_Raw', con=engine, if_exists='replace', index_label='id')


There are some weird columns in the datasets we get from the API that I'll just drop for now

In [16]:
df_evictions_columns_cleaned = df_evictions[df_evictions.columns.drop(list(df_evictions.filter(regex='@computed')))]
df_buyout_agreements_columns_cleaned = df_buyout_agreements[df_buyout_agreements.columns.drop(list(df_buyout_agreements.filter(regex='@computed')))]
df_addresses_with_units_columns_cleaned = df_addresses_with_units[df_addresses_with_units.columns.drop(list(df_addresses_with_units.filter(regex='@computed')))]

In [17]:
## Convert all the columns to strings - we'll need to update the dtype later

for column in df_evictions_columns_cleaned.columns:
    df_evictions_columns_cleaned = df_evictions_columns_cleaned.astype({column: str})

for column in df_buyout_agreements_columns_cleaned.columns:
    df_buyout_agreements_columns_cleaned = df_buyout_agreements_columns_cleaned.astype({column: str})
    
for column in df_addresses_with_units_columns_cleaned.columns:
    df_addresses_with_units_columns_cleaned = df_addresses_with_units_columns_cleaned.astype({column: str})

In [18]:
## Write raw tables to our database: SF Dataset
engine = sa.create_engine(conn_string, pool_timeout=60)

df_buyout_agreements_columns_cleaned.to_sql('SF_Buyout_Agreements_Raw', con=engine, if_exists='replace', index_label='id')


In [19]:
## the other two dataframes are very large.. it would help to have a progress bar
## Credit to: https://stackoverflow.com/questions/39494056/progress-bar-for-pandas-dataframe-to-sql

def chunker(seq, size):
    # from http://stackoverflow.com/a/434328
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

def insert_with_progress(df, connection_string, table_name):
    engine = sa.create_engine(connection_string, pool_timeout=60)
    chunksize = int(len(df) / 10) # 10%
    with tqdm(total=len(df)) as pbar:
        for i, cdf in enumerate(chunker(df, chunksize)):
            replace = "replace" if i == 0 else "append"
            cdf.to_sql(name=table_name, con=engine, if_exists=replace, index_label='id')
            pbar.update(chunksize)
            

In [39]:
insert_with_progress(df_evictions_columns_cleaned, conn_string, 'SF_Eviction_Notices_Raw')

47729it [09:45, 81.54it/s]                                                                                                                                                                      


In [20]:
#TODO: Update addresses with units dataframe

insert_with_progress(df_addresses_with_units_columns_cleaned, conn_string, 'SF_Addresses_With_Units_Raw')

425667it [1:14:05, 95.76it/s]                                                                                                                                                                             


## Check to see if it worked

In [22]:
tables = pd.read_sql_query(check_tables_query, connection)
tables

StatementError: (builtins.AttributeError) 'NoneType' object has no attribute 'cursor'
[SQL: 
SELECt schema_name(t.schema_id) as schema_name,
       t.name as table_name,
       t.create_date,
       t.modify_date
FROM sys.tables t
ORDER BY schema_name,
         table_name;
]

### Get the row count for each table

In [23]:
tables = ['Zip_Atlas_Median_Age_Raw', 'Zip_Atlas_Median_Household_Income_Raw', 'Zip_Atlas_Population_Density_Raw', 
           'SF_Buyout_Agreements_Raw', 'SF_Eviction_Notices_Raw', 'SF_Addresses_With_Units_Raw']

def get_row_count(tables_to_check):
    '''
    Checks the row count of tables passed to the function, and outputs
    a dataframe with the tablename and row count
    '''
    
    tables = []
    rows = []
    
    for table in tables_to_check:
        get_count_query = """
            SELECT COUNT(*) count
            FROM dbo.{};
        """.format(table)
        
        engine = sa.create_engine(conn_string, pool_timeout=60)
        connection = engine.connect()
        results = connection.execute(text(get_count_query))
        for row in results:
            rows.append(row['count'])
        
        tables.append(table)
        
    row_count = {
        'Table Name': tables, 
        'Record Count': rows
    }
    
    df = pd.DataFrame(row_count)
    return df

row_counts = get_row_count(tables)

In [24]:
row_counts

Unnamed: 0,Table Name,Record Count
0,Zip_Atlas_Median_Age_Raw,26
1,Zip_Atlas_Median_Household_Income_Raw,28
2,Zip_Atlas_Population_Density_Raw,26
3,SF_Buyout_Agreements_Raw,5855
4,SF_Eviction_Notices_Raw,43393
5,SF_Addresses_With_Units_Raw,386975


## Drop the tables, if necessary

In [21]:
## Drop the table
#engine = sa.create_engine(conn_string, pool_timeout=60)

#engine.execute("DROP TABLE dbo.SF_Buyout_Agreements_Raw;")
#engine.execute("DROP TABLE dbo.SF_Addresses_With_Units_Raw;")
#engine.execute("DROP TABLE dbo.SF_Eviction_Notices_Raw;")
#engine.execute("DROP TABLE dbo.Zip_Atlas_Median_Age;")
#engine.execute("DROP TABLE dbo.Zip_Atlas_Median_Household_Income;")
#engine.execute("DROP TABLE dbo.Zip_Atlas_Population_Density;")

<sqlalchemy.engine.result.ResultProxy at 0x7fa37f315f70>