In [1]:
# Import other modules not related to PySpark
import os
import sys
import pandas as pd
from pandas import DataFrame
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import math
from IPython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats
import pylab 
import seaborn as sns
import scipy.stats as scipy_stats
from scipy.stats import probplot

# Display the value of every top-level expression in a cell (not only the
# last one), so results show up without explicitly calling 'print'.
InteractiveShell.ast_node_interactivity = "all" 
# Render matplotlib figures inline in the notebook output.
%matplotlib inline

In [2]:
# Import PySpark related modules
import pyspark
from pyspark.rdd import RDD

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import GBTRegressor, LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

from pyspark.sql import Row, DataFrame, SparkSession, SQLContext, functions, Window
from pyspark.sql.types import DoubleType
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.sql.functions import skewness, lit, desc, col,when, size, array_contains, row_number, \
isnan, udf, hour, array_min, array_max, countDistinct, median, collect_list, \
max, mean, min, stddev, monotonically_increasing_id  
from pyspark.sql.types import *

# Memory budget applied to both the executor and the driver settings below.
MAX_MEMORY = '10G'

# Spark configuration for a local run on all available cores ("local[*]").
# NOTE(review): the heartbeat and network timeouts are given as bare integers,
# so Spark applies each key's own default unit — presumably milliseconds for
# heartbeatInterval and seconds for network.timeout; confirm these are the
# intended durations or add explicit unit suffixes.
conf = pyspark.SparkConf().setMaster("local[*]") \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10000) \
        .set("spark.core.connection.ack.wait.timeout", "3600") \
        .set("spark.executor.memory", MAX_MEMORY) \
        .set("spark.driver.memory", MAX_MEMORY)

def init_spark():
    """Return the SparkSession for this notebook, creating it if needed.

    The session is named "Pyspark guide" and is configured from the
    module-level ``conf`` object.
    """
    session_builder = SparkSession.builder.appName("Pyspark guide").config(conf=conf)
    return session_builder.getOrCreate()


spark = init_spark()

In [3]:
# Path to the input CSV, resolved relative to the notebook's working directory.
filename_data = 'itinerariesClear.csv'

# Load the main data set into a PySpark DataFrame.
# - header / inferSchema: take column names from the first row and let Spark
#   infer the column types.
# - sep: the CSV reader's field-separator option ('delimiter' is an accepted
#   alias). A misspelled key such as 'delimeter' is silently ignored, so the
#   option must be spelled correctly to have any effect.
# - mode='DROPMALFORMED': rows that cannot be parsed are dropped.
df_z = (
    spark.read
    .options(inferSchema='True', header='True', sep=',')
    .csv(filename_data, mode='DROPMALFORMED')
)
print('Data frame type: ' + str(type(df_z)))

Data frame type: <class 'pyspark.sql.dataframe.DataFrame'>


In [4]:
from pyspark.sql.functions import col

In [5]:
# Regression view of the itineraries: route, fare and flag columns, with the
# raw travel distance exposed under the name 'label'.
_flag_names = ['isBasicEconomy', 'isRefundable', 'isNonStop']
# Each flag column is cast to an integer and keeps its original name.
_int_flags = [col(flag).cast('Int').alias(flag) for flag in _flag_names]

data_subset = df_z.select(
    'startingAirport', 'destinationAirport', 'elapsedDays',
    *_int_flags,
    'baseFare', 'totalFare',
    col('totalTravelDistance').alias('label'),
)
# Preview the first 10 rows.
data_subset.show(10)

+---------------+------------------+-----------+--------------+------------+---------+--------+---------+-----+
|startingAirport|destinationAirport|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|label|
+---------------+------------------+-----------+--------------+------------+---------+--------+---------+-----+
|            LAX|               SFO|          0|             1|           0|        1|   26.98|     43.6|  339|
|            LAX|               SFO|          0|             0|           0|        1|   26.98|     43.6|  339|
|            LAX|               SFO|          0|             1|           0|        1|   26.98|     43.6|  339|
|            LAX|               SFO|          0|             1|           0|        1|   26.98|     43.6|  339|
|            LAX|               SFO|          0|             1|           0|        1|   45.58|     63.6|  339|
|            LAX|               SFO|          0|             1|           0|        1|   45.58|     63.6

In [6]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Binary-classification view of the itineraries. The 'label' column flags
# whether totalTravelDistance exceeds 1500, cast to an integer.
basic_economy_flag = col('isBasicEconomy').cast('Int').alias('isBasicEconomy')
refundable_flag = col('isRefundable').cast('Int').alias('isRefundable')
non_stop_flag = col('isNonStop').cast('Int').alias('isNonStop')
long_distance_label = (col('totalTravelDistance') > 1500).cast('Int').alias('label')

