In [2]:
from pyspark.sql.types import *  # Necessary for creating schemas
from pyspark.sql.functions import * # Importing PySpark functions

In [3]:
# Make a list of tuples, one tuple per row
a_list = [('a', 1), ('b', 2), ('c', 3)]

# Create a Spark DataFrame without supplying a schema; no column names
# are given here, so Spark has to generate its own
df_from_list_no_schema = \
sqlContext.createDataFrame(a_list)

# Print the DF object. `print(...)` with a single argument behaves the same
# under Python 2 and is also valid Python 3 (a bare `print x` is not)
print(df_from_list_no_schema)

# Print a collected list of Row objects (`collect()` brings every row back
# to the driver, so only do this on small DataFrames)
print(df_from_list_no_schema.collect())

# Show the DataFrame
df_from_list_no_schema.show()

In [4]:
# Same tuples as before, but this time the column names are supplied
# through the `schema` argument as a plain list of names
df_from_list_with_schema = sqlContext.createDataFrame(
    a_list, schema=['letters', 'numbers'])

# Render the DataFrame with its named columns
df_from_list_with_schema.show()

In [5]:
# Make a list of dictionaries: one dict per row, keyed by column name
a_dict = [{'letters': 'a', 'numbers': 1},
          {'letters': 'b', 'numbers': 2},
          {'letters': 'c', 'numbers': 3}]

# Create a Spark DataFrame from the list of dicts; the column names come
# from the dict keys and the column types are inferred from the values.
# NOTE(review): some Spark versions warn that inferring a schema from dicts
# is deprecated in favour of `pyspark.sql.Row` — confirm for the version in use
df_from_dict = \
sqlContext.createDataFrame(a_dict)

# Show the DataFrame
df_from_dict.show()

In [6]:
from pyspark.sql.types import *

# Explicit schema: `letters` is a nullable string column and
# `numbers` is a nullable integer column
schema = StructType([
    StructField("letters", StringType(), True),
    StructField("numbers", IntegerType(), True),
])

# Distribute the tuple list across the cluster as an RDD
rdd = sc.parallelize(a_list)

# Combine the raw RDD with the explicit schema to build the DataFrame
nice_df = sqlContext.createDataFrame(rdd, schema)

# Render the resulting DataFrame
nice_df.show()

In [8]:
# With `nice_df` built, the next few cells show handy ways to
# inspect a DataFrame's structure.

# `columns` is a property (no parentheses): all column names, as a list
nice_df.columns

In [9]:
# `dtypes` lists the data type of every column alongside its name
nice_df.dtypes

In [10]:
# `printSchema()` writes this DF's schema out in a readable form
nice_df.printSchema()

In [11]:
# `schema` is a property holding this DF's schema as a `StructType` object
nice_df.schema

In [12]:
# Row-retrieval methods:
#  - `first()` takes no argument and returns a single Row
#  - `head()` with no argument also returns a single Row, but as soon as
#    `n` is passed explicitly (even `head(1)`) it returns a list of Rows
#  - `take(n)` requires `n` and always returns a list of Rows
# `print(...)` with one argument behaves identically on Python 2 and 3
print(nice_df.first())
print(nice_df.head(2))
print(nice_df.take(2))

In [13]:
# `count()` gives back the total number of rows in the DF
nice_df.count()

In [14]:
# `describe()` builds a small DF of summary stats for numeric columns;
# column names can optionally be passed to limit which columns are described
nice_df.describe().show()

In [15]:
# `explain()` prints the plan Spark will use to evaluate this DF
nice_df.explain()

In [17]:
# `unionAll()`: stack the rows of two DataFrames (duplicates are kept)

# Union the DataFrame with itself, so every row appears twice
double_df = \
(nice_df
 .unionAll(nice_df))

# Union it with itself twice, so every row appears three times
triple_df = \
(nice_df
 .unionAll(nice_df)
 .unionAll(nice_df))

# `unionAll()` lines columns up by POSITION, not by name. The first frame
# here is (numbers, letters) and the second is (letters, numbers), so each
# output column mixes integers and strings and Spark widens both to string.
# The result's column names come from the first frame.
coerced_to_strings_df = \
(nice_df
 .select(['numbers', 'letters'])
 .unionAll(nice_df))

# Uncomment to confirm that both columns ended up as strings:
#coerced_to_strings_df.printSchema()

In [18]:
# Render the two-fold union; substitute `triple_df` to see the three-fold one
display(double_df)

In [19]:
# Sort the rows on the `numbers` column. `sort()` is an alias of
# `orderBy()`: ascending by default, descending with `ascending=False`
sorted_asc_triple_df = triple_df.sort('numbers')
sorted_desc_triple_df = triple_df.sort('numbers', ascending=False)

