# Basic Steps to do Streaming


This entire project is based on gathering reviews from Pitchfork: http://pitchfork.com/
* more info at "Overview.ipynb"

* Run the script "pf_miner.py".
* The script scrapes the Pitchfork reviews page (http://pitchfork.com/reviews/albums/) using Beautiful Soup.
* Using boto, it connects to S3 and dumps the review data in JSON format.
* Schedule it with cron to run every 24 hours at 6:15 pm (cron schedule `15 18 * * *`).

# Detailed steps on how I Stream the data

###### Please note: the code is shown in this notebook to make it look modular. It raises errors when run from here because some of the functions and files live in separate folders for bookkeeping, so the import paths would have to be changed to run it from this location. The recommended way to run the scripts is from PyCharm or Atom.


In [29]:
#!/usr/bin/spark-submit

from os.path import expanduser
import requests
# requests fetches a page the same way a browser does when we type in a link.
# The HTML it returns is fed to Beautiful Soup, which parses it into a tree
# that we walk to pull out the information we want.
# That is the whole web-scraping approach used in this notebook.
import json
import yaml
import pytz
from datetime import datetime
from dateutil.parser import parse as date_parse
from dateutil.relativedelta import relativedelta
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
from time import sleep
from make_db import partition_job

In [21]:
# pf_url is the site root, base_url is the Pitchfork album reviews listing,
# and review_columns names the fields extracted from each review
pf_url = 'http://pitchfork.com'
base_url = 'http://pitchfork.com/reviews/albums/'
review_columns = ['artist','album','score', 'album_art','genre','label',
                'pub_date','abstract','featured_tracks', 'review_content']

![pitchfork reviews](images/Scratch_Pitchfork.png)

* Here we take the data and store it in S3, so the first step is connecting to S3.
* This belongs to the second S, Storing, but it lives inside this code, so we cover it briefly here.
* We connect to AWS S3 with our AWS credentials, which are stored in a `*.yaml` file.
* We create a bucket in AWS; here the name is "finalprojectpitchfork".
* We upload each review as soon as we get it, using the function "upload_to_s3".
* The Pitchfork album reviews page (pictured above) contains links to each of the reviews.
* We fetch the links from that page and then fetch each review page; every individual review page contains the information we want.
* Once the code was confirmed to run on a local machine, I created an EC2 instance on AWS to run it.
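
The connection code reads `credentials['aws']['access_key_id']` and `credentials['aws']['secret_access_key']`, so the parsed YAML file presumably has the nested shape used in this sketch. The helper name `check_credentials` and the placeholder values are hypothetical and not part of the project; the sketch only illustrates the expected structure.

```python
# Hypothetical helper: reports which of the two keys read by connect_to_s3
# are missing from a parsed credentials dict. Values below are placeholders.
REQUIRED_AWS_KEYS = ('access_key_id', 'secret_access_key')


def check_credentials(credentials):
    """Return the list of missing AWS keys (empty if both are present)."""
    aws = credentials.get('aws') or {}
    return [key for key in REQUIRED_AWS_KEYS if not aws.get(key)]


example = {'aws': {'access_key_id': 'YOUR_KEY_ID',
                   'secret_access_key': 'YOUR_SECRET'}}
missing = check_credentials(example)
print(missing)
```

Running a check like this before connecting would turn a missing key into a readable message instead of a `KeyError` deep inside the miner.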

In [19]:
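def connect_to_s3(credentials, bucket_name='finalprojectpitchfork'):
    '''Connect to S3 and return (connection, bucket), creating the bucket if needed.'''
    conn = S3Connection(credentials['aws']['access_key_id'],
                        credentials['aws']['secret_access_key'])

    try:
        bucket = conn.get_bucket(bucket_name)
        print("Bucket found on S3")
    except Exception:
        # get_bucket raises if the bucket is missing or not accessible
        bucket = conn.create_bucket(bucket_name)
        print("Creating bucket")
    return conn, bucket


def upload_to_s3(bucket, data):
    '''Store one review as a JSON object keyed by "artist - album".'''
    # Build the key name before the try block so the failure message can use it
    key_name = data['artist'] + ' - ' + data['album']
    try:
        k = Key(bucket)
        k.key = key_name
        k.set_contents_from_string(json.dumps(data, ensure_ascii=False))
        print("Uploaded ", k.key)
    except Exception:
        print("Failed to upload", key_name)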
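
As a minimal sketch of what `upload_to_s3` sends, the snippet below builds the same key and JSON body without touching S3. The function name `build_s3_object` is hypothetical; the artist and album are taken from the log output at the end of this notebook.

```python
import json


def build_s3_object(data):
    """Return (key, body) derived the same way upload_to_s3 derives them."""
    key = data['artist'] + ' - ' + data['album']
    body = json.dumps(data, ensure_ascii=False)
    return key, body


review = {'artist': 'Nite Jewel', 'album': 'Real High'}
key, body = build_s3_object(review)
print(key)
print(body)
```

Because the key is derived only from artist and album, uploading the same review twice writes to the same key rather than creating a second object.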

In [11]:
# limit caps the number of listing pages to walk (up to 10,000).
# req_lim (20) is used both as the number of retries per request and as the
# error threshold: after 20 failed reviews, either because the site is
# blocking us or because there are no more reviews, the miner stops.
def pf_miner(credentials, limit=10000, req_lim=20):

    err_count = 0

    _, bucket = connect_to_s3(credentials)

    for page in range(1, limit):
        print("Mining page: ", page)

        soup = req_parse(base_url+'?page={}'.format(page), req_lim)
        if soup is None:
            # req_parse returns None on a 404 or when every request attempt fails
            print('No more pages to mine. Exiting...')
            return
        info_list = soup.findAll("div", {"class": "review"})

        # Requesting a web page means a lot of waiting, so instead of fetching
        # reviews one at a time we request several at once. This way it takes
        # about 2 hours to get all of them; otherwise it takes overnight.
        with ThreadPoolExecutor(max_workers=5) as executor:

            future_to_url = {executor.submit(req_task, pf_url+info.a['href'],
                    req_lim): info.a['href'] for info in info_list}

            # as_completed yields each future as soon as its task finishes,
            # regardless of submission order
            for future in as_completed(future_to_url):
                try:
                    # get the resulting data
                    data = future.result()
                # Either something is wrong with the review or we have run out
                # of reviews, so give up after req_lim (20) such errors.
                except (AttributeError, TypeError):

                    err_count += 1
                    if err_count >= req_lim:
                        print('No more reviews found. Exiting...')
                        return

                    print('No more links on the page.')
                    continue
                except Exception:
                    print('Unknown Error occurred: ', future.exception())
                    continue

                upload_to_s3(bucket, data)




# Each request task fetches one review page, which holds a single review.
def req_task(link, req_lim):
    '''
    calls all the extractor functions and returns the review data as a dict
    '''
    print('req_task: ', link)
    soup = req_parse(link, req_lim)

    review = [get_artist(soup), get_album(soup), get_score(soup),
                get_album_art(soup), get_genres(soup), get_label(soup),
                get_pub_date(soup), get_abstract(soup),
                get_featured_tracks(soup), get_review_content(soup)]

    # The extractors give us a plain list, which we convert to a JSON-style
    # dict keyed by column name for easier handling. Saving it as *.csv was an
    # alternative, but it would have been harder to work with.
    # Non-ASCII characters are dropped from every field.
    my_json = {}
    for col_name, item in zip(review_columns, review):
        my_json[col_name] = ''.join([c for c in item if ord(c) < 128])

    return my_json

# Fetches a page (a listing page or a single review) and returns its soup

def req_parse(link, req_lim):
    '''
    GET the link, returns None if failure. Returns the soup if all went well.
    '''
    req = None  # stays None if every attempt fails
    for _ in range(req_lim):
        try:
            req = requests.get(link, headers={'User-Agent': 'Mozilla/5.0'})
            break
        except requests.RequestException:
            print('stuck')
            sleep(5)

    # Catastrophe check: every attempt failed
    if req is None:
        return

    if req.status_code == 404:
        print('There are no more pages to mine. Breaking out of loop.')
        return

    # Parse html
    soup = BeautifulSoup(req.text, 'html.parser')

    return soup
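
The concurrency pattern used in `pf_miner` can be tried in isolation. The sketch below is self-contained and uses a stand-in `fake_fetch` instead of `req_task`, so no network access is needed; `fake_fetch`, `fetch_all`, and the example links are hypothetical names introduced only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def fake_fetch(link):
    """Stand-in for req_task: pretend to download and parse one review."""
    if link.endswith('broken'):
        raise AttributeError('page could not be parsed')
    sleep(0.01)  # simulated network wait
    return {'link': link}


def fetch_all(links, max_workers=5):
    results, errors = [], []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the link it was submitted for
        future_to_link = {executor.submit(fake_fetch, link): link
                          for link in links}
        # as_completed yields futures in the order they finish
        for future in as_completed(future_to_link):
            link = future_to_link[future]
            try:
                results.append(future.result())
            except AttributeError:
                # future.result() re-raises the exception from the worker
                errors.append(link)
    return results, errors


results, errors = fetch_all(['/a', '/b', '/broken'])
print(len(results), errors)
```

Threads suit this job because each task spends nearly all of its time waiting on the network rather than computing.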

In [23]:
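# Extractors: each one takes the soup of a review page and returns a single
# field as a string, or '' if the element is missing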
def get_artist(soup):
    try:
        return soup.findAll('h2',{'class':'artists'})[0].text.replace(":", " ").replace("/", " ")
    except:
        return ''

def get_album(soup):
    try:
        return soup.findAll('h1', {'class':'review-title'})[0].text.replace(":", " ").replace("/", " ")
    except:
        return ''

def get_score(soup):
    try:
        return soup.findAll('span',{'class':'score'})[0].text
    except:
        return ''

def get_album_art(soup):
    try:
        return soup.findAll('div',{'class':'album-art'})[0].img['src']
    except:
        return ''

def get_genres(soup):
    try:
        genres = soup.findAll('ul',{'class':'genre-list before'})
        # concatenate the text of every genre list found on the page
        return ''.join(genre.text for genre in genres)
    except:
        return ''

def get_label(soup):
    try:
        return soup.findAll('ul',{'class':'label-list'})[0].text
    except:
        return ''

def get_pub_date(soup):
    try:
        return soup.findAll('span',{'class':'pub-date'})[0]['title']
    except:
        return ''

def get_abstract(soup):
    try:
        return soup.findAll('div',{'class':'abstract'})[0].text
    except:
        return ''

def get_featured_tracks(soup):
    try:
        return soup.findAll('div',{'class':'player-display'})[0].text
    except:
        return ''

def get_review_content(soup):
    try:
        return soup.findAll('div', {'class':'contents dropcap'})[0].text.replace('\n',' ')
    except:
        return ''
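
Each extractor returns a string, and `req_task` pairs those strings with `review_columns` and drops every non-ASCII character. The sketch below isolates that step; `to_ascii_record` is a hypothetical name and the sample values are made up for illustration.

```python
def to_ascii_record(columns, values):
    """Pair column names with values, dropping non-ASCII characters."""
    record = {}
    for name, value in zip(columns, values):
        record[name] = ''.join(c for c in value if ord(c) < 128)
    return record


# Made-up sample values containing accented characters
record = to_ascii_record(['artist', 'album'],
                         ['Caf\u00e9 Example', 'D\u00e9j\u00e0 Vu'])
print(record)
```

Note that accented characters are removed rather than transliterated, so a name with many accents is visibly shortened in the stored data.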

#checks the first page to see if there are any new reviews

###### This is the cron job. Scheduled with crontab, it runs pf_miner.py every 24 hours at 6:15 pm

In [12]:
def pf_croner(credentials, req_lim=20):
    ''' checks the first page for new reviews '''

    _, bucket = connect_to_s3(credentials)

    info_list = req_parse(base_url+'?page=1', req_lim) \
            .findAll("div", {"class": "review"})

    data_list = []

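    # a review counts as new when its publication date is less than a day
    # away from the current time
    for info in info_list:
        data = req_task(pf_url+info.a['href'], req_lim)

        d = date_parse(data['pub_date'])
        # compare against a timezone-aware "now" in UTC; localizing a naive
        # local time as UTC would be off by the machine's UTC offset
        datediff = relativedelta(d, datetime.now(pytz.utc))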
        print(datetime.now(), d)
        if datediff.years == 0 and datediff.months == 0 and datediff.days == 0:        
            print("New Review Found")
            upload_to_s3(bucket, data)
            data_list.append(data)

    partition_job(data_list, credentials)

    print("CRON Completed")
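
The "is this review new?" test can be illustrated with the standard library alone. This sketch swaps `relativedelta` for a plain `timedelta` comparison, and unlike the check above it treats publication dates in the future as not new. The function name `is_new` and the `window` parameter are assumptions; the two timestamps mirror the ones in the log output at the end of this notebook.

```python
from datetime import datetime, timedelta, timezone


def is_new(pub_date, now=None, window=timedelta(days=1)):
    """True if pub_date (timezone-aware) falls within `window` before `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    return timedelta(0) <= now - pub_date < window


now = datetime(2017, 5, 10, 16, 14, tzinfo=timezone.utc)
published_today = datetime(2017, 5, 10, 5, 0, tzinfo=timezone.utc)
published_yesterday = datetime(2017, 5, 9, 5, 0, tzinfo=timezone.utc)

print(is_new(published_today, now))      # True
print(is_new(published_yesterday, now))  # False
```

Passing `now` explicitly keeps the check deterministic, which makes it easy to test against fixed dates.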

In [27]:
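# partition_job is also imported from make_db at the top; it is reproduced here
# for reference. connect_to_psql and my_cred are not defined in this notebook,
# which is why running it here ends in the NameError shown in the output.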
def partition_job(source, credentials=None):

    if not credentials:
        credentials = my_cred.value

    conn, cur = connect_to_psql(credentials)

    query = """INSERT INTO reviews (artist, album, score, album_art, genre, label,
                pub_date, abstract, review_content)
                VALUES (%(artist)s, %(album)s, %(score)s, %(album_art)s, %(genre)s,
                %(label)s, %(pub_date)s, %(abstract)s, %(review_content)s)"""

    print("Starting job...")

    for item in source:

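        try:
            # items are either dicts or JSON strings
            data = item if isinstance(item, dict) else json.loads(item)
            print('Inserting album: ', data['album'])
            cur.execute(query, data)
        except Exception as e:
            # undo the failed insert and move on to the next item
            print('Insert failed, rolling back: ', e)
            conn.rollback()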

        conn.commit()

    print("Partition job completed.")
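
The insert-or-roll-back loop can be exercised without a PostgreSQL server. The sketch below uses the standard library's `sqlite3` as a stand-in, so the placeholder syntax is `:name` rather than the `%(name)s` form used above; the table layout, the function name `insert_reviews`, and the sample rows are all assumptions made for illustration.

```python
import sqlite3


def insert_reviews(conn, reviews):
    """Insert each review dict; roll back a failed row and carry on."""
    query = """INSERT INTO reviews (artist, album, score)
               VALUES (:artist, :album, :score)"""
    inserted = 0
    for data in reviews:
        try:
            conn.execute(query, data)
            conn.commit()
            inserted += 1
        except sqlite3.IntegrityError:
            # the row violated a constraint; undo it and keep going
            conn.rollback()
    return inserted


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE reviews '
             '(artist TEXT, album TEXT NOT NULL, score TEXT)')

# Made-up rows; the second one violates the NOT NULL constraint on album
rows = [{'artist': 'A', 'album': 'X', 'score': '1.0'},
        {'artist': 'B', 'album': None, 'score': '2.0'}]
inserted = insert_reviews(conn, rows)
stored = conn.execute('SELECT COUNT(*) FROM reviews').fetchone()[0]
print(inserted, stored)
```

Binding values by name lets the review dict be passed straight to the query, and leaves quoting to the database driver.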

In [28]:
# For the initial backfill, run pf_miner and comment out pf_croner. After that,
# comment out pf_miner and leave only pf_croner, which cron runs every 24 hours
# to pick up new reviews.
if __name__ == '__main__':
    # Run here to CRON
    with open(expanduser('~/Desktop/api_cred.yml')) as f:
        cred = yaml.safe_load(f)  # safe_load avoids executing arbitrary YAML tags
    pf_croner(cred)
    #pf_miner(cred)

Bucket found on S3
req_task:  http://pitchfork.com/reviews/albums/23135-world-spirituality-classics-1-the-ecstatic-music-of-alice-coltrane-turiyasangitananda/
2017-05-10 16:14:28.390960 2017-05-10 05:00:00+00:00
New Review Found
Uploaded  Alice Coltrane - World Spirituality Classics 1  The Ecstatic Music of Alice Coltrane Turiyasangitananda
req_task:  http://pitchfork.com/reviews/albums/23123-white-knight/
2017-05-10 16:14:28.951958 2017-05-10 05:00:00+00:00
New Review Found
Uploaded  Todd Rundgren - White Knight
req_task:  http://pitchfork.com/reviews/albums/23152-real-high/
2017-05-10 16:14:29.393570 2017-05-10 05:00:00+00:00
New Review Found
Uploaded  Nite Jewel - Real High
req_task:  http://pitchfork.com/reviews/albums/23217-finding-people-ep/
2017-05-10 16:14:29.752114 2017-05-10 05:00:00+00:00
New Review Found
Uploaded  Croatian Amor - Finding People EP
req_task:  http://pitchfork.com/reviews/albums/23099-inter-alia/
2017-05-10 16:14:30.241413 2017-05-09 05:00:00+00:00
req_task: 

NameError: name 'connect_to_psql' is not defined

![DAG](images/DAG.png)

![DAG](images/DE_Eli_May10.png)