# Pyspark를 이용한 전처리 실습1
- data: Predict Future Sales, [URL](https://www.kaggle.com/c/competitive-data-science-predict-future-sales/overview)
- 데이터 수가 2,935,849이고 가게, 판매 아이템 별 그룹이 매우 많은 데이터다.
- 판다스와 넘파이로는 전처리에 한계를 느껴 pyspark를 이용해 전처리를 해보고자 한다.

In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [3]:
!pip install pyspark

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import when

from pyspark.sql.functions import col
from pyspark.ml.feature import Imputer

spark = SparkSession.builder.appName('spark-dataframe-demo').getOrCreate()

## 데이터 읽기
- inferSchema = True(컬럼타입), header = True(컬럼명) 옵션으로 데이터를 읽는다.

In [5]:
# data = pd.read_csv('../input/competitive-data-science-predict-future-sales/sales_train.csv')
# items = pd.read_csv('../input/competitive-data-science-predict-future-sales/items.csv')
# shops = pd.read_csv('../input/competitive-data-science-predict-future-sales/shops.csv')
# tests = pd.read_csv('../input/competitive-data-science-predict-future-sales/test.csv')
# item_categories = pd.read_csv('../input/competitive-data-science-predict-future-sales/item_categories.csv')

data = spark.read.csv('../input/competitive-data-science-predict-future-sales/sales_train.csv', inferSchema = True, header = True)
items = spark.read.csv('../input/competitive-data-science-predict-future-sales/items.csv', inferSchema = True, header = True)
shops = spark.read.csv('../input/competitive-data-science-predict-future-sales/shops.csv', inferSchema = True, header = True)
tests = spark.read.csv('../input/competitive-data-science-predict-future-sales/test.csv', inferSchema = True, header = True)
item_categories = spark.read.csv('../input/competitive-data-science-predict-future-sales/item_categories.csv', inferSchema = True, header = True)

In [6]:
data.show(3)

## 데이터 대체
- 판매가격과 판매량에 음수가 있으면 안되는데 음수가 있다.
- 그룹(shop, item) 별 판매가격의 중위값으로 음수 판매량을 대체한다. (판매량도 마찬가지)

In [7]:
# 음수인 판매가격
data.filter(data['item_price']<0).show(truncate=False)

In [8]:
# 음수인 판매량
data.filter(data['item_cnt_day']<0).show(truncate=False)

## 그룹별 연산
- groupBy와 agg 이용
- from pyspark.sql import functions as F -> 함수 제공
    - F.avg
    - F.percentile_approx(변수, 분위수)
    - ...

In [9]:
# 그룹 별 중위값을 구한다 -> 중위값으로 음수 판매량, 음수 가격을 대체
## filter -> 양수인 데이터만 추출 
med = data.filter('item_price>0 and item_cnt_day>0').\
                groupBy(["shop_id","item_id"]).\
                  agg(F.percentile_approx("item_price",0.5).alias('medPrice'),
                      F.percentile_approx("item_cnt_day",0.5).alias('medCnt'))
med.show(3)

In [10]:
# 그룹별 중위수를 원 데이터와 조인한다
data = data.join(med, on=["shop_id","item_id"])
data.show(3)

In [11]:
med.describe().show()

## 조건에 충족하면 값을 대체하기
- 조건 충족 여부 함수: when, otherwise
- item_price<0이면 medPrice(중위값)을, 정상적인 값(양수)이면 변수 원래 값(item_price)을 선택

In [12]:
data = data.withColumn('item_price', when(data.item_price<0, data['medPrice']).\
                otherwise(data.item_price))
data = data.withColumn('item_cnt_day', when(data.item_cnt_day<0, data['medCnt']).\
                otherwise(data.item_cnt_day))

- 데이터 확인: 위 명령에서 음수값을 모두 대체했으므로 더이상 음수값이 없어야 한다.

In [13]:
# 음수인 가격 없음!
data.filter(data['item_price']<0).show(truncate=False)

In [14]:
# 음수인 판매량 없음!
data.filter(data['item_cnt_day']<0).show(truncate=False)

# 컬럼 제거
- medPrice와 medCnt는 음수값을 대체하는 데 사용했으므로 삭제해도 된다.

In [17]:
data = data.drop(*('medPrice','medCnt'))
data.show()

## Parquet 포맷으로 저장 & pyspark로 읽기
- pandas dataframe 은 pa.Table로 만든 후 parquet로 저장해야하는데 spark datafame은 바로 데이터 저장 가능

In [19]:
#---- pandas datafame을 parquet로 저장할 때
# import pyarrow as pa
# import pyarrow.parquet as pq

# table = pa.Table.from_pandas(dta)
# pq.write_table(table, 'example.parquet')

In [21]:
data.write.parquet('/kaggle/working/sale_train.parquet')

In [22]:
data_pq = spark.read.parquet('/kaggle/working/sale_train.parquet')

In [23]:
data_pq.show()