In [20]:
# Render the ascending sort; substitute `sorted_desc_triple_df` for descending
display(sorted_asc_triple_df)

In [21]:
# `select()` takes one or more column names (a list also works) and keeps
# just those columns, in the order given; `drop()` takes a column name
# (not a list) and keeps every other column

# Select only the first column of the DF
nice_df_1_col = \
(nice_df
 .select('letters'))

# Re-order columns in the DF using `select()`
nice_df_2_col_reordered = \
(nice_df
 .select(['numbers', 'letters']))

# Drop the second column (`numbers`) of the DF; the result holds the same
# single `letters` column as `nice_df_1_col`
nice_df_1_col_drop = \
(nice_df
 .drop('numbers'))

In [22]:
# `filter()`: keep only the DF rows that satisfy a condition

# Numeric filtering with comparison operators
# (>, <, >=, <=, ==, != all work); chaining filters combines them with AND
nice_df.filter(nice_df.numbers > 1).show()
nice_df.filter(nice_df.numbers > 1).filter(nice_df.numbers < 3).show()

# Not just numbers! Use the `filter()` + `isin()` combo to
# filter on string columns with a set of values
nice_df.filter(nice_df.letters.isin('a', 'b')).show()

In [23]:
# Before we get into aggregations, let's
# load in a CSV with interesting data and
# create a new DataFrame

# You do this with the `spark-csv` package;
# documentation: https://github.com/databricks/spark-csv

# This dataset contains data about flights departing
# New York City airports (JFK, LGA, EWR) in 2013; it
# has 336,776 rows and 16 columns

# Location of the CSV. The path is absolute so the read does not depend
# on the file system's current working directory.
NYCFLIGHTS_PATH = '/mnt/spark-atp/nycflights13/nycflights13.csv'

# Create a schema object then read the CSV with schema
nycflights_schema = StructType([
  StructField("year", IntegerType(), True),
  StructField("month", IntegerType(), True),
  StructField("day", IntegerType(), True),
  StructField("dep_time", StringType(), True),
  StructField("dep_delay", IntegerType(), True),
  StructField("arr_time", StringType(), True),
  StructField("arr_delay", IntegerType(), True),
  StructField("carrier", StringType(), True),
  StructField("tailnum", StringType(), True),
  StructField("flight", StringType(), True),
  StructField("origin", StringType(), True),
  StructField("dest", StringType(), True),
  StructField("air_time", IntegerType(), True),
  StructField("distance", IntegerType(), True),
  StructField("hour", IntegerType(), True),
  StructField("minute", IntegerType(), True),
  ])

nycflights = \
(sqlContext
 .read
 .format('com.databricks.spark.csv')
 .schema(nycflights_schema)
 # spark-csv documents its options as strings ('true'/'false')
 .options(header='true')
 .load(NYCFLIGHTS_PATH))


In [24]:
# Confirm the flights data picked up the column types from the schema
nycflights.printSchema()

In [25]:
# Render the freshly loaded flights data as a table
display(nycflights)

In [27]:
# Let's group and aggregate

# `groupBy()` groups by one or more DF columns
# and preps them for aggregation functions
(nycflights
 .groupBy('month')  # creates 'GroupedData'
 .count()  # creates a new column with aggregate `count` values
 .show())

# Use the `agg()` function with a {column: function} dict
# to perform several aggregations at once
(nycflights
 .groupBy('month')
 .agg({'dep_delay': 'avg', 'arr_delay': 'avg'})  # note the new column names
 .show())

# Caveat: with the dict form of `agg()` each column can appear only once.
# A Python dict literal keeps just the last value for a repeated key, so
# here only the 'max' aggregation survives
(nycflights
 .groupBy('month')
 .agg({'dep_delay': 'min', 'dep_delay': 'max'})
 .show())

# To aggregate the same column several ways, pass Column expressions
# instead. `min` and `max` here are the `pyspark.sql.functions` versions:
# the wildcard import at the top of the notebook shadows the builtins
(nycflights
 .groupBy('month')
 .agg(min('dep_delay').alias('min_dep_delay'),
      max('dep_delay').alias('max_dep_delay'))
 .show())


In [28]:
# Group on every unique (month, origin, dest) combination and count the
# flights in each group; order by month ascending, then by count descending
(nycflights
 .groupBy(['month', 'origin', 'dest'])
 .count()
 .orderBy(['month', 'count'], ascending=[True, False])
 .show())

