# Hacker News Pipeline

## Introduction

Throughout the last module in the Dataquest course, we learned the concepts of functional programming and then built our own data pipeline class in Python. We learned about advanced Python concepts such as decorators, closures and good API design. We also learned how to implement a Directed Acyclic Graph (DAG) as the scheduler for our pipeline.

After completing all of these lessons, we finally built a robust data pipeline that schedules our tasks in the correct order. In this project, we will use the pipeline on a real world data pipeline project.
From a JSON API, we will filter, clean, aggregate, and summarise data in a sequence of tasks that will apply these transformations for us.

The data we will use comes from a [Hacker News](https://news.ycombinator.com/) (HN) API that returns JSON data of the top stories in 2014. Hacker News is a link aggregator website where users vote up stories that are interesting to the computer science, science and entrepreneur communities.

We will use a pre-downloaded JSON file called `hn_stories_2014.json`, that contains a single key, `stories` - a list of stories (posts). Each post has a set of keys, but we will only deal with the following keys:

* `created_at`: A timestamp of the story's creation time.
* `created_at_i`: A unix epoch timestamp.
* `url`: The URL of the story link.
* `objectID`: The ID of the story.
* `author`: The story's author (username on HN).
* `points`: The number of upvotes the story had. 
* `title`: The headline of the post.
* `num_comments`: The number of comments a post has.

Using this dataset, we will run a sequence of basic natural language processing tasks using our Pipeline class. The goal is to find the top 100 keywords of HN posts in 2014. This will give us an understanding of the most talked about tech topics at the time.

In [1]:
# Import the Pipeline class that we created in the previous lesson and instantiate an instance of it.
from pipeline import Pipeline

pipeline = Pipeline()

## Loading the JSON Data

Let's start by loading the JSON file data into Python. As JSON files resemble a key-value dictionary, we will parse it into a Python `dict` object using the `json` module.

In [2]:
# Create a task to load in the JSON data
import json

@pipeline.task()
def file_to_json():
    with open('hn_stories_2014.json') as file:
        data = json.load(file)
        stories = data['stories']
        return stories # Returns a list of dict objects, with each one being an individual story

## Filtering the Stories

Now we have loaded in all the stories as a list of `dict` objects, we can now operate on them. Let's start by filtering the list of stories to get the most popular stories of the year.

We can filter for popular stories by ensuring they are links (not `Ask HN` posts), have a good number of points, and have some comments. We will consider a good number of points to be above 50.

In [3]:
from pipeline import Pipeline
import json

pipeline = Pipeline()

@pipeline.task()
def file_to_json():
    with open('hn_stories_2014.json') as file:
        data = json.load(file)
        stories = data['stories']
        return stories

# Create a task to filter the stories for popular ones.
@pipeline.task(depends_on=file_to_json) # Set the dependency to ensure the task runs after file_to_json and uses its results
def filter_stories(stories):
    def is_popular(story):
        return story['title'].split(' ')[0:2] != 'Ask HN' and story['points'] > 50 and story['num_comments'] > 1
    
    return (story for story in stories if is_popular(story))




## Convert to CSV

With a reduced set of stories, it's time to write these `dict` objects to a CSV file. The purpose of translating the dictionaries to a CSV is that we want to have a consistent data format when running the later summaries. By keeping consistent data formats, each pipeline task will be adaptable with future task requirements.

In [4]:
from pipeline import Pipeline, build_csv
import json
import io
import csv
from datetime import datetime

pipeline = Pipeline()

@pipeline.task()
def file_to_json():
    with open('hn_stories_2014.json') as file:
        data = json.load(file)
        stories = data['stories']
        return stories

@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    def is_popular(story):
        return story['title'].split(' ')[0:2] != 'Ask HN' and story['points'] > 50 and story['num_comments'] > 1
    
    return (story for story in stories if is_popular(story))


# Create a task the writes the filtered JSON stories to a CSV file.
@pipeline.task(depends_on=filter_stories) # Set the dependency to filter_stories task
def json_to_csv(stories):
    lines = []
    for story in stories:
        # parse the created_at column as a datetime object
        lines.append((story['objectID'], datetime.strptime(story['created_at'], '%Y-%m-%dT%H:%M:%SZ'), story['url'], story['points'], story['title']))
    return build_csv(lines, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())

## Extract Title Column

Using the CSV file format we created in the previous task, we can now extract the title column. Once we have extracted the titles of each popular post, we can then run the next word frequency task.

In [5]:
from pipeline import Pipeline, build_csv
import json
import io
import csv
from datetime import datetime

pipeline = Pipeline()

@pipeline.task()
def file_to_json():
    with open('hn_stories_2014.json') as file:
        data = json.load(file)
        stories = data['stories']
        return stories

@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    def is_popular(story):
        return story['title'].split(' ')[0:2] != 'Ask HN' and story['points'] > 50 and story['num_comments'] > 1
    
    return (story for story in stories if is_popular(story))

@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
    lines = []
    for story in stories:
        lines.append((story['objectID'], datetime.strptime(story['created_at'], '%Y-%m-%dT%H:%M:%SZ'), story['url'], story['points'], story['title']))
    return build_csv(lines, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())

# Create a task that extracts the title from each story.
@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    index = header.index('title')
    return (line[index] for line in reader) # Return a generator of each title    

## Cleaning the Titles

To create our word frequency model from Hacker News titles, we need to ensure that we use a consistent set of words. This means that we need to clean the titles by removing punctuation and standardising the strings to lower case.
This will prevent words like `Google`, `google` and `google?` from being classed as different words.

In [6]:
from pipeline import Pipeline, build_csv
import json
import io
import csv
from datetime import datetime
import string

pipeline = Pipeline()

@pipeline.task()
def file_to_json():
    with open('hn_stories_2014.json') as file:
        data = json.load(file)
        stories = data['stories']
        return stories

@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    def is_popular(story):
        return story['title'].split(' ')[0:2] != 'Ask HN' and story['points'] > 50 and story['num_comments'] > 1
    
    return (story for story in stories if is_popular(story))

@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
    lines = []
    for story in stories:
        lines.append((story['objectID'], datetime.strptime(story['created_at'], '%Y-%m-%dT%H:%M:%SZ'), story['url'], story['points'], story['title']))
    return build_csv(lines, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())

@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    index = header.index('title')
    return (line[index] for line in reader)

# Create a task to clean the titles
@pipeline.task(depends_on=extract_titles)
def clean_titles(titles):
    punctuation = string.punctuation
    for title in titles:
        for character in punctuation:
            if character in title:
                title = title.replace(character, '') # Remove punctuation
        
        yield lower(title) # Cast title as lower case
                

## Create the Word Frequency Dictionary

We will now build a word frequency dictionary that consists of key-value pairs of each word that features in the HN titles, along with how many times they occur.
To ensure that we find actual keywords, we will enforce the dictionary so that it doesn't include **stop words**. Stop words are those commonly used in language, such as `the`, `i` and `are` ect. These are commonly rejected in keyword searches.
We will use the `stop_words` module included in the directory, which contains a tuple of such words, to filter our results.

In [7]:
from pipeline import Pipeline, build_csv
import json
import io
import csv
from datetime import datetime
import string
from stop_words import stop_words

pipeline = Pipeline()

@pipeline.task()
def file_to_json():
    with open('hn_stories_2014.json') as file:
        data = json.load(file)
        stories = data['stories']
        return stories

@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    def is_popular(story):
        return story['title'].split(' ')[0:2] != 'Ask HN' and story['points'] > 50 and story['num_comments'] > 1
    
    return (story for story in stories if is_popular(story))

@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
    lines = []
    for story in stories:
        lines.append((story['objectID'], datetime.strptime(story['created_at'], '%Y-%m-%dT%H:%M:%SZ'), story['url'], story['points'], story['title']))
    return build_csv(lines, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())

@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    index = header.index('title')
    return (line[index] for line in reader)

@pipeline.task(depends_on=extract_titles)
def clean_titles(titles):
    punctuation = string.punctuation
    for title in titles:
        for character in punctuation:
            if character in title:
                title = title.replace(character, '')
        
        yield title.lower()
        
# Create a task that generates a word frequency dictionary
@pipeline.task(depends_on=clean_titles)
def build_keyword_dictionary(titles):
    frequency_dict = {}
    for title in titles:
        words = title.split(' ')
        for word in words:
            if word and word not in stop_words:
                if word in frequency_dict:
                    frequency_dict[word] += 1
                else:
                    frequency_dict[word] = 1
    
    return frequency_dict
        

##  Sorting the Top Words

Finally, we can sort the words so that we can see the most common keywords used in Hacker News titles. We will output a list of tuples with `(word, frequency)` as entries sorted from most used to least used. We will limit our results to the top 100.

In [8]:
from pipeline import Pipeline, build_csv
import json
import io
import csv
from datetime import datetime
import string
from stop_words import stop_words

pipeline = Pipeline()

@pipeline.task()
def file_to_json():
    with open('hn_stories_2014.json') as file:
        data = json.load(file)
        stories = data['stories']
        return stories

@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    def is_popular(story):
        return story['title'].split(' ')[0:2] != 'Ask HN' and story['points'] > 50 and story['num_comments'] > 1
    
    return (story for story in stories if is_popular(story))

@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
    lines = []
    for story in stories:
        lines.append((story['objectID'], datetime.strptime(story['created_at'], '%Y-%m-%dT%H:%M:%SZ'), story['url'], story['points'], story['title']))
    return build_csv(lines, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())

@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    index = header.index('title')
    return (line[index] for line in reader)

@pipeline.task(depends_on=extract_titles)
def clean_titles(titles):
    punctuation = string.punctuation
    for title in titles:
        for character in punctuation:
            if character in title:
                title = title.replace(character, '')
        
        yield title.lower()
        
@pipeline.task(depends_on=clean_titles)
def build_keyword_dictionary(titles):
    frequency_dict = {}
    for title in titles:
        words = title.split(' ')
        for word in words:
            if word and word not in stop_words:
                if word in frequency_dict:
                    frequency_dict[word] += 1
                else:
                    frequency_dict[word] = 1
    
    return frequency_dict

# Create a task to sort the word frequencies
@pipeline.task(depends_on=build_keyword_dictionary)
def top_100(word_frequencies):
    sorted_words = sorted(word_frequencies.items(), key=lambda item: item[1], reverse=True)
    return sorted_words[:100]
    


## Results

We can see that our pipeline and tasks work as intended, and it would be easy to update or add in tasks as our needs develop. 

If we look at the results, we find that `Google`, `bitcoin`, `programming` and `web` are popular keywords in Hacker News post titles.

In [17]:
completed_tasks = pipeline.run()
print(completed_tasks[top_100])

[('new', 185), ('google', 167), ('bitcoin', 102), ('open', 95), ('programming', 92), ('web', 89), ('data', 86), ('video', 79), ('python', 75), ('code', 74), ('facebook', 71), ('released', 71), ('using', 70), ('source', 68), ('2013', 65), ('2014', 65), ('free', 65), ('javascript', 65), ('game', 64), ('internet', 62), ('c', 60), ('work', 59), ('microsoft', 59), ('linux', 58), ('app', 57), ('pdf', 55), ('language', 54), ('software', 54), ('use', 53), ('startup', 52), ('make', 51), ('apple', 50), ('time', 49), ('yc', 48), ('security', 48), ('nsa', 45), ('github', 45), ('windows', 44), ('like', 44), ('project', 42), ('way', 42), ('world', 41), ('users', 40), ('developer', 40), ('1', 40), ('computer', 40), ('heartbleed', 40), ('dont', 38), ('git', 37), ('design', 37), ('ios', 37), ('os', 36), ('twitter', 36), ('ceo', 36), ('online', 36), ('vs', 36), ('big', 36), ('life', 36), ('day', 35), ('android', 34), ('years', 34), ('apps', 34), ('best', 34), ('simple', 33), ('mt', 33), ('court', 33), (