### **1. Get Data**

In [3]:
import os
import zipfile

In [4]:
# Mount drive on google colab
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# Unzip dataset vào thư mục 'Lesson 2'
path = '/content/drive/MyDrive/Colab Notebooks/Datasets'
dataset_name = 'AI Specialist Lesson 04.zip'
data_path = os.path.join(path, dataset_name)

with zipfile.ZipFile(data_path, 'r') as dataset_zip_file:
  dataset_zip_file.extractall('/content')

In [6]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=1dfa1a36648dc27f53886fb076002a74c79d413d7cd4acc363fa034b150cbd4f
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


### **2. Read Dataset**

In [7]:
import pyspark
from pyspark.sql import SparkSession

In [8]:
spark = SparkSession.builder.appName('AI Specialist 29').getOrCreate()
spark

In [9]:
# Đọc file csv
pokemon = spark.read.option('header', 'true').csv('AI Specialist Lesson 04/pokemon.csv')
combat = spark.read.option('header', 'true').csv('AI Specialist Lesson 04/combats.csv')

In [10]:
display(pokemon)
display(combat)

DataFrame[#: string, Name: string, Type 1: string, Type 2: string, HP: string, Attack: string, Defense: string, Sp. Atk: string, Sp. Def: string, Speed: string, Generation: string, Legendary: string]

DataFrame[First_pokemon: string, Second_pokemon: string, Winner: string]

In [11]:
pokemon.show()
combat.show()

+---+----------------+------+------+---+------+-------+-------+-------+-----+----------+---------+
|  #|            Name|Type 1|Type 2| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---+----------------+------+------+---+------+-------+-------+-------+-----+----------+---------+
|  1|       Bulbasaur| Grass|Poison| 45|    49|     49|     65|     65|   45|         1|    FALSE|
|  2|         Ivysaur| Grass|Poison| 60|    62|     63|     80|     80|   60|         1|    FALSE|
|  3|        Venusaur| Grass|Poison| 80|    82|     83|    100|    100|   80|         1|    FALSE|
|  4|   Mega Venusaur| Grass|Poison| 80|   100|    123|    122|    120|   80|         1|    FALSE|
|  5|      Charmander|  Fire|  NULL| 39|    52|     43|     60|     50|   65|         1|    FALSE|
|  6|      Charmeleon|  Fire|  NULL| 58|    64|     58|     80|     65|   80|         1|    FALSE|
|  7|       Charizard|  Fire|Flying| 78|    84|     78|    109|     85|  100|         1|    FALSE|
|  8|Mega 

In [12]:
# print kiến trúc
pokemon.printSchema()
combat.printSchema()

root
 |-- #: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Type 1: string (nullable = true)
 |-- Type 2: string (nullable = true)
 |-- HP: string (nullable = true)
 |-- Attack: string (nullable = true)
 |-- Defense: string (nullable = true)
 |-- Sp. Atk: string (nullable = true)
 |-- Sp. Def: string (nullable = true)
 |-- Speed: string (nullable = true)
 |-- Generation: string (nullable = true)
 |-- Legendary: string (nullable = true)

root
 |-- First_pokemon: string (nullable = true)
 |-- Second_pokemon: string (nullable = true)
 |-- Winner: string (nullable = true)



In [13]:
print(pokemon.dtypes)
print(combat.dtypes)

[('#', 'string'), ('Name', 'string'), ('Type 1', 'string'), ('Type 2', 'string'), ('HP', 'string'), ('Attack', 'string'), ('Defense', 'string'), ('Sp. Atk', 'string'), ('Sp. Def', 'string'), ('Speed', 'string'), ('Generation', 'string'), ('Legendary', 'string')]
[('First_pokemon', 'string'), ('Second_pokemon', 'string'), ('Winner', 'string')]


In [14]:
# pokemon.describe().show()
combat.describe().show()

+-------+-----------------+------------------+-----------------+
|summary|    First_pokemon|    Second_pokemon|           Winner|
+-------+-----------------+------------------+-----------------+
|  count|            50000|             50000|            50000|
|   mean|        400.49564|         403.15966|         408.8901|
| stddev|229.5494293606519|230.08364413480214|231.1599613266632|
|    min|                1|                 1|                1|
|    max|               99|                99|               99|
+-------+-----------------+------------------+-----------------+



In [15]:
print(pokemon.columns)
print(combat.columns)

['#', 'Name', 'Type 1', 'Type 2', 'HP', 'Attack', 'Defense', 'Sp. Atk', 'Sp. Def', 'Speed', 'Generation', 'Legendary']
['First_pokemon', 'Second_pokemon', 'Winner']


In [16]:
# Rename columns
pokemon = pokemon.withColumnRenamed('#', 'Pokedex')
pokemon = pokemon.withColumnRenamed('Sp. Atk', 'Sp_Atk')
pokemon = pokemon.withColumnRenamed('Sp. Def', 'Sp_Def')

In [17]:
pokemon.show()

+-------+----------------+------+------+---+------+-------+------+------+-----+----------+---------+
|Pokedex|            Name|Type 1|Type 2| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|Generation|Legendary|
+-------+----------------+------+------+---+------+-------+------+------+-----+----------+---------+
|      1|       Bulbasaur| Grass|Poison| 45|    49|     49|    65|    65|   45|         1|    FALSE|
|      2|         Ivysaur| Grass|Poison| 60|    62|     63|    80|    80|   60|         1|    FALSE|
|      3|        Venusaur| Grass|Poison| 80|    82|     83|   100|   100|   80|         1|    FALSE|
|      4|   Mega Venusaur| Grass|Poison| 80|   100|    123|   122|   120|   80|         1|    FALSE|
|      5|      Charmander|  Fire|  NULL| 39|    52|     43|    60|    50|   65|         1|    FALSE|
|      6|      Charmeleon|  Fire|  NULL| 58|    64|     58|    80|    65|   80|         1|    FALSE|
|      7|       Charizard|  Fire|Flying| 78|    84|     78|   109|    85|  100|         1| 

In [18]:
pokemon.describe().show()

+-------+------------------+------------------+------+------+------------------+-----------------+------------------+----------------+-----------------+------------------+------------------+---------+
|summary|           Pokedex|              Name|Type 1|Type 2|                HP|           Attack|           Defense|          Sp_Atk|           Sp_Def|             Speed|        Generation|Legendary|
+-------+------------------+------------------+------+------+------------------+-----------------+------------------+----------------+-----------------+------------------+------------------+---------+
|  count|               800|               799|   800|   414|               800|              800|               800|             800|              800|               800|               800|      800|
|   mean|             400.5|              NULL|  NULL|  NULL|          69.25875|         79.00125|           73.8425|           72.82|          71.9025|           68.2775|           3.32375|     N

### **3. Preprocessing Data**

In [19]:
NullDict = {col:pokemon.filter(pokemon[col].isNull()).count() for col in pokemon.columns}
NullDict

{'Pokedex': 0,
 'Name': 1,
 'Type 1': 0,
 'Type 2': 386,
 'HP': 0,
 'Attack': 0,
 'Defense': 0,
 'Sp_Atk': 0,
 'Sp_Def': 0,
 'Speed': 0,
 'Generation': 0,
 'Legendary': 0}

In [20]:
# 'Name' có 1 giá trị null
# dùng filter để lọc ra row này
# 4 cách đều cho ra kết quả như nhau

pokemon.filter('Name IS NULL').show()

pokemon.filter(pokemon.Name.isNull()).show()

from pyspark.sql.functions import col
pokemon.filter(col('Name').isNull()).show()

# cách thứ 4 dùng câu lệnh SQL
pokemon.createOrReplaceTempView('DATA') # Ánh xạ pokemon thành 1 table tên là DATA
spark.sql('SELECT * FROM DATA where Name IS NULL').show()

+-------+----+--------+------+---+------+-------+------+------+-----+----------+---------+
|Pokedex|Name|  Type 1|Type 2| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|Generation|Legendary|
+-------+----+--------+------+---+------+-------+------+------+-----+----------+---------+
|     63|NULL|Fighting|  NULL| 65|   105|     60|    60|    70|   95|         1|    FALSE|
+-------+----+--------+------+---+------+-------+------+------+-----+----------+---------+

+-------+----+--------+------+---+------+-------+------+------+-----+----------+---------+
|Pokedex|Name|  Type 1|Type 2| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|Generation|Legendary|
+-------+----+--------+------+---+------+-------+------+------+-----+----------+---------+
|     63|NULL|Fighting|  NULL| 65|   105|     60|    60|    70|   95|         1|    FALSE|
+-------+----+--------+------+---+------+-------+------+------+-----+----------+---------+

+-------+----+--------+------+---+------+-------+------+------+-----+----------+--------

In [21]:
pokemon.filter((col('Type 1')=='Grass') & (col('Type 2')=='Poison')).show()
pokemon.filter("`Type 1`='Grass' AND `Type 2`='Poison'").show()

+-------+-------------+------+------+---+------+-------+------+------+-----+----------+---------+
|Pokedex|         Name|Type 1|Type 2| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|Generation|Legendary|
+-------+-------------+------+------+---+------+-------+------+------+-----+----------+---------+
|      1|    Bulbasaur| Grass|Poison| 45|    49|     49|    65|    65|   45|         1|    FALSE|
|      2|      Ivysaur| Grass|Poison| 60|    62|     63|    80|    80|   60|         1|    FALSE|
|      3|     Venusaur| Grass|Poison| 80|    82|     83|   100|   100|   80|         1|    FALSE|
|      4|Mega Venusaur| Grass|Poison| 80|   100|    123|   122|   120|   80|         1|    FALSE|
|     49|       Oddish| Grass|Poison| 45|    50|     55|    75|    65|   30|         1|    FALSE|
|     50|        Gloom| Grass|Poison| 60|    65|     70|    85|    75|   40|         1|    FALSE|
|     51|    Vileplume| Grass|Poison| 75|    80|     85|   110|    90|   50|         1|    FALSE|
|     76|   Bellspro

In [22]:
pokemon.createOrReplaceTempView('Pokemon')
combat.createOrReplaceTempView('Combat')

spark.sql("SELECT * FROM Pokemon WHERE `Type 1` = 'Grass'").show()
spark.sql("SELECT * FROM Pokemon WHERE `Type 1` = 'Grass' AND `Type 2` = 'Poison'").show()

+-------+-------------+------+-------+---+------+-------+------+------+-----+----------+---------+
|Pokedex|         Name|Type 1| Type 2| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|Generation|Legendary|
+-------+-------------+------+-------+---+------+-------+------+------+-----+----------+---------+
|      1|    Bulbasaur| Grass| Poison| 45|    49|     49|    65|    65|   45|         1|    FALSE|
|      2|      Ivysaur| Grass| Poison| 60|    62|     63|    80|    80|   60|         1|    FALSE|
|      3|     Venusaur| Grass| Poison| 80|    82|     83|   100|   100|   80|         1|    FALSE|
|      4|Mega Venusaur| Grass| Poison| 80|   100|    123|   122|   120|   80|         1|    FALSE|
|     49|       Oddish| Grass| Poison| 45|    50|     55|    75|    65|   30|         1|    FALSE|
|     50|        Gloom| Grass| Poison| 60|    65|     70|    85|    75|   40|         1|    FALSE|
|     51|    Vileplume| Grass| Poison| 75|    80|     85|   110|    90|   50|         1|    FALSE|
|     76| 

>**Note:**
* Với các columname Type 1 và Type 2, có chứa space ở giữa, khi filter hoặc dùng sql truy vấn cần phải để trong cặp dấu ``.
* Gây rườm rà, khó viết, dễ sai. Vì thế, ngay từ đầu ta nên đổi tên của các column này để chúng không chứa space trong đó.

In [23]:
# Thay thế các giá trị Null ở cột Name hàng 63 bằng 1 giá trị khác
from pyspark.sql.functions import when
from pyspark.sql.functions import regexp_replace

pokemon = pokemon.withColumn('Name',                    # Tại cột Name
                             when(col('Pokedex')==63,   # Với giá trị null ở hàng 63
                                  'Primeape').otherwise(col('Name'))) # Thay giá trị null bằng 'Primeape', và giữ nguyên các giá trị khác

In [24]:
pokemon.filter(pokemon.Name.isNull()).show()

+-------+----+------+------+---+------+-------+------+------+-----+----------+---------+
|Pokedex|Name|Type 1|Type 2| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|Generation|Legendary|
+-------+----+------+------+---+------+-------+------+------+-----+----------+---------+
+-------+----+------+------+---+------+-------+------+------+-----+----------+---------+



### **4. Feature Engineering**

In [31]:
# Create schema
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
emptyRDD = spark.sparkContext.emptyRDD() # dataframe trong spark được gọi là RDD
# schema có thể coi là cấu trúc từng cột của dataframe
schema = StructType([StructField('Pokedex', IntegerType(), True),
                     StructField('Total_Fights', IntegerType(), True),
                     StructField('Total_Wins', IntegerType(), True),
                     StructField('Win_Percent', FloatType(), True)])

WinStateDF = spark.createDataFrame(emptyRDD,schema)
WinStateDF.printSchema()

root
 |-- Pokedex: integer (nullable = true)
 |-- Total_Fights: integer (nullable = true)
 |-- Total_Wins: integer (nullable = true)
 |-- Win_Percent: float (nullable = true)



In [30]:
WinStateDF.show()

+-------+------------+----------+-----------+
|Pokedex|Total_Fights|Total_Wins|Win_Percent|
+-------+------------+----------+-----------+
+-------+------------+----------+-----------+



In [40]:
combat.orderBy(['First_pokemon'], ascending=[True]).show()

+-------------+--------------+------+
|First_pokemon|Second_pokemon|Winner|
+-------------+--------------+------+
|            1|           717|   717|
|            1|           372|   372|
|            1|            54|    54|
|            1|           324|     1|
|            1|           766|   766|
|            1|           385|     1|
|            1|           557|   557|
|            1|           493|   493|
|            1|           153|   153|
|            1|           115|   115|
|            1|           604|   604|
|            1|           351|   351|
|            1|           579|     1|
|            1|           779|     1|
|            1|           194|     1|
|            1|           488|   488|
|            1|           679|   679|
|            1|           503|   503|
|            1|           285|     1|
|            1|           483|   483|
+-------------+--------------+------+
only showing top 20 rows



In [47]:
combat_first = combat.groupBy('First_pokemon').count()
combat_first.show()

+-------------+-----+
|First_pokemon|count|
+-------------+-----+
|          691|   65|
|          296|   50|
|          675|   49|
|          467|   58|
|          451|   58|
|          125|   63|
|          800|   61|
|          666|   43|
|           51|   61|
|          447|   65|
|          591|   64|
|          124|   67|
|            7|   60|
|          475|   65|
|          574|   63|
|          307|   60|
|          613|   59|
|          718|   60|
|          169|   58|
|          577|   62|
+-------------+-----+
only showing top 20 rows



In [48]:
combat_first.createOrReplaceTempView('DATA')
combat_first = spark.sql("SELECT * FROM DATA ORDER BY CAST(First_pokemon AS INT) ASC")
combat_first.show()

+-------------+-----+
|First_pokemon|count|
+-------------+-----+
|            1|   70|
|            2|   55|
|            3|   68|
|            4|   62|
|            5|   50|
|            6|   66|
|            7|   60|
|            8|   65|
|            9|   66|
|           10|   58|
|           11|   79|
|           13|   64|
|           14|   54|
|           15|   69|
|           16|   61|
|           17|   65|
|           18|   63|
|           19|   73|
|           20|   72|
|           21|   63|
+-------------+-----+
only showing top 20 rows



>**Note:**
* 1 mục đích nào đó có thể thực hiện bằng 2 cách, sử dụng câu lệnh của spark hoặc sql

In [38]:
combat_second = combat.groupBy('Second_pokemon').count()
combat_second.show()

+--------------+-----+
|Second_pokemon|count|
+--------------+-----+
|           691|   57|
|           675|   66|
|           467|   66|
|           296|   73|
|           451|   74|
|           800|   60|
|           125|   65|
|           666|   68|
|             7|   73|
|           591|   69|
|           124|   59|
|           447|   59|
|            51|   63|
|           574|   64|
|           307|   60|
|           718|   68|
|           475|   70|
|           613|   54|
|           747|   63|
|           544|   73|
+--------------+-----+
only showing top 20 rows



In [46]:
combat_second.createOrReplaceTempView('DATA')
combat_second = spark.sql('SELECT * FROM DATA ORDER BY CAST(Second_pokemon AS INT) ASC')
combat_second.show()

+--------------+-----+
|Second_pokemon|count|
+--------------+-----+
|             1|   63|
|             2|   66|
|             3|   64|
|             4|   63|
|             5|   62|
|             6|   52|
|             7|   73|
|             8|   74|
|             9|   69|
|            10|   59|
|            11|   62|
|            13|   80|
|            14|   61|
|            15|   64|
|            16|   59|
|            17|   46|
|            18|   67|
|            19|   55|
|            20|   47|
|            21|   63|
+--------------+-----+
only showing top 20 rows



In [49]:
combat_winner = combat.groupBy('Winner').count()

combat_winner.createOrReplaceTempView('DATA')
combat_winner = spark.sql('SELECT * FROM DATA ORDER BY CAST(Winner AS INT) ASC')
combat_winner.show()

+------+-----+
|Winner|count|
+------+-----+
|     1|   37|
|     2|   46|
|     3|   89|
|     4|   70|
|     5|   55|
|     6|   64|
|     7|  115|
|     8|  119|
|     9|  114|
|    10|   19|
|    11|   59|
|    13|   83|
|    14|   17|
|    15|   13|
|    16|   75|
|    17|   26|
|    18|   17|
|    19|   80|
|    20|  115|
|    21|   47|
+------+-----+
only showing top 20 rows



>**Note:**
* Trên đây ta đã tạo được 3 bảng riêng lẻ bao gồm:
* combat_first: đếm số trận đã đấu của mỗi pokemon khi nó là First_pokemon
* combat_second: đếm số trận đã đấu của mỗi pokemon khi nó là Second_pokemon
* combat_winner: đếm số trận chiến thắng của mỗi pokemon.
* Tiếp theo cần kết hợp 3 bảng này lại.

In [53]:
# Kết hợp 3 bảng trên lại thành 1
combat_first.createOrReplaceTempView('DATA1')
combat_second.createOrReplaceTempView('DATA2')
combat_winner.createOrReplaceTempView('DATA3')

combat_stats = spark.sql("SELECT F.First_pokemon AS Pokedex, F.count+S.count AS TotalFight, W.count AS TotalWin\
                          FROM DATA1 F\
                          INNER JOIN DATA2 S ON F.First_pokemon=S.Second_pokemon\
                          LEFT JOIN DATA3 W ON W.Winner = F.First_pokemon\
                          ORDER BY CAST(Pokedex AS INT) ASC")
combat_stats.show()

+-------+----------+--------+
|Pokedex|TotalFight|TotalWin|
+-------+----------+--------+
|      1|       133|      37|
|      2|       121|      46|
|      3|       132|      89|
|      4|       125|      70|
|      5|       112|      55|
|      6|       118|      64|
|      7|       133|     115|
|      8|       139|     119|
|      9|       135|     114|
|     10|       117|      19|
|     11|       141|      59|
|     13|       144|      83|
|     14|       115|      17|
|     15|       133|      13|
|     16|       120|      75|
|     17|       111|      26|
|     18|       130|      17|
|     19|       128|      80|
|     20|       119|     115|
|     21|       126|      47|
+-------+----------+--------+
only showing top 20 rows



In [57]:
combat_stats = combat_stats.withColumn('WinPercent',
                                       combat_stats.TotalWin/combat_stats.TotalFight)
combat_stats.show()

+-------+----------+--------+-------------------+
|Pokedex|TotalFight|TotalWin|         WinPercent|
+-------+----------+--------+-------------------+
|      1|       133|      37| 0.2781954887218045|
|      2|       121|      46|0.38016528925619836|
|      3|       132|      89| 0.6742424242424242|
|      4|       125|      70|               0.56|
|      5|       112|      55|0.49107142857142855|
|      6|       118|      64| 0.5423728813559322|
|      7|       133|     115| 0.8646616541353384|
|      8|       139|     119| 0.8561151079136691|
|      9|       135|     114| 0.8444444444444444|
|     10|       117|      19| 0.1623931623931624|
|     11|       141|      59|0.41843971631205673|
|     13|       144|      83| 0.5763888888888888|
|     14|       115|      17|0.14782608695652175|
|     15|       133|      13|0.09774436090225563|
|     16|       120|      75|              0.625|
|     17|       111|      26|0.23423423423423423|
|     18|       130|      17|0.13076923076923078|


In [58]:
pokemon.show()

+-------+----------------+------+------+---+------+-------+------+------+-----+----------+---------+
|Pokedex|            Name|Type 1|Type 2| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|Generation|Legendary|
+-------+----------------+------+------+---+------+-------+------+------+-----+----------+---------+
|      1|       Bulbasaur| Grass|Poison| 45|    49|     49|    65|    65|   45|         1|    FALSE|
|      2|         Ivysaur| Grass|Poison| 60|    62|     63|    80|    80|   60|         1|    FALSE|
|      3|        Venusaur| Grass|Poison| 80|    82|     83|   100|   100|   80|         1|    FALSE|
|      4|   Mega Venusaur| Grass|Poison| 80|   100|    123|   122|   120|   80|         1|    FALSE|
|      5|      Charmander|  Fire|  NULL| 39|    52|     43|    60|    50|   65|         1|    FALSE|
|      6|      Charmeleon|  Fire|  NULL| 58|    64|     58|    80|    65|   80|         1|    FALSE|
|      7|       Charizard|  Fire|Flying| 78|    84|     78|   109|    85|  100|         1| 

Kết hợp 2 bảng pokemon và combat_stats để tạo ra data cuối cùng mang thông tin từ cả 2 table pokemon và combat ban đầu

In [62]:
# Kết hợp 2 bảng pokemon và combat_stats
pokemon.createOrReplaceTempView('DATA_P')
combat_stats.createOrReplaceTempView('DATA_S')

data = spark.sql('SELECT F.Pokedex, F.HP, F.Attack, F.Defense, F.Sp_Atk, F.Sp_Def, F.Speed, S.WinPercent\
                  FROM DATA_P F\
                  INNER JOIN DATA_S S ON F.Pokedex = S.Pokedex\
                  ORDER BY CAST(S.Pokedex AS INT) ASC')
data.show()

+-------+---+------+-------+------+------+-----+-------------------+
|Pokedex| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|         WinPercent|
+-------+---+------+-------+------+------+-----+-------------------+
|      1| 45|    49|     49|    65|    65|   45| 0.2781954887218045|
|      2| 60|    62|     63|    80|    80|   60|0.38016528925619836|
|      3| 80|    82|     83|   100|   100|   80| 0.6742424242424242|
|      4| 80|   100|    123|   122|   120|   80|               0.56|
|      5| 39|    52|     43|    60|    50|   65|0.49107142857142855|
|      6| 58|    64|     58|    80|    65|   80| 0.5423728813559322|
|      7| 78|    84|     78|   109|    85|  100| 0.8646616541353384|
|      8| 78|   130|    111|   130|    85|  100| 0.8561151079136691|
|      9| 78|   104|     78|   159|   115|  100| 0.8444444444444444|
|     10| 44|    48|     65|    50|    64|   43| 0.1623931623931624|
|     11| 59|    63|     80|    65|    80|   58|0.41843971631205673|
|     13| 79|   103|    120|   135

>**Note:**
* Các model ML trên spark học theo kiểu phân tán (Distribute Learning) khác với ML khi dùng với pandas.

In [64]:
data.dtypes

[('Pokedex', 'string'),
 ('HP', 'string'),
 ('Attack', 'string'),
 ('Defense', 'string'),
 ('Sp_Atk', 'string'),
 ('Sp_Def', 'string'),
 ('Speed', 'string'),
 ('WinPercent', 'double')]

In [65]:
data = data.withColumn('HP', data['HP'].cast(IntegerType()))
data = data.withColumn('Attack', data['Attack'].cast(IntegerType()))
data = data.withColumn('Defense', data['Defense'].cast(IntegerType()))
data = data.withColumn('Sp_Atk', data['Sp_Atk'].cast(IntegerType()))
data = data.withColumn('Sp_Def', data['Sp_Def'].cast(IntegerType()))
data = data.withColumn('Speed', data['Speed'].cast(IntegerType()))

In [66]:
data.dtypes

[('Pokedex', 'string'),
 ('HP', 'int'),
 ('Attack', 'int'),
 ('Defense', 'int'),
 ('Sp_Atk', 'int'),
 ('Sp_Def', 'int'),
 ('Speed', 'int'),
 ('WinPercent', 'double')]

In [68]:
NullDict = {col:data.filter(data[col].isNull()).count() for col in data.columns}
NullDict

{'Pokedex': 0,
 'HP': 0,
 'Attack': 0,
 'Defense': 0,
 'Sp_Atk': 0,
 'Sp_Def': 0,
 'Speed': 0,
 'WinPercent': 1}

>**Note:**
* WinPercent có chứa 1 giá trị null

In [69]:
data.filter('WinPercent IS NULL').show()

+-------+---+------+-------+------+------+-----+----------+
|Pokedex| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|WinPercent|
+-------+---+------+-------+------+------+-----+----------+
|    231| 20|    10|    230|    10|   230|    5|      NULL|
+-------+---+------+-------+------+------+-----+----------+



In [70]:
data.filter(data.WinPercent.isNull()).show()

+-------+---+------+-------+------+------+-----+----------+
|Pokedex| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|WinPercent|
+-------+---+------+-------+------+------+-----+----------+
|    231| 20|    10|    230|    10|   230|    5|      NULL|
+-------+---+------+-------+------+------+-----+----------+



In [71]:
data.filter(col('WinPercent').isNull()).show()

+-------+---+------+-------+------+------+-----+----------+
|Pokedex| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|WinPercent|
+-------+---+------+-------+------+------+-----+----------+
|    231| 20|    10|    230|    10|   230|    5|      NULL|
+-------+---+------+-------+------+------+-----+----------+



Loại bỏ hàng có chứa null ở cột WinPercent

In [72]:
data = data.na.drop()

In [74]:
NullDict = {col:data.filter(data[col].isNull()).count() for col in data.columns}
NullDict

{'Pokedex': 0,
 'HP': 0,
 'Attack': 0,
 'Defense': 0,
 'Sp_Atk': 0,
 'Sp_Def': 0,
 'Speed': 0,
 'WinPercent': 0}

In [75]:
# Trong pyspark các feature sẽ được gom lại thành 1 vector giúp tăng tốc độ
# tính toán chứ không để riêng rẽ từng cột như bình thường.
# trong bài này ta gom các feature: 'HP', 'Attack', 'Defense', 'Sp_Atk', 'Sp_Def', 'Speed'
# lại thành 1 vector và đặt tên nó là Independent_Features
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=['HP', 'Attack', 'Defense',
                                              'Sp_Atk', 'Sp_Def', 'Speed'], outputCol='Independent_Features')

