In [1]:
import pyspark
pyspark.__version__

'3.5.1'

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create SparkSession object
spark = SparkSession.builder.master("local[1]").appName('test').getOrCreate()

24/08/08 06:20:08 WARN Utils: Your hostname, codespaces-648539 resolves to a loopback address: 127.0.0.1; using 10.0.0.70 instead (on interface eth0)
24/08/08 06:20:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/08 06:20:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Terminate the cluster
# spark.stop()

In [5]:
spark.version

'3.5.1'

### Read a csv file

In [54]:
# Read data from CSV file
df = spark.read.csv('flights.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

# Get number of records
print("The data contain %d records." % df.count())

# View the first five records
display(df.show(5))

# Check column data types
print(df.dtypes)

The data contain 15001 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| NULL|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



None

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


In [30]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
# from pyspark.sql.types import *

# Specify column names and types
schema = StructType([
    StructField("mon", FloatType()),
    StructField("dom", IntegerType()),
    StructField("dow", IntegerType()),
    StructField("carrier", StringType()),
    StructField("depart", IntegerType()),
    StructField("flight", StringType())
])

# Load data from a delimited file
df = spark.read.csv('flights.csv', sep=',', header=True, schema=schema)

# Print schema of DataFrame
display(df.printSchema())

# View the first five records
display(df.show(5))

root
 |-- mon: float (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- depart: integer (nullable = true)
 |-- flight: string (nullable = true)



None

+----+---+---+-------+------+------+
| mon|dom|dow|carrier|depart|flight|
+----+---+---+-------+------+------+
|10.0| 10|  1|     OO|  5836|   ORD|
| 1.0|  4|  1|     OO|  5866|   ORD|
|11.0| 22|  1|     OO|  6016|   ORD|
| 2.0| 14|  5|     B6|   199|   JFK|
| 5.0| 25|  3|     WN|  1675|   SJC|
+----+---+---+-------+------+------+
only showing top 5 rows



24/08/06 06:20:56 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 10, schema size: 6
CSV file: file:///workspaces/data_science_materials/pyspark/flights.csv


None

### Drop Column & Remove Nulls

In [55]:
# Remove the 'flight' column
df_drop_column = df.drop('flight')
print(df_drop_column.count())

# Number of records with missing 'delay' values
print(df_drop_column.filter('delay IS NULL').count())

# Remove records with missing 'delay' values
df_valid_delay = df_drop_column.filter('delay IS NOT NULL')
print(df_valid_delay.count())

# Remove records with missing values in any column and get the number of remaining rows
df_none_missing = df_valid_delay.dropna()
print(df_none_missing.count())

15001
915
14086
14086


In [56]:
# Import the required function
from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column (1 mile is equivalent to 1.60934 km)
df_indexed = df_none_missing.withColumn('km', round(df.mile * 1.60934, 0)) \
                    .drop('mile')

# Create 'label' column indicating whether flight delayed (1) or not (0)
df_indexed = df_indexed.withColumn('label', (df_indexed.delay >= 15).cast('integer'))

# Check first five records
df_indexed.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



### Categorical to Numerical Encoding

In [34]:
from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(df_indexed)

# Indexer creates a new column with numeric index values
df_indexed = indexer_model.transform(df_indexed)

# Repeat the process for the other categorical feature
df_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(df_indexed).transform(df_indexed)
df_indexed.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    4.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|        4.0|    3.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows



### Extracting train and test features

In [35]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    'mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration', 'label', 'delay'
], outputCol='features')

# Consolidate predictor columns
df_assembled = assembler.transform(df_indexed)

# Check the resulting column
# df_assembled = df_assembled.select('features', 'delay')
df_assembled.select('features', 'delay').show(5, truncate=False)

+---------------------------------------------------+-----+
|features                                           |delay|
+---------------------------------------------------+-----+
|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0,1.0,27.0]   |27   |
|[11.0,22.0,1.0,2.0,0.0,1188.0,7.17,127.0,0.0,-19.0]|-19  |
|[2.0,14.0,5.0,4.0,2.0,3618.0,21.17,365.0,1.0,60.0] |60   |
|[5.0,25.0,3.0,3.0,4.0,621.0,12.92,85.0,1.0,22.0]   |22   |
|[3.0,28.0,1.0,4.0,3.0,1732.0,13.33,182.0,1.0,70.0] |70   |
+---------------------------------------------------+-----+
only showing top 5 rows



### Train-test Split