In [29]:
# Pivot tables come from `groupBy()` + `pivot()` + an aggregation.
# Here: one row per month, one column per carrier, cells = flight counts
display(
    nycflights
    .groupBy('month')
    .pivot('carrier')
    .count())

In [30]:
# A second pivot: one row per carrier, one column per NYC origin airport,
# cells = average departure delay
display(
    nycflights
    .groupBy('carrier')
    .pivot('origin')
    .avg('dep_delay'))

In [32]:
# Monthly average departure and arrival delays: rename the generated
# `avg(...)` columns, then format each to one decimal place
# (`format_number()` produces string values)
(nycflights
 .groupBy('month')
 .agg({'dep_delay': 'avg', 'arr_delay': 'avg'})
 .withColumnRenamed("avg(dep_delay)", "mean_dep_delay")
 .withColumnRenamed("avg(arr_delay)", "mean_arr_delay")
 .withColumn("mean_dep_delay", format_number("mean_dep_delay", 1))
 .withColumn("mean_arr_delay", format_number("mean_arr_delay", 1))
 .show())

In [33]:
# Label each flight "far" when its `distance` exceeds 1000 and "near"
# otherwise, using `when()` / `otherwise()` inside `withColumn()`
nycflights.withColumn(
    "far_or_near",
    when(nycflights.distance > 1000, "far").otherwise("near"),
).show()

In [34]:
# Derive distance per minute of air time (shown with two decimals),
# then remove the two input columns from the displayed result
display(
    nycflights
    .withColumn('dist_per_minute', nycflights.distance / nycflights.air_time)
    .withColumn("dist_per_minute", format_number("dist_per_minute", 2))
    .drop('distance')
    .drop('air_time'))

In [35]:
# Create a proper timestamp for once in your life!
# We have all the components: `year`, `month`, `day`,
# `hour`, and `minute`

# Use `concat_ws()` (concatenate with separator) to
# combine column data into StringType columns such
# that dates (`-` separator, year-month-day, not zero-padded)
# and times (`:` separator, 24-hour time) are formed.
# `concat_ws()` silently skips null inputs, so `time` is only built when
# both `hour` and `minute` are present; otherwise it is left null rather
# than becoming an empty or partial string.
# NOTE: this cell rebinds `nycflights` and drops the component columns,
# so it cannot be re-run on its own output.
nycflights = \
 (nycflights
  .withColumn('date',
              concat_ws('-', nycflights.year, nycflights.month, nycflights.day))
  .withColumn('time',
              when(nycflights.hour.isNotNull() & nycflights.minute.isNotNull(),
                   concat_ws(':', nycflights.hour, nycflights.minute))))

# In a second step, join the `date` and `time` strings with a space.
# `concat()` (unlike `concat_ws()`) returns null when any input is null, so
# a missing time yields a null timestamp instead of a date-only string that
# could be parsed as midnight. Then drop the component columns.
nycflights = \
 (nycflights
  .withColumn('timestamp',
              concat(nycflights.date, lit(' '), nycflights.time))
  .drop('year')
  .drop('month')
  .drop('day')
  .drop('hour')
  .drop('minute')
  .drop('date')
  .drop('time'))

# In the final step, convert the `timestamp` from a StringType into a
# TimestampType. A plain cast keeps the local wall-clock time, so `hour()`,
# `minute()` etc. on the result give back the original components.
# `to_utc_timestamp(..., "EST")` would instead shift the values to UTC using
# a fixed UTC-5 offset, which is wrong during daylight saving time.
nycflights = \
 (nycflights
  .withColumn('timestamp', nycflights.timestamp.cast(TimestampType())))

In [36]:
# Look at the flights data after the timestamp conversion
display(nycflights)

In [37]:
# Cosmetic: move the last column (`timestamp`) to the front by rotating
# the `columns` list. Re-running this cell rotates the columns again.
nycflights = nycflights.select(
    [nycflights.columns[-1]] + nycflights.columns[:-1])

In [38]:
# Show the flights data with the reordered columns
display(nycflights)

In [39]:
# Check the resulting column types, including `timestamp`
nycflights.printSchema()

In [40]:
# The separate time components can be derived again from `timestamp`
# with `year()`, `month()`, `dayofmonth()`, `hour()` and `minute()`,
# each added as a new column via `withColumn()`
display(
    nycflights
    .withColumn('year', year(nycflights.timestamp))
    .withColumn('month', month(nycflights.timestamp))
    .withColumn('day', dayofmonth(nycflights.timestamp))
    .withColumn('hour', hour(nycflights.timestamp))
    .withColumn('minute', minute(nycflights.timestamp)))