# Connecting to Spark

We're going to use Python with DataFrames, which are only available in Spark 1.3 or later (the `read`/`write` API used below requires Spark 1.4 or later).  We'll be using a recent version of open-source Spark.  To work with DataFrames, you first have to import `SQLContext` and create one from the Spark context.

In [1]:
from pyspark.sql import SQLContext
# `sc` is not defined anywhere in this notebook -- presumably it is the
# SparkContext that the PySpark shell/kernel creates at startup (TODO confirm
# for your environment).
sql = SQLContext(sc)

Let's set up some common functions

# Reading a table

In [2]:
# Load the `user` table of the `training` keyspace through the Cassandra data source.
user = (
    sql.read
       .format("org.apache.spark.sql.cassandra")
       .load(keyspace="training", table="user")
)

# Display

In [3]:
user.collect()

[Row(user_id=1, age=34, favorite_foods=[u'Bacon', u'Cheese'], name=u'Jon'),
 Row(user_id=2, age=22, favorite_foods=[u'Kale', u'Pizza', u'Wine'], name=u'Dani'),
 Row(user_id=4, age=1, favorite_foods=[u'Candy', u'Fear'], name=u'Baby Luke'),
 Row(user_id=3, age=108, favorite_foods=[u'Muffins', u'Pie', u'Steak'], name=u'Patrick'),
 Row(user_id=5, age=10, favorite_foods=[u'Anger'], name=u'Larry')]

# Basic Filtering

In [4]:
user[user.age > 35].collect()

[Row(user_id=3, age=108, favorite_foods=[u'Muffins', u'Pie', u'Steak'], name=u'Patrick')]

In [5]:
# Column predicates must be combined with `&` (and each comparison parenthesized,
# because `&` binds tighter than `<`).  Python's `and` cannot be overloaded: it
# calls bool() on the left Column, which either raises or silently discards the
# first condition depending on the Spark version.
user[(user.age < 20) & user.name.startswith("Baby")].collect()

[Row(user_id=4, age=1, favorite_foods=[u'Candy', u'Fear'], name=u'Baby Luke')]

# A nicer reader

In [6]:
def create_reader(sql, keyspace):
    """Build a one-argument table loader bound to a single keyspace.

    ``sql`` is the context whose ``read`` attribute is used for loading and
    ``keyspace`` is forwarded unchanged to every ``load`` call.  The returned
    callable takes a table name and returns whatever ``load`` returns.
    """
    cassandra_format = "org.apache.spark.sql.cassandra"

    def reader(table):
        source = sql.read.format(cassandra_format)
        return source.load(keyspace=keyspace, table=table)

    return reader

training_reader = create_reader(sql, "training")

# A nicer writer

In [7]:
def create_writer(sql, keyspace, mode="append"):
    """Build a ``writer(df, table)`` function bound to a single keyspace.

    Parameters:
        sql: not used by the writer; kept so the signature parallels
            ``create_reader``.
        keyspace: keyspace option passed to every write.
        mode: save mode forwarded to ``save`` (defaults to ``"append"``).
            Previously this argument was ignored and ``"append"`` was always
            used.

    Returns:
        A function ``writer(df, table)`` that saves ``df`` into ``table``
        through the Cassandra data source and returns ``None``.
    """
    def writer(df, table):
        df.write.format("org.apache.spark.sql.cassandra").\
                 options(table=table, keyspace=keyspace).save(mode=mode)
    return writer

training_writer = create_writer(sql, "training")

# Migrating to a new structure

In [8]:
# Import only the function that is used: the wildcard form also brings in names
# such as sum/min/max, which shadow Python's built-ins for the rest of the notebook.
from pyspark.sql.functions import explode

# One (food, user_id) row per entry in each user's favorite_foods list.
result = user.select(explode(user.favorite_foods).alias("food"), "user_id")
training_writer(result, "favorite_foods_index")

# SparkSQL

Register the DataFrame as a temporary table.

Once it is registered, you can query it with ordinary SQL.

In [9]:
user.registerTempTable("user")

In [10]:
sql.sql("select * from user where age > 15").collect()

