# Objectives:

    1. Converting serial web-scraping code into a parallel version with PyWren*

    2. Identifying the most-used words using mrjob (MapReduce).

    3. Streaming simulated real-time stock data through Kinesis and creating an SNS (Simple Notification Service) topic to alert on price drops,
    using Boto3.

*Baesens, B. and S. vanden Broucke (2018). Practical Web Scraping for Data Science: Best Practices and Examples with Python. New York: Apress.

## PyWren parallel scraping code

**Question 1**

The parallel version of the web-scraping code ran in 32.09 seconds, while the serial version ran in 447.59 seconds.

**The parallelized code takes roughly 7% of the serial running time, a speedup of about 14x.**
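The ratio and the speedup follow directly from the two measured times; a quick check using only the numbers reported above:

```python
# Measured wall-clock times reported above, in seconds
serial_s = 447.59
parallel_s = 32.09

ratio = parallel_s / serial_s    # fraction of the serial running time
speedup = serial_s / parallel_s  # how many times faster the parallel run was

print(f"parallel/serial = {ratio:.1%}")  # parallel/serial = 7.2%
print(f"speedup = {speedup:.1f}x")       # speedup = 13.9x
```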

**Scaling through AWS Lambda:**
The running time dropped from around 80 seconds to 32 seconds when the number of partitions (batches) passed to PyWren was increased from 10 to 40.
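Each batch produced by `make_groups` becomes one Lambda invocation in `wrenexec.map`, so more partitions mean fewer pages fetched per function. A small sketch of the batching, assuming 1,000 book ids (the size of the books.toscrape.com catalogue), represented here by hypothetical placeholder strings:

```python
def make_groups(input_list, partitions=1):
    """Split input_list into `partitions` contiguous, nearly equal slices."""
    length = len(input_list)
    return [input_list[i * length // partitions:(i + 1) * length // partitions]
            for i in range(partitions)]


# Hypothetical stand-ins for the real book ids stored in books.db
book_ids = [f"book_{n}" for n in range(1000)]

for partitions in (10, 40):
    groups = make_groups(book_ids, partitions)
    # With 10 partitions each Lambda call scrapes 100 pages; with 40, only 25
    print(partitions, "batches, sizes:", sorted({len(g) for g in groups}))
```

Fewer pages per invocation shortens the slowest batch, which bounds the total time; Lambda start-up and network latency still add a fixed overhead per call, so the gain is not expected to be linear in the number of partitions.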


In [0]:
# ls_a2_q1.py

import pywren
import requests
import dataset
import re
import time
from datetime import datetime
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import sqlalchemy

db = dataset.connect('sqlite:///books.db')

base_url = 'http://books.toscrape.com/'

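def scrape_books(html_soup, url):
    # Record every book listed on a catalogue page (runs locally)
    for book in html_soup.select('article.product_pod'):
        book_url = book.find('h3').find('a').get('href')
        book_url = urljoin(url, book_url)
        path = urlparse(book_url).path
        book_id = path.split('/')[2]
        # Upsert updates the row if book_id exists, otherwise inserts it
        db['books'].upsert({'book_id': book_id,
                            'last_seen': datetime.now()}, ['book_id'])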
        
def scrape_book(html_soup, book_id):
    main = html_soup.find(class_='product_main')
    book = {}
    book['book_id'] = book_id
    book['title'] = main.find('h1').get_text(strip=True)
    book['price'] = main.find(class_='price_color').get_text(strip=True)
    book['stock'] = main.find(class_='availability').get_text(strip=True)
    book['rating'] = ' '.join(main.find(class_='star-rating')
                              .get('class')).replace('star-rating', '').strip()
    book['img'] = html_soup.find(class_='thumbnail').find('img').get('src')
    desc = html_soup.find(id='product_description')
    book['description'] = ''
    if desc:
        book['description'] = desc.find_next_sibling('p') \
                                  .get_text(strip=True)
    book_product_table = html_soup.find(
        text='Product Information').find_next('table')
    for row in book_product_table.find_all('tr'):
        header = row.find('th').get_text(strip=True)
        # Since we'll use the header as a column, cleaning it
        # to make sure SQLite will accept it
        header = re.sub('[^a-zA-Z]+', '_', header)
        value = row.find('td').get_text(strip=True)
        book[header] = value
    return book


# Scrape the pages in the catalogue (locally) to refresh the list of book ids
url = base_url
inp = input('Do you wish to re-scrape the catalogue (y/n)? ')
while inp == 'y':
    r = requests.get(url)
    html_soup = BeautifulSoup(r.text, 'html.parser')
    scrape_books(html_soup, url)
    # Is there a next page?
    next_a = html_soup.select('li.next > a')
    if not next_a or not next_a[0].get('href'):
        break
    url = urljoin(url, next_a[0].get('href'))

def do_almost_scraping(bk):
    '''
    Runs inside one AWS Lambda invocation: scrapes every book id in the
    batch `bk` and returns the scraped records. The local SQLite file is
    not reachable from Lambda, so database writes happen locally afterwards.
    '''
    scraped = []
    for book_id in bk:
        book_url = base_url + 'catalogue/{}'.format(book_id)
        r = requests.get(book_url)
        r.encoding = 'utf-8'
        html_soup = BeautifulSoup(r.text, 'html.parser')
        scraped.append(scrape_book(html_soup, book_id))
    return scraped

def make_groups(input_list, partitions=1):
    # Split the list into `partitions` contiguous, nearly equal batches
    length = len(input_list)
    return [input_list[i*length // partitions: (i+1)*length // partitions]
            for i in range(partitions)]

start = time.time()

# Now scrape book by book, oldest first
books = db['books'].find(order_by=['last_seen'])
my_book_ids = [book['book_id'] for book in books]

# Setting up PyWren to scale via AWS Lambda
wrenexec = pywren.default_executor()

# Using the executor's map function with 40 partitions/batches
futures = wrenexec.map(do_almost_scraping, make_groups(my_book_ids, 40))

# Collecting all the results: one list of book records per batch
results = pywren.get_all_results(futures)

# Store the scraped details and refresh last_seen in the local database
for batch in results:
    for book in batch:
        db['book_info'].upsert(book, ['book_id'])
        db['books'].upsert({'book_id': book['book_id'],
                            'last_seen': datetime.now()}, ['book_id'])

end = time.time()
print("Time (in seconds): %f" % (end - start))

**Question 1(b)**

The relational database used in the scraping solution organizes the data into two tables linked by the book id: ‘books’ and ‘book_info’. The ‘books’ table stores bookkeeping information about each book, namely its id and when it was last seen, while the ‘book_info’ table stores the details scraped from each book page, such as ‘title’ and ‘price’. Keeping the two tables separate lets the scraper track which books exist independently of their details: the catalogue crawl can refresh ‘books’ cheaply, the detail scrape can revisit books oldest-first, and if a detail page fails to scrape, its id is still recorded in ‘books’ and can be retried later. Queries can use either table on its own or join the two on ‘book_id’. Overall, this structure saves effort in maintaining and querying the data as it grows.
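A minimal sketch of how the two tables relate, using Python's built-in `sqlite3` with an in-memory database. The schema is a simplified assumption (the real tables created by `dataset` have more columns) and the rows are hypothetical:

```python
import sqlite3

# In-memory stand-in for books.db with a reduced, assumed schema
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE books (book_id TEXT PRIMARY KEY, last_seen TEXT);
    CREATE TABLE book_info (book_id TEXT PRIMARY KEY, title TEXT, price TEXT);
""")
conn.executemany("INSERT INTO books VALUES (?, ?)", [
    ("book_a_1", "2020-01-02"),
    ("book_b_2", "2020-01-01"),  # hypothetical book whose details were not scraped yet
])
conn.execute("INSERT INTO book_info VALUES (?, ?, ?)", ("book_a_1", "Title A", "10.00"))

# Join on book_id; LEFT JOIN keeps books that have no details yet (title is NULL)
rows = conn.execute("""
    SELECT b.book_id, i.title, b.last_seen
    FROM books AS b LEFT JOIN book_info AS i ON b.book_id = i.book_id
    ORDER BY b.last_seen
""").fetchall()

# Books still missing details, i.e. candidates for a retry
missing = conn.execute("""
    SELECT book_id FROM books
    WHERE book_id NOT IN (SELECT book_id FROM book_info)
""").fetchall()

print(rows)
print(missing)
```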

Strengths and limitations of moving to large-scale database solutions on AWS:
Relational databases work well at this scale, but a single-file SQLite database on one machine may not answer queries quickly once the volume of data becomes very large. For example, if my research group were working with a dataset as large as the Amazon Customer Reviews dataset, which contains reviews of millions of products by millions of people, I would consider a large-scale solution and host the data on AWS. The main strengths are the flexibility to choose between different storage options, the ability to scale as needed, and the security of the data. The main limitation is cost: AWS services can be inexpensive compared with other solutions on the market, or with expanding one's own storage capacity and computing power, but mainly when the project requirements are well understood in advance.


# Identifying the top 10 words using mrjob

**Question 2**

**Top ten words used in the book descriptions**


Output:

null	[["the", 5368], ["and", 3764], ["of", 3190], ["a", 3078], ["to", 2591], ["in", 1794], ["is", 1328], ["her", 1054], ["that", 917], ["for", 854]]

In [0]:
# Write all of the book descriptions from books.db to a text file
sqlite3 books.db "SELECT description FROM book_info;" > db_text_q2.txt

# Find the top ten words in db_text_q2.txt with the ls_a2_q2.py program,
# run from the command line (-q suppresses mrjob's log output)
python ls_a2_q2.py db_text_q2.txt -q



In [0]:
# ls_a2_q2.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import re
from collections import Counter

WORD_RE = re.compile(r"[\w']+")


class MRToptenWords(MRJob):

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        '''
        Combiner that pre-sums the counts for each word on the mapper side

        Inputs: 'word' (str) and 'counts' (iterable of int)

        Output: (word, partial count) pair for each word
        '''
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        '''
        Reducer that sums all counts for each word

        Inputs: 'word' (str) and 'counts' (iterable of int)

        Output: key None (so every pair reaches a single reducer in the
                next step) and a (total count, word) pair for each word
        '''
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, count_word_pairs):
        '''
        Reducer that finds the ten most frequently used words

        Inputs: key None and 'count_word_pairs' (iterable of (count, word))

        Output: key None and a list of the top 10 [word, count] pairs
        '''
        yield None, Counter({wrd: count for count, wrd in count_word_pairs}).most_common(10)

    def steps(self):
        '''
        Defines the two-step job: count every word, then pick the top 10

        Output: list of MRStep objects executed in order by mrjob
        '''
        
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]


if __name__ == '__main__':
    MRToptenWords.run()
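
To see what the two steps compute without Hadoop or mrjob, the job can be simulated in plain Python. This is a sketch, not mrjob's execution model: sorting the mapped pairs stands in for the shuffle, and the combiner is omitted because summing is associative, so pre-summing does not change the totals. The `None` key of the second step is why the real output line starts with `null`.

```python
import re
from collections import Counter
from itertools import groupby

WORD_RE = re.compile(r"[\w']+")  # same tokenizer as ls_a2_q2.py


def map_words(line):
    # Mapper: emit (lowercased word, 1) for every token in the line
    for word in WORD_RE.findall(line):
        yield word.lower(), 1


def top_words_locally(lines, n=10):
    # Shuffle stand-in: sort the mapped pairs so equal words are adjacent
    mapped = sorted(pair for line in lines for pair in map_words(line))
    # Step 1 reducer: total count per word, emitted as (count, word)
    count_word_pairs = [(sum(c for _, c in group), word)
                        for word, group in groupby(mapped, key=lambda kv: kv[0])]
    # Step 2 reducer: every pair shares the key None, so one call sees them all
    return Counter({word: count for count, word in count_word_pairs}).most_common(n)


print(top_words_locally(["The cat and the hat.", "The end"], n=2))
```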

# Streaming Stock Data

**Question 3**

**Script to feed data into a Kinesis stream (a producer)**

In [0]:
# producer.py

## feed data into kinesis

import json
import boto3
import random
import datetime

# Region set explicitly: the EC2 instance may have no default region configured
kinesis = boto3.client('kinesis', region_name='us-east-1')

def get_price_record():
    # Simulated price event for one ticker; prices are uniform on [0, 100)
    data = {}
    now = datetime.datetime.now()
    data['EVENT_TIME'] = now.isoformat()
    data['TICKER'] = 'AAPL'
    price = random.random() * 100
    data['PRICE'] = round(price, 2)
    return data

while True:
    data = json.dumps(get_price_record())
    print(data)
    kinesis.put_record(
        StreamName="ls_q3_stream",
        Data=data,
        PartitionKey="partitionkey")
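
The producer above makes one `put_record` call per event. An alternative, shown here as a sketch rather than the method used in this assignment, is to batch events with Kinesis `PutRecords`, which accepts up to 500 records per call and reports partial failures through `FailedRecordCount`. The function names are hypothetical; `kinesis` is assumed to be a boto3 Kinesis client (or any object with the same `put_records` method).

```python
import json
import random
from datetime import datetime


def make_price_record(rng, ticker="AAPL"):
    """Build one simulated price event in the same format as the producer."""
    return {
        "EVENT_TIME": datetime.now().isoformat(),
        "TICKER": ticker,
        "PRICE": round(rng.random() * 100, 2),
    }


def put_price_batch(kinesis, stream_name, n, rng, partition_key="partitionkey"):
    """Send n simulated events in a single PutRecords call."""
    if not 1 <= n <= 500:
        raise ValueError("PutRecords accepts between 1 and 500 records per call")
    records = [{"Data": json.dumps(make_price_record(rng)).encode("utf-8"),
                "PartitionKey": partition_key}
               for _ in range(n)]
    response = kinesis.put_records(StreamName=stream_name, Records=records)
    # PutRecords is not all-or-nothing: failed entries must be resent by the caller
    return response["FailedRecordCount"]
```

Usage would look like `put_price_batch(boto3.client('kinesis', region_name='us-east-1'), 'ls_q3_stream', 100, random.Random())`. Batching reduces the number of HTTP requests, at the cost of events arriving in bursts.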


**Creating a topic for SNS (Simple Notification Service)**

In [0]:
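# ls_2_q3_topic.py
import boto3

sns = boto3.client('sns', region_name='us-east-1')

# Creating the topic for the price alert (returns the existing ARN if the topic already exists)
topic_arn = sns.create_topic(Name='ls_q3_price_alert')['TopicArn']

# Subscribe an email address; SNS sends a confirmation email that must be accepted before alerts are delivered
response = sns.subscribe(TopicArn=topic_arn, Protocol='email', Endpoint='nipun@uchicago.edu')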
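
Because email subscriptions stay pending until the recipient confirms them, it can help to check their status before starting the stream. A sketch assuming `sns` is a boto3 SNS client; unconfirmed subscriptions are listed by `list_subscriptions_by_topic` with the placeholder ARN `PendingConfirmation`. The helper name is hypothetical.

```python
def pending_subscriptions(sns, topic_arn):
    """Return the endpoints whose subscription to topic_arn is not yet confirmed."""
    pending = []
    kwargs = {"TopicArn": topic_arn}
    while True:
        page = sns.list_subscriptions_by_topic(**kwargs)
        for sub in page["Subscriptions"]:
            # Unconfirmed subscriptions report this placeholder instead of a real ARN
            if sub["SubscriptionArn"] == "PendingConfirmation":
                pending.append(sub["Endpoint"])
        token = page.get("NextToken")
        if not token:  # no more pages of subscriptions
            return pending
        kwargs["NextToken"] = token
```

For example, `pending_subscriptions(sns, topic_arn)` returning a non-empty list means the confirmation email has not been accepted yet.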

**Script to read data from the Kinesis stream (a consumer)**

In [0]:
# consumer.py

import boto3
import time
import json

kinesis = boto3.client("kinesis", region_name='us-east-1')
shard_id = "shardId-000000000000"
pre_shard_it = kinesis.get_shard_iterator(StreamName="ls_q3_stream", ShardId=shard_id, ShardIteratorType="LATEST")
shard_it = pre_shard_it["ShardIterator"]

sns = boto3.client('sns', region_name='us-east-1')
# create_topic returns the ARN of the existing topic
topic_arn = sns.create_topic(Name='ls_q3_price_alert')['TopicArn']

alert_sent = False
while not alert_sent:
    out = kinesis.get_records(ShardIterator=shard_it, Limit=1)
    shard_it = out["NextShardIterator"]
    # get_records may return no records, so iterate instead of indexing [0]
    for record in out['Records']:
        y = json.loads(record['Data'].decode('utf-8'))
        if y['PRICE'] < 3:
            # Alert message includes the offending price and its event time
            sns.publish(TopicArn=topic_arn, Subject="Price went below 3!",
                        Message="Price {price} below 3 at time {time}".format(
                            price=y['PRICE'], time=y['EVENT_TIME']))
            alert_sent = True
            break
    time.sleep(1.0)
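
The alert condition can be separated from the AWS calls so it can be tested on its own. A sketch with a hypothetical helper name: `records` is assumed to be the `Records` list of a `get_records` response, where each `Data` field holds the JSON bytes written by the producer.

```python
import json


def first_price_below(records, threshold=3.0):
    """Return the first decoded event whose PRICE is below threshold, else None."""
    for record in records:
        event = json.loads(record["Data"])  # json.loads accepts bytes directly
        if event["PRICE"] < threshold:
            return event
    return None  # also covers an empty batch
```

In the consumer loop this would read `event = first_price_below(out["Records"])`, followed by `sns.publish(...)` when `event` is not `None`; with a larger `Limit` the same function scans the whole batch.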


In [0]:
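# create_ls_q_3_stream.py

# Simple script to create a Kinesis stream with one shard
import boto3

client = boto3.client('kinesis', region_name='us-east-1')
response = client.create_stream(
    StreamName='ls_q3_stream',
    ShardCount=1
)

# Wait until the stream is ACTIVE before the producer starts writing to it
client.get_waiter('stream_exists').wait(StreamName='ls_q3_stream')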

In [0]:
import boto3
import time

session = boto3.Session()

kinesis = session.client('kinesis', region_name='us-east-1')
ec2 = session.resource('ec2', region_name='us-east-1')
ec2_client = session.client('ec2', region_name='us-east-1')

# Launch exactly two instances: one producer and one consumer
instances = ec2.create_instances(ImageId='ami-0915e09cc7ceee3ab',
                                 MinCount=2,
                                 MaxCount=2,
                                 InstanceType='t2.micro',
                                 KeyName='Macs_acc_nipun',
                                )

# Wait until EC2 instances are running before moving on
waiter = ec2_client.get_waiter('instance_running')
waiter.wait(InstanceIds=[instance.id for instance in instances])

# Refresh the instance objects so public_dns_name is populated, and use only
# the instances launched here (not every running instance in the account)
for instance in instances:
    instance.reload()
instance_dns = [instance.public_dns_name for instance in instances]

code = ['producer.py', 'consumer.py']

In [0]:
import paramiko
from scp import SCPClient
ssh_producer, ssh_consumer = paramiko.SSHClient(), paramiko.SSHClient()

# SSH on newly started instances can take a few seconds to come up; otherwise the first connection attempt fails

time.sleep(5)

# Install boto3 on each EC2 instance and copy the producer/consumer code onto the producer/consumer instances

stdin, stdout, stderr = [[None, None] for i in range(3)]

for instance, ssh in enumerate([ssh_producer, ssh_consumer]):
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(instance_dns[instance],
                username='ec2-user',
                key_filename='MACS_acc_nipun.pem')

    with SCPClient(ssh.get_transport()) as scp:
        scp.put(code[instance])

    stdin[instance], stdout[instance], stderr[instance] = \
        ssh.exec_command("sudo pip install boto3")


# Block until the producer has installed boto3, then start the producer script:
producer_exit_status = stdout[0].channel.recv_exit_status()
if producer_exit_status == 0:
    # nohup and & keep producer.py running after the SSH session is closed
    ssh_producer.exec_command("nohup python %s > producer.log 2>&1 &" % code[0])
    print("Producer Instance is Running producer.py\n.........................................")
else:
    print("Error", producer_exit_status)

# Wait for the consumer's boto3 install to finish before closing its connection
stdout[1].channel.recv_exit_status()

# Close SSH connections and show instructions for manual access to the Consumer Instance
ssh_consumer.close()
ssh_producer.close()

print("Connect to Consumer Instance by running: ssh -i \"MACS_acc_nipun.pem\" ec2-user@%s" % instance_dns[1])


**Terminating EC2 instances and deleting the Kinesis stream**

In [0]:

# Terminate EC2 Instances:
ec2_client.terminate_instances(InstanceIds=[instance.id for instance in instances])

# Confirm that EC2 instances were terminated:
waiter = ec2_client.get_waiter('instance_terminated')
waiter.wait(InstanceIds=[instance.id for instance in instances])
print("EC2 Instances Successfully Terminated")

# Delete Kinesis Stream (if it currently exists):
try:
    response = kinesis.delete_stream(StreamName='ls_q3_stream')
except kinesis.exceptions.ResourceNotFoundException:
    pass

# Confirm that Kinesis Stream was deleted:
waiter = kinesis.get_waiter('stream_not_exists')
waiter.wait(StreamName='ls_q3_stream')
print("Kinesis Stream Successfully Deleted")

