In [1]:
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import to_timestamp, col, avg, split

In [2]:
# Start (or reuse) a Spark session that runs locally on three worker threads.
spark = (
    SparkSession.builder
    .master('local[3]')
    .getOrCreate()
)
# Bare last expression so the notebook displays the session.
spark

21/10/07 09:56:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Peek at the first five raw lines of the movies file to see its layout
# before choosing a separator and schema.
!head -n 5 ../data/ml-1m/movies.dat

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy


### Read in data as Spark DataFrames

In [3]:
# Load the three ml-1m data files. Fields are separated by '::' and each
# schema is declared explicitly instead of being inferred.
# Parenthesized chains are used rather than backslash continuations so that
# no statement ends in a dangling '\' with nothing after it.

movies_df = (
    spark.read.csv('../data/ml-1m/movies.dat', sep='::',
                   schema='movie_id INT, title STRING, genres STRING')
    # Keep the raw '|'-delimited string in 'genres' and add the parsed array
    # next to it. The pattern is '[|]' because split() takes a regex.
    # NOTE(review): 'generes' looks like a typo, but renaming it to 'genres'
    # would overwrite the string column that later SQL splits -- confirm
    # before renaming.
    .withColumn('generes', split('genres', '[|]'))
)

ratings_df = (
    spark.read.csv('../data/ml-1m/ratings.dat', sep='::',
                   schema='user_id INT, movie_id INT, rating INT, timestamp LONG')
    # Replace the numeric 'timestamp' column with a Spark timestamp.
    .withColumn('timestamp', to_timestamp('timestamp'))
)

users_df = spark.read.csv(
    '../data/ml-1m/users.dat', sep='::',
    schema='user_id INT, gender STRING, age INT, occupation INT, zipcode STRING')

In [4]:
# Print the first five rows of the movies DataFrame.
movies_df.show(n=5)

