- ref:https://www.kaggle.com/kaerunantoka/h-m-eda-w-pyspark
- ref: https://qiita.com/t-yotsu/items/4cabd1ae5406cfd7d741
- ref: https://qiita.com/taka4sato/items/4ab2cf9e941599f1c0ca

In [1]:
!pip install pyspark > /dev/null



In [2]:
import pyspark
from pyspark.sql import SparkSession, SQLContext, Row

spark = SparkSession.builder.appName('h-and-m-personalized-fashion-recommendations').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/13 02:24:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
class CFG:
    TRANSACTION_PATH = '../input/h-and-m-personalized-fashion-recommendations/transactions_train.csv'
    ARTICLE_PATH = '../input/h-and-m-personalized-fashion-recommendations/articles.csv'
    CUSTOMER_PATH = '../input/h-and-m-personalized-fashion-recommendations/customers.csv'
    SAMPLE_PATH = '../input/h-and-m-personalized-fashion-recommendations/sample_submission.csv'
    IMAGE_PATH = '../input/h-and-m-personalized-fashion-recommendations/images'

# log data

In [4]:
train = spark.read.option('header','true').csv(CFG.TRANSACTION_PATH)

In [5]:
train.count()

                                                                                

31788324

In [6]:
train.show(5, False)

+----------+----------------------------------------------------------------+----------+--------------------+----------------+
|t_dat     |customer_id                                                     |article_id|price               |sales_channel_id|
+----------+----------------------------------------------------------------+----------+--------------------+----------------+
|2018-09-20|000058a12d5b43e67d225668fa1f8d618c13dc232df0cad8ffe7ad4a1091e318|0663713001|0.050830508474576264|2               |
|2018-09-20|000058a12d5b43e67d225668fa1f8d618c13dc232df0cad8ffe7ad4a1091e318|0541518023|0.03049152542372881 |2               |
|2018-09-20|00007d2de826758b65a93dd24ce629ed66842531df6699338c5570910a014cc2|0505221004|0.01523728813559322 |2               |
|2018-09-20|00007d2de826758b65a93dd24ce629ed66842531df6699338c5570910a014cc2|0685687003|0.016932203389830508|2               |
|2018-09-20|00007d2de826758b65a93dd24ce629ed66842531df6699338c5570910a014cc2|0685687004|0.016932203389830508|2 

In [7]:
train.printSchema()

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sales_channel_id: string (nullable = true)



In [8]:
train.select(
    "t_dat"
).show(10, False)

+----------+
|t_dat     |
+----------+
|2018-09-20|
|2018-09-20|
|2018-09-20|
|2018-09-20|
|2018-09-20|
|2018-09-20|
|2018-09-20|
|2018-09-20|
|2018-09-20|
|2018-09-20|
+----------+
only showing top 10 rows



In [9]:
train.select('sales_channel_id').distinct().show(5)



+----------------+
|sales_channel_id|
+----------------+
|               1|
|               2|
+----------------+



                                                                                

In [10]:
train.registerTempTable("train_table") #dataframeにsqlテーブル名を付与
sqlContext = SQLContext(spark) #sparksqlを利用するには、sparkcontextだけでなくSQLContextも必要

sqlContext.sql(" SELECT * FROM train_table where sales_channel_id == 1 ").show(5, False)



+----------+----------------------------------------------------------------+----------+--------------------+----------------+
|t_dat     |customer_id                                                     |article_id|price               |sales_channel_id|
+----------+----------------------------------------------------------------+----------+--------------------+----------------+
|2018-09-20|00083cda041544b2fbb0e0d2905ad17da7cf1007526fb4c73235dccbbc132280|0688873012|0.03049152542372881 |1               |
|2018-09-20|00083cda041544b2fbb0e0d2905ad17da7cf1007526fb4c73235dccbbc132280|0501323011|0.053372881355932204|1               |
|2018-09-20|00083cda041544b2fbb0e0d2905ad17da7cf1007526fb4c73235dccbbc132280|0688873020|0.03049152542372881 |1               |
|2018-09-20|00083cda041544b2fbb0e0d2905ad17da7cf1007526fb4c73235dccbbc132280|0688873011|0.03049152542372881 |1               |
|2018-09-20|001127bffdda108579e6cb16080440e89bf1250a776c6e55f56e35e9ee029a8d|0397068015|0.033881355932203386|1 

