In [34]:
import findspark
findspark.init()

In [35]:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("pyspark-titanic-sol")
    .config("spark.driver.memory", "512m")
    .config("spark.driver.cores", "1")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "1")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

In [36]:
spark.version

'2.4.5'

# Dataset obtenido de Kaggle sobre partidas de ajedrez

In [37]:
df = spark.read.csv('/dataset/chess_games.csv', header=True)

In [38]:
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [39]:
df.count()

6256184

In [7]:
df.printSchema()

root
 |-- Event: string (nullable = true)
 |-- White: string (nullable = true)
 |-- Black: string (nullable = true)
 |-- Result: string (nullable = true)
 |-- UTCDate: string (nullable = true)
 |-- UTCTime: string (nullable = true)
 |-- WhiteElo: string (nullable = true)
 |-- BlackElo: string (nullable = true)
 |-- WhiteRatingDiff: string (nullable = true)
 |-- BlackRatingDiff: string (nullable = true)
 |-- ECO: string (nullable = true)
 |-- Opening: string (nullable = true)
 |-- TimeControl: string (nullable = true)
 |-- Termination: string (nullable = true)
 |-- AN: string (nullable = true)



In [8]:
df.show()

+------------------+---------------+---------------+------+----------+--------+--------+--------+---------------+---------------+---+--------------------+-----------+------------+--------------------+
|             Event|          White|          Black|Result|   UTCDate| UTCTime|WhiteElo|BlackElo|WhiteRatingDiff|BlackRatingDiff|ECO|             Opening|TimeControl| Termination|                  AN|
+------------------+---------------+---------------+------+----------+--------+--------+--------+---------------+---------------+---+--------------------+-----------+------------+--------------------+
|        Classical |        eisaaaa|       HAMID449|   1-0|2016.06.30|22:00:01|    1901|    1896|           11.0|          -11.0|D10|        Slav Defense|      300+5|Time forfeit|1. d4 d5 2. c4 c6...|
|            Blitz |         go4jas|     Sergei1973|   0-1|2016.06.30|22:00:01|    1641|    1627|          -11.0|           12.0|C20|King's Pawn Openi...|      300+0|      Normal|1. e4 e5 2. b3 Nf

# **Result** será la variable a predecir

In [9]:
df.select('Result').show()

+------+
|Result|
+------+
|   1-0|
|   0-1|
|   1-0|
|   1-0|
|   0-1|
|   0-1|
|   0-1|
|   1-0|
|   0-1|
|   1-0|
|   0-1|
|   1-0|
|   1-0|
|   1-0|
|   0-1|
|   0-1|
|   1-0|
|   1-0|
|   0-1|
|   1-0|
+------+
only showing top 20 rows



In [10]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import rand, when

## Si ganaron blancas Result = 1, si ganaron negras Result = 0 y si empataron Result = 9. La columna se conviente en tipo entero

In [11]:
df_v2 = df.withColumn('Result', when( df['Result'] == '1-0', '1').otherwise(df['Result']))
df_v3 = df_v2.withColumn('Result', when( df_v2['Result'] == '0-1', '0').otherwise(df_v2['Result']))
df_v4 = df_v3.withColumn('Result', when( df_v3['Result'] == '1/2-1/2', '9').otherwise(df_v3['Result']))
df_final = df_v4.withColumn('Result', df_v4['Result'].cast(IntegerType()))

In [12]:
df_final.printSchema()

root
 |-- Event: string (nullable = true)
 |-- White: string (nullable = true)
 |-- Black: string (nullable = true)
 |-- Result: integer (nullable = true)
 |-- UTCDate: string (nullable = true)
 |-- UTCTime: string (nullable = true)
 |-- WhiteElo: string (nullable = true)
 |-- BlackElo: string (nullable = true)
 |-- WhiteRatingDiff: string (nullable = true)
 |-- BlackRatingDiff: string (nullable = true)
 |-- ECO: string (nullable = true)
 |-- Opening: string (nullable = true)
 |-- TimeControl: string (nullable = true)
 |-- Termination: string (nullable = true)
 |-- AN: string (nullable = true)



In [24]:
# Guardar resultado en archivo parquet

In [29]:
output_dir = '/dataset/chees_game.parquet'

In [30]:
df_final \
    .write \
    .mode('overwrite') \
    .parquet(output_dir)

In [31]:
df_parquet = spark.read.parquet(output_dir)

In [32]:
df_parquet.count()

6256184

In [None]:
# Visualizacion de archivo parquet

In [33]:
spark.sql("CREATE TEMPORARY VIEW TABLE USING parquet OPTIONS (path \'/dataset/chees_game.parquet')")
spark.sql("SELECT * FROM TABLE").show()

+-------------------+---------------+--------------------+------+----------+--------+--------+--------+---------------+---------------+---+--------------------+-----------+------------+--------------------+
|              Event|          White|               Black|Result|   UTCDate| UTCTime|WhiteElo|BlackElo|WhiteRatingDiff|BlackRatingDiff|ECO|             Opening|TimeControl| Termination|                  AN|
+-------------------+---------------+--------------------+------+----------+--------+--------+--------+---------------+---------------+---+--------------------+-----------+------------+--------------------+
|         Classical |      acuario22|              mvbg53|     1|2016.07.27|16:43:40|    1825|    1740|            8.0|           -8.0|C50|Italian Game: Sch...|      600+0|Time forfeit|1. e4 e5 2. Nf3 N...|
| Bullet tournament |           ng99|Kingscrusher-YouTube|     0|2016.07.27|16:43:41|    2218|    2448|           -6.0|            7.0|C45|Scotch Game: Clas...|       60+0|