In [1]:
!pip install pandas
!pip install sumy
!pip install psycopg2-binary
!pip install pyspark

[0m

In [2]:
import pandas as pd
from pandas import DataFrame as df
from pyspark.sql.functions import explode, trim, regexp_extract, array_join, col, regexp_replace, concat_ws, split, udf, lower, size, lit
from pyspark.sql.types import FloatType

# Helper that extracts only a decimal (real-number) value from a string.
def extract_float(text):
    """Return the first decimal number of the form ``digits.digits`` in *text*.

    The function is wrapped as a Spark UDF, so it must tolerate null cells:
    a ``None`` input returns ``None`` instead of raising TypeError inside the
    executor and failing the whole job.

    Args:
        text: Source string, or None.

    Returns:
        The first match converted to float, or None when *text* is None or
        contains no ``digits.digits`` substring (plain integers do not match).
    """
    import re

    if text is None:
        return None
    match = re.search(r'\d+\.\d+', text)
    return float(match.group()) if match else None

# Spark UDF wrapper around extract_float; values land in a FloatType column.
extract_float_udf = udf(extract_float, returnType=FloatType())

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

In [4]:
import nltk

# Fetch NLTK's "punkt" tokenizer data (returns True on success).
# NOTE(review): no visible cell uses punkt -- presumably intended for sumy; confirm it is still needed.
nltk.download(info_or_id='punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [5]:
# Start the Spark session for this notebook (getOrCreate reuses an active one).
spark = (
    SparkSession.builder
    .appName("JSON to DataFrame")
    .getOrCreate()
)

# Read the scraped lecture catalogue. multiline=true lets Spark parse JSON
# records that span several lines.
df = spark.read.option("multiline", "true").json("codeit_lectures.json")

# One row per (category, subcategory): sub_categs is exploded once here and
# reused by both the SUBCATEGORY and COURSE frames below.
df_sub_exploded = df.withColumn("sub_categs", explode(col("sub_categs")))

# Rows for the CATEGORY table. platform_id is fixed at 3 for every row
# (presumably this source's id in the platform table -- TODO confirm).
# dropDuplicates guards the insert against a big_categ listed more than once.
df_category = df.select(col("big_categ").alias("category_name")) \
                .withColumn("platform_id", lit(3)) \
                .dropna() \
                .dropDuplicates()

# Rows for the SUBCATEGORY table; category_name is mapped to category_id later
# by joining against the CATEGORY table.
df_subcategory = df_sub_exploded.select(
                    col("big_categ").alias("category_name"),
                    col("sub_categs.sub_categ").alias("subcategory_name")
                ) \
                .dropna() \
                .dropDuplicates()

# Rows for the COURSE table: one row per lecture.
df_course = df_sub_exploded.withColumn("lectures", explode(col("sub_categs.lectures"))) \
                .select(
                    col("sub_categs.sub_categ").alias("subcategory_name"),
                    col("lectures.title").alias("course_title"),
                    col("lectures.summary").alias("summary"),
                    col("lectures.lecture_num").alias("num_of_lecture"),
                    col("lectures.link").alias("url")
                ) \
                .dropna()

# Keep only the first run of digits in num_of_lecture and cast it to int.
# regexp_extract yields "" when there are no digits; in non-ANSI mode that casts
# to null, and the dropna above has already run.
# NOTE(review): confirm null lecture counts are acceptable for COURSE.
df_course = df_course.withColumn("num_of_lecture", regexp_extract(col("num_of_lecture"), r"\d+", 0).cast("int"))

# One row per course URL.
# NOTE(review): if the same URL is listed under several subcategories,
# dropDuplicates keeps an arbitrary one of them -- confirm this is intended.
df_course = df_course.dropDuplicates(["url"])

# Inspect the final frames.
df_category.show()
df_subcategory.show()
df_course.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/23 17:11:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+---------------+-----------+
|  category_name|platform_id|
+---------------+-----------+
|        웹 개발|          3|
|데이터 사이언스|          3|
|컴퓨터 사이언스|          3|
|프로그래밍 언어|          3|
|           기타|          3|
+---------------+-----------+

+---------------+--------------------+
|  category_name|    subcategory_name|
+---------------+--------------------+
|        웹 개발|          프론트엔드|
|        웹 개발|              백엔드|
|        웹 개발|              풀스택|
|데이터 사이언스|         데이터 분석|
|데이터 사이언스|           인공 지능|
|컴퓨터 사이언스|     프로그래밍 기초|
|컴퓨터 사이언스|   알고리즘·자료구조|
|컴퓨터 사이언스|객체 지향 프로그래밍|
|프로그래밍 언어|              Python|
|프로그래밍 언어|          JavaScript|
|           기타|        데이터베이스|
|           기타|         업무 자동화|
|           기타|           개발 도구|
|           기타|             IT 교양|
|           기타|              디자인|
+---------------+--------------------+

+-----------------+-----------------------------+-----------------------------------+--------------+--------------------+
| subcategory_name|    

In [6]:
# Number of COURSE rows remaining after de-duplication by url.
df_course.count() 

81

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, split

# Obtain the Spark session; getOrCreate() returns the already-running session if there is one.
spark = SparkSession.builder.appName("JSON to DataFrame").getOrCreate()

from pyspark.sql.functions import array_remove

# Read the scraped lecture catalogue (multiline JSON).
df = spark.read.option("multiline", "true").json("codeit_lectures.json")

# Rows for the SUBSECTION table: one row per lecture with its raw curriculum text.
df_subsection = df.withColumn("sub_categs", explode(col("sub_categs"))) \
                .withColumn("lectures", explode(col("sub_categs.lectures"))) \
                .select(
                    col("lectures.title").alias("course_title"),
                    col("lectures.curriculum").alias("section_str")
                )

# Split the curriculum on "<digits><newline>" markers. A raw string passes
# \d+\n to Java's regex engine and avoids Python's invalid "\d" escape warning.
split_df = df_subsection.withColumn("split_sections", split("section_str", r"\d+\n"))

# A leading marker makes the first element an empty string. Remove the empty
# strings but keep every section (getItem(1) kept only the first section).
split_df = split_df.withColumn("split_sections", array_remove("split_sections", ""))

# Inspect the result.
split_df.show(truncate=False)

+------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
# Make sure a Spark session is available for the JDBC reads below
# (getOrCreate hands back the active session when one exists).
spark = (SparkSession.builder
         .appName("JSON to DataFrame")
         .getOrCreate())

import os
from getpass import getpass

# JDBC connection properties. Credentials come from the environment
# (LECTURE_DB_USER / LECTURE_DB_PASSWORD) rather than being hard-coded in the
# notebook; when a variable is unset the user is prompted instead.
# NOTE(review): the password that used to be hard-coded here should be rotated.
db_properties = {
    "driver": "org.postgresql.Driver",
    "user": os.environ.get("LECTURE_DB_USER") or input("DB user: "),
    "password": os.environ.get("LECTURE_DB_PASSWORD") or getpass("DB password: "),
}

# JDBC URL of the lecture database; can be overridden with LECTURE_DB_URL.
db_url = os.environ.get("LECTURE_DB_URL", "jdbc:postgresql://54.180.100.57:5432/lecture_db")

In [8]:
# Load the CATEGORY table from the database under its own name. This keeps the
# JSON-derived df_category (the frame meant for inserting into CATEGORY) intact.
# Overwriting it makes a later df_category.write re-insert rows that already
# carry a category_id, which fails with a duplicate-key error.
df_category_db = spark.read.jdbc(url=db_url, table="CATEGORY", properties=db_properties)

# Attach category_id to each subcategory by matching on category_name.
# NOTE(review): the left outer join leaves category_id null for names missing
# from CATEGORY, and duplicates rows if CATEGORY holds a name twice -- verify
# before writing SUBCATEGORY.
df_subcategory = df_subcategory \
    .join(df_category_db.select("category_id", "category_name"), on="category_name", how="left_outer") \
    .select("category_id", "subcategory_name")

df_subcategory.show()

+-----------+--------------------+
|category_id|    subcategory_name|
+-----------+--------------------+
|          4|              Python|
|          4|          JavaScript|
|          5|        데이터베이스|
|          5|         업무 자동화|
|          5|           개발 도구|
|          5|             IT 교양|
|          5|              디자인|
|          1|          프론트엔드|
|          1|              백엔드|
|          1|              풀스택|
|          2|         데이터 분석|
|          2|           인공 지능|
|          3|     프로그래밍 기초|
|          3|   알고리즘·자료구조|
|          3|객체 지향 프로그래밍|
+-----------+--------------------+



In [20]:
# # 데이터베이스에 CATEGORY 정보 삽입
# df_category.write.jdbc(url=db_url, table="CATEGORY", mode="append", properties=db_properties)

# # 데이터베이스에 SUBCATEGORY 정보 삽입
# df_subcategory.write.jdbc(url=db_url, table="SUBCATEGORY", mode="append", properties=db_properties)

24/05/23 16:55:03 ERROR Executor: Exception in task 0.0 in stage 47.0 (TID 33)
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO CATEGORY ("category_id","category_name","platform_id") VALUES (1,'웹 개발',3) was aborted: ERROR: duplicate key value violates unique constraint "category_pkey"
  Detail: Key (category_id)=(1) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:169)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2286)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:521)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:870)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:893)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:740)
	at org.apa