In [76]:
output = featureassembler.transform(data)

In [77]:
output.show()

+-------+---+------+-------+------+------+-----+-------------------+--------------------+
|Pokedex| HP|Attack|Defense|Sp_Atk|Sp_Def|Speed|         WinPercent|Independent_Features|
+-------+---+------+-------+------+------+-----+-------------------+--------------------+
|      1| 45|    49|     49|    65|    65|   45| 0.2781954887218045|[45.0,49.0,49.0,6...|
|      2| 60|    62|     63|    80|    80|   60|0.38016528925619836|[60.0,62.0,63.0,8...|
|      3| 80|    82|     83|   100|   100|   80| 0.6742424242424242|[80.0,82.0,83.0,1...|
|      4| 80|   100|    123|   122|   120|   80|               0.56|[80.0,100.0,123.0...|
|      5| 39|    52|     43|    60|    50|   65|0.49107142857142855|[39.0,52.0,43.0,6...|
|      6| 58|    64|     58|    80|    65|   80| 0.5423728813559322|[58.0,64.0,58.0,8...|
|      7| 78|    84|     78|   109|    85|  100| 0.8646616541353384|[78.0,84.0,78.0,1...|
|      8| 78|   130|    111|   130|    85|  100| 0.8561151079136691|[78.0,130.0,111.0...|
|      9| 

>**Note:**
* Dễ dàng quan sát thấy Independent_Features chứa tất cả các giá trị của các feature riêng lẻ ban đầu.

In [78]:
# Dùng select để chọn ra cột cần dùng
finalized_data = output.select('Independent_Features', 'WinPercent')

In [79]:
finalized_data.show()

+--------------------+-------------------+
|Independent_Features|         WinPercent|
+--------------------+-------------------+
|[45.0,49.0,49.0,6...| 0.2781954887218045|
|[60.0,62.0,63.0,8...|0.38016528925619836|
|[80.0,82.0,83.0,1...| 0.6742424242424242|
|[80.0,100.0,123.0...|               0.56|
|[39.0,52.0,43.0,6...|0.49107142857142855|
|[58.0,64.0,58.0,8...| 0.5423728813559322|
|[78.0,84.0,78.0,1...| 0.8646616541353384|
|[78.0,130.0,111.0...| 0.8561151079136691|
|[78.0,104.0,78.0,...| 0.8444444444444444|
|[44.0,48.0,65.0,5...| 0.1623931623931624|
|[59.0,63.0,80.0,6...|0.41843971631205673|
|[79.0,103.0,120.0...| 0.5763888888888888|
|[45.0,30.0,35.0,2...|0.14782608695652175|
|[50.0,20.0,55.0,2...|0.09774436090225563|
|[60.0,45.0,50.0,9...|              0.625|
|[40.0,35.0,30.0,2...|0.23423423423423423|
|[45.0,25.0,50.0,2...|0.13076923076923078|
|[65.0,90.0,40.0,4...|              0.625|
|[65.0,150.0,40.0,...| 0.9663865546218487|
|[40.0,45.0,40.0,3...|  0.373015873015873|
+----------

In [80]:
finalized_data.dtypes

[('Independent_Features', 'vector'), ('WinPercent', 'double')]

>**Note:**
* finalized_data là dataset cuối cùng ta thu được, finalized_data sẽ được dùng để đưa vào các model ML, DP...

### **5. Build and train model with PySpark**

In [81]:
from pyspark.ml.regression import LinearRegression
# train test split
train_data, test_data = finalized_data.randomSplit([0.75, 0.25])
# fit model
regressor = LinearRegression(featuresCol='Independent_Features', labelCol='WinPercent')
regressor = regressor.fit(train_data)

In [83]:
regressor.coefficients # w

DenseVector([0.0002, 0.0012, 0.0, -0.0001, 0.0003, 0.0077])

In [84]:
regressor.intercept # b

-0.1462278648902566

### **6. Predict and Evaluate Model**

In [85]:
# Predictions
pred_results = regressor.evaluate(test_data)

In [86]:
# final comparision
pred_results.predictions.show()

+--------------------+-------------------+-------------------+
|Independent_Features|         WinPercent|         prediction|
+--------------------+-------------------+-------------------+
|[1.0,90.0,45.0,30...|0.33064516129032256| 0.2773115232315507|
|[10.0,55.0,25.0,3...| 0.5396825396825397| 0.6617648956708085|
|[20.0,15.0,20.0,1...| 0.2682926829268293| 0.5055424664865963|
|[25.0,20.0,15.0,1...|0.32407407407407407| 0.5764577564105332|
|[30.0,35.0,30.0,1...|          0.4765625| 0.5139817499997683|
|[30.0,45.0,59.0,3...| 0.2803030303030303| 0.3610481187403439|
|[30.0,80.0,90.0,5...| 0.4295774647887324|0.38794586498172423|
|[31.0,45.0,90.0,3...|0.21929824561403508|0.22924728359086247|
|[35.0,55.0,40.0,5...| 0.6838235294117647| 0.6273128123712489|
|[38.0,36.0,38.0,3...| 0.3893805309734513|0.34930117343737166|
|[38.0,41.0,40.0,5...| 0.4108527131782946|   0.42311002498602|
|[40.0,30.0,30.0,4...|0.07936507936507936|0.13758686893545588|
|[40.0,45.0,35.0,4...|               0.65|  0.614099932

In [87]:
# PErformance Metrics
pred_results.r2, pred_results.meanAbsoluteError, pred_results.meanSquaredError

(0.9130549663892484, 0.05729584253705454, 0.005613444451449277)