In [1]:
import findspark
findspark.init('/Users/suheylgurbuz/spark-2.4.5-bin-hadoop2.7')

In [2]:
# Import the PySpark module
from pyspark.sql import SparkSession

# Create SparkSession object
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('test') \
                    .getOrCreate()

# What version of Spark?
print(spark.version)

2.4.5


In [3]:
# Terminate the cluster
# spark.stop()

In [4]:
# Read data from CSV file
flights = spark.read.csv('flights.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

# Get number of records
print("The data contain %d records." % flights.count())

# View the first five records
flights.show(5)

# Check column data types
flights.dtypes

The data contain 50000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('flight', 'int'),
 ('org', 'string'),
 ('mile', 'int'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int')]

In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Load data from a delimited file
sms = spark.read.csv("sms.csv", sep=';', header=False, schema=schema)

# Print schema of DataFrame
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [6]:
# Remove the 'flight' column
flights_drop_column = flights.drop('flight')

# Number of records with missing 'delay' values
flights_drop_column.filter('delay IS NULL').count()

# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')

# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()
print(flights_none_missing.count())

47022


In [7]:
# Import the required function
from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column
flights_km = flights_none_missing.withColumn('km', round(flights.mile * 1.60934, 0))

# Create 'label' column indicating whether flight delayed (1) or not (0)
flights_label = flights_none_missing.withColumn('label', (flights_km.delay >= 15).cast('integer'))

# Check first five records
flights_label.show(5)

+---+---+---+-------+---+----+------+--------+-----+-----+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|label|
+---+---+---+-------+---+----+------+--------+-----+-----+
|  0| 22|  2|     UA|ORD| 316| 16.33|      82|   30|    1|
|  2| 20|  4|     UA|SFO| 337|  6.17|      82|   -8|    0|
|  9| 13|  1|     AA|ORD|1236| 10.33|     195|   -5|    0|
|  5|  2|  1|     UA|SFO| 550|  7.98|     102|    2|    0|
|  7|  2|  6|     AA|ORD| 733| 10.83|     135|   54|    1|
+---+---+---+-------+---+----+------+--------+-----+-----+
only showing top 5 rows



In [8]:
from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(flights_label)

# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights_label)

# Repeat the process for the other categorical feature
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)

flights_indexed.show(5)

+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|label|carrier_idx|org_idx|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+
|  0| 22|  2|     UA|ORD| 316| 16.33|      82|   30|    1|        0.0|    0.0|
|  2| 20|  4|     UA|SFO| 337|  6.17|      82|   -8|    0|        0.0|    1.0|
|  9| 13|  1|     AA|ORD|1236| 10.33|     195|   -5|    0|        1.0|    0.0|
|  5|  2|  1|     UA|SFO| 550|  7.98|     102|    2|    0|        0.0|    1.0|
|  7|  2|  6|     AA|ORD| 733| 10.83|     135|   54|    1|        1.0|    0.0|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+
only showing top 5 rows



In [9]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    'mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'mile', 'depart', 'duration',
], outputCol='features')

# Consolidate predictor columns
flights_assembled = assembler.transform(flights_indexed)

