In [1]:
# -- Set up Spark through findspark --
import findspark
findspark.init()


# -- Open the Spark session --
from pyspark.sql import SparkSession

# Session settings, applied to the builder one by one in the order listed.
_session_conf = {
    "spark.executor.instances": "6",
    "spark.executor.cores": "2",
    "spark.executor.memory": "4g",
    "spark.driver.memory": "4g",
    "spark.driver.maxResultSize": "2g",
    "spark.executor.memoryOverhead": "2g",
    "spark.driver.memoryOverhead": "2g",
    "spark.master": "yarn",
}

_builder = SparkSession.builder.appName("myAppName")
for _conf_key, _conf_value in _session_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)

# getOrCreate() returns the session built from the settings above.
spark = _builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/01/26 13:39:12 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


# also_buy, also_view

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import input_file_name, regexp_extract

# Schema for the relation tables: the product id plus its two lists of related
# product ids. All three fields are nullable.
schema = (
    StructType()
    .add("asin", StringType(), True)
    .add("also_buy", ArrayType(StringType()), True)
    .add("also_view", ArrayType(StringType()), True)
)

In [32]:
# Directory of the metadata JSON files (a path inside HDFS)
file_path = "/minipro/ama/meta"

# Read every JSON file in that directory into a single DataFrame, keeping only
# the fields declared in `schema`.
# No "filename" column is ever added in this cell, so there is nothing to drop
# afterwards (DataFrame.drop on a missing column is a silent no-op).
df_relation = spark.read.schema(schema).json(file_path + "/*.json")

In [33]:
# Print the first rows of the relation DataFrame (show() defaults to 20 rows)
df_relation.show()

