## PySpark 기본 환경설정

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Local, single-JVM Spark configuration for this notebook.
conf = SparkConf().setAppName('appName').setMaster('local')
# Go through the builder so that re-running this cell reuses the live session
# instead of failing because a SparkContext already exists in the process.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Keep `sc` available as a top-level name for code that needs the raw context.
sc = spark.sparkContext

## 유저의 log데이터 읽어오기

데이터는 hdfs에 {'beer':'b0002', 'date':'2020-01-23 00:25:39', 'member':'m200117021'} 형식으로 저장되어 있다

In [2]:
# Read every file matching hdfs:/flume/* with Spark's JSON reader
df = spark.read.json('hdfs:/flume/*')

table의 명칭 정하기(SQL문과 비슷한 구문이 많이 사용되므로 별도의 table명을 지정해야 한다)

In [3]:
# Register df under the name "df_view" so that spark.sql() queries can refer to it
df.createOrReplaceTempView("df_view")

로그데이터를 불러오면 _corrupt_record 스키마와 Null과 같이 예상치 못했던 값이 들어올 수 있다.
<br>따라서 정상적인 구조상의 로그의 컬럼만을 지정하여 호출하고 Null값들을 제거한다

In [145]:
# Print a preview of the raw log DataFrame (including the _corrupt_record column)
df.show()

+--------------------+----+-------------------+----------+
|     _corrupt_record|beer|               date|    member|
+--------------------+----+-------------------+----------+
|==> /media/sf_log...|null|               null|      null|
|==> /media/sf_log...|null|               null|      null|
|                null|  b0|2020-01-23 00:25:39|m200117021|
|                null|  b0|2020-01-23 00:25:46|m200117021|
|                null|  b1|2020-01-23 00:25:46|m200117021|
|                null|  b1|2020-01-23 00:25:47|m200117022|
|                null|  b3|2020-01-23 00:25:47|m200117021|
|                null|  b1|2020-01-23 00:25:46|m200117022|
|                null|  b1|2020-01-23 00:25:47|m200117022|
|                null|  b3|2020-01-23 00:25:47|m200117023|
+--------------------+----+-------------------+----------+



In [5]:
# Project only the three well-formed log columns, which leaves _corrupt_record out
log_columns = spark.sql("select beer, date, member from df_view")
# Discard every row that still has a null in any of those columns
data = log_columns.dropna()
# Expose the cleaned rows to later SQL queries, then preview them
data.createOrReplaceTempView("data_view")
data.show()

+----+-------------------+----------+
|beer|               date|    member|
+----+-------------------+----------+
|  b0|2020-01-23 00:25:39|m200117021|
|  b0|2020-01-23 00:25:46|m200117021|
|  b1|2020-01-23 00:25:46|m200117021|
|  b1|2020-01-23 00:25:47|m200117022|
|  b3|2020-01-23 00:25:47|m200117021|
|  b1|2020-01-23 00:25:46|m200117022|
|  b1|2020-01-23 00:25:47|m200117022|
|  b3|2020-01-23 00:25:47|m200117023|
+----+-------------------+----------+



## 유저별로 좋아하는 맥주의 목록을 정리하기

유저별 '좋아요' 데이터가 두 번 저장될 경우 좋아요 취소로 간주하여 연산

In [6]:
# Only the (beer, member) pair matters for counting likes; the timestamp is dropped
db1 = spark.table("data_view").select("beer", "member")

In [7]:
# Count how many 'like' events were logged for each (beer, member) pair
pair_groups = db1.groupBy("beer", "member")
dt = pair_groups.count()
dt.show()

+----+----------+-----+
|beer|    member|count|
+----+----------+-----+
|  b1|m200117022|    3|
|  b3|m200117021|    1|
|  b3|m200117023|    1|
|  b0|m200117021|    2|
|  b1|m200117021|    1|
+----+----------+-----+



filter를 사용하면 컬럼 간 조건문으로 사용할 수 있다.<br>
숫자 비교를 할 때 long은 적용되지 않으니 type을 integer로 변경

In [8]:
from pyspark.sql.types import *
# Convert the aggregated `count` column to IntegerType, replacing it in place
count_as_int = dt['count'].astype(IntegerType())
dt = dt.withColumn("count", count_as_int)

2의 배수로 기록된 '좋아요' 기록은 'cancel'을 의미하니 홀수의 값을 가질 때의 기록만 남긴다