# Check the resulting column
flights_assembled.select('features', 'delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[0.0,22.0,2.0,0.0,0.0,316.0,16.33,82.0]  |30   |
|[2.0,20.0,4.0,0.0,1.0,337.0,6.17,82.0]   |-8   |
|[9.0,13.0,1.0,1.0,0.0,1236.0,10.33,195.0]|-5   |
|[5.0,2.0,1.0,0.0,1.0,550.0,7.98,102.0]   |2    |
|[7.0,2.0,6.0,1.0,0.0,733.0,10.83,135.0]  |54   |
+-----------------------------------------+-----+
only showing top 5 rows



In [10]:
flights_assembled.show(5, truncate=False)

+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+-----------------------------------------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|label|carrier_idx|org_idx|features                                 |
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+-----------------------------------------+
|0  |22 |2  |UA     |ORD|316 |16.33 |82      |30   |1    |0.0        |0.0    |[0.0,22.0,2.0,0.0,0.0,316.0,16.33,82.0]  |
|2  |20 |4  |UA     |SFO|337 |6.17  |82      |-8   |0    |0.0        |1.0    |[2.0,20.0,4.0,0.0,1.0,337.0,6.17,82.0]   |
|9  |13 |1  |AA     |ORD|1236|10.33 |195     |-5   |0    |1.0        |0.0    |[9.0,13.0,1.0,1.0,0.0,1236.0,10.33,195.0]|
|5  |2  |1  |UA     |SFO|550 |7.98  |102     |2    |0    |0.0        |1.0    |[5.0,2.0,1.0,0.0,1.0,550.0,7.98,102.0]   |
|7  |2  |6  |AA     |ORD|733 |10.83 |135     |54   |1    |1.0        |0.0    |[7.0,2.0,6.0,1.0,0.0,733.0,10.83,135.0]  |
+---+---+---+-------+---+----+--

In [11]:
# Split into training and testing sets in a 80:20 ratio
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio = flights_train.count() / flights_assembled.count()
print(training_ratio)

0.7980732423121092


In [12]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+---------------------------------------+
|label|prediction|probability                            |
+-----+----------+---------------------------------------+
|0    |1.0       |[0.3530523876318839,0.6469476123681162]|
|0    |1.0       |[0.3530523876318839,0.6469476123681162]|
|1    |1.0       |[0.3530523876318839,0.6469476123681162]|
|1    |1.0       |[0.3530523876318839,0.6469476123681162]|
|1    |1.0       |[0.3530523876318839,0.6469476123681162]|
+-----+----------+---------------------------------------+
only showing top 5 rows



In [13]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1611|
|    0|       0.0| 2835|
|    1|       1.0| 3197|
|    0|       1.0| 1852|
+-----+----------+-----+

0.6352817272248552


In [14]:
# Import the logistic regression class
from pyspark.ml.classification import LogisticRegression

# Create a classifier object and train on training data
logistic = LogisticRegression().fit(flights_train)

# Create predictions for the testing data and show confusion matrix
prediction = logistic.transform(flights_test)
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1695|
|    0|       0.0| 2670|
|    1|       1.0| 3113|
|    0|       1.0| 2017|
+-----+----------+-----+



In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Calculate precision and recall
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})

# Find AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: "areaUnderROC"})

precision = 0.63
recall    = 0.66


In [16]:
# Import the necessary functions
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer

# Remove punctuation (REGEX provided) and numbers
wrangled = sms.withColumn('text', regexp_replace(sms.text, '[_():;,.!?\\-]', ' '))
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))

# Merge multiple spaces
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, ' +', ' '))

# Split the text into words
wrangled = Tokenizer(inputCol='text', outputCol='words').transform(wrangled)

wrangled.show(4, truncate=False)

+---+----------------------------------+-----+------------------------------------------+
|id |text                              |label|words                                     |
+---+----------------------------------+-----+------------------------------------------+
|1  |Sorry I'll call later in meeting  |0    |[sorry, i'll, call, later, in, meeting]   |
|2  |Dont worry I guess he's busy      |0    |[dont, worry, i, guess, he's, busy]       |
|3  |Call FREEPHONE now                |1    |[call, freephone, now]                    |
|4  |Win a cash prize or a prize worth |1    |[win, a, cash, prize, or, a, prize, worth]|
+---+----------------------------------+-----+------------------------------------------+
only showing top 4 rows



In [17]:
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF

# Remove stop words.
wrangled = StopWordsRemover(inputCol='words', outputCol='terms')\
      .transform(wrangled)

# Apply the hashing trick
wrangled = HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024)\
      .transform(wrangled)

# Convert hashed symbols to TF-IDF
tf_idf = IDF(inputCol='hash', outputCol='features')\
      .fit(wrangled).transform(wrangled)
      
tf_idf.select('terms', 'features').show(4, truncate=False)