+----------+------------+--------------------+
|      asin|    also_buy|           also_view|
+----------+------------+--------------------+
|0764443682|        null|                null|
|1291691480|        null|                null|
|1940280001|        null|                null|
|1940735033|        null|                null|
|1940967805|        null|                null|
|1942705034|        null|                null|
|3293015344|        null|                null|
|5378828716|        null|                null|
|6041002984|        null|                null|
|630456984X|        null|                null|
|7106116521|        null|                null|
|8037200124|        null|                null|
|8037200221|        null|                null|
|8279996567|        null|                null|
|9239282785|        null|                null|
|9239281533|        null|                null|
|9269808971|        null|                null|
|9654263246|        null|[B07CQ3KY5B, B014...|
|B00004T3SN|[

In [34]:
from pyspark.sql.functions import explode

# Turn each element of the also_buy array into its own row, then keep only the
# (asin, also_buy) pair.
df_exploded = df_relation.withColumn("also_buy", explode(df_relation["also_buy"]))
df_also_buy = df_exploded.select(["asin", "also_buy"])

In [35]:
# Same treatment for also_view: one row per array element, then keep only the
# (asin, also_view) pair.
df_exploded2 = df_relation.withColumn("also_view", explode(df_relation["also_view"]))
df_also_view = df_exploded2.select(["asin", "also_view"])

In [36]:
# Preview both relation tables. show() prints the rows and returns None, so the
# two calls are separate statements; writing them as one tuple expression makes
# the cell display a meaningless "(None, None)" result.
df_also_buy.show(5)
df_also_view.show(5)

+----------+----------+
|      asin|  also_buy|
+----------+----------+
|B00004T3SN|B01KA5PTYG|
|B00007GDFV|B07C9V84JD|
|B00007GDFV|B01J6JE05G|
|B00007GDFV|B07JJQFHS5|
|B00007GDFV|B003EGITUK|
+----------+----------+
only showing top 5 rows

+----------+----------+
|      asin| also_view|
+----------+----------+
|9654263246|B07CQ3KY5B|
|9654263246|B014TEOG3O|
|9654263246|B078429G6J|
|9654263246|B01FRG9Z70|
|9654263246|B06W9MXMM3|
+----------+----------+
only showing top 5 rows



(None, None)

In [37]:
# df_also_buy.write.parquet("/minipro/parquet/df_also_buy.parquet")

In [38]:
# df_also_view.write.parquet("/minipro/parquet/df_also_view.parquet")

# product

In [39]:
# Schema for the product table. Every field is read as a nullable string.
schema = (
    StructType()
    .add("title", StringType(), True)
    .add("brand", StringType(), True)
    .add("rank", StringType(), True)
    .add("asin", StringType(), True)
    .add("price", StringType(), True)
    .add("category", StringType(), True)
)

In [44]:
# Directory of the per-category metadata files. It is a plain string without a
# trailing slash, so the glob below does not contain a double "//".
file_path = "/minipro/ama/meta"

# Read every metadata file into one DataFrame and overwrite `category` with the
# category name taken from the source file name: the text between "meta_" and
# ".json". input_file_name() is used directly inside the expression, so no
# temporary "filename" column has to be added and dropped again.
df_product = (
    spark.read.schema(schema)
    .json(file_path + "/*.json")
    .withColumn("category", regexp_extract(input_file_name(), r"meta_(.*?)\.json", 1))
)

In [45]:
# Print the first rows of the product DataFrame
df_product.show()

+--------------------+--------------------+--------------------+----------+------+--------------+
|               title|               brand|                rank|      asin| price|      category|
+--------------------+--------------------+--------------------+----------+------+--------------+
|Slime Time Fall F...|Group Publishing ...|13,052,976inCloth...|0764443682|  null|AMAZON_FASHION|
|XCC Qi promise ne...|                null|11,654,581inCloth...|1291691480|  null|AMAZON_FASHION|
|Magical Things I ...|   Christopher Manos|19,308,073inCloth...|1940280001|  null|AMAZON_FASHION|
|Ashes to Ashes, O...|Flickerlamp Publi...|19,734,184inCloth...|1940735033|  null|AMAZON_FASHION|
|Aether & Empire #...|                null|10,558,646inCloth...|1940967805| $4.50|AMAZON_FASHION|
|365 Affirmations ...|                null|16,179,013inCloth...|1942705034|  null|AMAZON_FASHION|
|Blessed by Pope B...|                null|7,787,039inClothi...|3293015344|  null|AMAZON_FASHION|
|Womens Sexy Sleev..

In [None]:
# df_product.write.parquet("/minipro/parquet/df_product.parquet")

# reviewer_id

In [47]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
# Schema for the reviewer table: reviewer id and display name, both nullable strings.
schema = (
    StructType()
    .add("reviewerID", StringType(), True)
    .add("reviewerName", StringType(), True)
)

In [49]:
# Directory of the review JSON files (a path inside HDFS)
file_path = "/minipro/ama"

# Read every JSON file in that directory into a single DataFrame, keeping only
# the fields declared in `schema`.
# No "filename" column is ever added in this cell, so there is nothing to drop
# afterwards (DataFrame.drop on a missing column is a silent no-op).
# NOTE(review): this yields one row per input record, not one row per reviewer;
# if a unique reviewer table is intended, a dropDuplicates(["reviewerID"]) is
# presumably still needed -- confirm.
df_reviewer_id = spark.read.schema(schema).json(file_path + "/*.json")

In [50]:
# Print the first rows of the reviewer DataFrame
df_reviewer_id.show()

+--------------+--------------------+
|    reviewerID|        reviewerName|
+--------------+--------------------+
|A1D4G1SNUZWQOT|               Tracy|
|A3DDWDH9PX2YX2|           Sonja Lau|
|A2MWC41EW7XL15|            Kathleen|
|A2UH2QQ275NV45|         Jodi Stoner|
| A89F3LQADZBS5|        Alexander D.|
|A29HLOUW0NS0EH|   Patricia R. Erwin|
| A7QS961ROI6E0|    REBECCA S LAYTON|
|A1BB77SEBQT8VX|  Darrow H Ankrum II|
| AHWOW7D1ABO9C|              rosieO|
| AKS3GULZE0HFC|          M. Waltman|
| A38NS6NF6WPXS|            BTDoxies|
|A1KOKO3HTSAI1H|        Robin Howard|
|A1G3S57JGZNPCL|kimberly a schott...|
| AGBL3TTP6GV4X|             gallina|
|A1Y36BSE9GKXLV|            Ms Irish|
|A1L1U968VNYVA4|                J.G.|
|A1NSKPSR0XZ0C9|               Jules|
|A3O5SXH5O8DWRP|     Debra Humphreys|
|A3I52T3ZCLRZZA|          Ann Bishop|
|A3VWTJR1QOI7JR|     Amazon Customer|
+--------------+--------------------+
only showing top 20 rows



In [43]:
# df_reviewer_id.write.parquet("/minipro/parquet/df_reviewer_id.parquet")

# review

In [51]:
from pyspark.sql.types import StructType, FloatType, BooleanType, StructField, StringType, IntegerType, ArrayType
# Schema for the review table. `vote` and `reviewTime` are read as plain strings.
schema = (
    StructType()
    .add("asin", StringType(), True)
    .add("overall", FloatType(), True)
    .add("vote", StringType(), True)
    .add("verified", BooleanType(), True)
    .add("reviewText", StringType(), True)
    .add("reviewTime", StringType(), True)
    .add("reviewerID", StringType(), True)
)

In [52]:
# Directory of the review JSON files (a path inside HDFS)
file_path = "/minipro/ama"

# Read every JSON file in that directory into a single DataFrame, keeping only
# the fields declared in `schema`.
# No "filename" column is ever added in this cell, so there is nothing to drop
# afterwards (DataFrame.drop on a missing column is a silent no-op).
# NOTE(review): the glob matches every *.json directly under /minipro/ama. The
# FileNotFoundException recorded further down in this notebook names
# /minipro/ama/meta_Home_and_Kitchen.json, so metadata files appear to have
# matched this pattern at read time -- confirm that only review files live in
# this directory, and re-create this DataFrame after files are moved.
df_review = spark.read.schema(schema).json(file_path + "/*.json")

In [53]:
# Print the first rows of the review DataFrame
df_review.show()

+----------+-------+----+--------+--------------------+-----------+--------------+
|      asin|overall|vote|verified|          reviewText| reviewTime|    reviewerID|
+----------+-------+----+--------+--------------------+-----------+--------------+
|7106116521|    5.0|null|    true|Exactly what I ne...|10 20, 2014|A1D4G1SNUZWQOT|
|7106116521|    2.0|   3|    true|I agree with the ...|09 28, 2014|A3DDWDH9PX2YX2|
|7106116521|    4.0|null|   false|Love these... I a...|08 25, 2014|A2MWC41EW7XL15|
|7106116521|    2.0|null|    true| too tiny an opening|08 24, 2014|A2UH2QQ275NV45|
|7106116521|    3.0|null|   false|                Okay|07 27, 2014| A89F3LQADZBS5|
|7106116521|    5.0|null|    true|Exactly what I wa...|07 19, 2014|A29HLOUW0NS0EH|
|7106116521|    4.0|null|    true|These little plas...|05 31, 2014| A7QS961ROI6E0|
|B00007GDFV|    3.0|null|    true|mother - in - law...|09 22, 2013|A1BB77SEBQT8VX|
|B00007GDFV|    3.0|null|    true|Item is of good q...|07 17, 2013| AHWOW7D1ABO9C|
|B00

In [None]:
# df_review.write.parquet("/minipro/parquet/df_review.parquet")

# 분석

# 저장한 자료 불러오기

In [2]:
# path_parquet = "/minipro/parquet/df_review.parquet"

# df_review = spark.read.format("parquet").load(path_parquet)

                                                                                

In [None]:
# path_parquet2 = "/minipro/parquet/df_product.parquet"

# df_product = spark.read.format("parquet").load(path_parquet2)

# 전체 로우 수

In [54]:
# Total number of rows in df_review. count() is a Spark action, so this line runs a job.
row_count = df_review.count()
print(f"Number of rows in df_review: {row_count}")

[Stage 48:>                                                         (0 + 1) / 1]

Number of rows in df_review: 210445989


                                                                                

# asin을 기준으로 그룹화하고 리뷰 수 및 평균 평점 계산

In [55]:
# NOTE: this import rebinds the names `round` and `count` to the Spark column
# functions for the rest of the notebook.
from pyspark.sql.functions import to_date, year, count, avg, round

# Convert reviewTime from text such as "09 28, 2014" into a date -- but only
# while the column is still a string. The cell overwrites df_review, so without
# this guard a second run would re-parse an already converted date column with
# a text pattern it no longer matches.
if dict(df_review.dtypes)["reviewTime"] == "string":
    df_review = df_review.withColumn("reviewTime", to_date("reviewTime", "MM d, yyyy"))

# Per product (asin): number of reviews with a reviewerID and the average
# rating rounded to 3 decimals, most-reviewed products first.
review_stats = (
    df_review
    .groupBy("asin")
    .agg(
        count("reviewerID").alias("cnt"),
        round(avg("overall"), 3).alias("avg_rate")
    )
    .orderBy("cnt", ascending=False)
)

# Show the top rows
review_stats.show()



+----------+-----+--------+
|      asin|  cnt|avg_rate|
+----------+-----+--------+
|038568231X|58150|   4.041|
|0297859382|44956|   3.891|
|0007420412|44381|   4.491|
|0141353678|37783|   4.682|
|0312577222|36620|   4.784|
|0099911701|33676|   4.591|
|B000X1MX7E|32495|   4.053|
|0553418025|30297|   4.632|
|0007548672|27954|   4.553|
|B00FLYWNYQ|27595|   4.591|
|B000W5QSYA|26994|    4.46|
|0316055433|25713|   3.758|
|B00YSG2ZPA|24558|   4.913|
|B00006CXSS|24489|   4.914|
|B000WGWQG8|23584|   4.687|
|0439023521|22538|   4.646|
|8184776217|22511|   4.646|
|0545582881|21603|   4.344|
|B017WJ5PR4|21567|   4.715|
|B017V4IPPO|21545|   4.715|
+----------+-----+--------+
only showing top 20 rows



                                                                                

# avg_rate 기준 내림차순 정렬

In [56]:
from pyspark.sql.functions import col

# Attach the product attributes to the per-product review statistics
# (join on asin), then list the highest average rating first.
joined_df = review_stats.join(df_product, on="asin")
sorted_df = joined_df.sort(col("avg_rate").desc())
sorted_df.show()



+----------+---+--------+--------------------+--------------------+--------------------+-------+--------+
|      asin|cnt|avg_rate|               title|               brand|                rank|  price|category|
+----------+---+--------+--------------------+--------------------+--------------------+-------+--------+
|0006395848|  1|     5.0|Lesley Stowe Fine...|        Lesley Stowe|  344,063 in Books (|  $5.98|   Books|
|0012066877|  1|     5.0|Making Math Meani...|         David Quine|3,400,087 in Books (|       |   Books|
|0006481493|  4|     5.0|Leaving Home (Win...|Visit Amazon's Ga...|4,358,694 in Books (|       |   Books|
|0006384889|  2|     5.0|The Story of My L...|Visit Amazon's Ch...|10,915,749 in Boo...|  $3.98|   Books|
|0006754287|  3|     5.0|The Magician's Ne...|          C.S. Lewis|3,209,724 in Books (|$115.84|   Books|
|000637512X|  1|     5.0|In the Sewers of ...|Visit Amazon's Ro...|5,854,557 in Books (| $10.17|   Books|
|0006861482|  1|     5.0|The Walled Kingdo...|

                                                                                

# 카테고리별 시간에 따른 그래프

In [73]:
from pyspark.sql.functions import year, to_date, count
import matplotlib.pyplot as plt

# Make sure reviewTime is a date. An earlier cell may already have converted it;
# parsing a date column again with a text pattern would not match, so convert
# only while the column is still a string. The pattern "MM d, yyyy" is the same
# one used by the per-product statistics cell and accepts 1- and 2-digit days.
if dict(df_review.dtypes)['reviewTime'] == 'string':
    df_review = df_review.withColumn('reviewTime', to_date('reviewTime', 'MM d, yyyy'))

# Join products with their reviews on asin and add the review year
joined_df = df_product.join(df_review, 'asin').withColumn('reviewYear', year('reviewTime'))

# Count reviews per (category, year) and bring the small result to pandas.
# The review count is what this notebook plots as "sales".
sales_by_category = (
    joined_df
    .groupBy('category', 'reviewYear')
    .agg(count('asin').alias('total_sales'))
    .toPandas()
)

# Line colour for each category (keys are the human-readable category names)
category_colors = {
    'Amazon Fashion': 'red',
    'All Beauty': 'blue',
    'Appliances': 'green',
    'Arts Crafts and Sewing': 'purple',
    'Automotive': 'orange',
    'Books': 'brown',
    'CDs and Vinyl': 'pink',
    'Cell Phones and Accessories': 'gray',
    'Clothing Shoes and Jewelry': 'cyan',
    'Digital Music': 'magenta',
    'Electronics': 'yellow',
    'Gift Cards': 'lightgreen',
    'Grocery and Gourmet Food': 'lightblue',
    'Home and Kitchen': 'lightcoral',
    'Industrial and Scientific': 'lightgray',
    'Kindle Store': 'lime',
    'Luxury Beauty': 'lavender',
    'Magazine Subscriptions': 'gold',
    'Movies and TV': 'darkred',
    'Musical Instruments': 'darkblue',
    'Office Products': 'darkgreen',
    'Patio Lawn and Garden': 'darkorange',
    'Pet Supplies': 'darkcyan',
    'Prime Pantry': 'darkviolet',
    'Software': 'darkkhaki',
    'Sports and Outdoors': 'darkolivegreen',
    'Tools and Home Improvement': 'darkorchid',
    'Toys and Games': 'darksalmon',
    'Video Games': 'darkseagreen'
}

# The category column comes from the metadata file names, so its values look
# like "AMAZON_FASHION" or "Home_and_Kitchen", while the keys above use spaces
# and title case. Compare on a normalised form (underscores -> spaces, lower
# case); an exact comparison would leave almost every series empty.
category_keys = sales_by_category['category'].str.replace('_', ' ', regex=False).str.lower()

# Draw one line per category with Matplotlib
plt.figure(figsize=(12, 8))
for category, color in category_colors.items():
    category_data = (
        sales_by_category[category_keys == category.lower()]
        .dropna(subset=['reviewYear'])   # rows whose date could not be parsed
        .sort_values('reviewYear')       # groupBy output is unordered; sort so the line runs left to right
    )
    if category_data.empty:
        # Nothing to draw for this category; also keeps it out of the legend
        continue
    x_data = category_data['reviewYear'].tolist()
    y_data = category_data['total_sales'].tolist()
    plt.plot(x_data, y_data, marker='o', label=category, color=color)

# Title, axis labels and legend
plt.title('Sales Over Years by Category')
plt.xlabel('Year')
plt.ylabel('Total Sales')
plt.legend(title='Category')

# Render the figure
plt.show()



24/01/26 17:32:06 WARN TaskSetManager: Lost task 840.0 in stage 81.0 (TID 13820) (datanode3 executor 5): java.io.FileNotFoundException: 
File does not exist: /minipro/ama/meta_Home_and_Kitchen.json
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:156)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2124)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:770)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:460)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.h

Py4JJavaError: An error occurred while calling o577.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 840 in stage 81.0 failed 4 times, most recent failure: Lost task 840.3 in stage 81.0 (TID 13826) (datanode3 executor 5): java.io.FileNotFoundException: 
File does not exist: /minipro/ama/meta_Home_and_Kitchen.json
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:156)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2124)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:770)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:460)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)


