In [1]:
from pyspark import StorageLevel
from pyspark.sql.types import *
from pyspark.sql import Row
from time import time

# Introduction

Spark has the conceptual equivalent of R and pandas data frames, available through the DataFrames API. In Spark, DataFrames can be created from existing RDDs, structured data files, external databases, tables in Hive, and other sources. DataFrames let you manipulate distributed data with operations such as filter, groupBy, and join. Spark's query optimizer plans these operations and runs them in parallel across the cluster once an action triggers the job.

We are going to use the dating profiles dataset for this tutorial. More information on this dataset can be found [here](https://sites.google.com/a/insightdatascience.com/spark-lab/s3-data/dating-profiles).

# Loading in the data
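First, read the data into an RDD. A single gzip file is not splittable, so textFile() loads it into one partition; we repartition the RDD into 12 partitions to spread the work across the cluster.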

In [2]:
# Read in raw ratings data (fromUserId, toUserId, rating)
ratingsCsvRDD = sc.textFile("s3a://insight-spark-after-dark/ratings.csv.gz").repartition(12)

Parse each CSV line into a Row by applying the function rec_tup with .map(), then convert the resulting RDD of Rows into a DataFrame with .toDF(), which infers the schema from the Row fields.

In [3]:
# Parse each raw CSV line into a Row, then convert the RDD of Rows to a DataFrame
def rec_tup(rating):
    tokens = rating.split(",")
    return Row(fromUserId=int(tokens[0]), toUserId=int(tokens[1]), rating=int(tokens[2]))

ratingsJson_DF = ratingsCsvRDD.map(rec_tup).toDF()
ratingsJson_DF.take(5)

[Row(fromUserId=1, rating=5, toUserId=4842),
 Row(fromUserId=1, rating=4, toUserId=12638),
 Row(fromUserId=1, rating=10, toUserId=18345),
 Row(fromUserId=1, rating=10, toUserId=22319),
 Row(fromUserId=1, rating=10, toUserId=26084)]
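If the raw file ever contains a header row or blank lines (an assumption; the output above shows none), rec_tup raises an exception inside the job. A more defensive variant returns an empty list for lines it cannot parse and is applied with flatMap instead of map, so bad lines simply drop out. The sketch below uses a namedtuple called Rating as a hypothetical stand-in for pyspark.sql.Row so it runs without Spark; in the notebook you would build Row objects as rec_tup does.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row so this sketch runs without Spark
Rating = namedtuple("Rating", ["fromUserId", "toUserId", "rating"])

def parse_or_skip(line):
    """Return [Rating] for a valid 'fromUserId,toUserId,rating' line, else [].

    Returning a list makes the function usable with RDD.flatMap, which
    drops empty results, e.g. ratingsCsvRDD.flatMap(parse_or_skip).
    """
    tokens = line.strip().split(",")
    if len(tokens) != 3:
        return []                      # blank or malformed line
    try:
        return [Rating(int(tokens[0]), int(tokens[1]), int(tokens[2]))]
    except ValueError:                 # e.g. a header such as "fromUserId,toUserId,rating"
        return []

lines = ["1,4842,5", "fromUserId,toUserId,rating", "", "1,12638,4"]
parsed = [rec for line in lines for rec in parse_or_skip(line)]
print(parsed)
```

The plain-Python list comprehension plays the role of flatMap here: each line contributes zero or one record.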

As you might remember, Spark evaluates lazily: transformations only record what to compute, and the work happens when an action (such as take() or count()) runs. As a result, a transformed RDD or DataFrame may be recomputed from scratch every time you run an action on it. We'll use the .persist() method to keep the computed result on the cluster for quick access later.

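**Note**: persist() is itself lazy, so nothing is cached until a later action computes the data. If you re-run the cells that build and persist the DataFrame, you get a new object that is cached separately, and you lose the Python reference to the first cached copy, which keeps using storage until it is evicted or the cache is cleared (e.g. with sqlContext.clearCache()).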
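To make the recompute-versus-cache behaviour concrete, here is a toy model in plain Python. LazyDataset is a hypothetical class, not part of Spark: map() only records a recipe, count() is the action that runs it, and persist() just sets a flag so that the first action stores its result. It illustrates the idea only, not how Spark implements caching.

```python
class LazyDataset:
    """Hypothetical toy model of a lazily evaluated dataset (not Spark code)."""

    def __init__(self, compute):
        self._compute = compute    # zero-argument function that builds a list
        self._persist = False
        self._cache = None

    def map(self, fn):
        # Transformation: wrap this recipe in a new one; nothing runs yet
        return LazyDataset(lambda: [fn(x) for x in self._materialize()])

    def persist(self):
        # Lazy: only marks the dataset; the first action fills the cache
        self._persist = True
        return self

    def _materialize(self):
        if self._cache is not None:
            return self._cache
        result = self._compute()
        if self._persist:
            self._cache = result
        return result

    def count(self):
        # Action: forces the recipe to run (or reads the cache)
        return len(self._materialize())


calls = {"n": 0}  # how many times parse() actually runs

def parse(line):
    calls["n"] += 1
    from_id, to_id, rating = line.split(",")
    return int(from_id), int(to_id), int(rating)

lines = ["1,4842,5", "1,12638,4", "1,18345,10"]  # first rows from the output above
base = LazyDataset(lambda: list(lines))

plain = base.map(parse)            # no work yet
plain.count()                      # parse() runs 3 times
plain.count()                      # recomputed: 3 more calls
cached = base.map(parse).persist()
cached.count()                     # parse() runs 3 times and the result is stored
cached.count()                     # served from the cache, no new calls
print(calls["n"])                  # 9
```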

In [4]:
# Cache the DataFrame, since we'll be using it heavily from here on
ratingsJson_DF.persist(StorageLevel.MEMORY_AND_DISK_SER)

DataFrame[fromUserId: bigint, rating: bigint, toUserId: bigint]

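Quickly check the schema of the DataFrame with printSchema(). Spark inferred the column types from the Python ints in each Row, which is why they appear as long.
To write Spark SQL queries, we first need to register the DataFrame as a table. registerTempTable() does this: for example, ratingsJson_DF.registerTempTable('ratingsJsonTable') makes the data queryable as 'ratingsJsonTable' through sqlContext.sql().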

In [5]:
# Show the schema Spark inferred from the Row fields
ratingsJson_DF.printSchema()

root
 |-- fromUserId: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- toUserId: long (nullable = true)
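Once the DataFrame is registered as 'ratingsJsonTable', sqlContext.sql() accepts ordinary SQL (in Spark 2.x and later, createOrReplaceTempView() replaces the deprecated registerTempTable()). As a stand-in that runs without a cluster, the sketch below runs a "most active raters" query with Python's built-in sqlite3 on a few rows copied from this notebook's outputs. sqlite3 is a swapped-in engine here, not Spark; the point is only that the query text itself is plain SQL.

```python
import sqlite3

# The query you could pass to sqlContext.sql(...) after registering the table;
# here it runs on sqlite3 instead of Spark SQL.
TOP_RATERS_SQL = """
    SELECT fromUserId, COUNT(*) AS num_ratings
    FROM ratingsJsonTable
    GROUP BY fromUserId
    ORDER BY num_ratings DESC
    LIMIT 10
"""

def run_on_sqlite(rows, query):
    """Load (fromUserId, toUserId, rating) tuples into an in-memory table and query it."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE ratingsJsonTable "
                 "(fromUserId INTEGER, toUserId INTEGER, rating INTEGER)")
    conn.executemany("INSERT INTO ratingsJsonTable VALUES (?, ?, ?)", rows)
    result = conn.execute(query).fetchall()
    conn.close()
    return result

# Rows copied from the take(5) and sample() outputs in this notebook
sample_rows = [(1, 4842, 5), (1, 12638, 4), (1, 18345, 10), (1, 22319, 10),
               (1, 26084, 10), (9, 15629, 10), (9, 20960, 9)]
print(run_on_sqlite(sample_rows, TOP_RATERS_SQL))  # [(1, 5), (9, 2)]
```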



# On to Spark DataFrames

The Spark DataFrame API lets you express pandas- or R-style operations (select, groupBy, aggregations, sorting) on a distributed dataset.

In [6]:
# Show the top 10 most-active users who are giving out ratings
mostActiveUsersSchemaDF = ratingsJson_DF.select("fromUserId","toUserId")\
                                        .groupBy("fromUserId")\
                                        .count()\
                                        .withColumnRenamed("count", "num_ratings")\
                                        .orderBy(["num_ratings"], ascending=[0])
mostActiveUsersSchemaDF.take(10)

[Row(fromUserId=90280, num_ratings=25042),
 Row(fromUserId=56792, num_ratings=21599),
 Row(fromUserId=33639, num_ratings=19908),
 Row(fromUserId=61436, num_ratings=18810),
 Row(fromUserId=72351, num_ratings=18443),
 Row(fromUserId=127227, num_ratings=18342),
 Row(fromUserId=58765, num_ratings=18197),
 Row(fromUserId=76082, num_ratings=18019),
 Row(fromUserId=108318, num_ratings=17755),
 Row(fromUserId=131976, num_ratings=17560)]
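Spark's orderBy does not promise any particular order among rows with the same num_ratings, so two runs can list tied users differently. Adding a secondary sort key makes the ranking repeatable; in PySpark that could look like .orderBy(F.desc("num_ratings"), "fromUserId") after `from pyspark.sql import functions as F`. The plain-Python reference below (a hypothetical helper, handy for spot-checking results on a small sample) applies the same rule.

```python
from collections import Counter

def top_raters(rows, n=10):
    """Plain-Python reference for groupBy("fromUserId").count() sorted descending.

    rows: iterable of (fromUserId, toUserId, rating) tuples.
    Ties on the count are broken by fromUserId so the result is repeatable.
    """
    counts = Counter(from_id for from_id, _to_id, _rating in rows)
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]

# Made-up toy rows: users 1 and 3 tie with two ratings each
toy_rows = [(1, 2, 5), (1, 3, 4), (2, 1, 9), (3, 1, 7), (3, 2, 6), (4, 1, 1)]
print(top_raters(toy_rows, 3))  # [(1, 2), (3, 2), (2, 1)]
```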

# Sampling
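We can also sample the DataFrame with the sample() transformation. This is a great way to test a job on a small subset before running it on the full dataset. Keep in mind that collect() brings every sampled row back to the driver, so on a large dataset even a 10% sample can be too big; take(n) or count() is safer for a quick check.

sample() takes 3 arguments: **withReplacement (True/False), fraction (the expected fraction of rows to keep, not an exact size), seed**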

In [7]:
ratingsJson_DF.sample(False, 0.1, 20).collect()

[Row(fromUserId=1, rating=1, toUserId=82536),
 Row(fromUserId=1, rating=10, toUserId=98556),
 Row(fromUserId=1, rating=10, toUserId=138830),
 Row(fromUserId=2, rating=9, toUserId=196265),
 Row(fromUserId=3, rating=7, toUserId=15530),
 Row(fromUserId=5, rating=9, toUserId=94074),
 Row(fromUserId=7, rating=7, toUserId=213803),
 Row(fromUserId=9, rating=10, toUserId=15629),
 Row(fromUserId=9, rating=9, toUserId=20960),
 Row(fromUserId=9, rating=10, toUserId=27136),
 Row(fromUserId=9, rating=10, toUserId=31890),
 Row(fromUserId=9, rating=10, toUserId=32177),
 Row(fromUserId=9, rating=7, toUserId=33822),
 Row(fromUserId=9, rating=6, toUserId=39420),
 Row(fromUserId=9, rating=10, toUserId=39994),
 Row(fromUserId=9, rating=4, toUserId=59330),
 Row(fromUserId=9, rating=6, toUserId=64805),
 Row(fromUserId=9, rating=1, toUserId=65906),
 Row(fromUserId=9, rating=1, toUserId=70018),
 Row(fromUserId=9, rating=10, toUserId=70420),
 Row(fromUserId=9, rating=2, toUserId=72611),
 ...]
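Sampling without replacement keeps each row independently with probability equal to the fraction, so the sample size is only approximately 10% of the rows. The plain-Python sketch below shows that idea with a hypothetical helper; it is not Spark's implementation. In Spark the seed is also combined with the partitioning, so the same seed reproduces a sample only as long as the data's partitioning and row order stay the same.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (no replacement)."""
    rng = random.Random(seed)          # seeded generator: same seed, same picks
    return [row for row in rows if rng.random() < fraction]

rows = list(range(10000))
s1 = bernoulli_sample(rows, 0.1, 20)
s2 = bernoulli_sample(rows, 0.1, 20)
print(len(s1), s1 == s2)   # size close to 1000 (not exact); identical for the same seed
```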

# Next Steps

Here are some further questions to get to grips with Spark DataFrames.

### Task 1: Who are the top 5 users that give the highest average ratings to other profiles?

In [8]:
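from pyspark.sql import functions as F

# Average rating given by each user, highest first.
# alias() names the aggregate column directly instead of relying on the
# auto-generated name, whose spelling (e.g. AVG(rating) vs avg(rating))
# has varied between Spark versions.
topAvgRatersDF = ratingsJson_DF.groupBy("fromUserId")\
                               .agg(F.avg("rating").alias("avg_rating"))\
                               .orderBy("avg_rating", ascending=False)
topAvgRatersDF.take(5)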

[Row(fromUserId=8294, avg_rating=9.973045822102426),
 Row(fromUserId=50518, avg_rating=9.966666666666667),
 Row(fromUserId=134609, avg_rating=9.95),
 Row(fromUserId=60129, avg_rating=9.87719298245614),
 Row(fromUserId=63150, avg_rating=9.873417721518987)]
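For spot-checking the aggregation on a small sample, here is a plain-Python reference for the same groupBy/average/sort. The helper and its min_ratings parameter are hypothetical: min_ratings is an optional knob for ignoring users with very few ratings, since a single high rating yields a high average.

```python
from collections import defaultdict

def top_avg_raters(rows, n=5, min_ratings=1):
    """Plain-Python reference for groupBy("fromUserId") + avg("rating"), sorted descending.

    rows: iterable of (fromUserId, toUserId, rating) tuples.
    """
    totals = defaultdict(lambda: [0, 0])        # fromUserId -> [sum, count]
    for from_id, _to_id, rating in rows:
        totals[from_id][0] += rating
        totals[from_id][1] += 1
    averages = [(uid, s / float(c)) for uid, (s, c) in totals.items() if c >= min_ratings]
    # Highest average first; ties broken by user id for a repeatable order
    averages.sort(key=lambda kv: (-kv[1], kv[0]))
    return averages[:n]

# Made-up toy rows
toy_rows = [(1, 2, 10), (1, 3, 8), (2, 1, 10), (3, 1, 4), (3, 2, 6)]
print(top_avg_raters(toy_rows, n=2))                 # [(2, 10.0), (1, 9.0)]
print(top_avg_raters(toy_rows, n=2, min_ratings=2))  # [(1, 9.0), (3, 5.0)]
```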

### Task 2: How many pairs of users have rated each other with a rating > 5?

In [9]:
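# Keep only ratings above 5 before joining, so the join shuffles less data
highRatingsDF = ratingsJson_DF.filter(ratingsJson_DF.rating > 5)
t1 = highRatingsDF.alias("t1")
t2 = highRatingsDF.withColumnRenamed("fromUserId", "f")\
                  .withColumnRenamed("toUserId", "t")\
                  .withColumnRenamed("rating", "r")\
                  .alias("t2")
# A mutual match: t1's rater is t2's target and t1's target is t2's rater.
# Requiring fromUserId < f keeps one row per pair instead of two mirrored rows.
cond = (t1.fromUserId == t2.t) & (t1.toUserId == t2.f) & (t1.fromUserId < t2.f)
matchesDF = t1.join(t2, cond)
matchesDF.take(10)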

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 22.0 failed 4 times, most recent failure: Lost task 6.3 in stage 22.0 (TID 875, ip-172-31-34-107.us-west-2.compute.internal): java.io.FileNotFoundException: /tmp/spark-86a637c6-354d-4f08-81c1-a4af3c861cbc/executor-6474d8c2-34e1-460c-9ec4-e89e4476631d/blockmgr-4b3ec72c-e70c-43a0-9d35-610e8b9ae830/10/temp_shuffle_08d02f13-5114-4293-9892-45122b4f63a4 (Too many open files)
	...


In [None]:
print("{} pairs of users".format(matchesDF.count()))
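The run above failed with java.io.FileNotFoundException (Too many open files) while an executor was writing shuffle files for the join. That points at an operating-system limit on the worker nodes rather than at the join condition; typical remedies are raising the open-file limit (ulimit -n) for the user running Spark, or lowering spark.sql.shuffle.partitions (200 by default) so each task writes fewer shuffle files. To sanity-check the matching logic itself on a tiny input, the plain-Python sketch below (a hypothetical helper) counts unordered pairs of users who rated each other above 5. It assumes each (fromUserId, toUserId) pair appears at most once; a Spark join would count repeated ratings more than once.

```python
def count_mutual_high_pairs(rows, threshold=5):
    """Count unordered pairs {a, b} where a rated b > threshold and b rated a > threshold.

    rows: iterable of (fromUserId, toUserId, rating) tuples.
    """
    high = {(a, b) for a, b, r in rows if r > threshold}
    # a < b counts each pair once and skips self-ratings (a == b)
    return sum(1 for a, b in high if a < b and (b, a) in high)

# Made-up toy rows: {1, 2} and {2, 3} are mutual matches; {1, 3} is not (3 gave 1 a 4)
toy_rows = [(1, 2, 8), (2, 1, 9), (1, 3, 10), (3, 1, 4),
            (2, 3, 6), (3, 2, 7), (4, 4, 10)]
print("{} pairs of users".format(count_mutual_high_pairs(toy_rows)))  # 2 pairs of users
```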