# Pipeline

<img src="images/pipeline.png" alt="pipeline overview" style="width: 600px;"/>

# Step 0: Raw

<img src="images/pipeline-raw.png" alt="raw data" style="width: 600px;"/>

The first step of the pipeline is to retrieve, and possibly store, the raw data. There are two main sources of Open edX data:

|               | Tracking Logs           | MySQL Database  |
| :------------- |:-------------| :-----|
| Kind of data  | clickstream events | course outlines, student responses,<br> demographics  |
| Data format   | compressed JSON      |   depends on table and column <br>(course outlines are compressed binary) |
| Data location | course VM, in <br> /edx/var/log/tracking/      | course VM, <br> in the `edxapp` database |
| Data access | You may need to be granted access <br> or ask for the data files to be transferred <br> to a location you can access      | Requires access permission, <br> or work with your admin/data steward <br> to determine what data you need |
| Documentation | Fairly well documented [here](http://edx.readthedocs.io/projects/devdata/en/stable/internal_data_formats/tracking_logs.html)      | Limited documentation [here](http://edx.readthedocs.io/projects/devdata/en/latest/internal_data_formats/index.html) <br> (mostly edX-specific) |
| Retrieving | Set up a cron job to rsync the data <br> from the VM daily | Shell script to query and decompress <br> the data, rsync to transfer |

For example, my crontab looks roughly like this:

```bash
0 0 * * * rsync -av /edx/var/log/tracking/* lmilechin@txe1-login.mit.edu:/path/to/data/tracking_logs/bwedx/
0 0 * * * sh transferOutline.sh
```

The script I use to query and transfer the course outlines (the only part of the database I grab) is roughly:

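```bash
#!/bin/bash

# Timestamped output file name, e.g. outline-20180303120000.txt
fname=outline-$(date +%Y%m%d%H%M%S).txt

# Query the stored course structures (-B: batch/tab-separated output, -N: no column names).
# In batch mode mysql escapes newlines as "\n", so restore them, then base64-decode and
# decompress, and finally put each JSON object on its own line.
mysql edxapp -B -N -e "select structure_json from course_structures_coursestructure;" \
  | sed 's/\\n/\n/g' | base64 -d | gunzip | sed 's/}{/}\n{/g' > "$fname"

# Copy the outline file to the analysis machine
rsync -av "$fname" lmilechin@txe1-login.mit.edu:/path/to/data/course_outlines/bwedx/
```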

We recommend working with your administrator or data steward to set up the data transfer, since they may have a preferred method. If you have an account on the course VM, you may want to read a bit more about [crontab](http://www.adminschoice.com/crontab-quick-reference) and [rsync](https://linux.die.net/man/1/rsync) (and if you are new to Linux, you may want to learn a command-line text editor, such as vi or nano, before editing the crontab).

Open edX also appears to provide a REST API. It seems to come with Open edX out of the box, though you may need to be given access. We discovered a few URLs but could not find much documentation, and what we found seems old (the [documentation](https://open.edx.org/open-edx-rest-apis) I found was from 2015), so I would not rely on it. It could still be worth looking into.

We have already retrieved the data we are interested in: the tracking logs and the course outlines from the MySQL database. Here we just specify the paths to our logs and outlines.

```julia
logLoc = "../data/raw/"
outlineLoc = "../data/"
```

Let's also grab the names of the files that we want to parse.

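```julia
# Tracking log files: keep only names ending in .gz
fnames = filter!(r"\.gz$",readdir(logLoc))
# Full paths with the .gz extension stripped; these are the names passed to parseFile
fnames = logLoc.*replace.(fnames,".gz","")

# Most recent course outline (names are timestamped and readdir returns them sorted)
outlinename = outlineLoc*filter!(r"outline",readdir(outlineLoc))[end];
```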

As an example, this is what one line in the raw tracking logs looks like.

```json
{"username": "mhoule", "event_type": "problem_check", "ip": "129.55.200.20", "agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0", "host": "bwedx.mit.edu", "referer": "https://bwedx.mit.edu/courses/course-v1:edX+DemoX+Demo_Course/courseware/interactive_demonstrations/19a30717eff543078a5d94ae9d6c18a5/?child=first", "accept_language": "en;q=1.0, en;q=0.5", "event": {"submission": {"303034da25524878a2e66fb57c91cf85_2_1": {"input_type": "choicegroup", "question": "", "group_label": "", "response_type": "multiplechoiceresponse", "answer": "Neither - because there is no 'correct' frame of reference, the only true statement can be that there was no lock-to-key match.", "variant": "", "correct": true}}, "success": "correct", "grade": 1, "correct_map": {"303034da25524878a2e66fb57c91cf85_2_1": {"hint": "", "hintmode": null, "correctness": "correct", "npoints": null, "answervariable": null, "msg": "", "queuestate": null}}, "state": {"correct_map": {}, "input_state": {"303034da25524878a2e66fb57c91cf85_2_1": {}}, "has_saved_answers": null, "seed": 1, "done": null, "student_answers": {}}, "answers": {"303034da25524878a2e66fb57c91cf85_2_1": "choice_2"}, "attempts": 1, "max_grade": 1, "problem_id": "block-v1:edX+DemoX+Demo_Course+type@problem+block@303034da25524878a2e66fb57c91cf85"}, "event_source": "server", "context": {"course_user_tags": {}, "user_id": 6, "org_id": "edX", "asides": {}, "module": {"usage_key": "block-v1:edX+DemoX+Demo_Course+type@problem+block@303034da25524878a2e66fb57c91cf85", "display_name": "Attributing Blame"}, "course_id": "course-v1:edX+DemoX+Demo_Course", "path": "/courses/course-v1:edX+DemoX+Demo_Course/xblock/block-v1:edX+DemoX+Demo_Course+type@problem+block@303034da25524878a2e66fb57c91cf85/handler/xmodule_handler/problem_check"}, "time": "2017-09-15T13:42:58.153338+00:00", "page": "x_module"}
```

To give you an idea of scale, each log file contains thousands of lines, of which perhaps a hundred or so contain anything interesting.

# Step 1: Parse

<img src="images/pipeline-parse.png" alt="parse data" style="width: 600px;"/>

Properly parsing data is perhaps the trickiest and most important part of the pipeline. This is the step where raw data that is often not even human readable is processed to yield data that can provide insight. Some considerations when designing a parser might be:
* human readability
* required format for tools
* ability to easily filter and recombine
* ability to perform target operations
* field relevance
* data enrichment
* target analytics or visualizations

The D4M parser provided in this demo filters out some of the irrelevant data, combines a few data sources to enrich the final dataset, and produces associative arrays. The resulting associative arrays are easy to manipulate and filter, and can be ingested into a database.

First, load the required packages and the parser.

```julia
using D4M,JSON,JLD
include("../../src/julia/parser.jl")
```

Specify where we will save our parsed data files and what we will call them.

```julia
saveLoc = "../data/parsed/julia/"
savenames = replace.(fnames,logLoc,saveLoc).*"-A.jld";
```

Now we can parse the data using our parser.

```julia
parseFile.(fnames,savenames,outlinename);
```

```
30-element Array{Any,1}:
 "../data/parsed/julia/tracking.log-20170911-1505128621-A.jld"
 "../data/parsed/julia/tracking.log-20170916-1505593021-A.jld"
 "../data/parsed/julia/tracking.log-20170922-1506082621-A.jld"
 "../data/parsed/julia/tracking.log-20171012-1507821421-A.jld"
 "../data/parsed/julia/tracking.log-20171016-1508163421-A.jld"
 "../data/parsed/julia/tracking.log-20171018-1508375821-A.jld"
 "../data/parsed/julia/tracking.log-20171024-1508876221-A.jld"
 "../data/parsed/julia/tracking.log-20171112-1510503421-A.jld"
 "../data/parsed/julia/tracking.log-20171122-1511367421-A.jld"
 "../data/parsed/julia/tracking.log-20171128-1511875021-A.jld"
 "../data/parsed/julia/tracking.log-20171204-1512429421-A.jld"
 "../data/parsed/julia/tracking.log-20171211-1512969421-A.jld"
 "../data/parsed/julia/tracking.log-20171217-1513495021-A.jld"
 ⋮
 "../data/parsed/julia/tracking.log-20180112-1515759421-A.jld"
 "../data/parsed/julia/trackin
```

Here is an example of a parsed event:

```julia
A = load("../data/parsed/julia/tracking.log-20180303-1520104621-A.jld")["A"]
printTriple(A[24,:])
```

```
(20180303-1520104621_0024,context_course_id|course-v1:LLx+LLX07+Q4_2017)	1
(20180303-1520104621_0024,context_org_id|LLx)	1
(20180303-1520104621_0024,context_path|https://bwedx.mit.edu/courses/course-v1:LLx+LLX07+Q4_2017/courseware)	1
(20180303-1520104621_0024,context_user_id|9)	1
(20180303-1520104621_0024,event_source|server)	1
(20180303-1520104621_0024,event_type|navigation)	1
(20180303-1520104621_0024,host|bwedx.mit.edu)	1
(20180303-1520104621_0024,ip|18.111.1.40)	1
(20180303-1520104621_0024,referer|https://bwedx.mit.edu/courses/course-v1:LLx+LLX07+Q4_2017/info)	1
(20180303-1520104621_0024,time|2018-02-27T17:13:15.351329+00:00)	1
(20180303-1520104621_0024,username|LaurenMilechin)	1
```


A D4M associative array is constructed with triples: two keys (row and column) map to a single value for each entry in the associative array. It can also be conceptualized as a sparse matrix, where the row and column indexes are strings.
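To make the triple/sparse-matrix duality concrete, here is a minimal sketch in plain Julia 1.x using only Base and the SparseArrays standard library (not D4M.jl; the notebook itself uses pre-1.0 Julia). The event IDs and keys are made up, and `triples`, `rindex`, and `cindex` are illustrative names, not part of D4M:

```julia
# Sketch only: plain Julia 1.x, NOT the D4M.jl Assoc type. Keys are invented.
using SparseArrays

rows = ["evt_0001", "evt_0001", "evt_0002"]
cols = ["event_type|navigation", "username|alice", "event_type|problem_check"]
vals = [1, 1, 1]

# Triple view: (row key, column key) => value
triples = Dict(zip(zip(rows, cols), vals))

# Matrix view: sorted unique string keys are mapped to integer indices
rowkeys = sort(unique(rows))
colkeys = sort(unique(cols))
rindex = Dict(k => i for (i, k) in enumerate(rowkeys))
cindex = Dict(k => j for (j, k) in enumerate(colkeys))
M = sparse([rindex[r] for r in rows], [cindex[c] for c in cols], vals,
           length(rowkeys), length(colkeys))
```

Each triple becomes one nonzero of `M`, so `M[rindex[r], cindex[c]]` and `triples[(r, c)]` hold the same value; D4M keeps this string-to-index mapping for you.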

Row keys are unique identifiers for each event. I formed them from the tracking log file name and the event number (the order in which the event occurs in the file).

Column keys contain the information about the event. Most of them are created by combining the field name with its value (`field|value`). There are specialized functions for creating the column keys for problem events and for module/section names. Most of the parser's work therefore goes into creating the column keys.

In the Julia parser, the core of the `makecols` function is:

```julia
# dict   - the dictionary returned by the JSON parser on the current log line
# prefix - the prefix for nested field names
# cols   - the column keys collected so far
for key in keys(dict)

    # If field is a non-empty scalar string or number, add "field|value"
    if (isa(dict[key],String) || isa(dict[key],Number)) && !isempty(dict[key])
        cols = [cols; prefix*key*'|'*replace(string(dict[key]),"\n","")]

    # If field is a non-empty array, add each element
    elseif isa(dict[key],Array) && !isempty(dict[key])
        if !isa(dict[key][1],Dict)
            cols = [cols; (prefix*key*'|').*replace.(string.(dict[key]),"\n","")]
        else
            # Array of dicts: parse each dict recursively and concatenate the keys
            cols = [cols; vcat([makecols(d,allOutlines,prefix*key*'_') for d in dict[key]]...)]
        end

    # If field is a nested dict, recursively call makecols on it
    elseif isa(dict[key],Dict)
        cols = [cols; makecols(dict[key],allOutlines,prefix*key*'_')]
    end
end
```

This occurs at the end of the function (roughly line 150 in parser.jl), after we have removed the fields we are not interested in, parsed the "event" field of problem_check events, and created column keys specifying navigation and course location information. It is fairly simple to add more functionality to this parser to do more enrichment or filter out more data.
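The recursive flattening can be sketched on its own in plain Julia 1.x. `flattencols` below is a hypothetical, simplified stand-in for `makecols`: it assumes String keys (as JSON parsing produces) and skips the outline lookups and event-specific handling that parser.jl performs. The example event is made up.

```julia
# Hypothetical, simplified flattening in the spirit of makecols (Julia 1.x, Base only).
# Assumes String keys; omits parser.jl's outline lookups and problem-event handling.
function flattencols(d::AbstractDict, prefix::AbstractString = "")
    cols = String[]
    for (key, val) in d
        if val isa AbstractString || val isa Number
            # Scalar field: "prefix_field|value", skipping empty values
            s = replace(string(val), "\n" => "")
            isempty(s) || push!(cols, prefix * key * "|" * s)
        elseif val isa AbstractVector && !isempty(val)
            for el in val
                if el isa AbstractDict
                    # Array of dicts: flatten each element under the same prefix
                    append!(cols, flattencols(el, prefix * key * "_"))
                else
                    push!(cols, prefix * key * "|" * replace(string(el), "\n" => ""))
                end
            end
        elseif val isa AbstractDict
            # Nested dict: extend the prefix with "field_" and recurse
            append!(cols, flattencols(val, prefix * key * "_"))
        end
    end
    return sort!(cols)
end

evt = Dict("event_type" => "problem_check",
           "context" => Dict("user_id" => 6, "org_id" => "edX"),
           "tags" => ["a", "b"])
keys_for_evt = flattencols(evt)
```

Nested field names are joined to their parent with `_` and values to field names with `|`, which matches keys such as `context_user_id|9` in the parsed event shown earlier.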

`makecols` is broadcast over the log, so it returns one array of column keys for each parsed line. Since the `Assoc` constructor requires row and column key arrays of the same length, we create a unique ID for each line of the log and repeat it once for each column key on that line. We then flatten both the row and column keys into two arrays of strings of equal length:

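```julia
# Parse each line of the log (a JSON string) into a Dict
log = JSON.parse.(log);

# Make column keys: one array of keys per line (empty for lines with nothing to keep)
cols = makecols.(log,allOutlines)

# Finish column keys: drop empty entries and flatten into one array of strings
cIDs = split(join(join.(cols[cols .!= ""],'\n'),'\n'),'\n')

# Make row keys "<date>-<timestamp>_<line number>"; fname[14:end] drops the
# "tracking.log-" prefix. Each ID is repeated once per column key on its line.
lens = length.(cols)
rIDs = (fname[14:end]*'_').*lpad.([1:length(log);],4,0).*'\n';
rIDs = split(join(repeat.(rIDs,lens),"")[1:end-1],'\n')

# Form associative array with value 1 for every (row, column) pair
A = Assoc(rIDs, cIDs, 1)
```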
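The alignment of row and column keys can be illustrated on toy data. This sketch uses Julia 1.x and builds the flat arrays with `fill` and `reduce(vcat, ...)` rather than the newline join/split trick above; the file stem and column keys are invented for illustration.

```julia
# Toy illustration (Julia 1.x, Base only); stem and keys are made up.
stem = "20180303-1520104621"
cols = [["event_type|navigation", "username|alice"],  # line 1: two keys
        String[],                                    # line 2: nothing kept
        ["event_type|problem_check"]]                # line 3: one key

# One ID per log line, zero-padded to four digits
ids = [stem * "_" * lpad(string(i), 4, '0') for i in 1:length(cols)]

# Repeat each ID once per column key on its line, then flatten both sides
rIDs = reduce(vcat, [fill(id, length(c)) for (id, c) in zip(ids, cols)])
cIDs = reduce(vcat, cols)
```

Because line 2 contributes zero keys, its ID never appears, and `rIDs` and `cIDs` stay the same length, which is what the `Assoc` constructor needs.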


# Step 2: Ingest

<img src="images/pipeline-ingest.png" alt="ingest data" style="width: 600px;"/>

If you are using a database, this is where you would ingest your data. I am using Accumulo. Note that, currently, to use the database features of D4M you will need to check out the "db" branch of D4M.jl (`Pkg.checkout("D4M","db")`).

If you want to try this out, I suggest using [Uno](https://github.com/apache/fluo-uno) from the Apache Fluo project, which downloads Accumulo and its dependencies and configures them for you. It may not work on Windows.

Note: `dbinit()` may cause a segmentation fault when run on OS X. This is caused by the JVM used for database operations (see the [JavaCall FAQ](http://juliainterop.github.io/JavaCall.jl/faq.html)) and is nothing to worry about.

```julia
# Pkg.checkout("D4M","db")
using JavaCall
dbinit()
```

```
signal (11): Segmentation fault: 11
while loading In[7], in expression starting on line 3
unknown function (ip: 0x1287362b3)
Allocations: 28987961 (Pool: 28978812; Big: 9149); GC: 73
```


Next, set up the connection to the database. Further below, the `addColCombiner` command ensures that the counts in the degree table are incremented as the summed associative array from each file is ingested.

```julia
DB = dbsetup("uno","db.conf")
myname = "bwedx_"
```

The schema we use is called the D4M schema. It consists of a main table and its transpose. The main table is often called the edge table, or incidence table, since it can be interpreted as an incidence matrix representation of a graph. The transpose table allows you to index your data by both row and column keys: Accumulo performs very fast row queries, and the transpose table puts your column keys in Accumulo's rows, giving you better query performance at the cost of increased data size. In D4M, a single object called a table pair holds the edge table and its transpose; it handles ingesting data into both tables and sends each query to the appropriate table.
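Conceptually, a table pair stores every triple twice. The sketch below uses in-memory `Dict`s as stand-ins for the two Accumulo tables (plain Julia 1.x; `Table` and `putpair!` are hypothetical names, not the D4M.jl API) to show why the transpose turns a column lookup into a direct row lookup:

```julia
# Conceptual sketch only: Dicts stand in for Accumulo tables; not the D4M.jl API.
const Table = Dict{String,Dict{String,Int}}

# Write one triple to both the edge table and its transpose
function putpair!(tedge::Table, tedgeT::Table, r::String, c::String, v::Int)
    get!(Dict{String,Int}, tedge, r)[c] = v   # keyed by row (event)
    get!(Dict{String,Int}, tedgeT, c)[r] = v  # keyed by column (field|value)
    return nothing
end

tedge, tedgeT = Table(), Table()
putpair!(tedge, tedgeT, "evt_0001", "event_type|navigation", 1)
putpair!(tedge, tedgeT, "evt_0001", "username|alice", 1)
putpair!(tedge, tedgeT, "evt_0002", "event_type|navigation", 1)

# Row query (edge table): every column key of one event
row_hit = tedge["evt_0001"]
# Column query (transpose table): every event carrying one key
col_hit = sort(collect(keys(tedgeT["event_type|navigation"])))
```

Without the transpose, the column query would have to scan every row; with it, both lookups are a single key access, at the cost of storing each entry twice.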

The third table is the degree table (again named with graph terminology: the degree of a vertex is the number of other vertices it is connected to), which contains the number of times each column key occurs throughout the database. It is useful for query planning, since it tells you how many entries a query on a given column key will return before you run it.
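The degree table and its "sum" combiner can be mimicked in memory. This is a sketch in plain Julia 1.x with made-up keys; `coldegrees` is a hypothetical helper, and `merge(+, ...)` plays the role of the combiner, adding counts for the same key that arrive in separate batches:

```julia
# Sketch (Julia 1.x, Base only); keys are invented and coldegrees is hypothetical.
# Count how many times each column key occurs in one batch of triples
function coldegrees(colkeys::Vector{String})
    deg = Dict{String,Int}()
    for c in colkeys
        deg[c] = get(deg, c, 0) + 1
    end
    return deg
end

batch1 = ["event_type|navigation", "event_type|navigation", "username|alice"]
batch2 = ["event_type|navigation", "username|bob"]

# Summing counts for keys present in both batches, as a "sum" combiner would
totaldeg = merge(+, coldegrees(batch1), coldegrees(batch2))
```

This is why each file's column sums can be ingested independently: the stored degree for `event_type|navigation` ends up as the total over all batches.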

Create the tables that will hold the data. 

```julia
# Create/connect to tables
Tedge = DB[myname*"Tedge",myname*"TedgeT"]
TedgeDeg = DB[myname*"TedgeDeg"]

# Delete if it contains leftover data (we are only doing this to have a clean example)
if nnz(Tedge) != 0 || nnz(TedgeDeg) != 0
    delete(Tedge); delete(TedgeDeg)
    Tedge = DB[myname*"Tedge",myname*"TedgeT"]
    TedgeDeg = DB[myname*"TedgeDeg"]
end

# Sum values in the "Degree" column whenever the same row key is ingested again
addColCombiner(TedgeDeg,"Degree,","sum");
```

```
Creating bwedx_Tedge in uno
Creating bwedx_TedgeT in uno
Creating bwedx_TedgeDeg in uno
```


```julia
parseLoc = "../data/parsed/julia/"
parsenames = parseLoc.*readdir(parseLoc);
```

Now let's ingest the data. You can watch the progress on the [Accumulo Monitor](http://localhost:9995/).

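```julia
for i = 1:length(parsenames)
    A = load(parsenames[i])["A"]
    # Column sums of A, transposed so each column key becomes a row with a "Degree" count
    colName = convert(Array{Union{AbstractString,Number}},["Degree"])
    Adeg = putCol(sum(A,1)',colName)
    # Insert into the edge/transpose table pair and into the degree table
    put(Tedge,A)
    put(TedgeDeg,Adeg)
end
```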

# Step 3: Query

<img src="images/pipeline-query.png" alt="query data" style="width: 600px;"/>

Let's quickly see how many events occurred in each module.

```julia
deg = TedgeDeg[:,:]

printTriple(deg[StartsWith("module_name"),:])
```

```
(module_name|Build Instructions,Degree)	33
(module_name|Doppler Processing,Degree)	31
(module_name|Example Week 1: Getting Started,Degree)	86
(module_name|Final,Degree)	3
(module_name|Introduction,Degree)	13
(module_name|Introduction to High Performance Scientific Computing,Degree)	5
(module_name|Introduction to the LLSC Systems,Degree)	14
(module_name|Overview,Degree)	8
(module_name|Resources,Degree)	65
(module_name|Submitting, Monitoring, and Stopping Jobs,Degree)	14
(module_name|TX-E1 Basics,Degree)	26
(module_name|The Analyst Challenge,Degree)	686
(module_name|The Data Challenge,Degree)	232
(module_name|The System Challenge,Degree)	541
(module_name|Use Case 2: MapReduce,Degree)	5
(module_name|Use Case 4: Parallel Computing,Degree)	32
(module_name|Use Case 5: Databases and Data Analytics,Degree)	22
(module_name|Using the LLSC Systems,Degree)	20
(module_name|Welcome,Degree)	68
```


Now let's find all the answers to a particular question.

```julia
# Find all events that contain the question we are interested in
Atmp = Tedge[:,"event_question_2_1|Accumulo's data model can be interpreted as:,"]
A = Tedge[Row(Atmp),:]

printTriple(A[:,StartsWith("event_answer")])
```

```
(20171226-1514348222_0033,event_answer_2_1|Any["Semantic triples", "Incidence matrix of a graph", "Many small dense tables"])	1
(20171226-1514348222_0028,event_answer_2_1|Any["Semantic triples", "Incidence matrix of a graph"])	1
```


This toy dataset is small enough that we could probably query the entire table and be fine. That is not usually the case, though: when you are querying a large amount of data, you'll need to use an iterator. Here is an example from a real course that we are offering.

We've connected to a database instance with real course data from one of our platforms. We are interested in navigation events from a particular course.

```julia
medlytics = "course_id|course-v1:BWSI+BWSI150+Winter_2018\n"
navEvents = "event_type|navigation\n"

println("Events in Medlytics Course: "*Val(TedgeDeg[medlytics,:])[1])
println("Navigation events: "*Val(TedgeDeg[navEvents,:])[1])
```
`Events in Medlytics Course: 24939`

`Navigation events: 240077`

Since there are far more navigation events than events in the Medlytics course, we first query all events in the Medlytics course, then keep only the navigation events. Because we will still be retrieving a large number of entries, we use an iterator so we don't overwhelm the database.

```julia
# Iterator over Tedge that returns results in batches of about 5000 entries
MedIt = getiterator(Tedge,5000)
# First batch of entries containing the Medlytics course key
Atmp = MedIt[:,medlytics]
MedEvents = Assoc("","","");

# While the iterator still returns entries
while nnz(Atmp) > 0
    # Fetch the full events in this batch, then keep only navigation events
    Afull = Tedge[Row(Atmp),:]
    Anav = Afull[Row(Afull[:,navEvents]),:]

    MedEvents = MedEvents + Anav

    # Get the next batch
    Atmp = MedIt[]
end

size(MedEvents)
```
`(15757, 16494)`

There are roughly 15,000 navigation events for this course.
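The strategy used above (start from the rarer key, fetch its events in batches, keep those that also carry the second key) can be sketched without a database. In this Julia 1.x sketch, `Dict`s stand in for Tedge, TedgeT, and TedgeDeg, `Iterators.partition` stands in for the D4M iterator, and the events, keys, batch size, and the `andquery` helper are all invented:

```julia
# Sketch (Julia 1.x, Base only): Dicts stand in for the Accumulo tables; data is made up.
tedge = Dict(
    "e1" => Set(["course|A", "event_type|navigation"]),
    "e2" => Set(["course|A", "event_type|problem_check"]),
    "e3" => Set(["course|B", "event_type|navigation"]),
    "e4" => Set(["course|A", "event_type|navigation"]),
    "e5" => Set(["course|B", "event_type|navigation"]),
)

# Transpose view (column key => events) and degree counts derived from it
tedgeT = Dict{String,Vector{String}}()
for (evt, keyset) in tedge, k in keyset
    push!(get!(Vector{String}, tedgeT, k), evt)
end
degree = Dict(k => length(v) for (k, v) in tedgeT)

# Hypothetical helper: events containing both k1 and k2
function andquery(k1, k2; batchsize = 2)
    # Query planning: start from the key with the smaller degree
    rare, common = degree[k1] <= degree[k2] ? (k1, k2) : (k2, k1)
    hits = String[]
    # Fetch the rare key's events in fixed-size batches, filtering each batch
    for batch in Iterators.partition(sort(tedgeT[rare]), batchsize)
        append!(hits, filter(evt -> common in tedge[evt], batch))
    end
    return hits
end

navA = andquery("course|A", "event_type|navigation")
```

Here `course|A` (degree 3) is rarer than `event_type|navigation` (degree 4), so only three events are fetched and checked, mirroring the choice to start from the Medlytics course key.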

# Step 4: Analyze/Visualize

<img src="images/pipeline-analyze.png" alt="analyze data" style="width: 600px;"/>

Let's say we're interested in seeing a general trend of course traversal: do students go through the course linearly, or do they tend to jump around? We'll use the data from the Medlytics course example above.

```julia
using Plots
plotly()

A = load("courseNav.jld")["A"];
```

First we get all events that involve a navigation within the course. For example, we can see what modules and sections students have navigated to from the first section of the first module.

```julia
# Events that record a navigation within the course
courseNavs = A[Row(A[:,StartsWith("new_course_loc|")]),:]
# Previous and new location columns of those events
prevLoc = courseNavs[:,StartsWith("last_course_loc|")]
newLoc = courseNavs[:,StartsWith("new_course_loc|")]
# Entry (i,j) counts the events that moved from location i to location j
navgraph = prevLoc'*newLoc

printTriple(navgraph[2,:])
```

```
(last_course_loc|001.001,new_course_loc|001.002)	7
(last_course_loc|001.001,new_course_loc|001.003)	1
(last_course_loc|001.001,new_course_loc|002.001)	3
(last_course_loc|001.001,new_course_loc|002.002)	4
(last_course_loc|001.001,new_course_loc|002.003)	1
(last_course_loc|001.001,new_course_loc|002.004)	1
(last_course_loc|001.001,new_course_loc|003.002)	5
(last_course_loc|001.001,new_course_loc|003.003)	1
(last_course_loc|001.001,new_course_loc|004.002)	1
```

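Why does `prevLoc'*newLoc` give transition counts? Each navigation event is a row with a 1 in its previous-location column and a 1 in its new-location column, so entry (i, j) of the product sums, over all events, the product of those two indicators. A small sketch with Julia 1.x's SparseArrays standard library (the locations and moves are made up):

```julia
# Sketch (Julia 1.x, SparseArrays stdlib); locations and moves are invented.
using SparseArrays

locs = ["001.001", "001.002", "002.001"]
idx  = Dict(l => i for (i, l) in enumerate(locs))
moves = [("001.001", "001.002"), ("001.001", "001.002"),
         ("001.001", "002.001"), ("001.002", "002.001")]

nev = length(moves)
# One row per event: P marks the previous location, N the new location
P = sparse(collect(1:nev), [idx[a] for (a, _) in moves], ones(Int, nev), nev, length(locs))
N = sparse(collect(1:nev), [idx[b] for (_, b) in moves], ones(Int, nev), nev, length(locs))

# (P' * N)[i, j] = number of events that went from location i to location j
trans = P' * N
```

Every event contributes exactly one nonzero to the product, so the entries of `trans` add up to the number of navigation events.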

Now we can plot a heatmap to see the most frequent traversals.

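```julia
thresh = 5

# Keep only transitions that occurred more than thresh times
ys = String.(Row(navgraph>thresh))
xs = String.(Col(navgraph>thresh))
# Dense version of the thresholded array's sparse matrix, for plotting
z = full((navgraph>thresh).A)
heatmap(xs, ys, z, yflip=true, xrotation=45)
```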

Curious what that hotspot is? Hovering over it on the heatmap shows that it is the transition from location 003.002 to 003.003 (module 3, section 2 to section 3). What sections are these in the course?

Here is a neat way to do an "AND" query: add the two columns and keep the rows whose sum is 2, that is, the rows that contain both keys.

```julia
# Find all rows with both last_course_loc|003.002 and new_course_loc|003.003
Atmp = sum(A[:,"last_course_loc|003.002,"] + A[:,"new_course_loc|003.003,"],2) == 2

# Grab the module and section names
popNav = A[Row(Atmp),StartsWith("last,new,")]

Col(popNav)
```

```
6-element Array{Union{AbstractString, Number},1}:
 "last_course_loc|003.002"
 "last_module_name|Module 2: Introduction to Coding"
 "last_section_name|Git"
 "new_course_loc|003.003"
 "new_module_name|Module 2: Introduction to Coding"
 "new_section_name|Python"
```
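The same "AND" trick in plain Julia 1.x with the SparseArrays standard library, on a made-up 0/1 matrix whose two columns stand for the two location keys:

```julia
# Sketch (Julia 1.x, SparseArrays stdlib); the matrix is invented.
using SparseArrays

# Rows are events; column 1 = "last_course_loc|003.002", column 2 = "new_course_loc|003.003"
A = sparse([1, 2, 2, 3], [1, 1, 2, 2], [1, 1, 1, 1], 3, 2)

# With 0/1 entries, a row sum of 2 means the row contains both keys
rowsum = vec(Array(sum(A, dims = 2)))
bothrows = findall(x -> x == 2, rowsum)
```

Only event 2 has both keys, so it is the only row selected; an "OR" query would instead keep rows whose sum is at least 1.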