In [2]:
import sqlalchemy
import pandas as pd
import boto3
import os
import psycopg2

In [3]:
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import psycopg2

links
[http://docs.sqlalchemy.org/en/latest/core/selectable.html](sqlalchemy)

[http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-network-security.html#vpc-security-groups](securitygroups)



# Setting up a RDS

## Contents
<h3>
    <ol>
        <li>Setting up the cluster using Boto</li>
        <li>Connecting to cluster with sqlalchemy</li>
        <li>Loading Data with sqlalchemy</li>
        <li>Testing out various sql needed for next steps</li>
    </ol>
</h3>

In [1]:
# Make sure you have configured your aws credentials in your environment
client = boto3.client('rds')


NameError: name 'boto3' is not defined

In [4]:
# check
print(client)

<botocore.client.RDS object at 0x112064710>


I will demonstrate the construction of the configuration stepwise. In reality you can just use a dict literal as a parameter to create_db_instance.  The api for rds is found here, [rds api](http://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_CreateDBInstance.html). From This API we can see that create_db_instance is going to need:

    response = client.create_db_instance(
        DBName='string',
        DBInstanceIdentifier='string',
        AllocatedStorage=123,
        DBInstanceClass='string',
        Engine='string',
        MasterUsername='string',
        MasterUserPassword='string',
        DBSecurityGroups=[
            'string',
        ],
        VpcSecurityGroupIds=[
            'string',
        ],
        AvailabilityZone='string',
        DBSubnetGroupName='string',
        PreferredMaintenanceWindow='string',
        DBParameterGroupName='string',
        BackupRetentionPeriod=123,
        PreferredBackupWindow='string',
        Port=123,
        MultiAZ=True|False,
        EngineVersion='string',
        AutoMinorVersionUpgrade=True|False,
        LicenseModel='string',
        Iops=123,
        OptionGroupName='string',
        CharacterSetName='string',
        PubliclyAccessible=True|False,
        Tags=[
            {
                'Key': 'string',
                'Value': 'string'
            },
        ],
        DBClusterIdentifier='string',
        StorageType='string',
        TdeCredentialArn='string',
        TdeCredentialPassword='string',
        StorageEncrypted=True|False,
        KmsKeyId='string',
        CopyTagsToSnapshot=True|False
    )

In [11]:
# step 1, make the db_configuration object
db_configuration = {}


Why are we using a dictionary when the function will take only keyword arguments?  Quick recap on why.

In [6]:
def printKwargs(first, second):
    print(first,second)

aDict = {}
aDict['first'] = 1
aDict['second'] = 2

printKwargs(aDict)

TypeError: printKwargs() missing 1 required positional argument: 'second'

Right, we can't pass a dictionary in to the the function.  However, python lets you unpack your dictionary into tuples!

In [7]:
printKwargs(**aDict)

1 2


In [8]:
# back to the actual work, we will sequentially construct the dict for the keyword arguments
# first off let's set the name
db_configuration['DBName'] = 'cfsdb'

In [9]:
# next we need the db instance identifier which is just a way to identify the db
db_configuration['DBInstanceIdentifier'] = 'cfsdb'

In [10]:
# the allocated storage accepts an integer argument. Between 5 and 6144, we'll use 5
db_configuration['AllocatedStorage'] = 5

In [11]:
# Next we need to choose the compute and memory capacity of the instance
# we will choose db.t2.micro as it is free.
db_configuration['DBInstanceClass'] = 'db.t2.micro'

In [12]:
# Next we choose the engine to run.  This is just the flavor of sql to run, we'll choose postgres since a lot 
# of python developers like postgres (this claim is unsubstantiated).
db_configuration['Engine'] = 'postgres'

In [13]:
# Choose the version of the engine. We're using 9.4.1 since it is available in all regions.
db_configuration['EngineVersion'] = '9.4.1'

In [12]:
# next set up the master user name, store it in your environment as AWS_DB_USER or just hard code it in. 8 char minimum
db_configuration['MasterUsername'] = os.getenv('AWS_DB_USER', 'TestUser')

# also setup the password, must have at least 8 characters
db_configuration['MasterUserPassword'] = os.getenv('AWS_DB_PASSWORD', 'TestPassword')


next up is the security groups, I'm hoping you have a default security group ready to go
if not, we'll create one right now
DBSecurityGroups takes as an argument, a list of strings.

In [15]:
# first let's see what security groups exist, use any of these
client.describe_db_security_groups()

{'DBSecurityGroups': [{'DBSecurityGroupDescription': 'Security group for my dataset',
   'DBSecurityGroupName': 'cfs_data_group',
   'EC2SecurityGroups': [],
   'IPRanges': [],
   'OwnerId': '132412529931',
   'VpcId': 'vpc-bba470de'},
  {'DBSecurityGroupDescription': 'default',
   'DBSecurityGroupName': 'default',
   'EC2SecurityGroups': [],
   'IPRanges': [],
   'OwnerId': '132412529931'}],
 'ResponseMetadata': {'HTTPStatusCode': 200,
  'RequestId': '3e77b2ac-9cba-11e5-a152-61a6740da2bd'}}

In [16]:
client.describe_db_subnet_groups()

{'DBSubnetGroups': [{'DBSubnetGroupDescription': 'default',
   'DBSubnetGroupName': 'default',
   'SubnetGroupStatus': 'Complete',
   'Subnets': [{'SubnetAvailabilityZone': {'Name': 'us-west-1c'},
     'SubnetIdentifier': 'subnet-1d5ab444',
     'SubnetStatus': 'Active'},
    {'SubnetAvailabilityZone': {'Name': 'us-west-1a'},
     'SubnetIdentifier': 'subnet-e9b1188c',
     'SubnetStatus': 'Active'}],
   'VpcId': 'vpc-bba470de'}],
 'ResponseMetadata': {'HTTPStatusCode': 200,
  'RequestId': '3e87b7ef-9cba-11e5-a05b-2be438d48f80'}}

In [17]:
# let's make a specific one for this example using the create_db_security_group method of the rds client
# the api is available here, http://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_CreateDBSecurityGroup.html
client.create_db_security_group(**{
        'DBSecurityGroupDescription': 'Security group for my dataset',
        'DBSecurityGroupName': 'cfs_data_group',
    })

ClientError: An error occurred (DBSecurityGroupAlreadyExists) when calling the CreateDBSecurityGroup operation: A security group named cfs_data_group already exists

In [18]:
# set the security groups list in the configuration
db_configuration['DBSecurityGroups'] = ['cfs_data_group']

In [19]:
# set the availability zone
db_configuration['AvailabilityZone'] = 'us-west-1c'

In [20]:
# mark the subnet group as default
db_configuration['DBSubnetGroupName'] = 'default'

In [21]:
db_configuration['VpcSecurityGroupIds'] = ['vpc-bba470de']

In [22]:
# we need to provide a window in which to let aws do maintenance.
# we'll just use monday from 06:00-14:00 
db_configuration['PreferredMaintenanceWindow'] = 'Mon:06:00-Mon:10:00'

In [23]:
# We need to provide a limit to how long aws keeps our db backups, values are from 0 to 35
db_configuration['BackupRetentionPeriod'] = 5

In [24]:
# provide the port number to connect to, postgres uses 5432
db_configuration['Port'] = 5432

In [25]:
# tell it that we do not want multiaz deployment
db_configuration['MultiAZ'] = False

In [26]:
    # give the license model for your instance, we're using postgresql-license
db_configuration['LicenseModel'] = 'postgresql-license'

In [27]:
# indicate that we will publicly access the data
db_configuration['PubliclyAccessible'] = True


In [28]:
# indicate the storage type, use standard
db_configuration['StorageType'] = 'standard'

In [29]:
# we do not want to encrypt the storage
db_configuration['StorageEncrypted'] = False

In [30]:
db_configuration

{'AllocatedStorage': 5,
 'AvailabilityZone': 'us-west-1c',
 'BackupRetentionPeriod': 5,
 'DBInstanceClass': 'db.t2.micro',
 'DBInstanceIdentifier': 'cfsdb',
 'DBName': 'cfsdb',
 'DBSecurityGroups': ['cfs_data_group'],
 'DBSubnetGroupName': 'default',
 'Engine': 'postgres',
 'EngineVersion': '9.4.1',
 'LicenseModel': 'postgresql-license',
 'MasterUserPassword': 'TestPassword',
 'MasterUsername': 'TestUser',
 'MultiAZ': False,
 'Port': 5432,
 'PreferredMaintenanceWindow': 'Mon:06:00-Mon:10:00',
 'PubliclyAccessible': True,
 'StorageEncrypted': False,
 'StorageType': 'standard',
 'VpcSecurityGroupIds': ['vpc-bba470de']}

Create the client

In [31]:
created_db = client.create_db_instance(**db_configuration)

ClientError: An error occurred (InvalidParameterCombination) when calling the CreateDBInstance operation: DB Security Groups and Vpc Security Groups cannot both be provided.

Need to delete the vpcsecuritygroupids then.

In [34]:
del db_configuration['DBSecurityGroups']

In [35]:
created_db = client.create_db_instance(**db_configuration)

In [54]:
add, port = client.describe_db_instances()['DBInstances'][0].get('Endpoint').values()

In [10]:
port, add = (5432, 'cfsdb.cs4yyoqmq4gl.us-west-1.rds.amazonaws.com')

## Using sqlalchemy

In [15]:
# creating engine

engine = create_engine('postgresql+psycopg2://{username}:{password}@{add}:{port}/{db_name}'.format(
    username=db_configuration['MasterUsername'],
    password=db_configuration['MasterUserPassword'],
    add=add,
    port=port,
    db_name=db_configuration['DBName']), echo=True)

In [66]:
db_configuration['DBName']

'cfsdb'

In [16]:
# import sqlalchemy api
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey


In [17]:
con = engine.connect()

2015-12-29 23:20:51,825 INFO sqlalchemy.engine.base.Engine select version()


INFO:sqlalchemy.engine.base.Engine:select version()


2015-12-29 23:20:51,826 INFO sqlalchemy.engine.base.Engine {}


INFO:sqlalchemy.engine.base.Engine:{}


2015-12-29 23:20:51,880 INFO sqlalchemy.engine.base.Engine select current_schema()


INFO:sqlalchemy.engine.base.Engine:select current_schema()


2015-12-29 23:20:51,881 INFO sqlalchemy.engine.base.Engine {}


INFO:sqlalchemy.engine.base.Engine:{}


2015-12-29 23:20:51,929 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1


INFO:sqlalchemy.engine.base.Engine:SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1


2015-12-29 23:20:51,930 INFO sqlalchemy.engine.base.Engine {}


INFO:sqlalchemy.engine.base.Engine:{}


2015-12-29 23:20:51,955 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1


INFO:sqlalchemy.engine.base.Engine:SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1


2015-12-29 23:20:51,956 INFO sqlalchemy.engine.base.Engine {}


INFO:sqlalchemy.engine.base.Engine:{}


2015-12-29 23:20:52,003 INFO sqlalchemy.engine.base.Engine show standard_conforming_strings


INFO:sqlalchemy.engine.base.Engine:show standard_conforming_strings


2015-12-29 23:20:52,004 INFO sqlalchemy.engine.base.Engine {}


INFO:sqlalchemy.engine.base.Engine:{}


In [70]:
# load data into pandas and then into sqlalchemy connection
transport_df = pd.read_csv("./csv/transport_mode.csv")
transport_df.to_sql('transport_mode', con=engine, if_exists='replace', index=False)

2015-12-07 01:02:15,245 INFO sqlalchemy.engine.base.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s


INFO:sqlalchemy.engine.base.Engine:select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s


2015-12-07 01:02:15,246 INFO sqlalchemy.engine.base.Engine {'name': 'transport_mode'}


INFO:sqlalchemy.engine.base.Engine:{'name': 'transport_mode'}


2015-12-07 01:02:15,315 INFO sqlalchemy.engine.base.Engine 
CREATE TABLE transport_mode (
	"Mode Code" BIGINT, 
	"Mode Description" TEXT
)




INFO:sqlalchemy.engine.base.Engine:
CREATE TABLE transport_mode (
	"Mode Code" BIGINT, 
	"Mode Description" TEXT
)




2015-12-07 01:02:15,316 INFO sqlalchemy.engine.base.Engine {}


INFO:sqlalchemy.engine.base.Engine:{}


2015-12-07 01:02:15,373 INFO sqlalchemy.engine.base.Engine COMMIT


INFO:sqlalchemy.engine.base.Engine:COMMIT


2015-12-07 01:02:15,401 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)


INFO:sqlalchemy.engine.base.Engine:BEGIN (implicit)


2015-12-07 01:02:15,403 INFO sqlalchemy.engine.base.Engine INSERT INTO transport_mode ("Mode Code", "Mode Description") VALUES (%(Mode Code)s, %(Mode Description)s)


INFO:sqlalchemy.engine.base.Engine:INSERT INTO transport_mode ("Mode Code", "Mode Description") VALUES (%(Mode Code)s, %(Mode Description)s)


2015-12-07 01:02:15,405 INFO sqlalchemy.engine.base.Engine ({'Mode Description': 'Single mode', 'Mode Code': 2}, {'Mode Description': 'Truck', 'Mode Code': 3}, {'Mode Description': 'For-hire truck', 'Mode Code': 4}, {'Mode Description': 'Private truck', 'Mode Code': 5}, {'Mode Description': 'Rail', 'Mode Code': 6}, {'Mode Description': 'Water', 'Mode Code': 7}, {'Mode Description': 'Inland Water', 'Mode Code': 8}, {'Mode Description': 'Great Lakes', 'Mode Code': 9}  ... displaying 10 of 21 total bound parameter sets ...  {'Mode Description': 'Other mode', 'Mode Code': 19}, {'Mode Description': 'Mode suppressed', 'Mode Code': 0})


INFO:sqlalchemy.engine.base.Engine:({'Mode Description': 'Single mode', 'Mode Code': 2}, {'Mode Description': 'Truck', 'Mode Code': 3}, {'Mode Description': 'For-hire truck', 'Mode Code': 4}, {'Mode Description': 'Private truck', 'Mode Code': 5}, {'Mode Description': 'Rail', 'Mode Code': 6}, {'Mode Description': 'Water', 'Mode Code': 7}, {'Mode Description': 'Inland Water', 'Mode Code': 8}, {'Mode Description': 'Great Lakes', 'Mode Code': 9}  ... displaying 10 of 21 total bound parameter sets ...  {'Mode Description': 'Other mode', 'Mode Code': 19}, {'Mode Description': 'Mode suppressed', 'Mode Code': 0})


2015-12-07 01:02:15,917 INFO sqlalchemy.engine.base.Engine COMMIT


INFO:sqlalchemy.engine.base.Engine:COMMIT


2015-12-07 01:02:15,937 INFO sqlalchemy.engine.base.Engine SELECT relname FROM pg_class c WHERE relkind = 'r' AND 'public' = (select nspname from pg_namespace n where n.oid = c.relnamespace) 


INFO:sqlalchemy.engine.base.Engine:SELECT relname FROM pg_class c WHERE relkind = 'r' AND 'public' = (select nspname from pg_namespace n where n.oid = c.relnamespace) 


2015-12-07 01:02:15,938 INFO sqlalchemy.engine.base.Engine {}


INFO:sqlalchemy.engine.base.Engine:{}


In [71]:
con.execute('SELECT "Mode Description" FROM transport_mode').fetchall()

2015-12-07 01:02:21,766 INFO sqlalchemy.engine.base.Engine SELECT "Mode Description" FROM transport_mode


INFO:sqlalchemy.engine.base.Engine:SELECT "Mode Description" FROM transport_mode


2015-12-07 01:02:21,768 INFO sqlalchemy.engine.base.Engine {}


INFO:sqlalchemy.engine.base.Engine:{}


[('Single mode',),
 ('Truck',),
 ('For-hire truck',),
 ('Private truck',),
 ('Rail',),
 ('Water',),
 ('Inland Water',),
 ('Great Lakes',),
 ('Deep Sea',),
 ('Multiple Waterways',),
 ('Air (incl truck & air)',),
 ('Pipeline',),
 ('Multiple mode',),
 ('Parcel, USPS, or courier',),
 ('Non-parcel multimode',),
 ('Truck and rail',),
 ('Truck and water',),
 ('Rail and water',),
 ('Other multiple mode',),
 ('Other mode',),
 ('Mode suppressed',)]

In [6]:
# read in all our files using the conversion pd-->sql
get_db_name = lambda f: f.split('.')[-2].split('/')[-1].strip('/')
get_db_name('./csv/FIPS_States.csv')

'FIPS_States'

In [19]:
[pd.read_csv(f).to_sql(get_db_name(f), con=engine, if_exists='ignore', index=False) for f in 
 ['./csv/state_latlon.csv', './csv/sctg.csv', './csv/cfs_areas.csv', './csv/FIPS_States.csv', './csv/naics.csv']]

ValueError: 'ignore' is not valid for if_exists

In [21]:
pd.read_csv('./csv/naics.csv').to_sql(get_db_name('./csv/naics.csv'), con=engine, if_exists='fail', index=False)

2015-12-29 23:24:26,156 INFO sqlalchemy.engine.base.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s


INFO:sqlalchemy.engine.base.Engine:select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s


2015-12-29 23:24:26,157 INFO sqlalchemy.engine.base.Engine {'name': 'naics'}


INFO:sqlalchemy.engine.base.Engine:{'name': 'naics'}


2015-12-29 23:24:26,232 INFO sqlalchemy.engine.base.Engine 
CREATE TABLE naics (
	"NAICS" BIGINT, 
	"Description" TEXT
)




INFO:sqlalchemy.engine.base.Engine:
CREATE TABLE naics (
	"NAICS" BIGINT, 
	"Description" TEXT
)




2015-12-29 23:24:26,233 INFO sqlalchemy.engine.base.Engine {}


INFO:sqlalchemy.engine.base.Engine:{}


2015-12-29 23:24:26,588 INFO sqlalchemy.engine.base.Engine COMMIT


INFO:sqlalchemy.engine.base.Engine:COMMIT


2015-12-29 23:24:26,614 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)


INFO:sqlalchemy.engine.base.Engine:BEGIN (implicit)


2015-12-29 23:24:26,616 INFO sqlalchemy.engine.base.Engine INSERT INTO naics ("NAICS", "Description") VALUES (%(NAICS)s, %(Description)s)


INFO:sqlalchemy.engine.base.Engine:INSERT INTO naics ("NAICS", "Description") VALUES (%(NAICS)s, %(Description)s)


2015-12-29 23:24:26,618 INFO sqlalchemy.engine.base.Engine ({'NAICS': 212, 'Description': 'Mining (except oil and gas)'}, {'NAICS': 311, 'Description': 'Food manufacturing'}, {'NAICS': 312, 'Description': 'Beverage and tobacco product manufacturing'}, {'NAICS': 313, 'Description': 'Textile mills'}, {'NAICS': 314, 'Description': 'Textile product mills'}, {'NAICS': 315, 'Description': 'Apparel manufacturing'}, {'NAICS': 316, 'Description': 'Leather and allied product manufacturing'}, {'NAICS': 321, 'Description': 'Wood product manufacturing'}  ... displaying 10 of 45 total bound parameter sets ...  {'NAICS': 5111, 'Description': 'Newspaper, periodical, book, and directory publishers'}, {'NAICS': 551114, 'Description': 'Corporate, subsidiary, and regional managing offices'})


INFO:sqlalchemy.engine.base.Engine:({'NAICS': 212, 'Description': 'Mining (except oil and gas)'}, {'NAICS': 311, 'Description': 'Food manufacturing'}, {'NAICS': 312, 'Description': 'Beverage and tobacco product manufacturing'}, {'NAICS': 313, 'Description': 'Textile mills'}, {'NAICS': 314, 'Description': 'Textile product mills'}, {'NAICS': 315, 'Description': 'Apparel manufacturing'}, {'NAICS': 316, 'Description': 'Leather and allied product manufacturing'}, {'NAICS': 321, 'Description': 'Wood product manufacturing'}  ... displaying 10 of 45 total bound parameter sets ...  {'NAICS': 5111, 'Description': 'Newspaper, periodical, book, and directory publishers'}, {'NAICS': 551114, 'Description': 'Corporate, subsidiary, and regional managing offices'})


2015-12-29 23:24:27,691 INFO sqlalchemy.engine.base.Engine COMMIT


INFO:sqlalchemy.engine.base.Engine:COMMIT


2015-12-29 23:24:27,719 INFO sqlalchemy.engine.base.Engine SELECT relname FROM pg_class c WHERE relkind = 'r' AND 'public' = (select nspname from pg_namespace n where n.oid = c.relnamespace) 


INFO:sqlalchemy.engine.base.Engine:SELECT relname FROM pg_class c WHERE relkind = 'r' AND 'public' = (select nspname from pg_namespace n where n.oid = c.relnamespace) 


2015-12-29 23:24:27,720 INFO sqlalchemy.engine.base.Engine {}


INFO:sqlalchemy.engine.base.Engine:{}


In [76]:
df = pd.read_csv('./csv/cfs_2012_pumf_first_50k.csv')

In [1]:
#df[:1].to_sql('transactions', con=engine, if_exists='fail', index=False, chunksize=10000)

In [2]:
#pd.read_csv('./cfs_2012_pumf_csv.txt').to_sql('transactions', con=engine, if_exists='fail', index=False, chunksize=10000)

In [3]:
#con.execute('DROP TABLE transactions')

In [4]:
#con.execute('SELECT "SHIPMT_ID" FROM transactions ORDER BY 1 DESC LIMIT 1').fetchall()

In [5]:
#con.execute('SELECT "QUARTER", SUM("WGT_FACTOR") from transactions GROUP BY 1').fetchall()