It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2603)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1178)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2798)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2787)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: 
File does not exist: /minipro/ama/meta_Home_and_Kitchen.json
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:156)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2124)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:770)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:460)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)


It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


24/01/26 17:32:09 WARN TaskSetManager: Lost task 791.0 in stage 81.0 (TID 13818) (datanode2 executor 1): TaskKilled (Stage cancelled)
24/01/26 17:32:09 WARN TaskSetManager: Lost task 717.0 in stage 81.0 (TID 13815) (datanode2 executor 1): TaskKilled (Stage cancelled)
24/01/26 17:32:09 WARN TaskSetManager: Lost task 771.0 in stage 81.0 (TID 13817) (datanode5 executor 4): TaskKilled (Stage cancelled)




24/01/26 17:32:09 WARN TaskSetManager: Lost task 864.3 in stage 81.0 (TID 13827) (datanode3 executor 5): TaskKilled (Stage cancelled)
24/01/26 17:32:09 WARN TaskSetManager: Lost task 766.0 in stage 81.0 (TID 13816) (datanode3 executor 5): TaskKilled (Stage cancelled)
24/01/26 17:32:09 WARN TaskSetManager: Lost task 614.0 in stage 81.0 (TID 13810) (datanode5 executor 4): TaskKilled (Stage cancelled)


