In [1]:
# Base HDFS directory of the MovieLens 100k files; file names are appended to it below.
Path = 'hdfs:/user/hduser/MovieRecom/ml-100k'

In [3]:
# Load the user file from HDFS as an RDD of text lines, then report how many
# lines it holds and show the first one as a quick sanity check.
rawUserData = sc.textFile(Path + '/u.user')
# print(...) with a single argument prints the same text as the Python 2
# print statement and also parses on Python 3.
print(rawUserData.count())
print(rawUserData.first())

943
1|24|M|technician|85711


# Establish DataFrame

In [4]:
# Peek at the first seven raw lines; each line is one '|'-delimited user record.
rawUserData.take(7)

[u'1|24|M|technician|85711',
 u'2|53|F|other|94043',
 u'3|23|M|writer|32067',
 u'4|24|M|technician|43537',
 u'5|33|F|other|15213',
 u'6|42|M|executive|98101',
 u'7|57|M|administrator|91344']

In [5]:
# Break every '|'-delimited text line into its list of field strings.
userRDD = rawUserData.map(
    lambda record: record.split("|")
)
# Show the first seven parsed records.
userRDD.take(7)

[[u'1', u'24', u'M', u'technician', u'85711'],
 [u'2', u'53', u'F', u'other', u'94043'],
 [u'3', u'23', u'M', u'writer', u'32067'],
 [u'4', u'24', u'M', u'technician', u'43537'],
 [u'5', u'33', u'F', u'other', u'15213'],
 [u'6', u'42', u'M', u'executive', u'98101'],
 [u'7', u'57', u'M', u'administrator', u'91344']]

In [6]:
from pyspark.sql import SQLContext
# Wrap the existing SparkContext (`sc`) in a SQLContext; it is used below to
# build the DataFrame and to run SQL queries.
sqlContext = SQLContext(sc)

In [7]:
from pyspark.sql import Row
# Turn each list of field strings into a Row with named fields, casting
# userid and age to int and leaving the remaining fields as strings.
# The Rows shown in the output list their fields alphabetically, not in the
# order the keyword arguments are written here.
user_Rows = userRDD.map(
    lambda fields: Row(
        userid=int(fields[0]),
        age=int(fields[1]),
        gender=fields[2],
        occupation=fields[3],
        zipcode=fields[4]
    )
)
# Show the first five Rows.
user_Rows.take(5)

[Row(age=24, gender=u'M', occupation=u'technician', userid=1, zipcode=u'85711'),
 Row(age=53, gender=u'F', occupation=u'other', userid=2, zipcode=u'94043'),
 Row(age=23, gender=u'M', occupation=u'writer', userid=3, zipcode=u'32067'),
 Row(age=24, gender=u'M', occupation=u'technician', userid=4, zipcode=u'43537'),
 Row(age=33, gender=u'F', occupation=u'other', userid=5, zipcode=u'15213')]

In [8]:
# Build a DataFrame from the RDD of Rows; no schema is passed, so column
# names and types come from the Row fields.
user_df = sqlContext.createDataFrame(user_Rows)
# Print the resulting column names and types.
user_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- userid: long (nullable = true)
 |-- zipcode: string (nullable = true)



In [10]:
# Display the first five rows as a text table.
user_df.show(5)

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 53|     F|     other|     2|  94043|
| 23|     M|    writer|     3|  32067|
| 24|     M|technician|     4|  43537|
| 33|     F|     other|     5|  15213|
+---+------+----------+------+-------+
only showing top 5 rows



# Spark SQL

In [11]:
# Register the DataFrame under the name "user_table" so the SQL queries below can refer to it.
# NOTE(review): registerTempTable is the Spark 1.x name; newer Spark releases
# prefer createOrReplaceTempView — confirm the target Spark version before changing.
user_df.registerTempTable("user_table")

In [12]:
# Count the rows of user_table with SQL; the count column is aliased as "counts".
sqlContext.sql("SELECT count(*) counts FROM user_table").show()

+------+
|counts|
+------+
|   943|
+------+



In [39]:
# Same row count as the single-line query, with the SQL laid out in a
# triple-quoted multi-line string for readability.
sqlContext.sql("""
SELECT count(*) counts 
FROM user_table
""").show()

+------+
|counts|
+------+
|   943|
+------+



In [17]:
# Fetch every column of user_table, limited to six rows.
sqlContext.sql("""
SELECT * 
FROM user_table
LIMIT 6
""").show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 53|     F|     other|     2|  94043|
| 23|     M|    writer|     3|  32067|
| 24|     M|technician|     4|  43537|
| 33|     F|     other|     5|  15213|
| 42|     M| executive|     6|  98101|
+---+------+----------+------+-------+



