In [0]:
%spark
// List the tables visible to the session (first 20 rows, the same as show()'s default).
spark.sql("show tables").show(20)

In [1]:
%spark
// Exploratory query: load every row of `my_table` and preview it.
// NOTE(review): `my_table` is not created anywhere in this notebook — confirm it exists.
val df = spark
  .sql("select * from my_table")
df.show(20)

In [2]:
%spark
// Preview the `movies` table through a Spark SQL string.
spark
  .sql("select * from movies")
  .show(20)

In [3]:
%spark
// Same preview as the SQL paragraph above, but through the DataFrame API.
val df = spark
  .table("movies")
df.show(20)

In [4]:
%sql
-- List the tables, then show the column layout of `movies`.
SHOW TABLES;
DESCRIBE TABLE movies;

In [5]:
%spark
import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection

// One row of movie.csv: id, title (with a "(YYYY)" suffix) and pipe-delimited genres.
// The schema is derived through the public Encoders API. catalyst.ScalaReflection is an
// internal Spark API, and .schema already returns a StructType, so no downcast is needed.
import org.apache.spark.sql.Encoders

final case class Movie(movieId: Int, title: String, genres: String)
val movies_schema: StructType = Encoders.product[Movie].schema

In [6]:
%spark
// Extract: load movie.csv with the case-class schema (no type inference); the first line is the header.
val movie_df_temp = spark.read
    .schema(movies_schema)
    .option("header", "true")
    .format("csv")
    .load("hdfs://catalog-test/user/root/movie.csv")

In [7]:
%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

// Explode the pipe-delimited genres into one row per (movie, genre). Then pull the
// four-digit release year out of the first "(YYYY)" found in the title.
// regexp_extract yields "" when the title has no such year. That "" is mapped to null
// explicitly, so the int cast does not fail when ANSI mode (spark.sql.ansi.enabled) is on.
// With ANSI off, the result is the same as a plain cast.
val movie_df = movie_df_temp
    .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre"))
    .withColumn("releasedYear", regexp_extract($"title", "\\((\\d{4})\\)", 1))
    .withColumn("releasedYear", when($"releasedYear" =!= "", $"releasedYear".cast(IntegerType)))

In [8]:
%spark
// Expose the transformed movies to SQL as temp view `movie`, then sanity-check the catalog,
// the view's columns and a few sample rows, in that order.
movie_df.createOrReplaceTempView("movie")
Seq("show tables", "desc movie", "select * from movie limit 3")
    .foreach(query => spark.sql(query).show())

In [9]:
%spark

// rating.csv
import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
import java.sql.Timestamp
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoders

// One row of rating.csv. The schema comes from the case class through the public Encoders API.
// NOTE(review): the timestamp column is parsed with Spark's default CSV timestamp format.
// If rating.csv stores epoch seconds instead, year/month below will be null — confirm.
final case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
val rating_schema: StructType = Encoders.product[Rating].schema
val rating_df_temp = spark.read
    .option("header", "true")
    .schema(rating_schema)
    .csv("hdfs://catalog-test/user/root/rating.csv")

// Keep only movies rated more than 10 times; movies with only a few ratings would skew averages.
val rating_df_filtered = rating_df_temp.groupBy($"movieId")
    .agg(count("*").as("count")).filter($"count" > 10)
// Inner using-join on Seq("movieId") leaves a single movieId column, so no side needs disambiguating.
val rating_df = rating_df_temp.join(rating_df_filtered, Seq("movieId"))
    .select($"userId", $"movieId", $"rating", year($"timestamp").as("year"), month($"timestamp").as("month"))

rating_df.createOrReplaceTempView("rating")
spark.sql("select * from rating limit 3").show()

In [10]:
%spark
// Top 5 movies by mean rating, computed over the `rating` temp view.
// The block scope keeps the intermediate DataFrame out of the notebook namespace.
{
  val topRatedMovies = spark.sql("select movieId, avg(rating) as avgRating from rating group by movieId order by avgRating desc limit 5")
  topRatedMovies.show()
}