+--------------------------------+----------------------------------------------------------------------------------------------------+
|terms                           |features                                                                                            |
+--------------------------------+----------------------------------------------------------------------------------------------------+
|[sorry, call, later, meeting]   |(1024,[138,344,378,1006],[2.2391682769656747,2.892706319430574,3.684405173719015,4.244020961654438])|
|[dont, worry, guess, busy]      |(1024,[53,233,329,858],[4.618714411095849,3.557143394108088,4.618714411095849,4.937168142214383])   |
|[call, freephone]               |(1024,[138,396],[2.2391682769656747,3.3843005812686773])                                            |
|[win, cash, prize, prize, worth]|(1024,[31,69,387,428],[3.7897656893768414,7.284881949239966,4.4671645129686475,3.898659777615979])  |
+--------------------------------+--------------

In [18]:
# Split the data into training and testing sets
tf_idf_train, tf_idf_test = tf_idf.randomSplit([0.8, 0.2], seed=13)

# Fit a Logistic Regression model to the training data
logistic = LogisticRegression(regParam=0.2).fit(tf_idf_train)

# Make predictions on the testing data
prediction = logistic.transform(tf_idf_test)

# Create a confusion matrix, comparing predictions to known labels
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   47|
|    0|       0.0|  987|
|    1|       1.0|  124|
|    0|       1.0|    3|
+-----+----------+-----+



In [20]:
# Import the one hot encoder class
from pyspark.ml.feature import OneHotEncoderEstimator

# Create an instance of the one hot encoder
onehot = OneHotEncoderEstimator(inputCols=['org_idx'], outputCols=['org_dummy'])

# Apply the one hot encoder to the flights data
onehot = onehot.fit(flights_assembled)
flights_onehot = onehot.transform(flights_assembled)

# Check the results
flights_onehot.select('org', 'org_idx', 'org_dummy').distinct().sort('org_idx').show()

+---+-------+-------------+
|org|org_idx|    org_dummy|
+---+-------+-------------+
|ORD|    0.0|(7,[0],[1.0])|
|SFO|    1.0|(7,[1],[1.0])|
|JFK|    2.0|(7,[2],[1.0])|
|LGA|    3.0|(7,[3],[1.0])|
|SMF|    4.0|(7,[4],[1.0])|
|SJC|    5.0|(7,[5],[1.0])|
|TUS|    6.0|(7,[6],[1.0])|
|OGG|    7.0|    (7,[],[])|
+---+-------+-------------+



In [23]:
flights_assembled.show(5)

