# Implementing Large-Scale Database Solutions
## DynamoDB
First, let's create a DynamoDB table. Let's say that we're collecting and storing streaming Twitter (now X) data in our database. We'll use the Twitter 'username' as our primary key here, since it is unique to each user and makes a good input for DynamoDB's hash function (you can also specify a sort key if you would like). We'll also set our Read and Write Capacity to the minimum for this demo, but you can [scale this up](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html) if you need more throughput for your application (just be careful, as increasing your Read/Write Capacity too far will rapidly deplete your AWS credits).

In [1]:
import boto3

dynamodb = boto3.resource('dynamodb')

table = dynamodb.create_table(
    TableName='twitter',
    KeySchema=[
        {
            'AttributeName': 'username',
            'KeyType': 'HASH'
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'username',
            'AttributeType': 'S'
        }
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 1,
        'WriteCapacityUnits': 1
    }    
)

# Wait until AWS confirms that table exists before moving on
table.meta.client.get_waiter('table_exists').wait(TableName='twitter')

# get data about table (should currently be no items in table)
print(table.item_count)
print(table.creation_date_time)

0
2024-04-10 12:55:15.656000-05:00
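
The table above uses only a partition (hash) key. As a minimal sketch of the sort-key option mentioned earlier, the helper below builds the keyword arguments for `create_table` with an optional `RANGE` key. The helper name `table_spec`, the table name `tweets`, and the `tweet_id` attribute are assumptions made for illustration; they are not part of the original demo.

```python
def table_spec(table_name, hash_key, sort_key=None, rcu=1, wcu=1):
    """Build keyword arguments for dynamodb.create_table (string-typed keys only)."""
    key_schema = [{'AttributeName': hash_key, 'KeyType': 'HASH'}]
    attribute_definitions = [{'AttributeName': hash_key, 'AttributeType': 'S'}]
    if sort_key is not None:
        # A sort key is declared as a second key element of type RANGE
        key_schema.append({'AttributeName': sort_key, 'KeyType': 'RANGE'})
        attribute_definitions.append({'AttributeName': sort_key, 'AttributeType': 'S'})
    return {
        'TableName': table_name,
        'KeySchema': key_schema,
        'AttributeDefinitions': attribute_definitions,
        'ProvisionedThroughput': {'ReadCapacityUnits': rcu, 'WriteCapacityUnits': wcu},
    }

# Hypothetical table keyed on (username, tweet_id)
spec = table_spec('tweets', 'username', sort_key='tweet_id')
print(spec['KeySchema'])

# With AWS credentials configured, the spec could be passed along as:
# table = dynamodb.create_table(**spec)
```

Keeping the specification in a plain dictionary makes it easy to inspect before any billable resource is created.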


OK, so we currently have an empty DynamoDB table. Let's actually put some items into our table:

In [2]:
table.put_item(
   Item={
        'username': 'macs30123',
        'num_followers': 100,
        'num_tweets': 5
    }
)

table.put_item(
   Item={
        'username': 'jon_c',
        'num_followers': 10,
        'num_tweets': 0
    }
)

{'ResponseMetadata': {'RequestId': '98Q3E8R8750LK6KGVUKIANNAN3VV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Wed, 10 Apr 2024 17:57:24 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '98Q3E8R8750LK6KGVUKIANNAN3VV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2745614147'},
  'RetryAttempts': 0}}

We can then easily get items from our table using the `get_item` method and providing our key:

In [3]:
response = table.get_item(
    Key={
        'username': 'macs30123'
    }
)
item = response['Item']
print(item)

{'num_followers': Decimal('100'), 'username': 'macs30123', 'num_tweets': Decimal('5')}
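
Notice that numeric attributes come back as `Decimal` objects rather than plain integers. If we want to serialize an item (to JSON, for instance), one possible approach is to convert these values first. The helper `to_native` below is a hypothetical utility written for this illustration, not part of `boto3`:

```python
from decimal import Decimal

def to_native(value):
    """Recursively convert Decimal values to int (if whole) or float."""
    if isinstance(value, Decimal):
        return int(value) if value == value.to_integral_value() else float(value)
    if isinstance(value, dict):
        return {key: to_native(val) for key, val in value.items()}
    if isinstance(value, list):
        return [to_native(val) for val in value]
    return value

# Same shape as the item printed above
item = {'num_followers': Decimal('100'), 'username': 'macs30123', 'num_tweets': Decimal('5')}
print(to_native(item))
# {'num_followers': 100, 'username': 'macs30123', 'num_tweets': 5}
```

Converting to `float` can lose precision for non-integer values, so this is only appropriate when exact decimal arithmetic is not needed.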


We can also update existing items using the `update_item` method:

In [4]:
table.update_item(
    Key={
        'username': 'macs30123'
    },
    UpdateExpression='SET num_tweets = :val1',
    ExpressionAttributeValues={
        ':val1': 6
    }
)

{'ResponseMetadata': {'RequestId': 'C2GEU1UABJ3J7O85SDR0C7OT0FVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Wed, 10 Apr 2024 17:57:29 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'C2GEU1UABJ3J7O85SDR0C7OT0FVV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2745614147'},
  'RetryAttempts': 0}}

Then, if we take a look again at this item, we'll see that it's been updated (note, though, that DynamoDB reads are [*eventually consistent* unless we specify otherwise](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadConsistency.html), so a read issued right after a write might not always return the updated value immediately):

In [5]:
response = table.get_item(
    Key={
        'username': 'macs30123'
    }
)
item = response['Item']
print(item)

{'num_followers': Decimal('100'), 'username': 'macs30123', 'num_tweets': Decimal('6')}
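
If we need to be sure that a read reflects all prior successful writes, `get_item` accepts a `ConsistentRead` flag. The wrapper below is a minimal sketch; the function name `get_user` is an assumption for illustration, and it expects a `boto3` Table resource like the `table` object created above:

```python
def get_user(table, username, consistent=True):
    """Fetch one item by username, requesting a strongly consistent read by default."""
    response = table.get_item(
        Key={'username': username},
        ConsistentRead=consistent
    )
    # 'Item' is absent from the response when no matching key exists
    return response.get('Item')

# Usage against the table created earlier (requires AWS credentials):
# print(get_user(table, 'macs30123'))
```

Strongly consistent reads consume more read capacity than eventually consistent ones, so it may be worth requesting them only where stale data would actually cause a problem.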


Note as well that, even though it is not optimal to perform complicated queries in DynamoDB tables, we can write SQL-like (PartiQL) queries and run them against our DynamoDB tables if we want to:

In [6]:
response = table.meta.client.execute_statement(
    Statement='''
              SELECT *
              FROM twitter
              WHERE num_followers > 20
              '''
)
item = response['Items']
print(item)

[{'num_followers': Decimal('100'), 'username': 'macs30123', 'num_tweets': Decimal('6')}]
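
For larger tables, a single `execute_statement` call may return only part of the result set, along with a `NextToken` that can be used to request the next page. The loop below is a sketch of how all pages might be collected; the function name `run_partiql` is an assumption for illustration, and `client` is expected to behave like `table.meta.client`:

```python
def run_partiql(client, statement):
    """Run a PartiQL statement and gather items from every page of results."""
    items = []
    kwargs = {'Statement': statement}
    while True:
        response = client.execute_statement(**kwargs)
        items.extend(response.get('Items', []))
        token = response.get('NextToken')
        if not token:
            return items
        # Ask for the next page on the following iteration
        kwargs['NextToken'] = token

# Usage against the table created earlier (requires AWS credentials):
# print(run_partiql(table.meta.client,
#                   'SELECT * FROM twitter WHERE num_followers > 20'))
```

Keep in mind that a filter on a non-key attribute such as `num_followers` still has to read through the table, which is part of why complicated queries are not a good fit for DynamoDB.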


Suppose we wanted to gather data, perform pre-processing steps, and then enter the results into our database, all in the cloud. To do this, we can use `boto3` to access our DynamoDB database from within other AWS resources (such as Lambda or EC2). For instance, let's create a Lambda function that will process some data (username, as well as raw follower and tweet data) and enter the results of this processing into our database without ever leaving the AWS cloud (see zipped Lambda deployment package in this directory):

In [7]:
# create Lambda client
aws_lambda = boto3.client('lambda')

# Access our class IAM role, which allows Lambda
# to interact with other AWS resources
iam_client = boto3.client('iam')
role = iam_client.get_role(RoleName='LabRole')

# Open our Zipped directory
with open('write_to_dynamodb.zip', 'rb') as f:
    lambda_zip = f.read()

try:
    # If function hasn't yet been created, create it
    response = aws_lambda.create_function(
        FunctionName='write_to_dynamodb',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code=dict(ZipFile=lambda_zip),
        Timeout=3
    )
except aws_lambda.exceptions.ResourceConflictException:
    # If function already exists, update it based on zip
    # file contents
    response = aws_lambda.update_function_code(
        FunctionName='write_to_dynamodb',
        ZipFile=lambda_zip
    )

(Go into console to demonstrate test case and check DynamoDB table for new value)
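
The contents of the deployment package are not shown here, so the following is only a sketch of what a `lambda_function.py` along these lines might look like. The event shape (a `username` string plus `followers` and `tweets` lists) and the helper name `summarize` are assumptions made for illustration, not the actual contents of `write_to_dynamodb.zip`:

```python
def summarize(event):
    """Reduce raw follower and tweet data to the counts stored in the table."""
    return {
        'username': event['username'],
        'num_followers': len(event['followers']),
        'num_tweets': len(event['tweets']),
    }

def lambda_handler(event, context):
    # boto3 is imported lazily so that summarize() can be tested without AWS
    import boto3
    table = boto3.resource('dynamodb').Table('twitter')
    item = summarize(event)
    table.put_item(Item=item)
    return {'statusCode': 200, 'body': item}

# Hypothetical test event, similar to one that could be entered in the console
test_event = {'username': 'new_user', 'followers': ['a', 'b', 'c'], 'tweets': ['hello']}
print(summarize(test_event))
# {'username': 'new_user', 'num_followers': 3, 'num_tweets': 1}
```

Separating the processing step from the database write keeps the logic easy to check locally before the function is deployed.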

Finally, you should make sure to delete your table (if you no longer plan to use it), so that you do not incur further charges while it is running:

In [8]:
table.delete()

{'TableDescription': {'TableName': 'twitter',
  'TableStatus': 'DELETING',
  'ProvisionedThroughput': {'NumberOfDecreasesToday': 0,
   'ReadCapacityUnits': 1,
   'WriteCapacityUnits': 1},
  'TableSizeBytes': 0,
  'ItemCount': 0,
  'TableArn': 'arn:aws:dynamodb:us-east-1:557316176944:table/twitter',
  'TableId': '5ef3c655-af13-498d-b136-1022d5d7a50e',
  'DeletionProtectionEnabled': False},
 'ResponseMetadata': {'RequestId': 'CKAU40P1JVJ8QCNGBRNLURJ2B7VV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Wed, 10 Apr 2024 18:01:31 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '350',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'CKAU40P1JVJ8QCNGBRNLURJ2B7VV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '1561284898'},
  'RetryAttempts': 0}}

## RDS

We can also create and interact with scalable cloud relational databases via `boto3`. Let's launch a MySQL database via AWS's RDS service. Note that we can explicitly scale up the hardware (e.g. instance class and allocated storage) for our database via the `create_db_instance` parameters. We can also add read replicas of the database instance that we launch via [the `create_db_instance_read_replica` method](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.create_db_instance_read_replica) or create a cluster of a certain size from the start using [the `create_db_cluster` method](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.create_db_cluster).

In [11]:
rds = boto3.client('rds')

response = rds.create_db_instance(
    DBInstanceIdentifier='relational-db',
    DBName='twitter',
    MasterUsername='username',
    MasterUserPassword='password',
    DBInstanceClass='db.t3.micro',
    Engine='MySQL',
    AllocatedStorage=5
)

# Wait until DB is available to continue
rds.get_waiter('db_instance_available').wait(DBInstanceIdentifier='relational-db')

# Describe where DB is available and on what port
db = rds.describe_db_instances(DBInstanceIdentifier='relational-db')['DBInstances'][0]
ENDPOINT = db['Endpoint']['Address']
PORT = db['Endpoint']['Port']
DBID = db['DBInstanceIdentifier']

print(DBID,
      "is available at", ENDPOINT,
      "on Port", PORT,
     )   

relational-db is available at relational-db.cz0auzktt3ts.us-east-1.rds.amazonaws.com on Port 3306
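
As a sketch of the read-replica option mentioned above, the helper below wraps `create_db_instance_read_replica`. The function name `add_read_replica` and the replica identifier are assumptions for illustration; running it against a real account launches an additional billable instance:

```python
def add_read_replica(rds_client, source_id, replica_id, instance_class='db.t3.micro'):
    """Request a read replica of an existing RDS instance and return its identifier."""
    response = rds_client.create_db_instance_read_replica(
        DBInstanceIdentifier=replica_id,
        SourceDBInstanceIdentifier=source_id,
        DBInstanceClass=instance_class
    )
    return response['DBInstance']['DBInstanceIdentifier']

# Usage with the client created above (requires AWS credentials):
# add_read_replica(rds, 'relational-db', 'relational-db-replica')
# rds.get_waiter('db_instance_available').wait(
#     DBInstanceIdentifier='relational-db-replica')
```

Any replica created this way would also need to be deleted at the end of the session to avoid further charges.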


In order to access our MySQL database, though, we'll need to adjust some security settings associated with our server. By default, we're not able to access port 3306 on our database server over the internet, and we will need to change this setting in order to connect to our database from our local machine. In practice, you should limit the allowed IP range as much as possible (to your home or office, for example) to prevent intruders from connecting to your databases. For the purposes of this demo, though, I am going to make it possible to connect to my database from anywhere on the internet (IP range 0.0.0.0/0):

In [12]:
# Get ID of the security group attached to our DB instance
SGNAME = db['VpcSecurityGroups'][0]['VpcSecurityGroupId']

# Adjust permissions for that security group so that we can access it on Port 3306
# If the SG has already been adjusted, print this out
try:
    ec2 = boto3.client('ec2')
    data = ec2.authorize_security_group_ingress(
            GroupId=SGNAME,
            IpPermissions=[
                {'IpProtocol': 'tcp',
                 'FromPort': PORT,
                 'ToPort': PORT,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
            ]
    )
except ec2.exceptions.ClientError as e:
    if e.response["Error"]["Code"] == 'InvalidPermission.Duplicate':
        print("Permissions already adjusted.")
    else:
        print(e)
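
Outside of a demo, the ingress rule would normally name a narrow range instead of `0.0.0.0/0`. The sketch below builds an `IpPermissions` list for a single IPv4 range and validates it with the standard-library `ipaddress` module. The function name `ingress_rule` and the example address (taken from a range reserved for documentation) are assumptions for illustration:

```python
import ipaddress

def ingress_rule(port, cidr):
    """Build an IpPermissions list allowing TCP traffic on one port from one IPv4 range."""
    network = ipaddress.ip_network(cidr)  # raises ValueError for a malformed range
    if network.version != 4:
        raise ValueError('this sketch only handles IPv4 ranges')
    if network.prefixlen == 0:
        raise ValueError('refusing to open the port to the entire internet')
    return [{
        'IpProtocol': 'tcp',
        'FromPort': port,
        'ToPort': port,
        'IpRanges': [{'CidrIp': str(network)}]
    }]

# Allow a single (hypothetical) office address to reach MySQL's default port
rule = ingress_rule(3306, '203.0.113.25/32')
print(rule)

# The result could then be passed along (requires AWS credentials):
# ec2.authorize_security_group_ingress(GroupId=SGNAME, IpPermissions=rule)
```

Rejecting a `/0` prefix is a design choice in this sketch; it simply makes the wide-open setting something that has to be written out deliberately.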

Alright, we're ready to connect to our database! This is a MySQL database, so let's install a Python package that will allow us to effectively handle this connection:

In [None]:
! pip install mysql-connector-python # Install mysql-connector if you haven't already

Then, we can just connect to the database and run queries in the same way that you have seen while working with SQLite databases (using the `sqlite3` package). Very cool!

In [14]:
import mysql.connector
conn = mysql.connector.connect(host=ENDPOINT, user="username", password="password", port=PORT, database='twitter')
cur = conn.cursor()

In [15]:
create_table = '''
               CREATE TABLE IF NOT EXISTS users (
                   username VARCHAR(10),
                   num_followers INT,
                   num_tweets INT,
                   PRIMARY KEY (username)
               )
               '''
insert_data  = '''
               INSERT INTO users (username, num_followers, num_tweets)
               VALUES 
                   ('macs30123', 100, 5),
                   ('jon_c', 10, 0)
               '''

for op in [create_table, insert_data]:
    cur.execute(op)

# Commit so that the inserted rows persist beyond this connection
conn.commit()

Our relational database is optimized for performing small, fast queries like these and will tend to out-perform our DynamoDB table at these kinds of operations:

In [16]:
cur.execute('''SELECT * FROM users''')
query_results = cur.fetchall()
print(query_results)

[('jon_c', 10, 0), ('macs30123', 100, 5)]


In [17]:
cur.execute('''SELECT username FROM users WHERE num_followers > 20''')
query_results = cur.fetchall()
print(query_results)

[('macs30123',)]
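
When the threshold comes from user input or another variable, it is generally safer to pass it as a query parameter than to format it into the SQL string. Both `mysql.connector` and `psycopg2` use `%s` placeholders for this. The function below is a minimal sketch; the name `users_with_followers_above` is an assumption for illustration, and `cur` is expected to be a cursor like the one created above:

```python
def users_with_followers_above(cur, threshold):
    """Return usernames whose follower count exceeds the given threshold."""
    # The driver substitutes and escapes the value bound to %s
    cur.execute(
        'SELECT username FROM users WHERE num_followers > %s',
        (threshold,)
    )
    return [row[0] for row in cur.fetchall()]

# Usage with the open cursor (requires the database connection above):
# print(users_with_followers_above(cur, 20))
```

Note that the `sqlite3` module uses `?` placeholders instead, so the query string would need a small change to run against a local SQLite database.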


Once we're done executing SQL queries on our MySQL database, we can close our connection to the database and delete the database on AWS so that we're no longer charged for it:

In [18]:
conn.close()
response = rds.delete_db_instance(DBInstanceIdentifier='relational-db',
                       SkipFinalSnapshot=True
                      )
print(response['DBInstance']['DBInstanceStatus'])

# wait until DB is deleted before proceeding
rds.get_waiter('db_instance_deleted').wait(DBInstanceIdentifier='relational-db')
print("RDS Database has been deleted")

deleting
RDS Database has been deleted


# Data Warehousing with Redshift

[**Note: Redshift is not supported in this year's AWS Academy accounts.** The following code can be run in a personal AWS account.]

When you need to run especially big queries against large datasets, it can make sense to perform these in a Data Warehouse like AWS Redshift. Recall that Redshift clusters organize our data in columnar storage (instead of rows, like a standard relational database) and can efficiently perform operations on these columns in parallel.
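
To make the row-versus-column distinction concrete, here is a toy, pure-Python illustration using our small Twitter data. It is only a conceptual sketch of the two layouts and says nothing about how Redshift is actually implemented:

```python
# Row-oriented layout: each record is stored together
rows = [('macs30123', 100, 5), ('jon_c', 10, 0)]

# Column-oriented layout: each attribute is stored together
names, followers, tweets = (list(col) for col in zip(*rows))
columns = {'username': names, 'num_followers': followers, 'num_tweets': tweets}

# Aggregating over rows touches every field of every record...
total_from_rows = sum(record[1] for record in rows)

# ...whereas the columnar layout only needs to read the one relevant column
total_from_columns = sum(columns['num_followers'])

print(total_from_rows, total_from_columns)
# 110 110
```

With only two records the difference is invisible, but for wide tables with many rows, reading a single column instead of every full record can greatly reduce the amount of data scanned.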

Let's spin up a Redshift cluster to see how this works (for our small Twitter demonstration data). Notice that we do need to provide the particular type of hardware that we want each one of our nodes to be, as well as the number of nodes that we want to include in our cluster (we can increase this for greater parallelism and storage capacity). For this demo, let's just select two of the smaller nodes.

In [19]:
redshift = boto3.client('redshift')

response = redshift.create_cluster(
    ClusterIdentifier='myCluster',
    DBName='twitter',
    NodeType='dc2.large',
    NumberOfNodes=2,
    MasterUsername='username',
    MasterUserPassword='Password123'
)

# Wait until cluster is available before proceeding
redshift.get_waiter('cluster_available').wait(ClusterIdentifier='myCluster')

# Describe where cluster is available and on what port
cluster = redshift.describe_clusters(ClusterIdentifier='myCluster')['Clusters'][0]
ENDPOINT = cluster['Endpoint']['Address']
PORT = cluster['Endpoint']['Port']
CLUSTERID = cluster['ClusterIdentifier']

print(CLUSTERID,
      "is available at", ENDPOINT,
      "on Port", PORT,
     )

mycluster is available at mycluster.cplerylwuinc.us-east-1.redshift.amazonaws.com on Port 5439


Again, we'll need to make sure that we can connect with our cluster from our local machine. For the purposes of this demo, we'll open the port up to the Internet (although, again, you should only allow a narrow IP range in your own applications).

In [None]:
# Get ID of the security group attached to our cluster
SGNAME = cluster['VpcSecurityGroups'][0]['VpcSecurityGroupId']

# Adjust permissions for that security group so that we can access it on Port 5439
# If the SG has already been adjusted, print this out
try:
    ec2 = boto3.client('ec2')
    data = ec2.authorize_security_group_ingress(
            GroupId=SGNAME,
            IpPermissions=[
                {'IpProtocol': 'tcp',
                 'FromPort': PORT,
                 'ToPort': PORT,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
            ]
    )
except ec2.exceptions.ClientError as e:
    if e.response["Error"]["Code"] == 'InvalidPermission.Duplicate':
        print("Permissions already adjusted.")
    else:
        print(e)

Redshift was originally forked from PostgreSQL, so the best way to connect with it is via a PostgreSQL Python adaptor (rather than the MySQL adaptor we used previously). We'll use `psycopg2` here.

In [None]:
! pip install psycopg2-binary

Note that once we import the package and connect, we can use the same workflow that we used for our MySQL database (and our local SQLite databases) to execute SQL queries:

In [22]:
import psycopg2
conn = psycopg2.connect(dbname='twitter', host=ENDPOINT, user="username", password="Password123", port=PORT)
cur = conn.cursor()

for op in [create_table, insert_data]:
    cur.execute(op)

# psycopg2 opens a transaction implicitly; commit so the changes persist
conn.commit()

In [23]:
cur.execute('''SELECT * FROM users''')
query_results = cur.fetchall()
print(query_results)

[('macs30123', 100, 5), ('jon_c', 10, 0)]


In [24]:
cur.execute('''SELECT username FROM users WHERE num_followers > 20''')
query_results = cur.fetchall()
print(query_results)

[('macs30123',)]


Then, once we're done, we can close our connection and delete our Redshift cluster in the same way as our RDS instance:

In [25]:
conn.close()
response = redshift.delete_cluster(ClusterIdentifier='myCluster',
                       SkipFinalClusterSnapshot=True
                      )
print(response['Cluster']['ClusterStatus'])

redshift.get_waiter('cluster_deleted').wait(ClusterIdentifier='myCluster')
print("Redshift Cluster has been deleted")

deleting
Redshift Cluster has been deleted
