# Spark DataFrames

#### Create a Spark Session

In [2]:
import findspark
findspark.init('c:\spark')

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

#### Load orders data as a DataFrame

In [3]:
import pyspark.sql.types as typ

ordersSchema = [('order_id', typ.IntegerType())
                , ('order_date', typ.StringType())
                , ('order_customer_id', typ.IntegerType())
                , ('order_status', typ.StringType())
    
]

ordersSchema = typ.StructType([typ.StructField(e[0], e[1], True) for e in ordersSchema])

ordersDF = spark.read.csv('../datasets/orders', schema=ordersSchema)

ordersDF.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



#### Filter data in DataFrame - where

In [5]:
ordersDF.where("order_status = 'CLOSED'").show()

+--------+--------------------+-----------------+------------+
|order_id|          order_date|order_customer_id|order_status|
+--------+--------------------+-----------------+------------+
|       1|2013-07-25 00:00:...|            11599|      CLOSED|
|       4|2013-07-25 00:00:...|             8827|      CLOSED|
|      12|2013-07-25 00:00:...|             1837|      CLOSED|
|      18|2013-07-25 00:00:...|             1205|      CLOSED|
|      24|2013-07-25 00:00:...|            11441|      CLOSED|
|      25|2013-07-25 00:00:...|             9503|      CLOSED|
|      37|2013-07-25 00:00:...|             5863|      CLOSED|
|      51|2013-07-25 00:00:...|            12271|      CLOSED|
|      57|2013-07-25 00:00:...|             7073|      CLOSED|
|      61|2013-07-25 00:00:...|             4791|      CLOSED|
|      62|2013-07-25 00:00:...|             9111|      CLOSED|
|      87|2013-07-25 00:00:...|             3065|      CLOSED|
|      90|2013-07-25 00:00:...|             9131|      

#### Filter Data using Filter

In [6]:
ordersDF.filter("order_status = 'CLOSED'").show()

+--------+--------------------+-----------------+------------+
|order_id|          order_date|order_customer_id|order_status|
+--------+--------------------+-----------------+------------+
|       1|2013-07-25 00:00:...|            11599|      CLOSED|
|       4|2013-07-25 00:00:...|             8827|      CLOSED|
|      12|2013-07-25 00:00:...|             1837|      CLOSED|
|      18|2013-07-25 00:00:...|             1205|      CLOSED|
|      24|2013-07-25 00:00:...|            11441|      CLOSED|
|      25|2013-07-25 00:00:...|             9503|      CLOSED|
|      37|2013-07-25 00:00:...|             5863|      CLOSED|
|      51|2013-07-25 00:00:...|            12271|      CLOSED|
|      57|2013-07-25 00:00:...|             7073|      CLOSED|
|      61|2013-07-25 00:00:...|             4791|      CLOSED|
|      62|2013-07-25 00:00:...|             9111|      CLOSED|
|      87|2013-07-25 00:00:...|             3065|      CLOSED|
|      90|2013-07-25 00:00:...|             9131|      

#### Aggregating data in DataFrames

In [7]:
ordersDF.groupBy('order_status').count().show()

+---------------+-----+
|   order_status|count|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+



#### Distinct order status

In [8]:
ordersDF.select('order_status').distinct().show()

+---------------+
|   order_status|
+---------------+
|PENDING_PAYMENT|
|       COMPLETE|
|        ON_HOLD|
| PAYMENT_REVIEW|
|     PROCESSING|
|         CLOSED|
|SUSPECTED_FRAUD|
|        PENDING|
|       CANCELED|
+---------------+



#### Count by order status

In [9]:
ordersDF.groupBy('order_status').count().show()

+---------------+-----+
|   order_status|count|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+



### Dropping columns

In [10]:
ordersDF.drop('order_date').show()

