In [None]:
import array, math, os, psycopg2, random
from shapely.geometry import *
from shapely.wkb import loads
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool 

def init_capture(capture_dir):
    # Create capture dir if it doesn't exist
    if not os.path.exists(capture_dir):
        os.makedirs(capture_dir)

def format_tabblock_url(state):
    # URL for FIPS and GNIS codes file -- https://www.census.gov/geo/reference/docs/state.txt'
    return 'ftp://ftp2.census.gov/geo/tiger/TIGER2010/TABBLOCK/2010/tl_2010_{0}_tabblock10.zip'.format(state)

def format_lodes7_od_url(st, part, t, year):
    #http://lehd.ces.census.gov/data/lodes/LODES7/LODESTechDoc7.0.pdf
    return 'http://lehd.ces.census.gov/data/lodes/LODES7/{0}/od/{0}_od_{1}_{2}_{3}.csv.gz'.format(st, part, t, year)

def download_file(url, filename):
    command = "wget %s -O %s" % (url, filename)
    print "Downloading %s to %s" % (url, filename)
    !$command

def unzip_tabblock(filename, exdir):
    command = "unzip %s -d %s" % (filename, exdir)
    print "Unzip %s to %s" % (filename, exdir)
    !$command

def gunzip_lodes(filename):
    command = "gunzip %s" % filename
    print "Unzip %s" % filename
    !$command

def create_od_table(state):
    s =(
        "DROP TABLE IF EXISTS %s_od_jt00_2011;\n"
        "CREATE TABLE %s_od_jt00_2011 ( "
        "gid serial NOT NULL, "
        "w_geocode character varying(15), "
        "h_geocode character varying(15), "
        "S000 integer, "
        "SA01 integer, " 
        "SA02 integer, "
        "SA03 integer, "
        "SE01 integer, "
        "SE02 integer, "
        "SE03 integer, "
        "SI01 integer, "
        "SI02 integer, "
        "SI03 integer, "
        "createdate character varying(8));\n"
    ) % (state,state)
    return s

def copy_cvs_to_psql(state,filename):
    s = (
         "COPY %s_od_jt00_2011("
         "w_geocode,h_geocode,S000,SA01,"
         "SA02,SA03,SE01,SE02,SE03,SI01,"
         "SI02,SI03,createdate) FROM "
         "'%s' "
        "DELIMITER ',' CSV HEADER;\n"
         ) % (state, filename)
    return s

def LonLatToPixelXY(lonlat, scale = 1.):
    (lon, lat) = lonlat
    x = (lon + 180.0) * 256.0 / 360.0
    y = 128.0 - math.log(math.tan((lat + 90.0) * math.pi / 360.0)) * 128.0 / math.pi
    return [x*scale, y*scale]

def randomPoint(geom):
    poly = loads(geom, True)
    bbox = poly.bounds
    l,b,r,t = bbox
    while True:
        point = Point(random.uniform(l,r),random.uniform(t,b))
        if point is None:
            break
        if poly.contains(point):
            break
    return point.__geo_interface__['coordinates']

