In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [7]:
# Create (or reuse) a local SparkSession that uses all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('sparkapp')
    .getOrCreate()
)

In [4]:
# Do not stop the session here: every later cell reads data through `spark`,
# so calling spark.stop() at this point makes Restart & Run All fail at the
# first spark.read. Stop the session in a final cell once the analysis is done.

In [12]:
# Read data from CSV file
# Load the flights CSV: the first row is the header, column types are
# inferred, and the literal string 'NA' is read as null.
flights = (
    spark.read
    .options(sep=',', header=True, inferSchema=True, nullValue='NA')
    .csv('flights.csv')
)

# Report how many rows were loaded
print("The data contain %d records." % flights.count())

# Preview the first five rows
flights.show(n=5)

# Column names with their inferred types (displayed as the cell output)
flights.dtypes

The data contain 50000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('flight', 'int'),
 ('org', 'string'),
 ('mile', 'int'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int')]

In [11]:
# Preview the flights DataFrame loaded above; `data` was never defined, and
# the output below is the flights table. show() prints 20 rows by default.
flights.show()

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
|  5|  2|  1|     UA|   704|SFO| 550|  7.98|     102|    2|
|  7|  2|  6|     AA|   380|ORD| 733| 10.83|     135|   54|
|  1| 16|  6|     UA|  1477|ORD|1440|  8.00|     232|   -7|
|  1| 22|  5|     UA|   620|SJC|1829|  7.98|     250|  -13|
| 11|  8|  1|     OO|  5590|SFO| 158|  7.77|      60|   88|
|  4| 26|  1|     AA|  1144|SFO|1464| 13.25|     210|  -10|
|  4| 25|  0|     AA|   321|ORD| 978| 13.75|     160|   31|
|  8| 30|  2|     UA|   646|ORD| 719| 13.28|     151|   16|
|  3| 16|  3|     UA|   107|ORD|1745|  9

In [13]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema for the headerless, ';'-delimited SMS file:
# integer 'id', string 'text', integer 'label' (all nullable by default).
schema = StructType([
    StructField(col_name, col_type)
    for col_name, col_type in (
        ("id", IntegerType()),
        ("text", StringType()),
        ("label", IntegerType()),
    )
])

# Read the file with the schema above instead of inferring types
sms = spark.read.schema(schema).csv('sms.csv', sep=';', header=False)

# Show the resulting column names and types
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [20]:
# Load the cars CSV with a header row, inferred types and 'NA' as null.
# The first column has no header name and comes through as '_c0'.
cars = (
    spark.read
    .options(sep=',', header=True, inferSchema=True, nullValue='NA')
    .csv('cars.csv')
)

# Report how many rows were loaded
print("The data contain %d records." % cars.count())

The data contain 32 records.


In [21]:
# Preview the raw cars data (first 20 rows)
cars.show(n=20)

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|                _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
|          M

In [16]:
# Either drop the columns you don't want...
# NOTE(review): this DataFrame has no 'Make' or 'Model' columns (its columns are
# _c0, mpg, cyl, ... as the cars.show() output shows), and DataFrame.drop ignores
# names that are not in the schema, so this line is a no-op. The car name is in
# '_c0'. Confirm which column was meant to be dropped.
cars = cars.drop('Make', 'Model')

In [22]:
# Preview the first five rows after the drop step
cars.show(n=5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [23]:
# Count rows whose 'cyl' value is missing
cars.filter(cars.cyl.isNull()).count()

0

In [24]:
# Keep only rows where 'cyl' is present
cars = cars.filter(cars.cyl.isNotNull())

In [25]:
# Drop every row that has a null in any column
cars = cars.na.drop()

In [26]:
# Import the functions module under an alias instead of
# `from pyspark.sql.functions import round`, which would shadow Python's
# built-in round() for every later cell in the notebook.
from pyspark.sql import functions as F

# Create a new 'mass' column: wt / 2.205, rounded to 0 decimal places.
# NOTE(review): presumably wt is in units of 1000 lb, so wt / 2.205 is roughly
# tonnes; rounding to 0 decimals collapses it to 1.0 / 2.0 / ... (see output).
# Confirm the intended unit and precision.
cars = cars.withColumn('mass', F.round(cars.wt / 2.205, 0))
# 'length' column: qsec * 0.0254, rounded to 3 decimal places.
# NOTE(review): qsec looks like a time in seconds, not a length in inches, so
# the inch-to-metre factor 0.0254 does not give a length. Confirm intent.
cars = cars.withColumn('length', F.round(cars.qsec * 0.0254, 3))

In [27]:
from pyspark.ml.feature import StringIndexer

# Fit a StringIndexer on 'disp' and keep the fitted model as `indexer`,
# then append the resulting 'type_idx' column to `cars`.
# NOTE(review): 'disp' is a numeric column, so every distinct value becomes
# its own category index. Confirm this "type" encoding is what was intended.
indexer = StringIndexer(inputCol='disp', outputCol='type_idx').fit(cars)
cars = indexer.transform(cars)

In [28]:
# Inspect the derived mass/length/type_idx columns (first 20 rows)
cars.show(n=20)

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+------+--------+
|                _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|mass|length|type_idx|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+------+--------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4| 1.0| 0.418|     2.0|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4| 1.0| 0.432|     2.0|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1| 1.0| 0.473|     8.0|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1| 1.0| 0.494|    17.0|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2| 2.0| 0.432|     3.0|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1| 2.0| 0.514|    24.0|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4| 2.0| 0.402|     3.0|
|          Merc 240D|24.4|  4|

In [29]:
# Encode 'gear' as the numeric classification target column 'label'
cars = (
    StringIndexer(inputCol="gear", outputCol="label")
    .fit(cars)
    .transform(cars)
)

In [30]:
# Inspect the new 'label' column (first 20 rows)
cars.show(n=20)

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+------+--------+-----+
|                _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|mass|length|type_idx|label|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+------+--------+-----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4| 1.0| 0.418|     2.0|  1.0|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4| 1.0| 0.432|     2.0|  1.0|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1| 1.0| 0.473|     8.0|  1.0|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1| 1.0| 0.494|    17.0|  0.0|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2| 2.0| 0.432|     3.0|  0.0|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1| 2.0| 0.514|    24.0|  0.0|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   

In [31]:
from pyspark.ml.feature import VectorAssembler

# Combine the predictor columns into the single vector column that Spark ML
# estimators read (DecisionTreeClassifier's default featuresCol is 'features').
assembler = VectorAssembler(inputCols=['cyl', 'hp'], outputCol='features')
# transform() returns a new DataFrame. Without assigning it back, `cars` (and
# the train/test split taken from it) has no 'features' column and model
# fitting cannot work.
cars = assembler.transform(cars)
cars

DataFrame[_c0: string, mpg: double, cyl: int, disp: double, hp: int, drat: double, wt: double, qsec: double, vs: int, am: int, gear: int, carb: int, mass: double, length: double, type_idx: double, label: double, features: vector]

In [32]:
# Reproducible ~80/20 train/test split of the rows (fixed seed)
cars_train, cars_test = cars.randomSplit(weights=[0.8, 0.2], seed=23)

In [33]:
# Row counts of the training and test splits, in that order
[split.count() for split in (cars_train, cars_test)]

[27, 5]

In [34]:
from pyspark.ml.classification import DecisionTreeClassifier

In [35]:
# Decision tree classifier; the column names spelled out here are the
# estimator's defaults, made explicit so the expected inputs are visible.
tree = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [37]:
# NOTE(review): fitting reads the 'features' (vector) and 'label' columns of
# cars_train, so `cars` must already carry the 'features' column produced by
# the VectorAssembler cell (its transform result has to be assigned to `cars`)
# before this line can be re-enabled.
#tree_model = tree.fit(cars_train)

In [38]:
#prediction = tree_model.transform(cars_test)

In [39]:
#prediction.groupBy("label", "prediction").count().show()