Py4JJavaError: An error occurred while calling o188.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 47.0 failed 1 times, most recent failure: Lost task 0.0 in stage 47.0 (TID 33) (fb2c885a1f24 executor driver): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO CATEGORY ("category_id","category_name","platform_id") VALUES (1,'웹 개발',3) was aborted: ERROR: duplicate key value violates unique constraint "category_pkey"
  Detail: Key (category_id)=(1) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:169)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2286)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:521)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:870)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:893)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:740)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:891)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1009)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1009)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "category_pkey"
  Detail: Key (category_id)=(1) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
	... 19 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1009)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1007)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:890)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:70)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:753)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO CATEGORY ("category_id","category_name","platform_id") VALUES (1,'웹 개발',3) was aborted: ERROR: duplicate key value violates unique constraint "category_pkey"
  Detail: Key (category_id)=(1) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:169)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2286)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:521)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:870)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:893)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:740)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:891)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1009)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1009)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "category_pkey"
  Detail: Key (category_id)=(1) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
	... 19 more


In [9]:
# Load SUBCATEGORY from the database under its own name, so df_subcategory
# (the frame prepared for inserting into SUBCATEGORY) is not overwritten.
df_subcategory_db = spark.read.jdbc(url=db_url, table="SUBCATEGORY", properties=db_properties)
df_subcategory_db.show()

