In [183]:
import findspark

# Initialise findspark before pyspark is imported in the next cell
# (presumably so that the pyspark import can be resolved — confirm for this environment).
findspark.init()

In [184]:
# Create the Spark session

import pyspark
from pyspark.sql import SparkSession

# getOrCreate() hands back a session built with the builder's default settings
# (no app name or config is set here).
spark = SparkSession.builder.getOrCreate()

In [185]:
# Load the four input CSV files from HDFS. Each read treats the first line
# as a header and asks Spark to infer the column types.
csv_read_options = {"header": "true", "inferSchema": "true"}

match_data = spark.read.csv("hdfs://localhost:19000/data/match_data_2.csv", **csv_read_options)
winrate = spark.read.csv("hdfs://localhost:19000/data/winrate.csv", **csv_read_options)
winrate_team = spark.read.csv("hdfs://localhost:19000/data/winrate_team.csv", **csv_read_options)
winrate_team_recent = spark.read.csv("hdfs://localhost:19000/data/winrate_team_recent.csv", **csv_read_options)

In [186]:
# Preview the first five rows of the match data
match_data.show(n=5)

+----------+-------------------------+-------------+--------------+------------+---+----------+---------+--------+-----------+----------+--------+---------+------------+----------+---------+------+------+
|      date|                    title|        stage|         team1|       team2|win|      top1|      jg1|    mid1|       adc1|      sup1|    top2|      jg2|        mid2|      adc2|     sup2|point1|point2|
+----------+-------------------------+-------------+--------------+------------+---+----------+---------+--------+-----------+----------+--------+---------+------------+----------+---------+------+------+
|2021-08-29|2021 LCK 서머 포스트 시즌|       결승전|            T1|     DWG KIA|  L|  T1 Canna|  T1 Oner|T1 Faker|T1 Gumayusi|  T1 Keria| DK Khan|DK Canyon|DK ShowMaker|  DK Ghost| DK BeryL|     1|     3|
|2021-08-22|2021 LCK 서머 포스트 시즌|2라운드 2경기| Gen.G eSports|          T1|  L|GEN Rascal| GEN Clid| GEN BDD|  GEN Ruler|  GEN Life|T1 Canna|  T1 Oner|    T1 Faker|  T1 Teddy| T1 Keria|     1|     

In [202]:
# Preview the first five rows of the per-player win-rate data
winrate.show(n=5)

+---+----------+-------+-------+----------+-------+
|_c0|      date|player1|player2| daybefore|winrate|
+---+----------+-------+-------+----------+-------+
|  0|2021-08-29|  Canna|   Khan|2021-08-28|    50%|
|  1|2021-08-29|   Khan|  Canna|2021-08-28|    50%|
|  2|2021-08-22| Rascal|  Canna|2021-08-21|  44.8%|
|  3|2021-08-22|  Canna| Rascal|2021-08-21|  55.2%|
|  4|2021-08-21|   Rich|   Khan|2021-08-20|  28.6%|
+---+----------+-------+-------+----------+-------+
only showing top 5 rows



In [203]:
# Preview the first five rows of the team win-rate data
winrate_team.show(n=5)

+---+----------+-------------+-------------+----------+-------+
|_c0|      date|        team1|        team2| daybefore|winrate|
+---+----------+-------------+-------------+----------+-------+
|  0|2021-08-29|           T1|      DWG KIA|2021-08-28|  38.5%|
|  1|2021-08-29|      DWG KIA|           T1|2021-08-28|  61.5%|
|  2|2021-08-22|Gen.G eSports|           T1|2021-08-21|  36.7%|
|  3|2021-08-22|           T1|Gen.G eSports|2021-08-21|  63.3%|
|  4|2021-08-21| NS RED FORCE|      DWG KIA|2021-08-20|  22.2%|
+---+----------+-------------+-------------+----------+-------+
only showing top 5 rows



In [204]:
# Preview the first five rows of the recent team win-rate data
winrate_team_recent.show(n=5)

+---+----------+-------------+-------------+----------+----------+--------+--------+
|_c0|      date|        team1|        team2| daybefore|  twoweeks|winrate1|winrate2|
+---+----------+-------------+-------------+----------+----------+--------+--------+
|  0|2021-08-29|           T1|      DWG KIA|2021-08-28|2021-08-14|   66.7%|    100%|
|  1|2021-08-29|      DWG KIA|           T1|2021-08-28|2021-08-14|    100%|   66.7%|
|  2|2021-08-22|Gen.G eSports|           T1|2021-08-21|2021-08-07|   57.1%|   71.4%|
|  3|2021-08-22|           T1|Gen.G eSports|2021-08-21|2021-08-07|   71.4%|   57.1%|
|  4|2021-08-21| NS RED FORCE|      DWG KIA|2021-08-20|2021-08-06|     50%|    100%|
+---+----------+-------------+-------------+----------+----------+--------+--------+
only showing top 5 rows