+--------+-----------------+---------------+
|order_id|order_customer_id|   order_status|
+--------+-----------------+---------------+
|       1|            11599|         CLOSED|
|       2|              256|PENDING_PAYMENT|
|       3|            12111|       COMPLETE|
|       4|             8827|         CLOSED|
|       5|            11318|       COMPLETE|
|       6|             7130|       COMPLETE|
|       7|             4530|       COMPLETE|
|       8|             2911|     PROCESSING|
|       9|             5657|PENDING_PAYMENT|
|      10|             5648|PENDING_PAYMENT|
|      11|              918| PAYMENT_REVIEW|
|      12|             1837|         CLOSED|
|      13|             9149|PENDING_PAYMENT|
|      14|             9842|     PROCESSING|
|      15|             2568|       COMPLETE|
|      16|             7276|PENDING_PAYMENT|
|      17|             2667|       COMPLETE|
|      18|             1205|         CLOSED|
|      19|             9488|PENDING_PAYMENT|
|      20|

### Renaming columns

##### Rename columns using select

In [11]:
import pyspark.sql.functions as f

ordersDF.select(f.col('order_customer_id').alias('cid')).show(10)

+-----+
|  cid|
+-----+
|11599|
|  256|
|12111|
| 8827|
|11318|
| 7130|
| 4530|
| 2911|
| 5657|
| 5648|
+-----+
only showing top 10 rows



##### Rename columns using withColumnRenamed

In [13]:
ordersDF.withColumnRenamed('order_customer_id', 'cid').show()

+--------+--------------------+-----+---------------+
|order_id|          order_date|  cid|   order_status|
+--------+--------------------+-----+---------------+
|       1|2013-07-25 00:00:...|11599|         CLOSED|
|       2|2013-07-25 00:00:...|  256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|12111|       COMPLETE|
|       4|2013-07-25 00:00:...| 8827|         CLOSED|
|       5|2013-07-25 00:00:...|11318|       COMPLETE|
|       6|2013-07-25 00:00:...| 7130|       COMPLETE|
|       7|2013-07-25 00:00:...| 4530|       COMPLETE|
|       8|2013-07-25 00:00:...| 2911|     PROCESSING|
|       9|2013-07-25 00:00:...| 5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...| 5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|  918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...| 1837|         CLOSED|
|      13|2013-07-25 00:00:...| 9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...| 9842|     PROCESSING|
|      15|2013-07-25 00:00:...| 2568|       COMPLETE|
|      16|2013-07-25 00:00:.

### Load order items data into DataFrammes

In [14]:
orderItemsSchema = [('order_item_id', typ.IntegerType())
                    , ('order_item_order_id', typ.IntegerType())
                    , ('order_item_product_id', typ.IntegerType())
                    , ('order_item_quantity', typ.IntegerType())
                    , ('order_item_subtotal', typ.FloatType())
                    , ('order_item_product_price', typ.FloatType())
    
]

orderItemsSchema = typ.StructType([typ.StructField(e[0], e[1], True) for e in orderItemsSchema])

orderItemsDF = spark.read.csv('../datasets/order_items', schema=orderItemsSchema)

orderItemsDF.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: float (nullable = true)
 |-- order_item_product_price: float (nullable = true)



#### aggregations

In [15]:
orderItemsDF.agg(
    {
        'order_item_subtotal': 'avg'
    }
).show()

+------------------------+
|avg(order_item_subtotal)|
+------------------------+
|      199.32066922046081|
+------------------------+



In [16]:
orderItemsDF.agg(
    {
        'order_item_subtotal': 'avg',
        'order_item_subtotal': 'sum'
    }
).show()

+------------------------+
|sum(order_item_subtotal)|
+------------------------+
|     3.432262059842491E7|
+------------------------+



#### Multiple aggregations

In [17]:
aggregations = [
    ('order_item_subtotal', f.min, 'subtotal_min')
    , ('order_item_subtotal', f.max, 'subtotal_max')
    , ('order_item_subtotal', f.sum, 'subtotal_sum')
    , ('order_item_subtotal', f.avg, 'subtotal_avg')
    , ('order_item_subtotal', f.mean, 'subtotal_mean')
    , ('order_item_subtotal', f.stddev, 'subtotal_stddev')
    , ('order_item_subtotal', f.count, 'subtotal_count')
]

(
    orderItemsDF
    .agg(*[e[1](e[0]).alias(e[2]) for e in aggregations])
    .show()
)

