##Spark DataFrames

In [None]:
spark

In [None]:
from pyspark.sql.types import *  # Necessary for creating schemas
from pyspark.sql.functions import * # Importing PySpark functions

###The 'groupBy' Function and Aggregations

The `groupBy()` function groups the DataFrame by the specified columns so that we can run aggregations on each group. The available aggregate functions are:

- `count()`: counts the number of records for each group
- `sum()`: computes the sum for each numeric column for each group
- `min()`: computes the minimum value for each numeric column for each group
- `max()`: computes the maximum value for each numeric column for each group
- `avg()` or `mean()`: computes the average value for each numeric column for each group
- `pivot()`: pivots a column of the current DataFrame and performs the specified aggregation

Before we get into aggregations, let's load in a **CSV** with interesting data and create a new DataFrame.

You do this with the `spark-csv` package. Documentation on that is available at:
- https://github.com/databricks/spark-csv

The dataset that will be loaded in to demonstrate contains data about flights departing New York City airports (`JFK`, `LGA`, `EWR`) in 2013. It has 336,776 rows and 16 columns.

In [29]:
# Create a schema object...
nycflights_schema = StructType([
  StructField('year', IntegerType(), True),
  StructField('month', IntegerType(), True),
  StructField('day', IntegerType(), True),
  StructField('dep_time', StringType(), True),
  StructField('dep_delay', IntegerType(), True),
  StructField('arr_time', StringType(), True),
  StructField('arr_delay', IntegerType(), True),
  StructField('carrier', StringType(), True),
  StructField('tailnum', StringType(), True),
  StructField('flight', StringType(), True),  
  StructField('origin', StringType(), True),
  StructField('dest', StringType(), True),
  StructField('air_time', IntegerType(), True),
  StructField('distance', IntegerType(), True),
  StructField('hour', IntegerType(), True),
  StructField('minute', IntegerType(), True)
  ])

# ...and then read the CSV with the schema
nycflights = \
(sqlContext
 .read
 .format('com.databricks.spark.csv')
 .schema(nycflights_schema)
 .options(header = True)
 .load('s3://bigdatatoolscloud/nycflights13.csv'))


In [30]:
# Have a look at the schema for the imported dataset
nycflights.printSchema()

In [None]:
nycflights.show(5)

In [32]:
# Let's group and aggregate

# `groupBy()` will group one or more DF columns
# and prep them for aggregation functions
(nycflights
 .groupby('month') # creates 'GroupedData'
 .count() # creates a new column with aggregate `count` values
 .show())

# Use the `agg()` function to perform multiple
# aggregations
(nycflights
 .groupby('month')
 .agg({'dep_delay': 'avg', 'arr_delay': 'avg'}) # note the new column names
 .show())

# Caveat: with the dictionary form you can't perform
# multiple aggregations on the same column (a Python dict
# keeps only the last value for a repeated key)
(nycflights
 .groupby('month')
 .agg({'dep_delay': 'min', 'dep_delay': 'max'})
 .show())
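
The caveat in the last example comes from Python rather than from Spark: a dictionary literal keeps only the last value for a repeated key, so `agg()` only ever receives one entry. A minimal plain-Python sketch (no Spark required) shows what is actually passed:

```python
# A dict literal with a repeated key keeps only the last value,
# so this is all that `agg()` receives in the example above
aggs = {'dep_delay': 'min', 'dep_delay': 'max'}

print(aggs)       # {'dep_delay': 'max'}
print(len(aggs))  # 1
```

To get both aggregates, one option is to pass column expressions instead of a dictionary, for example `.agg(min('dep_delay'), max('dep_delay'))`, using the functions imported from `pyspark.sql.functions`.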

###Column Operations

`Column` instances can be created by:

(1) Selecting a column from a DataFrame
- `df.colName`
- `df["colName"]`
- `df.select(df.colName)`
- `df.withColumn("newColName", df.colName)`

(2) Creating one from an expression
- `df.colName + 1`
- `1 / df.colName`

Once you have a `Column` instance, you can apply a wide range of functions. Some of the functions covered here are:
- `format_number()`: apply formatting to a number, rounded to `d` decimal places, and return the result as a string
- `when()` & `otherwise()`: `when()` evaluates a list of conditions and returns one of multiple possible result expressions; if `otherwise()` is not invoked, `None` is returned for unmatched conditions
- `concat_ws()`: concatenates multiple input string columns together into a single string column, using the given separator
- `to_utc_timestamp()`: assumes the given timestamp is in the given timezone and converts it to UTC
- `year()`: extracts the year of a given date as integer
- `month()`: extracts the month of a given date as integer
- `dayofmonth()`: extracts the day of the month of a given date as integer
- `hour()`: extracts the hour of a given date as integer
- `minute()`: extracts the minute of a given date as integer

In [40]:
# Create a proper timestamp for once in your life...
# We have all the components: `year`, `month`, `day`,
# `hour`, and `minute`

# Use `concat_ws()` (concatenate with separator) to
# combine column data into StringType columns such
# that dates (`-` separator, YYYY-MM-DD) and times
# (`:` separator, 24-hour time) are formed
nycflights = \
(nycflights
 .withColumn('date',
             concat_ws('-',
                       nycflights.year,
                       nycflights.month,
                       nycflights.day))
 .withColumn('time',
             concat_ws(':',
                       nycflights.hour,
                       nycflights.minute)))

# In a second step, concatenate with `concat_ws()`
# the `date` and `time` strings (separator is a space);
# then drop several columns
nycflights = \
(nycflights
 .withColumn('timestamp',
             concat_ws(' ',
                       nycflights.date,
                       nycflights.time))
 .drop('year')     # `drop()` doesn't accept
 .drop('month')    # a list of column names,
 .drop('day')      # therefore, for every column
 .drop('hour')     # we would like to remove
 .drop('minute')   # from the DataFrame, we 
 .drop('date')     # must create a new `drop()`
 .drop('time'))    # statement

# In the final step, convert the `timestamp` from
# a StringType into a TimestampType
nycflights = \
(nycflights
 .withColumn('timestamp',
             to_utc_timestamp(nycflights.timestamp, 'GMT')))
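
To make the three steps above concrete, here is a rough plain-Python sketch of the same string assembly for a single row. The values are hypothetical (not taken from the dataset), and `datetime.strptime` stands in for the Spark conversion purely for illustration; Spark's own parsing rules may differ.

```python
from datetime import datetime

# Hypothetical values for one flight row (not taken from the dataset)
year, month, day, hour, minute = 2013, 1, 1, 5, 17

# Equivalent of `concat_ws('-', ...)` and `concat_ws(':', ...)`
date = '-'.join(str(v) for v in (year, month, day))
time = ':'.join(str(v) for v in (hour, minute))

# Equivalent of `concat_ws(' ', date, time)`
timestamp_string = ' '.join([date, time])

# Parse the assembled string into a timestamp object
timestamp = datetime.strptime(timestamp_string, '%Y-%m-%d %H:%M')

print(timestamp_string)  # 2013-1-1 5:17
print(timestamp)         # 2013-01-01 05:17:00
```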

In [None]:
nycflights.printSchema()

In [None]:
nycflights.show(5)

In [42]:
# It probably doesn't matter in the end, but
# I'd prefer that the `timestamp` column be
# the first column; let's make use of the
# `columns` attribute and get slicing!
nycflights = \
 (nycflights
  .select(nycflights.columns[-1:] + nycflights.columns[0:-1])) # recall that `columns` returns a list of column names
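
Since `columns` is an ordinary Python list of names, the reordering is plain list slicing. A small sketch with a hypothetical, shortened column list:

```python
# Hypothetical column list, shortened for illustration
columns = ['carrier', 'flight', 'origin', 'dest', 'timestamp']

# `columns[-1:]` is a one-element list holding the last name;
# `columns[0:-1]` is every name before it
reordered = columns[-1:] + columns[0:-1]

print(reordered)  # ['timestamp', 'carrier', 'flight', 'origin', 'dest']
```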

In [44]:
# Inspect the DataFrame's schema, note that `timestamp` is indeed classed as a timestamp
nycflights.printSchema()

In [None]:
nycflights.show(5)

In [None]:
# WEATHER

In [None]:
# Create a schema object...
weather_schema = StructType([  
  StructField('year', IntegerType(), True),
  StructField('month', IntegerType(), True),
  StructField('day', IntegerType(), True),
  StructField('hour', IntegerType(), True),
  StructField('temp', FloatType(), True),
  StructField('dewp', FloatType(), True),
  StructField('humid', FloatType(), True),
  StructField('wind_dir', IntegerType(), True),
  StructField('wind_speed', FloatType(), True),
  StructField('wind_gust', FloatType(), True),
  StructField('precip', FloatType(), True),
  StructField('pressure', FloatType(), True),
  StructField('visib', FloatType(), True)
  ])

#...and then read the CSV with the schema
weather = \
(sqlContext
 .read
 .format('com.databricks.spark.csv')
 .schema(weather_schema)
 .options(header = True)
 .load('s3://bigdatatoolscloud/weather.csv'))

In [None]:
weather.printSchema()

In [None]:
weather.show(5)

In [None]:
# We need those `month`, `day`, and `hour` values back
nycflights = \
(nycflights
 .withColumn('month', month(nycflights.timestamp))
 .withColumn('day', dayofmonth(nycflights.timestamp))
 .withColumn('hour', hour(nycflights.timestamp)))

# Join the `nycflights` DF with the `weather` DF 
nycflights_all_columns = \
(nycflights
 .join(weather,
       [nycflights.month == weather.month, # three join conditions: month,
        nycflights.day == weather.day,     #                        day,
        nycflights.hour == weather.hour],  #                        hour
       'left_outer')) # left outer join: keep all rows from the left DF (flights), with the matching rows in the right DF (weather)
                      # NULLs created if there is no match to the right DF

In [None]:
nycflights_all_columns.printSchema()

In [None]:
nycflights_all_columns.show(5)

In [55]:
# One way to reduce the number of extraneous
# columns is to use a `select()` statement
nycflights_wind_visib = \
(nycflights_all_columns
 .select(['timestamp', 'carrier', 'flight',
          'origin', 'dest', 'wind_dir',
          'wind_speed', 'wind_gust', 'visib']))

In [None]:
nycflights_wind_visib.printSchema()

In [None]:
nycflights_wind_visib.show(5)

Let's load in even more data so we can determine if any takeoffs occurred in very windy weather.

The **CSV** `beaufort_land.csv` contains Beaufort scale values (the `force` column), wind speed ranges in *mph*, and the name for each wind force.

In [58]:
# Create a schema object... 
beaufort_land_schema = StructType([  
  StructField('force', IntegerType(), True),
  StructField('speed_mi_h_lb', IntegerType(), True),
  StructField('speed_mi_h_ub', IntegerType(), True),
  StructField('name', StringType(), True)
  ])

# ...and then read the CSV with the schema
beaufort_land = \
(sqlContext
 .read
 .format('com.databricks.spark.csv')
 .schema(beaufort_land_schema)
 .options(header = True)
 .load('s3://bigdatatoolscloud/beaufort_land.csv'))

In [None]:
beaufort_land.printSchema()

In [None]:
beaufort_land.show(5)

In [60]:
# Join the current working DF with the `beaufort_land` DF
# and use join expressions based on the wind speed ranges
nycflights_wind_visib_beaufort = \
(nycflights_wind_visib
 .join(beaufort_land,
      [nycflights_wind_visib.wind_speed >= beaufort_land.speed_mi_h_lb,
       nycflights_wind_visib.wind_speed < beaufort_land.speed_mi_h_ub],
       'left_outer')
 .withColumn('month', month(nycflights_wind_visib.timestamp)) # Create a month column from `timestamp` values
 .drop('speed_mi_h_lb')
 .drop('speed_mi_h_ub')
)
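
The two join conditions amount to a range lookup: a wind speed matches the row whose lower bound is less than or equal to it and whose upper bound is strictly greater. The following plain-Python sketch mimics that logic for a single value. The rows and the function name `beaufort_force` are illustrative assumptions, not the contents of `beaufort_land.csv`.

```python
# Illustrative rows only (force, lower bound, upper bound, name);
# the actual contents of `beaufort_land.csv` may differ
beaufort_rows = [
    (0, 0, 1, 'calm'),
    (1, 1, 4, 'light air'),
    (2, 4, 8, 'light breeze'),
]

def beaufort_force(wind_speed):
    """Return the force whose range contains `wind_speed`, else None."""
    if wind_speed is None:
        return None  # a missing wind speed matches no range
    for force, lower, upper, _name in beaufort_rows:
        # Same conditions as the join: lower <= speed < upper
        if lower <= wind_speed < upper:
            return force
    return None  # no matching range, like a NULL from the left outer join

print(beaufort_force(3.5))   # 1
print(beaufort_force(4.0))   # 2
print(beaufort_force(50.0))  # None
```

Because the upper bound is exclusive, a value sitting exactly on a boundary (such as `4.0` here) falls into the higher force only.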

In [None]:
nycflights_wind_visib_beaufort.printSchema()

In [None]:
nycflights_wind_visib_beaufort.show(5)

In [62]:
# We can inspect the number of potentially dangerous
# takeoffs (i.e., where the Beaufort force is high)
# month-by-month through the use of the `crosstab()` function
crosstab_month_force = \
(nycflights_wind_visib_beaufort
 .crosstab('month', 'force'))

# After creating the crosstab DataFrame, use a few
# functions to clean up the resultant DataFrame
crosstab_month_force = \
(crosstab_month_force
 .withColumn('month_force',
             crosstab_month_force.month_force.cast('int')) # the column is initially a string but recasting as
                                                           # an `int` will aid ordering in the next expression
 .orderBy('month_force')
 .drop('null'))
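
Conceptually, a crosstab counts how often each pair of values occurs and lays the counts out as a table. A plain-Python sketch of that idea, using hypothetical `(month, force)` pairs rather than the real data:

```python
from collections import Counter

# Hypothetical (month, force) pairs, one per flight
pairs = [(1, 2), (1, 2), (1, 3), (2, 2)]

# Count how often each combination occurs
counts = Counter(pairs)

months = sorted({m for m, _ in pairs})
forces = sorted({f for _, f in pairs})

# One row per month, one count column per force value
table = {m: [counts[(m, f)] for f in forces] for m in months}

print(forces)  # [2, 3]
print(table)   # {1: [2, 1], 2: [1, 0]}
```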

In [None]:
crosstab_month_force.printSchema()

In [None]:
crosstab_month_force.show(5)

###User Defined Functions (UDFs)

**UDF**s let you compute a value for every input row in the DataFrame. They allow you to write your own functions and to bring in functionality from other **Python** libraries.

In [65]:
# Define a function to convert velocity from
# miles per hour (mph) to meters per second (mps)
def mph_to_mps(mph):
  # Missing values arrive as `None`; pass them through as NULL
  if mph is None:
    return None
  return mph * 0.44704

# Register this function as a UDF using `udf()`
mph_to_mps = udf(mph_to_mps, FloatType()) # An output type was specified
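
Because a UDF wraps an ordinary Python function, the conversion logic can be checked on its own before Spark is involved. A minimal sketch, using the hypothetical name `mph_to_mps_plain` to avoid clashing with the registered UDF, and assuming that missing values should pass through as `None`:

```python
def mph_to_mps_plain(mph):
    """Convert miles per hour to meters per second; pass None through."""
    if mph is None:
        return None
    return mph * 0.44704

print(mph_to_mps_plain(0))     # 0.0
print(mph_to_mps_plain(None))  # None
```

Once registered with `udf()`, such a function could be applied to a column, for example `df.withColumn('wind_speed_mps', mph_to_mps(df.wind_speed))`, where `wind_speed_mps` is a hypothetical column name.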

###Writing DataFrames to Files
We can easily write DataFrame data to a variety of different file formats.

In [68]:
# Saving to CSV is quite similar to reading from a CSV file
(crosstab_month_force
 .write
 .mode('overwrite')
 .format('com.databricks.spark.csv')
 .save('s3://bigdatatoolscloud/crosstab_month_force.csv'))

In [69]:
# Saving to Parquet is generally recommended for later retrieval
(crosstab_month_force
 .write
 .mode('overwrite')
 .parquet('s3://bigdatatoolscloud/crosstab_month_force.parquet'))

###Useful Links

Although I tried to cover a lot of ground, there are dozens more DataFrame functions that I haven't touched upon.

The main project page for Spark:

- http://spark.apache.org

The main reference for PySpark is:

- http://spark.apache.org/docs/latest/api/python/index.html

These examples are available at:

- https://github.com/rich-iannone/so-many-pyspark-examples

Information on the Parquet file format can be found at its project page:

- http://parquet.apache.org

The GitHub project page for the `spark-csv` package, which contains usage documentation:

- https://github.com/databricks/spark-csv