# Spark, S3, Redshift와의 연동

In [None]:
%pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Title of the article to analyse; it is appended to the S3 object names read below.
article_title = ""

# Jars placed on the driver class path.
# hadoop-aws and aws-java-sdk-bundle connect S3 <-> Spark;
# RedshiftJDBC4 and RedshiftJDBC42-no-awssdk connect Spark <-> Redshift.
driver_jars = [
    '/skybluelee/spark3/jars/hadoop-aws-3.3.1.jar',
    '/skybluelee/spark3/jars/aws-java-sdk-bundle-1.11.901.jar',
    '/skybluelee/spark3/jars/RedshiftJDBC4-1.2.1.1001.jar',
    '/skybluelee/spark3/jars/RedshiftJDBC42-no-awssdk-1.2.36.1060.jar',
]

# Spark configuration. The pairs are applied in the listed order.
# NOTE(review): "access_key" / "secret_key" look like literal placeholders —
# supply real credentials before running.
# NOTE(review): the un-prefixed and the "spark.hadoop."-prefixed
# credentials-provider keys name different providers — confirm which one is
# intended.
conf = SparkConf()
conf.setAll([
    ("spark.hadoop.fs.s3a.access.key", "access_key"),
    ("spark.hadoop.fs.s3a.secret.key", "secret_key"),
    ('spark.driver.extraClassPath', ':'.join(driver_jars)),
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider"),
    ('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider'),
    ("spark.hadoop.fs.s3a.endpoint", "s3.ap-northeast-1.amazonaws.com"),
])

# Build (or reuse) the session with the configuration above.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("Spark")
    .getOrCreate()
)
# Set the AWS SDK "enableV4" Java system property on the driver JVM.
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

# Load the comments CSV for the chosen article from S3.
# Only the "header" option is set (no schema, no inferSchema), so every column
# arrives as a string and has to be cast explicitly.
df1 = (
    spark.read
    .option("header", "true")
    .csv("s3a://news-comments/comments_" + article_title)
)

# Cast by type name ("int") rather than IntegerType(): IntegerType is never
# imported in this notebook, so the original cast raised NameError.
# `round` here is pyspark.sql.functions.round (brought in by the star import),
# not the Python builtin.
df1 = (
    df1
    .withColumn("good", col("good").cast("int"))
    .withColumn("bad", col("bad").cast("int"))
    .withColumn("timestamp", to_timestamp("timestamp"))
    .withColumn("written_time", to_timestamp("written_time"))
    # Whole minutes between `written_time` and `timestamp`.
    .withColumn(
        "DiffInMinutes",
        round((unix_timestamp("timestamp") - unix_timestamp("written_time")) / 60, 0),
    )
)

df1.printSchema()
df1.show()
# Expose the DataFrame to Spark SQL under the name "comments".
df1.createOrReplaceTempView("comments")

# Load the per-article user distribution CSV from S3 (all columns read as strings).
df2 = (
    spark.read
    .option("header", "true")
    .csv("s3a://news-comments/user_distribution_" + article_title)
)

# Columns cast to integers. Casting by type name ("int") avoids IntegerType,
# which is never imported in this notebook and raised NameError in the
# original chain.
user_int_columns = [
    "total",
    "self_removed",
    "auto_removed",
    "male",
    "female",
    "age_10",
    "age_20",
    "age_30",
    "age_40",
    "age_50",
    "age_60",
]
for int_column in user_int_columns:
    df2 = df2.withColumn(int_column, col(int_column).cast("int"))

df2 = df2.withColumn("timestamp", to_timestamp("timestamp"))

df2.printSchema()
df2.show()
# Expose the DataFrame to Spark SQL under the name "users".
df2.createOrReplaceTempView("users")

# SQL Test

In [None]:
%pyspark
# Join comments with the user distribution on `timestamp`, rank comments within
# each timestamp by `good` (descending) and keep ranks 1-5, adding good/bad
# rates relative to `total`.
top5_sql = """ WITH temp AS(
                   SELECT comment, ROW_NUMBER() OVER (PARTITION BY timestamp ORDER BY good DESC) AS goodNo, timestamp, good, written_time, bad, DiffInMinutes,
                          total, male, female, age_10 ,age_20, age_30, age_40, age_50, age_60
                   FROM   comments C
                   JOIN users U USING(timestamp)
              )
              SELECT comment, good, bad, timestamp, written_time, DiffInMinutes, total, ROUND(good/total, 3) AS good_rate, ROUND(bad/total, 3) AS bad_rate, male, female, age_10 ,age_20, age_30, age_40, age_50, age_60
              FROM   temp
              WHERE  goodNo < 6"""

# Print up to 100 rows of the result.
spark.sql(top5_sql).show(100)

In [None]:
%pyspark
# Used on the assumption that the time a comment was written and the time the
# data was collected are related: DiffInMinutes is the gap between
# `written_time` and `timestamp`, rounded to whole minutes.
elapsed_seconds = unix_timestamp("timestamp") - unix_timestamp("written_time")
df_test = df1.withColumn("DiffInMinutes", round(elapsed_seconds / 60, 0))

df_test.show()

In [None]:
%pyspark
# Collect the 10 most-liked comments for each collection timestamp, together
# with the user distribution columns and good/bad rates relative to `total`.
top10_sql = """ WITH temp AS(
                   SELECT C.title, comment, ROW_NUMBER() OVER (PARTITION BY timestamp ORDER BY good DESC) AS goodNo, timestamp, good, written_time, bad, DiffInMinutes,
                          total, male, female, age_10 ,age_20, age_30, age_40, age_50, age_60
                   FROM   comments C
                   JOIN users U USING(timestamp)
              )
              SELECT title, comment, good, bad, timestamp, written_time, DiffInMinutes, total, ROUND(good/total, 3) AS good_rate, ROUND(bad/total, 3) AS bad_rate, male, female, age_10 ,age_20, age_30, age_40, age_50, age_60
              FROM   temp
              WHERE  goodNo < 11"""

df_redshift_test = spark.sql(top10_sql)

# Redshift Upload

In [None]:
%pyspark
# Upload the query result to Redshift over JDBC.
# The "<...>" values are placeholders to be filled in before running.
redshift_jdbc_options = {
    "url": '<jdbc url>',
    "driver": 'com.amazon.redshift.jdbc42.Driver',
    "dbtable": '<schema>.<table>',
    "user": '<userid>',
    "password": '<password>',
}

redshift_writer = df_redshift_test.write.format('jdbc').options(**redshift_jdbc_options)
# Save mode 'overwrite' is used for the target table.
redshift_writer.mode('overwrite').save()