In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
from getpass import getpass
from mysql.connector import connect, Error
from dotenv import load_dotenv

class MySQL:
    
    def __init__(self):
        self.client = None
        load_dotenv()
    
    def connect_client(self):
        if self.client:
            return self.client
        
        try:
            self.client = connect(
                host='localhost',
                user=os.getenv('USERNAME'),
                password=os.getenv('PASS'),
                database='fcc',
                allow_local_infile=True
            )
        except Error as e:
            print(e)
            raise Error
        
        return self.client
    
    def run(self, query):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
        
        try:
            with cli.cursor() as cursor:
                cursor.execute(query)
                cli.commit()
                
        except Error as e:
            self.close_client()
            print(e)
            raise Error
    
    def run_multi(self, f_name):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
        
        try:
            directory = os.path.abspath('./sql')
            
            with open(f'{directory}/{f_name}') as file:
                query = file.read()
                with cli.cursor() as cursor:
                    cursor.execute(query, multi=True)
                    cli.commit()
            
        except Error as e:
            self.close_client()
            print(e)
            raise Error
    
    def get_cursor(self, query):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
        
        try:
            cursor = cli.cursor()
            cursor.execute(query)
            return cursor
        except Error as e:
            self.close_client()
            print(e)
            return
    
    def close_client(self):
        if not self.client:
            return
        
        self.client.close()
        self.client = None
    


In [86]:
client = MySQL()

In [28]:
client.connect_client()

query = """
        CREATE TABLE test(
            id INT NOT NULL AUTO_INCREMENT,
            number INT,
            text VARCHAR(100),
            PRIMARY KEY (id)
        )
        """

query2 = """
        INSERT INTO test (number, text)
        VALUES (10, "Thingy")
        """

client.run(query2)


In [29]:
client.close_client()

In [89]:
client = MySQL()

In [None]:
client.run_multi('create_table.sql')
client.close_client()

In [61]:
client.run_multi('load_data_test.sql')
client.close_client()

In [62]:
test = 'ALABAMA_fiber.csv'
idx = test.index('_')
test[idx + 1:-4]

'fiber'

### Inserting Cable CSV data into MySQL table

In [None]:
d_path = os.path.abspath('../FCC-DATA')
directory = os.listdir(d_path)
directory.sort()

for d in directory:
    if d[-1] != 'v':
        continue
        
    idx = d.index('_') + 1
    
    if d[idx:-4] == 'fiber':
        continue
    
    query = f"""
        LOAD DATA LOCAL INFILE '{d_path}/{d}'
        INTO TABLE cable
        FIELDS TERMINATED BY ','
        ENCLOSED BY '"'
        LINES TERMINATED BY '\n'
        IGNORE 1 ROWS
        (provider_id, frn, brand_name, location_id, block_fips, h3index_hex8, technology_code, max_advertised_download_speed, max_advertised_upload_speed, low_latency, business_residential_code);
        """
    
    try:
        client.run(query)
        print(f'{d} complete')
    except:
        print(f'{d} error')
        break
    

### Inserting Fiber CSV data into MySQL table

In [None]:
d_path = os.path.abspath('../FCC-DATA')
directory = os.listdir(d_path)
directory.sort()

for d in directory:
    if d[-1] != 'v':
        continue
        
    idx = d.index('_') + 1
    
    if d[idx:-4] == 'cable':
        continue
    
    query = f"""
        LOAD DATA LOCAL INFILE '{d_path}/{d}'
        INTO TABLE fiber
        FIELDS TERMINATED BY ','
        ENCLOSED BY '"'
        LINES TERMINATED BY '\n'
        IGNORE 1 ROWS
        (provider_id, frn, brand_name, location_id, block_fips, h3index_hex8, technology_code, max_advertised_download_speed, max_advertised_upload_speed, low_latency, business_residential_code);
        """

    try:
        client.run(query)
        print(f'{d} complete')
    except:
        print(f'{d} error')
        break

### LocationData Class

New class that extends MySQL. After much research I determined that the only way I could adequately determine City and State from the data pulled from the Census was to use the h3-hex8 string provided combined with the Census.gov's geocoding API.

h3 conversion is a package for Python and the Census.gov's geocoding tool is free, so this worked out pretty well. The get_data function will -

