## Blog Corpus Big Data Assignment

### Goal: To get industry name mentions in blogs and their count.

The code should keep running correctly if the data is replaced with another set containing any type of comparable elements.

Search for mentions of industry words in the blog authorship corpus.
The goal is first to find all of the possible industries in which bloggers were classified, then to search each blogger’s posts for mentions of those industries, counting the mentions by month and year.  

Download the corpus here: http://u.cs.biu.ac.il/~koppel/BlogCorpus.htm  


Unzip the corpus file and look at the contents of a few files before reading on. Each file in the corpus is named according to information about the blogger: user_id.gender.age.industry.star_sign.xml  
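As an illustration of the naming scheme above, here is a minimal sketch that splits a filename into its five fields. The helper `parse_blog_filename`, the `BloggerInfo` tuple and the sample path are hypothetical names chosen for this example, and the sketch assumes that no field contains a dot.

```python
import os
from collections import namedtuple

# Hypothetical container for the five fields encoded in a corpus filename
BloggerInfo = namedtuple("BloggerInfo", "user_id gender age industry star_sign")

def parse_blog_filename(path):
    # Drop the directory and the .xml extension, then split on the dots
    stem, _ext = os.path.splitext(os.path.basename(path))
    user_id, gender, age, industry, star_sign = stem.split(".")
    return BloggerInfo(user_id, gender, int(age), industry, star_sign)

# Made-up path that follows the user_id.gender.age.industry.star_sign.xml pattern
info = parse_blog_filename("blogs/1000331.female.37.indUnk.Leo.xml")
print(info.industry, info.age)
```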

Within each xml file, there is a “<date>” tag which gives the date of the “<post>” that follows it; each “<post>” contains the text of an individual blog post.  
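The date/post pairing can be sketched with a regular expression on a small made-up sample. The sample text, the `ENTRY` pattern and the `iter_posts` helper are assumptions for illustration only; the sketch also assumes dates are written as day,Month,year (the format the parsing code later in this notebook relies on) and would raise on a date with a different number of fields.

```python
import re

# Made-up sample that mimics the layout of a corpus file
SAMPLE = """<Blog>

<date>14,May,2004</date>
<post>
   I am in marketing sales.
</post>

<date>02,June,2004</date>
<post>
   This is marketing5 now.
</post>

</Blog>"""

# One match per <date>...</date> immediately followed by its <post>...</post>
ENTRY = re.compile(r"<date>(.*?)</date>\s*<post>(.*?)</post>", re.DOTALL)

def iter_posts(xml_text):
    for raw_date, body in ENTRY.findall(xml_text):
        day, month, year = raw_date.strip().split(",")
        # Collapse runs of whitespace in the post body
        yield f"{year}-{month}", " ".join(body.split())

posts = list(iter_posts(SAMPLE))
print(posts)
```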

In [1]:
from pyspark import SparkContext, SparkConf
import os
import re
import pprint

In [2]:
sc = SparkContext()

## Create an rdd of all the filenames
Create an rdd for the contents of all files [i.e. sc.wholeTextFiles(file1,file2,...) ]. Each element of the resulting rdd is a (filename, content) pair.

In [3]:
data = sc.wholeTextFiles('blogs/*.xml')

***
## Get all possible industry names:
- Use transformations until you are left with only a set of possible industries
- Use an action to export the rdd to a set and make this a spark broadcast variable

In [4]:
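def get_industry_name(path):
    # Filenames look like user_id.gender.age.industry.star_sign.xml,
    # so the industry is the second-to-last field once the extension is dropped
    file_name = os.path.splitext(os.path.basename(path))
    return file_name[0].split('.')[-2]

# file is a (path, content) pair; distinct() keeps one copy of each industry name
rddIndustrySet = data.map(lambda file: get_industry_name(file[0])).distinct()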
rddIndustrySet.persist()

industries = sc.broadcast(set(rddIndustrySet.collect()))

In [15]:
print(industries.value)

{'Automotive', 'Publishing', 'Banking', 'Maritime', 'Architecture', 'Sports-Recreation', 'Science', 'Education', 'Agriculture', 'Non-Profit', 'Consulting', 'Technology', 'Fashion', 'indUnk', 'Internet', 'Accounting', 'Communications-Media', 'Marketing', 'InvestmentBanking', 'Manufacturing', 'Telecommunications', 'Construction', 'Advertising', 'Military', 'Student', 'Environment', 'Religion', 'Museums-Libraries', 'RealEstate', 'Law', 'HumanResources', 'Tourism', 'Biotech', 'Transportation', 'BusinessServices', 'Engineering', 'Arts', 'Chemicals', 'LawEnforcement-Security', 'Government'}


***
## Search for industry names in posts, recording by year-month:

- Use transformations to search all posts across all blogs for mentions of industries, and record the frequency each industry was mentioned by month and year.  

- Industry names should be matched case-insensitively, and only when they are delimited by word boundaries -- space or punctuation (e.g. “marketing” would match “I am in marketing sales” and “Marketing.” but not “I like supermarketing.” or “This is marketing5 now.”).
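The matching rule above can be sketched with a regular expression that uses lookarounds. This is an alternative to the token-comparison approach used in the notebook cell that follows, not a description of it; `INDUSTRIES` is a small hypothetical subset, and `build_matcher` and `count_mentions` are names invented for this example.

```python
import re
from collections import Counter

# Small hypothetical subset of the industry names
INDUSTRIES = {"Marketing", "Technology", "Sports-Recreation"}

def build_matcher(names):
    # Longest names first so the alternation prefers the longest candidate
    alternatives = "|".join(
        re.escape(n) for n in sorted(names, key=len, reverse=True)
    )
    # A match must not be preceded or followed by a letter or digit
    return re.compile(
        r"(?<![A-Za-z0-9])(?:%s)(?![A-Za-z0-9])" % alternatives,
        re.IGNORECASE,
    )

def count_mentions(text, names=INDUSTRIES):
    # Map each lowercased name back to its canonical spelling
    canonical = {n.lower(): n for n in names}
    matcher = build_matcher(names)
    return Counter(canonical[m.group(0).lower()] for m in matcher.finditer(text))

for sample in ["I am in marketing sales", "Marketing.",
               "I like supermarketing.", "This is marketing5 now."]:
    print(sample, dict(count_mentions(sample)))
```

Explicit lookarounds are used instead of `\b` because `\b` treats the underscore as a word character, so a mention followed by an underscore would not count as delimited by punctuation.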

In [5]:
def parse_blog(file):
    # file is a (path, content) pair from wholeTextFiles
    content = file[1]
    content = content.replace('<Blog>', '').replace('</Blog>', '')
    content = content.replace('<date>', '').replace('<post>', '')
    content = ' '.join(content.split()).strip()

    # Keep only letters, digits, spaces and hyphens in the post text
    pattern = re.compile('[^A-Za-z0-9 -]')
    # Lowercased industry name -> original name, for one lookup per word
    lookup = {w.lower(): w for w in industries.value}

    # Each chunk now looks like 'day,Month,year</date> post text'
    blog_list = content.split('</post>')[:-1]
    blog_date_post_list = list()
    for line in blog_list:
        date, post = line.split('</date>')
        dmy = date.strip().split(',')
        date = dmy[2]+'-'+dmy[1]  # e.g. '2004-May'
        post = pattern.sub('', post.strip())
        blog_date_post_list.append((date, post))

    counts = dict()
    for date, post in blog_date_post_list:
        for word in post.split():
            w = lookup.get(word.lower())  # makes this case-insensitive
            if w is not None:
                counts[(w, date)] = counts.get((w, date), 0) + 1
    return sorted(counts.items())

***
## Record frequencies

Use an action to print the recorded frequencies in this format:
- [(industry1, ((year-month1, count), (year-month2, count), …)),
- (industry2, ((year-month1, count), (year-month2, count), …)), …]
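To make the target format concrete, here is a plain-Python sketch (no Spark) that regroups ((industry, year-month), count) records into that shape. The `records` list is made-up data and `to_report` is a hypothetical helper used only for illustration.

```python
from collections import defaultdict

# Made-up ((industry, year-month), count) records
records = [
    (("Arts", "2004-May"), 3),
    (("Law", "2003-July"), 1),
    (("Arts", "2003-June"), 2),
]

def to_report(records):
    grouped = defaultdict(list)
    for (industry, year_month), count in records:
        grouped[industry].append((year_month, count))
    # Sort industries by name and each industry's entries by the year-month string
    return [(industry, tuple(sorted(pairs)))
            for industry, pairs in sorted(grouped.items())]

report = to_report(records)
print(report)
```

Sorting on the year-month string orders the years correctly but puts the months within a year in alphabetical rather than chronological order.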

In [6]:
rddIndVsDate = data.flatMap(parse_blog).reduceByKey(lambda a, b: a+b)

In [7]:
rddFinal = rddIndVsDate.map(lambda value: (value[0][0], (value[0][1], value[1])))
# Sort inside each group: an order set before groupByKey is not guaranteed to survive the shuffle
rddFinal = rddFinal.groupByKey().mapValues(sorted)

In [11]:
print(rddFinal.collect())

[('Arts', [('-', 1), ('1999-May', 1), ('1999-September', 1), ('2000-October', 1), ('2000-September', 1), ('2001-August', 1), ('2001-December', 1), ('2001-March', 1), ('2001-May', 1), ('2001-November', 1), ('2001-October', 1), ('2001-September', 1), ('2002-April', 5), ('2002-August', 8), ('2002-December', 10), ('2002-February', 5), ('2002-July', 9), ('2002-June', 4), ('2002-March', 1), ('2002-May', 4), ('2002-November', 20), ('2002-October', 14), ('2002-September', 21), ('2003-April', 31), ('2003-August', 64), ('2003-December', 63), ('2003-February', 37), ('2003-January', 10), ('2003-July', 50), ('2003-June', 41), ('2003-March', 30), ('2003-May', 52), ('2003-November', 93), ('2003-October', 40), ('2003-September', 58), ('2004-Agosto', 2), ('2004-April', 160), ('2004-Aprill', 1), ('2004-August', 547), ('2004-February', 102), ('2004-January', 93), ('2004-Julho', 6), ('2004-July', 757), ('2004-June', 545), ('2004-Junho', 3), ('2004-Juni', 1), ('2004-Maio', 1), ('2004-March', 180), ('2004-M

In [9]:
rddFinal.saveAsTextFile('./output')