In [11]:
%spark
// Top 5 genres by mean rating: join the `movie` view (one row per genre) with `rating` on movieId.
// The block scope keeps the intermediate DataFrame out of the notebook namespace.
{
  val topRatedGenres = spark.sql("select m.genre, avg(r.rating) as avgRating from movie m join rating r on m.movieId = r.movieId group by m.genre order by avgRating desc limit 5")
  topRatedGenres.show()
}

In [12]:
%spark
// Inner-join movies with ratings on movieId. Passing the key as Seq("movieId") (a using-join)
// keeps a single movieId column instead of one from each side. The two-argument join is inner.
val movie_rating_df = movie_df.join(rating_df, Seq("movieId"))
movie_rating_df.show()

In [13]:
%spark
// Save the joined DataFrame as Parquet, partitioned into year=/month= directories.
// Repartitioning by the same columns first means each year/month directory is written by one task.
// Overwrite (not Append) makes the paragraph safe to re-run. Append would add another copy of
// every row on each run, and the external Hive table over this path would read the duplicates.
import org.apache.spark.sql.SaveMode
movie_rating_df.repartition($"year", $"month")
    .write.partitionBy("year", "month")
    .mode(SaveMode.Overwrite)
    .parquet("hdfs://catalog-test/user/root/movie_rating")


In [14]:
%jdbc(hive)
-- Create the Hive database for the MovieLens tables. IF NOT EXISTS keeps the paragraph re-runnable.
-- The location directory is named after the database.
CREATE DATABASE IF NOT EXISTS example__db_movie_rating
COMMENT 'MovieLens 20M Dataset'
LOCATION 'hdfs://catalog-test/user/hadoop-test/example__db_movie_rating';

SHOW DATABASES;

DESCRIBE DATABASE example__db_movie_rating;


In [15]:
%jdbc(hive)
USE example__db_movie_rating;

-- EXTERNAL table over the Parquet files the Spark paragraph writes to .../movie_rating.
-- year/month are partition columns taken from the year=/month= directory names.
-- IF NOT EXISTS keeps the paragraph re-runnable.
CREATE EXTERNAL TABLE IF NOT EXISTS movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double)
partitioned by (year int, month int) stored as parquet
location 'hdfs://catalog-test/user/root/movie_rating';

In [16]:
%jdbc(hive)
-- Add the partitions and update the metastore: register every year=/month= directory
-- under the table location, then inspect the table.
-- (HiveQL comments start with "--"; a "//" line is not a comment and would be sent to Hive as SQL.)
MSCK REPAIR TABLE movie_rating;

SHOW TABLES;

DESCRIBE movie_rating;

SELECT * FROM movie_rating limit 3;

In [17]:
%jdbc(hive)
-- Top 3 titles per release year by mean rating.
-- rank() gives tied titles the same number, so a year can return more than 3 rows.
SELECT releasedYear, title, avgRating, row_num
FROM (
    SELECT releasedYear,
           title,
           avg(rating) AS avgRating,
           rank() OVER (PARTITION BY releasedYear ORDER BY avg(rating) DESC) AS row_num
    FROM movie_rating
    GROUP BY releasedYear, title
) T
WHERE row_num <= 3
ORDER BY releasedYear, row_num;


In [18]:
%jdbc(hive)
-- Most popular title per rating year: count distinct raters per (year, title) and keep rank 1.
-- Ties share rank 1, so a year can return several titles.
SELECT year, title, popularity, row_num
FROM (
    SELECT year,
           title,
           count(DISTINCT userID) AS popularity,
           rank() OVER (PARTITION BY year ORDER BY count(DISTINCT userID) DESC) AS row_num
    FROM movie_rating
    GROUP BY year, title
) T
WHERE row_num = 1
ORDER BY year, popularity DESC;