[Row(user_id=1, age=34, favorite_foods=[u'Bacon', u'Cheese'], name=u'Jon'),
 Row(user_id=2, age=22, favorite_foods=[u'Kale', u'Pizza', u'Wine'], name=u'Dani'),
 Row(user_id=3, age=108, favorite_foods=[u'Muffins', u'Pie', u'Steak'], name=u'Patrick')]

# Load the movie lens data

The dataset lives in the `ml-10M100K` directory.


In [11]:
# Read movies.dat and break every line into its "::"-separated fields.
movies = sc.textFile("ml-10M100K/movies.dat").map(lambda line: line.split("::"))

In [12]:
def _split_tags(fields):
    """Turn a 3-field movie record into ``(id, name, [tags])``.

    The third field holds "|"-separated tags.  A named function is used instead
    of ``lambda (x, y, z): ...`` because tuple parameters are Python-2-only
    syntax; the unpacking below still requires exactly three fields.
    """
    movie_id, name, tags = fields
    return (movie_id, name, tags.split("|"))

movies = movies.map(_split_tags)

In [13]:
movies = movies.toDF(["movie_id", "name", "tags"])

In [39]:
movies.head(10)

[Row(movie_id=u'1', name=u'Toy Story (1995)', tags=[u'Adventure', u'Animation', u'Children', u'Comedy', u'Fantasy']),
 Row(movie_id=u'2', name=u'Jumanji (1995)', tags=[u'Adventure', u'Children', u'Fantasy']),
 Row(movie_id=u'3', name=u'Grumpier Old Men (1995)', tags=[u'Comedy', u'Romance']),
 Row(movie_id=u'4', name=u'Waiting to Exhale (1995)', tags=[u'Comedy', u'Drama', u'Romance']),
 Row(movie_id=u'5', name=u'Father of the Bride Part II (1995)', tags=[u'Comedy']),
 Row(movie_id=u'6', name=u'Heat (1995)', tags=[u'Action', u'Crime', u'Thriller']),
 Row(movie_id=u'7', name=u'Sabrina (1995)', tags=[u'Comedy', u'Romance']),
 Row(movie_id=u'8', name=u'Tom and Huck (1995)', tags=[u'Adventure', u'Children']),
 Row(movie_id=u'9', name=u'Sudden Death (1995)', tags=[u'Action']),
 Row(movie_id=u'10', name=u'GoldenEye (1995)', tags=[u'Action', u'Adventure', u'Thriller'])]

In [15]:
training_writer(movies, "movie")

In [16]:
# ratings.dat line layout: UserID::MovieID::Rating::Timestamp
ratings = sc.textFile("ml-10M100K/ratings.dat").map(lambda line: line.split("::"))

In [17]:
df = ratings.toDF(["user_id", "movie_id", "rating", "timestamp"])

In [18]:
ratings = df.select("movie_id", "user_id", "rating", "timestamp")

In [19]:
training_writer(ratings, "rating_by_movie")

# Aggregate Ratings by Movie

In [20]:
ratings.registerTempTable("ratings")

In [21]:
# Mean rating per movie.  The `rating` column still holds the raw strings produced
# by split("::") above, so avg() relies on Spark SQL casting them to numbers.
aggregated_ratings = sql.sql("select movie_id, avg(rating) as rating from ratings group by movie_id")

In [22]:
aggregated_ratings.registerTempTable("average_ratings")

In [23]:
training_writer(aggregated_ratings, "average_rating")

In [24]:
movies.registerTempTable('movie')

In [25]:
# Top 20 movies by mean rating.  The mean is not weighted by how many ratings a
# movie received, so a title with very few ratings can rank first.
sql.sql("""select movie.movie_id, name, tags, rating FROM movie join average_ratings on movie.movie_id = average_ratings.movie_id  order by rating desc limit 20""").collect()

