In [1]:
%output --no-stdout

In [2]:
@file:Repository("https://binrepo.target.com/artifactory/gradle")
@file:Repository("https://binrepo.target.com/artifactory/maven-central")
@file:Repository("https://binrepo.target.com/artifactory/jcenter")
@file:Repository("https://binrepo.target.com/artifactory/jitpack-maven")
@file:Repository("https://binrepo.target.com/artifactory/kotlin-maven")
@file:Repository("https://binrepo.target.com/artifactory/apache-maven")
@file:Repository("https://binrepo.target.com/artifactory/jitpack")
%use spark

In [3]:
%output --reset-to-defaults
@file:DependsOn("org.jetbrains.kotlinx.spark:kotlin-spark-api-3.0.0_2.12:1.0.0-preview1")

In [4]:
import org.jetbrains.kotlinx.spark.api.*
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.Metadata
import org.apache.spark.sql.functions.*

In [16]:
val ratingFile = "data/ml-latest/ratings.csv"
val movieFile = "data/ml-latest/movies.csv"
val linkFile = "data/ml-latest/links.csv"
val tagFile = "data/ml-latest/tags.csv"

In [5]:
val schemaTyped = StructType(
    arrayOf(
            StructField("userId", DataTypes.IntegerType, true, Metadata.empty()),
            StructField("movieId", DataTypes.IntegerType, true, Metadata.empty()),
            StructField("rating", DataTypes.DoubleType, true, Metadata.empty()),
            StructField("timestamp", DataTypes.IntegerType, true, Metadata.empty())
           )
)

In [17]:
withSpark(logLevel = SparkLogLevel.INFO) {
    val debug = false
    
    val ratings = spark
        .read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(ratingFile)
        .withColumn("timestamp", to_timestamp(from_unixtime(Column("timestamp"))))

    val movies = spark
        .read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(movieFile)
    
    val links = spark
        .read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(linkFile)
        
    val tags = spark
        .read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(tagFile)
        .withColumn("timestamp", to_timestamp(from_unixtime(Column("timestamp"))))
    
    // Array operations
    val movie_genre = (
        movies
            .withColumn("genres_array", split(Column("genres"), "\\|"))
            .withColumn("genre", explode(Column("genres_array")))
    )
    
    if (debug) {
        // See sample data
        ratings.show()
        movies.show()
        movie_genre.show()
        links.show()
        tags.show()
        
        // Show dataframe schema
        ratings.printSchema()
        movies.printSchema()
        
        // Filters
        movies.where(Column("genres").equalTo("Action")).show()
        movies.where("genres = 'Action'").show()
        movie_genre.select("movieId", "title", "genre").where("genre = '(no genres listed)'").show()
        
        // Get distinct values for column
        movie_genre.select("genre").distinct().show()
    
        // Select columns
        movie_genre.select("movieId", "title", "genre").show()
        
        // Count geners
        movie_genre.select("genre").groupBy("genre").count().sort(desc("count")).show()
    
        movies.printSchema()
        tags.printSchema()
    
        val opinions = movies.join(tags, movies.col("movieId").eq(tags.col("movieId"))).also { it.printSchema() }
    
        opinions.show()
    }
}

In [18]:
val spark = SparkSession
.builder()
.master("local[*]")
.appName("SimpleApp").orCreate

In [19]:
val ratings = spark
        .read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(ratingFile)
        .withColumn("timestamp", to_timestamp(from_unixtime(Column("timestamp"))))
        
val movies = spark
        .read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(movieFile)
    
val links = spark
    .read()
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(linkFile)

val tags = spark
    .read()
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(tagFile)
    .withColumn("timestamp", to_timestamp(from_unixtime(Column("timestamp"))))
    
// Array operations
val movie_genre = (
    movies
        .withColumn("genres_array", split(Column("genres"), "\\|"))
        .withColumn("genre", explode(Column("genres_array")))
)    

In [20]:
spark.conf().set("spark.sql.autoBroadcastJoinThreshold", -1)