In [19]:
%spark
// Evaluate the session object so the interpreter echoes the active SparkSession
// (a quick check that the Spark interpreter is up).
spark


In [20]:
%spark

// ETL sample code: extract movie.csv, transform it, and load the result back to HDFS as CSV.
// All the names this paragraph uses are imported here, so it also runs in a fresh interpreter session.

import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoders

// Create the schema from the case class (public Encoders API rather than internal catalyst.ScalaReflection).
final case class Movie(movieId: Int, title: String, genres: String)
val movies_schema: StructType = Encoders.product[Movie].schema

// Read source data (Extract).
// NOTE(review): this rebinds movie_df to the raw, untransformed movies. It replaces the
// transformed movie_df defined earlier in the notebook.
val movie_df = spark.read
        .option("header", "true")
        .schema(movies_schema)
        .csv("hdfs://catalog-test/user/root/movie.csv")

movie_df.show()

// Transformation: one row per (movie, genre), plus the release year parsed from "(YYYY)" in the title.
// Titles without a year map to null explicitly, so the cast also works with ANSI mode enabled.
val transformedDf = movie_df
    .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre"))
    .withColumn("releasedYear", regexp_extract($"title", "\\((\\d{4})\\)", 1))
    .withColumn("releasedYear", when($"releasedYear" =!= "", $"releasedYear".cast(IntegerType)))

transformedDf.show()

// Write the transformed data to the target location (Load). Spark writes a directory of part files at this path.
// "overwrite" keeps the paragraph re-runnable; the default mode fails if the path already exists.
transformedDf.write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("hdfs://catalog-test/user/root/movie_transformed.csv")

In [21]:
%spark
// Read the ETL paragraph's CSV output back. The header supplies column names and Spark infers the column types.
val read_transformed_df = spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("hdfs://catalog-test/user/root/movie_transformed.csv")

read_transformed_df.show()

In [22]:
%spark

// ETL sample code using Hive: read a Hive table, transform it, and write the result to a new Hive table.
// All the names this paragraph uses are imported here, so it also runs in a fresh interpreter session.
import spark.implicits._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions._

// Read the source data from a Hive table.
// When Spark reads this Hive table, the CSV header line comes back as a data row, because the
// table's skip.header.line.count property is not recognized. It is filtered out below.
// Alternative that reads the raw file directly and lets the CSV reader skip the header:
//val movie_df_temp = spark.read.option("header", "true").option("inferSchema", "true").format("csv").load("hdfs://catalog-test/user/root/movie.csv")
val movie_df_temp = spark.table("movies") // read straight from Hive

//movie_df_temp.show()

// Filter out the header row. Its movieid is the text "movieId" rather than a number. The comparison
// is expected to evaluate to null for it, which drops the row (rows with movieid 0 are dropped too).
// NOTE(review): this depends on the movieid column type in Hive (see `describe table movies`) — confirm.
val movie_df = movie_df_temp.filter(movie_df_temp("movieid") =!= 0)
movie_df.show()


// Transformation: one row per (movie, genre), plus the release year parsed from "(YYYY)" in the title.
// Titles without a year map to null explicitly, so the cast also works with ANSI mode enabled.
val transformedDF = movie_df
    .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre"))
    .withColumn("releasedYear", regexp_extract($"title", "\\((\\d{4})\\)", 1))
    .withColumn("releasedYear", when($"releasedYear" =!= "", $"releasedYear".cast(IntegerType)))
transformedDF.show()

// Write the transformed data to a new Hive table, replaced on every run and stored at the given HDFS path.
// NOTE(review): format("hive") presumably uses Hive's default text storage, not comma-separated CSV,
// despite the ".csv" suffix on the path — confirm this is intended.
transformedDF.write
    .format("hive")
    .mode("overwrite")
    .option("path", "hdfs://catalog-test/user/root/transformed_movie.csv")
    .saveAsTable("transformed_movie")

In [23]:
%spark
