In [1]:
import boto3
import json
import dataset
import time

In [2]:
# AWS service clients used throughout the notebook
iam = boto3.client('iam')
rds = boto3.client('rds')
sfn = boto3.client('stepfunctions')
aws_lambda = boto3.client('lambda')

# IAM role whose ARN is passed to the Lambda functions and state machines below
role = iam.get_role(RoleName='LabRole')

In [None]:
# Create the MySQL RDS instance that stores the scraped data.
# NOTE(review): the master credentials are hard-coded here and again in the
# connection-URL cells; keep them in sync, and prefer reading them from the
# environment before sharing this notebook.
try:
    response = rds.create_db_instance(
        DBInstanceIdentifier='relational-db',
        DBName='reddit_scrapes',
        MasterUsername='username',
        MasterUserPassword='password',
        DBInstanceClass='db.t2.micro',
        Engine='mysql',
        AllocatedStorage=5
    )
except rds.exceptions.DBInstanceAlreadyExistsFault:
    # Re-running the notebook must not fail just because the instance was
    # already created by an earlier run.
    print("DB instance already exists.")

# Wait until DB is available to continue
rds.get_waiter('db_instance_available') \
   .wait(DBInstanceIdentifier='relational-db')

In [18]:
# Describe where DB is available and on what port.
# Look the instance up by its identifier: an unfiltered describe_db_instances()
# returns every instance visible to the client, so element 0 is not guaranteed
# to be 'relational-db'.
db = rds.describe_db_instances(
    DBInstanceIdentifier='relational-db'
)['DBInstances'][0]
ENDPOINT = db['Endpoint']['Address']
PORT = db['Endpoint']['Port']
DBID = db['DBInstanceIdentifier']

print(DBID,
      "is available at", ENDPOINT,
      "on Port", PORT,
     )

relational-db is available at relational-db.c2j1y7tdjd25.us-east-1.rds.amazonaws.com on Port 3306


In [159]:
# Get the ID of the first VPC security group attached to the DB instance
SGNAME = db['VpcSecurityGroups'][0]['VpcSecurityGroupId']

# Create the client before entering the try block: the except clause below
# evaluates `ec2.exceptions`, which would itself raise NameError if the client
# had failed to be created inside the try.
ec2 = boto3.client('ec2')

# Adjust permissions for that security group so that the DB port is reachable.
# If the rule already exists, just report that.
# NOTE(review): 0.0.0.0/0 admits connections from any IPv4 address; narrow the
# CIDR range if the callers' addresses are known.
try:
    data = ec2.authorize_security_group_ingress(
            GroupId=SGNAME,
            IpPermissions=[
                {'IpProtocol': 'tcp',
                 'FromPort': PORT,
                 'ToPort': PORT,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
            ]
    )
except ec2.exceptions.ClientError as e:
    if e.response["Error"]["Code"] == 'InvalidPermission.Duplicate':
        print("Permissions already adjusted.")
    else:
        print(e)

Permissions already adjusted.


#### Lambda function 1 to parallelize post scraping

In [4]:
# Create Lambda function 1
# Read the deployment package fully into memory
with open('final-deploy.zip', 'rb') as zip_file:
    lambda_zip = zip_file.read()

try:
    # First deployment: register the function from the zip archive
    response = aws_lambda.create_function(
        FunctionName='lambda1',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda1.lambda_handler',
        Code={'ZipFile': lambda_zip},
        Timeout=300,
    )
except aws_lambda.exceptions.ResourceConflictException:
    # The function already exists: replace only its code with the
    # contents of the zip archive
    response = aws_lambda.update_function_code(
        FunctionName='lambda1',
        ZipFile=lambda_zip,
    )

In [5]:
# Fetch the ARN directly by function name. list_functions() is paginated, so
# scanning only its first page can miss 'lambda1' and fail with IndexError.
lambda_arn = aws_lambda.get_function(
    FunctionName='lambda1'
)['Configuration']['FunctionArn']

In [6]:
# Display the ARN resolved for Lambda function 1
lambda_arn

'arn:aws:lambda:us-east-1:254224570814:function:lambda1'

In [7]:
def make_def(lambda_arn):
    """Build a Step Functions definition that maps input items onto a Lambda.

    The definition has a single Map state whose iterator contains one Task
    state invoking the given Lambda with the current item as payload. The
    task retries the listed Lambda/Task errors up to 6 times, starting at a
    2 second interval with a backoff rate of 2.

    Args:
        lambda_arn: ARN of the Lambda function the Task state invokes.

    Returns:
        dict: the state machine definition (serialise with json.dumps).
    """
    retry_policy = {
        "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException",
            "States.TaskFailed"
        ],
        "IntervalSeconds": 2,
        "MaxAttempts": 6,
        "BackoffRate": 2
    }

    # Task executed once per item of the Map state's input
    invoke_state = {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "OutputPath": "$.Payload",
        "Parameters": {
            "Payload.$": "$",
            "FunctionName": lambda_arn
        },
        "Retry": [retry_policy],
        "End": True
    }

    map_state = {
        "Type": "Map",
        "End": True,
        "Iterator": {
            "StartAt": "Lambda Invoke",
            "States": {"Lambda Invoke": invoke_state}
        }
    }

    return {
        "Comment": "My State Machine",
        "StartAt": "Map",
        "States": {"Map": map_state}
    }