In [11]:
train.groupBy("customer_id").count().sort("count", ascending=False).show(5)



+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|be1981ab818cf4ef6...| 1895|
|b4db5e5259234574e...| 1441|
|49beaacac0c7801c2...| 1364|
|a65f77281a528bf5c...| 1361|
|cd04ec2726dd58a8c...| 1237|
+--------------------+-----+
only showing top 5 rows



                                                                                

In [12]:
train.groupBy("t_dat").count().sort("count", ascending=False).show(5)



+----------+------+
|     t_dat| count|
+----------+------+
|2019-09-28|198622|
|2020-04-11|162799|
|2019-11-29|160875|
|2018-11-23|142018|
|2018-09-29|141700|
+----------+------+
only showing top 5 rows



                                                                                

In [13]:
# groupby + aggは使える
train.groupBy("customer_id").agg({"price": "sum"}).show(3)

[Stage 18:>                                                         (0 + 1) / 1]

+--------------------+-------------------+
|         customer_id|         sum(price)|
+--------------------+-------------------+
|05f65801b9a2d28a5...| 1.7262881355932203|
|05f79b715286a38a8...| 0.3919322033898305|
|072d11a8c0a1e6d0f...|0.38801694915254237|
+--------------------+-------------------+
only showing top 3 rows



                                                                                

In [14]:
agged_df = train.groupBy("customer_id", "t_dat").count()
agged_df.show(3)

[Stage 21:>                                                         (0 + 1) / 1]

+--------------------+----------+-----+
|         customer_id|     t_dat|count|
+--------------------+----------+-----+
|00b5bd5358a051556...|2018-09-20|    3|
|00be0a263381af381...|2018-09-20|    1|
|023b48de81f6af9de...|2018-09-20|    9|
+--------------------+----------+-----+
only showing top 3 rows



                                                                                

In [15]:
agged_df.groupBy("customer_id").pivot("t_dat").sum("count").fillna(0).show(3)

22/03/13 02:28:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/03/13 02:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/03/13 02:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/03/13 02:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/03/13 02:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/03/13 02:28:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/03/13 02:28:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/03/13 02:28:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2

Py4JJavaError: An error occurred while calling o85.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 30.0 failed 1 times, most recent failure: Lost task 5.0 in stage 30.0 (TID 231) (4e8d388e4ace executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
	at org.apache.spark.io.ReadAheadInputStream.<init>(ReadAheadInputStream.java:106)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:74)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:523)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.sortedIterator(UnsafeKVExternalSorter.java:206)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:264)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:224)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:364)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:120)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:94)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$Lambda$3579/0x000000084140d040.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.RDD$$Lambda$2498/0x0000000841031840.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2461/0x0000000840ff6c40.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
	at org.apache.spark.io.ReadAheadInputStream.<init>(ReadAheadInputStream.java:106)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:74)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:523)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.sortedIterator(UnsafeKVExternalSorter.java:206)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:264)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:224)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:364)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:120)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:94)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$Lambda$3579/0x000000084140d040.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.RDD$$Lambda$2498/0x0000000841031840.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2461/0x0000000840ff6c40.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)


In [None]:
train.withColumn("real_price", train.price * 590).show(5, False)

In [None]:
train.columns

In [None]:
train.select("t_dat", "customer_id").write.parquet("./parquet") 

# article

In [None]:
article = spark.read.option('header','true').csv(CFG.ARTICLE_PATH)

In [None]:
article.count()

In [None]:
article.show(5, False)

In [None]:
article.printSchema()

In [None]:
article.select(
    'article_id',
    'index_name'
).show(5, False)

In [None]:
article.select('index_name').distinct().collect()

In [None]:
article.select('department_name').distinct().show(5)

In [None]:
article.filter(article["department_name"] == "Jacket Casual").show(5)

# customer

In [None]:
customer = spark.read.option('header','true').csv(CFG.CUSTOMER_PATH)

In [None]:
customer.show(5, False)

In [None]:
customer.groupby("FN").count().sort("count", ascending=False).show(5)

In [None]:
customer.groupby("age").count().sort("count", ascending=False).show(5)