In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

import pandas as pd
import numpy as np

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("fa21-ds5110-group10") \
    .getOrCreate()

In [3]:
df = spark.read.parquet("../../data/processed/chess_games_blitz_classic.parquet")

In [4]:
df.show(5)

+---------+--------------+-----------+------+----------+--------+--------+--------+---------------+---------------+---+--------------------+-----------+------------+--------------------+
|    event|         white|      black|result|   UTCDate| UTCTime|WhiteElo|BlackElo|WhiteRatingDiff|BlackRatingDiff|ECO|             Opening|TimeControl| Termination|                  AN|
+---------+--------------+-----------+------+----------+--------+--------+--------+---------------+---------------+---+--------------------+-----------+------------+--------------------+
|    Blitz|        Nippis|   Misha_44|   1-0|2016-01-26|18:03:38|    2068|    1846|           11.0|           -5.0|A34|English Opening: ...|      300+0|      Normal|1. c4 c5 2. Nc3 N...|
|    Blitz|   abracadaver| andremoniy|   1-0|2016-01-26|18:03:39|    1708|    1399|            3.0|           -3.0|A40|  English Defense #2|      180+0|      Normal|1. d4 b6 2. c4 Bb...|
|Classical|  tewarisachin|mohamad9003|   0-1|2016-01-26|18:03:39|

In [5]:
df.take(1)

[Row(event='Blitz', white='Nippis', black='Misha_44', result='1-0', UTCDate=datetime.date(2016, 1, 26), UTCTime='18:03:38', WhiteElo=2068, BlackElo=1846, WhiteRatingDiff=11.0, BlackRatingDiff=-5.0, ECO='A34', Opening='English Opening: Symmetrical Variation, Normal Variation', TimeControl='300+0', Termination='Normal', AN='1. c4 c5 2. Nc3 Nf6 3. Nf3 g6 4. d4 cxd4 5. Nxd4 Bg7 6. g3 O-O 7. Bg2 Nc6 8. O-O Nxd4 9. Qxd4 d6 10. Qd2 Rb8 11. Rd1 Qa5 12. a3 a6 13. b4 Qd8 14. Bb2 Be6 15. Nd5 Nxd5 16. cxd5 Bd7 17. Rac1 Rc8 18. Bxg7 Kxg7 19. Qd4+ f6 20. Qa7 Rb8 21. Rc3 Rf7 22. Rdc1 Bb5 23. e4 h5 24. Bh3 h4 25. Be6 Rf8 26. Rc7 hxg3 27. hxg3 f5 28. Rxb7 Rxb7 29. Qxb7 fxe4 30. Rc7 Kf6 31. Rc8 e3 32. Rxd8 Rxd8 33. fxe3 Rh8 34. Qb6 Rh5 35. Qd4+ Re5 36. Qf4+ Rf5 37. Bxf5 gxf5 38. e4 Bd3 39. Qxf5+ Kg7 40. g4 Kg8 41. g5 Kg7 42. g6 a5 43. Qf7+ Kh6 44. g7 Bxe4 45. g8=Q Bxd5 46. Qfh7# 1-0')]

In [6]:
# Broke strings into arrary
def movetype(x):
    import re
    moves = re.split('\d+\. ', x)[1:]
    return [x.strip() for x in moves]

udf_movetype = F.udf(lambda x: movetype(x), T.ArrayType(T.StringType()))

df = df.withColumn('moves', udf_movetype(F.col('AN')))
df = df.drop('AN')

In [7]:
df.take(1)