[Row(movie_id=u'53355', name=u'Sun Alley (Sonnenallee) (1999)', tags=[u'Comedy', u'Romance'], rating=5.0),
 Row(movie_id=u'51209', name=u'Fighting Elegy (Kenka erejii) (1966)', tags=[u'Action', u'Comedy'], rating=5.0),
 Row(movie_id=u'42783', name=u'Shadows of Forgotten Ancestors (1964)', tags=[u'Drama', u'Romance'], rating=5.0),
 Row(movie_id=u'33264', name=u"Satan's Tango (S\xe1t\xe1ntang\xf3) (1994)", tags=[u'Drama'], rating=5.0),
 Row(movie_id=u'64275', name=u'Blue Light, The (Das Blaue Licht) (1932)', tags=[u'Drama', u'Fantasy', u'Mystery'], rating=5.0),
 Row(movie_id=u'5194', name=u"Who's Singin' Over There? (a.k.a. Who Sings Over There) (Ko to tamo peva) (1980)", tags=[u'Comedy'], rating=4.75),
 Row(movie_id=u'26073', name=u'Human Condition III, The (Ningen no joken III) (1961)', tags=[u'Drama', u'War'], rating=4.75),
 Row(movie_id=u'4454', name=u'More (1998)', tags=[u'Animation', u'IMAX', u'Sci-Fi'], rating=4.75),
 Row(movie_id=u'26048', name=u'Human Condition II, The (Ningen n

# Top movies for a tag

In [26]:
movies.columns

[u'movie_id', u'name', u'tags']

In [27]:
movies_by_tag = movies.select(explode(movies.tags).alias("tag"), "movie_id", "name")

In [28]:
training_writer(movies_by_tag, "movies_by_tag")

In [29]:
movies_by_tag.head(10)

[Row(tag=u'Adventure', movie_id=u'1', name=u'Toy Story (1995)'),
 Row(tag=u'Animation', movie_id=u'1', name=u'Toy Story (1995)'),
 Row(tag=u'Children', movie_id=u'1', name=u'Toy Story (1995)'),
 Row(tag=u'Comedy', movie_id=u'1', name=u'Toy Story (1995)'),
 Row(tag=u'Fantasy', movie_id=u'1', name=u'Toy Story (1995)'),
 Row(tag=u'Adventure', movie_id=u'2', name=u'Jumanji (1995)'),
 Row(tag=u'Children', movie_id=u'2', name=u'Jumanji (1995)'),
 Row(tag=u'Fantasy', movie_id=u'2', name=u'Jumanji (1995)'),
 Row(tag=u'Comedy', movie_id=u'3', name=u'Grumpier Old Men (1995)'),
 Row(tag=u'Romance', movie_id=u'3', name=u'Grumpier Old Men (1995)')]

In [34]:
aggregated_ratings.columns

[u'movie_id', u'rating']

In [43]:
# Ten best-rated "Adventure" movies: filter the exploded tag rows, attach each
# movie's average rating, sort by it in descending order and keep the first ten.
(movies_by_tag
    .where(movies_by_tag.tag == "Adventure")
    .join(aggregated_ratings, movies_by_tag.movie_id == aggregated_ratings.movie_id)
    .orderBy(aggregated_ratings.rating, ascending=False)
    .select(movies_by_tag.movie_id, movies_by_tag.name, aggregated_ratings.rating)
    .limit(10)
    .collect())

[Row(movie_id=u'26649', name=u'Lonesome Dove (1989)', rating=4.3076923076923075),
 Row(movie_id=u'908', name=u'North by Northwest (1959)', rating=4.261366341347299),
 Row(movie_id=u'1198', name=u'Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)', rating=4.261317249919736),
 Row(movie_id=u'25789', name=u'Shanghai Express (1932)', rating=4.25),
 Row(movie_id=u'720', name=u'Wallace & Gromit: The Best of Aardman Animation (1996)', rating=4.22979797979798),
 Row(movie_id=u'260', name=u'Star Wars: Episode IV - A New Hope (a.k.a. Star Wars) (1977)', rating=4.2202093397745575),
 Row(movie_id=u'1204', name=u'Lawrence of Arabia (1962)', rating=4.209582164476598),
 Row(movie_id=u'1196', name=u'Star Wars: Episode V - The Empire Strikes Back (1980)', rating=4.1943614395218916),
 Row(movie_id=u'1197', name=u'Princess Bride, The (1987)', rating=4.194158595641646),
 Row(movie_id=u'5618', name=u'Spirited Away (Sen to Chihiro no kamikakushi) (2001)', rating=4.18645294725956

[Row(movie_id=u'286', name=u'Nemesis 2: Nebula (1995)', tags=[u'Action', u'Sci-Fi', u'Thriller'], tag=u'Action', movie_id=u'286', name=u'Nemesis 2: Nebula (1995)')]