In [9]:
# An odd number of 'like' events means the like is still active (not cancelled)
is_active_like = (dt['count'] % 2) == 1
data = dt.where(is_active_like)
# NOTE: this re-registers "data_view", replacing the view registered earlier
data.createOrReplaceTempView("data_view")

In [10]:
# Active likes as (beer, member) pairs; the count column is no longer needed
usrinfo = spark.table("data_view").select("beer", "member")

In [11]:
# Convert to a pandas DataFrame so the notebook renders it as a table
usrinfo.toPandas()

Unnamed: 0,beer,member
0,b1,m200117022
1,b3,m200117021
2,b3,m200117023
3,b1,m200117021


## 각 회원이 좋아하는 맥주취향을 알아보자

추천을 위해 전처리된 맥주목록 데이터 불러오기<br>
각 컬럼의 데이터가 one-hot Encoding되어 모든 값이 1 또는 0으로 표시됐다.<br>
컬럼당 중요도를 나타내기 위해 가중치를 준 값들이 있는데, 예를들어 10의 가중치로 주었다면 10 또는 0으로 표시됐다.

In [12]:
from pyspark.join import *

#### 전처리된 beer데이터 불러오기

1. 옵션으로 header를 주면 파일 내 맨 윗줄을 column명으로 인식한다.
2. inferSchema를 True로 하면 알맞은 타입을 배정한다. (기본값으로 하면 string으로 입력된다)

In [13]:
# Treat the first line as column names and let Spark infer each column's type
csv_reader = spark.read.options(header=True, inferSchema=True)
beerinfo = csv_reader.csv('hdfs:/data/*')

In [14]:
# Convert to a pandas DataFrame so the notebook renders it as a table
beerinfo.toPandas()

Unnamed: 0,num,도수(상),도수(중),도수(하),정제수,보리맥아,귀리맥아,설탕,스페인감초분말,호프,...,헤페바이스 둔켈,헤페바이젠,헤페바이젠 + 도펠 복,헬레스 라거,호피 세션 세종,호피 필스너,화이트 에일,가격(상),가격(하),상품명
0,b0,1,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,"마튼즈,라이거필스너"
1,b1,1,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,"마튼즈,라이거바이젠"
2,b2,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,1,메나브레아필스
3,b3,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,"민타임,런던라거"
4,b4,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,"민타임,런던페일에일"
5,b5,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,"민타임,필스너"
6,b6,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,"민타임,야키마레드"
7,b7,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,1,"민타임,윗비어"
8,b8,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,"민타임,라즈베리윗"
9,b9,1,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,"민타임,런던포터"


유저별 맥주선호를 알아내기 위해 beerinfo와 usrinfo를 병합

In [15]:
# Attach each liked beer's feature row; the left join keeps every like even
# when no beerinfo row matches
same_beer = usrinfo.beer == beerinfo.num
preferences = usrinfo.join(beerinfo, on=same_beer, how='left')

유저별 선호를 나타내는 정보를 제외한 컬럼삭제

In [16]:
# Remove the beer identifiers and product name, leaving member + feature columns
preferences = preferences.drop('num', 'beer', '상품명')

#### 유저별로 각 컬럼 데이터의 평균을 구한다
선호하는 맥주성분들의 평균을 구함으로써 유저별로 선호데이터를 구할 수 있다.

In [17]:
# Every column after the first one (assumed to be `member`) is a feature column
col = preferences.columns[1:]
# Average each feature column within each member's set of liked beers
per_member = preferences.groupBy('member')
preferences = per_member.mean(*col)

groupBy로 변한 컬럼이름(도수(상) -> mean(도수(상)))을 원래대로 돌려놓자

In [18]:
# Restore the original feature names after the groupBy aggregate wrapped them.
# beerinfo's columns minus its first and last entries are expected to line up
# one-to-one, in order, with the aggregated columns that follow `member`.
newColumns = beerinfo.columns[1:-1]

# Rename everything in one projection instead of one withColumnRenamed per
# column. toDF raises if the number of names does not match the number of
# columns, so a misaligned schema fails loudly rather than part-way through.
preferences = preferences.toDF(preferences.columns[0], *newColumns)

In [19]:
# Convert to a pandas DataFrame so the notebook renders it as a table
preferences.toPandas()

Unnamed: 0,member,도수(상),도수(중),도수(하),정제수,보리맥아,귀리맥아,설탕,스페인감초분말,호프,...,헤페바이스,헤페바이스 둔켈,헤페바이젠,헤페바이젠 + 도펠 복,헬레스 라거,호피 세션 세종,호피 필스너,화이트 에일,가격(상),가격(하)
0,m200117021,0.5,0.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
1,m200117023,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
2,m200117022,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0