+------------+------------+-------------------+------------------+------------------+------------------+--------------+
|subtotal_min|subtotal_max|       subtotal_sum|      subtotal_avg|     subtotal_mean|   subtotal_stddev|subtotal_count|
+------------+------------+-------------------+------------------+------------------+------------------+--------------+
|        9.99|     1999.99|3.432262059842491E7|199.32066922046081|199.32066922046081|112.74303987146804|        172198|
+------------+------------+-------------------+------------------+------------------+------------------+--------------+



#### Multiple aggregations using SQL

In [18]:
orderItemsDF.createOrReplaceTempView("order_items")

(
    spark
    .sql('''
        SELECT 
            MIN(order_item_subtotal) AS subtotal_min
            , MAX(order_item_subtotal) AS subtotal_max
            , SUM(order_item_subtotal) AS subtotal_sum
            , AVG(order_item_subtotal) AS subtotal_avg
            , MEAN(order_item_subtotal) AS subtotal_mean
            , STDDEV(order_item_subtotal) AS subtotal_stddev
            , COUNT(order_item_subtotal) AS subtotal_count
        FROM order_items
    ''')
).show()

+------------+------------+-------------------+------------------+------------------+------------------+--------------+
|subtotal_min|subtotal_max|       subtotal_sum|      subtotal_avg|     subtotal_mean|   subtotal_stddev|subtotal_count|
+------------+------------+-------------------+------------------+------------------+------------------+--------------+
|        9.99|     1999.99|3.432262059842491E7|199.32066922046081|199.32066922046081|112.74303987146804|        172198|
+------------+------------+-------------------+------------------+------------------+------------------+--------------+



#### Revenue for order_id 2

In [22]:
from pyspark.sql.functions import round

orderItemsDF.filter('order_item_order_id = 2').show()

orderItemsDF.filter('order_item_order_id = 2').agg(round(f.sum('order_item_subtotal'), 2).alias('Revenue')).show()

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+

+-------+
|Revenue|
+-------+
| 579.98|
+-------+



#### Revenue per orderID

In [23]:
orderItemsDF.groupBy('order_item_order_id').agg(round(f.sum('order_item_subtotal'), 2).alias('Revenue')).show()

+-------------------+-------+
|order_item_order_id|Revenue|
+-------------------+-------+
|                148| 479.99|
|                463| 829.92|
|                471| 169.98|
|                496| 441.95|
|               1088| 249.97|
|               1580| 299.95|
|               1591| 439.86|
|               1645|1509.79|
|               2366| 299.97|
|               2659| 724.91|
|               2866| 569.96|
|               3175| 209.97|
|               3749| 143.97|
|               3794| 299.95|
|               3918| 829.93|
|               3997| 579.95|
|               4101| 129.99|
|               4519|  79.98|
|               4818| 399.98|
|               4900| 179.97|
+-------------------+-------+
only showing top 20 rows



#### Daily product revenue

In [26]:
ordersJoin = ordersDF.join(orderItemsDF, ordersDF.order_id == orderItemsDF.order_item_order_id)

ordersJoin.printSchema()

ordersJoin.groupBy('order_date', 'order_item_product_id').agg(round(f.sum('order_item_subtotal'), 2).alias('Revenue')).show()


root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: float (nullable = true)
 |-- order_item_product_price: float (nullable = true)

+--------------------+---------------------+--------+
|          order_date|order_item_product_id| Revenue|
+--------------------+---------------------+--------+
|2013-07-27 00:00:...|                  703|   99.95|
|2013-07-29 00:00:...|                  793|   44.97|
|2013-08-04 00:00:...|                  825|   95.97|
|2013-08-06 00:00:...|                   93|   24.99|
|2013-08-12 00:00:...|                  627| 5358.66|
|2013-08-15 00:00:...|                  926|   15.99|
|2013-08-17 00:00:...|

#### Sort Data - Descending by Subtotal

In [28]:
(
    ordersJoin
        .groupBy('order_date', 'order_item_product_id')
        .agg(round(f.sum('order_item_subtotal'), 2).alias('SubTotal'))
        .sort('SubTotal', ascending=0).show()
)