In [187]:
# Create the temporary views

# Each DataFrame is exposed to Spark SQL under a short view name.
view_registrations = (
    ("match", match_data),
    ("player", winrate),
    ("team", winrate_team),
    ("recent", winrate_team_recent),
)
for view_name, frame in view_registrations:
    frame.createOrReplaceTempView(view_name)

In [188]:
# Data value conversion

# Join the per-player win rates, the team win rate and the recent two-week win rates onto each match.
# win value => 1 when it is 'W', 0 otherwise (e.g. 'L').
# Strings of the form '<number>%' are turned into the float <number>.
#
# The `player` view is joined five times (aliases a-e), once per role column pair
# (top, jg, mid, adc, sup). Every join is a LEFT join, so a match without a
# matching win-rate row keeps NULL in that column.
# NOTE(review): the role joins use `LIKE CONCAT('%', <player>)`, i.e. a suffix match in
# which any `%` or `_` inside a player name acts as a wildcard — confirm that no
# player name is a suffix of another name in the same role.
# NOTE(review): if a view holds more than one row per (date, player1, player2) or
# (date, team1, team2), these joins multiply the match rows — TODO confirm key uniqueness.

data = spark.sql("SELECT if(match.win='W',1,0) as win, float(replace(a.winrate,'%','')) as topwr, float(replace(b.winrate,'%','')) as jgwr,\
    float(replace(c.winrate,'%','')) as midwr, float(replace(d.winrate,'%','')) as adcwr, float(replace(e.winrate,'%','')) as supwr,\
    float(replace(team.winrate,'%','')) as teamwr, float(replace(recent.winrate1,'%','')) as recentwr1, float(replace(recent.winrate2,'%','')) as recentwr2 from match\
    left join player a on match.date=a.date and match.top1 like CONCAT('%', a.player1) and match.top2 like CONCAT('%', a.player2)\
    left join player b on match.date=b.date and match.jg1 like CONCAT('%', b.player1) and match.jg2 like CONCAT('%', b.player2)\
    left join player c on match.date=c.date and match.mid1 like CONCAT('%', c.player1) and match.mid2 like CONCAT('%', c.player2)\
    left join player d on match.date=d.date and match.adc1 like CONCAT('%', d.player1) and match.adc2 like CONCAT('%', d.player2)\
    left join player e on match.date=e.date and match.sup1 like CONCAT('%', e.player1) and match.sup2 like CONCAT('%', e.player2)\
    left join team on match.date=team.date and match.team1=team.team1 and match.team2=team.team2\
    left join recent on match.date=recent.date and match.team1=recent.team1 and match.team2=recent.team2")

In [189]:
# NOTE(review): this cell is a bare triple-quoted string, so none of the code inside it
# runs; evaluating the cell only echoes the string back. The text is a
# withColumn/translate variant of the win and '%' conversions that the SQL query
# in the previous cell already performs, and it refers to a DataFrame named `aa`
# that is not defined in the visible cells — candidate for deletion.
'''# 데이터 값 변환

#  win의 값 => W 이면 1, L이면 0
# (숫자)+'%'인 문자열을 (숫자)로 만들어줌

from pyspark.sql.functions import translate
from pyspark.sql.types import IntegerType,FloatType

aa = aa.withColumn('win', translate(aa.win, 'L','0').cast('int'))
aa = aa.withColumn('win', translate(aa.win, 'W','1').cast('int'))
aa = aa.withColumn('topwr', translate(aa.topwr, '%','').cast('float'))
aa = aa.withColumn('jgwr',translate(aa.jgwr, '%','').cast('float'))
aa = aa.withColumn('midwr',translate(aa.midwr, '%','').cast('float'))
aa = aa.withColumn('adcwr',translate(aa.adcwr, '%','').cast('float'))
aa = aa.withColumn('supwr',translate(aa.supwr, '%','').cast('float'))
aa = aa.withColumn('teamwr',translate(aa.teamwr, '%','').cast('float'))
aa = aa.withColumn('recentwr1',translate(aa.recentwr1, '%','').cast('float'))
aa = aa.withColumn('recentwr2',translate(aa.recentwr2, '%','').cast('float'))'''