In [36]:
# Split into training and testing sets in a 80:20 ratio
df_train, df_test = df_assembled.randomSplit([0.8, 0.2], seed=43)
print(df_train.count(), df_test.count())

# Check that training set has around 80% of records
training_ratio = df_train.count() / df_assembled.count()
print(training_ratio)
df_train.show(5, truncate=False)

11320 2766
0.8036348147096408
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+------------------------------------------------+
|mon|dom|dow|carrier|org|depart|duration|delay|km    |label|carrier_idx|org_idx|features                                        |
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+------------------------------------------------+
|0  |1  |2  |AA     |ORD|8.42  |155     |83   |1617.0|1    |1.0        |0.0    |[0.0,1.0,2.0,1.0,0.0,1617.0,8.42,155.0,1.0,83.0]|
|0  |1  |2  |AA     |ORD|15.25 |115     |20   |941.0 |1    |1.0        |0.0    |[0.0,1.0,2.0,1.0,0.0,941.0,15.25,115.0,1.0,20.0]|
|0  |1  |2  |AA     |ORD|15.5  |90      |25   |649.0 |1    |1.0        |0.0    |[0.0,1.0,2.0,1.0,0.0,649.0,15.5,90.0,1.0,25.0]  |
|0  |1  |2  |AA     |ORD|16.0  |135     |54   |1395.0|1    |1.0        |0.0    |[0.0,1.0,2.0,1.0,0.0,1395.0,16.0,135.0,1.0,54.0]|
|0  |1  |2  |AA     |ORD|21.5  |65      |133  |415.0 |1    |

## Decision Tree

### Model Fit

In [10]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(df_assembled)

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(df_assembled)
prediction.select('label', 'prediction', 'probability').show(5, False)

                                                                                

+-----+----------+-----------+
|label|prediction|probability|
+-----+----------+-----------+
|1    |1.0       |[0.0,1.0]  |
|0    |0.0       |[1.0,0.0]  |
|1    |1.0       |[0.0,1.0]  |
|1    |1.0       |[0.0,1.0]  |
|1    |1.0       |[0.0,1.0]  |
+-----+----------+-----------+
only showing top 5 rows



### Confusion matrix

In [11]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label = 1').count()
FP = prediction.filter('prediction = 1 AND label = 0').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TP + TN)/ (TP + TN + FP + FN)
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0| 6924|
|    1|       1.0| 7162|
+-----+----------+-----+

1.0


## Logistic Regression

### Model fit & test

In [12]:
# Import the logistic regression class
from pyspark.ml.classification import LogisticRegression

# Create a classifier object and train on training data
logistic = LogisticRegression().fit(df_train)

# Create predictions for the testing data and show confusion matrix
prediction = logistic.transform(df_test)

# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

24/08/07 08:46:14 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/08/07 08:46:14 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0| 1380|
|    1|       1.0| 1386|
+-----+----------+-----+



In [13]:
# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label = 1').count()
FP = prediction.filter('prediction = 1 AND label = 0').count()

In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Calculate precision and recall
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})

# Find AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: "areaUnderROC"})

precision = 1.00
recall    = 1.00


## Turning Text into Tables

### Text cleaning & vectorization

In [25]:
sms = spark.read.csv('spam.csv', header=True).select(['v2', 'v1']).withColumnRenamed('v2', 'text').dropna()
sms.show(3, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----+
|text                                                                                                                                                       |v1  |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----+
|Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |ham |
|Ok lar... Joking wif u oni...                                                                                                                              |ham |
|Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|spam|
+---------------------

In [26]:
# Import the necessary functions
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer
from pyspark.sql import functions as F

# Remove punctuation (REGEX provided) and numbers
wrangled = sms.withColumn('text', regexp_replace(sms.text, '[_():;,.!?\\-]', ' '))
display(wrangled.show(3, truncate=False))
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))
wrangled = wrangled.withColumn('v1', regexp_replace(wrangled.v1, '"""', ''))
wrangled = wrangled.withColumn("label", F.when(wrangled.v1 == "spam", 0).otherwise(1))
display(wrangled.show(3, truncate=False))

# Merge multiple spaces
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, ' +', ' '))
display(wrangled.show(3, truncate=False))

# Split the text into words
wrangled = Tokenizer(inputCol='text', outputCol='words').transform(wrangled)