# 카테고리 별로 리뷰 수, 평균 별점 계산

In [None]:
from pyspark.sql.functions import col, avg, count, regexp_replace

# Join reviews with products on asin
df_joined2 = df_review.join(df_product, "asin", "inner")

# `vote` is read as a string (see the review schema). avg() on a string column
# relies on an implicit numeric cast that turns any value it cannot parse into
# null. Presumably some values carry thousands separators such as "1,234"
# (TODO confirm against the data); strip commas and cast explicitly so those
# rows are not silently dropped from the average.
vote_as_number = regexp_replace(col("vote"), ",", "").cast("double")

# Per category: mean rating, mean vote count and number of non-null review texts
df_category_stats = df_joined2.groupBy("category").agg(
    avg("overall").alias("avg_overall"),
    avg(vote_as_number).alias("avg_vote"),
    count("reviewText").alias("count_reviewText")
)

# Show the result
df_category_stats.show()

# 워드클라우드

In [None]:
# Stop-word set used to filter review tokens before counting words.
# Each word is listed exactly once, grouped roughly by kind.
stop_words = {
    # personal and possessive pronouns
    "i", "me", "my", "myself", "we", "our", "ours", "ourselves",
    "you", "your", "yours", "yourself", "yourselves",
    "he", "him", "his", "himself", "she", "her", "hers", "herself",
    "it", "its", "itself", "they", "them", "their", "theirs", "themselves",
    # interrogatives and demonstratives
    "what", "which", "who", "whom", "this", "that", "these", "those",
    # auxiliary verbs
    "am", "is", "are", "was", "were", "be", "been", "being",
    "have", "has", "had", "having", "do", "does", "did", "doing",
    # articles and conjunctions
    "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while",
    # prepositions and direction words
    "of", "at", "by", "for", "with", "about", "against", "between", "into", "through",
    "during", "before", "after", "above", "below", "to", "from", "up", "down",
    "in", "out", "on", "off", "over", "under",
    # adverbs, quantifiers and qualifiers
    "again", "further", "then", "once", "here", "there", "when", "where", "why", "how",
    "all", "any", "both", "each", "few", "more", "most", "other", "some", "such",
    "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very",
    # single letters, short fragments and a few modal words
    "s", "t", "can", "will", "just", "don", "should", "now",
    "d", "ll", "m", "o", "re", "ve", "y",
    "ain", "aren", "couldn", "didn", "doesn", "hadn", "hasn", "haven", "isn",
    "ma", "mightn", "mustn", "needn", "shan", "shouldn", "wasn", "weren", "won", "wouldn",
}