data_class = df_z.select(
    'startingAirport',
    'destinationAirport',
    'elapsedDays',
    basic_economy_flag,
    refundable_flag,
    non_stop_flag,
    'baseFare',
    'totalFare',
    long_distance_label,
)
# Preview the first 10 rows.
data_class.show(10)


+---------------+------------------+-----------+--------------+------------+---------+--------+---------+-----+
|startingAirport|destinationAirport|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|label|
+---------------+------------------+-----------+--------------+------------+---------+--------+---------+-----+
|            LAX|               SFO|          0|             1|           0|        1|   26.98|     43.6|    0|
|            LAX|               SFO|          0|             0|           0|        1|   26.98|     43.6|    0|
|            LAX|               SFO|          0|             1|           0|        1|   26.98|     43.6|    0|
|            LAX|               SFO|          0|             1|           0|        1|   26.98|     43.6|    0|
|            LAX|               SFO|          0|             1|           0|        1|   45.58|     63.6|    0|
|            LAX|               SFO|          0|             1|           0|        1|   45.58|     63.6

In [7]:
# --- Categorical features ----------------------------------------------------
# Index the two airport columns, then one-hot encode the resulting indices.
strIdx = StringIndexer(
    inputCols=['startingAirport', 'destinationAirport'],
    outputCols=['startingAirportIdx', 'destinationAirportIdx'],
)
oneHotEnc = OneHotEncoder(
    inputCols=['startingAirportIdx', 'destinationAirportIdx'],
    outputCols=['startingAirportEnc', 'destinationAirportEnc'],
)
# Encoded airports plus the three integer flag columns form one vector.
catVect = VectorAssembler(
    inputCols=[
        'startingAirportEnc', 'destinationAirportEnc',
        'isBasicEconomy', 'isRefundable', 'isNonStop',
    ],
    outputCol='catFeatures',
)

# --- Numeric features --------------------------------------------------------
numVect = VectorAssembler(
    inputCols=['baseFare', 'totalFare', 'elapsedDays'],
    outputCol='numFeatures',
)
# The scaler reads whatever column the numeric assembler writes.
minMax = MinMaxScaler(
    inputCol=numVect.getOutputCol(),
    outputCol='normFeatures',
)

# --- Final feature vector, classifier and pipeline ---------------------------
featVect = VectorAssembler(
    inputCols=['catFeatures', 'normFeatures'],
    outputCol='features',
)
gbt = GBTClassifier(
    labelCol='label',
    featuresCol='features',
    maxDepth=4,
    maxBins=16,
)
# Stage order matters: each stage consumes columns produced by earlier ones.
_pipeline_stages = [strIdx, oneHotEnc, catVect, numVect, minMax, featVect, gbt]
pipeline = Pipeline(stages=_pipeline_stages)


In [8]:
# Fixed seed so the 80/20 split is requested identically on every run;
# without it each run draws a different split and the metrics are not
# reproducible.
SPLIT_SEED = 42

splits = data_class.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
train = splits[0]
# The held-out label is renamed to 'trueLabel', the name the evaluation
# cells use for the ground truth.
test = splits[1].withColumnRenamed('label', 'trueLabel')

# Count both classes with a single aggregation instead of running one
# filter + count job per class.
label_counts = {
    row['label']: row['count']
    for row in train.groupBy('label').count().collect()
}
positive_count = label_counts.get(1, 0)
negative_count = label_counts.get(0, 0)

# A training split without negatives would otherwise raise ZeroDivisionError.
balance_ratio = positive_count / negative_count if negative_count else float('inf')
print("Positive to Negative Class Ratio:", balance_ratio)


Positive to Negative Class Ratio: 1.008926585165657


In [9]:
# Hyper-parameter grid for the GBT stage: 3 depths x 3 bin counts = 9 candidates.
# These values replace the maxDepth/maxBins the classifier was constructed with.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 4, 6])
    .addGrid(gbt.maxBins, [8, 16, 32])
    .build()
)

# 2-fold cross-validation over the whole pipeline, scored with the evaluator's
# default settings. The explicit seed pins the fold assignment; without it the
# folds may differ between kernel sessions, which makes the selected model
# harder to reproduce.
crossval = CrossValidator(
    estimator=pipeline,
    evaluator=BinaryClassificationEvaluator(),
    estimatorParamMaps=paramGrid,
    numFolds=2,
    seed=42,
)

# Fits every grid candidate on every fold; this is the expensive step.
model = crossval.fit(train)


In [12]:
# Apply the fitted cross-validation model to the held-out split. The result
# keeps the input columns (including 'trueLabel') and adds the pipeline's
# intermediate and output columns.
prediction = model.transform(test)

In [13]:
# Print the DataFrame's repr (column names and types).
print(prediction)