In [8]:
# Build the definition around Lambda function 1 and register the state machine
sf_def = make_def(lambda_arn)
try:
    response = sfn.create_state_machine(
        name='post_scraping',
        definition=json.dumps(sf_def),
        roleArn=role['Role']['Arn'],
        type='EXPRESS',
    )
except sfn.exceptions.StateMachineAlreadyExists:
    # A machine with this name already exists: find its ARN in the listing
    # and overwrite its definition and role instead
    response = sfn.list_state_machines()
    matching_arns = [
        machine['stateMachineArn']
        for machine in response['stateMachines']
        if machine['name'] == 'post_scraping'
    ]
    state_machine_arn = matching_arns[0]
    response = sfn.update_state_machine(
        stateMachineArn=state_machine_arn,
        definition=json.dumps(sf_def),
        roleArn=role['Role']['Arn'],
    )

In [9]:
# List the state machines, then pick out the ARN of 'post_scraping'
response = sfn.list_state_machines()
post_scraping_arns = [
    machine['stateMachineArn']
    for machine in response['stateMachines']
    if machine['name'] == 'post_scraping'
]
state_machine_arn = post_scraping_arns[0]
# Show the full listing as the cell output
response

{'stateMachines': [{'stateMachineArn': 'arn:aws:states:us-east-1:254224570814:stateMachine:comment_scraping',
   'name': 'comment_scraping',
   'type': 'EXPRESS',
   'creationDate': datetime.datetime(2023, 5, 25, 13, 36, 55, 639000, tzinfo=tzlocal())},
  {'stateMachineArn': 'arn:aws:states:us-east-1:254224570814:stateMachine:post_scraping',
   'name': 'post_scraping',
   'type': 'EXPRESS',
   'creationDate': datetime.datetime(2023, 5, 25, 10, 12, 8, 607000, tzinfo=tzlocal())}],
 'ResponseMetadata': {'RequestId': '8f72e275-b777-4064-acfc-a5d41df22168',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '8f72e275-b777-4064-acfc-a5d41df22168',
   'date': 'Thu, 25 May 2023 22:44:57 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '343'},
  'RetryAttempts': 0}}

### Create tables in RDS

In [10]:
# Connection URL for the reddit_scrapes database on the RDS endpoint
# (MySQL through the mysqlconnector driver)
db_url = 'mysql+mysqlconnector://{user}:{password}@{host}:{port}/reddit_scrapes'.format(
    user='username',
    password='password',
    host=ENDPOINT,
    port=PORT,
)
# From here on `db` is the dataset connection, no longer the RDS description
db = dataset.connect(db_url)

#### posts table

In [152]:
# Create the table that holds the scraped posts; IF NOT EXISTS keeps the cell
# safe to re-run.
# NOTE(review): no primary key or uniqueness constraint is declared, so a
# repeated scrape presumably inserts duplicate rows — confirm against the
# Lambda's insert logic.
query = """
    CREATE TABLE IF NOT EXISTS posts (
        subreddit VARCHAR(255),
        id VARCHAR(255),
        title TEXT,
        score INTEGER,
        url VARCHAR(255),
        comms_num INTEGER,
        body TEXT,
        ups INTEGER
    )
"""
db.query(query)

<dataset.util.ResultIter at 0x22683295160>

In [11]:
# List the tables present in the connected database
db.tables

['posts']

In [13]:
# Subreddits whose posts (and later comments) are scraped
lst_subreddits = [
    'firearms',
    'gunpolitics',
    'Abortiondebate',
    'prolife',
    'prochoice',
    'Music',
    'LetsTalkMusic',
    'movies',
    'MovieSuggestions',
]
# One input item per subreddit, so the Map state hands each subreddit to its
# own Lambda invocation.
batches = [{'subreddit': name} for name in lst_subreddits]

