# Human Mobility in San Diego County
Jessica Embury

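### Subset LODES data to records whose home and work locations are both in San Diego County (FIPS 06073), then aggregate them to block groups
### Intermediate tables (empty subsets, statewide tables, non-aggregated tables) are deleted due to technical limitations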

In [None]:
#import needed libraries
from bs4 import BeautifulSoup
import requests
import os
import glob
import gzip
import psycopg2

In [None]:
# ----- USER SETTINGS: fill these in before running the notebook -----

# Folder that contains this file; data files are stored here as well.
abs_path = ""

# PostgreSQL connection details.
host = ""
db = ""
user = ""
password = ""

# Schema name that is prefixed to every table name used in this notebook.
schema_name = "lodes"

# Resume controls.
# Fresh start (all files for all states): state_num = 0 and file_num = 0.
# After an interruption (SSL error, etc.) set both to the point to resume from.

# Index of the CURRENT state in the full state list:
#   az=0, ca=1, co=2, dc=3, fl=4, il=5, ma=6, mi=7,
#   mn=8, nv=9, ny=10, tx=11, wa=12
state_num = 0

# Number of the last SUCCESSFUL file for the current state
# (e.g. for 'az' at 71 of 136, processing restarts with file 72).
file_num = 0

In [None]:
# States whose LODES data are processed, as lower-case postal abbreviations.
# Full list, for reference when processing every state:
#   states = ["az", "ca", "co", "dc", "fl", "il", "ma", "mi", "mn", "nv", "ny", "tx", "wa"]
states = ["ca"]

In [None]:
#FUNCTIONS

#Function to get all file names from the associated url 
#reference: https://stackoverflow.com/questions/11023530/python-to-list-http-files-and-directories
def listFD(url, ext='', timeout=60):
    """Return the links found on the page at ``url`` whose href ends with ``ext``.

    Args:
        url: Address of the HTML page (directory listing) to scan.
        ext: Suffix an href must end with to be kept; '' keeps every link.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the notebook indefinitely.

    Returns:
        A list of strings, each built as ``url + '/' + href``.

    Raises:
        requests.exceptions.RequestException: If the request fails, times
            out, or the server answers with an HTTP error status.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on an HTTP error rather than parsing an error page for links.
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    # href=True skips anchors that have no href attribute; for those,
    # node.get('href') is None and calling .endswith() on it would raise.
    return [
        url + '/' + node['href']
        for node in soup.find_all('a', href=True)
        if node['href'].endswith(ext)
    ]

#function for psycopg2 to connect to the PostgreSQL database server
#reference: https://github.com/NaysanSaran/pandas2postgresql/blob/master/notebooks/CompleteExample.ipynb
def connect(params_dic):
    """Connect to the PostgreSQL database server.

    Args:
        params_dic: Keyword arguments passed straight to ``psycopg2.connect``.

    Returns:
        The open psycopg2 connection.

    Raises:
        SystemExit: With exit status 1 when the connection attempt fails;
            the error is printed first.
    """
    # Imported locally because the notebook's import cell does not import
    # sys, so the sys.exit() call below would otherwise raise NameError.
    import sys

    print('Connecting to the PostgreSQL database...')
    try:
        conn = psycopg2.connect(**params_dic)
    except Exception as error:
        # Exception already covers psycopg2.DatabaseError; any failure here
        # is reported and ends the run, as before.
        print(error)
        sys.exit(1)
    return conn

In [None]:
# ----- OPEN THE DATABASE SESSION -----

# Keyword arguments that connect() forwards to psycopg2.
param_dic = dict(host=host, database=db, user=user, password=password)

# Open the connection, then obtain the cursor used to run every statement.
conn = connect(param_dic)
cur = conn.cursor()

In [None]:
#create new tables with san diego subset
#
#Each table is handled in its own transaction: DROP + CREATE are committed
#together on success and rolled back on failure. Without the rollback,
#PostgreSQL refuses every later statement on the connection once one fails.
for y, state in enumerate(states[state_num:]):

    #file_num records progress within the state being resumed only; every
    #later state must start again from its first file
    first_file = file_num if y == 0 else 0

    #url of the current state's lodes origin-destination files
    url = 'https://lehd.ces.census.gov/data/lodes/LODES7/' + state + '/od/'

    #file extension of lodes data
    ext = 'csv.gz'

    #names of all files listed at the url for the current state
    filenames = [os.path.basename(link) for link in listFD(url, ext)]

    print('State: ' + state + ', Number of files for ' + url + ' = ' + str(len(filenames)))

    #for each remaining file of the current state:
    for filename in filenames[first_file:]:

        #database table name - same as filename, but remove extensions (.csv.gz)
        #NOTE(review): the name is left unquoted, so PostgreSQL folds it to
        #lower case - presumably the source tables were created the same way
        table_name = schema_name + '.' + filename[:-7]

        #database table name for the san diego subset
        sd_table_name = table_name + '_sd'

        drop_table = 'DROP TABLE IF EXISTS {};'.format(sd_table_name)

        #PostgreSQL string positions start at 1, so "from 0 for 6" yields the
        #first five characters of the geocode (state + county FIPS code)
        create_table = ("CREATE TABLE {} AS SELECT * FROM {} WHERE substring(w_geocode from 0 for 6) = '06073' AND substring(h_geocode from 0 for 6) = '06073';".format(sd_table_name, table_name))

        try:
            cur.execute(drop_table)
            cur.execute(create_table)
        except psycopg2.Error as error:
            #undo this table's partial work and make the connection usable again
            conn.rollback()
            print("Error. Table not created.")
            print(error)
        else:
            conn.commit()
            print('Table created.')

In [None]:
#some of the san diego subset tables are empty. Find and delete those tables.
#reference: https://pynative.com/python-postgresql-select-data-from-table/
#reference: https://dataedo.com/kb/query/postgresql/find-empty-tables-in-database

#pg_class.reltuples is only the planner's row estimate: it stays at 0 (or -1
#on newer servers) for any table that has not been vacuumed/analyzed yet, even
#if the table holds rows. It is therefore used only to shortlist candidates;
#every candidate is then checked with a real query before it is dropped.
#The search is limited to the working schema so that empty tables belonging
#to other schemas are never touched.
find_empty_tables = ("select c.relname as table_name from pg_class c join pg_namespace n on n.oid = c.relnamespace where c.relkind = 'r' and n.nspname = %s and c.reltuples <= 0 order by table_name;")

try:
    cur.execute(find_empty_tables, (schema_name,))
    candidate_tables = [row[0] for row in cur.fetchall()]
except psycopg2.Error as error:
    conn.rollback()
    print("Error.")
    print(error)
    #nothing can be verified, so nothing is dropped
    candidate_tables = []

#keep only the candidates that really contain no rows
empty_tables_list = []

for candidate in candidate_tables:

    try:
        cur.execute('SELECT EXISTS (SELECT 1 FROM {});'.format(schema_name + '.' + candidate))
        has_rows = cur.fetchone()[0]
    except psycopg2.Error as error:
        conn.rollback()
        print("Error. Could not check table " + candidate)
        print(error)
        continue

    if not has_rows:
        empty_tables_list.append(candidate)

num_empty_tables = len(empty_tables_list)
print(num_empty_tables)

#drop empty tables from database, one transaction per table
for count, table_name in enumerate(empty_tables_list, start=1):

    print(table_name)

    drop_table = ('DROP TABLE IF EXISTS {};'.format(schema_name + '.' + table_name))

    try:
        cur.execute(drop_table)
    except psycopg2.Error as error:
        #a failed statement aborts the transaction; roll back so the
        #remaining tables can still be processed
        conn.rollback()
        print("Error. Table (if exists) not dropped.")
        print(error)
    else:
        conn.commit()
        print('Table dropped if exists. #{}'.format(count))

#end the transaction opened by the read-only checks above
conn.commit()

In [None]:
#delete CA tables, keep only San Diego subset tables.
#
#Each DROP is committed on its own and rolled back on failure; without the
#rollback PostgreSQL refuses every later statement once one has failed.
for y, state in enumerate(states[state_num:]):

    #file_num records progress within the state being resumed only; every
    #later state must start again from its first file
    first_file = file_num if y == 0 else 0

    #url of the current state's lodes origin-destination files
    url = 'https://lehd.ces.census.gov/data/lodes/LODES7/' + state + '/od/'

    #file extension of lodes data
    ext = 'csv.gz'

    #the file listing is fetched again only to rebuild the table names
    filenames = [os.path.basename(link) for link in listFD(url, ext)]

    print('State: ' + state + ', Number of files for ' + url + ' = ' + str(len(filenames)))

    #for each remaining file of the current state:
    for filename in filenames[first_file:]:

        #database table name - same as filename, but remove extensions (.csv.gz)
        table_name = schema_name + '.' + filename[:-7]
        print(table_name)

        drop_table = ('DROP TABLE IF EXISTS {};'.format(table_name))

        try:
            cur.execute(drop_table)
        except psycopg2.Error as error:
            conn.rollback()
            print("Error. Table (if exists) not dropped.")
            print(error)
        else:
            conn.commit()
            print('Table dropped if exists.')

In [None]:
#Aggregate data in SD tables to block groups
#Part 1: get table names to aggregate

#Select the tables by content instead of by position: a table qualifies when
#it has both a w_geocode and an h_geocode column, which the aggregation
#query requires. The previous approach listed every table in the schema and
#cut off the last three to skip the shapefile tables, but a SELECT without
#ORDER BY returns rows in no guaranteed order, so the wrong tables could be
#removed from (or left in) the list and later dropped.
find_lodes_tables = ("SELECT table_name FROM information_schema.columns WHERE table_schema = %s AND column_name IN ('w_geocode', 'h_geocode') GROUP BY table_name HAVING COUNT(DISTINCT column_name) = 2 ORDER BY table_name;")

try:
    cur.execute(find_lodes_tables, (schema_name,))
    #each row is a one-element tuple holding the table name
    lodes_tables_list = [row[0] for row in cur.fetchall()]
except psycopg2.Error as error:
    #a failed statement aborts the transaction; roll back so later cells work
    conn.rollback()
    print("Error.")
    print(error)
    #leave an empty list so the following cells simply have nothing to do
    lodes_tables_list = []

#print(len(lodes_tables_list))
#print(lodes_tables_list)


In [None]:
#Aggregate data in SD tables to block groups
#Part 2: create tables with aggregated data
#
#Each table is handled in its own transaction: DROP + CREATE are committed
#together on success and rolled back on failure. Without the rollback,
#PostgreSQL refuses every later statement on the connection once one fails.
for source_name in lodes_tables_list:

    table_name = schema_name + '.' + source_name

    aggregated_table = table_name + '_bg'
    #print(aggregated_table)

    drop_table = ("DROP TABLE IF EXISTS {};".format(aggregated_table))

    #PostgreSQL string positions start at 1, so "from 0 for 13" yields the
    #first twelve characters of each geocode; the job counts are then summed
    #for every pair of shortened work/home geocodes
    create_table = (
        "CREATE TABLE {} AS "
        "SELECT t.w_geocode, t.h_geocode, "
        "SUM(t.s000) AS s000, "
        "SUM(t.sa01) AS sa01, SUM(t.sa02) AS sa02, SUM(t.sa03) AS sa03, "
        "SUM(t.se01) AS se01, SUM(t.se02) AS se02, SUM(t.se03) AS se03, "
        "SUM(t.si01) AS si01, SUM(t.si02) AS si02, SUM(t.si03) AS si03 "
        "FROM (SELECT SUBSTRING(w_geocode from 0 for 13) AS w_geocode, "
        "SUBSTRING(h_geocode from 0 for 13) AS h_geocode, "
        "s000, sa01, sa02, sa03, se01, se02, se03, si01, si02, si03 "
        "FROM {}) AS t "
        "GROUP BY t.w_geocode, t.h_geocode;"
    ).format(aggregated_table, table_name)

    try:
        cur.execute(drop_table)
        cur.execute(create_table)
    except psycopg2.Error as error:
        #undo this table's partial work (an existing aggregate is kept) and
        #make the connection usable again
        conn.rollback()
        print("Error. Table not created.")
        print(error)
    else:
        conn.commit()
        print('Table created.')

In [None]:
#Delete tables with non-aggregated data
#
#A source table is dropped only when its aggregated '_bg' table exists, so a
#failed aggregation cannot lead to the data being lost. Each DROP is committed
#on its own and rolled back on failure; without the rollback PostgreSQL
#refuses every later statement once one has failed.
for source_name in lodes_tables_list:

    table_name = schema_name + '.' + source_name
    aggregated_table = table_name + '_bg'

    drop_table = ("DROP TABLE IF EXISTS {};".format(table_name))

    try:
        #to_regclass returns NULL when no relation with that name exists
        cur.execute("SELECT to_regclass(%s);", (aggregated_table,))
        if cur.fetchone()[0] is None:
            print('Skipped. {} does not exist, so {} was kept.'.format(aggregated_table, table_name))
            continue
        cur.execute(drop_table)
    except psycopg2.Error as error:
        conn.rollback()
        print("Error. Table (if exists) not dropped.")
        print(error)
    else:
        conn.commit()
        print('Table dropped if exists.')

#end the transaction left open by a skipped table's existence check
conn.commit()

In [None]:
#create indexes for aggregated tables
#
#Each index is committed on its own and rolled back on failure; without the
#rollback PostgreSQL refuses every later statement once one has failed.
for source_name in lodes_tables_list:

    table_name = schema_name + '.' + source_name + '_bg'
    index_name = 'idx_' + source_name + '_bg_wh_geocode'

    #composite index on the work/home geocode pair of the aggregated table
    create_index = ("create index {} on {} (w_geocode, h_geocode);".format(index_name, table_name))

    try:
        cur.execute(create_index)
    except psycopg2.Error as error:
        conn.rollback()
        print("Error. Index not created.")
        print(error)
    else:
        conn.commit()
        print('Index created.')

In [None]:
#COMMIT AND CLOSE DATABASE
#commit any changes that are still pending
conn.commit()

#close the cursor first: it belongs to the connection, so it is released
#before the connection it depends on
cur.close()

#close the database connection
conn.close()