+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+--------------------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|label|carrier_idx|org_idx|            features|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+--------------------+
|  0| 22|  2|     UA|ORD| 316| 16.33|      82|   30|    1|        0.0|    0.0|[0.0,22.0,2.0,0.0...|
|  2| 20|  4|     UA|SFO| 337|  6.17|      82|   -8|    0|        0.0|    1.0|[2.0,20.0,4.0,0.0...|
|  9| 13|  1|     AA|ORD|1236| 10.33|     195|   -5|    0|        1.0|    0.0|[9.0,13.0,1.0,1.0...|
|  5|  2|  1|     UA|SFO| 550|  7.98|     102|    2|    0|        0.0|    1.0|[5.0,2.0,1.0,0.0,...|
|  7|  2|  6|     AA|ORD| 733| 10.83|     135|   54|    1|        1.0|    0.0|[7.0,2.0,6.0,1.0,...|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+--------------------+
only showing top 5 rows



In [27]:
flights_train.show(5)

+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+--------------------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|label|carrier_idx|org_idx|            features|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+--------------------+
|  0|  1|  2|     AA|JFK|1597|  6.58|     230|   50|    1|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|
|  0|  1|  2|     AA|JFK|2475|  12.0|     370|   11|    0|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|
|  0|  1|  2|     AA|JFK|2475|  17.0|     379|  -10|    0|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|
|  0|  1|  2|     AA|JFK|2586|   7.0|     385|  -16|    0|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|
|  0|  1|  2|     AA|LGA| 733| 14.58|     165|   -4|    0|        1.0|    3.0|[0.0,1.0,2.0,1.0,...|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+--------------------+
only showing top 5 rows



In [34]:
print(flights_train.count())

37527


In [46]:
flights_indexed.show(5)

+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|label|carrier_idx|org_idx|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+
|  0| 22|  2|     UA|ORD| 316| 16.33|      82|   30|    1|        0.0|    0.0|
|  2| 20|  4|     UA|SFO| 337|  6.17|      82|   -8|    0|        0.0|    1.0|
|  9| 13|  1|     AA|ORD|1236| 10.33|     195|   -5|    0|        1.0|    0.0|
|  5|  2|  1|     UA|SFO| 550|  7.98|     102|    2|    0|        0.0|    1.0|
|  7|  2|  6|     AA|ORD| 733| 10.83|     135|   54|    1|        1.0|    0.0|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+
only showing top 5 rows



In [37]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler_2 = VectorAssembler(inputCols=['mile'], outputCol='features')

# Consolidate predictor columns
flights_assembled_2 = assembler_2.transform(flights_indexed)

# Check the resulting column
flights_assembled_2.select('features', 'duration').show(5, truncate=False)

# Split into training and testing sets in a 80:20 ratio
flights_train_2, flights_test_2 = flights_assembled_2.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio_2 = flights_train_2.count() / flights_assembled_2.count()
print(training_ratio_2)

+--------+--------+
|features|duration|
+--------+--------+
|[316.0] |82      |
|[337.0] |82      |
|[1236.0]|195     |
|[550.0] |102     |
|[733.0] |135     |
+--------+--------+
only showing top 5 rows

0.7980732423121092


In [38]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create a regression object and train on training data
regression = LinearRegression(labelCol='duration').fit(flights_train_2)

# Create predictions for the testing data and take a look at the predictions
predictions = regression.transform(flights_test_2)
predictions.select('duration', 'prediction').show(5, False)

# Calculate the RMSE
RegressionEvaluator(labelCol='duration').evaluate(predictions)

+--------+------------------+
|duration|prediction        |
+--------+------------------+
|170     |133.3918900459809 |
|123     |131.68655178239865|
|135     |149.71441342598226|
|275     |268.9662820007684 |
|275     |268.9662820007684 |
+--------+------------------+
only showing top 5 rows



17.046852716017344

In [39]:
# Intercept (average minutes on ground)
inter = regression.intercept
print(inter)

# Coefficients
coefs = regression.coefficients
print(coefs)

# Average minutes per km
minutes_per_km = regression.coefficients[0]
print(minutes_per_km)

# Average speed in km per hour
avg_speed = 60 / minutes_per_km
print(avg_speed)

44.105250959853976
[0.12180987597015949]
0.12180987597015949
492.5708980665785


In [40]:
flights_onehot.show(5)

+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+--------------------+-------------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|label|carrier_idx|org_idx|            features|    org_dummy|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+--------------------+-------------+
|  0| 22|  2|     UA|ORD| 316| 16.33|      82|   30|    1|        0.0|    0.0|[0.0,22.0,2.0,0.0...|(7,[0],[1.0])|
|  2| 20|  4|     UA|SFO| 337|  6.17|      82|   -8|    0|        0.0|    1.0|[2.0,20.0,4.0,0.0...|(7,[1],[1.0])|
|  9| 13|  1|     AA|ORD|1236| 10.33|     195|   -5|    0|        1.0|    0.0|[9.0,13.0,1.0,1.0...|(7,[0],[1.0])|
|  5|  2|  1|     UA|SFO| 550|  7.98|     102|    2|    0|        0.0|    1.0|[5.0,2.0,1.0,0.0,...|(7,[1],[1.0])|
|  7|  2|  6|     AA|ORD| 733| 10.83|     135|   54|    1|        1.0|    0.0|[7.0,2.0,6.0,1.0,...|(7,[0],[1.0])|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+----------

In [48]:
flights_onehot_2 = flights_onehot.drop('features')

In [49]:
flights_onehot_2.show(5)

+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+-------------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|label|carrier_idx|org_idx|    org_dummy|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+-------------+
|  0| 22|  2|     UA|ORD| 316| 16.33|      82|   30|    1|        0.0|    0.0|(7,[0],[1.0])|
|  2| 20|  4|     UA|SFO| 337|  6.17|      82|   -8|    0|        0.0|    1.0|(7,[1],[1.0])|
|  9| 13|  1|     AA|ORD|1236| 10.33|     195|   -5|    0|        1.0|    0.0|(7,[0],[1.0])|
|  5|  2|  1|     UA|SFO| 550|  7.98|     102|    2|    0|        0.0|    1.0|(7,[1],[1.0])|
|  7|  2|  6|     AA|ORD| 733| 10.83|     135|   54|    1|        1.0|    0.0|(7,[0],[1.0])|
+---+---+---+-------+---+----+------+--------+-----+-----+-----------+-------+-------------+
only showing top 5 rows



In [122]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler_3 = VectorAssembler(inputCols=['mile', 'org_dummy'], outputCol='features')

# Consolidate predictor columns
flights_assembled_3 = assembler_3.transform(flights_onehot_2)

# Check the resulting column
flights_assembled_3.select('features', 'duration').show(5, truncate=False)

# Split into training and testing sets in a 80:20 ratio
flights_train_3, flights_test_3 = flights_assembled_3.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio_3 = flights_train_3.count() / flights_assembled_3.count()
print(training_ratio_3)

+----------------------+--------+
|features              |duration|
+----------------------+--------+
|(8,[0,1],[316.0,1.0]) |82      |
|(8,[0,2],[337.0,1.0]) |82      |
|(8,[0,1],[1236.0,1.0])|195     |
|(8,[0,2],[550.0,1.0]) |102     |
|(8,[0,1],[733.0,1.0]) |135     |
+----------------------+--------+
only showing top 5 rows

0.7980732423121092


In [123]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create a regression object and train on training data
regression_2 = LinearRegression(labelCol='duration').fit(flights_train_3)

# Create predictions for the testing data
predictions_2 = regression_2.transform(flights_test_3)

# Calculate the RMSE on testing data
RegressionEvaluator(labelCol='duration').evaluate(predictions_2)

11.154457617987484

In [211]:
flights_reshaped = flights_indexed['duration', 'mile', 'org', 'org_idx']
flights_reshaped.show(5)

+--------+----+---+-------+
|duration|mile|org|org_idx|
+--------+----+---+-------+
|      82| 316|ORD|    0.0|
|      82| 337|SFO|    1.0|
|     195|1236|ORD|    0.0|
|     102| 550|SFO|    1.0|
|     135| 733|ORD|    0.0|
+--------+----+---+-------+
only showing top 5 rows



In [212]:
flights_reshaped = flights_reshaped.groupby('duration').pivot('org').avg('mile').fillna(0)
flights_reshaped.show(5)

+--------+-----------------+-----------------+----+------------------+-----------------+-----+-----+---+
|duration|              JFK|              LGA| OGG|               ORD|              SFO|  SJC|  SMF|TUS|
+--------+-----------------+-----------------+----+------------------+-----------------+-----+-----+---+
|     148|            548.5|685.8823529411765| 0.0| 810.3333333333334|954.6923076923077|948.0|910.0|0.0|
|     243|           1522.0|           1619.0| 0.0|1448.3333333333333|           1846.0|  0.0|  0.0|0.0|
|     392|2551.733333333333|              0.0| 0.0|               0.0|              0.0|  0.0|  0.0|0.0|
|     540|              0.0|              0.0| 0.0|            4243.0|              0.0|  0.0|  0.0|0.0|
|      31|              0.0|              0.0|84.0|               0.0|              0.0|  0.0|  0.0|0.0|
+--------+-----------------+-----------------+----+------------------+-----------------+-----+-----+---+
only showing top 5 rows



In [213]:
print(flights_reshaped.count())

396


In [214]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler_4 = VectorAssembler(inputCols=['JFK', 'LGA', 'OGG', 'ORD', 'SFO', 'SJC', 'SMF', 'TUS'], outputCol='features')

# Consolidate predictor columns
flights_assembled_4 = assembler_4.transform(flights_reshaped)

# Check the resulting column
flights_assembled_4.select('features', 'duration').show(5, truncate=False)

# Split into training and testing sets in a 80:20 ratio
flights_train_4, flights_test_4 = flights_assembled_4.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio_4 = flights_train_4.count() / flights_assembled_4.count()
print(training_ratio_4)

+---------------------------------------------------------------------------------+--------+
|features                                                                         |duration|
+---------------------------------------------------------------------------------+--------+
|[548.5,685.8823529411765,0.0,810.3333333333334,954.6923076923077,948.0,910.0,0.0]|148     |
|(8,[0,1,3,4],[1522.0,1619.0,1448.3333333333333,1846.0])                          |243     |
|(8,[0],[2551.733333333333])                                                      |392     |
|(8,[3],[4243.0])                                                                 |540     |
|(8,[2],[84.0])                                                                   |31      |
+---------------------------------------------------------------------------------+--------+
only showing top 5 rows

0.797979797979798


In [215]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create a regression object and train on training data
regression_3 = LinearRegression(labelCol='duration').fit(flights_train_4)

# Create predictions for the testing data
predictions_3 = regression_3.transform(flights_test_4)

# Calculate the RMSE on testing data
RegressionEvaluator(labelCol='duration').evaluate(predictions_3)

74.82023114712439

In [216]:
# Average speed in km per hour
avg_speed_hour = 60 / regression_3.coefficients[0]
print(avg_speed_hour)

# Average minutes on ground at JFK
inter_2 = regression_3.intercept
print(inter_2)

# Average minutes on ground at LGA
avg_ground_jfk = inter_2 + regression_3.coefficients[1]
print(avg_ground_jfk)

# Average minutes on ground at OGG
avg_ground_lga = inter_2 + regression_3.coefficients[2]
print(avg_ground_lga)

733.822583434675
105.16470538246998
105.10140374363006
105.20472366738178


In [210]:
regression_3.coefficients

DenseVector([0.0024, 0.0068, 0.0062, 0.0099, 0.009, -0.0012, 0.0038, -0.0006])

In [217]:
from pyspark.ml.feature import Bucketizer, OneHotEncoderEstimator

# Create buckets at 3 hour intervals through the day
buckets = Bucketizer(splits=[0, 3, 6, 9, 12, 15, 18, 21, 24], inputCol='depart', outputCol='depart_bucket')

# Bucket the departure times
bucketed = buckets.transform(flights)
bucketed.select('depart', 'depart_bucket').show(5)

# Create a one-hot encoder
onehot = OneHotEncoderEstimator(inputCols=['depart_bucket'], outputCols=['depart_dummy'])

# One-hot encode the bucketed departure times
flights_onehot = onehot.fit(bucketed).transform(bucketed)
flights_onehot.select('depart', 'depart_bucket', 'depart_dummy').show(5)

+------+-------------+
|depart|depart_bucket|
+------+-------------+
|  9.48|          3.0|
| 16.33|          5.0|
|  6.17|          2.0|
| 10.33|          3.0|
|  8.92|          2.0|
+------+-------------+
only showing top 5 rows

+------+-------------+-------------+
|depart|depart_bucket| depart_dummy|
+------+-------------+-------------+
|  9.48|          3.0|(7,[3],[1.0])|
| 16.33|          5.0|(7,[5],[1.0])|
|  6.17|          2.0|(7,[2],[1.0])|
| 10.33|          3.0|(7,[3],[1.0])|
|  8.92|          2.0|(7,[2],[1.0])|
+------+-------------+-------------+
only showing top 5 rows