[Row(event='Blitz', white='Nippis', black='Misha_44', result='1-0', UTCDate=datetime.date(2016, 1, 26), UTCTime='18:03:38', WhiteElo=2068, BlackElo=1846, WhiteRatingDiff=11.0, BlackRatingDiff=-5.0, ECO='A34', Opening='English Opening: Symmetrical Variation, Normal Variation', TimeControl='300+0', Termination='Normal', moves=['c4 c5', 'Nc3 Nf6', 'Nf3 g6', 'd4 cxd4', 'Nxd4 Bg7', 'g3 O-O', 'Bg2 Nc6', 'O-O Nxd4', 'Qxd4 d6', 'Qd2 Rb8', 'Rd1 Qa5', 'a3 a6', 'b4 Qd8', 'Bb2 Be6', 'Nd5 Nxd5', 'cxd5 Bd7', 'Rac1 Rc8', 'Bxg7 Kxg7', 'Qd4+ f6', 'Qa7 Rb8', 'Rc3 Rf7', 'Rdc1 Bb5', 'e4 h5', 'Bh3 h4', 'Be6 Rf8', 'Rc7 hxg3', 'hxg3 f5', 'Rxb7 Rxb7', 'Qxb7 fxe4', 'Rc7 Kf6', 'Rc8 e3', 'Rxd8 Rxd8', 'fxe3 Rh8', 'Qb6 Rh5', 'Qd4+ Re5', 'Qf4+ Rf5', 'Bxf5 gxf5', 'e4 Bd3', 'Qxf5+ Kg7', 'g4 Kg8', 'g5 Kg7', 'g6 a5', 'Qf7+ Kh6', 'g7 Bxe4', 'g8=Q Bxd5', 'Qfh7# 1-0'])]

In [8]:
# This is still broken.  SOmething's up with the job, but the df is correct from the above.
df.printSchema()
#df.write.parquet("../../data/processed/chess_games_moves.parquet")

root
 |-- event: string (nullable = true)
 |-- white: string (nullable = true)
 |-- black: string (nullable = true)
 |-- result: string (nullable = true)
 |-- UTCDate: date (nullable = true)
 |-- UTCTime: string (nullable = true)
 |-- WhiteElo: integer (nullable = true)
 |-- BlackElo: integer (nullable = true)
 |-- WhiteRatingDiff: double (nullable = true)
 |-- BlackRatingDiff: double (nullable = true)
 |-- ECO: string (nullable = true)
 |-- Opening: string (nullable = true)
 |-- TimeControl: string (nullable = true)
 |-- Termination: string (nullable = true)
 |-- moves: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [9]:
# get the first 2 moves and put into a new columns
# https://stackoverflow.com/questions/52975567/get-first-n-elements-from-dataframe-arraytype-column-in-pyspark
from pyspark.sql.functions import col, asc
df=df.withColumn("first_two", F.array([F.col("moves")[0], F.col("moves")[1]]))
#df=df.withColumn("first_two_str", F.array([F.col("moves")[0], F.col("moves")[1]]))

df.take(2)

