In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer


spark = SparkSession.builder.appName("PySparkFashionMNIST").getOrCreate()

# Load test data
trainDF =  spark.read.csv("hdfs://namenode:9000/data/TicTacToe.csv", header = True)
trainDF = trainDF.select([col(column).cast(IntegerType()).alias(column) for column in trainDF.columns])
trainDF.show(10)
label_index = StringIndexer(inputCol="Condition", outputCol="Condition_index")
trainDF = label_index.fit(trainDF).transform(trainDF)
trainDF.show(10)




+---+---+-----+----+----+---+-----+-----+----+---------+
|one|two|three|four|five|si1|seven|eight|nine|Condition|
+---+---+-----+----+----+---+-----+-----+----+---------+
|  1|  1|    1|   1|   0|  0|    1|    0|   0|        1|
|  1|  1|    1|   1|   0|  0|    0|    1|   0|        1|
|  1|  1|    1|   1|   0|  0|    0|    0|   1|        1|
|  1|  1|    1|   1|   0|  0|    0|    3|   3|        1|
|  1|  1|    1|   1|   0|  0|    3|    0|   3|        1|
|  1|  1|    1|   1|   0|  0|    3|    3|   0|        1|
|  1|  1|    1|   1|   0|  3|    0|    0|   3|        1|
|  1|  1|    1|   1|   0|  3|    0|    3|   0|        1|
|  1|  1|    1|   1|   0|  3|    3|    0|   0|        1|
|  1|  1|    1|   1|   3|  0|    0|    0|   3|        1|
+---+---+-----+----+----+---+-----+-----+----+---------+
only showing top 10 rows

+---+---+-----+----+----+---+-----+-----+----+---------+---------------+
|one|two|three|four|five|si1|seven|eight|nine|Condition|Condition_index|
+---+---+-----+----+----+---+-

In [2]:
selected_cols = [col_name for col_name in trainDF.columns if col_name != "Condition"]

# # Создание вектора признаков
assembler = VectorAssembler(inputCols=selected_cols, outputCol="features")
trainDF_vector = assembler.transform(trainDF)

trainDF_vector = trainDF_vector.select("Condition_index", "features")
trainDF_vector.show(10)

+---------------+--------------------+
|Condition_index|            features|
+---------------+--------------------+
|            0.0|(10,[0,1,2,3,6],[...|
|            0.0|(10,[0,1,2,3,7],[...|
|            0.0|(10,[0,1,2,3,8],[...|
|            0.0|[1.0,1.0,1.0,1.0,...|
|            0.0|[1.0,1.0,1.0,1.0,...|
|            0.0|[1.0,1.0,1.0,1.0,...|
|            0.0|[1.0,1.0,1.0,1.0,...|
|            0.0|[1.0,1.0,1.0,1.0,...|
|            0.0|[1.0,1.0,1.0,1.0,...|
|            0.0|[1.0,1.0,1.0,1.0,...|
+---------------+--------------------+
only showing top 10 rows



In [3]:
(training_data, test_data) = trainDF_vector.randomSplit([0.8, 0.2],seed = 42)

evaluator = MulticlassClassificationEvaluator(labelCol="Condition_index", predictionCol="prediction", metricName="accuracy")

from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="Condition_index", featuresCol="features")
gbt_model = gbt.fit(training_data)
gbt_prediction = gbt_model.transform(test_data)

gbt_prediction.select("prediction", "Condition_index", "features").show(5)
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Gradient-boosted tree classifier [Accuracy] = %g"% (gbt_accuracy))
print("Gradient-boosted tree classifier [Error] = %g " % (1.0 - gbt_accuracy))

+----------+---------------+--------------------+
|prediction|Condition_index|            features|
+----------+---------------+--------------------+
|       0.0|            0.0|(10,[0,1,2,3,8],[...|
|       0.0|            0.0|(10,[0,1,2,5,6],[...|
|       0.0|            0.0|(10,[0,1,2,5,8],[...|
|       0.0|            0.0|(10,[0,1,4,5,8],[...|
|       0.0|            0.0|(10,[0,2,4,5,6],[...|
+----------+---------------+--------------------+
only showing top 5 rows

Gradient-boosted tree classifier [Accuracy] = 1
Gradient-boosted tree classifier [Error] = 0 


In [66]:
previous_turn = None
columns = ["one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "Condition"]
while True:
    turnsDF = spark.read.text("hdfs://namenode:9000/user/game")
    turnsDF = turnsDF.withColumn("one",substring("value", 1, 1))
    turnsDF = turnsDF.withColumn("two",substring("value", 2, 1)) 
    turnsDF = turnsDF.withColumn("three",substring("value", 3, 1)) 
    turnsDF = turnsDF.withColumn("four",substring("value", 4, 1)) 
    turnsDF = turnsDF.withColumn("five",substring("value", 5, 1)) 
    turnsDF = turnsDF.withColumn("six",substring("value", 6, 1)) 
    turnsDF = turnsDF.withColumn("seven",substring("value", 7, 1)) 
    turnsDF = turnsDF.withColumn("eight",substring("value", 8, 1)) 
    turnsDF = turnsDF.withColumn("nine",substring("value", 9, 1)) 
    turnsDF = turnsDF.withColumn("Condition",lit("1"))
    turnsDF = turnsDF.drop("value")
    # turnsDF.show()
    previous_turn_chech_row = turnsDF.head()
    # print(previous_turn_chech_row)

    previous_check_turn = spark.createDataFrame([Row(*previous_turn_chech_row)], columns)
    #try except for first iteration 
    try:
        if previous_turn.exceptAll(previous_check_turn).count() == 0:
            continue
    except AttributeError:
        pass
    
    previous_turn_row = turnsDF.head()
    previous_turn = spark.createDataFrame([Row(*previous_turn_chech_row)], columns)
    
    label_index = StringIndexer(inputCol="Condition", outputCol="Condition_index")
    turnsDF = label_index.fit(turnsDF).transform(turnsDF)
    turnsDF = turnsDF.select([col(column).cast(IntegerType()).alias(column) for column in turnsDF.columns])
    
    selected_cols = [col_name for col_name in turnsDF.columns if col_name != "Condition"]
    
    # # Создание вектора признаков
    assembler = VectorAssembler(inputCols=selected_cols, outputCol="features")
    turnsDF_vector = assembler.transform(turnsDF)
    
    turnsDF_vector = turnsDF_vector.select("Condition", "features")
    
    prediction = gbt_model.transform(turnsDF_vector)
    prediction.select("prediction").show(10)

+----------+
|prediction|
+----------+
|       0.0|
+----------+

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
+----------+

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
+----------+

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 