In [None]:
# pyspark로 작성함
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F

# Build (or reuse an already-running) SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName("loadDataToPy2Scala")
    .getOrCreate()
)

In [None]:
# Load every JSON file matched by the two-level S3 glob into one DataFrame.
# NOTE(review): "mergeSchema" is documented for the Parquet/ORC sources; JSON
# schema inference already unions fields across files — confirm whether this
# option has any effect here.
df = (
    spark.read
    .option("mergeSchema", "true")
    .json("s3://path/to/s3/*/*")
)
# Preview the first 10 rows.
df.show(10)

In [None]:
# Add the derived time columns needed downstream, one withColumn at a time.
#
# NOTE(review): casting a string to timestamp and the to_utc/from_utc chain
# below are evaluated against spark.sql.session.timeZone. The KST values are
# only correct if that setting is UTC (or the source strings carry an explicit
# offset) — confirm the session/cluster configuration.

# Parse the raw string "timestamp" column into a TimestampType column.
new_df = df.withColumn("timestamp_cov", col("timestamp").cast("timestamp"))

# Treat the parsed timestamp as UTC. With 'UTC' as the zone this conversion is
# an identity; the column is kept because it is part of new_df's schema.
new_df = new_df.withColumn("utc_timestamp", to_utc_timestamp(col("timestamp_cov"), 'UTC'))

# Shift the UTC timestamp to Korea Standard Time (KST, Asia/Seoul).
new_df = new_df.withColumn("kst_timestamp", from_utc_timestamp(col("utc_timestamp"), 'Asia/Seoul'))

# KST date plus one day. Only used to build "yw": shifting by +1 day makes a
# Sunday fall into the following Monday-based ISO week, i.e. the resulting
# weeks run Sunday through Saturday.
new_df = new_df.withColumn("add_kst_timestamp", F.date_add(col("kst_timestamp"), 1))

# KST date minus one day.
# NOTE(review): the name "ywd" suggests a year-week-day label, but the value
# is a plain date one day before kst_timestamp — confirm the intent.
new_df = new_df.withColumn("ywd", F.date_sub(col("kst_timestamp"), 1))

# "<week-numbering year>-<week number>" of add_kst_timestamp.
# weekofyear() is the ISO-8601 week number, so it must be paired with the ISO
# week-numbering year (YEAROFWEEK), not the calendar year(). Otherwise dates
# around New Year are mislabeled: 2021-01-02 is week 53 of 2020, which year()
# would have rendered as "2021-53". (extract(YEAROFWEEK ...) needs Spark 3.0+.)
new_df = new_df.withColumn(
    "yw",
    concat(
        F.expr("extract(YEAROFWEEK FROM add_kst_timestamp)").cast("string"),
        lit("-"),
        weekofyear(col("add_kst_timestamp")).cast("string"),
    ),
)

# Calendar month ("yyyy-MM") and day ("yyyy-MM-dd") in KST, derived from the
# same kst_timestamp as yw/ywd so all time keys agree. Formatting the raw
# "timestamp" string instead labeled events near midnight with the UTC
# day/month. date_format already returns a string, so no cast is needed.
new_df = new_df.withColumn("ym", date_format(col("kst_timestamp"), "yyyy-MM"))
new_df = new_df.withColumn("ymd", date_format(col("kst_timestamp"), "yyyy-MM-dd"))

new_df.show(10)

In [None]:
# Print the schema of new_df, including the derived time columns.
new_df.printSchema()

In [None]:
# Keep only the required columns, using selectExpr.
#
# NOTE(review): several output names repeat ("schema_schema",
# "schema_schema_schema", "schema_schema_schema_schema", and plain "schema"),
# so cov_new_df will contain duplicate column names. Spark allows this in a
# projection, but later references to those names are ambiguous, and file
# writers typically reject duplicate columns. The field paths/aliases look like
# placeholders — confirm the intended names.
# NOTE(review): "kst_timestamp AS timestamp" exposes the KST-shifted value
# under the original column name "timestamp".
cov_new_df = new_df.selectExpr("schema",
                               "schema.schema AS schema_schema",
                               "schema.schema AS schema_schema",
                               "schema.schema AS schema_schema",
                               "schema.schema.schema AS schema_schema_schema",
                               "schema",
                               "schema.schema AS schema_schema",
                               "schema.schema AS properties_page",
                               "schema.schema.schema AS schema_schema_schema",
                               "schema.schema.schema AS schema_schema_schema",
                               "schema.schema.schema[0] AS schema_schema_schema_schema",
                               "schema.schema.schema[1] AS schema_schema_schema_schema",
                               "schema.schema.schema[0] AS schema_schema_schema_schema",
                               "schema.schema.schema[1] AS schema_schema_schema_schema",
                               "schema.schema.schema AS schema_schema_schema", 
                               "schema.schema.schema AS schema_schema_schema",
                               "schema.schema.schema AS schema_schema_schema",
                               "schema.schema.schema AS schema_schema_schema",
                               "schema",
                               "schema",
                               "kst_timestamp AS timestamp",
                               "yw",
                               "ywd",
                               "ym",
                               "ymd",
                               "schema",
                               "schema"
                              )

In [None]:
# Preview the first 10 rows of the selected columns.
cov_new_df.show(10)

In [None]:
# Print the schema of the selected-column DataFrame.
cov_new_df.printSchema()