# Select a subset of fields

In [18]:
# RDD version: keep only user id, occupation, gender and age (in that order)
# by picking the matching positions out of each record.
userRDDnew = userRDD.map(
    lambda rec: (rec[0], rec[3], rec[2], rec[1])
)
userRDDnew.take(5)

[(u'1', u'technician', u'M', u'24'),
 (u'2', u'other', u'F', u'53'),
 (u'3', u'writer', u'M', u'23'),
 (u'4', u'technician', u'M', u'24'),
 (u'5', u'other', u'F', u'33')]

method 1 (by field name)

In [19]:
# DataFrame version: pick the columns by their names given as strings.
user_df.select(
    "userid",
    "occupation",
    "gender",
    "age"
).show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows



method 2 (by dataframe.fieldname)

In [20]:
# Pick the columns through attribute access on the DataFrame (user_df.<name>).
user_df.select(
    user_df.userid,
    user_df.occupation,
    user_df.gender,
    user_df.age
).show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows



method 3 (by bracket indexing: dataframe['fieldname'])

In [22]:
# Pick the columns by indexing the DataFrame with Column objects obtained
# through user_df['<name>'].
user_df[
    user_df['userid'],
    user_df['occupation'],
    user_df['gender'],
    user_df['age']
].show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows



# Add a computed field

In [24]:
# RDD version: append a fifth element, 2016 minus the age. The age is a
# string in the RDD, so it is cast to int before the subtraction.
userRDDnew = userRDD.map(
    lambda rec: (rec[0], rec[3], rec[2], rec[1], 2016 - int(rec[1]))
)
userRDDnew.take(5)

[(u'1', u'technician', u'M', u'24', 1992),
 (u'2', u'other', u'F', u'53', 1963),
 (u'3', u'writer', u'M', u'23', 1993),
 (u'4', u'technician', u'M', u'24', 1992),
 (u'5', u'other', u'F', u'33', 1983)]

In [26]:
# DataFrame version: add a computed column. Without an alias the column is
# labelled with its expression, "(2016 - age)".
user_df.select(
    "userid",
    "occupation",
    "gender",
    "age",
    2016 - user_df.age
).show(5)

+------+----------+------+---+------------+
|userid|occupation|gender|age|(2016 - age)|
+------+----------+------+---+------------+
|     1|technician|     M| 24|        1992|
|     2|     other|     F| 53|        1963|
|     3|    writer|     M| 23|        1993|
|     4|technician|     M| 24|        1992|
|     5|     other|     F| 33|        1983|
+------+----------+------+---+------------+
only showing top 5 rows



In [27]:
# Same computed column, named "birthyear" through alias().
user_df.select(
    "userid",
    "occupation",
    "gender",
    "age",
    (2016 - user_df.age).alias("birthyear")
).show(5)

+------+----------+------+---+---------+
|userid|occupation|gender|age|birthyear|
+------+----------+------+---+---------+
|     1|technician|     M| 24|     1992|
|     2|     other|     F| 53|     1963|
|     3|    writer|     M| 23|     1993|
|     4|technician|     M| 24|     1992|
|     5|     other|     F| 33|     1983|
+------+----------+------+---+---------+
only showing top 5 rows



# Data filtering

In [29]:
# RDD version: every field is a string here, so the age is compared with the string '24'.
userRDD.filter(
    lambda rec: rec[3] == 'technician' and rec[2] == 'M' and rec[1] == '24'
).take(6)

[[u'1', u'24', u'M', u'technician', u'85711'],
 [u'4', u'24', u'M', u'technician', u'43537'],
 [u'456', u'24', u'M', u'technician', u'31820'],
 [u'717', u'24', u'M', u'technician', u'84105'],
 [u'832', u'24', u'M', u'technician', u'77042'],
 [u'889', u'24', u'M', u'technician', u'78704']]

In [30]:
# DataFrame version: chain one filter per condition, each written as a SQL
# expression string. age is a numeric (long) column, so it is compared with
# the number 24 rather than the string '24', which only matched through an
# implicit cast.
user_df.filter("occupation='technician'").filter("gender='M'").filter("age=24").show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+



In [32]:
# Single-call form: all three conditions combined in one SQL expression string.
user_df.filter("occupation='technician' and gender='M' and age=24").show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+