"# 데이터 값 변환\n\n#  win의 값 => W 이면 1, L이면 0\n# (숫자)+'%'인 문자열을 (숫자)로 만들어줌\n\nfrom pyspark.sql.functions import translate\nfrom pyspark.sql.types import IntegerType,FloatType\n\naa = aa.withColumn('win', translate(aa.win, 'L','0').cast('int'))\naa = aa.withColumn('win', translate(aa.win, 'W','1').cast('int'))\naa = aa.withColumn('topwr', translate(aa.topwr, '%','').cast('float'))\naa = aa.withColumn('jgwr',translate(aa.jgwr, '%','').cast('float'))\naa = aa.withColumn('midwr',translate(aa.midwr, '%','').cast('float'))\naa = aa.withColumn('adcwr',translate(aa.adcwr, '%','').cast('float'))\naa = aa.withColumn('supwr',translate(aa.supwr, '%','').cast('float'))\naa = aa.withColumn('teamwr',translate(aa.teamwr, '%','').cast('float'))\naa = aa.withColumn('recentwr1',translate(aa.recentwr1, '%','').cast('float'))\naa = aa.withColumn('recentwr2',translate(aa.recentwr2, '%','').cast('float'))"

In [190]:
# Preview the joined feature table (20 rows, the show() default)
data.show(n=20)

+---+-----+-----+-----+-----+-----+------+---------+---------+
|win|topwr| jgwr|midwr|adcwr|supwr|teamwr|recentwr1|recentwr2|
+---+-----+-----+-----+-----+-----+------+---------+---------+
|  0| 50.0| 50.0| 41.2| 50.0| 45.7|  38.5|     66.7|    100.0|
|  0| 44.8| 33.3| 47.0| 45.8| 43.3|  36.7|     57.1|     71.4|
|  0| 28.6| 29.2| 66.7| 22.2| 22.2|  22.2|     50.0|    100.0|
|  0| 35.3| 66.7| 85.7| 50.0| 58.3|  35.3|     50.0|     57.1|
|  0| 55.6| 42.1| 50.0| 40.0| 69.2|  53.3|     85.7|     33.3|
|  1| 92.3| 88.2| 66.7| 92.3| 84.6|  92.3|     37.5|     57.1|
|  0| 85.7| null| 60.0| 66.7| 90.0|  70.1|     75.0|     14.3|
|  1| 47.6| 60.0| 61.9| null| 60.0|  50.0|     71.4|     40.0|
|  1| 75.0|100.0| 92.9|100.0| 78.9|  70.6|    100.0|     54.5|
|  0| 40.0| 40.0| null| null| null|  57.1|     30.0|     50.0|
|  1| 80.0| 83.3| 62.1| 75.0| 71.4|  60.6|     42.9|     25.0|
|  0| 38.9| 40.0| 66.7| null| 50.0|  53.6|     62.5|     28.6|
|  0| 80.0| null| 58.6| 40.0| 40.5|  54.0|     42.9|   

In [191]:
# Bare expression: the notebook displays the DataFrame repr (column names and types)
data

DataFrame[win: int, topwr: float, jgwr: float, midwr: float, adcwr: float, supwr: float, teamwr: float, recentwr1: float, recentwr2: float]

In [192]:
# Count the missing values per column

from pyspark.sql.functions import isnan, when, count, col, isnull

# when(...) yields the (non-null) column-name literal only for rows where the
# column is NULL, so count(...) tallies exactly the NULL rows of that column.
null_count_exprs = []
for column_name in data.columns:
    null_marker = when(isnull(column_name), column_name)
    null_count_exprs.append(count(null_marker).alias(column_name))

data.select(null_count_exprs).show()

+---+-----+----+-----+-----+-----+------+---------+---------+
|win|topwr|jgwr|midwr|adcwr|supwr|teamwr|recentwr1|recentwr2|
+---+-----+----+-----+-----+-----+------+---------+---------+
|  0|   24|  55|   37|   57|   35|     0|        0|        0|
+---+-----+----+-----+-----+-----+------+---------+---------+



In [193]:
# Handle missing values

# These columns are win rates, so when no games have been played at all
# (no matching win-rate row in the join) the gap is filled with a neutral 50.
NEUTRAL_WINRATE = 50.0

# Restrict the fill to the win-rate feature columns. An unscoped na.fill(50)
# targets every numeric column, which would also overwrite a missing `win`
# label with 50 instead of leaving it visible as a data problem.
winrate_columns = [name for name in data.columns if name != "win"]

data = data.na.fill(NEUTRAL_WINRATE, subset=winrate_columns)

In [194]:
# Re-run the per-column NULL count on the current `data`
remaining_null_exprs = [
    count(when(isnull(name), name)).alias(name)
    for name in data.columns
]
data.select(remaining_null_exprs).show()

