In [1]:
# Import SparkSession
from pyspark.sql import SparkSession

# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [2]:
# Import all from `sql.types`
from pyspark.sql.types import *

def build_schema():
    """Build and return a schema to use for the sample data."""
    schema = StructType(
        [
            StructField("dofM", StringType(), True),
            StructField("dofW", StringType(), True),
            StructField("carrier", StringType(), True),
            StructField("tailnum", StringType(), True),
            StructField("flnum", IntegerType(), True),
            StructField("org_id", StringType(), True),
            StructField("origin", StringType(), True),
            StructField("dest_id", StringType(), True),
            StructField("dest", StringType(), True),
            StructField("crsdeptime", DoubleType(), True),
            StructField("deptime", DoubleType(), True),
            StructField("depdelaymins", DoubleType(), True),
            StructField("crsarrtime", DoubleType(), True),
            StructField("arrtime", DoubleType(), True),
            StructField("arrdelay", DoubleType(), True),
            StructField("crselapsedtime", DoubleType(), True),
            StructField("dist", IntegerType(), True),
        ]
    )
    return schema

#load data from csv and create DF
df = spark.read.csv(
    "/mnt/d/dev/sparkExample/data/rita2014jan.csv", header=False, mode="DROPMALFORMED", schema=build_schema() 
)

df.show(2)


+----+----+-------+-------+-----+------+------+-------+----+----------+-------+------------+----------+-------+--------+--------------+----+
|dofM|dofW|carrier|tailnum|flnum|org_id|origin|dest_id|dest|crsdeptime|deptime|depdelaymins|crsarrtime|arrtime|arrdelay|crselapsedtime|dist|
+----+----+-------+-------+-----+------+------+-------+----+----------+-------+------------+----------+-------+--------+--------------+----+
|   1|   3|     AA| N338AA|    1| 12478|   JFK|  12892| LAX|     900.0|  914.0|        14.0|    1225.0| 1238.0|    13.0|         385.0|2475|
|   2|   4|     AA| N338AA|    1| 12478|   JFK|  12892| LAX|     900.0|  857.0|         0.0|    1225.0| 1226.0|     1.0|         385.0|2475|
+----+----+-------+-------+-----+------+------+-------+----+----------+-------+------------+----------+-------+--------+--------------+----+
only showing top 2 rows



In [3]:
#rename default column name - this vs passing in schema?
df.printSchema()
#df = df.toDF(dofM: String, dofW: String, carrier: String, tailnum: String, flnum: Int, org_id: String, origin: String, dest_id: String, dest: String, crsdeptime: Double, deptime: Double, depdelaymins: Double, crsarrtime: Double, arrtime: Double, arrdelay: Double, crselapsedtime: Double, dist: Int)

root
 |-- dofM: string (nullable = true)
 |-- dofW: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flnum: integer (nullable = true)
 |-- org_id: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest_id: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- crsdeptime: double (nullable = true)
 |-- deptime: double (nullable = true)
 |-- depdelaymins: double (nullable = true)
 |-- crsarrtime: double (nullable = true)
 |-- arrtime: double (nullable = true)
 |-- arrdelay: double (nullable = true)
 |-- crselapsedtime: double (nullable = true)
 |-- dist: integer (nullable = true)



In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

#indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(df.columns)-set(['date'])) ]

#convert columns with string value to int index value
col_convert = ['carrier', 'tailnum', 'origin', 'dest']
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in col_convert ]

pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)

df = df_r.select(list(set(df_r.columns) - set(col_convert)))

df.show(20)
#df_r.select('dofM', 'dofM_index', 'carrier', 'carrier_index', 'tailnum', 'tailnum_index').show(20)

+----+-------+----+----------+------------+-------------+--------------+------+--------+-------+-----+------------+----------+-------------+-------+----------+----+
|dist|dest_id|dofM|dest_index|depdelaymins|tailnum_index|crselapsedtime|org_id|arrdelay|deptime|flnum|origin_index|crsarrtime|carrier_index|arrtime|crsdeptime|dofW|
+----+-------+----+----------+------------+-------------+--------------+------+--------+-------+-----+------------+----------+-------------+-------+----------+----+
|2475|  12892|   1|       2.0|        14.0|       3511.0|         385.0| 12478|    13.0|  914.0|    1|        18.0|    1225.0|          4.0| 1238.0|     900.0|   3|
|2475|  12892|   2|       2.0|         0.0|       3511.0|         385.0| 12478|     1.0|  857.0|    1|        18.0|    1225.0|          4.0| 1226.0|     900.0|   4|
|2475|  12892|   4|       2.0|        65.0|       4088.0|         385.0| 12478|    59.0| 1005.0|    1|        18.0|    1225.0|          4.0| 1324.0|     900.0|   6|
|2475|  12