[Row(event='Blitz', white='Nippis', black='Misha_44', result='1-0', UTCDate=datetime.date(2016, 1, 26), UTCTime='18:03:38', WhiteElo=2068, BlackElo=1846, WhiteRatingDiff=11.0, BlackRatingDiff=-5.0, ECO='A34', Opening='English Opening: Symmetrical Variation, Normal Variation', TimeControl='300+0', Termination='Normal', moves=['c4 c5', 'Nc3 Nf6', 'Nf3 g6', 'd4 cxd4', 'Nxd4 Bg7', 'g3 O-O', 'Bg2 Nc6', 'O-O Nxd4', 'Qxd4 d6', 'Qd2 Rb8', 'Rd1 Qa5', 'a3 a6', 'b4 Qd8', 'Bb2 Be6', 'Nd5 Nxd5', 'cxd5 Bd7', 'Rac1 Rc8', 'Bxg7 Kxg7', 'Qd4+ f6', 'Qa7 Rb8', 'Rc3 Rf7', 'Rdc1 Bb5', 'e4 h5', 'Bh3 h4', 'Be6 Rf8', 'Rc7 hxg3', 'hxg3 f5', 'Rxb7 Rxb7', 'Qxb7 fxe4', 'Rc7 Kf6', 'Rc8 e3', 'Rxd8 Rxd8', 'fxe3 Rh8', 'Qb6 Rh5', 'Qd4+ Re5', 'Qf4+ Rf5', 'Bxf5 gxf5', 'e4 Bd3', 'Qxf5+ Kg7', 'g4 Kg8', 'g5 Kg7', 'g6 a5', 'Qf7+ Kh6', 'g7 Bxe4', 'g8=Q Bxd5', 'Qfh7# 1-0'], first_two=['c4 c5', 'Nc3 Nf6']),
 Row(event='Blitz', white='abracadaver', black='andremoniy', result='1-0', UTCDate=datetime.date(2016, 1, 26), UTCTime='18

In [10]:
print( df.groupBy('first_two').count().sort(col("count").desc()).show(5), df.count())


+------------------+------+
|         first_two| count|
+------------------+------+
|  [e4 e5, Nf3 Nc6]|332485|
|   [e4 e5, Nf3 d6]|129811|
|[e4 d5, exd5 Qxd5]| 86593|
|  [e4 c5, Nf3 Nc6]| 78661|
|    [e4 e6, d4 d5]| 72138|
+------------------+------+
only showing top 5 rows

None 3850385


In [24]:
print (f"Total columns are: {len(df.columns)}")

Total columns are: 17


In [29]:
#df.filter(df.result.contains('*')).collect()

In [11]:
# Encode result
# df=df.withColumn("enc_result", F.array([F.col("result")[0]]))
# https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
from pyspark.sql.types import IntegerType
#label_udf  = F.udf(lambda result: 1 if result =='1-0' else 0, IntegerType())
white_win_udf  = F.udf(lambda result:float(frac (result.split('-')[0])), IntegerType())
black_win_udf  = F.udf(lambda result:float(frac (result.split('-')[1])), IntegerType())


df=df.withColumn("whil", label_udf(df.result))
df=df.withColumn("result_label", label_udf(df.result))

df.take(1)
df.printSchema()

root
 |-- event: string (nullable = true)
 |-- white: string (nullable = true)
 |-- black: string (nullable = true)
 |-- result: string (nullable = true)
 |-- UTCDate: date (nullable = true)
 |-- UTCTime: string (nullable = true)
 |-- WhiteElo: integer (nullable = true)
 |-- BlackElo: integer (nullable = true)
 |-- WhiteRatingDiff: double (nullable = true)
 |-- BlackRatingDiff: double (nullable = true)
 |-- ECO: string (nullable = true)
 |-- Opening: string (nullable = true)
 |-- TimeControl: string (nullable = true)
 |-- Termination: string (nullable = true)
 |-- moves: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- first_two: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- result_label: integer (nullable = true)



In [16]:
# Encode features-First_Two
# https://spark.apache.org/docs/latest/ml-features#onehotencoder
# https://spark.apache.org/docs/latest/ml-features#vectorindexer
# https://datascience.stackexchange.com/questions/6268/how-to-convert-categorical-data-to-numerical-data-in-pyspark
# https://silpara.medium.com/pyspark-string-to-array-of-string-in-dataframe-b9572233ccea
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="first_two", outputCol="first_two_index")
indexed = indexer.fit(df).transform(df)
indexed.show(2)

IllegalArgumentException: requirement failed: The input column first_two must be either string type or numeric type, but got ArrayType(StringType,true).

In [None]:
# Select variables
vars_to_keep = ["median_house_value_final", 
              "total_bedrooms", 
              "population", 
              "households", 
              "median_income", 
              "rooms_per_household"]

# subset the dataframe on these predictors
df=data.select(vars_to_keep)

In [None]:
# Apply Random Forest

In [13]:
from pyspark.ml.feature import OneHotEncoder

df_ex = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])

encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df_ex)
encoded = model.transform(df_ex)
encoded.show()

+--------------+--------------+-------------+-------------+
|categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
+--------------+--------------+-------------+-------------+
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           1.0|           0.0|(2,[1],[1.0])|(2,[0],[1.0])|
|           2.0|           1.0|    (2,[],[])|(2,[1],[1.0])|
|           0.0|           2.0|(2,[0],[1.0])|    (2,[],[])|
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           2.0|           0.0|    (2,[],[])|(2,[0],[1.0])|
+--------------+--------------+-------------+-------------+



In [None]:
from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()

In [None]:
df.take(2)
lp = dataRdd.map(lambda row:(1 if row[0]=='M' else 0, Vectors.dense(row[1]))).map(lambda row: LabeledPoint(row[0], row[1]))
df.select("result").rdd.flatMap(lambda row:(1 if row[0]=='1-0' else 0)).toDF()

In [None]:
# use random forest to determine which vairbales is the most influencial
# *cartesian product


In [None]:
# Use PCA to determine influencial moves

