The goal of this project is to build a multi-dependency data pipeline that identifies the top 100 keywords in the titles of popular Hacker News posts from 2014.

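Steps in the pipeline:
- retrieve the stories from the JSON file
- filter for popular posts (more than 50 points, more than 1 comment, and not an 'Ask HN' post)
- convert the data to CSV
- extract the 'title' column from the data
- standardize the words in each title (lowercase, remove punctuation)
- build a word frequency dictionary, excluding stop words
- identify the top 100 words in the dictionary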

In [1]:
from pipeline import Pipeline, build_csv
import json
import io
from datetime import datetime
import csv
import string
from stop_words import stop_words

In [2]:
# Single Pipeline instance shared by every cell below; each step is attached
# to it with the @pipeline.task(...) decorator, and depends_on names the
# step whose result feeds into it.
pipeline = Pipeline()

@pipeline.task()
def files_to_json(path='hn_stories_2014.json'):
    """Load the list of stories from a Hacker News JSON dump.

    First step of the pipeline; it has no upstream dependency.

    Args:
        path: JSON file to read. Defaults to the 2014 dump so existing
            no-argument calls behave exactly as before.

    Returns:
        The value stored under the top-level 'stories' key of the document.

    Raises:
        FileNotFoundError: if *path* does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if the top-level object has no 'stories' key.
    """
    # JSON text is UTF-8 (RFC 8259); relying on the platform default encoding
    # (e.g. cp1252 on Windows) can fail on or mis-decode non-ASCII characters.
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data['stories']

In [3]:
@pipeline.task(depends_on=files_to_json)
def filter_stories(stories):
    """Lazily keep only the popular stories.

    A story is popular when it has more than 50 points, more than one
    comment, and its title does not start with 'Ask HN'.

    Returns:
        A generator over the qualifying story dicts, in input order.
    """
    def _keep(item):
        # Checks run in the same order as the combined condition so that a
        # missing key surfaces the same KeyError.
        if not item['points'] > 50:
            return False
        if not item['num_comments'] > 1:
            return False
        return not item['title'].startswith('Ask HN')

    return (item for item in stories if _keep(item))

In [4]:
@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
    """Serialize the filtered stories into an in-memory CSV via build_csv.

    Each row is (objectID, created_at, url, points, title). created_at is
    parsed from the "%Y-%m-%dT%H:%M:%SZ" string form into a datetime before
    being written. The rows, the header and a fresh io.StringIO are handed
    to build_csv, whose return value is passed through unchanged.
    """
    columns = ['objectID', 'created_at', 'url', 'points', 'title']
    stamp_format = "%Y-%m-%dT%H:%M:%SZ"

    def _to_row(item):
        # Parse the timestamp first, before reading the other fields.
        stamp = datetime.strptime(item['created_at'], stamp_format)
        return (item['objectID'], stamp, item['url'], item['points'], item['title'])

    rows = [_to_row(item) for item in stories]
    return build_csv(rows, header=columns, file=io.StringIO())

In [5]:
@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    """Return a generator over the 'title' field of each CSV data row.

    The header row is consumed immediately (when this function is called)
    to find the 'title' column; the remaining rows are read lazily.

    Raises:
        ValueError: if the header has no 'title' column.
    """
    rows = csv.reader(csv_file)
    title_col = next(rows).index('title')
    return (row[title_col] for row in rows)

In [6]:
@pipeline.task(depends_on=extract_titles)
def clean_titles(titles):
    """Yield each title lower-cased with ASCII punctuation removed.

    Punctuation characters are deleted rather than replaced with spaces, so
    "don't" becomes "dont" and "c++" becomes "c".

    NOTE: string.punctuation covers ASCII only; Unicode punctuation such as
    curly quotes or em dashes is left in place.
    """
    # Build the deletion table once. str.translate then removes every
    # punctuation character in a single C-level pass, instead of running one
    # .replace() per punctuation character (32 passes) for every title.
    strip_punctuation = str.maketrans('', '', string.punctuation)
    for title in titles:
        yield title.lower().translate(strip_punctuation)

In [7]:
@pipeline.task(depends_on=clean_titles)
def build_key_word_dictionary(titles):
    """Count occurrences of each non-stop-word across all titles.

    Titles are split on whitespace; any word found in stop_words is skipped.

    Returns:
        A plain dict mapping word -> count. Keys are in first-seen order,
        which fixes how ties are ordered by the downstream sort.
    """
    # Build the membership set once: `in` on a set is O(1), whereas on a list
    # it scans every stop word for every word of every title.
    # NOTE(review): assumes stop_words is a collection of word strings (not a
    # single str) — confirm against the stop_words module.
    excluded = set(stop_words)
    word_freq = {}
    for title in titles:
        for word in title.split():
            if word not in excluded:
                word_freq[word] = word_freq.get(word, 0) + 1
    return word_freq

In [8]:
@pipeline.task(depends_on=build_key_word_dictionary)
def sort_top_words(word_dict):
    """Return the 100 most frequent words as (word, count) pairs.

    Pairs are ordered by descending count. Python's sort is stable even with
    reverse=True, so words with equal counts keep the dict's iteration order.
    Fewer than 100 pairs are returned if the dict is smaller.
    """
    ranked = sorted(word_dict.items(), key=lambda pair: pair[1], reverse=True)
    return ranked[:100]

In [9]:
# Run the pipeline; the result is looked up by task function, so indexing
# with sort_top_words gives the final step's top-100 (word, count) list.
output = pipeline.run()
print(output[sort_top_words])

[('new', 185), ('google', 167), ('bitcoin', 101), ('open', 92), ('programming', 90), ('web', 88), ('data', 85), ('video', 79), ('python', 76), ('code', 72), ('facebook', 71), ('released', 71), ('using', 70), ('2013', 65), ('javascript', 65), ('free', 64), ('source', 64), ('game', 63), ('internet', 62), ('microsoft', 59), ('c', 59), ('linux', 58), ('app', 57), ('pdf', 55), ('work', 54), ('language', 54), ('software', 52), ('2014', 52), ('startup', 51), ('apple', 50), ('use', 50), ('make', 50), ('time', 48), ('yc', 48), ('security', 48), ('nsa', 45), ('github', 45), ('windows', 44), ('1', 41), ('world', 41), ('way', 41), ('like', 41), ('project', 40), ('computer', 40), ('heartbleed', 40), ('git', 37), ('users', 37), ('dont', 37), ('design', 37), ('ios', 37), ('developer', 36), ('os', 36), ('twitter', 36), ('ceo', 36), ('vs', 36), ('life', 36), ('big', 35), ('day', 35), ('android', 34), ('online', 34), ('years', 33), ('simple', 33), ('court', 33), ('guide', 32), ('learning', 32), ('mt', 3