In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# SparkSession 생성
spark = SparkSession.builder \
    .appName("PySpark Example") \
    .getOrCreate()
 
# User 데이터 정의
user_data = [
    Row(user_id=1, username='A', address='서울'),
    Row(user_id=2, username='B', address='대전'),
    Row(user_id=3, username='C', address='경기도'),
    Row(user_id=4, username='D', address=None),
    Row(user_id=5, username='E', address=None),
    Row(user_id=6, username='F', address='서울'),
    Row(user_id=7, username='G', address='경기도'),
    Row(user_id=8, username='H', address='대구'),
    Row(user_id=9, username='I', address='부산'),
    Row(user_id=10, username='J', address='전주'),
    Row(user_id=11, username='K', address='광주')
]

# Books 데이터 정의
books_data = [
    Row(book_id=1, title="Book A", author_fname="John", author_lname="Doe", pages=300, released_year=2005, stock_quantity=55),
    Row(book_id=2, title="Book B", author_fname="Jane", author_lname="Smith", pages=250, released_year=2010, stock_quantity=40),
    Row(book_id=3, title="Book C", author_fname="Emily", author_lname="Jones", pages=180, released_year=2015, stock_quantity=20),
    Row(book_id=4, title="Book D", author_fname="Chris", author_lname="Brown", pages=320, released_year=2012, stock_quantity=75),
    Row(book_id=5, title="Book E", author_fname="Anna", author_lname="Davis", pages=270, released_year=2008, stock_quantity=35)
]

# User 데이터프레임 생성
user_df = spark.createDataFrame(user_data)

user_df.createOrReplaceTempView('users') # 테이블로 보이기  # SQL 사용을 위해 TempView 등록

# Books 데이터프레임 생성
books_df = spark.createDataFrame(books_data)
books_df.createOrReplaceTempView('books') 



24/12/12 17:11:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/12 17:11:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/12/12 17:11:02 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
query_users='''
select address,
    IF(address IS NULL, '주소없음', address) AS address
FROM users;

'''
spark.sql(query_users).show()

[Stage 0:>                                                          (0 + 1) / 1]

+-------+--------+
|address| address|
+-------+--------+
|   서울|    서울|
|   대전|    대전|
| 경기도|  경기도|
|   null|주소없음|
|   null|주소없음|
|   서울|    서울|
| 경기도|  경기도|
|   대구|    대구|
|   부산|    부산|
|   전주|    전주|
|   광주|    광주|
+-------+--------+



                                                                                

In [None]:
# books table
#stock_quantity >=50 '재고 많음', >= 30 '재고 중간', 재고 없음 


In [6]:
books_sql = '''
SELECT stock_quantity, 
	   IF(stock_quantity >= 50, '재고 많음',
		  IF(stock_quantity >= 30, '재고 중간', '재고 없음')) AS quantity_level
FROM books;
'''
spark.sql(books_sql).show()

+--------------+--------------+
|stock_quantity|quantity_level|
+--------------+--------------+
|            55|     재고 많음|
|            40|     재고 중간|
|            20|     재고 없음|
|            75|     재고 많음|
|            35|     재고 중간|
+--------------+--------------+



In [7]:
books_sql_1= '''
SELECT stock_quantity, 
	   CASE 
		   WHEN stock_quantity >= 50 THEN '재고 많음'
		   WHEN stock_quantity >= 30 THEN '재고 중간'
		   ELSE '재고 부족'
	   END AS quantity_level
FROM books;
'''
spark.sql(books_sql_1).show()

+--------------+--------------+
|stock_quantity|quantity_level|
+--------------+--------------+
|            55|     재고 많음|
|            40|     재고 중간|
|            20|     재고 부족|
|            75|     재고 많음|
|            35|     재고 중간|
+--------------+--------------+



# 실행계획 비교 

In [16]:
spark.sql(books_sql).explain()

