# Learning PySpark 
### Video series

### Packt Publishing

**Author**: Tomasz Drabas
**Date**:   2018-02-01





# Section 5: Data Processing with Spark DataFrames

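In this section we will look at processing data using Spark DataFrames: reading, reshaping, filtering, aggregating, selecting, transforming, sorting and saving data, along with the pitfalls of pure Python UDFs and repartitioning.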

# Read the data

In [1]:
import pyspark.sql.functions as f

sample_df_inferred = spark.read.csv(
    '../data/sample_data.csv'
    , header=True
    , inferSchema = True
)

sample_df_inferred = (
    sample_df_inferred
    .withColumn('OrderDate'
                , f.to_date('OrderDate', 'MM/dd/yy')
               )
)

sample_df_inferred.show(4)

SparkSession available as 'spark'.
+----------+-------+-------+------+-----+--------+------+
| OrderDate| Region|    Rep|  Item|Units|UnitCost| Total|
+----------+-------+-------+------+-----+--------+------+
|2016-01-06|   East|  Jones|Pencil|   95|    1.99|189.05|
|      null|Central| Kivell|Binder|   50|   19.99| 999.5|
|2016-02-09|Central|Jardine|Pencil|   36|    4.99|179.64|
|2016-02-26|Central|   Gill|   Pen|   27|   19.99|539.73|
+----------+-------+-------+------+-----+--------+------+
only showing top 4 rows

# Changing schema
## Dropping columns

In [2]:
(
    sample_df_inferred
    .drop('OrderDate', 'Region', 'Total')
    .show(4)
)

+-------+------+-----+--------+
|    Rep|  Item|Units|UnitCost|
+-------+------+-----+--------+
|  Jones|Pencil|   95|    1.99|
| Kivell|Binder|   50|   19.99|
|Jardine|Pencil|   36|    4.99|
|   Gill|   Pen|   27|   19.99|
+-------+------+-----+--------+
only showing top 4 rows

## Renaming columns

In [3]:
(
    sample_df_inferred
    .select(
        f.col('OrderDate').alias('Date')
        , f.col('Region').alias('Location')
    )
    .show(4)
)

+----------+--------+
|      Date|Location|
+----------+--------+
|2016-01-06|    East|
|      null| Central|
|2016-02-09| Central|
|2016-02-26| Central|
+----------+--------+
only showing top 4 rows

In [4]:
(
    sample_df_inferred
    .withColumnRenamed('OrderDate', 'Date')
    .withColumnRenamed('Region', 'Location')
    .show(4)
)

+----------+--------+-------+------+-----+--------+------+
|      Date|Location|    Rep|  Item|Units|UnitCost| Total|
+----------+--------+-------+------+-----+--------+------+
|2016-01-06|    East|  Jones|Pencil|   95|    1.99|189.05|
|      null| Central| Kivell|Binder|   50|   19.99| 999.5|
|2016-02-09| Central|Jardine|Pencil|   36|    4.99|179.64|
|2016-02-26| Central|   Gill|   Pen|   27|   19.99|539.73|
+----------+--------+-------+------+-----+--------+------+
only showing top 4 rows

## Dropping observations

In [5]:
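import numpy as np

# Simulate missing data: for roughly 20% of rows, replace the last two
# fields (UnitCost and Total) with None. The random draw runs on the
# executors without a seed, so the broken rows are not reproducible and,
# since nothing is cached, can differ from one action to the next.
sample_df_broken_rdd = (
    sample_df_inferred
    .rdd
    .map(lambda row: 
         row[:5] + 
         ((None, None) if np.random.rand() < 0.2 else tuple(row[5:]))
    )
)

# Reuse the original schema so the column types (e.g. the date column)
# are kept instead of being re-inferred from the broken rows
sample_df_broken = (
    spark
    .createDataFrame(
        sample_df_broken_rdd
        , sample_df_inferred.schema
    )
)

# Drop the rows whose OrderDate is null
sample_df_broken.dropna(subset=['OrderDate']).show(4)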

+----------+-------+-------+------+-----+--------+------+
| OrderDate| Region|    Rep|  Item|Units|UnitCost| Total|
+----------+-------+-------+------+-----+--------+------+
|2016-01-06|   East|  Jones|Pencil|   95|    1.99|189.05|
|2016-02-09|Central|Jardine|Pencil|   36|    4.99|179.64|
|2016-02-26|Central|   Gill|   Pen|   27|   19.99|539.73|
|2016-03-15|   West|Sorvino|Pencil|   56|    2.99|167.44|
+----------+-------+-------+------+-----+--------+------+
only showing top 4 rows

## Filling missing values

In [6]:
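# Mean UnitCost (f.mean ignores nulls) as a {'UnitCost': value} dictionary
avg_unitCost = (
    sample_df_broken
    .agg(
        f.mean(f.col('UnitCost'))
        .alias('UnitCost')
    )
    .first()
    .asDict()
)

# Fill missing UnitCost values with the mean, then recompute Total
# as Units * UnitCost for every row
sample_df_fixed = (
    sample_df_broken
    .fillna(avg_unitCost)
    .withColumn('Total', f.col('Units') * f.col('UnitCost'))
)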

sample_df_fixed.show(4)

+----------+-------+-------+------+-----+--------+------------------+
| OrderDate| Region|    Rep|  Item|Units|UnitCost|             Total|
+----------+-------+-------+------+-----+--------+------------------+
|2016-01-06|   East|  Jones|Pencil|   95|    1.99|            189.05|
|      null|Central| Kivell|Binder|   50|   19.99| 999.4999999999999|
|2016-02-09|Central|Jardine|Pencil|   36|    4.99|179.64000000000001|
|2016-02-26|Central|   Gill|   Pen|   27|   19.99| 539.7299999999999|
+----------+-------+-------+------+-----+--------+------------------+
only showing top 4 rows

# Filtering data

In [7]:
sample_df_inferred.where('Item = "Pencil"').show(4)

+----------+-------+-------+------+-----+--------+------+
| OrderDate| Region|    Rep|  Item|Units|UnitCost| Total|
+----------+-------+-------+------+-----+--------+------+
|2016-01-06|   East|  Jones|Pencil|   95|    1.99|189.05|
|2016-02-09|Central|Jardine|Pencil|   36|    4.99|179.64|
|2016-03-15|   West|Sorvino|Pencil|   56|    2.99|167.44|
|2016-04-18|Central|Andrews|Pencil|   75|    1.99|149.25|
+----------+-------+-------+------+-----+--------+------+
only showing top 4 rows

In [8]:
sample_df_inferred.filter('Item = "Pencil"').show(4)

+----------+-------+-------+------+-----+--------+------+
| OrderDate| Region|    Rep|  Item|Units|UnitCost| Total|
+----------+-------+-------+------+-----+--------+------+
|2016-01-06|   East|  Jones|Pencil|   95|    1.99|189.05|
|2016-02-09|Central|Jardine|Pencil|   36|    4.99|179.64|
|2016-03-15|   West|Sorvino|Pencil|   56|    2.99|167.44|
|2016-04-18|Central|Andrews|Pencil|   75|    1.99|149.25|
+----------+-------+-------+------+-----+--------+------+
only showing top 4 rows

# Aggregating data in DataFrames

In [9]:
sample_df_inferred.groupby('Rep', 'Region').count().show()

+--------+-------+-----+
|     Rep| Region|count|
+--------+-------+-----+
|   Jones|   East|    8|
|  Kivell|Central|    4|
| Jardine|Central|    5|
|  Howard|   East|    2|
|  Morgan|Central|    3|
| Sorvino|   West|    4|
|Thompson|   West|    2|
| Andrews|Central|    4|
|   Smith|Central|    3|
|    Gill|Central|    5|
|  Parent|   East|    3|
+--------+-------+-----+

In [10]:
(
    sample_df_inferred
    .groupby('Item')
    .agg(
          f.sum('Units').alias('UnitsTotal')
        , f.sum('Total').alias('GrandTotal')
        , f.avg('Total').alias('AvgPerTransaction')
    )
    .show()
)

+-------+----------+------------------+------------------+
|   Item|UnitsTotal|        GrandTotal| AvgPerTransaction|
+-------+----------+------------------+------------------+
|   Desk|        10|            1700.0| 566.6666666666666|
| Binder|       722|           9577.65|            638.51|
|    Pen|       278|           2045.22|           409.044|
|Pen Set|       395|           4169.87| 595.6957142857143|
| Pencil|       716|2135.1400000000003|164.24153846153848|
+-------+----------+------------------+------------------+

# Selecting data
## .select(...)

In [11]:
(
    sample_df_inferred
    .select('Rep','Total')
    .show()
)

+--------+-------+
|     Rep|  Total|
+--------+-------+
|   Jones| 189.05|
|  Kivell|  999.5|
| Jardine| 179.64|
|    Gill| 539.73|
| Sorvino| 167.44|
|   Jones|  299.4|
| Andrews| 149.25|
| Jardine|  449.1|
|Thompson|  63.68|
|   Jones|  539.4|
|  Morgan|  449.1|
|  Howard|  57.71|
|  Parent|1619.19|
|   Jones| 174.65|
|   Smith|  250.0|
|   Jones| 255.84|
|  Morgan| 251.72|
|   Jones| 575.36|
|  Parent| 299.85|
|  Kivell| 479.04|
+--------+-------+
only showing top 20 rows

## .sql(...)

In [12]:
sample_df_inferred.createOrReplaceTempView('sample_df')

In [13]:
spark.sql('''
    SELECT OrderDate
        , Rep
        , Region
        , Total
    FROM sample_df
    ORDER BY Rep
        , OrderDate
''').show(4)

+----------+-------+-------+------+
| OrderDate|    Rep| Region| Total|
+----------+-------+-------+------+
|2016-04-18|Andrews|Central|149.25|
|2017-04-10|Andrews|Central|131.34|
|2017-10-31|Andrews|Central| 18.06|
|2017-12-21|Andrews|Central|139.72|
+----------+-------+-------+------+
only showing top 4 rows

# Transforming data

In [14]:
commission = spark.createDataFrame(
    sc.parallelize([
          ('Central', 0.033)
        , ('East',    0.032)
        , ('West',    0.034)
    ])
    , ['Region', 'Commission']
)

In [15]:
(
    sample_df_inferred
    .join(commission, on=['Region'], how='left_outer')
    # f.round() without a scale argument rounds to 0 decimal places
    .withColumn('CommissionValue', f.round(f.col('Total') * f.col('Commission')))
    .show(4)
)

+-------+----------+-------+------+-----+--------+------+----------+---------------+
| Region| OrderDate|    Rep|  Item|Units|UnitCost| Total|Commission|CommissionValue|
+-------+----------+-------+------+-----+--------+------+----------+---------------+
|Central|      null| Kivell|Binder|   50|   19.99| 999.5|     0.033|           33.0|
|Central|2016-02-09|Jardine|Pencil|   36|    4.99|179.64|     0.033|            6.0|
|Central|2016-02-26|   Gill|   Pen|   27|   19.99|539.73|     0.033|           18.0|
|Central|2016-04-18|Andrews|Pencil|   75|    1.99|149.25|     0.033|            5.0|
+-------+----------+-------+------+-----+--------+------+----------+---------------+
only showing top 4 rows

# Printing

In [16]:
(
    sample_df_inferred
    .select('Region', 'Rep')
    .show(4)
)

+-------+-------+
| Region|    Rep|
+-------+-------+
|   East|  Jones|
|Central| Kivell|
|Central|Jardine|
|Central|   Gill|
+-------+-------+
only showing top 4 rows

In [17]:
(
    sample_df_inferred
    .select('Region', 'Rep')
    .take(4)
)

[Row(Region='East', Rep='Jones'), Row(Region='Central', Rep='Kivell'), Row(Region='Central', Rep='Jardine'), Row(Region='Central', Rep='Gill')]

# Sorting data

In [18]:
(
    sample_df_inferred
    .orderBy('Rep', 'Region')
    .show(4)
)

+----------+-------+-------+------+-----+--------+------+
| OrderDate| Region|    Rep|  Item|Units|UnitCost| Total|
+----------+-------+-------+------+-----+--------+------+
|2016-04-18|Central|Andrews|Pencil|   75|    1.99|149.25|
|2017-12-21|Central|Andrews|Binder|   28|    4.99|139.72|
|2017-10-31|Central|Andrews|Pencil|   14|    1.29| 18.06|
|2017-04-10|Central|Andrews|Pencil|   66|    1.99|131.34|
+----------+-------+-------+------+-----+--------+------+
only showing top 4 rows

In [19]:
(
    sample_df_inferred
    .sort('Rep', 'Region')
    .show(4)
)

+----------+-------+-------+------+-----+--------+------+
| OrderDate| Region|    Rep|  Item|Units|UnitCost| Total|
+----------+-------+-------+------+-----+--------+------+
|2016-04-18|Central|Andrews|Pencil|   75|    1.99|149.25|
|2017-12-21|Central|Andrews|Binder|   28|    4.99|139.72|
|2017-10-31|Central|Andrews|Pencil|   14|    1.29| 18.06|
|2017-04-10|Central|Andrews|Pencil|   66|    1.99|131.34|
+----------+-------+-------+------+-----+--------+------+
only showing top 4 rows

# Saving data

## CSV

In [20]:
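(
    sample_df_inferred
    .write
    .mode('overwrite')
    # Spark writes a directory of part files at this path; header=True
    # adds the column names as the first line of each part file
    .csv('../data/sample_data_inferred.csv', header=True)
)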

## Parquet

In [21]:
(
    sample_df_inferred
    .write
    .parquet(
        '../data/sample_data_inferred.parquet'
        , mode='overwrite'
        , partitionBy='Rep'
        , compression='gzip'
    )
)

## JSON

In [22]:
(
    sample_df_inferred
    .write
    .json(
        '../data/sample_data_inferred.json'
        , mode='overwrite'
        , dateFormat='yyyy-MM-dd'  # MM is month; lowercase mm would be minutes
        , compression='gzip'
    )
)

# Pitfalls of using pure Python UDFs

![Pure Python UDFs][logo]

[logo]: https://raw.githubusercontent.com/drabastomek/learningPySpark_video/master/common/images/udf.png


In [25]:
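def calculateCommission(value, commissionPercent):
    # Called with Column arguments, `*` builds a Spark Column expression,
    # so this is not a Python UDF: Spark evaluates it in the JVM
    return value * commissionPercent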

In [26]:
(
    sample_df_inferred
    .join(commission, on=['Region'], how='left_outer')
    .withColumn('CommissionValue', calculateCommission(f.col('Total'), f.col('Commission')))
    .show(4)
)

+-------+----------+-------+------+-----+--------+------+----------+---------------+
| Region| OrderDate|    Rep|  Item|Units|UnitCost| Total|Commission|CommissionValue|
+-------+----------+-------+------+-----+--------+------+----------+---------------+
|Central|      null| Kivell|Binder|   50|   19.99| 999.5|     0.033|        32.9835|
|Central|2016-02-09|Jardine|Pencil|   36|    4.99|179.64|     0.033|        5.92812|
|Central|2016-02-26|   Gill|   Pen|   27|   19.99|539.73|     0.033|       17.81109|
|Central|2016-04-18|Andrews|Pencil|   75|    1.99|149.25|     0.033|        4.92525|
+-------+----------+-------+------+-----+--------+------+----------+---------------+
only showing top 4 rows

# Repartitioning data

In [28]:
sample_df_inferred.rdd.getNumPartitions()

1

In [31]:
sample_df_repartitioned = sample_df_inferred.repartition(4, 'Rep')
sample_df_repartitioned.rdd.getNumPartitions()

4