# Preprocessing

In [1]:
from pyspark import SparkConf, SparkContext

# Defining the spark context.
# getOrCreate reuses the context that is already running when this cell is
# executed again; constructing SparkContext(...) a second time in the same
# kernel raises a ValueError instead.
spark_conf = SparkConf().setMaster("local[*]").setAppName("ADA")
sc = SparkContext.getOrCreate(spark_conf)
# Display the parallelism of the current context
sc.defaultParallelism

8

In [2]:
import utils

# Configuration

In [3]:
# Names of the sessions to preprocess; each name is used as the prefix of the
# CSV files read from the data/ directory.
config = dict(sessions=['progfun-003'])

# Importing the data

## Meta data

In [4]:
# One (initially empty) metadata record per configured session.
meta = dict()
for session in config['sessions']:
    meta.update({session: {}})

In [5]:
# Collect, for every session, one metadata entry per problem.
for session in config['sessions']:
    problems_csv = 'data/' + session + '_Problem_Events_with_Info.csv'
    problem_pairs = (sc
        .textFile(problems_csv)
        .filter(utils.filter_problems)
        # Produces (ProblemID, data) pairs
        .map(utils.getHardCloseTime)
        # Many events share a problem: keep a single value per ProblemID
        .reduceByKey(lambda kept, _duplicate: kept)
        .collect()
    )
    meta[session]['problems'] = {}
    for ProblemID, data in problem_pairs:
        meta[session]['problems'][ProblemID] = data
# Show the result for the first configured session
meta[config['sessions'][0]]['problems']

{7: {'HardCloseTime': 1380452400, 'OpenTime': 1379242800},
 21: {'HardCloseTime': 1381057200, 'OpenTime': 1379847600},
 22: {'HardCloseTime': 1384689600, 'OpenTime': 1382875200},
 23: {'HardCloseTime': 1382785200, 'OpenTime': 1381057200},
 24: {'HardCloseTime': 1381662000, 'OpenTime': 1380452400},
 25: {'HardCloseTime': 1383480000, 'OpenTime': 1382266800}}

In [6]:
# Collect, for every session, the OpenTime of each video.
for session in config['sessions']:
    video_pairs = (sc
        .textFile('data/' + session + '_Video_Info.csv')
        .map(lambda line: line.split(','))
        # Drop rows whose second column is 'None' or the literal 'OpenTime'
        # (the latter is presumably the header row)
        .filter(lambda cols: cols[1] not in ('None', 'OpenTime'))
        # Key: integer in column 7 (used as VideoID); value: OpenTime from column 1
        .map(lambda cols: (int(cols[7]), {'OpenTime': int(cols[1])}))
        # Keep a single value per VideoID
        .reduceByKey(lambda kept, _duplicate: kept)
        .collect()
    )
    meta[session]['videos'] = {}
    for VideoID, data in video_pairs:
        meta[session]['videos'][VideoID] = data
# Show the result for the first configured session
meta[config['sessions'][0]]['videos']