1. Take the h3 string from the already existing Cable table
2. Use the h3 package to parse the string into an int
3. Use the h3 package to run the int and return a longitude and latitude based on the h3 value
4. Take the longitude and latitude and run a request to the Geocoding API which will return various info on the coordinates (more importantly city and state)
5. Input all the info (h3, city, state) and insert it into a new table, Locations

[Census Geocoder Tool](https://geocoding.geo.census.gov/geocoder/)

In [3]:
import h3.api.numpy_int as h3
import requests
from IPython.display import clear_output
import time
from IPython.lib.display import Audio

class LocationData(MySQL):
    
    def __init__(self):
        super().__init__()
    
    def get_data(self):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
        
        query = """
            SELECT DISTINCT h3index_hex8
            FROM cable
            LIMIT 100000
            OFFSET 35490;
            """

        try:
            c1 = cli.cursor(buffered = True)
            c2 = cli.cursor(buffered = True)
            
            print('Loading data..')
            c1.execute(query)
            
            print('Done')
            time.sleep(2)
            
            count, start = 35490, time.time()
            

            for (h3index_hex8) in c1:
                h = h3index_hex8[0]
                
                dur = time.time() - start
                clear_output(wait = True)

                print(f'Working {count} / 1,086,137')
                print(time.strftime('%H:%M:%S', time.gmtime(dur)))
                
                (lat,lon) = h3.h3_to_geo(h3.string_to_h3(h))
                (city, state) = self.get_loc_data(lat, lon)

                add_data = f"""
                    INSERT IGNORE INTO location (hex_num, city_town, state)
                    VALUES('{h}', '{city}', '{state}');
                    """

                c2.execute(add_data)
                cli.commit()
                count += 1
                
        except (Error, Exception) as e:
            print(e)
            Audio('beep.wav', rate = 4410, autoplay=True)
            self.close_client()
            return
            
        
        c1.close()
        c2.close()
        self.close_client()
    
    def get_loc_data(self, lat, lon):
        params = {
            'x': lon,
            'y': lat,
            'benchmark': 'Public_AR_Current',
            'vintage': 'Current_Current',
            'format': 'json'
        }
        
        retries = 3
        complete = False
        
        while not complete:
            try:

                r = requests.get('https://geocoding.geo.census.gov/geocoder/geographies/coordinates', params=params).json()

                state = r['result']['geographies']['States'][0]['STUSAB']
                city = r['result']['geographies']['County Subdivisions'][0]['NAME']

            except Exception as e:
                print(e)
            else:
                complete = True
                return (city, state)
            finally:
                if not complete and retries:
                    retries -= 1
                elif not complete and not retries:
                    raise Exception('Maxed out of retries')
            

        # return (city, state)


NameError: name 'MySQL' is not defined

In [79]:
ld = LocationData()
ld.get_data()

Working 56217 / 1,086,137
02:11:33
'States'
'States'
'States'
'States'
Maxed out of retries


In [None]:
Looker Cognos QlikView Web FOCUS Alteryx

In [2]:
import h3.api.numpy_int as h3
import requests
from IPython.display import clear_output
import time
from IPython.lib.display import Audio

class LocationData(MySQL):
    
    def __init__(self):
        super().__init__()
        self.errors = []
    
    def get_data(self):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
        
        query = """
            SELECT DISTINCT geography_desc_full, geography_id
            FROM Geography
            WHERE geography_type = 'Census Place' AND data_type = 'Fixed Broadband'
            """

        try:
            c1 = cli.cursor(buffered = True)
            c2 = cli.cursor(buffered = True)
            
            print('Loading data..')
            c1.execute(query)
            
            print('Done')
            time.sleep(2)
            
            count, start = 35799, time.time()
            

            for (geography_desc_full, geography_id) in c1:
                g_desc, g_id = geography_desc_full, geography_id
                
                dur = time.time() - start
                clear_output(wait = True)

                print(f'Working {count}')
                print(time.strftime('%H:%M:%S', time.gmtime(dur)))
                
                coords = self.get_loc_data(g_desc)
                
                if not coords:
                    count += 1
                    continue
                
                lat, lon = coords
                h = h3.geo_to_h3(lat, lon, 8)
                hex8 = h3.h3_to_string(h)
                
                # g = g.split(',')
                # city, state = g[-2], g[-1].strip()
                # print(hex8, city, state, len(state))

                add_data = f"""
                    INSERT INTO Location2 (geo_id, geo_desc, hex_num)
                    VALUES('{g_id}', "{g_desc}", '{hex8}');
                    """

                c2.execute(add_data)
                cli.commit()
                count += 1
                
        except (Error, Exception) as e:
            print(e)
            print(g, add_data)
            self.errors.append(g)
            self.write_errors()
            Audio('beep.wav', rate = 4410, autoplay=True)
            c1.close()
            c2.close()
            self.close_client()
            raise e
            
        
        c1.close()
        c2.close()
        self.close_client()
        self.write_errors()
    
    def get_loc_data(self, loc):
        
            try:
                r = requests.get(f'http://localhost:8080/search.php?q={loc}').json()

                lat = float(r[0]['lat'])
                lon = float(r[0]['lon'])

            except Exception as e:
                # print(f'Api Error: {e} {loc}')
                self.errors.append(loc)
                return None
            else:
                complete = True
                return (lat, lon)
    
    
    def write_errors(self, l_name='errors.log'):
        if not self.errors:
            return
        
        with open('errors.log', 'w') as file:
            for e in self.errors:
                file.write(f'{e}\n')
        
        self.errors = []
            

NameError: name 'MySQL' is not defined

In [150]:
loc = LocationData()
loc.get_data()

Working 67819
00:26:56


In [None]:
from us import states
from census import Census
# B01001_001E
c = Census('c33b362ba87b3191701eece4dec40b3c64c64319')
# [x for x in c.acs5.state_county_blockgroup('PRINCITY', states.AL.fips, Census.ALL, Census.ALL) if x['county'] == '075']
# c.pl.state_place()[:50]
# sorted(c.acs5.state_place('NAME', states.AL.fips, Census.ALL), key= lambda x: x['NAME'])
# c.pl.get('NAME,DIVISION', geo={'for': 'block:3013', 'in': f'state: {states.AL.fips} county:001 tract:020700'})
c.pl.state_county_tract('BLOCK', states.AL.fips, Census.ALL, Census.ALL)

In [5]:
import time
import json
from collections import defaultdict
from us import states
from census import Census
from IPython.display import clear_output

class PopData(Census, MySQL):
    
    def __init__(self, api_key):
        Census.__init__(self, api_key)
        MySQL.__init__(self)
        self.errors = []
        self.state_dict = {}
    
    def get_pop_data(self):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
            
        query = """
            SELECT geo_id
            FROM Location2
            LIMIT 10000
            OFFSET 15000
        """
        
        self.get_places()
        cli.raise_on_warnings = True
        
        try:
            c1 = cli.cursor(buffered = True)
            c2 = cli.cursor(buffered = True)
            
            c1.execute(query)
            count, start = 15000, time.time()
            
            add_data = "REPLACE INTO Population (pop, geo_id)\nVALUES\n"

            
            for (geo_id) in c1:
                g = str(geo_id[0])
                st, pl = g[:-5], g[-5:]
                
                if not count % 10:
                    dur = time.time() - start
                    clear_output(wait = True)

                    print(f'Working {count}')
                    print(time.strftime('%H:%M:%S', time.gmtime(dur)))
                
                if len(st) == 1:
                    st = '0' + st
                
                # pop = self.acs5.state_place('B01001_001E', st, pl)
                
                if st in self.state_dict and pl in self.state_dict[st]:
                    pop = self.state_dict[st][pl]
                else:
                    self.errors.append(g)
                    count += 1
                    continue
                
                # add_data = f"""
                #     INSERT INTO Population (pop, geo_id)
                #     VALUES ({pop}, {geo_id[0]})
                # """
                add_data += f'\t({pop}, {geo_id[0]}),\n'
                count += 1

                
            add_data = add_data[:-2]
            add_data += ';'
            
            c2.execute(add_data)
            cli.commit()
                
                
        except (Error, Exception) as e:
            self.errors.append(g)
            raise e
        finally:
            c1.close()
            c2.close()
            self.close_client()
            self.write_errors()
    
    def get_places(self):
        with open('../misc/fips.json') as file:
            j = json.loads(file.read())
        
        for state in j.keys():
            j[state] = defaultdict(dict)
        
        all_places = self.acs5.state_place('B01001_001E', Census.ALL, Census.ALL)
        
        for a in all_places:
            st, pl, pop = a['state'], a['place'], int(a['B01001_001E'])
            
            if st not in j:
                continue

            j[st][pl] = pop
        
        self.state_dict = j
    
    def test_res(self, amt=10):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
        
        try:
            with cli.cursor(buffered=True) as c:
                query = f"""
                    SELECT geo_id, pop
                    FROM Population
                    ORDER BY RAND()
                    LIMIT {amt}
                """
                
                c.execute(query)
                count = 1
                errors, e_count = [], 0
                
                for (geo_id, pop) in c:
                    clear_output(wait=True)
                    print(f'Testing {count} / {amt}')
                    
                    g = str(geo_id)
                    st, pl = g[:-5], g[-5:]
                    
                    if len(st) == 1:
                        st = '0' + st
                    
                    place = self.acs5.state_place('B01001_001E', st, pl)[0]
                    census_pop = place['B01001_001E']
                    
                    if pop != census_pop:
                        errors.append(f'Geo_id = {geo_id}, Pop = {pop}, Census = {census_pop}, Passed = {pop == census_pop}')
                        e_count += 1
                    
                    count += 1
                    
        except (Error, Exception) as e:
            raise e
        finally:
            self.close_client()
            
            print(f'{count - e_count - 1} Passing, {e_count} Fails')
            if e_count:
                for er in errors:
                    print(er)
            
            
    
    def write_errors(self, l_name='errors.log'):
        if not self.errors:
            return
        
        with open('errors.log', 'a') as file:
            for e in self.errors:
                file.write(f'{e}\n')
        
        self.errors = []
                
            

In [257]:
p = PopData('c33b362ba87b3191701eece4dec40b3c64c64319')
# p.get_pop_data()
p.test_res(100)

Testing 100 / 100
100 Passing, 0 Fails


In [38]:
import os
from IPython.display import clear_output

class BlockData(MySQL):
    
    def __init__(self):
        super().__init__()
        self.errors = []
        self.entries = 0
    
    def get_name_files(self, state=None):
        d_path = os.path.abspath('../NAME-DATA')
        st_dirs = os.listdir(d_path)
        
        if state:
            st_dirs = [st for st in st_dirs if st == state]
        
        for d in st_dirs:
            st_dir_path = f'{d_path}/{d}'
            files = os.listdir(st_dir_path)
            
            for f in files:
                self.ext_name(f'{st_dir_path}/{f}', d)
    
    def get_block_files(self):
        d_path = os.path.abspath('../BLOCK-DATA')
        st_dirs = os.listdir(d_path)
        
        for d in st_dirs:
            st_dir_path = f'{d_path}/{d}'
            files = os.listdir(st_dir_path)
            print(f'Start {d}')
            self.ext_block(f'{st_dir_path}/{files[0]}', d)
                
    
    def ext_name(self, f_path, state):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
        
        clear_output(wait=True)
        
        try:
            query = 'REPLACE INTO Blocknames (id, state, pl_code, pl_name, pl_name_full) VALUES '
            
            with open(f_path, 'r') as file:
                data = [d.strip() for d in file.readlines()]
            
            self.entries += len(data) - 1
            
            for i,d in enumerate(data):
                if i == 0:
                    continue
                if i != 1:
                    query += ','
                
                vals = d.split('|')
                pl_code, pl_name, pl_name_full = vals[1], vals[2], vals[3]
                id = f'{state}-{pl_code}'
                
                query += f'("{id}","{state}","{pl_code}","{pl_name}","{pl_name_full}")'
            
            with cli.cursor() as c:
                c.execute(query)
                cli.commit()

        except (Error, Exception) as e:
            self.errors.append(f_path)
            raise e
        else:
            print(f'Done {state}')
        finally:
            self.close_client()
            self.write_errors()
    
    def ext_block(self, f_path, state):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
        
        clear_output(wait=True)
        
        try:
            query = 'REPLACE INTO BFips2 (fips, pl_code, pl_id) VALUES '
            
            with open(f_path, 'r') as file:
                data = [d.strip() for d in file.readlines()]
            
            self.entries += len(data) - 1
            
            for i,d in enumerate(data):
                if i == 0:
                    continue
                if i != 1:
                    query += ','
                
                vals = d.split('|')
                id = vals[0]
                
                
                if vals[1]:
                    pl_code = f'"{vals[1]}"'
                    pl_id = f'"{state + "-" + vals[1]}"'
                else:
                    pl_code = 'NULL'
                    pl_id = 'NULL'
                    
                
                query += f'("{id}",{pl_code},{pl_id})'
            
            with cli.cursor() as c:
                c.execute(query)
                cli.commit()

        except (Error, Exception) as e:
            self.errors.append(f_path)
            raise e
        else:
            print(f'Done {state}')
        finally:
            self.close_client()
            self.write_errors()
            
    
    def write_errors(self):
        if not self.errors:
            return
        
        with open('error.log', 'w') as file:
            for e in self.errors:
                file.write(f'{e}\n')
            

In [41]:
b = BlockData()
# b.get_name_files('DC')
b.get_block_files()
# b.ext_block('/home/jpal/dev/int-speed/BLOCK-DATA/AK/BlockAssign_ST02_AK_INCPLACE_CDP.txt', 'AK')

Done NJ


In [102]:
b.entries

8132968

In [87]:
os.path.abspath('../BLOCK-DATA/AK/BlockAssign_ST02_AK_INCPLACE_CDP.txt')

'/home/jpal/dev/int-speed/BLOCK-DATA/AK/BlockAssign_ST02_AK_INCPLACE_CDP.txt'

In [71]:
import time
import json
from collections import defaultdict
from us import states
from census import Census
from IPython.display import clear_output

class PopData(Census, MySQL):
    
    def __init__(self, api_key):
        Census.__init__(self, api_key)
        MySQL.__init__(self)
        self.errors = []
        self.state_dict = {}
        
    def get_pl_cursor(self, state):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
        
        cursor = cli.cursor(buffered = True)
        st = states.lookup(state).abbr
        
        query = f"""
            SELECT pl_code, state, id
            FROM Blocknames
            WHERE state = '{st}'
        """
        
        cursor.execute(query)
        
        return cursor
    
    def get_places(self):
        print('Loading Data')
        with open('../misc/fips.json') as file:
            j = json.loads(file.read())
        
        for state in j.keys():
            j[state] = defaultdict(dict)
        
        all_places = self.acs5.state_place('B01001_001E', Census.ALL, Census.ALL)
        
        for a in all_places:
            st, pl, pop = a['state'], a['place'], int(a['B01001_001E'])
            
            if st not in j:
                continue

            j[st][pl] = pop
        
        self.state_dict = j
        print('Done')
    
    def get_place_data(self):
        if not self.client:
            cli = self.connect_client()
        else:
            cli = self.client
        
        c1 = cli.cursor(buffered = True)
        self.get_places()
        
        try:
            for st in self.state_dict:
                clear_output(wait=True)
                print(f'Starting {st}')
                
                state_dict = self.state_dict[st]
                
                query = 'REPLACE INTO Population (id, pl_code, pop)\nVALUES\n'
                count = 1
                
                c2 = self.get_pl_cursor(st)

                for (pl_code, state, id) in c2:
                    
                    fips = states.lookup(state).fips
                    pop = self.state_dict[fips][pl_code]
                    
                    if not pop:
                        pop = '0'

                    query += f"('{id}','{pl_code}',{pop}),\n"
                    count += 1

                query = query[:-2] + ';'

                c1.execute(query)
                cli.commit()

                print(f'Done {st}')
        
        except (Error, Exception) as e:
            print(pl_code, state)
            raise e
        finally:
            c1.close()
            c2.close()
            self.close_client()
    

In [72]:
p = PopData('c33b362ba87b3191701eece4dec40b3c64c64319')
p.get_place_data()

Starting 56
Done 56


In [68]:
c = Census('c33b362ba87b3191701eece4dec40b3c64c64319')
c.acs5.state_place('B01001_001E', '21', '48000')

[]

In [74]:
import time
query = """
    SELECT DISTINCT c.brand_name, c.max_advertised_download_speed, b2.pl_name, b2.state, p.pop
    FROM Blocknames b2
    LEFT JOIN BFips2 b 
        ON b2.id = b.pl_id 
    LEFT JOIN Population p 
        ON b2.id = p.id 
    LEFT JOIN cable c 
        ON c.block_fips = b.fips 
    WHERE 
        b.pl_code IS NOT NULL 
"""

try:
    m = MySQL()
    cli = m.connect_client()
    start = time.time()

    with cli.cursor() as c:
        c.execute(query)
        res = c.fetchall()
        print(len(res))
        print(f'Final: {time.time() - start} secs')

except:
    print('Exit')
finally:
    cli.close_client()

56304
Final: 900.1119935512543 secs


AttributeError: 'CMySQLConnection' object has no attribute 'close_client'