def split_list(alist, wanted_parts=1):
    length = len(alist)
    return [ alist[i*length // wanted_parts: (i+1)*length // wanted_parts] 
             for i in range(wanted_parts) ]

def my_range(start, end, step):
    while start <= end:
        yield start
        start += step

def get_count(conn, table):
    query = "SELECT count(*) FROM %s" % table
    cur = conn.cursor()
    cur.execute(query)
    rows = cur.fetchall()
    cur.close()
    return rows
    
def pack_color(color):
    return color['r'] + color['g'] * 256.0 + color['b'] * 256.0 * 256.0;

def unpack_color(f):
    b = math.floor(f / 256.0 / 256.0)
    g = math.floor((f - b * 256.0 * 256.0) / 256.0)
    r = math.floor(f - b * 256.0 * 256.0 - g * 256.0)
    return {'r':r,'g':g,'b':b}

se01_color = {'r':25, 'g':75, 'b':255}
se02_color = {'r':20, 'g':138, 'b':9}
se03_color = {'r':227, 'g':30, 'b':30}
ANSI_CODES = [
    ('01', 'al'), ('02', 'ak'), ('04', 'az'), ('05', 'ar'), ('06', 'ca'),
    ('08', 'co'), ('09', 'ct'), ('10', 'de'), ('11', 'dc'), ('12', 'fl'),
    ('13', 'ga'), ('15', 'hi'), ('16', 'id'), ('17', 'il'), ('18', 'in'),
    ('19', 'ia'), ('20', 'ks'), ('21', 'ky'), ('22', 'la'), ('23', 'me'),
    ('24', 'md'), ('25', 'ma'), ('26', 'mi'), ('27', 'mn'), ('28', 'ms'),
    ('29', 'mo'), ('30', 'mt'), ('31', 'ne'), ('32', 'nv'), ('33', 'nh'),
    ('34', 'nj'), ('35', 'nm'), ('36', 'ny'), ('37', 'nc'), ('38', 'nd'),
    ('39', 'oh'), ('40', 'ok'), ('41', 'or'), ('42', 'pa'), ('44', 'ri'),
    ('45', 'sc'), ('46', 'sd'), ('47', 'tn'), ('48', 'tx'), ('49', 'ut'),
    ('50', 'vt'), ('51', 'va'), ('53', 'wa'), ('54', 'wi'), ('55', 'wy'),
    ('56', 'wv')
]

In [None]:
init_capture("capture/tabblock_2010")
capture_dir = "capture/tabblock_2010/"
pa_id = 42
init_capture(capture_dir)
pa_url = format_tabblock_url(pa_id)
pa_filename = capture_dir + os.path.basename(pa_url)
if not os.path.isdir('./' + capture_dir):
    download_file(pa_url, pa_filename)
    unzip_tabblock(pa_filename, pa_filename.split('.')[0])

In [None]:
command = "createdb tl_2010_tabblock"
!$command
command = "psql -d tl_2010_tabblock -c 'CREATE EXTENSION postgis;'"
!$command

In [None]:
command = "shp2pgsql -s 4269:4326 capture/tabblock_2010/tl_2010_%s_tabblock10/tl_2010_%s_tabblock10.shp tl_2010_tabblock10 | psql -q -d tl_2010_tabblock" % (pa_id,pa_id)
!$command

In [None]:
command = "psql -d tl_2010_tabblock -c 'CREATE INDEX ON tl_2010_tabblock10 (geoid10);'"
!$command

In [None]:
command = "psql -d tl_2010_tabblock -c 'CREATE INDEX ON tl_2010_tabblock10 USING GIST (geom);'"
!$command

In [None]:
capture_dir = "capture/lodes7/pa/"
init_capture(capture_dir)
pa_url = format_lodes7_od_url('pa', 'main', 'JT00', '2011')
pa_filename = capture_dir + os.path.basename(pa_url)
if not os.path.isdir('./' + capture_dir):
    download_file(pa_url, pa_filename) 
    gunzip_lodes(pa_filename)


In [None]:
f = open("pa-csv-to-psql.sql", "w")
f.write(create_od_table('pa'))
f.write(copy_cvs_to_psql('pa', os.path.abspath(os.path.splitext(pa_filename)[0])))
f.close()
command = "psql -d tl_2010_tabblock -f pa-csv-to-psql.sql"
!$command


In [None]:
command = "psql -d tl_2010_tabblock -c 'CREATE INDEX ON pa_od_jt00_2011 (gid);'"
!$command

command = "psql -d tl_2010_tabblock -c 'CREATE INDEX ON pa_od_jt00_2011 (w_geocode);'"
!$command

command = "psql -d tl_2010_tabblock -c 'CREATE INDEX ON pa_od_jt00_2011 (h_geocode);'"
!$command


In [None]:
# faster
# runs multiple queries to build rows
# then runs multiple processes to process rows

state = 'pa'
dest = '%s-od-jt00-2011.bin' % state
STEP = 100000 # run SQL query 100000 rows at a time
    
packed_se01_color = pack_color(se01_color)
packed_se02_color = pack_color(se02_color)
packed_se03_color = pack_color(se03_color)

# find how many rows total:
conn = psycopg2.connect(dbname="tl_2010_tabblock", host="/var/run/postgresql")
rows = get_count(conn, '%s_od_jt00_2011' % state)
TOTAL = rows[0][0]
print 'Total Rows: %s' % TOTAL
conn.close()

def get_rows(index):
    offset = index * STEP    
    conn = psycopg2.connect(dbname="tl_2010_tabblock", host="/var/run/postgresql")
    print "Batch start: %s. Batch limit: %s" % (offset, STEP)
    query = (
            "select w.geom, h.geom, od.se01, od.se02, od.se03 "
            "from %s_od_jt00_2011 od  "
            "left join tl_2010_tabblock10 w on od.w_geocode = w.geoid10 "
            "left join tl_2010_tabblock10 h on od.h_geocode = h.geoid10 "
            "order by od.gid "
            "limit %s "
            "offset %s "
            ) % (state, STEP, offset)
    cur = conn.cursor()
    cur.execute(query)
    rows = cur.fetchall()
    cur.close()
    conn.close()
    return rows

def process_row(row):
    data = []
    workGeom = row[0]
    homeGeom = row[1]
    se01 = row[2]
    se02 = row[3]
    se03 = row[4]
    for i in range(se01):
        wpoint = randomPoint(workGeom)
        hpoint = randomPoint(homeGeom)
        data += LonLatToPixelXY(wpoint)
        data += LonLatToPixelXY(hpoint)
        data.append(packed_se01_color)
    for i in range(se02):
        wpoint = randomPoint(workGeom)
        hpoint = randomPoint(homeGeom)
        data += LonLatToPixelXY(wpoint)
        data += LonLatToPixelXY(hpoint)
        data.append(packed_se02_color)
    for i in range(se03):
        wpoint = randomPoint(workGeom)
        hpoint = randomPoint(homeGeom)
        data += LonLatToPixelXY(wpoint)
        data += LonLatToPixelXY(hpoint)
        data.append(packed_se03_color)
        
    #print "Randomizing points..."
    split = split_list(data,len(data)/3)
    random.shuffle(split)        
    data = []
    for x in split:
        for y in x:
            data += [y]
    return data

        
print "Querying rows..."
index = range(0, TOTAL / STEP)
pool = Pool(23)
all_rows = pool.map(get_rows, index)
pool.close()
pool.join()

rows = [row for rows in all_rows for row in rows] # flatten results
print 'Resulting rows: %s ' % len(rows)

print "Processing rows..."
pool = Pool(23) # use 24 of available 32 cores. Let's not be greedy...
results = pool.map(process_row, rows)
pool.close()
pool.join()

print "Appending results to %s" % dest
# at this point we have an array of arrays (e.g. [[73.1, 96.8, 73.2, 96.7, 625172], [73.2, 96.7, 73.1, 96.8, 622372], ...])
results = [item for sublist in results for item in sublist] # flatten results
array.array('f', results).tofile(open(dest, 'aw'))


print "Process finished."