+---+-----+----+-----+-----+-----+------+---------+---------+
|win|topwr|jgwr|midwr|adcwr|supwr|teamwr|recentwr1|recentwr2|
+---+-----+----+-----+-----+-----+------+---------+---------+
|  0|    0|   0|    0|    0|    0|     0|        0|        0|
+---+-----+----+-----+-----+-----+------+---------+---------+



In [195]:
# Preview the current contents of `data` (20 rows, the show() default)
data.show(n=20)

+---+-----+-----+-----+-----+-----+------+---------+---------+
|win|topwr| jgwr|midwr|adcwr|supwr|teamwr|recentwr1|recentwr2|
+---+-----+-----+-----+-----+-----+------+---------+---------+
|  0| 50.0| 50.0| 41.2| 50.0| 45.7|  38.5|     66.7|    100.0|
|  0| 44.8| 33.3| 47.0| 45.8| 43.3|  36.7|     57.1|     71.4|
|  0| 28.6| 29.2| 66.7| 22.2| 22.2|  22.2|     50.0|    100.0|
|  0| 35.3| 66.7| 85.7| 50.0| 58.3|  35.3|     50.0|     57.1|
|  0| 55.6| 42.1| 50.0| 40.0| 69.2|  53.3|     85.7|     33.3|
|  1| 92.3| 88.2| 66.7| 92.3| 84.6|  92.3|     37.5|     57.1|
|  0| 85.7| 50.0| 60.0| 66.7| 90.0|  70.1|     75.0|     14.3|
|  1| 47.6| 60.0| 61.9| 50.0| 60.0|  50.0|     71.4|     40.0|
|  1| 75.0|100.0| 92.9|100.0| 78.9|  70.6|    100.0|     54.5|
|  0| 40.0| 40.0| 50.0| 50.0| 50.0|  57.1|     30.0|     50.0|
|  1| 80.0| 83.3| 62.1| 75.0| 71.4|  60.6|     42.9|     25.0|
|  0| 38.9| 40.0| 66.7| 50.0| 50.0|  53.6|     62.5|     28.6|
|  0| 80.0| 50.0| 58.6| 40.0| 40.5|  54.0|     42.9|   

In [196]:
# Save the data

# coalesce(1) collapses the result to a single partition so a single part file
# is written under the target path.
# mode("overwrite") keeps this cell re-runnable: with the default mode the write
# raises once the path already exists. The built-in csv writer replaces the
# legacy "com.databricks.spark.csv" format name.
(
    data.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("hdfs://localhost:19000/data/model_data.csv")
)

In [199]:
# Read the saved CSV back (header row, inferred column types) to verify the write
temp = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("hdfs://localhost:19000/data/model_data.csv")
)

In [201]:
# Preview the reloaded data (20 rows, the show() default)
temp.show(n=20)

+---+-----+-----+-----+-----+-----+------+---------+---------+
|win|topwr| jgwr|midwr|adcwr|supwr|teamwr|recentwr1|recentwr2|
+---+-----+-----+-----+-----+-----+------+---------+---------+
|  0| 50.0| 50.0| 41.2| 50.0| 45.7|  38.5|     66.7|    100.0|
|  0| 44.8| 33.3| 47.0| 45.8| 43.3|  36.7|     57.1|     71.4|
|  0| 28.6| 29.2| 66.7| 22.2| 22.2|  22.2|     50.0|    100.0|
|  0| 35.3| 66.7| 85.7| 50.0| 58.3|  35.3|     50.0|     57.1|
|  0| 55.6| 42.1| 50.0| 40.0| 69.2|  53.3|     85.7|     33.3|
|  1| 92.3| 88.2| 66.7| 92.3| 84.6|  92.3|     37.5|     57.1|
|  0| 85.7| 50.0| 60.0| 66.7| 90.0|  70.1|     75.0|     14.3|
|  1| 47.6| 60.0| 61.9| 50.0| 60.0|  50.0|     71.4|     40.0|
|  1| 75.0|100.0| 92.9|100.0| 78.9|  70.6|    100.0|     54.5|
|  0| 40.0| 40.0| 50.0| 50.0| 50.0|  57.1|     30.0|     50.0|
|  1| 80.0| 83.3| 62.1| 75.0| 71.4|  60.6|     42.9|     25.0|
|  0| 38.9| 40.0| 66.7| 50.0| 50.0|  53.6|     62.5|     28.6|
|  0| 80.0| 50.0| 58.6| 40.0| 40.5|  54.0|     42.9|   