# HDFS에서 영화 리뷰 데이터 불러오기 (part-01 ~ part-06 CSV)

In [1]:
%pyspark
# All six part files live in the same HDFS directory and are read identically:
# first row is the header, and schema inference is off so every column stays a
# string until the explicit casts in the next cell.
MOVIE_HDFS_DIR = "hdfs://spark-master-01:9000/skybluelee/movie"


def _read_movie_part(part_no):
    """Read ``part-NN.csv`` from MOVIE_HDFS_DIR into a DataFrame.

    part_no: 1-based part number; it is zero-padded to two digits to form
    the file name (1 -> part-01.csv).
    Returns the DataFrame produced by ``spark.read.csv`` with header=true and
    inferSchema=false.
    """
    return (
        spark.read
        .option("header", "true")
        .option("inferSchema", "false")
        .csv("%s/part-%02d.csv" % (MOVIE_HDFS_DIR, part_no))
    )


df1 = _read_movie_part(1)
df2 = _read_movie_part(2)
df3 = _read_movie_part(3)
df4 = _read_movie_part(4)
df5 = _read_movie_part(5)
df6 = _read_movie_part(6)

# 데이터 타입 변경 및 임시 뷰(part_01 ~ part_06) 등록

In [3]:
%pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType

def _cast_review_columns(df):
    """Return *df* with typed review columns.

    rating and spoiler_tag are cast to IntegerType, and review_date is parsed
    with to_timestamp (default format). The files were read with
    inferSchema=false, so these columns start out as strings.
    """
    return (
        df.withColumn("rating", col("rating").cast(IntegerType()))
        .withColumn("spoiler_tag", col("spoiler_tag").cast(IntegerType()))
        .withColumn("review_date", to_timestamp("review_date"))
    )


df1, df2, df3, df4, df5, df6 = [
    _cast_review_columns(_part_df)
    for _part_df in (df1, df2, df3, df4, df5, df6)
]

# Expose each part to spark.sql as a temp view named part_01 ... part_06.
for _part_no, _part_df in enumerate((df1, df2, df3, df4, df5, df6), start=1):
    _part_df.createOrReplaceTempView("part_%02d" % _part_no)

In [4]:
%pyspark
# Print df1's schema to confirm the casts from the previous cell
# (rating/spoiler_tag -> integer, review_date -> timestamp).
df1.printSchema()

# 각 파일(뷰)의 연도별 리뷰 수 확인

In [6]:
%pyspark
# Count reviews per year in one part view to see which years it covers.
# Year is the first four characters of review_date. Only part_06 is queried
# here; change the FROM clause to inspect the other parts.
df_year_check = spark.sql("""
                            SELECT SUBSTR(review_date, 1, 4) AS Year, COUNT(*) AS review_count
                            FROM   part_06
                            GROUP  BY Year
                            ORDER  BY 1 DESC
                         """)
# show() only prints and returns None, so keep the DataFrame in
# df_year_check and display it as a separate step.
df_year_check.show(100)

# part_01 ~ part_04에서 2019~2021년 리뷰만 추출하고 YEAR·MONTH·DAY(일 구간) 컬럼을 추가한 뒤 하나로 합치기

In [8]:
%pyspark
def _select_target_years(view_name, years=(2019, 2020, 2021)):
    """Return the rows of *view_name* whose review_date falls in *years*.

    Adds three columns used later for partitioning:
      YEAR  - YEAR(review_date)
      MONTH - MONTH(review_date)
      DAY   - day-of-month bucket: '1_10', '11_20', or '21_31' (the ELSE branch)
    view_name: a temp view registered earlier (e.g. "part_01").
    years: iterable of integer years to keep.
    """
    year_list = ", ".join(str(year) for year in years)
    return spark.sql("""
                     SELECT  *, YEAR(review_date) AS YEAR, MONTH(review_date) AS MONTH,
                             CASE WHEN DAY(review_date) BETWEEN 1 AND 10 THEN '1_10'
                                  WHEN DAY(review_date) BETWEEN 11 AND 20 THEN '11_20'
                                  ELSE '21_31' END AS DAY
                     FROM    {view}
                     WHERE   YEAR(review_date) IN ({years})
                 """.format(view=view_name, years=year_list))


df_01 = _select_target_years("part_01")
df_02 = _select_target_years("part_02")
df_03 = _select_target_years("part_03")
df_04 = _select_target_years("part_04")

# NOTE(review): part_05/part_06 are not included; presumably intentional
# (e.g. no 2019-2021 rows per the year check above) - confirm.
# union() matches columns by position; this assumes all part files share the
# same column order.
df_total = df_01.union(df_02).union(df_03).union(df_04)

## 위 SQL 결과 확인 (part_01 예시)

In [10]:
%pyspark
# Re-run the 2019-2021 filter on part_01 and display sample rows.
# show() returns None, so chaining it onto the assignment would overwrite
# df_01 with None. Keep the DataFrame and show it as a separate call.
df_01 = spark.sql("""
                     SELECT  *, YEAR(review_date) AS YEAR, MONTH(review_date) AS MONTH,
                             CASE WHEN DAY(review_date) BETWEEN 1 AND 10 THEN '1_10'
                                  WHEN DAY(review_date) BETWEEN 11 AND 20 THEN '11_20'
                                  ELSE '21_31' END AS DAY
                     FROM    part_01
                     WHERE   YEAR(review_date) IN (2019, 2020, 2021)
                 """)
df_01.show()

# YEAR/MONTH/DAY 기준으로 파티셔닝하여 HDFS에 CSV로 저장

In [12]:
%pyspark
# Write df_total as CSV under movie-partitioned/YEAR=.../MONTH=.../DAY=...
# repartition() on the partition columns routes every (YEAR, MONTH, DAY) group
# to exactly one task. Each output directory therefore still gets a single file
# (absent a maxRecordsPerFile limit), and the job runs in parallel. coalesce(1)
# instead squeezed the entire upstream plan and the write into one task.
df_total.repartition("YEAR", "MONTH", "DAY").write \
        .option("header", True) \
        .partitionBy("YEAR", "MONTH", "DAY") \
        .mode("overwrite") \
        .csv("hdfs://spark-master-01:9000/skybluelee/movie-partitioned")