In [14]:
# Display the state machine input built above
batches

[{'subreddit': 'firearms'},
 {'subreddit': 'gunpolitics'},
 {'subreddit': 'Abortiondebate'},
 {'subreddit': 'prolife'},
 {'subreddit': 'prochoice'},
 {'subreddit': 'Music'},
 {'subreddit': 'LetsTalkMusic'},
 {'subreddit': 'movies'},
 {'subreddit': 'MovieSuggestions'}]

In [15]:
# Synchronous Execution: run the state machine over all subreddit batches and
# block until it finishes.
response = sfn.start_sync_execution(
    stateMachineArn=state_machine_arn,
    name='sync_scrape',
    input=json.dumps(batches)
)

# A failed or timed-out execution does not raise here; the outcome is only
# reported in the response, so surface it instead of continuing silently.
if response['status'] != 'SUCCEEDED':
    print("Execution", response['status'], "-",
          response.get('error'), response.get('cause'))

In [16]:
# Size of the posts table after the post-scraping run
len(db['posts'])

3940

In [17]:
# reboot RDS before the second scraping
# NOTE(review): the call returns while the instance is still 'rebooting' (see
# DBInstanceStatus in the output below); nothing in this cell waits for the
# instance to become available again.
rds.reboot_db_instance(DBInstanceIdentifier='relational-db')