# Attach subcategory_id to every course by matching on subcategory_name.
# Drop any subcategory_id left by a previous run of this cell first. Dropping
# a missing column is a no-op, so the cell can be re-run without duplicating it.
# NOTE(review): unmatched names get a null subcategory_id (left outer join),
# and a subcategory name repeated in SUBCATEGORY would duplicate courses.
df_course = df_course.drop("subcategory_id") \
    .join(df_subcategory_db.select("subcategory_name", "subcategory_id"), on="subcategory_name", how="left_outer")

df_course.show()

+--------------+--------------------+-----------+
|subcategory_id|    subcategory_name|category_id|
+--------------+--------------------+-----------+
|             1|              Python|          4|
|             2|          JavaScript|          4|
|             3|        데이터베이스|          5|
|             4|         업무 자동화|          5|
|             5|           개발 도구|          5|
|             6|             IT 교양|          5|
|             7|              디자인|          5|
|             8|          프론트엔드|          1|
|             9|              백엔드|          1|
|            10|              풀스택|          1|
|            11|         데이터 분석|          2|
|            12|           인공 지능|          2|
|            13|     프로그래밍 기초|          3|
|            14|   알고리즘·자료구조|          3|
|            15|객체 지향 프로그래밍|          3|
+--------------+--------------------+-----------+

+-----------------+-----------------------------+-----------------------------------+--------------+-------------

In [10]:
# COURSE rows reference their subcategory through subcategory_id, so drop the
# subcategory_name column that was only needed for the join.
df_course = df_course.drop("subcategory_name")

# NOTE(review): check for null subcategory_id values (unmatched names from the
# left outer join) before inserting.
# # Insert the COURSE rows into the database
# df_course.write.jdbc(url=db_url, table="COURSE", mode="append", properties=db_properties)

81
81


In [None]:
# Stop the Spark session; run this last, since `spark` cannot be used afterwards.
spark.stop()