+--------------------+---------------------+--------+
|          order_date|order_item_product_id|SubTotal|
+--------------------+---------------------+--------+
|2013-11-03 00:00:...|                 1004|43197.84|
|2014-06-19 00:00:...|                 1004| 33998.3|
|2013-11-07 00:00:...|                 1004|33198.34|
|2013-10-13 00:00:...|                 1004|32398.38|
|2013-11-29 00:00:...|                 1004|31598.42|
|2014-07-20 00:00:...|                 1004|31598.42|
|2014-05-06 00:00:...|                 1004|31198.44|
|2014-02-01 00:00:...|                 1004|31198.44|
|2014-05-16 00:00:...|                 1004|31198.44|
|2014-02-19 00:00:...|                 1004|30798.46|
|2014-01-11 00:00:...|                 1004|30798.46|
|2014-07-15 00:00:...|                 1004|30398.48|
|2014-01-30 00:00:...|                 1004|30398.48|
|2013-09-06 00:00:...|                 1004| 29998.5|
|2014-05-12 00:00:...|                 1004| 29998.5|
|2014-04-03 00:00:...|      

#### Sort - Daily Product Revenue ascending by order date and descending by subtotal

In [30]:
(
    ordersJoin
        .groupBy('order_date', 'order_item_product_id')
        .agg(round(f.sum('order_item_subtotal'), 2).alias('SubTotal'))
        .sort(['order_date', 'SubTotal'], ascending=[1,0])
        .show(100)
)

+--------------------+---------------------+--------+
|          order_date|order_item_product_id|SubTotal|
+--------------------+---------------------+--------+
|2013-07-25 00:00:...|                 1004|10799.46|
|2013-07-25 00:00:...|                  957| 9599.36|
|2013-07-25 00:00:...|                  191| 8499.15|
|2013-07-25 00:00:...|                  365| 7558.74|
|2013-07-25 00:00:...|                 1073| 6999.65|
|2013-07-25 00:00:...|                 1014| 6397.44|
|2013-07-25 00:00:...|                  403| 5589.57|
|2013-07-25 00:00:...|                  502|  5100.0|
|2013-07-25 00:00:...|                  627| 2879.28|
|2013-07-25 00:00:...|                  226|  599.99|
|2013-07-25 00:00:...|                   24|  319.96|
|2013-07-25 00:00:...|                  982|  299.98|
|2013-07-25 00:00:...|                  981|   297.0|
|2013-07-25 00:00:...|                  276|  255.92|
|2013-07-25 00:00:...|                  705|  239.98|
|2013-07-25 00:00:...|      

# Schema changes

#### Load SampleData

In [31]:
sampleDF = spark.read.csv('../datasets/sample_data.csv', header=True, inferSchema=True)
sampleDF.printSchema()
sampleDF.show()

root
 |-- OrderDate: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Rep: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Units: integer (nullable = true)
 |-- UnitCost: double (nullable = true)
 |-- Total: double (nullable = true)

+----------+-------+--------+-------+-----+--------+-------+
| OrderDate| Region|     Rep|   Item|Units|UnitCost|  Total|
+----------+-------+--------+-------+-----+--------+-------+
|    1/6/16|   East|   Jones| Pencil|   95|    1.99| 189.05|
|2017-03-02|Central|  Kivell| Binder|   50|   19.99|  999.5|
|    2/9/16|Central| Jardine| Pencil|   36|    4.99| 179.64|
|   2/26/16|Central|    Gill|    Pen|   27|   19.99| 539.73|
|   3/15/16|   West| Sorvino| Pencil|   56|    2.99| 167.44|
|    4/1/16|   East|   Jones| Binder|   60|    4.99|  299.4|
|   4/18/16|Central| Andrews| Pencil|   75|    1.99| 149.25|
|    5/5/16|Central| Jardine| Pencil|   90|    4.99|  449.1|
|   5/22/16|   West|Thompson| Pencil|   32|    1.99|  63.6

#### Transform orderDate from String to Date

In [32]:
import pyspark.sql.functions as f
sampleDF = sampleDF.withColumn('OrderDate', f.to_date('OrderDate', 'MM/dd/yy'))

sampleDF.printSchema()

root
 |-- OrderDate: date (nullable = true)
 |-- Region: string (nullable = true)
 |-- Rep: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Units: integer (nullable = true)
 |-- UnitCost: double (nullable = true)
 |-- Total: double (nullable = true)



In [33]:
sampleDF.show()

