# S2. PyTorch Data Pipeline - read multiple parquet files


![alt text](utils/2_1.png "")
- Data:
    - Kaggle dataset: https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store

![alt text](utils/AWS_1.png "")
![alt text](utils/GCS_1.png "")

- 目的：
![alt text](utils/2_3.png "")

# Step 1: 把数据保存为parquet格式

In [19]:
import os
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import matplotlib.pyplot as plt

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
print(torch.__version__)

2.0.1


In [22]:
spark = SparkSession.builder.appName("data_reading").getOrCreate()

schema = StructType() \
      .add("event_time", StringType(), True) \
      .add("event_type", StringType(), True) \
      .add("product_id", StringType(), True) \
      .add("category_id", StringType(), True) \
      .add("category_code", StringType(), True) \
      .add("brand", StringType(), True) \
      .add("price", DoubleType(), True) \
      .add("user_id", StringType(), True) \
      .add("user_session", StringType(), True)

In [73]:
# 2019-Oct.csv
# 数据量（行数）: 42,448,765

f = 'data/eCom_behavior_data/2019-Oct.csv'
N = 100000
df = spark.read.csv(f, schema=schema).limit(N)

print(f"data size: {df.count()}")
df.printSchema()



data size: 100000
root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



                                                                                

In [75]:
df.show(3, False)

+-----------------------+----------+----------+-------------------+-----------------------------------+--------+-----+---------+------------------------------------+
|event_time             |event_type|product_id|category_id        |category_code                      |brand   |price|user_id  |user_session                        |
+-----------------------+----------+----------+-------------------+-----------------------------------+--------+-----+---------+------------------------------------+
|event_time             |event_type|product_id|category_id        |category_code                      |brand   |null |user_id  |user_session                        |
|2019-10-01 00:00:00 UTC|view      |44600062  |2103807459595387724|null                               |shiseido|35.79|541312140|72d76fde-8bb3-4e00-8c23-a032dfed738c|
|2019-10-01 00:00:00 UTC|view      |3900821   |2053013552326770905|appliances.environment.water_heater|aqua    |33.2 |554748717|9333dfbd-b87a-4708-9857-6336556b0fcc|
+---

In [76]:
NUM_PARTITIONS = 100

folder = 'data/eCom_behavior_data/100K_dataset'
df.repartition(NUM_PARTITIONS).write.mode("overwrite").parquet(folder)

23/06/11 20:15:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/06/11 20:15:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/06/11 20:15:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/06/11 20:15:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/06/11 20:15:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/06/11 20:15:54 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/06/11 20:15:54 WARN MemoryManager: Total allocation exceeds 95.00% 

In [77]:
for name in os.listdir(folder):
    if name.endswith('.parquet'):
        file_dir = f'{folder}/{name}'
        print(f'current partition file: {name}')
        temp = pd.read_parquet(file_dir)
        print(f'current partition data size: {temp.shape}\n')

current partition file: part-00021-990b8e08-8e95-4ac3-8b16-988b46313b8f-c000.snappy.parquet
current partition data size: (1000, 9)

current partition file: part-00055-990b8e08-8e95-4ac3-8b16-988b46313b8f-c000.snappy.parquet
current partition data size: (1000, 9)

current partition file: part-00068-990b8e08-8e95-4ac3-8b16-988b46313b8f-c000.snappy.parquet
current partition data size: (1000, 9)

current partition file: part-00094-990b8e08-8e95-4ac3-8b16-988b46313b8f-c000.snappy.parquet
current partition data size: (1000, 9)

current partition file: part-00049-990b8e08-8e95-4ac3-8b16-988b46313b8f-c000.snappy.parquet
current partition data size: (1000, 9)

current partition file: part-00074-990b8e08-8e95-4ac3-8b16-988b46313b8f-c000.snappy.parquet
current partition data size: (1000, 9)

current partition file: part-00088-990b8e08-8e95-4ac3-8b16-988b46313b8f-c000.snappy.parquet
current partition data size: (1000, 9)

current partition file: part-00000-990b8e08-8e95-4ac3-8b16-988b46313b8f-c000