In [None]:
pip install pyspark==3.3.1

In [None]:
pip install pandas

In [None]:
pip install pyarrow

8.3기가 정도의 커다란 csv파일을 불러오기 쉽지 않기 때문에
로컬 스파크 환경을 구축하여 활용해보기로 한다.

In [3]:
from pyspark.sql import SparkSession
import pandas as pd

In [4]:
spark = SparkSession.builder.master('local[*]').appName('eCommerce behavior').getOrCreate()
spark

In [5]:
#스파크로 데이터프레임 읽기(헤더옵션으로 컬럼 적용, 데이터타입 추측)
df_market = spark.read.option('header', 'true').csv('data/2019-Nov.csv', inferSchema = True)

In [6]:
df_market.show()

+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|2019-11-01 09:00:00|      view|   1003461|2053013555631882655|electronics.smart...|  xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 09:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|  janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 09:00:01|      view|  17302664|2053013553853497655|                null|   creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 09:00:01|      view|   3601530|2053013563810775923|appliances.kitche...|      lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-11-01 09:00:01|      view|   1004775|2053013555631882655|electronics.s

In [7]:
#데이터프레임 각 컬럼의 이름과 데이터 타입 확인
df_market.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [8]:
#데이터프레임의 컬럼명 확인하기
df_market.columns

['event_time',
 'event_type',
 'product_id',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_id',
 'user_session']

각 컬럼에 대한 설명
1. event_time : 이벤트가 발생한 시간(UTC 기준)
2. event_type : 이벤트의 종류
3. product_id : 제품의 ID
4. category_id : 제품 카테고리의 ID
5. category_code : 제품 카테고리 분류 상세 내용
6. brand : 제품의 브랜드 이름
7. price : 제품의 가격
8. user_id : 고객(사용자) ID
9. user_session : 임시적인 고객(사용자)의 세션 ID. 각 사용자의 세션에 대해 동일하지만, 사용자가 긴 시간 후 온라인스토어에 돌아온다면 값이 변함

In [9]:
#데이터프레임 사이즈 확인
print((df_market.count(), len(df_market.columns)))

(67501979, 9)


데이터프레임의 사이즈는 6750만개의 행에 9개의 컬럼으로 파일을 불러오기도, 데이터 전처리를 수행하기도 개인 로컬 환경에서는 어려움이 있다.

따라서 기본적인 중복값과 결측치를 제거한 후에 Parquet포맷으로 파일을 새로이 저장하고 활용하기로 한다.

In [10]:
df_market = df_market.dropDuplicates()

In [11]:
from pyspark.sql.functions import isnan, when, count, col, isnull

df_market.select([count(when(isnull(c), c)).alias(c) for c in df_market.columns]).show()

+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|     21871423|9209177|    0|      0|          10|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+



category_code와 brand컬럼에 상당히 많은 결측치가 존재하는 것을 확인할 수 있다.
1. 현재 데이터셋에는 product_id만 존재하기 때문에, 고객이 어떤 제품을 보고 구매를 했는지 알기 위해서 category_code, brand 컬럼의 명확한 정보가 필요하다.
2. 수천만의 데이터 로그를 효율적으로 다루기 위해서는 의미가 분명하지 않은 행을 제거해주는 것이 좋다.
3. 데이터의 모수가 충분히 크기 때문에 결측치를 단순히 제거해주어도 데이터 분석 결과가 달라지는 문제는 발생하지 않을 것이다.

따라서 결측치를 제거해주되, category_code 또는 brand컬럼에 결측치가 있는 행을 제거한다.

In [12]:
df_market = df_market.dropna(subset=['category_code','brand'], how='any')

In [13]:
#결측치 없음 확인
df_market.select([count(when(isnull(c), c)).alias(c) for c in df_market.columns]).show()

+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|         0|         0|         0|          0|            0|    0|    0|      0|           6|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+



In [14]:
#데이터프레임을 파케이 파일로 저장
df_market.repartition(1).write.format('parquet').option('compression', 'gzip').save('data/market.parquet')