In [5]:
from pyspark.sql.functions import when

df_r = df.withColumn('delayed', when(df.depdelaymins > 40, 1.0).otherwise(0.0))
df_r.select('depdelaymins', 'delayed').show(20)

+------------+-------+
|depdelaymins|delayed|
+------------+-------+
|        14.0|    0.0|
|         0.0|    0.0|
|        65.0|    1.0|
|       110.0|    1.0|
|        17.0|    0.0|
|        10.0|    0.0|
|        23.0|    0.0|
|         0.0|    0.0|
|        29.0|    0.0|
|        15.0|    0.0|
|         0.0|    0.0|
|         0.0|    0.0|
|         0.0|    0.0|
|         0.0|    0.0|
|         0.0|    0.0|
|         0.0|    0.0|
|         0.0|    0.0|
|         0.0|    0.0|
|         0.0|    0.0|
|         0.0|    0.0|
+------------+-------+
only showing top 20 rows



In [6]:
from pyspark.sql.functions import col
inDfCols = df_r.columns
df_r = df_r.select(*(col(c).cast('int').alias(c) for c in inDfCols))

In [7]:
# Split the dataset randomly into 70% for training and 30% for testing.
trainingData, testData = df_r.randomSplit([0.7, 0.3])
trainingData.printSchema()
trainingData.show(2)
print "We have %d training examples and %d test examples." % (trainingData.count(), testData.count())

root
 |-- dist: integer (nullable = true)
 |-- dest_id: integer (nullable = true)
 |-- dofM: integer (nullable = true)
 |-- dest_index: integer (nullable = true)
 |-- depdelaymins: integer (nullable = true)
 |-- tailnum_index: integer (nullable = true)
 |-- crselapsedtime: integer (nullable = true)
 |-- org_id: integer (nullable = true)
 |-- arrdelay: integer (nullable = true)
 |-- deptime: integer (nullable = true)
 |-- flnum: integer (nullable = true)
 |-- origin_index: integer (nullable = true)
 |-- crsarrtime: integer (nullable = true)
 |-- carrier_index: integer (nullable = true)
 |-- arrtime: integer (nullable = true)
 |-- crsdeptime: integer (nullable = true)
 |-- dofW: integer (nullable = true)
 |-- delayed: integer (nullable = true)

+----+-------+----+----------+------------+-------------+--------------+------+--------+-------+-----+------------+----------+-------------+-------+----------+----+-------+
|dist|dest_id|dofM|dest_index|depdelaymins|tailnum_index|crselapsedtime|or

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.feature import VectorAssembler, VectorIndexer
featuresCols = df_r.columns
featuresCols.remove('delayed')
# This concatenates all feature columns into a single feature vector in a new column "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# This identifies categorical features and indexes them.
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="indexedFeatures", maxCategories=4)

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="delayed", outputCol="indexedLabel").fit(df_r)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df_r.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, vectorAssembler, vectorIndexer, dt])




In [9]:
# Train model.  This also runs the indexers. This takes time
model = pipeline.fit(trainingData)



In [10]:
# Make predictions.
predictions = model.transform(testData)
predictions.select("prediction", "indexedLabel").show(20)

'''
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
'''
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

#print the stage of pipe 3 is the model
treeModel = model.stages[3]
# summary only
print(treeModel)


+----------+------------+
|prediction|indexedLabel|
+----------+------------+
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       1.0|         1.0|
|       0.0|         0.0|
|       0.0|         0.0|
+----------+------------+
only showing top 20 rows

Test Error = 0.0118171 
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4dbbb647fff10c12c7ae) of depth 5 with 19 nodes


In [11]:
testData.show(2)

+----+-------+----+----------+------------+-------------+--------------+------+--------+-------+-----+------------+----------+-------------+-------+----------+----+-------+
|dist|dest_id|dofM|dest_index|depdelaymins|tailnum_index|crselapsedtime|org_id|arrdelay|deptime|flnum|origin_index|crsarrtime|carrier_index|arrtime|crsdeptime|dofW|delayed|
+----+-------+----+----------+------------+-------------+--------------+------+--------+-------+-----+------------+----------+-------------+-------+----------+----+-------+
|  31|  14256|   2|       269|           0|          871|            26| 15841|       0|   1019|   65|         260|      1106|            9|   1039|      1040|   4|      0|
|  31|  14256|  10|       269|           0|         3824|            26| 15841|       0|   1024|   65|         260|      1106|            9|   1046|      1040|   5|      0|
+----+-------+----+----------+------------+-------------+--------------+------+--------+-------+-----+------------+----------+---------