+--------+--------------------+--------------------+--------------------+
|movie_id|               title|              genres|             generes|
+--------+--------------------+--------------------+--------------------+
|       1|    Toy Story (1995)|Animation|Childre...|[Animation, Child...|
|       2|      Jumanji (1995)|Adventure|Childre...|[Adventure, Child...|
|       3|Grumpier Old Men ...|      Comedy|Romance|   [Comedy, Romance]|
|       4|Waiting to Exhale...|        Comedy|Drama|     [Comedy, Drama]|
|       5|Father of the Bri...|              Comedy|            [Comedy]|
+--------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [11]:
# Read the movies file again, this time as an RDD of raw text lines.
movies_rdd = spark.sparkContext.textFile(name='../data/ml-1m/movies.dat')

# Show the first raw line.
movies_rdd.take(num=1)

["1::Toy Story (1995)::Animation|Children's|Comedy"]

In [8]:
# Split each raw line on the '::' separator and look at the first two results.
(
    movies_rdd
    .map(lambda line: line.split('::'))
    .take(2)
)

[['1', 'Toy Story (1995)', "Animation|Children's|Comedy"],
 ['2', 'Jumanji (1995)', "Adventure|Children's|Fantasy"]]

In [9]:
# Parse the raw lines by hand: split on '::', build one Row per movie with the
# id cast to int and the genre field split on '|', then convert to a DataFrame.
movies_parsed = (
    movies_rdd
    .map(lambda line: line.split('::'))
    .map(lambda fields: Row(movie_id=int(fields[0]),
                            title=fields[1],
                            genres=fields[2].split('|')))
    .toDF()
)

In [10]:
# Bring at most 20 parsed rows to the driver as a pandas DataFrame for display.
(
    movies_parsed
    .limit(20)
    .toPandas()
)

Unnamed: 0,movie_id,title,genres
0,1,Toy Story (1995),"[Animation, Children's, Comedy]"
1,2,Jumanji (1995),"[Adventure, Children's, Fantasy]"
2,3,Grumpier Old Men (1995),"[Comedy, Romance]"
3,4,Waiting to Exhale (1995),"[Comedy, Drama]"
4,5,Father of the Bride Part II (1995),[Comedy]
5,6,Heat (1995),"[Action, Crime, Thriller]"
6,7,Sabrina (1995),"[Comedy, Romance]"
7,8,Tom and Huck (1995),"[Adventure, Children's]"
8,9,Sudden Death (1995),[Action]
9,10,GoldenEye (1995),"[Action, Adventure, Thriller]"


In [None]:
# NOTE(review): this cell is a bare list literal that is never assigned or
# used. It looks like a pasted copy of the output of the earlier
# `movies_rdd.map(...).take(2)` cell -- confirm and remove if so.
[['1', 'Toy Story (1995)', "Animation|Children's|Comedy"],
 ['2', 'Jumanji (1995)', "Adventure|Children's|Fantasy"]]

In [12]:
# Expose movies_df to Spark SQL under the name 'movies'.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
movies_df.createOrReplaceTempView('movies')

In [13]:
# Do the genre split in Spark SQL against the 'movies' table and show the
# first ten rows as a pandas DataFrame.
(
    spark
    .sql("select movie_id, title, split(genres, '[|]') as genres from movies")
    .limit(10)
    .toPandas()
)

Unnamed: 0,movie_id,title,genres
0,1,Toy Story (1995),"[Animation, Children's, Comedy]"
1,2,Jumanji (1995),"[Adventure, Children's, Fantasy]"
2,3,Grumpier Old Men (1995),"[Comedy, Romance]"
3,4,Waiting to Exhale (1995),"[Comedy, Drama]"
4,5,Father of the Bride Part II (1995),[Comedy]
5,6,Heat (1995),"[Action, Crime, Thriller]"
6,7,Sabrina (1995),"[Comedy, Romance]"
7,8,Tom and Huck (1995),"[Adventure, Children's]"
8,9,Sudden Death (1995),[Action]
9,10,GoldenEye (1995),"[Action, Adventure, Thriller]"


In [14]:
# Analysis
# Step 1: attach each user's attributes to their ratings (equi-join on user_id).
a = ratings_df.join(users_df, on='user_id', how='inner')

# Step 2: attach the movie columns to every rating (equi-join on movie_id).
complete = a.join(movies_df, on='movie_id', how='inner')

In [15]:
# Print the first ten rows of the ratings/users join.
a.show(n=10)

+-------+--------+------+-------------------+------+---+----------+-------+
|user_id|movie_id|rating|          timestamp|gender|age|occupation|zipcode|
+-------+--------+------+-------------------+------+---+----------+-------+
|      1|    1193|     5|2000-12-31 16:12:40|     F|  1|        10|  48067|
|      1|     661|     3|2000-12-31 16:35:09|     F|  1|        10|  48067|
|      1|     914|     3|2000-12-31 16:32:48|     F|  1|        10|  48067|
|      1|    3408|     4|2000-12-31 16:04:35|     F|  1|        10|  48067|
|      1|    2355|     5|2001-01-06 17:38:11|     F|  1|        10|  48067|
|      1|    1197|     3|2000-12-31 16:37:48|     F|  1|        10|  48067|
|      1|    1287|     5|2000-12-31 16:33:59|     F|  1|        10|  48067|
|      1|    2804|     5|2000-12-31 16:11:59|     F|  1|        10|  48067|
|      1|     594|     4|2000-12-31 16:37:48|     F|  1|        10|  48067|
|      1|     919|     4|2000-12-31 16:22:48|     F|  1|        10|  48067|
+-------+---

In [None]:
# Print the first ten rows of the fully joined ratings/users/movies data.
complete.show(n=10)

In [None]:
# Mean rating given by each user.
avgs = (
    complete
    .groupBy('user_id')
    .agg(avg('rating').alias('avg_rating'))
)

# Put each user's mean next to every one of their ratings.
final = complete.join(avgs, on='user_id', how='inner')

# Centre each rating on its user's mean.
df = final.withColumn(
    'rescaled_rating',
    col('rating') - col('avg_rating'),
)

# Average the centred ratings within each occupation code.
stats = (
    df
    .groupBy('occupation')
    .avg('rescaled_rating')
)

In [None]:
# Print the first ten rows of the per-occupation averages.
stats.show(n=10)

In [None]:
# Display the active Spark session object.
spark

In [None]:
# Analysis
a = ratings\
      .join(users, ratings['user_id']==users['user_id'], 'inner')\
      .drop(users['user_id'])

complete = a\
            .join(movies, a['movie_id']==movies['movie_id'], 'inner')\
            .drop(movies['movie_id'])

avgs = complete.groupBy('user_id').agg(
    spfun.avg('rating').alias('avg_rating')
)

final = complete\
          .join(avgs, complete['user_id']==avgs['user_id'])\
          .drop(avgs['user_id'])

df = final.withColumn('rescaled_rating', final['rating'] - final['avg_rating'])

stats = df.groupBy('occupation').avg('rescaled_rating')

# Output datasets
aggregates = dataiku.Dataset("aggregates")
dkuspark.write_with_schema(aggregates, stats)

In [None]:
# -*- coding: utf-8 -*-
# Dataiku DSS PySpark recipe: joins ratings, users and movies, centres each
# rating on its user's mean, averages the centred ratings per occupation, and
# writes the result to the "aggregates" dataset.
import dataiku
import dataiku.spark as dkuspark
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import functions as spfun

# Load PySpark
# getOrCreate() reuses a SparkContext that is already running (for example the
# one behind an existing SparkSession) and creates one otherwise. A bare
# SparkContext() raises if a context is already active in this process.
sc = pyspark.SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Point to the DSS datasets
users_ds = dataiku.Dataset("users")
movies_ds = dataiku.Dataset("movies")
ratings_ds = dataiku.Dataset("ratings")

# And read them as Spark dataframes
users = dkuspark.get_dataframe(sqlContext, users_ds)
movies = dkuspark.get_dataframe(sqlContext, movies_ds)
ratings = dkuspark.get_dataframe(sqlContext, ratings_ds)

# Analysis
# Inner-join ratings to users on user_id and drop the duplicate key column
# that comes from `users`.
a = ratings\
      .join(users, ratings['user_id']==users['user_id'], 'inner')\
      .drop(users['user_id'])

# Inner-join the result to movies on movie_id, again dropping the duplicate key.
complete = a\
            .join(movies, a['movie_id']==movies['movie_id'], 'inner')\
            .drop(movies['movie_id'])

# Mean rating per user.
avgs = complete.groupBy('user_id').agg(
    spfun.avg('rating').alias('avg_rating')
)

# Attach each user's mean to every one of their ratings.
final = complete\
          .join(avgs, complete['user_id']==avgs['user_id'])\
          .drop(avgs['user_id'])

# Centre each rating on its user's mean.
df = final.withColumn('rescaled_rating', final['rating'] - final['avg_rating'])

# Average the centred ratings per occupation.
stats = df.groupBy('occupation').avg('rescaled_rating')

# Output datasets
aggregates = dataiku.Dataset("aggregates")
dkuspark.write_with_schema(aggregates, stats)