+----------+-------+--------+-------+-----+--------+-------+
| OrderDate| Region|     Rep|   Item|Units|UnitCost|  Total|
+----------+-------+--------+-------+-----+--------+-------+
|2016-01-06|   East|   Jones| Pencil|   95|    1.99| 189.05|
|      null|Central|  Kivell| Binder|   50|   19.99|  999.5|
|2016-02-09|Central| Jardine| Pencil|   36|    4.99| 179.64|
|2016-02-26|Central|    Gill|    Pen|   27|   19.99| 539.73|
|2016-03-15|   West| Sorvino| Pencil|   56|    2.99| 167.44|
|2016-04-01|   East|   Jones| Binder|   60|    4.99|  299.4|
|2016-04-18|Central| Andrews| Pencil|   75|    1.99| 149.25|
|2016-05-05|Central| Jardine| Pencil|   90|    4.99|  449.1|
|2016-05-22|   West|Thompson| Pencil|   32|    1.99|  63.68|
|2016-06-08|   East|   Jones| Binder|   60|    8.99|  539.4|
|2016-06-25|Central|  Morgan| Pencil|   90|    4.99|  449.1|
|2016-07-12|   East|  Howard| Binder|   29|    1.99|  57.71|
|2016-07-29|   East|  Parent| Binder|   81|   19.99|1619.19|
|2016-08-15|   East|   J

#### Drop null

In [34]:
sampleDF.dropna(subset=['OrderDate']).show()

+----------+-------+--------+-------+-----+--------+-------+
| OrderDate| Region|     Rep|   Item|Units|UnitCost|  Total|
+----------+-------+--------+-------+-----+--------+-------+
|2016-01-06|   East|   Jones| Pencil|   95|    1.99| 189.05|
|2016-02-09|Central| Jardine| Pencil|   36|    4.99| 179.64|
|2016-02-26|Central|    Gill|    Pen|   27|   19.99| 539.73|
|2016-03-15|   West| Sorvino| Pencil|   56|    2.99| 167.44|
|2016-04-01|   East|   Jones| Binder|   60|    4.99|  299.4|
|2016-04-18|Central| Andrews| Pencil|   75|    1.99| 149.25|
|2016-05-05|Central| Jardine| Pencil|   90|    4.99|  449.1|
|2016-05-22|   West|Thompson| Pencil|   32|    1.99|  63.68|
|2016-06-08|   East|   Jones| Binder|   60|    8.99|  539.4|
|2016-06-25|Central|  Morgan| Pencil|   90|    4.99|  449.1|
|2016-07-12|   East|  Howard| Binder|   29|    1.99|  57.71|
|2016-07-29|   East|  Parent| Binder|   81|   19.99|1619.19|
|2016-08-15|   East|   Jones| Pencil|   35|    4.99| 174.65|
|2016-09-01|Central|   S

#### Changing data within the Region field - East as E

In [36]:
sampleDF.select(f.col('Region')).replace("East", "E").show()

sampleDF.where("Region = 'East'").replace("East", "E").show()

+-------+
| Region|
+-------+
|      E|
|Central|
|Central|
|Central|
|   West|
|      E|
|Central|
|Central|
|   West|
|      E|
|Central|
|      E|
|      E|
|      E|
|Central|
|      E|
|Central|
|      E|
|      E|
|Central|
+-------+
only showing top 20 rows

+----------+------+------+-------+-----+--------+-------+
| OrderDate|Region|   Rep|   Item|Units|UnitCost|  Total|
+----------+------+------+-------+-----+--------+-------+
|2016-01-06|     E| Jones| Pencil|   95|    1.99| 189.05|
|2016-04-01|     E| Jones| Binder|   60|    4.99|  299.4|
|2016-06-08|     E| Jones| Binder|   60|    8.99|  539.4|
|2016-07-12|     E|Howard| Binder|   29|    1.99|  57.71|
|2016-07-29|     E|Parent| Binder|   81|   19.99|1619.19|
|2016-08-15|     E| Jones| Pencil|   35|    4.99| 174.65|
|2016-09-18|     E| Jones|Pen Set|   16|   15.99| 255.84|
|2016-10-22|     E| Jones|    Pen|   64|    8.99| 575.36|
|2016-11-08|     E|Parent|    Pen|   15|   19.99| 299.85|
|2016-12-29|     E|Parent|Pen Set|   7