{'DBInstance': {'DBInstanceIdentifier': 'relational-db',
  'DBInstanceClass': 'db.t2.micro',
  'Engine': 'mysql',
  'DBInstanceStatus': 'rebooting',
  'MasterUsername': 'username',
  'DBName': 'reddit_scrapes',
  'Endpoint': {'Address': 'relational-db.c2j1y7tdjd25.us-east-1.rds.amazonaws.com',
   'Port': 3306,
   'HostedZoneId': 'Z2R2ITUGPM61AM'},
  'AllocatedStorage': 5,
  'InstanceCreateTime': datetime.datetime(2023, 5, 25, 14, 21, 18, 81000, tzinfo=tzutc()),
  'PreferredBackupWindow': '07:08-07:38',
  'BackupRetentionPeriod': 1,
  'DBSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-06defb149cb9e5d43',
    'Status': 'active'}],
  'DBParameterGroups': [{'DBParameterGroupName': 'default.mysql8.0',
    'ParameterApplyStatus': 'in-sync'}],
  'AvailabilityZone': 'us-east-1f',
  'DBSubnetGroup': {'DBSubnetGroupName': 'default',
   'DBSubnetGroupDescription': 'default',
   'VpcId': 'vpc-038429464fd438924',
   'SubnetGroupStatus': 'Complete',
   'Subnets': [{'SubnetIde

#### Lambda function 2 to parallelize comment scraping

In [19]:
# Create Lambda function 2
# Read the deployment package fully into memory
with open('final-deploy2.zip', 'rb') as zip_file:
    lambda_zip = zip_file.read()

try:
    # First deployment: register the function from the zip archive
    response = aws_lambda.create_function(
        FunctionName='lambda2',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda2.lambda_handler',
        Code={'ZipFile': lambda_zip},
        Timeout=600,
    )
except aws_lambda.exceptions.ResourceConflictException:
    # The function already exists: replace only its code with the
    # contents of the zip archive
    response = aws_lambda.update_function_code(
        FunctionName='lambda2',
        ZipFile=lambda_zip,
    )

In [20]:
# Fetch the ARN directly by function name. list_functions() is paginated, so
# scanning only its first page can miss 'lambda2' and fail with IndexError.
lambda_arn2 = aws_lambda.get_function(
    FunctionName='lambda2'
)['Configuration']['FunctionArn']

In [21]:
# Display the ARN resolved for Lambda function 2
lambda_arn2

'arn:aws:lambda:us-east-1:254224570814:function:lambda2'

In [22]:
# Build the definition around Lambda function 2
sf_def2 = make_def(lambda_arn2)
# Create state machine
try:
    response2 = sfn.create_state_machine(
        name='comment_scraping',
        definition=json.dumps(sf_def2),
        roleArn=role['Role']['Arn'],
        type='EXPRESS'
    )
except sfn.exceptions.StateMachineAlreadyExists:
    # The machine already exists: look up its ARN and update it in place.
    response2 = sfn.list_state_machines()
    # Read from response2 (the listing just fetched). The previous code
    # iterated the unrelated `response` variable left over from an earlier
    # cell, which by this point no longer holds a state machine listing.
    state_machine_arn = [sm['stateMachineArn']
                         for sm in response2['stateMachines']
                         if sm['name'] == 'comment_scraping'][0]
    response2 = sfn.update_state_machine(
        stateMachineArn=state_machine_arn,
        definition=json.dumps(sf_def2),
        roleArn=role['Role']['Arn']
    )

In [23]:
# List the state machines, then point state_machine_arn at 'comment_scraping'
response2 = sfn.list_state_machines()
comment_scraping_arns = [
    machine['stateMachineArn']
    for machine in response2['stateMachines']
    if machine['name'] == 'comment_scraping'
]
state_machine_arn = comment_scraping_arns[0]
# Show the full listing as the cell output
response2

{'stateMachines': [{'stateMachineArn': 'arn:aws:states:us-east-1:254224570814:stateMachine:comment_scraping',
   'name': 'comment_scraping',
   'type': 'EXPRESS',
   'creationDate': datetime.datetime(2023, 5, 25, 13, 36, 55, 639000, tzinfo=tzlocal())},
  {'stateMachineArn': 'arn:aws:states:us-east-1:254224570814:stateMachine:post_scraping',
   'name': 'post_scraping',
   'type': 'EXPRESS',
   'creationDate': datetime.datetime(2023, 5, 25, 10, 12, 8, 607000, tzinfo=tzlocal())}],
 'ResponseMetadata': {'RequestId': '50eab3f7-74bf-4fc5-a8f7-ea798ab568b0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '50eab3f7-74bf-4fc5-a8f7-ea798ab568b0',
   'date': 'Thu, 25 May 2023 22:47:51 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '343'},
  'RetryAttempts': 0}}

#### comments table

In [45]:
# Open a fresh connection to the reddit_scrapes database on the RDS endpoint
# (MySQL through the mysqlconnector driver)
db_url = 'mysql+mysqlconnector://{user}:{password}@{host}:{port}/reddit_scrapes'.format(
    user='username',
    password='password',
    host=ENDPOINT,
    port=PORT,
)
db = dataset.connect(db_url)

In [25]:
# Create the table that holds the scraped comments; IF NOT EXISTS keeps the
# cell safe to re-run.
# NOTE(review): post_id presumably refers to posts.id, but no key or
# constraint is declared here — confirm against the Lambda's insert logic.
query = """
    CREATE TABLE IF NOT EXISTS comments (
        subreddit VARCHAR(255),
        post_id VARCHAR(255),
        comment_score INTEGER,
        comment_body TEXT
    )
"""
db.query(query)

<dataset.util.ResultIter at 0x132af65deb0>

In [26]:
# List the tables present in the connected database
db.tables

['comments', 'posts']

In [27]:
# Size of the comments table before the comment-scraping run
len(db['comments'])

0

In [42]:
def scrape_subreddit(table, subreddit_name):
    """Run the comment-scraping state machine over one subreddit's posts.

    Reads the URLs of all rows of `table` whose `subreddit` column equals
    `subreddit_name`, splits them into roughly 50 batches and starts a
    synchronous execution of the state machine referenced by the global
    `state_machine_arn`, with one batch per input item.

    Args:
        table: object exposing ``find(subreddit=...)`` that yields rows
            with a ``'url'`` key.
        subreddit_name: subreddit whose stored posts should be processed.

    Returns:
        The ``start_sync_execution`` response, or ``None`` when the table
        holds no posts for the subreddit (no execution is started).
    """
    # Query once and materialise the URLs instead of hitting the table twice
    urls = [entry['url'] for entry in table.find(subreddit=subreddit_name)]
    if not urls:
        return None

    # Aim for about 50 batches. Clamp the batch size to 1: with fewer than
    # 50 posts the integer division yields 0, which is an invalid range step.
    n = max(1, len(urls) // 50)
    batches = [{'post_url': urls[i:i + n]} for i in range(0, len(urls), n)]

    # Synchronous Execution
    response2 = sfn.start_sync_execution(
        stateMachineArn=state_machine_arn,
        name='sync_scrape_comment',
        input=json.dumps(batches)
    )
    return response2

In [43]:
# Scrape comments subreddit by subreddit; a failure in one subreddit is
# reported and the loop moves on to the next.
for subreddit_name in lst_subreddits:
    try:
        scrape_subreddit(db['posts'], subreddit_name)
    except Exception as e:
        # Catch Exception rather than using a bare except, so that
        # KeyboardInterrupt still stops the loop, and report the failure
        # instead of dropping it silently.
        print("Scraping failed for", subreddit_name, ":", e)
        continue


In [46]:
# Size of the comments table after the comment-scraping run
len(db['comments'])

20889

In [None]:
# Close the dataset connection now that scraping is finished
db.close()