## 각 회원의 맥주취향에 맞게 추천하자

In [20]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [21]:
# Feature columns for clustering: all of beerinfo except its first and last column
features = beerinfo.columns[1:-1]
# Pack those columns into a single vector column named "features"
assembler = VectorAssembler().setInputCols(features).setOutputCol("features")
# Apply the same assembler to the beer catalogue and to the member profiles
train, rec = (assembler.transform(frame) for frame in (beerinfo, preferences))

In [22]:
# KMeans estimator: 100 clusters, fixed seed
kmeans = KMeans(k=100, seed=10)

In [23]:
# Fit the KMeans estimator on the assembled beer feature vectors
model = kmeans.fit(train)

In [24]:
# Assign a cluster id to every beer and to every member profile
beer_clusters = model.transform(train).select('num', 'prediction')
member_clusters = model.transform(rec).select('member', 'prediction')
# Pull both assignments back to the driver as lists of Rows
train_result = beer_clusters.collect()
rec_result = member_clusters.collect()

In [25]:
# Rebuild Spark DataFrames from the collected Row lists; no schema is passed,
# so column names and types are inferred from the Rows themselves
train_result_df = spark.createDataFrame(train_result)
rec_result_df = spark.createDataFrame(rec_result)

In [26]:
# Pair every member with every beer that landed in the same cluster
same_cluster = rec_result_df.prediction == train_result_df.prediction
recByuser = (
    rec_result_df
    .join(train_result_df, same_cluster, 'left')
    .select(rec_result_df.member, rec_result_df.prediction, train_result_df.num)
)

In [27]:
# Show which cluster each member was assigned to
rec_result_df.toPandas()

Unnamed: 0,member,prediction
0,m200117021,5
1,m200117023,21
2,m200117022,5


## 유저별 맥주 추천 목록 중 서비스에 적용할 목록을 추리자

유저별로 경험한 적 있는 맥주는 제외

In [52]:
# Candidate (beer, member) pairs minus the pairs already present in usrinfo
candidates = recByuser.select('num', 'member')
already_liked = usrinfo.select('beer', 'member')
recByuser02 = candidates.subtract(already_liked)

유저별 맥주 3개씩만 추천

In [30]:
# Collect the distinct member ids into a plain Python list
member = [row[0] for row in recByuser02.select('member').distinct().collect()]

In [221]:
# Recommend up to three beers per member (columns rec1, rec2, rec3).
#
# Each member's candidates are fetched once, in a fixed order, and padded with
# None, so a member with fewer than three candidates gets nulls in the trailing
# slots instead of raising IndexError.
REC_SLOTS = 3
rec_rows = []
for member_id in member:
    candidates = (
        recByuser02
        .filter(recByuser02['member'] == member_id)
        .orderBy('num')  # without an ordering the rows returned are arbitrary
        .take(REC_SLOTS)
    )
    picks = [row['num'] for row in candidates]
    picks += [None] * (REC_SLOTS - len(picks))
    rec_rows.append(tuple([member_id] + picks))

# An explicit schema is required here: type inference fails for a column that
# is entirely None and for an empty row list. Member ids and beer ids are both
# strings in this data set.
recByuser03 = spark.createDataFrame(
    rec_rows,
    "member: string, rec1: string, rec2: string, rec3: string",
)


In [222]:
# Workaround (per the original author's note): the joined DataFrame was not
# being sent to the Oracle server, so it is materialised on the driver and
# rebuilt as a fresh DataFrame.
# The schema is captured first and passed back in; without it createDataFrame
# must re-infer the types from the rows, which fails on an empty list or on a
# column that contains only nulls.
rec_schema = recByuser03.schema
tmp = recByuser03.collect()

recByuser03 = spark.createDataFrame(tmp, rec_schema)

## 추천맥주를 Oracle Server에 전송

In [223]:
# Write the recommendations to the Oracle table "rec" in overwrite mode.
# NOTE(review): the host and the credentials are hard-coded in the notebook;
# consider loading them from configuration outside the source.
oracle_url = "jdbc:oracle:thin:@192.168.32.1:1521:XE"
oracle_login = {"user": "beeradmin", "password": "1234"}
recByuser03.write.jdbc(
    oracle_url,
    table="rec",
    mode="overwrite",
    properties=oracle_login,
)