In [1]:
from pyspark.sql import SparkSession

# Build the SparkSession used by every cell below, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("241211_01_SparkSQL_SQLtest")
    .getOrCreate()
)


24/12/11 10:28:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# 데이터 수집

In [2]:
from pyspark.sql import Row

# Sample user records as (user_id, username, address); users 4 and 5 have no address.
user_data = [
    Row(user_id=uid, username=name, address=addr)
    for uid, name, addr in (
        (1, 'A', '서울'),
        (2, 'B', '대전'),
        (3, 'C', '경기도'),
        (4, 'D', None),
        (5, 'E', None),
        (6, 'F', '서울'),
        (7, 'G', '경기도'),
        (8, 'H', '대구'),
        (9, 'I', '부산'),
        (10, 'J', '전주'),
        (11, 'K', '광주'),
    )
]

In [3]:
# Turn the user rows into a DataFrame and expose it to Spark SQL as the `users` view.
user_df = spark.createDataFrame(data=user_data)
user_df.createOrReplaceTempView(name='users')

In [4]:
# Sample book records as
# (book_id, title, author_fname, author_lname, pages, released_year, stock_quantity).
books_data = [
    Row(
        book_id=bid,
        title=name,
        author_fname=first,
        author_lname=last,
        pages=page_count,
        released_year=year,
        stock_quantity=stock,
    )
    for bid, name, first, last, page_count, year, stock in (
        (1, "Book A", "John", "Doe", 300, 2005, 55),
        (2, "Book B", "Jane", "Smith", 250, 2010, 40),
        (3, "Book C", "Emily", "Jones", 180, 2015, 20),
        (4, "Book D", "Chris", "Brown", 320, 2012, 75),
        (5, "Book E", "Anna", "Davis", 270, 2008, 35),
    )
]


In [5]:
# Turn the book rows into a DataFrame and expose it to Spark SQL as the `books` view.
books_df = spark.createDataFrame(data=books_data)
books_df.createOrReplaceTempView(name='books')

In [1]:
# Classify every user's address as capital area ('수도권') or provinces ('지방').
# `address IN (...)` evaluates to NULL when address is NULL, and IF() treats a
# NULL condition as false, so users without an address used to be reported as
# '지방'. The outer IF() labels them '미상' (unknown) instead.
query_users = '''
SELECT address,
       IF(address IS NULL, '미상',
          IF(address IN ('경기도', '서울'), '수도권', '지방')) AS region
FROM users;
'''
spark.sql(query_users).show()

NameError: name 'spark' is not defined

In [None]:
# books table
# stock_quantity >= 50 이면 '재고 많음', >= 30 이면 '재고 중간', 그 외에는 '재고 없음'


In [10]:
# Label each book's stock level with nested IF() calls:
# >= 50 -> '재고 많음', >= 30 -> '재고 중간', anything lower -> '재고 없음'.
# NOTE(review): the fallback label reads "no stock" although it is applied to
# any quantity below 30; the CASE WHEN variant of this query (books_sql_1) uses
# '재고 부족' instead. Confirm which label is intended.
books_sql = '''
SELECT stock_quantity, 
	   IF(stock_quantity >= 50, '재고 많음',
		  IF(stock_quantity >= 30, '재고 중간', '재고 없음')) AS quantity_level
FROM books;
'''
# Run the query against the `books` view and print the result table.
spark.sql(books_sql).show()

+--------------+--------------+
|stock_quantity|quantity_level|
+--------------+--------------+
|            55|     재고 많음|
|            40|     재고 중간|
|            20|     재고 없음|
|            75|     재고 많음|
|            35|     재고 중간|
+--------------+--------------+



In [11]:
# Same stock-level classification written with CASE WHEN instead of nested IF():
# >= 50 -> '재고 많음', >= 30 -> '재고 중간', otherwise -> '재고 부족'.
books_sql_1= '''
SELECT stock_quantity, 
	   CASE 
		   WHEN stock_quantity >= 50 THEN '재고 많음'
		   WHEN stock_quantity >= 30 THEN '재고 중간'
		   ELSE '재고 부족'
	   END AS quantity_level
FROM books;
'''
# Run the query against the `books` view and print the result table.
spark.sql(books_sql_1).show()

+--------------+--------------+
|stock_quantity|quantity_level|
+--------------+--------------+
|            55|     재고 많음|
|            40|     재고 중간|
|            20|     재고 부족|
|            75|     재고 많음|
|            35|     재고 중간|
+--------------+--------------+