== Physical Plan ==
*(1) Project [stock_quantity#77L, if ((stock_quantity#77L >= 50)) 재고 많음 else if ((stock_quantity#77L >= 30)) 재고 중간 else 재고 없음 AS quantity_level#145]
+- *(1) Scan ExistingRDD[book_id#71L,title#72,author_fname#73,author_lname#74,pages#75L,released_year#76L,stock_quantity#77L]




In [17]:
spark.sql(books_sql_1).explain()

== Physical Plan ==
*(1) Project [stock_quantity#77L, CASE WHEN (stock_quantity#77L >= 50) THEN 재고 많음 WHEN (stock_quantity#77L >= 30) THEN 재고 중간 ELSE 재고 부족 END AS quantity_level#148]
+- *(1) Scan ExistingRDD[book_id#71L,title#72,author_fname#73,author_lname#74,pages#75L,released_year#76L,stock_quantity#77L]




In [18]:
books_sql_2 = '''
select distinct author_lname from books;
'''
spark.sql(books_sql_2).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[author_lname#74], functions=[])
+- Exchange hashpartitioning(author_lname#74, 200), ENSURE_REQUIREMENTS, [id=#117]
   +- *(1) HashAggregate(keys=[author_lname#74], functions=[])
      +- *(1) Project [author_lname#74]
         +- *(1) Scan ExistingRDD[book_id#71L,title#72,author_fname#73,author_lname#74,pages#75L,released_year#76L,stock_quantity#77L]




In [20]:
spark.sql(books_sql_2).show()



+------------+
|author_lname|
+------------+
|       Jones|
|       Davis|
|       Smith|
|         Doe|
|       Brown|
+------------+



In [21]:
books_sql_3 = '''
select author_lname, count(*)
from books
group by author_lname;
'''
spark.sql(books_sql_3).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[author_lname#74], functions=[count(1)])
+- Exchange hashpartitioning(author_lname#74, 200), ENSURE_REQUIREMENTS, [id=#164]
   +- *(1) HashAggregate(keys=[author_lname#74], functions=[partial_count(1)])
      +- *(1) Project [author_lname#74]
         +- *(1) Scan ExistingRDD[book_id#71L,title#72,author_fname#73,author_lname#74,pages#75L,released_year#76L,stock_quantity#77L]




In [23]:
spark.sql(books_sql_3).show()

+------------+--------+
|author_lname|count(1)|
+------------+--------+
|       Jones|       1|
|       Davis|       1|
|       Smith|       1|
|         Doe|       1|
|       Brown|       1|
+------------+--------+



#  데이터 변경 

In [28]:
# books 테이블 데이터에 borrowed_by 추가
books_data_with_user = [
    Row(book_id=1, title="Book A", author_fname="John", author_lname="Doe", pages=300, released_year=2005, stock_quantity=55, borrowed_by=1),
    Row(book_id=2, title="Book B", author_fname="Jane", author_lname="Smith", pages=250, released_year=2010, stock_quantity=40, borrowed_by=2),
    Row(book_id=3, title="Book C", author_fname="Emily", author_lname="Jones", pages=180, released_year=2015, stock_quantity=20, borrowed_by=3),
    Row(book_id=4, title="Book D", author_fname="Chris", author_lname="Brown", pages=320, released_year=2012, stock_quantity=75, borrowed_by=None),
    Row(book_id=5, title="Book E", author_fname="Anna", author_lname="Davis", pages=270, released_year=2008, stock_quantity=35, borrowed_by=6)
]


# DataFrame 생성
books_df_with_user = spark.createDataFrame(books_data_with_user)

# Temp View 등록
books_df_with_user.createOrReplaceTempView("books")

In [29]:
books_sql = '''
SELECT *
FROM books;
'''
spark.sql(books_sql).show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      1|Book A|        John|         Doe|  300|         2005|            55|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            40|          2|
|      3|Book C|       Emily|       Jones|  180|         2015|            20|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            75|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            35|          6|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



In [35]:
from pyspark.sql.functions import * 
#book_id =3, stock_quantity= 50 으로 바꾼다. -> 전처리 과정
updated_books_df= books_df_with_user.withColumn( 
    "stock_quantity",
    when(books_df_with_user.book_id == 3, 50).otherwise(books_df_with_user.stock_quantity)
)
updated_books_df.show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      1|Book A|        John|         Doe|  300|         2005|            55|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            40|          2|
|      3|Book C|       Emily|       Jones|  180|         2015|            50|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            75|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            35|          6|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



In [36]:
#stock_quantity * 10% 증가 

updated_books_df= books_df_with_user.withColumn( 
    "stock_quantity",
    (col("stock_quantity")*1.1).cast("int")
)

#뷰로 등록 
updated_books_df.createOrReplaceTempView("books")
spark.sql("select* from books").show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      1|Book A|        John|         Doe|  300|         2005|            60|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            44|          2|
|      3|Book C|       Emily|       Jones|  180|         2015|            22|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            82|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            38|          6|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



# 데이터 저장 

In [39]:
# mode : overwrite, append, ignore, error

updated_books_df.write.csv("data/output/sqltest_updated_books.csv", header=True, mode="overwrite")


In [42]:
user_df.write.csv("user", header=True, mode="overwrite")

In [43]:
updated_books_df1= spark.read.csv("data/output/sqltest_updated_books.csv", header=True)

In [44]:
updated_books_df1.show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      3|Book C|       Emily|       Jones|  180|         2015|            22|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            82|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            38|          6|
|      1|Book A|        John|         Doe|  300|         2005|            60|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            44|          2|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



In [45]:
user_df1= spark.read.csv("user", header=True, mode="overwrite")

24/12/11 13:33:14 WARN ParseMode: overwrite is not a valid parse mode. Using PERMISSIVE.


In [None]:
user_df1.show()

# 조인 실습

In [48]:
#book_id, author_fnaem, author_lname, username, address
join_query= '''
SELECT book_id,title, author_fname, author_lname, username, address
FROM books b INNER JOIN users u ON b.borrowed_by = u.user_id;
'''
spark.sql(join_query).show()



+-------+------+------------+------------+--------+-------+
|book_id| title|author_fname|author_lname|username|address|
+-------+------+------------+------------+--------+-------+
|      5|Book E|        Anna|       Davis|       F|   서울|
|      1|Book A|        John|         Doe|       A|   서울|
|      3|Book C|       Emily|       Jones|       C| 경기도|
|      2|Book B|        Jane|       Smith|       B|   대전|
+-------+------+------------+------------+--------+-------+



In [None]:
#books LEFT JOIN users 
SELECT book_id,title, author_fname, author_lname, username, address
FROM books b LEFT JOIN users u ON b.borrowed_by = u.user_id;

In [None]:
# 사용자의 책 대여 목록 > 전체 사용자 > 대여한 정보가 있으면 나오면, 없으면 NULL
# books RIGHT JOIN users

In [51]:
#user 기준으로 book 붙이기 

join_query= '''
SELECT book_id,title, author_fname, author_lname, username, address
FROM users u LEFT JOIN books b ON u.user_id= b.borrowed_by;
'''
spark.sql(join_query).show()



+-------+------+------------+------------+--------+-------+
|book_id| title|author_fname|author_lname|username|address|
+-------+------+------------+------------+--------+-------+
|   null|  null|        null|        null|       G| 경기도|
|      5|Book E|        Anna|       Davis|       F|   서울|
|   null|  null|        null|        null|       I|   부산|
|   null|  null|        null|        null|       E|   null|
|      1|Book A|        John|         Doe|       A|   서울|
|   null|  null|        null|        null|       J|   전주|
|      3|Book C|       Emily|       Jones|       C| 경기도|
|   null|  null|        null|        null|       H|   대구|
|   null|  null|        null|        null|       K|   광주|
|      2|Book B|        Jane|       Smith|       B|   대전|
|   null|  null|        null|        null|       D|   null|
+-------+------+------------+------------+--------+-------+



In [None]:
# 특정지역= 서울에 거주하는 사용자가 대여한 책 목록 

join_query= '''
SELECT book_id,title, author_fname, author_lname, username, address
FROM users u LEFT JOIN books b ON u.user_id= b.borrowed_by
WHERE u.address = '서울';
'''

In [53]:
# 사용자별로 대여한 책 수
join_query= '''
SELECT user_id, username , count(book_id)
FROM users u LEFT JOIN books b ON u.user_id= b.borrowed_by -- 실행은 ;  
group by u.user_id, u.username;  
'''
spark.sql(join_query).show() # MYSQL에 입력하면 똑같은 결과값



+-------+--------+--------------+
|user_id|username|count(book_id)|
+-------+--------+--------------+
|      7|       G|             0|
|      6|       F|             1|
|      9|       I|             0|
|      5|       E|             0|
|      1|       A|             1|
|     10|       J|             0|
|      3|       C|             1|
|      8|       H|             0|
|     11|       K|             0|
|      2|       B|             1|
|      4|       D|             0|
+-------+--------+--------------+



In [56]:
#book_category >300 이상이면 long, short 카테고리 추가 

join_query= '''
SELECT book_id,title,pages, CASE
               WHEN pages>=300 THEN 'Long' ELSE 'Short'
               END AS book_category 
FROM books;
'''
spark.sql(join_query).show() 

+-------+------+-----+-------------+
|book_id| title|pages|book_category|
+-------+------+-----+-------------+
|      1|Book A|  300|         Long|
|      2|Book B|  250|        Short|
|      3|Book C|  180|        Short|
|      4|Book D|  320|         Long|
|      5|Book E|  270|        Short|
+-------+------+-----+-------------+



In [None]:
#stock_quantity >50 이상 '충분', 30 이상 '보통' , 미만 '부족'

In [58]:
join_query= '''
SELECT *, CASE 
               WHEN stock_quantity > 50 THEN '충분'
               WHEN stock_quantity >= 30 THEN '보통'
               ELSE '부족'
           END AS stock_status
    FROM books;
'''
spark.sql(join_query).show() 

+-------+------+------------+------------+-----+-------------+--------------+-----------+------------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|stock_status|
+-------+------+------------+------------+-----+-------------+--------------+-----------+------------+
|      1|Book A|        John|         Doe|  300|         2005|            60|          1|        충분|
|      2|Book B|        Jane|       Smith|  250|         2010|            44|          2|        보통|
|      3|Book C|       Emily|       Jones|  180|         2015|            22|          3|        부족|
|      4|Book D|       Chris|       Brown|  320|         2012|            82|       null|        충분|
|      5|Book E|        Anna|       Davis|  270|         2008|            38|          6|        보통|
+-------+------+------------+------------+-----+-------------+--------------+-----------+------------+



In [None]:
# 책 제목에 특정 키워드가 포함되어 있는지 확인항 때
'''WHERE title LIKE '%A%''''

In [59]:
# 대여가 많이 된 책의 작가를 조회 
join_query= '''
    SELECT author_fname, author_lname, count(book_id) as borrow_count
    FROM books 
    GROUP BY  author_fname, author_lname
    ORDER BY borrow_count DESC ;
'''
spark.sql(join_query).show()



+------------+------------+------------+
|author_fname|author_lname|borrow_count|
+------------+------------+------------+
|       Chris|       Brown|           1|
|        Anna|       Davis|           1|
|       Emily|       Jones|           1|
|        Jane|       Smith|           1|
|        John|         Doe|           1|
+------------+------------+------------+





In [62]:
#책의 발행 연도별 대여 현황: 발행 연도별로 대여된 책의 수를 확인합니다.
join_query= '''
SELECT released_year,
       COUNT(*) AS borrowed_books_count
FROM books
GROUP BY released_year
ORDER BY 
released_year;
'''
spark.sql(join_query).show()


+-------------+--------------------+
|released_year|borrowed_books_count|
+-------------+--------------------+
|         2005|                   1|
|         2008|                   1|
|         2010|                   1|
|         2012|                   1|
|         2015|                   1|
+-------------+--------------------+





In [63]:
# 사용자의 지역별 대여된 책 수: 사용자 지역별로 대여된 책의 수를 계산합니다.
join_query= '''
SELECT u.address, COUNT(book_id) AS borrowed_count
FROM books b LEFT JOIN  users u ON  b.borrowed_by = u.user_id
GROUP BY u.address
ORDER BY borrowed_count DESC; 
'''
spark.sql(join_query).show()



+-------+--------------+
|address|borrowed_count|
+-------+--------------+
|   서울|             2|
|   null|             1|
|   대전|             1|
| 경기도|             1|
+-------+--------------+





In [67]:
# 대여되지 않은 책 중 가장 페이지 수가 많은 책: 대여되지 않은 책 중에서 페이지 수가 가장 많은 책을 조회합니다.
join_query= '''
SELECT book_id, title, author_fname, author_lname, pages
FROM books
WHERE borrowed_by = 0
ORDER BY pages DESC
LIMIT 1;
'''
spark.sql(join_query).show()
# 다시 

+-------+-----+------------+------------+-----+
|book_id|title|author_fname|author_lname|pages|
+-------+-----+------------+------------+-----+
+-------+-----+------------+------------+-----+



In [76]:
# 재고가 부족한 책과 대여 상태: 재고가 30개 미만인 책과 해당 책이 대여된 상태인지 확인합니다.
join_query= '''
SELECT book_id, title, stock_quantity, CASE 
        WHEN borrowed_count > 0 THEN '대여됨'
        ELSE '대여되지 않음'
    END AS borrow_status
FROM books b LEFT JOIN users u ON b.borrowed_by= u.user_id
WHERE stock_quantity < 30;
'''

In [77]:
spark.sql(join_query).show()

AnalysisException: cannot resolve '`borrowed_count`' given input columns: [u.address, b.author_fname, b.author_lname, b.book_id, b.borrowed_by, b.pages, b.released_year, b.stock_quantity, b.title, u.user_id, u.username]; line 3 pos 13;
'Project [book_id#192L, title#193, stock_quantity#307, CASE WHEN ('borrowed_count > 0) THEN 대여됨 ELSE 대여되지 않음 END AS borrow_status#713]
+- Filter (stock_quantity#307 < 30)
   +- Join LeftOuter, (borrowed_by#199L = user_id#62L)
      :- SubqueryAlias b
      :  +- SubqueryAlias books
      :     +- Project [book_id#192L, title#193, author_fname#194, author_lname#195, pages#196L, released_year#197L, cast((cast(stock_quantity#198L as double) * 1.1) as int) AS stock_quantity#307, borrowed_by#199L]
      :        +- LogicalRDD [book_id#192L, title#193, author_fname#194, author_lname#195, pages#196L, released_year#197L, stock_quantity#198L, borrowed_by#199L], false
      +- SubqueryAlias u
         +- SubqueryAlias users
            +- LogicalRDD [user_id#62L, username#63, address#64], false


In [None]:
#실행계획, DAG 형태 분석 

In [None]:
#csv 로 save 

In [None]:
spark.stop()