In [None]:
import pyspark
import pyspark.sql.functions as func
import pyspark.sql.types as types
import matplotlib
%matplotlib inline 
sc = pyspark.SparkContext(master='local[*]', appName='MovieLens')
sqlContext = pyspark.sql.SQLContext(sc)

We define some utility functions to parse each raw data file into Spark `Row` objects:

In [None]:
def to_bool(value):
    '''
    Interprets a 0/1 flag field as a boolean.

    Anything that int() parses to zero becomes False; every other integer
    (not only 1) becomes True.

    @param value: int, or string holding an int, to convert
    @returns: bool
    @raises: whatever int() raises for unparseable input (e.g. ValueError)
    '''
    return int(value) != 0

In [None]:
def data_from_csv(line):
    '''
    Parses one line of the ratings table (u.data) into a DataFrame Row.

    Despite the function name, the input is tab-separated. Fields are taken
    positionally (user id, item id, rating, timestamp); any extra trailing
    fields are ignored.

    @param line: one raw line of u.data
    @returns: Row(userId, itemId, rating, timestamp), all ints
    '''
    fields = line.split('\t')
    # The timestamp is a Unix timestamp; Python 3's int is arbitrary
    # precision, so no separate long type is needed.
    return pyspark.Row(
        userId=int(fields[0]),
        itemId=int(fields[1]),
        rating=int(fields[2]),
        timestamp=int(fields[3]),
    )

In [None]:
# Genre flag columns of u.item, in file order; they occupy fields 5..23.
ITEM_GENRES = (
    'unknown', 'action', 'adventure', 'animation', 'childrens',
    'comedy', 'crime', 'documentary', 'drama', 'fantasy',
    'filmNoir', 'horror', 'musical', 'mystery', 'romance',
    'sciFi', 'thriller', 'war', 'western',
)


def item_from_csv(line):
    '''
    Converts a line of the item (movie) table into a DataFrame Row.

    The input is '|'-separated, not comma-separated. The first five fields
    are id, title, release date, video release date and IMDb URL. The
    remaining fields are 0/1 genre flags, converted with to_bool into one
    boolean column per entry of ITEM_GENRES.

    @param line: one raw line of u.item
    @returns: Row with movieId (int), movieTitle, releaseDate,
              videoReleaseDate, imdbUrl (str) followed by one bool per genre
    @raises IndexError: if the line has fewer than 24 fields
    @raises ValueError: if the id or a genre flag is not an integer
    '''
    c = line.split('|')

    # split() already yields str, so no str() conversion is needed.
    row = {
        'movieId': int(c[0]),
        'movieTitle': c[1],
        'releaseDate': c[2],
        'videoReleaseDate': c[3],
        'imdbUrl': c[4],
    }
    # Index explicitly (rather than zip) so a truncated line still fails
    # loudly instead of silently dropping trailing genre columns.
    for index, genre in enumerate(ITEM_GENRES, start=5):
        row[genre] = to_bool(c[index])

    return pyspark.Row(**row)

In [None]:
def user_from_csv(line):
    '''
    Converts a line of the user table into a DataFrame Row.

    The input is '|'-separated, not comma-separated. Age is parsed as an
    int so it sorts, compares and aggregates numerically; as a string,
    "10" would order before "9". Zip code stays a string because it is an
    identifier, not a quantity.

    @param line: one raw line of u.user
    @returns: Row(userId: int, age: int, gender: str, occupation: str,
              zipCode: str)
    @raises IndexError: if the line has fewer than 5 fields
    @raises ValueError: if userId or age is not an integer
    '''
    c = line.split('|')

    return pyspark.Row(
        userId=int(c[0]),
        age=int(c[1]),
        gender=c[2],
        occupation=c[3],
        zipCode=c[4],
    )

Next, we read each data file into a Spark DataFrame.

In [None]:
# u.data: tab-separated ratings, one parsed Row per line.
data_rdd = (
    sc
    .textFile('../../data/ml-100k/u.data')
    .map(data_from_csv)
)
data = sqlContext.createDataFrame(data_rdd)

data.printSchema()
data.show()

In [None]:
# u.item: '|'-separated movie metadata, one parsed Row per line.
item_rdd = (
    sc
    .textFile('../../data/ml-100k/u.item')
    .map(item_from_csv)
)
item = sqlContext.createDataFrame(item_rdd)

item.printSchema()
item.show()

In [None]:
# u.user: '|'-separated user demographics, one parsed Row per line.
user_rdd = (
    sc
    .textFile('../../data/ml-100k/u.user')
    .map(user_from_csv)
)
user = sqlContext.createDataFrame(user_rdd)

user.printSchema()
user.show()