wrangled.show(4, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----+
|text                                                                                                                                                       |v1  |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----+
|Go until jurong point  crazy   Available only in bugis n great world la e buffet    Cine there got amore wat                                               |ham |
|Ok lar    Joking wif u oni                                                                                                                                 |ham |
|Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005  Text FA to 87121 to receive entry question std txt rate T&C's apply 08452810075over18's|spam|
+---------------------

None

+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----+-----+
|text                                                                                                                                                       |v1  |label|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----+-----+
|Go until jurong point  crazy   Available only in bugis n great world la e buffet    Cine there got amore wat                                               |ham |1    |
|Ok lar    Joking wif u oni                                                                                                                                 |ham |1    |
|Free entry in   a wkly comp to win FA Cup final tkts   st May       Text FA to       to receive entry question std txt rate T&C's apply            over  '

None

+-------------------------------------------------------------------------------------------------------------------------------+----+-----+
|text                                                                                                                           |v1  |label|
+-------------------------------------------------------------------------------------------------------------------------------+----+-----+
|Go until jurong point crazy Available only in bugis n great world la e buffet Cine there got amore wat                         |ham |1    |
|Ok lar Joking wif u oni                                                                                                        |ham |1    |
|Free entry in a wkly comp to win FA Cup final tkts st May Text FA to to receive entry question std txt rate T&C's apply over 's|spam|0    |
+-------------------------------------------------------------------------------------------------------------------------------+----+-----+
only showing 

None

+-------------------------------------------------------------------------------------------------------------------------------+----+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                           |v1  |label|words                                                                                                                                                       |
+-------------------------------------------------------------------------------------------------------------------------------+----+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Go until jurong point crazy Available only in bugis n great world la e buffet Cine there got amore wat   

In [27]:
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF

# Remove stop words.
wrangled1 = StopWordsRemover(inputCol='words', outputCol='terms')\
      .transform(wrangled)
display(wrangled1.show(3, truncate=False))

# Apply the hashing trick
wrangled2 = HashingTF(inputCol='terms', outputCol='hash', numFeatures=24)\
      .transform(wrangled1)
display(wrangled2.show(3, truncate=False))

# Convert hashed symbols to TF-IDF
tf_idf = IDF(inputCol='hash', outputCol='features')\
      .fit(wrangled2).transform(wrangled2)
# tf_idf = tf_idf.withColumnRenamed('v1','label')
display(tf_idf.show(3, truncate=False))

+-------------------------------------------------------------------------------------------------------------------------------+----+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                           |v1  |label|words                                                                                                                                                       |terms                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------+----+-----+

None

+-------------------------------------------------------------------------------------------------------------------------------+----+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+
|text                                                                                                                           |v1  |label|words                                                                                                                                                       |terms                                                                                                                              |hash                                                  

None

                                                                                

+-------------------------------------------------------------------------------------------------------------------------------+----+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                           |v1  |label|words                                                                        

None

In [28]:
# Split the data into training and testing sets
sms_train, sms_test = tf_idf.randomSplit([0.4, 0.1], seed=13)

# Fit a Logistic Regression model to the training data
logistic = LogisticRegression(regParam=0.2).fit(sms_train)

# Make predictions on the testing data
prediction = logistic.transform(sms_test)

# Create a confusion matrix, comparing predictions to known labels
prediction.groupBy('label', 'prediction').count().show()

NameError: name 'LogisticRegression' is not defined

## Regression

### One-hot encoding

In [11]:
# Import the one hot encoder class
from pyspark.ml.feature import OneHotEncoder

# Create an instance of the one hot encoder
onehot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])

# Apply the one hot encoder to the flights data
onehot = onehot.fit(df_assembled)
df_onehot = onehot.transform(df_assembled)

# Check the results
df_onehot.select('org', 'org_idx', 'org_dummy').distinct().sort('org_idx').show()

+---+-------+-------------+
|org|org_idx|    org_dummy|
+---+-------+-------------+
|ORD|    0.0|(7,[0],[1.0])|
|SFO|    1.0|(7,[1],[1.0])|
|JFK|    2.0|(7,[2],[1.0])|
|LGA|    3.0|(7,[3],[1.0])|
|SJC|    4.0|(7,[4],[1.0])|
|SMF|    5.0|(7,[5],[1.0])|
|TUS|    6.0|(7,[6],[1.0])|
|OGG|    7.0|    (7,[],[])|
+---+-------+-------------+



### Regression model w/o one-hot encoding

In [12]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create a regression object and train on training data
regression = LinearRegression(labelCol='duration').fit(df_train)

# Create predictions for the testing data and take a look at the predictions
predictions = regression.transform(df_test)
predictions.select('duration', 'prediction').show(5, False)

