In [1]:
%sh
# Download the MovieLens "ml-latest-small" archive and unpack it under
# /dbfs/FileStore/movielens.
# Abort on the first failing command or unset variable, so a failed download
# can never be followed by the destructive steps below.
set -eu

tmp_dir=$(mktemp -d -t ml-latest-XXXXXXXXXX)
# Remove the scratch directory (and the archive inside it) when the cell exits.
trap 'rm -rf "$tmp_dir"' EXIT
archive_path="$tmp_dir/ml-latest-small.zip"
data_dir=/dbfs/FileStore/movielens

echo "Using tem dir: $tmp_dir"
wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip -O "$archive_path"

# Replace the previous extraction only after the download has succeeded.
rm -rf "$data_dir"
mkdir -p "$data_dir"

echo "Extracting to $data_dir"
unzip "$archive_path" -d "$data_dir"
ls -al "$data_dir"
ls -al "$data_dir/ml-latest-small"

In [2]:
# Base location of the MovieLens files, and the extracted "ml-latest-small"
# dataset directory beneath it.
ml_root = 'dbfs:/FileStore/movielens'
ml_small = ml_root + '/ml-latest-small'

In [3]:
# Read links.csv, treating the first line as the header and letting Spark
# infer the column types.
# NOTE(review): inferSchema types imdbId as a number; if the raw file
# zero-pads these ids, the padding is lost here -- confirm before joining on it.
df_links = spark.read.csv(
    f'{ml_small}/links.csv',
    header=True,
    inferSchema=True,
)
display(df_links)

movieId,imdbId,tmdbId
1,114709,862.0
2,113497,8844.0
3,113228,15602.0
4,114885,31357.0
5,113041,11862.0
6,113277,949.0
7,114319,11860.0
8,112302,45325.0
9,114576,9091.0
10,113189,710.0


In [4]:
# Read movies.csv (header row, inferred column types) and cache it, since
# later cells query this DataFrame more than once.
df_movies = spark.read.csv(
    f'{ml_small}/movies.csv',
    header=True,
    inferSchema=True,
).cache()
display(df_movies)

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [5]:
# Read ratings.csv, treating the first line as the header and letting Spark
# infer the column types.
df_ratings = spark.read.csv(
    f'{ml_small}/ratings.csv',
    header=True,
    inferSchema=True,
)
display(df_ratings)

userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
1,70,3.0,964982400
1,101,5.0,964980868
1,110,4.0,964982176
1,151,5.0,964984041
1,157,5.0,964984100


In [6]:
# Read tags.csv, treating the first line as the header and letting Spark
# infer the column types.
df_tags = spark.read.csv(
    f'{ml_small}/tags.csv',
    header=True,
    inferSchema=True,
)
display(df_tags)

userId,movieId,tag,timestamp
2,60756,funny,1445714994
2,60756,Highly quotable,1445714996
2,60756,will ferrell,1445714992
2,89774,Boxing story,1445715207
2,89774,MMA,1445715200
2,89774,Tom Hardy,1445715205
2,106782,drugs,1445715054
2,106782,Leonardo DiCaprio,1445715051
2,106782,Martin Scorsese,1445715056
7,48516,way too long,1169687325


In [7]:
# Build a SQL predicate that selects every movie whose title contains
# "Bad Boys". Only the movieId column is collected to the driver because
# nothing else from the matching rows is used here.
bad_boys_ids = [
    str(row['movieId'])
    for row in (df_movies
                .where("title like '%Bad Boys%'")
                .select('movieId')
                .collect())
]
# An empty "IN ()" list is not valid SQL, so when no title matches fall back
# to a predicate that simply selects nothing; downstream .where(filter_expr)
# calls then yield empty results instead of failing to parse.
filter_expr = (
    'movieId IN ({})'.format(",".join(bad_boys_ids))
    if bad_boys_ids
    else 'false'
)
print(filter_expr)

In [8]:
import json

# Serialise the movies selected by filter_expr as a JSON array of objects
# (one object per row, keyed by column name).
json.dumps([
    movie_row.asDict()
    for movie_row in df_movies.filter(filter_expr).collect()
])

In [9]:
# Serialise the ratings of the selected movies, ordered by movieId, as a
# JSON array of objects (one object per row, keyed by column name).
json.dumps([
    rating_row.asDict()
    for rating_row in df_ratings.filter(filter_expr).orderBy('movieId').collect()
])

In [10]:
import pyspark.sql.functions as F

# Per-movie rating statistics for the movies selected by filter_expr:
# number of ratings, lowest and highest rating, and the mean rating.
df_rating_aggregates = (
    df_ratings
    .where(filter_expr)
    .groupBy('movieId')
    .agg(
        F.count('rating').alias('times_rated'),
        F.min('rating').alias('min_rating'),
        F.max('rating').alias('max_rating'),
        # Built-in mean rather than a hand-rolled sum/count; both ignore
        # null ratings, so the value is unchanged.
        F.avg('rating').alias('average_rating'),
    )
)
df_rating_aggregates.show()