In [1]:
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 064c4b62-7675-461e-9c80-65b7b8b21a6c
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 064c4b62-7675-461e-9c80-65b7b8b21a6c to get into ready status...
Session 064c4b62-7675-461e-9c80-65b7b8b21a6c has been created.



In [31]:
# 두 테이블을 glue 데이터 카탈로그에서 DynamicFrame으로 로드
table1 = glueContext.create_dynamic_frame.from_catalog(
    database = "final_raw_data",
    table_name = "jeju_latenight_pharmacy_2023"
)


table2 = glueContext.create_dynamic_frame.from_catalog(
    database="final_raw_data",
    table_name = "jeju_latenight_pharmacy_2019"
)




In [32]:
#DynamicFrame을 Spark DataFrame으로 변환
df1 = table1.toDF()
df2 = table2.toDF()



In [33]:
df1 = df1.withColumnRenamed('col0', 'Data Reference Date') \
         .withColumnRenamed('col1', 'Address') \
         .withColumnRenamed('col2', 'Pharmacy Name') \
         .withColumnRenamed('col3', 'Contact Number') \
         .withColumnRenamed('col4', 'Operating DoW') \
         .withColumnRenamed('col5', 'Township/Village/Neighborhood') \
         .withColumnRenamed('col6', 'Administrative City')

# 첫 번째 행을 제거
df1 = df1.filter(df1["Data Reference Date"] != "Data Reference Date")

# 결과 확인
df1.show(3)


+-------------------+----------------------------------+-------------+--------------+------------------------------+-----------------------------+-------------------+
|Data Reference Date|                           Address|Pharmacy Name|Contact Number|                 Operating DoW|Township/Village/Neighborhood|Administrative City|
+-------------------+----------------------------------+-------------+--------------+------------------------------+-----------------------------+-------------------+
|         2024-05-28|제주특별자치도 제주시 한림읍 한...|     현재약국|  064-796-9333|월+화+수+목+금+토(둘째·넷째...|                       한림읍|             제주시|
|         2024-05-28|제주특별자치도 제주시 조천읍 신...|     영재약국|  064-783-1959|             월+화+수+목+금+토|                       조천읍|             제주시|
|         2024-05-28|제주특별자치도 제주시 조천읍 신...|     조천약국|  064-783-8989|                      월+수+금|                       조천읍|             제주시|
+-------------------+----------------------------------+-------------+--------------+----

In [34]:
df2 = df2.withColumnRenamed('col0', 'Data Reference Date') \
         .withColumnRenamed('col1', 'Notes') \
         .withColumnRenamed('col2', 'Address') \
         .withColumnRenamed('col3', 'City/County') \
         .withColumnRenamed('col4', 'Pharmacy Name') \
         .withColumnRenamed('col5', 'Opening Time') \
         .withColumnRenamed('col6', 'Operating Days') \
         .withColumnRenamed('col7', 'Closing Time') \
         .withColumnRenamed('col8', 'Phone Number') \
         .withColumnRenamed('col9', 'Region')

# 첫 번째 행을 컬럼으로 사용하고 첫 번째 행을 데이터에서 제거
new_columns = df2.first()  # 첫 번째 행을 컬럼 이름으로 사용
df2 = df2.filter(df2["Data Reference Date"] != new_columns["Data Reference Date"])  # 첫 번째 행 제거

# 새로운 컬럼 이름을 사용하여 DataFrame 생성
df2 = df2.toDF(*new_columns)

# 결과 확인
df2.show(3)



+-------------------+------------+----------------------------------+-----------+-------------+------------+--------------------------+------------+------------+------+
|Data Reference Date|       Notes|                           Address|City/County|Pharmacy Name|Opening Time|            Operating Days|Closing Time|Phone Number|Region|
+-------------------+------------+----------------------------------+-----------+-------------+------------+--------------------------+------------+------------+------+
|         2019-05-22|특이사항없음|제주특별자치도 제주시 한림읍 한...|     제주시|     현재약국|       20:00|일요일 및 법정 공휴일 제외|       23:00|064-796-9333|한림읍|
|         2019-05-22|특이사항없음|제주특별자치도 제주시 조천읍 신...|     제주시|     영재약국|       20:00|일요일 및 법정 공휴일 제외|       23:00|064-783-1959|조천읍|
|         2019-05-22|  월. 수. 금|제주특별자치도 제주시 조천읍 신...|     제주시|     조천약국|       20:00|일요일 및 법정 공휴일 제외|       23:00|064-783-8989|조천읍|
+-------------------+------------+----------------------------------+-----------+-------------+-----------

In [35]:
df2 = df2.withColumnRenamed("Phone Number", "Contact Number")
df2 = df2.withColumnRenamed("Region", "Township/Village/Neighborhood")





In [36]:
merged_df = df1.join(df2, ["Data Reference Date", "Address", "Pharmacy Name", "Contact Number", "Township/Village/Neighborhood"], "outer")




In [47]:
from pyspark.sql.functions import col, when
merged_df = merged_df.withColumn("Operating Days", when(col("Operating DoW").isNull(), col("Operating Days")).otherwise(col("Operating DoW")))
merged_df = merged_df.withColumn("Administrative City", when(col("Administrative City").isNull(), col("City/County")).otherwise(col("Administrative City")))

final_df = merged_df.select("Data Reference Date", "Address", "Pharmacy Name", "Contact Number", "Operating DoW", "Township/Village/Neighborhood", "Administrative City", 
                            "Notes", "Opening Time", "Closing Time")




In [48]:
final_df.show()

+-------------------+-----------------------------------+-------------+--------------+------------------------------+-----------------------------+-------------------+------------+------------+------------+
|Data Reference Date|                            Address|Pharmacy Name|Contact Number|                 Operating DoW|Township/Village/Neighborhood|Administrative City|       Notes|Opening Time|Closing Time|
+-------------------+-----------------------------------+-------------+--------------+------------------------------+-----------------------------+-------------------+------------+------------+------------+
|         2019-05-22| 제주특별자치도 서귀포시 남원읍 ...|     조광약국|  064-764-4159|                          null|                       남원읍|           서귀포시|      월. 수|       20:00|       23:00|
|         2019-05-22| 제주특별자치도 서귀포시 대정읍 ...|     건강약국|  064-794-7575|                          null|                       대정읍|           서귀포시|      화. 목|       20:00|       23:00|
|         2019-05-2

In [49]:
from pyspark.sql.functions import to_date

# 데이터프레임의 '데이터기준일자' 열을 date 타입으로 변환
final_df = final_df.withColumn("Data Reference Date", to_date(final_df["Data Reference Date"], "yyyy-MM-dd"))




In [50]:
final_df.write.format("parquet").mode("overwrite").save("s3://final-project-transformed-data/jeju-latenight-pharmacy/")




In [86]:
job.commit()