# 실행계획 비교

In [13]:
# Print the physical plan Spark builds for the nested-IF query.
spark.sql(books_sql).explain()

== Physical Plan ==
*(1) Project [stock_quantity#12L, if ((stock_quantity#12L >= 50)) 재고 많음 else if ((stock_quantity#12L >= 30)) 재고 중간 else 재고 없음 AS quantity_level#96]
+- *(1) Scan ExistingRDD[book_id#6L,title#7,author_fname#8,author_lname#9,pages#10L,released_year#11L,stock_quantity#12L]




In [14]:
# Print the physical plan Spark builds for the CASE WHEN query, for comparison.
spark.sql(books_sql_1).explain()

== Physical Plan ==
*(1) Project [stock_quantity#12L, CASE WHEN (stock_quantity#12L >= 50) THEN 재고 많음 WHEN (stock_quantity#12L >= 30) THEN 재고 중간 ELSE 재고 부족 END AS quantity_level#99]
+- *(1) Scan ExistingRDD[book_id#6L,title#7,author_fname#8,author_lname#9,pages#10L,released_year#11L,stock_quantity#12L]




In [15]:
# Distinct author last names from the `books` view.
books_sql_2 = '''
select distinct author_lname from books;
'''
# Only the physical plan is printed here; the query itself is not executed.
spark.sql(books_sql_2).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[author_lname#9], functions=[])
+- Exchange hashpartitioning(author_lname#9, 200), ENSURE_REQUIREMENTS, [id=#111]
   +- *(1) HashAggregate(keys=[author_lname#9], functions=[])
      +- *(1) Project [author_lname#9]
         +- *(1) Scan ExistingRDD[book_id#6L,title#7,author_fname#8,author_lname#9,pages#10L,released_year#11L,stock_quantity#12L]




In [16]:
# Execute the DISTINCT query and print the resulting rows.
spark.sql(books_sql_2).show()

                                                                                

+------------+
|author_lname|
+------------+
|       Jones|
|       Davis|
|       Smith|
|         Doe|
|       Brown|
+------------+



In [17]:
# Number of books per author last name.
books_sql_3 = '''
select author_lname, count(*)
from books
group by author_lname;
'''
# Print the physical plan first, then execute the query and print its rows.
spark.sql(books_sql_3).explain()
spark.sql(books_sql_3).show()

== Physical Plan ==
*(2) HashAggregate(keys=[author_lname#9], functions=[count(1)])
+- Exchange hashpartitioning(author_lname#9, 200), ENSURE_REQUIREMENTS, [id=#158]
   +- *(1) HashAggregate(keys=[author_lname#9], functions=[partial_count(1)])
      +- *(1) Project [author_lname#9]
         +- *(1) Scan ExistingRDD[book_id#6L,title#7,author_fname#8,author_lname#9,pages#10L,released_year#11L,stock_quantity#12L]


+------------+--------+
|author_lname|count(1)|
+------------+--------+
|       Jones|       1|
|       Davis|       1|
|       Smith|       1|
|         Doe|       1|
|       Brown|       1|
+------------+--------+



# 데이터 변경

In [19]:
# Book records extended with `borrowed_by`, the user_id of the borrower
# (None when the book is not lent out), given as
# (book_id, title, author_fname, author_lname, pages, released_year,
#  stock_quantity, borrowed_by).
books_data_with_user = [
    Row(
        book_id=bid,
        title=name,
        author_fname=first,
        author_lname=last,
        pages=page_count,
        released_year=year,
        stock_quantity=stock,
        borrowed_by=borrower,
    )
    for bid, name, first, last, page_count, year, stock, borrower in (
        (1, "Book A", "John", "Doe", 300, 2005, 55, 1),
        (2, "Book B", "Jane", "Smith", 250, 2010, 40, 2),
        (3, "Book C", "Emily", "Jones", 180, 2015, 20, 3),
        (4, "Book D", "Chris", "Brown", 320, 2012, 75, None),
        (5, "Book E", "Anna", "Davis", 270, 2008, 35, 6),
    )
]

# Build the DataFrame from the extended rows.
books_df_with_user = spark.createDataFrame(data=books_data_with_user)

# Register it under the view name "books", replacing the earlier `books` view.
books_df_with_user.createOrReplaceTempView(name="books")

In [None]:
# # borrowed_by 컬럼 추가 및 데이터 입력
# updated_books_df = books_df.withColumn(
#     "borrowed_by",
#     when(books_df.book_id == 1, 1)
#     .when(books_df.book_id == 2, 2)
#     .when(books_df.book_id == 3, 3)
#     .when(books_df.book_id == 4, lit(None))
#     .when(books_df.book_id == 5, 6)
#     .otherwise(None)
# )


In [20]:
# Show every column of the re-registered `books` view, including borrowed_by.
# NOTE(review): this reassigns `books_sql`, which earlier held the nested-IF
# query; re-running the earlier explain() cell afterwards would use this text.
books_sql = '''
SELECT *
FROM books;
'''
spark.sql(books_sql).show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      1|Book A|        John|         Doe|  300|         2005|            55|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            40|          2|
|      3|Book C|       Emily|       Jones|  180|         2015|            20|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            75|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            35|          6|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



In [22]:
# Import only the column helpers this notebook uses. A wildcard import of
# pyspark.sql.functions would shadow Python builtins such as sum, min, max,
# abs and round for every later cell.
from pyspark.sql.functions import lit, when

In [23]:
# Preprocessing step: set stock_quantity to 50 for book_id 3 and keep the
# existing value for every other book.

updated_books_df = books_df_with_user.withColumn(
    "stock_quantity",
    when(books_df_with_user["book_id"] == 3, 50)
    .otherwise(books_df_with_user["stock_quantity"]),
)
updated_books_df.show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      1|Book A|        John|         Doe|  300|         2005|            55|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            40|          2|
|      3|Book C|       Emily|       Jones|  180|         2015|            50|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            75|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            35|          6|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



In [24]:
# TODO: the planned 10% increase of stock_quantity is not implemented in this cell.

# Register the updated DataFrame under the view name "books", then show the view's rows.
updated_books_df.createOrReplaceTempView("books")
spark.sql("select * from books").show()

The history saving thread hit an unexpected error (OperationalError('attempt to write a readonly database')).History will not be written to the database.
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      1|Book A|        John|         Doe|  300|         2005|            55|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            40|          2|
|      3|Book C|       Emily|       Jones|  180|         2015|            50|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            75|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            35|          6|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



# 데이터 저장

In [25]:
# Save modes accepted by the DataFrame writer: overwrite, append, ignore, error.

# Write the updated books with a header row, replacing any previous output at this path.
(
    updated_books_df.write
    .mode("overwrite")
    .option("header", True)
    .csv("data/output/sqltest_updated_books.csv")
)


In [26]:
# Write the users with a header row, replacing any previous output at this path.
(
    user_df.write
    .mode("overwrite")
    .option("header", True)
    .csv("data/output/sqltest_updated_users.csv")
)


In [27]:
# Reload the saved books. Without inferSchema every CSV column comes back as a
# string, so numeric columns (pages, stock_quantity, ...) would lose their type;
# inferSchema=True lets Spark detect the numeric columns again.
updated_books_df1 = spark.read.csv(
    "data/output/sqltest_updated_books.csv",
    header=True,
    inferSchema=True,
)

In [29]:
# Reload the saved users. Without inferSchema every CSV column comes back as a
# string, so user_id would no longer be numeric; inferSchema=True lets Spark
# detect the column types again.
user_df1 = spark.read.csv(
    "data/output/sqltest_updated_users.csv",
    header=True,
    inferSchema=True,
)

In [28]:
# Print the books that were read back from the CSV output.
updated_books_df1.show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      3|Book C|       Emily|       Jones|  180|         2015|            50|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            75|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            35|          6|
|      1|Book A|        John|         Doe|  300|         2005|            55|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            40|          2|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



In [30]:
# Print the users that were read back from the CSV output.
user_df1.show()

+-------+--------+-------+
|user_id|username|address|
+-------+--------+-------+
|      6|       F|   서울|
|      7|       G| 경기도|
|      8|       H|   대구|
|      9|       I|   부산|
|     10|       J|   전주|
|     11|       K|   광주|
|      1|       A|   서울|
|      2|       B|   대전|
|      3|       C| 경기도|
|      4|       D|   null|
|      5|       E|   null|
+-------+--------+-------+



# 조인 실습

In [32]:
# Inner join of books with their borrower:
# selects book_id, title, author_fname, author_lname, username, address.
# Rows are matched on books.borrowed_by = users.user_id.
join_query = '''
SELECT book_id, title, author_fname, author_lname, username, address
FROM books b INNER JOIN users u ON b.borrowed_by = u.user_id;
'''
spark.sql(join_query).show()

+-------+------+------------+------------+--------+-------+
|book_id| title|author_fname|author_lname|username|address|
+-------+------+------------+------------+--------+-------+
|      5|Book E|        Anna|       Davis|       F|   서울|
|      1|Book A|        John|         Doe|       A|   서울|
|      3|Book C|       Emily|       Jones|       C| 경기도|
|      2|Book B|        Jane|       Smith|       B|   대전|
+-------+------+------------+------------+--------+-------+



In [None]:
# books LEFT JOIN users

In [None]:
# 사용자의 책 대여 목록 > 전체 사용자 > 대여한 정보가 있으면 표시하고, 없으면 NULL
# books RIGHT JOIN users

In [None]:
# Books borrowed by users who live in Seoul ('서울').
# The WHERE clause filters on a users column, which discards every row whose
# user side is NULL, so the former LEFT JOIN already behaved like an INNER JOIN;
# INNER JOIN states that intent directly.
join_query = '''
SELECT book_id, title, author_fname, author_lname, username, address
FROM books b INNER JOIN users u ON b.borrowed_by = u.user_id
WHERE u.address = '서울';
'''
# Execute the query so the cell displays its result instead of only defining the text.
spark.sql(join_query).show()


In [None]:
# Number of books borrowed per user.
# users is the left side of the LEFT JOIN, so users without a matching book are
# kept; count(book_id) ignores the NULL book_id of those rows and yields 0.
# NOTE(review): the query text is only assigned here; this cell never executes it.
join_query = '''
SELECT user_id, username, count(book_id)
FROM users u LEFT JOIN books b ON u.user_id = b.borrowed_by
GROUP BY u.user_id, u.username;
'''



In [33]:
# book_category: 'Long' when pages >= 300, otherwise 'Short'.
# NOTE(review): the query text is only assigned here (to `join_query`, although
# it contains no join); this cell never executes it.
join_query = '''
SELECT book_id,title, pages, CASE 
                        WHEN pages>=300 THEN 'Long' ELSE 'Short'
                        END AS book_category
FROM books;
'''

In [None]:
# stock_status: '충분' when stock_quantity >= 50, '보통' when >= 30, otherwise '부족'.
# NOTE(review): this is a bare string expression; it is neither assigned to a
# name nor passed to spark.sql, so the query is never executed.
'''
SELECT book_id, title, stock_quantity, CASE WHEN stock_quantity>=50 THEN '충분'
                                            WHEN stock_quantity>=30 THEN '보통'
                                            ELSE '부족' END AS stock_status
FROM books;
'''

In [None]:
# WHERE-clause fragment for checking whether a book title contains a keyword.
# The fragment is delimited with double quotes: with ''' delimiters the literal
# ended at the first run of three single quotes, which swallowed the SQL
# closing quote and left the text as  WHERE title LIKE '%A%  (unterminated).
title_keyword_filter = "WHERE title LIKE '%A%'"
title_keyword_filter

In [None]:
# Author with the most borrowed books.
# count(borrowed_by) skips NULLs, so only books that are actually lent out are
# counted; count(book_id) counted every book whether or not it was borrowed.
# When several authors share the top count, LIMIT 1 returns an arbitrary one.
top_borrowed_author_query = '''
SELECT author_fname, author_lname, count(borrowed_by) AS borrow_count
FROM books
GROUP BY author_fname, author_lname
ORDER BY borrow_count DESC
LIMIT 1
'''
# Execute the query so the cell displays its result instead of only holding the text.
spark.sql(top_borrowed_author_query).show()

In [None]:
#책의 발행 연도별 대여 현황: 발행 연도별로 대여된 책의 수를 확인합니다.

In [None]:
# 사용자의 지역별 대여된 책 수: 사용자 지역별로 대여된 책의 수를 계산합니다.

In [None]:
# 대여되지 않은 책 중 가장 페이지 수가 많은 책: 대여되지 않은 책 중에서 페이지 수가 가장 많은 책을 조회합니다.

In [None]:
#실행계획, DAG 형태 분석

In [None]:
# csv 로 save

In [34]:
# Stop the SparkSession at the end of the notebook.
spark.stop()