In [21]:
// This syntax will only include the movieId in the result set once.
val opinions = movies.join(tags, "movieId")
    .withColumnRenamed("userId", "user_id")
    .withColumnRenamed("movieId", "movie_id")
    .select("user_id", "movie_id", "title", "tag", "timestamp")

// This syntax will include movieId twice
// movies.join(tags, movies.col("movieId").eq(tags.col("movieId"))).show()

In [22]:
opinions.show()

+-------+--------+--------------------+--------------------+-------------------+
|user_id|movie_id|               title|                 tag|          timestamp|
+-------+--------+--------------------+--------------------+-------------------+
|  40716|     148|Awfully Big Adven...|Nudity (Topless -...|2006-09-20 01:46:17|
|  73406|     148|Awfully Big Adven...|               1940s|2018-06-07 00:44:07|
|  73406|     148|Awfully Big Adven...|based on novel or...|2018-06-07 00:44:07|
|  73406|     148|Awfully Big Adven...|             england|2018-06-07 00:44:07|
|  73406|     148|Awfully Big Adven...|           liverpool|2018-06-07 00:44:07|
|  73406|     148|Awfully Big Adven...|     theatre company|2018-06-07 00:44:07|
| 103013|     148|Awfully Big Adven...|    nudity (topless)|2014-05-16 18:12:20|
| 146340|     148|Awfully Big Adven...|              catchy|2015-06-22 05:32:07|
|  15930|     471|Hudsucker Proxy, ...|             Fantasy|2011-08-02 18:57:17|
|  15930|     471|Hudsucker 

In [23]:
val opinionsWithRating = opinions.withColumnRenamed("timestamp", "tag_timestamp")
    .join(ratings, opinions.col("movie_id").eq(ratings.col("movieId"))
            .and(opinions.col("user_id").eq(ratings.col("userId"))), "left")
    .select("user_id", "movie_id", "title", "tag", "tag_timestamp", "rating", "timestamp")

opinionsWithRating.show()

+-------+--------+--------------------+----------------+-------------------+------+-------------------+
|user_id|movie_id|               title|             tag|      tag_timestamp|rating|          timestamp|
+-------+--------+--------------------+----------------+-------------------+------+-------------------+
| 277453|       1|    Toy Story (1995)|       animation|2006-05-12 00:13:44|   4.0|2006-05-12 00:13:47|
| 277453|       1|    Toy Story (1995)|          Disney|2006-05-12 11:05:42|   4.0|2006-05-12 00:13:47|
| 277453|       1|    Toy Story (1995)|           Pixar|2006-05-12 00:13:42|   4.0|2006-05-12 00:13:47|
| 277453|       1|    Toy Story (1995)|            toys|2006-05-12 11:05:40|   4.0|2006-05-12 00:13:47|
|  11476|       6|         Heat (1995)|       Al Pacino|2013-08-15 09:40:02|   3.5|2013-08-15 09:39:35|
|  11476|       6|         Heat (1995)|           crime|2013-08-15 09:40:11|   3.5|2013-08-15 09:39:35|
|  11476|       6|         Heat (1995)|    great acting|2013-08-

In [25]:
opinionsWithRating.count()

1108997

In [26]:
ratings.count()

27753444

In [27]:
ratings.select("userId").distinct().count()

283228

In [28]:
tags.select("userId").distinct().count()

19325

In [29]:
tags.select("movieId").distinct().count()

45981

In [30]:
ratings.join(movies, "movieId").select("title", "rating", "timestamp").groupBy("title").agg(
    count("*"),
    min("rating"),
    max("rating"),
    avg("rating"),
    min("timestamp"),
    max("timestamp")
).show()

+--------------------+--------+-----------+-----------+------------------+-------------------+-------------------+
|               title|count(1)|min(rating)|max(rating)|       avg(rating)|     min(timestamp)|     max(timestamp)|
+--------------------+--------+-----------+-----------+------------------+-------------------+-------------------+
|Men in Black (a.k...|   44287|        0.5|        5.0| 3.578533203874726|1997-07-04 05:54:16|2018-09-26 00:03:50|
|What's Up, Scarle...|       2|        2.0|        2.5|              2.25|2012-01-04 01:56:13|2012-01-04 08:26:23|
|    Peter Pan (2000)|       9|        2.0|        4.5|3.3333333333333335|2015-09-15 01:42:17|2018-04-03 17:24:00|
| Other People (2016)|      60|        0.5|        5.0|               3.5|2016-09-10 22:37:32|2018-09-19 22:40:25|
|         Prom (2011)|      54|        0.5|        5.0|2.8703703703703702|2011-09-29 14:53:00|2018-08-02 10:18:25|
|Komisarz Blond i ...|       1|        1.0|        1.0|               1.0|2017-1

In [31]:
tags.groupBy("movieId").agg(
    collect_set("tag").alias("tags"),
    count("tag").alias("tag_count"),
    collect_set("userId").alias("users"),
    count("userId").alias("user_count"),
    min("timestamp").alias("first_tagged_date"),
    max("timestamp").alias("last_tagged_date")
).sort(col("tag_count").desc()).show()

+-------+--------------------+---------+--------------------+----------+-------------------+-------------------+
|movieId|                tags|tag_count|               users|user_count|  first_tagged_date|   last_tagged_date|
+-------+--------------------+---------+--------------------+----------+-------------------+-------------------+
|    260|[Classic, Space e...|     9478|[271990, 118560, ...|      9478|2006-01-12 19:20:26|2018-09-19 08:11:56|
|    296|[ontology, Biblio...|     4963|[17546, 13488, 55...|      4963|2006-01-12 15:03:58|2018-09-23 11:49:46|
|  79132|[Intense, mystery...|     4670|[89073, 201766, 1...|      4670|2010-07-18 08:22:51|2018-09-23 11:55:45|
|   2571|[DVD-Video, Oscar...|     3915|[17546, 55578, 17...|      3915|2006-01-11 16:25:11|2018-09-23 11:54:54|
|   2959|[sabotage, dvd, H...|     3864|[55578, 179602, 1...|      3864|2006-01-13 19:50:45|2018-09-11 21:21:14|
|    318|[friendship, comp...|     3834|[27594, 201766, 1...|      3834|2006-01-12 15:33:27|2018

In [32]:
ratings.groupBy("userId").agg(
    collect_set("movieId").alias("movies"),
    count("*").alias("cnt"),
    avg("rating").alias("avg")
).sort(col("cnt").desc()).show()

+------+--------------------+-----+------------------+
|userId|              movies|  cnt|               avg|
+------+--------------------+-----+------------------+
|123100|[171057, 3702, 25...|23715|3.1306346194391734|
|117490|[4189, 25743, 629...| 9279|3.2784243991809463|
|134596|[3702, 146544, 58...| 8381| 3.198305691444935|
|212343|[6296, 256, 12203...| 7884|2.5880263825469303|
|242683|[4189, 6296, 4445...| 7515|3.2083166999334662|
|111908|[6296, 5809, 3476...| 6645|1.5249811888638074|
| 77609|[6296, 5809, 2594...| 6398|2.8122069396686467|
| 63783|[6296, 45611, 256...| 6346|3.4854238890639775|
|172357|[6296, 5809, 3958...| 5868| 2.442058623040218|
|141955|[118560, 6296, 58...| 5810|2.8747848537005165|
|158002|[6296, 4445, 2594...| 5701|3.7478512541659357|
|253511|[2338, 6296, 5809...| 5356| 3.011669156086632|
| 48470|[2338, 6296, 5809...| 5257| 2.472988396423816|
|183233|[2338, 4445, 2594...| 5169|2.4049139098471657|
| 94843|[6296, 2594, 743,...| 5130|1.1253411306042884|
| 73145|[1