# Calculate the RMSE
RegressionEvaluator(labelCol='duration').evaluate(predictions)

24/08/08 06:21:37 WARN Instrumentation: [258f8c3c] regParam is zero, which might cause numerical instability and overfitting.
24/08/08 06:21:38 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/08/08 06:21:38 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/08/08 06:21:38 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


+--------+------------------+
|duration|prediction        |
+--------+------------------+
|195     |194.9999999999996 |
|120     |120.00000000000004|
|265     |265.0             |
|88      |87.9999999999999  |
|117     |116.99999999999932|
+--------+------------------+
only showing top 5 rows



2.8485105272259854e-13

In [13]:
# Intercept (average minutes on ground)
inter = regression.intercept
print(inter)

# Coefficients
coefs = regression.coefficients
print(coefs)

# Average minutes per km
minutes_per_km = regression.coefficients[0]
print(minutes_per_km)

# Average speed in km per hour
avg_speed = 60 / minutes_per_km
print(avg_speed)

6.617008537181419e-13
[-1.0804753471538299e-15,-1.2639879735261694e-15,-1.9506819523326384e-15,4.007560407165759e-14,-3.327950564692983e-14,1.2812678428908628e-15,1.1568848562433283e-15,0.9999999999999833,3.976028718197154e-14,1.3931928548436367e-16]
-1.0804753471538299e-15
-5.553111429895277e+16


### Regression model with one-hot encoding

In [14]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Split the data into training and testing sets
df_onehot_train, df_onehot_test = df_onehot.randomSplit([0.8, 0.2], seed=13)

# Create a regression object and train on training data
regression = LinearRegression(labelCol='duration').fit(df_onehot_train)

# Create predictions for the testing data
predictions = regression.transform(df_onehot_test)

# Calculate the RMSE on testing data
RegressionEvaluator(labelCol='duration').evaluate(predictions)

24/08/08 06:21:40 WARN Instrumentation: [417ed216] regParam is zero, which might cause numerical instability and overfitting.


3.329158461196226e-13

In [15]:
# Average speed in km per hour
avg_speed_hour = 60/regression.coefficients[0]
print(avg_speed_hour)

# Average minutes on ground at OGG
inter = regression.intercept
print(inter)

# Average minutes on ground at JFK
avg_ground_jfk = inter + regression.coefficients[3]
print(avg_ground_jfk)

# Average minutes on ground at LGA
avg_ground_lga = inter + regression.coefficients[4]
print(avg_ground_lga)

5934138335959200.0
-1.216034840414051e-12
-1.2518326973674596e-12
-1.1691513757719602e-12


### Bucketing & One-hot encoding

In [16]:
from pyspark.ml.feature import Bucketizer, OneHotEncoder

# Create buckets at 3 hour intervals through the day
buckets = Bucketizer(splits=[0,3,6,9,12,15,18,21,24], inputCol='depart', outputCol='depart_bucket')

# Bucket the departure times
bucketed = buckets.transform(df_onehot)
display(bucketed.select('depart','depart_bucket').show(5))

# Create a one-hot encoder
onehot = OneHotEncoder(inputCols=['depart_bucket'], outputCols=['depart_dummy'])

# One-hot encode the bucketed departure times
df_onehot1 = onehot.fit(bucketed).transform(bucketed)
df_onehot1.select('depart', 'depart_bucket', 'depart_dummy').show(5)

+------+-------------+
|depart|depart_bucket|
+------+-------------+
|  8.18|          2.0|
|  7.17|          2.0|
| 21.17|          7.0|
| 12.92|          4.0|
| 13.33|          4.0|
+------+-------------+
only showing top 5 rows



None

+------+-------------+-------------+
|depart|depart_bucket| depart_dummy|
+------+-------------+-------------+
|  8.18|          2.0|(7,[2],[1.0])|
|  7.17|          2.0|(7,[2],[1.0])|
| 21.17|          7.0|    (7,[],[])|
| 12.92|          4.0|(7,[4],[1.0])|
| 13.33|          4.0|(7,[4],[1.0])|
+------+-------------+-------------+
only showing top 5 rows



### Lasso & Ridge Regression

In [17]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
# Fit Lasso model (λ = 1, α = 1) to training data
regression = LinearRegression(labelCol='duration', regParam=1, elasticNetParam=1).fit(df_train)
# elasticNetParam: {0 for ridge & 1 for lasso}
# Calculate the RMSE on testing data
rmse = RegressionEvaluator(labelCol='duration').evaluate(regression.transform(df_test))
print("The test RMSE is", rmse)
# Look at the model coefficients
coeffs = regression.coefficients
print(coeffs)
# Number of zero coefficients
zero_coeff = sum([beta==0 for beta in regression.coefficients])
print("Number of coefficients equal to 0:", zero_coeff)

