# 304 Spark SQL

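The goal of this lab is to run SQL queries with Spark SQL, inspect their execution plans, and compare querying parsed CSV data with querying Parquet files.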

- [Spark SQL programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)

This lab's notebook is in the ```material``` folder; the solutions will be released in the same folder.

The cluster configuration should be the same as in 301, 302, and 303.

In [None]:
%%configure -f
{"executorMemory":"8G", "numExecutors":2, "executorCores":2, "conf": {"spark.dynamicAllocation.enabled": "false"}}

In [None]:
//val bucketname = "unibo-bd2122-egallinucci"
val bucketname = "eg-myfirstbucket"

val path_ml_movies = "s3a://"+bucketname+"/datasets/ml-movies.csv"
val path_ml_ratings = "s3a://"+bucketname+"/datasets/ml-ratings.csv"
val path_ml_tags = "s3a://"+bucketname+"/datasets/ml-tags.csv"

sc.applicationId

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

In [None]:
import java.util.Calendar
import org.apache.spark.sql.SaveMode
import org.apache.spark.HashPartitioner

object MovieLensParser {

  val noGenresListed = "(no genres listed)"
  val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
  val pipeRegex = "\\|(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
  val quotes = "\""
  
  /** Convert from timestamp (String) to year (Int) */
  def yearFromTimestamp(timestamp: String): Int = {
    val cal = Calendar.getInstance()
    cal.setTimeInMillis(timestamp.trim.toLong * 1000L)
    cal.get(Calendar.YEAR)
  }

  /** Function to parse movie records
   *
   *  @param line line that has to be parsed
   *  @return tuple containing movieId, title and genres, none in case of input errors
   */
  def parseMovieLine(line: String): Option[(Long, String, String)] = {
    try {
      val input = line.split(commaRegex)
      var title = input(1).trim
      title = if(title.startsWith(quotes)) title.substring(1) else title
      title = if(title.endsWith(quotes)) title.substring(0, title.length - 1) else title
      Some(input(0).trim.toLong, title, input(2).trim)
    } catch {
      case _: Exception => None
    }
  }

  /** Function to parse rating records
   *
   *  @param line line that has to be parsed
   *  @return tuple containing userId, movieId, rating, and year, none in case of input errors
   */
  def parseRatingLine(line: String): Option[(Long, Long, Double, Int)] = {
    try {
      val input = line.split(commaRegex)
      Some(input(0).trim.toLong, input(1).trim.toLong, input(2).trim.toDouble, yearFromTimestamp(input(3)))
    } catch {
      case _: Exception => None
    }
  }

  /** Function to parse tag records
   *
   *  @param line line that has to be parsed
   *  @return tuple containing userId, movieId, tag, and year, none in case of input errors
   */
  def parseTagLine(line: String) : Option[(Long, Long, String, Int)] = {
    try {
      val input = line.split(commaRegex)
      Some(input(0).trim.toLong, input(1).trim.toLong, input(2), yearFromTimestamp(input(3)))
    } catch {
      case _: Exception => None
    }
  }

}
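The parser relies on `commaRegex` to split on commas that are not inside double quotes. The sketch below replays that split on a single record; the sample line is made up for illustration and is not read from the dataset.

```scala
// Same pattern as MovieLensParser.commaRegex: a comma counts as a separator
// only if an even number of double quotes follows it up to the end of the line.
val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"

// Hypothetical record, written by hand for this example
val sampleLine = "1,\"American President, The (1995)\",Comedy|Drama"

val fields = sampleLine.split(commaRegex)

// The comma inside the quoted title is not treated as a separator,
// so the line yields three fields (the title still carries its quotes,
// which parseMovieLine strips afterwards).
fields.foreach(println)
```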

In [None]:
val rddMovies = sc.textFile(path_ml_movies).flatMap(MovieLensParser.parseMovieLine)
val rddRatings = sc.textFile(path_ml_ratings).flatMap(MovieLensParser.parseRatingLine)
val rddTags = sc.textFile(path_ml_tags).flatMap(MovieLensParser.parseTagLine)

### 304-1 SQL querying

In [None]:
rddMovies.toDF("movieId","title","genres").createOrReplaceTempView("movies")
rddRatings.toDF("userid","movieId","rating","year").createOrReplaceTempView("ratings")
rddTags.toDF("userId","movieId","tag","year").createOrReplaceTempView("tags")

Reduce the ```spark.sql.autoBroadcastJoinThreshold``` parameter, which sets the maximum size of a table that Spark broadcasts automatically when joining (default value: "10485760b", i.e., 10 MB).

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold","1485760b")
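To confirm that the new threshold is in effect, the value can be read back from the session configuration. A minimal sketch, assuming the `spark` session that the notebook provides:

```scala
// Read the current threshold back from the running session
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// Setting the value to -1 disables automatic broadcasting altogether.
// Left commented out so that it does not change the lab's configuration.
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```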

Calculate the average rating for each movie with an SQL query. Check the results AND the execution plan.

- Do you recognize any optimization carried out by Catalyst?
- Is there something more that could be done (besides broadcasting, which we have restricted by lowering the threshold)?

Beware: Spark's UI is more difficult to read with SQL queries. Query execution times can be checked in the Notebook.
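As a possible complement to the UI, the plans and the elapsed time can also be printed from the notebook itself. The sketch below assumes the `spark` session and the `ratings` view registered earlier; the query and the name `qCount` are only illustrative.

```scala
// Illustrative query on the "ratings" view (not the lab's solution)
val qCount = spark.sql("SELECT movieId, count(*) AS cnt FROM ratings GROUP BY movieId")

// explain(true) prints the parsed, analyzed and optimized logical plans
// in addition to the physical plan
qCount.explain(true)

// spark.time runs the block and prints the time it took
spark.time(qCount.show())
```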

In [None]:
val sqlDF = spark.sql("SELECT m.title, avg(r.rating), count(*) FROM movies m, ratings r WHERE m.movieId=r.movieId GROUP BY m.title")
sqlDF.explain
sqlDF.show

Catalyst carries out column pruning (see the execution plan), but does not push the aggregation down below the join. Why?
1. It doesn't know that "title" is a descriptive attribute of movieId
2. It doesn't know that we are assuming a unique title for each movie
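The two points above can be illustrated without Spark. In the sketch below (plain Scala collections, hypothetical toy data), two distinct movies share a title: grouping by title after the join and aggregating by movieId before the join give different results, which is why the rewrite is only safe under the uniqueness assumption.

```scala
// Hypothetical toy data: two distinct movies share the same title
val movies  = Seq((1L, "Hamlet"), (2L, "Hamlet"))      // (movieId, title)
val ratings = Seq((1L, 5.0), (1L, 3.0), (2L, 1.0))     // (movieId, rating)
val titleOf = movies.toMap

// Join first, then group by title: both movies collapse into one group
val byTitle = ratings
  .groupBy { case (id, _) => titleOf(id) }
  .map { case (title, rs) => (title, rs.map(_._2).sum / rs.size, rs.size) }
  .toList

// Aggregate by movieId first, then attach the title: one row per movie
val byMovie = ratings
  .groupBy(_._1)
  .map { case (id, rs) => (titleOf(id), rs.map(_._2).sum / rs.size, rs.size) }
  .toList
  .sortBy(_._2)

println(byTitle)  // List((Hamlet,3.0,3))
println(byMovie)  // List((Hamlet,1.0,1), (Hamlet,4.0,2))
```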

In [None]:
val sqlDF2 = spark.sql("SELECT movieId, avg(rating) as avgRating, count(*) as cnt FROM ratings GROUP BY movieId")
sqlDF2.createOrReplaceTempView("avgr")
val sqlDF3 = spark.sql("SELECT title, avgRating, cnt FROM avgr a, movies m WHERE a.movieId = m.movieId")
sqlDF3.explain
sqlDF3.show

Broadcasting can be enforced through hints.

In [None]:
val sqlDF = spark.sql("SELECT /*+  BROADCASTJOIN(m) */ m.title, avg(r.rating), count(*) FROM movies m, ratings r WHERE m.movieId=r.movieId GROUP BY m.title")
sqlDF.explain
sqlDF.show
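The same hint can also be expressed through the DataFrame API with the `broadcast` function. This is a sketch of an equivalent formulation, assuming the `movies` and `ratings` views registered above; the names `moviesDF`, `ratingsDF` and `dfBroadcast` are introduced here for illustration.

```scala
import org.apache.spark.sql.functions.{avg, broadcast, count, lit}

// Load the temporary views as DataFrames
val moviesDF = spark.table("movies")
val ratingsDF = spark.table("ratings")

// broadcast() marks moviesDF as the side to be shipped to every executor
val dfBroadcast = ratingsDF
  .join(broadcast(moviesDF), "movieId")
  .groupBy("title")
  .agg(avg("rating"), count(lit(1)))

// The physical plan should now contain a broadcast join
dfBroadcast.explain
```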

Final version: broadcasting + pre-aggregation

In [None]:
val sqlDF2 = spark.sql("SELECT movieId, avg(rating) as avgRating, count(*) as cnt FROM ratings GROUP BY movieId")
sqlDF2.createOrReplaceTempView("avgr")
val sqlDF3 = spark.sql("SELECT /*+  BROADCASTJOIN(m) */ title, avgRating, cnt FROM avgr a, movies m WHERE a.movieId = m.movieId")
sqlDF3.show
sqlDF3.explain

### 304-2 Parquet

In [None]:
val path_ml_movies_parquet = "s3a://"+bucketname+"/datasets/ml-movies-parquet"
val path_ml_ratings_parquet = "s3a://"+bucketname+"/datasets/ml-ratings-parquet"
val path_ml_tags_parquet = "s3a://"+bucketname+"/datasets/ml-tags-parquet"

In [None]:
// Convert from RDD to Parquet
rddMovies.toDF("movieId","title","genres").coalesce(1).write.format("parquet").mode(SaveMode.Overwrite).save(path_ml_movies_parquet)
rddRatings.toDF("userid","movieId","rating","year").write.format("parquet").mode(SaveMode.Overwrite).save(path_ml_ratings_parquet)
rddTags.toDF("userId","movieId","tag","year").coalesce(1).write.format("parquet").mode(SaveMode.Overwrite).save(path_ml_tags_parquet)

In [None]:
// Reuse the path variables defined above
spark.read.parquet(path_ml_movies_parquet).createOrReplaceTempView("movies_pq")
spark.read.parquet(path_ml_ratings_parquet).createOrReplaceTempView("ratings_pq")
spark.read.parquet(path_ml_tags_parquet).createOrReplaceTempView("tags_pq")

In [None]:
val sqlDF = spark.sql("SELECT m.title, avg(r.rating), count(*) FROM movies_pq m, ratings_pq r WHERE m.movieId=r.movieId GROUP BY m.title")
sqlDF.show
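One way to see what reading from Parquet changes is to look at the physical plan of the same query. The sketch below assumes the `movies_pq` and `ratings_pq` views registered above; `pqDF` is a name introduced here.

```scala
val pqDF = spark.sql("SELECT m.title, avg(r.rating), count(*) FROM movies_pq m, ratings_pq r WHERE m.movieId=r.movieId GROUP BY m.title")

// In the Parquet scan nodes, ReadSchema lists only the columns that the
// query needs: column pruning is applied when the files are read
pqDF.explain

// The schema is stored in the Parquet files, so no parsing code is required
spark.table("ratings_pq").printSchema
```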

### 304-3 SQL & Parquet on weather dataset

Convert the full weather dataset to Parquet, then use SQL to join with the station dataset and calculate the average temperature by country.

In [None]:
// CHECK THE FILE NAMES
val path_weather_full = "s3a://"+bucketname+"/datasets/weather.txt"
val path_stations = "s3a://"+bucketname+"/datasets/stations.csv"

val path_weather_full_parquet = "s3a://"+bucketname+"/datasets/weather-full-parquet"

case class WeatherData(
  usafwban:String,
  year:String,
  month:String,
  day:String,
  temperature:Double,
  validTemperature:Boolean
)

object WeatherData {
    def extract(row:String) = {
        val usafwban = row.substring(4,15)
        val year = row.substring(15,19)
        val month = row.substring(19,21)
        val day = row.substring(21,23)
        val airTemperature = row.substring(87,92)
        val airTemperatureQuality = row.charAt(92)

        // Divide by 10.0 (not 10): integer division would drop the decimal digit
        new WeatherData(usafwban,year,month,day,airTemperature.toInt/10.0,airTemperatureQuality == '1')
    }
}

case class StationData(
  usafwban:String,
  name:String,
  country:String,
  state:String,
  call:String,
  latitude:Double,
  longitude:Double,
  elevation:Double,
  date_begin:String,
  date_end:String
)

object StationData {
  def extract(row:String) = {
    // Parse a numeric field, defaulting to 0 when the field is empty
    def getDouble(str:String) : Double =
      if (str.isEmpty) 0.0 else str.toDouble
    val columns = row.split(",").map(_.replaceAll("\"",""))
    val latitude = getDouble(columns(6))
    val longitude = getDouble(columns(7))
    val elevation = getDouble(columns(8))
    new StationData(columns(0)+columns(1),columns(2),columns(3),columns(4),columns(5),latitude,longitude,elevation,columns(9),columns(10))
  }
}
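`WeatherData.extract` reads fields at fixed character offsets. The sketch below applies the same offsets to a hand-built record; the record is hypothetical, and the raw temperature is assumed to be expressed in tenths of a degree, as the division by 10 suggests.

```scala
// Hypothetical fixed-width record: only the offsets read by
// WeatherData.extract carry meaningful values, the rest is padding.
val sampleRow =
  "0000" +          // offsets 0-3: not used here
  "01001099999" +   // offsets 4-14: station identifier (usafwban)
  "20000115" +      // offsets 15-22: year, month, day
  ("x" * 64) +      // offsets 23-86: padding
  "+0123" +         // offsets 87-91: raw air temperature
  "1"               // offset 92: quality flag

val usafwban = sampleRow.substring(4, 15)
val year = sampleRow.substring(15, 19)
val rawTemperature = sampleRow.substring(87, 92).toInt

val temperature = rawTemperature / 10.0  // Double division keeps the decimal digit
val truncated = rawTemperature / 10      // Int division drops it
val valid = sampleRow.charAt(92) == '1'

println(s"$usafwban $year $temperature $truncated $valid")
```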



In [None]:
// Convert from RDD to Parquet
sc.textFile(path_weather_full).map(WeatherData.extract).
    toDF("usafwban","year","month","day","airTemperature","airTemperatureQuality").
    coalesce(20).
    write.format("parquet").mode(SaveMode.Overwrite).save(path_weather_full_parquet)

In [None]:
spark.read.parquet(path_weather_full_parquet).createOrReplaceTempView("weather_pq")
sc.textFile(path_stations).map(StationData.extract).toDF().createOrReplaceTempView("station_pq")

In [None]:
val sqlDF = spark.sql("SELECT s.country, avg(w.airTemperature), count(*) FROM weather_pq w, station_pq s WHERE w.usafwban=s.usafwban GROUP BY s.country ORDER BY s.country")
sqlDF.explain
sqlDF.show
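The query above averages all readings, whatever their quality flag. A possible variant, assuming that rows whose flag is not '1' should be left out of the average, filters on the boolean `airTemperatureQuality` column; `sqlDFValid` and the column aliases are names introduced here.

```scala
// Variant of the query that keeps only readings flagged as valid
// (assumption: readings with airTemperatureQuality = false should be ignored)
val sqlDFValid = spark.sql("""
  SELECT s.country, avg(w.airTemperature) AS avgTemperature, count(*) AS cnt
  FROM weather_pq w, station_pq s
  WHERE w.usafwban = s.usafwban AND w.airTemperatureQuality = true
  GROUP BY s.country
  ORDER BY s.country
""")
sqlDFValid.explain
sqlDFValid.show
```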