In [None]:
#### from wordcloud import WordCloud
import matplotlib.pyplot as plt
from pyspark.sql.functions import explode, col, lower, count
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import monotonically_increasing_id

# Word cloud rendering helper
def generate_wordcloud(word_freq, title):
    """Render a word cloud for a word -> frequency mapping and display it.

    Args:
        word_freq: dict mapping each word to its count.
        title: text shown above the figure.

    Returns:
        None. When `word_freq` is empty a short message is printed instead of a
        figure, because there is nothing to lay out.
    """
    if not word_freq:
        print(f"{title}: no words to display")
        return None

    # Imported here because the module-level import in this notebook is
    # commented out, which left the WordCloud name undefined.
    from wordcloud import WordCloud

    wordcloud = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(word_freq)
    plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.title(title)
    plt.axis('off')
    plt.show()
    
# Number of most frequent words fetched per rating. WordCloud draws at most
# `max_words` words (200 by default), so the driver never needs more than that.
top_n_words = 200

# The tokenizer and stop-word remover do not depend on the rating, so build them once.
# Tokenizer lower-cases the text and splits it on whitespace.
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=list(stop_words))

# Distinct non-null ratings, in ascending order (one Row per rating)
overall_values = (
    df_review
    .select("overall")
    .where(col("overall").isNotNull())
    .distinct()
    .orderBy("overall")
    .collect()
)

# One word cloud per `overall` rating
for overall in overall_values:
    overall_rating = overall["overall"]

    # Reviews with this rating that actually have text
    df_filtered = (
        df_review
        .filter((col("overall") == overall_rating) & col("reviewText").isNotNull())
        .select("reviewText")
    )

    words_df_filtered = remover.transform(tokenizer.transform(df_filtered))

    # Count words across ALL matching reviews inside Spark. This replaces the
    # manual batching: a global row_number() window pulls every row into one
    # partition, and filter(row_number > i).limit(n) does not select a
    # well-defined batch, so rows could be counted more than once or never.
    word_counts = (
        words_df_filtered
        .select(explode(col("filtered_words")).alias("word"))
        .filter(col("word") != "")          # splitting on whitespace can yield empty tokens
        .groupBy("word")
        .count()
        .orderBy(col("count").desc())
        .limit(top_n_words)
    )

    # Only the top words reach the driver. No local variable is named `count`,
    # so the imported Spark `count` function stays usable in other cells.
    word_freq = {row["word"]: row["count"] for row in word_counts.collect()}

    if not word_freq:
        # No usable words for this rating; nothing to draw
        continue

    generate_wordcloud(word_freq, f"Word Cloud for Overall Rating: {overall_rating}")