The test RMSE is 1.0074307566813816
[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9886078566829308,0.0,0.0]
Number of coefficients equal to 0: 9


## Pipeline

### Numerical Data

In [57]:
# Convert categorical strings to index values
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

# One-hot encode index values
onehot = OneHotEncoder(
    inputCols=['org_idx', 'dow'],
    outputCols=['org_dummy', 'dow_dummy']
)

# Assemble predictors into a single column
assembler = VectorAssembler(inputCols=['km', 'org_dummy', 'dow_dummy'], outputCol='features')

# A linear regression object
regression = LinearRegression(labelCol='duration')

In [58]:
# Import class for creating a pipeline
from pyspark.ml import Pipeline

# Construct a pipeline
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

# Split into training and testing sets in a 80:20 ratio
df_train, df_test = df_indexed.randomSplit([0.8, 0.2], seed=43)

# Train the pipeline on the training data
pipeline = pipeline.fit(df_train)

# Make predictions on the testing data
predictions = pipeline.transform(df_test)

24/08/08 07:21:25 WARN Instrumentation: [5f8a66e0] regParam is zero, which might cause numerical instability and overfitting.


### Text Data

In [31]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression

# Break text into tokens at non-word characters
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='terms')

# Apply the hashing trick and transform to TF-IDF
hasher = HashingTF(inputCol="terms", outputCol="hash")
idf = IDF(inputCol="hash", outputCol="features")

# Create a logistic regression object and add everything to a pipeline
logistic = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, remover, hasher, idf, logistic])

### Cross validation

In [37]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create an empty parameter grid
params = ParamGridBuilder().build()

# Create objects for building and evaluating a regression model
regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')

# Create a cross validator
cv = CrossValidator(estimator=regression, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

# Train and test model on multiple folds of the training data
cv = cv.fit(df_train)

# NOTE: Since cross-valdiation builds multiple models, the fit() method can take a little while to complete.

24/08/08 06:43:18 WARN Instrumentation: [4f05065f] regParam is zero, which might cause numerical instability and overfitting.
24/08/08 06:43:19 WARN Instrumentation: [6bdcf2e3] regParam is zero, which might cause numerical instability and overfitting.
24/08/08 06:43:20 WARN Instrumentation: [e9313ea5] regParam is zero, which might cause numerical instability and overfitting.
24/08/08 06:43:21 WARN Instrumentation: [7220f35a] regParam is zero, which might cause numerical instability and overfitting.
24/08/08 06:43:22 WARN Instrumentation: [8ace372b] regParam is zero, which might cause numerical instability and overfitting.
24/08/08 06:43:23 WARN Instrumentation: [2a22b5d5] regParam is zero, which might cause numerical instability and overfitting.


### pipeline with CV

In [41]:
# Create an indexer for the org field
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

# Create an one-hot encoder for the indexed org field
onehot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])

# Assemble the km and one-hot encoded fields
assembler = VectorAssembler(inputCols=['km', 'org_dummy'], outputCol='features')

# Create a pipeline and cross-validator.
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=evaluator)

### Grid Search

In [43]:
# Create parameter grid
params = ParamGridBuilder()

# Add grids for two parameters
params = params.addGrid(regression.regParam, [0.01, 0.1, 1.0, 10.0]) \
               .addGrid(regression.elasticNetParam, [0.0, 0.5, 1.0])

# Build the parameter grid
params = params.build()
print('Number of models to be tested: ', len(params))

# Create cross-validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

Number of models to be tested:  12


In [63]:
df_train.show(2)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
|  0|  1|  2|     AA|ORD|  8.42|     155|   83|1617.0|    1|
|  0|  1|  2|     AA|ORD| 15.25|     115|   20| 941.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 2 rows



In [1]:
# Model Fit
cv.fit(df_train, params=params)

# Get the best model from cross validation
best_model = cv.bestModel

# Look at the stages in the best model
print(best_model.stages)

# Get the parameters for the LinearRegression object in the best model
best_model.stages[3].extractParamMap()

# Generate predictions on testing data using the best model then calculate RMSE
predictions = best_model.transform(df_test)
print("RMSE =", evaluator.evaluate(predictions))

### Ensemble