DataFrame[startingAirport: string, destinationAirport: string, elapsedDays: int, isBasicEconomy: int, isRefundable: int, isNonStop: int, baseFare: double, totalFare: double, trueLabel: int, startingAirportIdx: double, destinationAirportIdx: double, startingAirportEnc: vector, destinationAirportEnc: vector, catFeatures: vector, numFeatures: vector, normFeatures: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]


In [15]:
# Score the held-out split and keep only the columns used by the metrics below.
predictions = model.transform(test).select('features', 'prediction', 'trueLabel')
# Print up to 50 rows without truncating the cell contents.
predictions.show(50, truncate=False)


+---------------------------------------------------------------------+----------+---------+
|features                                                             |prediction|trueLabel|
+---------------------------------------------------------------------+----------+---------+
|(36,[8,20,33,34],[1.0,1.0,0.011627709114422331,0.017881335118561888])|1.0       |1        |
|(36,[8,20,33,34],[1.0,1.0,0.02229684123355814,0.023273992970002346]) |0.0       |0        |
|(36,[8,20,33,34],[1.0,1.0,0.02229684123355814,0.023670123566337346]) |0.0       |0        |
|(36,[8,20,33,34],[1.0,1.0,0.02229684123355814,0.023670123566337346]) |0.0       |0        |
|(36,[8,20,33,34],[1.0,1.0,0.02229684123355814,0.023670123566337346]) |0.0       |0        |
|(36,[8,20,33,34],[1.0,1.0,0.02229684123355814,0.023670123566337346]) |0.0       |0        |
|(36,[8,20,33,34],[1.0,1.0,0.02229684123355814,0.023670123566337346]) |0.0       |0        |
|(36,[8,20,33,34],[1.0,1.0,0.02229684123355814,0.023670123566337346]) 

In [16]:
evaluator = BinaryClassificationEvaluator(
    labelCol='trueLabel',
    rawPredictionCol='rawPrediction',
    metricName="areaUnderROC",
)

# Model quality: area under the ROC curve on the held-out predictions.
area_under_roc_cv = evaluator.evaluate(prediction)
print(f"Area under ROC curve (cross-validated): {area_under_roc_cv}")


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Classification metrics.
# All four confusion-matrix cells come from one grouped aggregation, so the
# predictions are computed once instead of once per filter + count.
confusion_counts = {
    (row['prediction'], row['trueLabel']): row['count']
    for row in predictions.groupBy('prediction', 'trueLabel').count().collect()
}
true_positives = confusion_counts.get((1.0, 1), 0)
true_negatives = confusion_counts.get((0.0, 0), 0)
false_positives = confusion_counts.get((1.0, 0), 0)
false_negatives = confusion_counts.get((0.0, 1), 0)

# Accuracy: share of all rows that were classified correctly.
accuracy = _safe_ratio(
    true_positives + true_negatives,
    true_positives + true_negatives + false_positives + false_negatives,
)
print(f"Accuracy: {accuracy}")

# Precision: share of predicted positives that are truly positive.
# Reported as 0.0 when the model predicts no positives at all.
precision = _safe_ratio(true_positives, true_positives + false_positives)
print(f"Precision: {precision}")

# Recall: share of true positives that the model found.
# Reported as 0.0 when the test split contains no positives.
recall = _safe_ratio(true_positives, true_positives + false_negatives)
print(f"Recall: {recall}")

# F1 score: harmonic mean of precision and recall (0.0 when both are 0).
f1_score = _safe_ratio(2 * (precision * recall), precision + recall)
print(f"F1 Score: {f1_score}")


Area under ROC curve (cross-validated): 0.9512634443328679
Accuracy: 0.8785819823412424
Precision: 0.9055139535896208
Recall: 0.8466564220510263
F1 Score: 0.8750966398266532


In [None]:
# Quick look at the raw confusion-matrix counts in the order TP, TN, FP, FN.
print(true_positives,true_negatives,false_positives,false_negatives)

In [None]:
# show() prints the table itself and returns None, so wrapping it in print()
# would emit a stray "None" after the table.
predictions.show(5)

In [17]:
# Вывод матрицы ошибок (Confusion Matrix)
print("\n Confusion Matrix:")
print(f"True Positives: {true_positives}")
print(f"True Negatives: {true_negatives}")
print(f"False Positives: {false_positives}")
print(f"False Negatives: {false_negatives}")
best_model = model.bestModel
print("Лучшие параметры модели:")
for param_name, param_value in best_model.stages[-1].extractParamMap().items():
 print(f"{param_name.name}: {param_value}")


 Confusion Matrix:
True Positives: 83195
True Negatives: 88653
False Positives: 8681
False Negatives: 15068
Лучшие параметры модели:
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: all
featuresCol: features
impurity: variance
labelCol: label
leafCol: 
lossType: logistic
maxBins: 32
maxDepth: 6
maxIter: 20
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
seed: 7815301344688142822
stepSize: 0.1
subsamplingRate: 1.0
validationTol: 0.01