{2: {'OpenTime': 1348034580},
 3: {'OpenTime': 1379242800},
 4: {'OpenTime': 1379242800},
 5: {'OpenTime': 1379242800},
 6: {'OpenTime': 1379242800},
 7: {'OpenTime': 1379242800},
 8: {'OpenTime': 1379242800},
 9: {'OpenTime': 1347974700},
 10: {'OpenTime': 1347974760},
 11: {'OpenTime': 1347974760},
 12: {'OpenTime': 1347974760},
 13: {'OpenTime': 1347974820},
 21: {'OpenTime': 1379242800},
 23: {'OpenTime': 1379242800},
 25: {'OpenTime': 1379242800},
 27: {'OpenTime': 1379242800},
 29: {'OpenTime': 1379242800},
 33: {'OpenTime': 1379242800},
 35: {'OpenTime': 1379847600},
 37: {'OpenTime': 1379847600},
 39: {'OpenTime': 1379847600},
 41: {'OpenTime': 1379847600},
 43: {'OpenTime': 1379847600},
 47: {'OpenTime': 1379847600},
 49: {'OpenTime': 1379847600},
 51: {'OpenTime': 1380452400},
 53: {'OpenTime': 1380452400},
 71: {'OpenTime': 1381057200},
 73: {'OpenTime': 1381662000},
 75: {'OpenTime': 1380452400},
 77: {'OpenTime': 1381057200},
 79: {'OpenTime': 1381057200},
 81: {'OpenTime'

In [7]:
# Match videos to problems: a video belongs to a problem when both have the
# same OpenTime.
#   matchingP: ProblemID -> list of matching VideoIDs
#   matchingV: VideoID   -> matching ProblemID
for session in config['sessions']:
    session_problems = meta[session]['problems']
    session_videos = meta[session]['videos']
    meta[session]['matchingP'] = {}
    meta[session]['matchingV'] = {}
    for ProblemID, problem_info in session_problems.items():
        problem_open_time = problem_info['OpenTime']
        matched_videos = [
            VideoID
            for VideoID, video_info in session_videos.items()
            if video_info['OpenTime'] == problem_open_time
        ]
        meta[session]['matchingP'][ProblemID] = matched_videos
        for VideoID in matched_videos:
            meta[session]['matchingV'][VideoID] = ProblemID
print(meta[config['sessions'][0]]['matchingP'])
print(meta[config['sessions'][0]]['matchingV'])

{21: [35, 37, 39, 41, 43, 47, 49], 22: [117, 119, 121, 123, 125, 127], 7: [3, 4, 5, 6, 7, 8, 21, 23, 25, 27, 29, 33], 24: [51, 53, 75], 25: [103, 105, 107, 109, 111, 113, 115], 23: [71, 77, 79, 81, 83, 85, 87]}
{83: 23, 3: 7, 4: 7, 5: 7, 6: 7, 7: 7, 8: 7, 75: 24, 77: 23, 103: 25, 79: 23, 115: 25, 81: 23, 107: 25, 21: 7, 23: 7, 25: 7, 71: 23, 27: 7, 29: 7, 33: 7, 35: 21, 113: 25, 37: 21, 39: 21, 41: 21, 43: 21, 109: 25, 47: 21, 49: 21, 53: 24, 51: 24, 105: 25, 117: 22, 119: 22, 111: 25, 121: 22, 87: 23, 123: 22, 125: 22, 85: 23, 127: 22}


## Importing Event Data

In [8]:
# Reading csv files: create one RDD per event table, with one string entry per line in the file.
# The session is bound explicitly here instead of relying on the loop variable
# left over from the metadata cells, so the cell does not depend on hidden state.
session = config['sessions'][0]
events_prefix = "data/" + session
rdd_problem_events = sc.textFile(events_prefix + "_Problem_Events_with_Info.csv")
rdd_video_events = sc.textFile(events_prefix + "_Video_Events.csv")
rdd_forum_events = sc.textFile(events_prefix + "_Forum_Events.csv")

In [9]:
def printHeader(rdd):
    """Print the comma-separated fields of the first line of `rdd` with their column index.

    The output is the list of (index, field) tuples followed by a blank line.
    """
    first_line = rdd.first()
    indexed_columns = [(position, field) for position, field in enumerate(first_line.split(','))]
    print(indexed_columns, '\n')

# Show the column layout of each of the three event tables.
for header_banner, header_rdd in (
    ("--- PROBLEM_EVENTS ---", rdd_problem_events),
    ("--- VIDEO_EVENTS ---", rdd_video_events),
    ("--- FORUM_EVENTS ---", rdd_forum_events),
):
    print(header_banner)
    printHeader(header_rdd)

--- PROBLEM_EVENTS ---
[(0, 'EventID'), (1, 'ForumUserID'), (2, 'MaximumSubmissions'), (3, 'AccountUserID'), (4, 'SubmissionNumber'), (5, 'Grade'), (6, 'TimeStamp'), (7, 'DataPackageID'), (8, 'ProblemID'), (9, 'SoftCloseTime'), (10, 'ProblemType'), (11, 'HardCloseTime'), (12, 'Platform'), (13, 'OpenTime'), (14, 'EventType'), (15, 'Title'), (16, 'SessionUserID'), (17, 'UniqueProblemID'), (18, 'UniqueUserID')] 

--- VIDEO_EVENTS ---
[(0, 'EventID'), (1, 'ForumUserID'), (2, 'OldTime'), (3, 'AccountUserID'), (4, 'CurrentTime'), (5, 'SeekType'), (6, 'TimeStamp'), (7, 'DataPackageID'), (8, 'UniqueRowID'), (9, 'TableName'), (10, 'VideoID'), (11, 'Platform'), (12, 'NewSpeed'), (13, 'EventSource'), (14, 'EventType'), (15, 'SessionUserID'), (16, 'NewTime'), (17, 'OldSpeed')] 

--- FORUM_EVENTS ---
[(0, 'EventID'), (1, 'ForumUserID'), (2, 'PostID'), (3, 'AccountUserID'), (4, 'TimeStamp'), (5, 'DataPackageID'), (6, 'UniqueRowID'), (7, 'TableName'), (8, 'Platform'), (9, 'EventSource'), (10, 'PostTy

In [10]:
def removeHeader(row):
    """Filter predicate: False for lines starting with 'EventID' (the header line), True otherwise."""
    if row.startswith('EventID'):
        return False
    return True

# Drop the header line from each of the three raw event tables.
rdd_problem_events, rdd_video_events, rdd_forum_events = (
    raw_table.filter(removeHeader)
    for raw_table in (rdd_problem_events, rdd_video_events, rdd_forum_events)
)

In [11]:
# For each table: keep the lines accepted by the table's filter in utils,
# then turn every remaining line into a parsed event with the matching parser.
rdd_problem_events_parsed = (rdd_problem_events
    .filter(utils.filter_problems)
    .map(utils.parse_problems)
)
rdd_video_events_parsed = (rdd_video_events
    .filter(utils.filter_videos)
    .map(utils.parse_videos)
)
rdd_forum_events_parsed = (rdd_forum_events
    .filter(utils.filter_forums)
    .map(utils.parse_forums)
)

In [12]:
# Handles issue with the forum events table having 'AccountUserID' instead of 'SessionUserID'
# Using the table <session>_User_Hash_Mapping of the first configured session
rdd_user_mapping = sc.textFile("data/"+config['sessions'][0]+"_User_Hash_Mapping.csv")

def f(x):
    """Attach the session user id to a joined forum event and return the event.

    `x` has the shape (key, (SessionUserID, ForumEvent dict)). The event dict
    is updated in place with a 'SessionUserID' entry and then returned.
    """
    joined_values = x[1]
    session_user_id = joined_values[0]
    forum_event = joined_values[1]
    forum_event['SessionUserID'] = session_user_id
    return forum_event

# Forum events keyed by their AccountUserID, ready to be joined with the mapping.
# (AccountUserId (str), ForumEvent (dict))
forum_events_by_account = rdd_forum_events_parsed.map(
    lambda event: (event['AccountUserID'], event)
)

rdd_forum_events_parsed = (rdd_user_mapping
    # INPUT: comma separated string; drop the line starting with "ForumUserID"
    .filter(lambda line: not line.startswith("ForumUserID"))
    # INPUT: comma separated string -> list of columns
    .map(lambda line: line.split(","))
    # (key=AccountUserId (str), value=SessionUserId (str)), taken from columns 1 and 6
    .map(lambda cols: (cols[1], cols[6]))
    # -> (AccountUserId (str), (SessionUserId (str), ForumEvent (dict)))
    .join(forum_events_by_account)
    # -> ForumEvent (dict) carrying its 'SessionUserID'
    .map(f)
)

print(rdd_forum_events_parsed.count())
print(rdd_forum_events_parsed.take(1)[0])

788672
{'AccountUserID': '4754960', 'EventSubType': 'Load', 'DataPackageID': 'progfun-003', 'TimeStamp': 1384980685, 'EventType': 'Forum', 'SessionUserID': '365d40969bd5056c914e1c1cd2837a2193647da5'}


In [13]:
# Concatenates all three tables into one big table and keeps it cached,
# since it is evaluated several times below.
rdd_events = rdd_problem_events_parsed.union(rdd_video_events_parsed)
rdd_events = rdd_events.union(rdd_forum_events_parsed).persist()
print(rdd_events.count())
print(rdd_events.first())

9027912
{'EventSubType': 'Check', 'Grade': 10.0, 'Title': 'Functional Sets / Functional Sets', 'ProblemType': 'Assignment', 'HardCloseTime': 1381057200, 'ProblemID': 21, 'DataPackageID': 'progfun-003', 'TimeStamp': 1384935108, 'EventType': 'Problem', 'SessionUserID': '17bf18d86b2d7a5b7aa4cbe19816b7b55a50be24'}


# Patterns from event sequences

In [14]:
# (SessionUserId (str), Event (dict)) for every event
events_keyed_by_student = rdd_events.map(lambda event: (event['SessionUserID'], event))

rdd_events_by_students = (events_keyed_by_student
    # -> (SessionUserId (str), [ Event (dict), ... ])
    .groupByKey()
    # Order every student's events chronologically by their 'TimeStamp'
    .map(lambda student: (
        student[0],
        sorted(student[1], key=lambda event: event['TimeStamp'])
    ))
)

rdd_events_by_students.persist()
print("Number of students: %d" % rdd_events_by_students.count())

Number of students: 26660


In [15]:
# Session whose metadata (meta[session]) is used by the functions below
session = config['sessions'][0]

def extractPatterns(x):
    """Split one student's events into one event pattern per problem.

    INPUT: (SessionStudentId (str), [ Event (dict), ... ]); the events are
    processed in the given order (rdd_events_by_students sorts them by
    'TimeStamp').

    Assignment of events to problems:
      * a 'Problem' event goes to the pattern of its own 'ProblemID', which
        becomes the current problem;
      * a 'Video' event whose 'VideoID' is in matchingV goes to the matched
        problem, which becomes the current problem; other videos are skipped;
      * a 'Forum' event goes to the current problem, if there is one.

    Every pattern is then cut right after the last 'P' of its 'minimal'
    string form (utils.patternToString), i.e. it becomes empty when that
    string contains no 'P'.

    OUTPUT: list with one dict {'StudentID', 'ProblemID', 'Pattern'} per
    problem in matchingP.
    """
    StudentID, events = x[0], x[1]

    problemIDs = meta[session]['matchingP'].keys()
    videoToProblem = meta[session]['matchingV']

    patterns = {ProblemID: [] for ProblemID in problemIDs}

    # -1 means "no current problem yet"
    curProblem = -1
    for event in events:
        eventType = event['EventType']
        if eventType == 'Problem':
            curProblem = event['ProblemID']
            patterns[curProblem].append(event)
        elif eventType == 'Video':
            if event['VideoID'] in videoToProblem:
                curProblem = videoToProblem[event['VideoID']]
                patterns[curProblem].append(event)
        elif eventType == 'Forum':
            if curProblem > 0:
                patterns[curProblem].append(event)

    results = []
    for ProblemID in problemIDs:
        collected = patterns[ProblemID]
        # rfind gives -1 when there is no 'P', which yields an empty slice below
        lastProblemIndex = utils.patternToString(collected, 'minimal').rfind('P')
        results.append({
            'StudentID': StudentID,
            'ProblemID': ProblemID,
            'Pattern': collected[:lastProblemIndex + 1]
        })
    return results


# INPUT: (SessionStudentID (str), [ Event (dict), ... ] )
# flatMap turns the list returned for each student into individual records.
rdd_patterns = rdd_events_by_students.flatMap(extractPatterns)

rdd_patterns.persist()

PythonRDD[48] at RDD at PythonRDD.scala:48

In [16]:
def cutAfterHardCloseTime(item):
    """Drop the events of a pattern that happened at or after the problem's hard close time.

    INPUT:  dict with 'StudentID', 'ProblemID' and 'Pattern' (list of event dicts).
    OUTPUT: new dict with the same 'StudentID' and 'ProblemID' and the reduced 'Pattern'.

    The hard close time is read from meta[session]['problems'][ProblemID].
    After the cut, the pattern is trimmed again so that it ends on its last
    remaining 'Problem' event; it is empty when no such event remains.
    """
    ProblemID = item['ProblemID']
    # Looked up once instead of once per event
    hardCloseTime = meta[session]['problems'][ProblemID]['HardCloseTime']

    onTime = [event for event in item['Pattern'] if event['TimeStamp'] < hardCloseTime]

    # extractPatterns cuts each pattern after its last problem event, but it
    # does so before late events are removed here. Removing a late submission
    # can therefore leave video/forum events at the end of the pattern, so the
    # trim is repeated on the reduced list.
    # NOTE(review): assumes events with EventType 'Problem' are exactly the ones
    # rendered as 'P' by utils.patternToString(..., 'minimal') - confirm.
    lastProblemIndex = -1
    for index, event in enumerate(onTime):
        if event['EventType'] == 'Problem':
            lastProblemIndex = index

    return {
        'StudentID': item['StudentID'],
        'ProblemID': ProblemID,
        'Pattern': onTime[:lastProblemIndex + 1]
    }

rdd_patterns = (rdd_patterns
    .map(cutAfterHardCloseTime)
)

In [17]:
def _containsProblemEvent(item):
    """True when the 'minimal' string form of the item's pattern contains a 'P'."""
    return 'P' in utils.patternToString(item['Pattern'], 'minimal')

# Keep only the patterns that contain at least one 'P'
rdd_patterns = rdd_patterns.filter(_containsProblemEvent)

rdd_patterns.persist()

PythonRDD[49] at RDD at PythonRDD.scala:48

In [18]:
def displayStudentsPatterns(rdd, n):
    """Print the first `n` pattern records of `rdd`, one per line.

    Each line shows the ProblemID, the StudentID and then the events of the
    pattern rendered with utils.eventToString, each followed by a space.
    A blank line is printed after the last record.
    """
    for item in rdd.take(n):
        print(item['ProblemID'], '\t', item['StudentID'], end='\t>>>  ')
        renderedEvents = "".join(
            str(utils.eventToString(event)) + " " for event in item['Pattern']
        )
        print(renderedEvents)
    print()

displayStudentsPatterns(rdd_patterns, 10)

21 	 2d84329202510c31f835d09699bc0c009a02f051	>>>  V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 V35 F F V37 V37 V37 V37 V37 V37 V37 V37 V37 V37 V37 P21 
7 	 2d84329202510c31f835d09699bc0c009a02f051	>>>  V8 V8 V8 V8 V8 V8 V8 V8 V8 V8 V8 V3 V3 V3 F F F V3 F F F F F F F V3 V3 V3 V8 V8 V8 V8 V8 V8 V8 V8 V8 V8 V8 V8 V8 V8 V3 V3 V3 V3 V3 V3 V3 V3 V3 V3 V3 V3 V3 V3 V4 V4 V4 V5 V5 V5 V5 V5 V5 F F F F F F F F F F F F V6 V6 V6 V6 V6 F F F F V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V6 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V7 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 V33 F F F F P7 P7 
21 	 a53f4f24fdc3791e35da6d45cda5126e73b367a8	>>>  V35 V35 P21 P21 
7 	 a53f4f24fdc3791e35da6d45cda5126e73b367a8	>>>  V8 V8 V8 F F F F F F F F F F F V3 V3 V3 V3 V3 V3 V3 V3 V3 V3 F F F F F F 

In [19]:
import json
import os
import shutil

# Output location of the preprocessed patterns (one JSON document per line)
directory = 'data/spark/preprocessed'

# Remove the output of a previous run first
# (presumably because saveAsTextFile cannot write into an existing directory)
output_already_exists = os.path.exists(directory)
if output_already_exists:
    shutil.rmtree(directory)

patterns_as_json = rdd_patterns.map(json.dumps)
patterns_as_json.saveAsTextFile(directory)