In [35]:
# Same filter built from Column expressions (attribute access). Each
# comparison is parenthesized because & binds more tightly than ==.
user_df.filter(
    (user_df.occupation == 'technician')
    & (user_df.gender == 'M')
    & (user_df.age == 24)
).show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+



In [36]:
# Same filter with the columns looked up through bracket indexing. Each
# comparison is parenthesized because & binds more tightly than ==.
user_df.filter(
    (user_df['occupation'] == 'technician')
    & (user_df['gender'] == 'M')
    & (user_df['age'] == 24)
).show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+



# Data ordering

In [37]:
# RDD version: the five records with the smallest age. The age string is
# cast to int so the ordering is numeric rather than lexical.
userRDD.takeOrdered(5, key=lambda rec: int(rec[1]))

[[u'30', u'7', u'M', u'student', u'55436'],
 [u'471', u'10', u'M', u'student', u'77459'],
 [u'289', u'11', u'M', u'none', u'94619'],
 [u'142', u'13', u'M', u'other', u'48118'],
 [u'609', u'13', u'F', u'student', u'55106']]

In [38]:
# RDD version: the five records with the largest age, obtained by negating
# the numeric sort key.
userRDD.takeOrdered(5, key=lambda rec: -int(rec[1]))

[[u'481', u'73', u'M', u'retired', u'37771'],
 [u'767', u'70', u'M', u'engineer', u'00000'],
 [u'803', u'70', u'M', u'administrator', u'78212'],
 [u'860', u'70', u'F', u'retired', u'48322'],
 [u'559', u'69', u'M', u'executive', u'10022']]

In [40]:
# SQL version: sort by age ascending and show the first five rows.
# Ordering by age alone does not fix the order of rows that share an age:
# compare the age-13 rows shown here with those in the
# orderBy(user_df.age) output further down.
sqlContext.sql("""
SELECT userid,occupation,gender,age 
FROM user_table
ORDER BY age
""").show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|    30|   student|     M|  7|
|   471|   student|     M| 10|
|   289|      none|     M| 11|
|   880|   student|     M| 13|
|   628|      none|     M| 13|
+------+----------+------+---+
only showing top 5 rows



In [41]:
# SQL version: sort by age descending (oldest first) and show the first five rows.
sqlContext.sql("""
SELECT userid,occupation,gender,age 
FROM user_table
ORDER BY age DESC
""").show(5)

+------+-------------+------+---+
|userid|   occupation|gender|age|
+------+-------------+------+---+
|   481|      retired|     M| 73|
|   860|      retired|     F| 70|
|   767|     engineer|     M| 70|
|   803|administrator|     M| 70|
|   559|    executive|     M| 69|
+------+-------------+------+---+
only showing top 5 rows



In [42]:
# DataFrame version: sort ascending by the column name given as a string.
user_df.select(
    "userid",
    "occupation",
    "gender",
    "age"
).orderBy("age").show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|    30|   student|     M|  7|
|   471|   student|     M| 10|
|   289|      none|     M| 11|
|   880|   student|     M| 13|
|   628|      none|     M| 13|
+------+----------+------+---+
only showing top 5 rows



In [43]:
# Sort ascending by a Column object instead of a column name.
user_df.select(
    "userid",
    "occupation",
    "gender",
    "age"
).orderBy(user_df.age).show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|    30|   student|     M|  7|
|   471|   student|     M| 10|
|   289|      none|     M| 11|
|   142|     other|     M| 13|
|   609|   student|     F| 13|
+------+----------+------+---+
only showing top 5 rows



In [44]:
# Sort by age, oldest first. `ascending` is a boolean flag, so False is
# passed explicitly instead of the integer 0.
user_df.select("userid", "occupation", "gender", "age").orderBy("age", ascending=False).show(5)

+------+-------------+------+---+
|userid|   occupation|gender|age|
+------+-------------+------+---+
|   481|      retired|     M| 73|
|   860|      retired|     F| 70|
|   767|     engineer|     M| 70|
|   803|administrator|     M| 70|
|   559|    executive|     M| 69|
+------+-------------+------+---+
only showing top 5 rows



In [45]:
# Sort by age, oldest first, using the Column's desc() ordering.
user_df.select(
    "userid",
    "occupation",
    "gender",
    "age"
).orderBy(user_df.age.desc()).show(5)

+------+-------------+------+---+
|userid|   occupation|gender|age|
+------+-------------+------+---+
|   481|      retired|     M| 73|
|   860|      retired|     F| 70|
|   767|     engineer|     M| 70|
|   803|administrator|     M| 70|
|   559|    executive|     M| 69|
+------+-------------+------+---+
only showing top 5 rows

