# CA675 Assignment 3: 'Cloud Application'
# Data Preparation
_Disclaimer: Submitted to Dublin City University, School of Computing, for module CA675: Cloud Technologies, 2016. We hereby certify that the work presented and the material contained herein is our own, except where references to other material are explicitly stated._

**Authors:** John Segrave, Paul O'Hara, Claire Breslin

**Emails:** john.segravedaly2@mail.dcu.ie, paul.ohara6@mail.dcu.ie, claire.breslin4@mail.dcu.ie

**Student IDs:** 14212108, 14212372, 14210826

**Code & Data available online:**
* **Code** - Code and logs are in [this github repository](https://github.com/oharapaGitHub/ca675assignment3backend).
  * _You can download this data preparation notebook from GitHub and run it interactively in your own IPython Notebook._
* **Data** - Raw, Processed and Final data can be found on [Google Drive](https://drive.google.com/a/mail.dcu.ie/folderview?id=0B4KWi5yG-4oGR1FQRXNOQ0VlYTA&usp=sharing).

# Reader Insights with Wikipedia

## 1. Introduction 
### The Idea
Suppose you are a news agency or a freelance writer. You already know what the big news story of the day is, e.g. the US presidential race. There are so many angles you could potentially write about, but which one should you pick? Ideally, you would peek inside the minds of (a lot of) your audience and find out: "What angle **about this** do they find interesting **right now**? What do they want to know more about?"

Clickstream data can give insights into this. When people want to know more about something, they look it up (e.g. on Wikipedia). Then they click on other pages telling them more about the angle they find most useful or interesting.

What if we could make all those clicks visible? It would give content writers a window into the minds of readers, showing the flow of their interests, their ‘stream of consciousness’. For example, what can Wikipedia clickstream data tell us about the topics that people find interesting on the [Political_positions_of_Donald_Trump](https://en.wikipedia.org/wiki/Political_positions_of_Donald_Trump) page? What other pages were they reading that led them there, and what topics on that page did they find interesting enough to click on next?

### The Data

Wikipedia recently started publishing clickstream data [here](https://figshare.com/articles/Wikipedia_Clickstream/1305770). The data shows how people get to a Wikipedia page and what other pages they click on from that page. Our application aims to use this data to give journalists an insight into "what people are interested in right now" in the context of a particular topic - a kind of window into the public _'(click) stream of consciousness'_ as it were.

The raw log data _"contains counts of (referer, resource) pairs extracted from the request logs of Wikipedia"_. At present, the only log available covers all of the (aggregated) clicks between pages in the English ('en') Wikipedia corpus for the month of March 2016: around 25 million log entries (1.2 GB).

Ideally, we would like to have hourly (or finer) grained data, but while that is not available yet, the data to hand is perfectly suitable for a proof of concept. If the idea proves workable and useful, it could be extended to consume data at a finer level of granularity.  If time allows, we may experiment with streaming in new log entries as well.

_Original data thanks to the [Wikipedia Research Community](https://meta.wikimedia.org/wiki/Research:Index) (Wulczyn, Ellery; Taraborelli, Dario (2016): Wikipedia Clickstream. Figshare. https://dx.doi.org/10.6084/m9.figshare.1305770.v16)_ 

## 2. Data Preparation

The goal of the data preparation is to:
1. Clean the raw log data - remove any unusable information.
1. Turn the counts of (referer, resource) pairs into a fully-formed picture of each distinct Wikipedia page, as follows:
   * **Page Title:** The title of the Wikipedia page (the string that comes after 'en.wikipedia.org/wiki/' in a Wikipedia URL).
   * **Prioritised Inbound pages, with click counts:** A list of the pages (by title) that led to the current page, each with its click count, ordered with the most interesting pages first (i.e. those with the highest click counts).
   * **Prioritised Outbound pages, with click counts:** Similar to the previous list, but describing the links on the current page, i.e. those that lead _out_ of the current page. Again, each page comes with its click count, and the list is ordered with the most interesting pages first.
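One way to picture the target record for each page is as a small typed structure. The sketch below is illustrative only: the class name `PageFlow`, its field names and the `top_inbound` helper are hypothetical, while the pipeline itself emits plain tuples of the form `(page_title, in_titles, in_counts, out_titles, out_counts)`. The example values are taken from a sample row shown later in this notebook.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class PageFlow:
    """Hypothetical per-page record: a page plus its inbound and outbound links."""
    page_title: str
    in_titles: List[str] = field(default_factory=list)   # pages that led to this page
    in_counts: List[int] = field(default_factory=list)   # clicks for each inbound page
    out_titles: List[str] = field(default_factory=list)  # pages clicked on from this page
    out_counts: List[int] = field(default_factory=list)  # clicks for each outbound page

    def top_inbound(self, k: int) -> List[Tuple[str, int]]:
        """Return the k inbound (title, count) pairs with the highest click counts."""
        pairs = sorted(zip(self.in_titles, self.in_counts), key=lambda p: p[1], reverse=True)
        return pairs[:k]


example = PageFlow(
    page_title="Heysel/Heizel_metro_station",
    in_titles=["Brussels_Metro", "Brussels_Metro_line_6", "List_of_Brussels_Metro_stations"],
    in_counts=[72, 15, 13],
    out_titles=["Brussels_Metro_line_6"],
    out_counts=[18],
)
print(example.top_inbound(2))
```

Keeping titles and counts as two parallel lists mirrors what the Spark code produces; a list of `(title, count)` pairs would be an equally valid design.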

**Technology**

We are using Apache Spark to process the raw log data for the simple reason that it has a good ecosystem of other potentially useful components, which we may look into if there is time after the initial application is completed, for example Spark Streaming and GraphX.

## 3. Implementation
### Initialise variables and load raw clickstream data into Spark

```python
source = '2016_03_clickstream'                                    # full data Mar 2016
#source = '2016_03_clickstream_sample_BRUSSELS_UTF8'               # medium test data - theme-centric (brussels)
#source = '2016_03_clickstream_sample_TEST_DATASET'                # small test data for testing - handcrafted
#source = '2016_03_clickstream_sample_BOWIE_UTF8'                  # medium test data - theme-centric (bowie)
#source = '2016_03_clickstream_200000_random_rows_UTF8'            # medium, randomised test data

# First files tried - turned out to be an experiment from 2015 with a different data format
#source = '2015_02_clickstream'                                    # full data Feb 2015

infilename = source + '.tsv.gz'
outfilename = source + '_RESULTS.tsv'
input_csv  = '/resources/' + infilename                            # Cloud location
output_csv = '/resources/' + outfilename                           # Cloud location
#input_csv = 'c:/users/john@segrave.org/downloads/js_wikipedia_data/' + infilename     # local location
#output_csv = 'c:/users/john@segrave.org/downloads/js_wikipedia_data/' + outfilename   # local location

# Point Spark at the full, compressed log data. The file itself will not be read until
# one of our Spark actions requires it (lazy evaluation). `sc` is the SparkContext
# provided by the notebook environment; textFile decompresses .gz input transparently.
# Metadata: https://datahub.io/dataset/wikipedia-clickstream/resource/be85cc68-d1e6-4134-804a-fd36b94dbb82
tsv_lines_rdd = sc.textFile(input_csv)

# Force the file to be read straight away by counting the number of log entries :-)
print("Total number of log entries: %d" % tsv_lines_rdd.count())
```

```
Total number of log entries: 25617311
```


### Create the basic 'raw data' RDD from which other processing will follow

```python
# WIKIPEDIA CHANGED THE FILE FORMAT: THIS IS THE OLD FORMAT. THE NEW ONE FOR MARCH 2016 FOLLOWS...
# Chop the data into tokens and perform datatype conversions.
# It would probably be more efficient not to have the CSV header in the file to begin with, but we don't get to control that.
#raw_data_rdd = (tsv_lines_rdd
#         .filter(lambda line : not(line.startswith('prev_id')))               # Filter out the CSV header
#         .map(lambda line : line.split('\t'))                                 # Tokenise the input data
#         .map(lambda (prev_id, curr_id, n, prev_title, curr_title)
#              : (prev_id, curr_id, int(n), prev_title, curr_title)) # Datatype conversion
#         ).cache()                                                            # Cache the result because we'll use it a fair bit
```

```python
# 'other-' referrer pages just fill up the inbound list without adding value, so we
# collapse them into a single record to preserve accurate link counts & proportions.
other_title = "(external)_websearch_social_media_etc"

# First level of data cleaning: eliminate 'noise' pages that clog up the UI.
def eliminate_noise_pages(line):
    # Tokenise the input data (March 2016 column order: prev, curr, type, n)
    prev_title, curr_title, ref_type, n = line.split('\t')
    # Give all 'external' referrers one mocked-up common page name (this preserves accurate click counts).
    prev_title_cleaned = other_title if ref_type == 'external' else prev_title
    # Return the cleaned record with the click count converted to an int
    return (prev_title_cleaned, curr_title, int(n))

# Chop the data into tokens and perform datatype conversions.
# It would probably be more efficient not to have the CSV header in the file to begin with, but we don't get to control that.
# WIKIPEDIA CHANGED THE FILE FORMAT: THIS IS THE NEW FORMAT FOR MARCH 2016
# (Index-based lambdas are used because tuple-unpacking lambda parameters are Python 2 only.)
raw_data_rdd = (tsv_lines_rdd
         # Filter out the CSV header
         .filter(lambda line: not line.startswith('prev\t'))

         # Filter 'noise' pages and do type conversion
         .map(eliminate_noise_pages)

         # Map/reduce multiple noise page references down to a single page with an aggregate count
         .map(lambda rec: ((rec[0], rec[1]), rec[2]))        # ((prev_title, curr_title), n)
         .reduceByKey(lambda n1, n2: n1 + n2)
         .map(lambda kv: (kv[0][0], kv[0][1], kv[1]))        # (prev_title, curr_title, n)

         # Cache the result because we'll use it a fair bit
         ).cache()

raw_data_rdd.takeSample(False, 1)  # for dev/test, remove on production run
```

```
[(u'Minister-President_of_the_Brussels-Capital_Region', u'Rudi_Vervoort', 127)]
```
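To check the cleaning rule without a Spark cluster, the same filter/map/`reduceByKey` chain can be mirrored in plain Python. This is a sketch for local testing only, not part of the pipeline: `clean_and_aggregate` is a hypothetical helper, `collections.Counter` stands in for `reduceByKey`, and the sample lines (including the referrer names) are made up, following the March 2016 column order `prev, curr, type, n` that `eliminate_noise_pages` assumes.

```python
from collections import Counter

OTHER_TITLE = "(external)_websearch_social_media_etc"


def eliminate_noise_pages(line):
    """Same rule as the Spark cell: all 'external' referrers share one mocked-up title."""
    prev_title, curr_title, ref_type, n = line.split('\t')
    prev_title_cleaned = OTHER_TITLE if ref_type == 'external' else prev_title
    return (prev_title_cleaned, curr_title, int(n))


def clean_and_aggregate(lines):
    """Hypothetical plain-Python stand-in for filter -> map -> reduceByKey."""
    totals = Counter()
    for line in lines:
        if line.startswith('prev\t'):  # skip the header row
            continue
        prev_title, curr_title, n = eliminate_noise_pages(line)
        totals[(prev_title, curr_title)] += n  # same effect as reduceByKey(n1 + n2)
    return sorted((p, c, n) for (p, c), n in totals.items())


# Made-up sample lines in the March 2016 layout
sample = [
    "prev\tcurr\ttype\tn",
    "other-google\tPage_A\texternal\t100",
    "other-twitter\tPage_A\texternal\t20",
    "Page_B\tPage_A\tlink\t7",
]
print(clean_and_aggregate(sample))
```

The two external referrers collapse into a single record whose count is their sum (120), so the proportions between inbound sources are preserved.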

### Collect the list of pages that referred *into* every individual page (with corresponding click counts)

```python
# WIKIPEDIA CHANGED THE FILE FORMAT: THIS IS THE OLD FORMAT. THE NEW ONE FOR MARCH 2016 FOLLOWS...
#inbound_links_rdd = (raw_data_rdd
#
#         # Drop the ID fields as their quality is poor
#         .map(lambda (prev_id, curr_id, n, prev_title, curr_title)
#              : (curr_title, ([prev_title], [n])))
#
#         # Create two lists - pages and corresponding clicks
#         .reduceByKey( lambda x, y: (x[0]+y[0],x[1]+y[1]))
#    )
```

```python
# WIKIPEDIA CHANGED THE FILE FORMAT: THIS IS THE NEW FORMAT FOR MARCH 2016
inbound_links_rdd = (raw_data_rdd
         # Key each record by the page that was clicked *into*: (curr_title, ([prev_title], [n]))
         .map(lambda rec: (rec[1], ([rec[0]], [rec[2]])))

         # Create two aligned lists per page - referring pages and corresponding clicks
         .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    )
inbound_links_rdd.takeSample(False, 1)  # for dev/test, remove on production run
```

```
[(u'European_School,_Brussels_III',
  ([u'European_School',
    u'List_of_international_schools',
    '(external)_websearch_social_media_etc'],
   [25, 11, 153]))]
```
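The inbound step is a group-by on the destination page that keeps two aligned lists (referring titles and their click counts). A plain-Python equivalent makes the resulting shape explicit; `group_inbound` and the records below are hypothetical, for illustration only.

```python
from collections import defaultdict


def group_inbound(records):
    """Plain-Python equivalent of map(curr -> ([prev], [n])) followed by reduceByKey:
    for every page, collect the referring pages and their click counts as two aligned lists."""
    inbound = defaultdict(lambda: ([], []))
    for prev_title, curr_title, n in records:
        titles, counts = inbound[curr_title]
        titles.append(prev_title)
        counts.append(n)
    return dict(inbound)


# Made-up (prev_title, curr_title, n) records
records = [("A", "X", 5), ("B", "X", 3), ("X", "Y", 2)]
print(group_inbound(records))
```

Keying on `prev_title` instead of `curr_title` gives the outbound grouping built in the following cells.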

### Collect the list of pages that are referred *out* from every individual page (with corresponding click counts)

```python
# WIKIPEDIA CHANGED THE FILE FORMAT: THIS IS THE OLD FORMAT. THE NEW ONE FOR MARCH 2016 FOLLOWS...
#outbound_links_rdd = (raw_data_rdd
#
#         # Drop the ID fields as their quality is poor
#         .map(lambda (prev_id, curr_id, n, prev_title, curr_title)
#              : (prev_title, ([curr_title], [n])))
#
#         # Create two lists - pages and corresponding clicks
#         .reduceByKey( lambda x, y: (x[0]+y[0],x[1]+y[1]))
#    )
```

```python
outbound_links_rdd = (raw_data_rdd
         # Key each record by the page that was clicked *out of*: (prev_title, ([curr_title], [n]))
         .map(lambda rec: (rec[0], ([rec[1]], [rec[2]])))

         # Create two aligned lists per page - destination pages and corresponding clicks
         .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    )
outbound_links_rdd.takeSample(False, 1)  # for dev/test, remove on production run
```

```
[(u'Copenhagen_Airport',
  ([u'Brussels_Airport',
    u'Brussels_South_Charleroi_Airport',
    u'Brussels_Airlines'],
   [44, 13, 13]))]
```
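The `reduceByKey` combine function concatenates the title list and the count list in step. The PySpark documentation describes `reduceByKey` as expecting an associative and commutative function; list concatenation is associative but not commutative. The sketch below (plain Python, made-up partial values standing in for results from different partitions) shows what that means here: every title stays paired with its own count whatever the merge order, but the order of the lists can change.

```python
import itertools
from functools import reduce


def merge(x, y):
    """The combine function used in reduceByKey: concatenate titles and counts in step."""
    return (x[0] + y[0], x[1] + y[1])


# Hypothetical partial values for one page, as if they came from different partitions
parts = [(["A"], [5]), (["B"], [3]), (["C"], [9])]

for order in itertools.permutations(parts):
    titles, counts = reduce(merge, order)
    # Whatever the merge order, each title stays paired with its own count ...
    assert dict(zip(titles, counts)) == {"A": 5, "B": 3, "C": 9}

# ... but the list order depends on the merge order
print(reduce(merge, parts), reduce(merge, list(reversed(parts))))
```

This is why the lists are explicitly sorted by click count afterwards rather than relying on the order `reduceByKey` happens to produce.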

### A function that prioritises the inbound and outbound page lists
It sorts each list in descending order of click counts and truncates it to the most-clicked pages.

```python
from operator import itemgetter

max_length = 100  # truncate lists after this length, as there can be a very long tail of uninteresting low-click pages

# Had to do some messing in Spark to emit the correct list shape after joining & sorting.
# We'll use this function in a map shortly.
def sort_2_lists(page_title, in_titles, in_counts, out_titles, out_counts):
    if len(in_counts) == 0:  # can happen, depending on the join characteristics
        in_titles2, in_counts2 = [], []
    else:
        # Pair the page titles with their counts, sort by count and emit a new pair of (sorted) lists
        in_titles2, in_counts2 = (list(a) for a in zip(*sorted(zip(in_titles, in_counts), key=itemgetter(1), reverse=True)))
    if len(out_counts) == 0:  # can happen, depending on the join characteristics
        out_titles2, out_counts2 = [], []
    else:
        # Same again for the outbound lists
        out_titles2, out_counts2 = (list(a) for a in zip(*sorted(zip(out_titles, out_counts), key=itemgetter(1), reverse=True)))
    # Truncate every list to the max_length most-clicked pages
    return page_title, in_titles2[:max_length], in_counts2[:max_length], out_titles2[:max_length], out_counts2[:max_length]
```
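Since only the top `max_length` links are kept, a full sort of a very long link list is not strictly necessary. As an alternative technique (not what the notebook uses), `heapq.nlargest` selects the top-k items directly; the Python documentation describes it as equivalent to `sorted(iterable, key=key, reverse=True)[:n]`, so the result matches the sort-then-truncate approach. `top_links` is a hypothetical helper.

```python
import heapq
from operator import itemgetter

MAX_LENGTH = 100  # same truncation limit as max_length above


def top_links(titles, counts, k=MAX_LENGTH):
    """Return (titles, counts) for the k highest-count links, highest first.
    Empty input yields ([], []) without needing a special case."""
    top = heapq.nlargest(k, zip(titles, counts), key=itemgetter(1))
    return [t for t, _ in top], [c for _, c in top]


print(top_links(["a", "b", "c", "d"], [3, 10, 1, 7], k=2))
```

For lists far longer than `k`, this avoids sorting the long tail that is discarded anyway.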


### Join the inbound and outbound datasets for every page to form a single overall picture of the flow for each page.

```python
def check_tuple(tup):
    # A missing side of an outer join comes back as None; replace it with empty lists
    return ([], []) if tup is None else tup

# Join the inbound and outbound datasets for every page to form a single overall picture of the flow for each page.
all_links_rdd = (inbound_links_rdd

    # Pull it all together into a single dataset, using the page title as the key on which to join the two datasets.
    # Note: rightOuterJoin keeps every page that appears in outbound_links_rdd (so the inbound side may be None),
    # but pages that only ever appear as a click destination are dropped; fullOuterJoin would keep them.
    .rightOuterJoin(outbound_links_rdd)

    # Have to do some messing to emit the correct list shape after joining:
    # (page_title, (in_tup, out_tup)) -> (page_title, in_tup, out_tup)
    .map(lambda kv: (kv[0], check_tuple(kv[1][0]), check_tuple(kv[1][1])))

    # Sort each list by referral count and flatten into (page_title, in_titles, in_counts, out_titles, out_counts).
    .map(lambda rec: sort_2_lists(rec[0], rec[1][0], rec[1][1], rec[2][0], rec[2][1]))
    )

all_links_rdd.cache()
```

```
PythonRDD[171] at RDD at PythonRDD.scala:43
```
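The choice of join decides which pages end up in the final dataset. The sketch below mimics `rightOuterJoin` and `fullOuterJoin` on plain dicts (valid here because `reduceByKey` leaves one value per page); the helper names and data are hypothetical. A page that is clicked into but never clicked out of survives only the full outer join.

```python
def right_outer_join(left, right):
    """Mimic RDD.rightOuterJoin for unique keys: keep every key of `right`; missing left values become None."""
    return {k: (left.get(k), v) for k, v in right.items()}


def full_outer_join(left, right):
    """Mimic RDD.fullOuterJoin for unique keys: keep keys from either side, None where a side is missing."""
    keys = set(left) | set(right)
    return {k: (left.get(k), right.get(k)) for k in keys}


# Hypothetical data: 'Dead_end' is clicked into, but nobody clicks out of it
inbound = {"Hub": (["Main_Page"], [50]), "Dead_end": (["Hub"], [4])}
outbound = {"Hub": (["Dead_end"], [4]), "Main_Page": (["Hub"], [50])}

print(sorted(right_outer_join(inbound, outbound)))
print(sorted(full_outer_join(inbound, outbound)))
```

With the right outer join, `check_tuple` only ever has to patch the inbound side; with a full outer join, either side can be `None`.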

### Have a look at the result - a complete picture of the page flow for one page

```python
all_links_rdd.takeSample(False, 1)  # for dev/test, remove on production run
```

```
[(u'Heysel/Heizel_metro_station',
  [u'Brussels_Metro',
   u'Brussels_Metro_line_6',
   u'List_of_Brussels_Metro_stations'],
  [72, 15, 13],
  [u'Brussels_Metro_line_6'],
  [18])]
```

## 4. Save the output

Note: if we had a complete development pipeline here, we would be writing this back to our application database, not to a TSV file. However, we do not have a single environment that has Hadoop, Spark, IPython, an application database and a web application server all neatly integrated! So for this proof-of-concept application, we get the data from Hadoop/Spark into our application database via an exported file.

```python
def to_tsv(page_title, in_titles, in_counts, out_titles, out_counts):
    # Write the inbound & outbound title and count lists in a form that the web app can easily consume
    in_titles_str = '[' + ' '.join(in_titles) + ']'
    in_counts_str = '[' + ' '.join(str(count) for count in in_counts) + ']'
    out_titles_str = '[' + ' '.join(out_titles) + ']'
    out_counts_str = '[' + ' '.join(str(count) for count in out_counts) + ']'
    # Return it as a tab-separated line for a .tsv file (not comma-separated, as page titles can contain commas)
    return '\t'.join([page_title, in_titles_str, in_counts_str, out_titles_str, out_counts_str])

# Export the finalised data to a .tsv (in real life, with a full pipeline, we'd inject this into the DB directly).
# Note: saveAsTextFile writes a *directory* named output_csv containing one part-file per partition.
csv_rdd = all_links_rdd.map(lambda rec: to_tsv(*rec))
csv_rdd.saveAsTextFile(output_csv)

# Check the output (IPython shell escape; {source} is expanded from the Python variable)
! ls -la /resources/*{source}*
```

```
-rw-rw-r-- 1 notebook resources 254006007 Apr  8 07:47 /resources/2016_03_clickstream_RESULTS.tsv.gz
-rw-rw-r-- 1 notebook resources       379 Apr 11 20:37 /resources/2016_03_clickstream_sample_TEST_DATASET_RESULTS.tsv.gz
-rw-rw-r-- 1     2003 resources       348 Apr  8 12:56 /resources/2016_03_clickstream_sample_TEST_DATASET.tsv.gz
-rw-rw-r-- 1     2003 resources 393452921 Apr  4 20:44 /resources/2016_03_clickstream.tsv.gz
```
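On the web application side, each exported row has to be turned back into lists. The sketch below is a hypothetical reader (`parse_list` and `from_tsv` are our names, not part of the project code); it re-states `to_tsv` compactly so the block runs on its own, and it relies on page titles containing no spaces or tabs, which holds for the underscore-separated titles seen in the samples above.

```python
def to_tsv(page_title, in_titles, in_counts, out_titles, out_counts):
    """Same export format as the cell above: bracketed, space-separated lists in tab-separated columns."""
    def fmt(items):
        return '[' + ' '.join(str(i) for i in items) + ']'
    return '\t'.join([page_title, fmt(in_titles), fmt(in_counts), fmt(out_titles), fmt(out_counts)])


def parse_list(text):
    """Turn '[a b c]' back into ['a', 'b', 'c'], and '[]' into []."""
    inner = text[1:-1]
    return inner.split(' ') if inner else []


def from_tsv(line):
    """Hypothetical reader for one exported row; assumes titles contain no spaces or tabs."""
    page_title, in_t, in_c, out_t, out_c = line.rstrip('\n').split('\t')
    return (page_title,
            parse_list(in_t), [int(c) for c in parse_list(in_c)],
            parse_list(out_t), [int(c) for c in parse_list(out_c)])


row = ('Heysel/Heizel_metro_station',
       ['Brussels_Metro', 'Brussels_Metro_line_6', 'List_of_Brussels_Metro_stations'], [72, 15, 13],
       ['Brussels_Metro_line_6'], [18])
assert from_tsv(to_tsv(*row)) == row  # round trip preserves the record
print(from_tsv('Lonely_page\t[]\t[]\t[Other_page]\t[3]'))
```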


## 5. Testing

### Testing Approach
As noted in Assignment 2, testing 'Big Data' solutions in the wild can be difficult. How can we know if the results we obtained are the correct results?

In this example, there is no other solution whose results we could compare ours with. So we create a set of "expected" results by *manually* replicating our data preparation on a small subset of the data. We then compare these to the 'actual' results produced by our data preparation code above (for the same small dataset) and ensure that the two match.

This helps enormously with initial debugging, but the main value is in long-term automated testing. Armed with the 'expected' results dataset, we can confidently make changes to our main analysis code (e.g. to refactor, improve performance, etc.). We just re-run the new code on the same sample and confirm that our actual results still match the 'expected' ones.

### Testing the Wikipedia clickstream data
To test our data preparation code, we take 15 sample Wikipedia clickstream log entries and treat them as a mini clickstream dataset. We sample pages around the theme of the 2016 Brussels bombings and select two main pages to focus on: [2016_Brussels_bombings](http://en.wikipedia.org/wiki/2016_Brussels_bombings) and [Reactions_to_the_2016_Brussels_bombings](http://en.wikipedia.org/wiki/Reactions_to_the_2016_Brussels_bombings). Within these pages, we select a comprehensive mix of all log entry types:
* Four regular click log entries - where one of our selected pages is either the 'current' page or the page that referred to it.
* Four log entries that are referred from an 'other' source - Wikipedia pages that do *not* link directly to the page in question. This usually indicates that the user ran a search from the referring page.
* Three log entries for each page that are referred from an 'external' source - such as Google, Facebook, etc.
* Across the above entries, a good mix of our pages playing both the 'referrer' and 'referred to' roles.

Test dataset: See the [TEST_DATA](https://drive.google.com/drive/u/1/folders/0B4KWi5yG-4oGR1FQRXNOQ0VlYTA) folder on Google Drive.

We (manually) work out the expected content of each row in our final dataset:
* The pages - these will form the Primary Key of our finalised data.
* The set of pages that were clicked through to a given page (in descending order of clicks).
* The set of pages that were clicked on to exit a given page (again, in descending order of clicks).
* We also perform some data cleaning along the way - e.g. consolidating the 'external' referrer pages, as they are not terribly interesting in the final application.

We then compare those expected results to an actual code run (using the same data) and check that these 'actual' results match the 'expected' ones. 
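The expected-versus-actual comparison can be automated so that it runs after every code change. The sketch below is hypothetical (`compare_results` is our name, and the rows are made up in the exported TSV format). It matches rows by page title rather than by position because, after a shuffle such as the join above, the row order of Spark's output files is not something to rely on.

```python
def compare_results(expected_lines, actual_lines):
    """Compare two exported result sets row by row, keyed on the page title (first TSV column)."""
    def by_title(lines):
        rows = {}
        for line in lines:
            line = line.rstrip('\n')
            if line:
                rows[line.split('\t', 1)[0]] = line
        return rows

    expected, actual = by_title(expected_lines), by_title(actual_lines)
    return {
        'missing': sorted(set(expected) - set(actual)),      # expected pages the run did not produce
        'unexpected': sorted(set(actual) - set(expected)),   # pages produced that were not expected
        'mismatched': sorted(t for t in set(expected) & set(actual) if expected[t] != actual[t]),
    }


# Hypothetical rows in the exported format, deliberately in a different order
expected = ['Page_A\t[Page_B]\t[7]\t[]\t[]\n', 'Page_B\t[]\t[]\t[Page_A]\t[7]\n']
actual = ['Page_B\t[]\t[]\t[Page_A]\t[7]\n', 'Page_A\t[Page_B]\t[7]\t[]\t[]\n']
report = compare_results(expected, actual)
print('CONFIRMED' if not any(report.values()) else report)
```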

### Test Results

The full manual test scenario walk-through is quite involved, so it is provided separately from this notebook (in the [test_data_clickstream_EXPECTED_RESULT.txt](https://github.com/oharapaGitHub/ca675assignment3backend/blob/master/data_mapping/test_data_clickstream_EXPECTED_RESULT.txt) file on GitHub).

Here is an excerpt from the test scenario walk-through. The EXPECTED row was calculated manually from the test dataset and the ACTUAL row was obtained by running the above Spark application on the same dataset. The results are identical, so we have some confidence that our code is producing the correct results.

```
EXPECTED :
2016_Brussels_bombings	[(external)_websearch_social_media_etc Main_Page Reactions_to_the_2016_Brussels_bombings Timeline_of_ISIL-related_events_(2016) Belgium_national_football_team 2016_Lahore_suicide_bombing Half-mast Belgium Schuman_metro_station]	[292235 145244 1523 985 155 145 69 53 48]	[Reactions_to_the_2016_Brussels_bombings]	[29799]

ACTUAL   :
2016_Brussels_bombings	[(external)_websearch_social_media_etc Main_Page Reactions_to_the_2016_Brussels_bombings Timeline_of_ISIL-related_events_(2016) Belgium_national_football_team 2016_Lahore_suicide_bombing Half-mast Belgium Schuman_metro_station]	[292235 145244 1523 985 155 145 69 53 48]	[Reactions_to_the_2016_Brussels_bombings]	[29799]
RESULT   : CONFIRMED
```