<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Pyspark로-csv-파일-읽어들어오기" data-toc-modified-id="Pyspark로-csv-파일-읽어들어오기-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Pyspark로 csv 파일 읽어들어오기</a></span></li><li><span><a href="#Train,-Test-split-하기" data-toc-modified-id="Train,-Test-split-하기-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Train, Test split 하기</a></span></li><li><span><a href="#Scaling-하기" data-toc-modified-id="Scaling-하기-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Scaling 하기</a></span></li></ul></div>

In [1]:
import pandas as pd
import numpy as np
import os
import warnings
warnings.filterwarnings(action='ignore')
os.chdir('/Users/younghun/Desktop/gitrepo/data/woochuri/')

df = pd.read_csv('./woochuri_final_data.csv')
df.shape

(2916, 18)

In [2]:
# 다중공선성 변수 제거
multi_cols = ['최소상대습도', '최저기온', '평균기온', '1시간최다일사량']
dataset = df.drop(multi_cols, axis=1)
# 날짜 인덱스로 옮기기
dataset['날짜'] = pd.to_datetime(dataset['날짜'])
dataset = dataset.set_index('날짜')

# 우선 매출이 0인 값들 요일 평균값으로 대체
non_zero_sales = dataset[dataset['일매출'] != 0]
days = non_zero_sales.groupby('요일')['일매출'].mean().index
means = non_zero_sales.groupby('요일')['일매출'].mean().values
weekdays_means = dict(zip(days, means))

def replace_zero_sales(df):
    if df['일매출'] == 0:
        df['일매출'] = weekdays_means[df['요일']]
    return df

dataset = dataset.apply(replace_zero_sales, axis=1)

# 요일변수 요일별 매출 평균값 가중치로 변경
total = dataset.groupby('요일')['일매출'].mean().values.sum()
weekdays = dataset.groupby('요일')['일매출'].mean().index
values = np.round(dataset.groupby('요일')['일매출'].mean().values / total * 100, 2)
weekdays_dict = dict(zip(weekdays, values))

dataset['요일'] = dataset['요일'].map(weekdays_dict)

In [3]:
## Target 값 만드는 코드
# dataset['target'] = np.append(np.array(dataset.iloc[1:, 1]), 0)

In [4]:
# PySpark로 csv파일로 불러오기 위해 전처리해준 csv파일 저장
dataset = dataset.reset_index()
dataset.to_csv('./pyspark_df.csv', index=False)

---
## Pyspark로 csv 파일 읽어들어오기

In [5]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *

In [6]:
spark = SparkSession.builder.master('younghun')\
        .appName('Woochuri-Regression-Pyspark')\
        .getOrCreate()

In [7]:
# 날짜만 빼고 모두 float형임
pd_columns = dataset.columns[1:]
# 마지막 인자 True는 nullable에 대한 인자임(결측치 여부 명시해주기)
datetimes = [StructField('날짜', TimestampType(), False)]
fields = [StructField(field, FloatType(), False) for field in pd_columns]
fields = datetimes + fields
schema = StructType(fields)

df = spark.read.option("header", True)\
     .csv('/Users/younghun/Desktop/gitrepo/data/woochuri/pyspark_df.csv',
         schema=schema)
df.printSchema()

root
 |-- 날짜: timestamp (nullable = true)
 |-- 요일: float (nullable = true)
 |-- 일매출: float (nullable = true)
 |-- 설_추석_가중치: float (nullable = true)
 |-- 일반공휴일가중치: float (nullable = true)
 |-- 한우가격: float (nullable = true)
 |-- 육우가격: float (nullable = true)
 |-- 돼지탕박가격: float (nullable = true)
 |-- 평균상대습도: float (nullable = true)
 |-- 최고기온: float (nullable = true)
 |-- 평균풍속: float (nullable = true)
 |-- 최대풍속: float (nullable = true)
 |-- 일사량: float (nullable = true)
 |-- 일강수량: float (nullable = true)



## Train, Test split 하기

In [34]:
from pyspark.sql import Row, functions as F
from pyspark.sql import Window

df = df.withColumn('rank', F.row_number().over(Window.partitionBy().orderBy("날짜")))

In [44]:
ex_train_df = df.where(f"rank <= {2915}").drop("rank")
ex_test_df = df.where(f"rank = {2916}").drop("rank")
print("Train rows:", ex_train_df.count())
print("Test rows:", ex_test_df.count())

Train rows: 2915
Test rows: 1


In [39]:
## Pyspark로 2020년 1년치 교차검증하는 코드 만들기 ##
all_rows = df.count()
TRAIN_SPLIT = 365
cv_periods = all_rows - TRAIN_SPLIT

for i in range(cv_periods, all_rows):
    train_df = df.where(f"rank <= {i}").drop("rank")
    test_df = df.where(f"rank = {i+1}").drop("rank")
    

## Scaling 하기

In [None]:
# 먼저 Vector Assembler로 만들기