<a id="Top"></a>
# Using Spark to Prepare a Large, Detailed Dataset

## Table of Contents
- [Introduction](#Intro)
- [Import necessary libraries and packages](#Imports)
- [Read in a previously persisted ID list](#IDlist)
- [Define a function to prepare monthly aggregated data for each table](#AggFile)
- [Table schemas and required transformations and aggregations](#Schemas)
- [Run subset and aggregation for all months for all tables](#Process)
- [Merge all tables](#Merge)
- [Aggregate remaining constant/current columns into one](#Constants)
- [Write the results to file](#Output)

<a id="Intro"></a>
## Introduction
In data science, more data is usually better. But because a lot of the activity of a data scientist is exploratory and iterative, a large data volume comes at a cost. When it takes hours just to read the data into memory and perform preliminary transformations, it is hard to make progress. So it can be useful to start with a subset of the data and aggregate too much detail into a coarser-grained representation. You can always go back to the original volume and level of detail, once the feature engineering and modeling approach have been developed and validated.

This notebook illustrates the following operations on a large dataset consisting of multiple files representing detailed records over several data collection periods (for example, daily detail for months or quarters) for several distinct source tables (for example, customers, usage or transactions, billing, etc.).

1. Create a manageable subset of the full dataset by selecting specific unique IDs.<br>
   See separate notebook *Generate_Subset_By_Column_Spark* for the generation of the list of IDs and a further discussion of the need for subsampling.

1. Aggregate detail data.<br>
   For example, aggregate daily detail into months.

1. Apply data transformations as needed.<br>
   This includes dropping columns, computing derived columns, changing data types, normalizing and scaling, and more.

1. Merge all files into a single DataFrame.<br>
   Using union or join operations depending on the type of attributes involved, roll all data from all files into a single DataFrame. The common key over which the different tables can be joined or aggregated is the unique ID.

In the example below, the period is a month, and each file contains daily detail, which needs to be aggregated by month. The example is based on a churn use case, which has its own peculiar characteristics, mainly that the entities being represented (users/customers) join and leave over the course of the data collection period.

Most of the monthly values are kept separate; for a given attribute, the final dataset will have as many columns for that attribute as there are months in the input dataset. Other attributes, those that are either not expected to change or for which only the current value matters, are collapsed into a single column.

Note that the example is based on a dataset that is too small to require any of this code; it is just enough to let the code run and show its intended operation.

[Top **⤒**](#Top)

In [1]:
%%html
<!-- The tables below are easier to keep with the surrounding text if they are left-justified instead of centered -->
<style>
table {float:left}
</style>

### For example, if the two tables for the first two months look like this:
Each monthly file has 28 to 31 rows per UID, one per day. (UID = unique identifier; the actual column name may vary.) Not all UIDs appear in all months, as customers join and leave.

Table1, January|Table1, February|Table2, January|Table2, February
:---:|:---:|:---:|:---:
<table> <thead> <tr> <th>UID</th> <th>Day</th> <th>Col1</th> <th>Col2</th> <th>...</th> </tr> </thead> <tbody> <tr> <td>001</td> <td style="text-align:right;">1</td> <td><strong>ABC</strong></td> <td>XYZ</td> <td>...</td> </tr> <tr> <td>001</td> <td style="text-align:right;">2</td> <td><strong>ABC</strong></td> <td>XYZ</td> <td>...</td> </tr> <tr> <td>001</td> <td>...</td> <td>...</td> <td>...</td> <td>...</td> </tr> <tr> <td>001</td> <td style="text-align:right;">31</td> <td><strong>ABC</strong></td> <td>XYZ</td> <td>...</td> </tr> <tr> <td>002</td> <td style="text-align:right;">1</td> <td>DEF</td> <td>PQR</td> <td>...</td> </tr> <tr> <td>002</td> <td style="text-align:right;">2</td> <td>DEF</td> <td>PQR</td> <td>...</td> </tr> <tr> <td>...</td> <td>...</td> <td>...</td> <td>...</td> <td>...</td> </tr> </tbody> </table>|<table> <thead> <tr> <th>UID</th> <th>Day</th> <th>Col1</th> <th>Col2</th> <th>...</th> </tr> </thead> <tbody> <tr> <td>001</td> <td style="text-align:right;">1</td> <td><strong>ABC</strong></td> <td>XYZ</td> <td>...</td> </tr> <tr> <td>001</td> <td style="text-align:right;">2</td> <td><strong>DCB</strong></td> <td>XYZ</td> <td>...</td> </tr> <tr> <td>001</td> <td>...</td> <td>...</td> <td>...</td> <td>...</td> </tr> <tr> <td>001</td> <td style="text-align:right;">28</td> <td><strong>DCB</strong></td> <td>XYZ</td> <td>...</td> </tr> <tr> <td>002</td> <td style="text-align:right;">1</td> <td>DEF</td> <td>PQR</td> <td>...</td> </tr> <tr> <td>002</td> <td style="text-align:right;">2</td> <td>DEF</td> <td>PQR</td> <td>...</td> </tr> <tr> <td>...</td> <td>...</td> <td>...</td> <td>...</td> <td>...</td> </tr> </tbody> </table>|<table> <thead> <tr> <th>UID</th> <th>Day</th> <th>ColA</th> <th>ColB</th> <th>...</th> </tr> </thead> <tbody> <tr> <td>001</td> <td style="text-align:right;">1</td> <td style="text-align:right;">123</td> <td style="text-align:right;">1.3</td> <td>...</td> </tr> <tr> <td>001</td> <td style="text-align:right;">2</td> <td style="text-align:right;">234</td> <td style="text-align:right;">2.4</td> <td>...</td> </tr> <tr> <td>001</td> <td>...</td> <td>...</td> <td>...</td> <td>...</td> </tr> <tr> <td>001</td> <td style="text-align:right;">31</td> <td style="text-align:right;">213</td> <td style="text-align:right;">1.7</td> <td>...</td> </tr> <tr> <td>002</td> <td style="text-align:right;">1</td> <td style="text-align:right;">567</td> <td style="text-align:right;">5.7</td> <td>...</td> </tr> <tr> <td>002</td> <td style="text-align:right;">2</td> <td style="text-align:right;">678</td> <td style="text-align:right;">6.8</td> <td>...</td> </tr> <tr> <td>...</td> <td>...</td> <td>...</td> <td>...</td> <td>...</td> </tr> </tbody> </table>|<table> <thead> <tr> <th>UID</th> <th>Day</th> <th>ColA</th> <th>ColB</th> <th>...</th> </tr> </thead> <tbody> <tr> <td>001</td> <td style="text-align:right;">1</td> <td style="text-align:right;">345</td> <td style="text-align:right;">1.5</td> <td>...</td> </tr> <tr> <td>001</td> <td style="text-align:right;">2</td> <td style="text-align:right;">456</td> <td style="text-align:right;">2.6</td> <td>...</td> </tr> <tr> <td>001</td> <td>...</td> <td>...</td> <td>...</td> <td>...</td> </tr> <tr> <td>001</td> <td style="text-align:right;">28</td> <td style="text-align:right;">333</td> <td style="text-align:right;">5.1</td> <td>...</td> </tr> <tr> <td>002</td> <td style="text-align:right;">1</td> <td style="text-align:right;">678</td> <td style="text-align:right;">5.9</td> <td>...</td> </tr> <tr> <td>002</td> <td style="text-align:right;">2</td> <td style="text-align:right;">789</td> <td style="text-align:right;">6.2</td> <td>...</td> </tr> <tr> <td>...</td> <td>...</td> <td>...</td> <td>...</td> <td>...</td> </tr> </tbody> </table>

### Then the desired result, after aggregating and combining, looks like this:

The aggregation function (First, Last, Max, or Sum) is indicated for each column.

Notice that in the example, the **Col1** value for UID=001 changes in February, and that is
reflected in the aggregate for **Col1** in the table below, which is based on the Last occurrence.

In other words, some attributes (the ones representing state and aggregated with First and Last
functions) are aggregated over the entire data collection period; others (the ones representing
counts or measures and aggregated with Sum or Max functions) are aggregated by month, with the
monthly aggregates kept in separate columns.

Applying this, the table will look as follows.

UID|Col1<br>(Last)|Col2<br>(First)|...|ColA_Jan<br>(Max)|ColB_Jan<br>(Sum)|...|ColA_Feb<br>(Max)|ColB_Feb<br>(Sum)|...
---|:---:|:---:|---|---:|---:|---|---:|---:|---
001|**DCB**|XYZ|...|963|45.3|...|852|57.1|...
002|DEF|PQR|...|974|127.2|...|865|108.9|...
...|...|...|...|...|...|...|...|...|...
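The roll-up above can be sketched in plain Python, without Spark, to make the two kinds of aggregation concrete. The records, the `roll_up` helper, and the values are hypothetical, loosely modeled on the example tables (Col2 and its First() aggregate are left out for brevity); the notebook itself does this with Spark SQL.

```python
from collections import defaultdict

# Hypothetical daily records: (uid, month, day, Col1, ColA, ColB)
daily = [
    ('001', 'Jan', 1, 'ABC', 123, 1.3),
    ('001', 'Jan', 2, 'ABC', 234, 2.4),
    ('001', 'Feb', 1, 'ABC', 345, 1.5),
    ('001', 'Feb', 2, 'DCB', 456, 2.6),
    ('002', 'Jan', 1, 'DEF', 567, 5.7),
    ('002', 'Feb', 1, 'DEF', 678, 5.9),
]
months = ['Jan', 'Feb']  # chronological order of the periods

def roll_up(records):
    """One dict per UID: Col1 = Last over all months, ColA_<month> = Max, ColB_<month> = Sum."""
    # Sort chronologically so that "last" really means the latest record
    ordered = sorted(records, key=lambda r: (r[0], months.index(r[1]), r[2]))
    result = defaultdict(dict)
    for uid, month, _day, col1, col_a, col_b in ordered:
        row = result[uid]
        row['Col1'] = col1                                  # Last: later records overwrite earlier ones
        key_a, key_b = 'ColA_' + month, 'ColB_' + month
        row[key_a] = max(row.get(key_a, col_a), col_a)      # Max within the month
        row[key_b] = round(row.get(key_b, 0.0) + col_b, 6)  # Sum within the month (rounded for display)
    return dict(result)

wide = roll_up(daily)
print(wide['001'])
```

Sorting first is what makes "last" meaningful; Max and Sum would give the same result for any row order.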

[Top **⤒**](#Top)

<a id="Imports"></a>
## Import necessary libraries and packages
Also set up the Spark session.

[Top **⤒**](#Top)

In [2]:
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import *

from pyspark.sql           import SparkSession
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.functions import array

spark = SparkSession.builder.getOrCreate()   # Reuses the active session if one exists

# Path to project datasets
import os
local_path = os.environ['DSX_PROJECT_DIR'] + '/datasets/'

# Handy package for interacting with iterables
import itertools

# Used for the monthly data in this example: naming files and columns
from calendar import month_abbr

# Handy packages for reporting running time and wall clock time
from time     import time
from datetime import datetime, timedelta
from pytz     import timezone

# Bring in a custom function to write CSV from a Spark DataFrame as a single file
import sys
sys.path.insert(0, '../scripts')
from spark1csv import spark_write_one_csv

# Use logging to see output from the custom function
import logging

<a id="IDlist"></a>
## Read in a previously persisted ID list
In this example, assume that a useful subset (at minimum two columns: ID and label) was persisted in a Parquet file. The sample file also carries a `period_count` column, as the schema below shows.

[Top **⤒**](#Top)

### First set the specifics: path, filename, and relevant column names

In [3]:
# dir_path      = '/user-home/libraries/UserData/datasets/'    # Set the directory path, if in a library
dir_path      = local_path                                   # Set the directory path, if in this project

# Apply your own naming convention for the ID list input and the result output files
suffix        = '_0k_35'
id_file       = 'customers' + suffix + '.parquet' # Set this to the name of the file containing the subset specification
out_file      = 'subset_merge_agg' + suffix

# Names of the ID column and the label column
id_column     = 'customer_id'
label_column  = 'churned'

suffixes      = [month_abbr[i] for i in range(1,13)]     # For month-based file and column names
file_type     = '.csv'                                   # '.csv' or '.csv.gz' or whatever the filename extension is

# For convenience in reporting progress, 
# set this according to local timezone of browser (not the Jupyter client or DSX server or UTC)
tz = timezone('US/Pacific')

### Read the list of unique IDs and their associated labels
The resulting DataFrame is used to subset the whole dataset (using joins).
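Because the ID list passed to the joins has a single column and no duplicates (checked below), an inner join against it keeps exactly the detail rows whose ID is in the list, which is the same as a semi-join; Spark also accepts that join type directly as `how='left_semi'`. The plain-Python sketch below, with hypothetical records and helper names, shows the equivalence and why duplicates in the ID list would matter.

```python
# Hypothetical detail records: (customer_id, day, calls)
records = [('A', 1, 3), ('B', 1, 5), ('A', 2, 4), ('C', 1, 2)]
id_list = ['A', 'C']   # unique IDs to keep

def inner_join_subset(rows, ids):
    """Inner join against a one-column ID list: one output row per matching (row, id) pair."""
    return [row for row in rows for i in ids if row[0] == i]

def semi_join_subset(rows, ids):
    """Semi-join: keep a row if its ID appears in the list, never duplicating it."""
    wanted = set(ids)
    return [row for row in rows if row[0] in wanted]

# With a duplicate-free ID list, both give the same rows in the same order
assert inner_join_subset(records, id_list) == semi_join_subset(records, id_list)

# A duplicated ID makes the inner join repeat rows; the semi-join does not
print(len(inner_join_subset(records, id_list + ['A'])), len(semi_join_subset(records, id_list + ['A'])))
```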

In [4]:
# Cache the DataFrame: the ID list is reused for every file processed below, so caching avoids re-reading the Parquet file each time
df_subset_ids = spark.read.parquet(dir_path + id_file).cache()

df_subset_ids.printSchema()

# Counting the rows forces Spark to perform the read and instantiate the DataFrame
n_ids = df_subset_ids.count()
print('Number of IDs: {}'.format(n_ids))

root
 |-- customer_id: string (nullable = true)
 |-- churned: integer (nullable = true)
 |-- period_count: long (nullable = true)

Number of IDs: 694


### Make sure there are no obvious problems that would derail everything

In [5]:
# The following statements raise an exception if something is amiss, stopping execution before it can generate useless results

# 1. Make sure the ID list is not empty
assert n_ids > 0, 'List of IDs is empty.'

# 2. Make sure there are no duplicates
assert df_subset_ids.agg(countDistinct(id_column)).first()[0] == n_ids, 'List of IDs contains duplicates.'

<a id="AggFile"></a>
## Define a function to prepare monthly aggregated data for each table
Interpret "month" loosely; your data may be organized by some other time interval or variable.

[Top **⤒**](#Top)

In [6]:
def agg_by_file(df_ids, table, suffix, ftype, fpath, schema=None, header=True, transforms=None, dropcols=(), agg_query=None):
    '''
    Aggregate daily data into monthly aggregates.
    
    This function only applies DataFrame transformations, no actions. Therefore, it will
    seem very fast, but the real work will not happen until an action later on forces the
    execution of the processing graph.
    
    Parameters:
        df_ids     DataFrame with single column, containing unique IDs
        table      Table name: for example, Users or Usage
        suffix     Suffix used for file names and column names (in the example, 3-character months: Jan, Feb, etc.)
        ftype      Filename extension (for example, '.csv')
        fpath      Path to input file directory
        schema     Full pyspark DataFrame schema of input data
        header     Whether the files contain a header line
        transforms Function that applies custom DataFrame transformations, if necessary. May be None.
                   Must take DataFrame as a parameter.
        dropcols   Sequence (list or tuple) of column names to drop. May be empty.
        agg_query  SQL query string to perform the table- and column-specific aggregations. May be None.
                   Must have numbered string format placeholders ('{0}') for the suffix in each column alias.
    
    Assumptions:
        df_ids is not empty and contains no duplicates.
        File naming convention is fixed.
        Path and file exist.
        Dropping columns before applying transformations is fine; any columns that need to exist for
        the transformations and dropped later should be dropped as part of the transforms function
    
    Limitations:
        Limited parameter error checking
    '''
    
    # Construct filename according to convention. Here: <table>_<suffix><ftype>, for example customers_Jan.csv or calls_Jan.csv
    filename = os.path.join(fpath, table + '_' + suffix + ftype)
    
    df_extract = (spark.read.csv(filename, schema=schema, header=header)   # Skip the header line, if any
                  .drop(*dropcols)         # Immediately drop the non-required columns
                  .join(df_ids,            # and subset the data
                        on=[id_column],    # Special syntax if join column has the same name on both sides (id_column is a notebook global)
                        how='inner')       # Making sure every ID gets a row happens later, when all tables are joined
                 )
    
    if transforms:
        df_extract = transforms(df_extract)
    
    if agg_query:
        # A SQL query performs the aggregation over the month.
        # First replace the placeholders ('{0}') in the projection part (SELECT clause) of the provided query with the suffix
        query = agg_query.format(suffix)
        df_extract.createOrReplaceTempView('extract')
        df_extract = spark.sql(query)
    
    return df_extract

<a id="Schemas"></a>
## Table schemas and required transformations and aggregations
In this example, assume that we know what the schemas are from prior work. It is better to apply those, rather than letting Spark
infer the schema, even if the input files contain headers. For one, CSV headers only contain names, not data types. For another,
inferring the schema requires an additional pass over the data, dramatically increasing the time it takes to read the files.

Required transformations (columns to be dropped or transformed)
and aggregations are decided by detailed examination, outside this notebook.

A major purpose of this notebook is to aggregate detail (e.g., daily) data into summary (e.g., monthly) data. How to do that
depends on the nature of each column. Support for SQL in Spark lets us express these in relatively readable syntax, using
standard aggregate functions:
- For static columns (the value does not change over the entire period, such as _Gender_), pick any occurrence: `First()`
- For columns indicating a status, which may change along the way: `Last()`
- For numeric columns, pick an appropriate roll-up: `Max()` or `Sum()`
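Of these four aggregates, only `First()` and `Last()` depend on the order in which rows arrive; `Max()` and `Sum()` give the same result for any order. The Spark documentation describes `first` and `last` as non-deterministic when the row order is not fixed, for example after a shuffle. A plain-Python sketch with hypothetical values makes the difference visible; the `first` and `last` helpers are illustrative stand-ins, not Spark functions.

```python
# Hypothetical daily values for one customer, in chronological order
statuses = ['basic', 'basic', 'premium']
minutes = [12, 40, 7]

def first(values):
    """Value from the first row seen."""
    return values[0]

def last(values):
    """Value from the last row seen."""
    return values[-1]

# The same rows arriving in a different order, as can happen after a shuffle
shuffled_statuses = list(reversed(statuses))
shuffled_minutes = list(reversed(minutes))

# Order-insensitive aggregates agree ...
assert max(minutes) == max(shuffled_minutes) == 40
assert sum(minutes) == sum(shuffled_minutes) == 59

# ... order-sensitive ones do not
print(last(statuses), last(shuffled_statuses))    # prints: premium basic
print(first(statuses), first(shuffled_statuses))  # prints: basic premium
```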

**Note** the use of format placeholders (`{0}`) in the SQL query strings. These are used to make the query specific to
each file (representing months, in this example) and name the resultant columns accordingly.
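The placeholders are filled with Python's `str.format`, as `agg_by_file` does. A short sketch with a trimmed-down, hypothetical query shows the effect. One pitfall: any literal brace in a query (for example, a regex quantifier) is also read as a placeholder, so it has to be doubled (`{{`, `}}`).

```python
# A trimmed-down version of the usage query; '{0}' marks where the month suffix goes
sample_query = '''
SELECT customer_id,
       Sum(from_calls) AS from_calls_{0},
       Sum(to_calls)   AS to_calls_{0}
  FROM extract
 GROUP BY customer_id
'''

jan_query = sample_query.format('Jan')
print(jan_query)

# Literal braces (here, a regex quantifier) must be doubled so format() leaves them alone
escaped = "SELECT customer_id, '{0}' AS period FROM extract WHERE location RLIKE '^[A-Z]{{2}}'"
print(escaped.format('Feb'))

# Left single, the braces are read as a placeholder with index 2, which does not exist
try:
    "SELECT customer_id, '{0}' AS period FROM extract WHERE location RLIKE '^[A-Z]{2}'".format('Feb')
except IndexError as err:
    print('Unescaped brace:', err)
```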

[Top **⤒**](#Top)

### Example of "prior work" to determine file schemas

In [7]:
# Just read one file containing headers and see what Spark infers. Do this for each table type.
spark.read.csv(os.path.join(local_path, 'customers_Jan.csv'), header=True).schema.fields

# spark.read.csv(os.path.join(local_path, 'calls_Jan.csv'), header=True).schema.fields

[StructField(customer_id,StringType,true),
 StructField(first_name,StringType,true),
 StructField(last_name,StringType,true),
 StructField(twitter_handle,StringType,true),
 StructField(number,StringType,true),
 StructField(gender,StringType,true),
 StructField(age,StringType,true),
 StructField(type,StringType,true),
 StructField(location,StringType,true),
 StructField(location_lat,StringType,true),
 StructField(location_lon,StringType,true),
 StructField(callcenter_callcount,StringType,true),
 StructField(text_package_deal,StringType,true),
 StructField(voice_package_deal,StringType,true),
 StructField(4g_handset_deal,StringType,true),
 StructField(all_u_can_eat_data_deal,StringType,true),
 StructField(accidental_damage_cover_deal,StringType,true),
 StructField(europe_roamer_deal,StringType,true),
 StructField(americas_roamer_deal,StringType,true),
 StructField(asia_africa_roamer_deal,StringType,true),
 StructField(extra_handset_deal,StringType,true),
 StructField(musicsubscription_de

### Users
Based on the schema shown above. In the sample data, users are called customers.

In [8]:
# This schema only applies to the sample data
users_schema = StructType([
    StructField('customer_id'                 , StringType(), False),    # The ID column should not have nulls
    StructField('first_name'                  , StringType(), True ),
    StructField('last_name'                   , StringType(), True ),
    StructField('twitter_handle'              , StringType(), True ),
    StructField('number'                      , StringType(), True ),
    StructField('gender'                      , StringType(), True ),
    StructField('age'                         , StringType(), True ),
    StructField('type'                        , StringType(), True ),
    StructField('location'                    , StringType(), True ),
    StructField('location_lat'                , StringType(), True ),
    StructField('location_lon'                , StringType(), True ),
    StructField('callcenter_callcount'        , StringType(), True ),
    StructField('text_package_deal'           , StringType(), True ),
    StructField('voice_package_deal'          , StringType(), True ),
    StructField('4g_handset_deal'             , StringType(), True ),
    StructField('all_u_can_eat_data_deal'     , StringType(), True ),
    StructField('accidental_damage_cover_deal', StringType(), True ),
    StructField('europe_roamer_deal'          , StringType(), True ),
    StructField('americas_roamer_deal'        , StringType(), True ),
    StructField('asia_africa_roamer_deal'     , StringType(), True ),
    StructField('extra_handset_deal'          , StringType(), True ),
    StructField('musicsubscription_deal'      , StringType(), True ),
    StructField('churned'                     , StringType(), False)     # The label column should not have nulls
])

# Drop columns considered unimportant or unreliable.
# This just makes explicit which columns will be ignored. Strictly speaking there's
# no need to drop them separately; they'll be dropped anyway by not appearing in the
# SQL query below.

# This set only applies to the sample data.
users_dropcols = [
    'first_name'    ,   # Personal, not relevant
    'last_name'     ,   # Personal, not relevant
    'twitter_handle',   # Personal, not relevant, often missing
    'number'        ,   # One-to-one correlated with customer_id (NOTE: this may not be true in real use cases)
    'location_lat'  ,   # No geospatial analysis in this use case
    'location_lon'  ,   # No geospatial analysis in this use case
    'churned'           # Use the label column from the selected ID list (df_subset_ids) instead
]

# Set up mapping functions and associated UDFs, as needed
# This one only applies to the sample data

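# Reduce to broader categories: from the location string ('AB12', etc.) take only the first letter.
# This brings the number of unique values to 23, down from almost one for each row.
# Missing values pass through as null instead of raising an error inside the UDF.
map_location = udf(lambda loc: loc[:1] if loc else loc, StringType())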

# Transform some of the columns, using UDFs defined above as necessary
def users_transforms(df_in):
    '''
    Apply required transformations that are specific to the Users table.
    '''
    df = df_in.withColumn('location' ,map_location('location'))
    
    # Several binary columns use 'Y' and 'N'. Convert them to 1 and 0, for ease of aggregation and
    # modeling: Spark's string-to-boolean cast maps 'Y'/'N' to true/false, which then casts to 1/0.
    yes_no_cols = [c for c in df.columns if c.endswith('deal')]
    for c in yes_no_cols:
        df = df.withColumn(c, col(c).cast('boolean').cast('integer'))
    
    return df

# For aggregation, apply a SQL query using the aggregate functions First and Last.
# This is only needed if the detail files need to be aggregated, for example, if they
# contain daily records and you need them by month. User data tends to change infrequently
# and in most use cases will not be provided as daily detail.

# The sample "customers" data is already by month; no aggregation of daily detail is needed.
# But the files will be merged by unioning, meaning that the columns remain what they are but
# new rows are added for each month, so aggregation is still needed to end up with one row
# per user. In principle it can just be done once, after all months are merged, instead of
# within each month.
# However, if a static table like this one (none of the values vary day to day or even month
# to month) does contain (highly redundant) daily detail, applying the aggregates within each
# month, rather than waiting for the final aggregation at the end, may help keep memory
# consumption down. More important, it lets us attach a month indicator ("period") to each
# row, which will be used later to sort the rows before applying the Last() aggregate.
# In this example, we'll apply the aggregate query both to each month (a no-op, since each
# customer_id group has only one row) and after merging.
# Column names go in backticks: in Spark SQL, single quotes denote string literals,
# so First('gender') would return the text 'gender' rather than the column's value.
users_query = '''
SELECT customer_id
     , First(`gender`                      ) AS gender
     , Last (`age`                         ) AS age
     , Last (`type`                        ) AS type
     , Last (`location`                    ) AS location
     , Last (`callcenter_callcount`        ) AS callcenter_callcount
     , Last (`text_package_deal`           ) AS text_package_deal
     , Last (`voice_package_deal`          ) AS voice_package_deal
     , Last (`4g_handset_deal`             ) AS `4g_handset_deal`
     , Last (`all_u_can_eat_data_deal`     ) AS all_u_can_eat_data_deal
     , Last (`accidental_damage_cover_deal`) AS accidental_damage_cover_deal
     , Last (`europe_roamer_deal`          ) AS europe_roamer_deal
     , Last (`americas_roamer_deal`        ) AS americas_roamer_deal
     , Last (`asia_africa_roamer_deal`     ) AS asia_africa_roamer_deal
     , Last (`extra_handset_deal`          ) AS extra_handset_deal
     , Last (`musicsubscription_deal`      ) AS musicsubscription_deal
     , '{0}'                                 AS period
  FROM extract
 GROUP BY customer_id
'''

# Dictionary of function call arguments
users_args  = {'fpath'       : dir_path        ,
               'schema'      : users_schema    ,
               'transforms'  : users_transforms,
               'dropcols'    : users_dropcols  ,
               'agg_query'   : users_query
              }

# Merge type: 'union' if all attributes are essentially constant and to be
# aggregated with First() or Last(); otherwise, 'join'.
users_merge = 'union'
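The two column transformations in `users_transforms` can be sketched per value in plain Python. This only illustrates the intended mapping, assuming the deal columns hold nothing but 'Y', 'N', or missing values; the helper names `first_letter` and `yes_no_to_int` are hypothetical. In the notebook, the UDF handles the location and Spark's cast handles the Y/N columns. Note that a bare `loc[:1]` raises `TypeError` in Python when `loc` is `None`, hence the guard.

```python
def first_letter(loc):
    """Keep only the first character of a location code such as 'AB12'; pass None through."""
    return loc[:1] if loc else loc

def yes_no_to_int(flag):
    """Map 'Y'/'N' (any case) to 1/0; anything else, including None, becomes None."""
    if flag is None:
        return None
    return {'Y': 1, 'N': 0}.get(flag.strip().upper())

# One hypothetical input row
row = {'location': 'AB12', 'text_package_deal': 'Y', 'voice_package_deal': 'N', 'europe_roamer_deal': None}

# Deal columns get the Y/N mapping; the location column gets the first-letter mapping
converted = {
    c: (yes_no_to_int(v) if c.endswith('deal') else first_letter(v))
    for c, v in row.items()
}
print(converted)
```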

### Usage
Based on the same kind of inspection, applied to the calls files. In the sample data, usage is represented by calls only. In more realistic datasets for the telco industry, usage would include other things, such as data (MB) and texts.

In [9]:
# This schema only applies to the sample data
# NOTE: only impose non-string data types if you are sure there are no
# unexpected values in the files or if you can afford to ignore records
# that cause parse failures
usage_schema = StructType([
    StructField('customer_id'  , StringType (), False),
    StructField('number'       , StringType (), True ),
    StructField('date'         , DateType   (), True ),
    StructField('from_calls'   , IntegerType(), True ),
    StructField('from_duration', IntegerType(), True ),
    StructField('from_dropped' , FloatType  (), True ),
    StructField('to_calls'     , IntegerType(), True ),
    StructField('to_duration'  , IntegerType(), True ),
    StructField('to_dropped'   , FloatType  (), True )
])

# Use all but one column
usage_dropcols = ['number']  # One-to-one correlated with customer_id (NOTE: this may not be true in real use cases)

# Most of the column types are already coerced by providing the schema
# to the CSV reader. Otherwise you'd cast them here. The only oddity to be corrected
# in the sample data is that the dropped flags come as values with decimal fractions
# (which are always zero).
def usage_transforms(df_in):
    return (df_in
            .dropna()      # Placeholder for protecting numeric type casts (drops rows with any null); the sample data has no missing or strange values
#             .withColumn('date'         , col('date'         ).cast('date'   ))
#             .withColumn('from_calls'   , col('from_calls'   ).cast('integer'))
#             .withColumn('from_duration', col('from_duration').cast('integer'))
            .withColumn('from_dropped' , col('from_dropped' ).cast('integer'))
#             .withColumn('to_calls'     , col('to_calls'     ).cast('integer'))
#             .withColumn('to_duration'  , col('to_duration'  ).cast('integer'))
            .withColumn('to_dropped'   , col('to_dropped'   ).cast('integer'))
           )

# Note the aggregate function Sum, applied to all numeric columns
# This query only applies to the sample data
usage_query = '''
SELECT customer_id                            ,
       Sum(from_calls)    AS from_calls_{0}   , 
       Sum(from_duration) AS from_duration_{0},
       Sum(from_dropped)  AS from_dropped_{0} ,
       Sum(to_calls)      AS to_calls_{0}     ,
       Sum(to_duration)   AS to_duration_{0}  ,
       Sum(to_dropped)    AS to_dropped_{0}
  FROM extract
 GROUP BY customer_id
'''

# Dictionary of function call arguments
usage_args  = {'fpath'       : dir_path        ,
               'schema'      : usage_schema    ,
               'transforms'  : usage_transforms,
               'dropcols'    : usage_dropcols  ,
               'agg_query'   : usage_query
              }

# Merge type: 'union' if all attributes are essentially constant and to be
# aggregated with First() or Last(); otherwise, 'join'.
usage_merge = 'join'

<a id="Process"></a>
## Run subset and aggregation for all months for all tables
Initially, keep all tables and months in separate DataFrames. Combine them later.

[Top **⤒**](#Top)

In [10]:
# Set up the call arguments and merge methods for each of the tables as elements of a dictionary
call_args = {
    'customers' : users_args,
    'calls'     : usage_args
}

merge_method = {
    'customers' : users_merge,
    'calls'     : usage_merge
}

In [11]:
# A dictionary, indexed by tablename+month, holds the resulting DataFrames
aggregates = {}

tables     = call_args.keys()
tablewidth = (len(max(tables  , key=len)) + 1 +  # Add 1 for the '_'
              len(max(suffixes, key=len)))       # Positions to reserve for table names (in output)
rowswidth  = len(str(n_ids))                     # Positions to reserve for numbers of rows

print('Started aggregation at {}'.format(datetime.now(tz).strftime('%H:%M:%S %m-%d-%Y')))
for table, suffix in itertools.product(tables, suffixes):
    start   = time()
    key     = '{tab}_{suf}'.format(tab=table, suf=suffix)
    print('Processing table {tab:>{wid}} ...'.format(tab=key, wid=tablewidth), end=' ')
    aggregates[key] = agg_by_file(df_subset_ids.select(id_column), table, suffix, file_type, **call_args[table]).cache()

    rows    = aggregates[key].count()
    seconds = int(time() - start)
    print('{r:>{w}d} rows processed in {t}.'.format(r=rows, w=rowswidth, t=timedelta(seconds=seconds)))

print('Finished aggregation at {}'.format(datetime.now(tz).strftime('%H:%M:%S %m-%d-%Y')))

Started aggregation at 21:29:02 03-21-2019
Processing table     calls_Jan ... 499 rows processed in 0:00:04.
Processing table     calls_Feb ... 510 rows processed in 0:00:02.
Processing table     calls_Mar ... 531 rows processed in 0:00:01.
Processing table     calls_Apr ... 528 rows processed in 0:00:01.
Processing table     calls_May ... 532 rows processed in 0:00:01.
Processing table     calls_Jun ... 539 rows processed in 0:00:01.
Processing table     calls_Jul ... 524 rows processed in 0:00:01.
Processing table     calls_Aug ... 513 rows processed in 0:00:01.
Processing table     calls_Sep ... 508 rows processed in 0:00:01.
Processing table     calls_Oct ... 492 rows processed in 0:00:01.
Processing table     calls_Nov ... 493 rows processed in 0:00:01.
Processing table     calls_Dec ... 468 rows processed in 0:00:01.
Processing table customers_Jan ... 510 rows processed in 0:00:02.
Processing table customers_Feb ... 531 rows processed in 0:00:02.
Processing table customers_Mar ..

<a id="Merge"></a>
## Merge all tables
In the end, every ID gets a single row, with all columns from all variable tables (which are joined) for all months and one set of columns from all static tables (which are unioned and then aggregated).
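Starting from the ID list and left-joining each table keeps every selected ID, even for months in which a customer has no records; the missing monthly columns simply come back as nulls. The plain-Python sketch below imitates that left outer join with hypothetical monthly aggregates and a hypothetical `left_outer_merge` helper.

```python
ids = ['A', 'B', 'C']  # hypothetical selected IDs
monthly = {            # hypothetical per-month aggregates, keyed by ID
    'Jan': {'A': {'calls_Jan': 10}, 'B': {'calls_Jan': 4}},
    'Feb': {'A': {'calls_Feb': 7}, 'C': {'calls_Feb': 2}},
}

def left_outer_merge(id_list, tables):
    """One row per ID; columns absent for an ID in a given month are filled with None."""
    merged = []
    for uid in id_list:
        row = {'customer_id': uid}
        for table in tables.values():
            # Column names for this month, taken from any row of that table
            columns = next(iter(table.values()), {}).keys()
            match = table.get(uid, {})
            for c in columns:
                row[c] = match.get(c)
        merged.append(row)
    return merged

merged = left_outer_merge(ids, monthly)
for r in merged:
    print(r)
```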

[Top **⤒**](#Top)

In [12]:
# Do the unions first, if any: union matches columns by position and needs identical schemas, so it must happen before the joins add columns
df_empty  = spark.createDataFrame([], schema=StructType([StructField(id_column, StringType(), True)]))
merge_msg = 'Adding {rows:>{rwid}} rows from table {tab:>{twid}}.'
for table in [t for t in tables if merge_method[t] == 'union']:
    # Initialize the union with the first month's table. Then iterate over the rest to add them.
    start      = time()
    suffix0    = suffixes[0]
    key0       = '{tab}_{suf}'.format(tab=table, suf=suffix0)
    df_current = aggregates[key0]
    rows       = df_current.count()
    print(merge_msg.format(rows=rows, rwid=rowswidth, tab=key0, twid=tablewidth))
    
    for suffix in suffixes[1:]:
        key             = '{tab}_{suf}'.format(tab=table, suf=suffix)
        rows            = aggregates[key].count()
        print(merge_msg.format(rows=rows, rwid=rowswidth, tab=key, twid=tablewidth))
        df_current      = df_current.union(aggregates[key])
   
    print('Aggregating ...', end=' ')
    
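    # The same SQL query as was used for each month performs the aggregation over all months.
    # Sort chronologically first, otherwise First() and Last() don't make sense. The 'period' values are
    # month abbreviations, which sort alphabetically, so replace them with zero-padded month positions.
    # NOTE: Spark does not guarantee that this order survives the shuffle in GROUP BY.
    period_pos = ['{:02d}'.format(i) for i in range(len(suffixes))]
    (df_current.replace(suffixes, period_pos, subset=['period'])
               .orderBy('period')
               .createOrReplaceTempView('extract'))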
    query = call_args[table]['agg_query'].format(suffix0)
    
    # Replace the first month's aggregate with the new overall aggregate; the other months' entries are cleared below
    aggregates[key0] = spark.sql(query).cache()
    rows             = aggregates[key0].count()
    elapsed          = timedelta(seconds=int(time() - start))
    print('after aggregation, table {tab} contains {rows} rows. Processed in {td}'.format(tab=key0, rows=rows, td=elapsed))
    
    # Clear the other monthly aggregates so they are not joined again in the next step
    for suffix in suffixes[1:]:
        aggregates['{tab}_{suf}'.format(tab=table, suf=suffix)] = None
 

Adding 510 rows from table customers_Jan.
Adding 531 rows from table customers_Feb.
Adding 551 rows from table customers_Mar.
Adding 546 rows from table customers_Apr.
Adding 559 rows from table customers_May.
Adding 561 rows from table customers_Jun.
Adding 545 rows from table customers_Jul.
Adding 530 rows from table customers_Aug.
Adding 526 rows from table customers_Sep.
Adding 512 rows from table customers_Oct.
Adding 493 rows from table customers_Nov.
Adding 468 rows from table customers_Dec.
Aggregating ... after aggregation, table customers_Jan contains 694 rows. Processed in 0:00:39
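Re-running the monthly query on the union of the monthly results works only because each aggregate it uses can be applied to its own output: a sum of monthly sums is the overall sum, a maximum of monthly maxima is the overall maximum, and the last of the monthly last values is the overall last value. A plain row count does not have this property (counting the unioned rows gives the number of months, not the number of detail records), so a count has to be carried along as a column and summed in the second pass, as `period_count` is in the sketch below. The sketch is plain Python on hypothetical toy data, independent of Spark. It sorts explicitly inside the aggregation, whereas in Spark an ordering applied before a `GROUP BY` is not guaranteed to be preserved.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical toy monthly aggregates: one row per (customer_id, period).
monthly = [
    {"customer_id": "a", "period": 1, "calls": 3, "max_dur": 10, "period_count": 1, "plan": "basic"},
    {"customer_id": "b", "period": 1, "calls": 1, "max_dur": 4,  "period_count": 1, "plan": "basic"},
    {"customer_id": "a", "period": 2, "calls": 5, "max_dur": 7,  "period_count": 1, "plan": "premium"},
]

def aggregate(rows):
    """Apply the same per-ID aggregation to any set of rows.

    sum, max and 'last by period' can be re-applied to their own output,
    so monthly results can be unioned and aggregated again. The count of
    periods is summed, not recounted, for the same reason.
    """
    out = []
    # Sort explicitly so that "last" really means "latest period".
    rows = sorted(rows, key=itemgetter("customer_id", "period"))
    for cid, group in groupby(rows, key=itemgetter("customer_id")):
        group = list(group)
        out.append({
            "customer_id": cid,
            "period": group[-1]["period"],
            "calls": sum(r["calls"] for r in group),
            "max_dur": max(r["max_dur"] for r in group),
            "period_count": sum(r["period_count"] for r in group),
            "plan": group[-1]["plan"],  # last value in period order
        })
    return out

overall = aggregate(monthly)
```

Aggregating `overall` a second time returns it unchanged, which is the property the union-then-reaggregate step depends on.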


In [13]:
df_all   = df_subset_ids         # This time, include the churn column
lastcols = 2
n_tables = 0
start    = time()
colwidth = len(str(len(max([df.columns for df in aggregates.values() if df is not None], key=len))))

print('Starting with 2 columns: "{id}" and "{lbl}".'.format(id=id_column, lbl=label_column))

for suffix, table in itertools.product(suffixes, tables):
    key        = '{tab}_{suf}'.format(tab=table, suf=suffix)
    
    if aggregates[key] is None: continue
    print('Joining table {tab:>{wid}} ...'.format(tab=key, wid=tablewidth), end=' ')
    
    df_current = aggregates[key]
    df_all     = df_all.join(df_current, on=[id_column], how='leftouter')
#     df_current.unpersist()               # We don't need it anymore (NOTE: It's not clear if this helps)
    
    cols       = len(df_all.columns)
    print('{col:>{cwid}d} columns added.'.format(col=cols-lastcols, cwid=colwidth))
    lastcols   = cols
    n_tables  += 1

print('Processing ...', end=' ')
df_all.cache()
rows    = df_all.count()
elapsed = timedelta(seconds=int(time() - start))
print('Finished with {n} tables in {td}. Final result has {c} columns and {r} rows'.format(n=n_tables, td=elapsed, c=cols, r=rows))

Starting with 2 columns: "customer_id" and "churned".
Joining table     calls_Jan ...  7 columns added.
Joining table customers_Jan ... 16 columns added.
Joining table     calls_Feb ...  6 columns added.
Joining table     calls_Mar ...  6 columns added.
Joining table     calls_Apr ...  6 columns added.
Joining table     calls_May ...  6 columns added.
Joining table     calls_Jun ...  6 columns added.
Joining table     calls_Jul ...  6 columns added.
Joining table     calls_Aug ...  6 columns added.
Joining table     calls_Sep ...  6 columns added.
Joining table     calls_Oct ...  6 columns added.
Joining table     calls_Nov ...  6 columns added.
Joining table     calls_Dec ...  6 columns added.
Processing ... Finished with 13 tables in 0:00:08. Final result has 91 columns and 694 rows
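The row count stays at 694 through all 13 joins because `df_subset_ids` drives a left outer join and, assuming each monthly aggregate has at most one row per ID (as aggregating by ID suggests), every ID keeps exactly one row; IDs missing from a month simply get nulls in that month's columns. If an aggregate had several rows for some ID, the join would multiply that ID's rows. The plain-Python sketch below (hypothetical toy data, not Spark) shows the invariant and also checks the column arithmetic reported above: 2 starting columns, 7 from `calls_Jan`, 16 from `customers_Jan`, and 6 from each of the 11 remaining months of `calls`.

```python
# Hypothetical toy data: the driving table has one row per ID (with the label),
# and each monthly aggregate has at most one row per ID.
subset_ids = {"a": {"churned": 0}, "b": {"churned": 1}, "c": {"churned": 0}}
calls_jan = {"a": {"from_calls_Jan": 3}, "b": {"from_calls_Jan": 1}}
calls_feb = {"a": {"from_calls_Feb": 5}}

def left_outer_join(left, right, right_columns):
    """Keep every key of `left`; fill missing right-hand columns with None."""
    return {
        key: {**row, **right.get(key, dict.fromkeys(right_columns))}
        for key, row in left.items()
    }

merged = left_outer_join(subset_ids, calls_jan, ["from_calls_Jan"])
merged = left_outer_join(merged, calls_feb, ["from_calls_Feb"])

# Column arithmetic from the log: id + label, calls_Jan, customers_Jan, calls_Feb..Dec
expected_columns = 2 + 7 + 16 + 11 * 6
```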


<a id="Constants"></a>
## Aggregate remaining constant/current columns into one
These are columns (attributes) that need only one value for the entire period, not one value per month.
To aggregate them over the whole period, use the same method that was used for the monthly aggregation.

**NOTE**: If the tables cleanly separate constant and variable attributes, with each table
having either all constant or all variable attributes, there is nothing more to do: unioning the
constant tables instead of joining them has already produced one column per attribute. This is the
case in the sample dataset. The next cells are only needed if one or more tables mix
constant and variable attributes.

[Top **⤒**](#Top)

In [14]:
# Set up UDFs that return the first or last non-null value of an array. Spark's built-in first()
# and last() aggregate over the rows of a group; here the values to combine sit in separate
# monthly columns of the same row.
# In a Python UDF, an ArrayType argument arrives as a Python list, so the built-in functions
# next() (note its default argument, returned when every value is null) and reversed() work on it.
# Use the pyspark.sql function array() to combine the monthly columns into the list passed to these UDFs.
# There may be other ways to achieve the same effect, but this seems reasonably straightforward.
first_udf = udf(lambda values: next((v for v in          values  if v is not None), None), StringType())
last_udf  = udf(lambda values: next((v for v in reversed(values) if v is not None), None), StringType())

# Associate the right aggregate with each attribute
# ("Attribute" instead of "column", because each attribute is associated with multiple columns,
# one for each month. The purpose of the following operation is to end up with one column for each.)
attribute_aggregate = {}    # {attr1: agg1, attr2: agg2, ...}; empty for the sample dataset (see NOTE above)
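The UDF bodies are ordinary Python, so their behavior can be checked without Spark. The functions below repeat the same expressions under hypothetical names. As an alternative that is suggested here and not used by this notebook, the built-in `pyspark.sql.functions.coalesce()` returns the first non-null value among its column arguments, so `coalesce(*columns)` gives the first value and `coalesce(*reversed(columns))` the last one; that avoids Python UDF overhead and keeps the columns' data type instead of declaring a `StringType` result.

```python
def first_non_null(values):
    """Return the first non-None element, or None if there is none (same logic as first_udf)."""
    return next((v for v in values if v is not None), None)

def last_non_null(values):
    """Return the last non-None element, or None if there is none (same logic as last_udf)."""
    return next((v for v in reversed(values) if v is not None), None)

# Hypothetical values of one attribute for Jan..May
monthly_values = [None, "basic", None, "premium", None]
first_value = first_non_null(monthly_values)
last_value = last_non_null(monthly_values)
```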

### Use a simple for loop
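Don't worry much about code efficiency here. Because Spark evaluates lazily, no work is
performed until an action (such as `count()` or a write) needs the result. That allows Spark to
accumulate transformations and optimize the entire sequence before it executes anything.
Therefore, whether you use loops, list comprehensions, massive SQL queries, or any other clever
code usually makes little difference to the optimized execution plan. One caveat: each
`withColumn()` call adds a projection to the plan, so very long chains of them can make the plan
itself large and slow to analyze; adding many columns in a single `select()` avoids that.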

(Earlier cells in this notebook also rely on lazy evaluation.)
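To make the lazy-evaluation point concrete, here is a toy pipeline in plain Python (a hypothetical class, not Spark's implementation): transformations only append steps to a plan, and nothing runs until the `collect()` action. Spark additionally rewrites and optimizes the accumulated plan before executing it, which this toy does not attempt.

```python
class LazyPipeline:
    """Toy illustration of lazy evaluation: transformations are recorded, not run."""

    def __init__(self, data, plan=()):
        self._data = data
        self._plan = tuple(plan)

    def map(self, fn):
        # Return a new pipeline with one more step; nothing is computed yet.
        return LazyPipeline(self._data, self._plan + (fn,))

    def collect(self):
        # The "action": only now is every recorded step applied, in order.
        result = list(self._data)
        for fn in self._plan:
            result = [fn(x) for x in result]
        return result

calls = []  # records every actual invocation of the step function

def tracked_add_one(x):
    calls.append(x)
    return x + 1

pipe = LazyPipeline([1, 2, 3])
for _ in range(3):  # a plain loop just stacks up steps in the plan
    pipe = pipe.map(tracked_add_one)
```

After the loop, `calls` is still empty; the work happens only when `pipe.collect()` is called.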

In [15]:
for attribute, aggregate in attribute_aggregate.items():                   # March through the columns to be aggregated
    columns = [c for c in df_all.columns if c.startswith(attribute + '_')] # <attr-name>_Jan, <attr-name>_Feb, etc.
    df_all = (df_all
              .withColumn(attribute, aggregate(array(columns)))            # Add the single, whole-period column
              .drop(*columns)                                              # Drop the monthly columns
             )
    
df_all = df_all.cache()
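The loop above selects monthly columns with `startswith(attribute + '_')`. That relies on two things: the columns appear in month order in `df_all` (they do here, because the joins ran month by month), and no attribute name followed by `_` is a prefix of another column name (for example, an attribute `plan` would also match a column `plan_count_Jan`). A sketch of a stricter variant, in plain Python on a row dictionary and assuming `suffixes` lists the months in chronological order, builds the column names from the suffixes instead. The helper name and the `plan` attribute are hypothetical.

```python
def collapse_monthly(row, attribute_aggregate, suffixes):
    """Replace <attr>_<suffix> entries of a row dict with a single <attr> entry.

    Column names are built from `suffixes`, so values reach the aggregate in
    chronological order, and an attribute whose name is a prefix of another
    column name (e.g. 'plan' vs 'plan_count_Jan') cannot over-match.
    """
    out = dict(row)
    for attribute, aggregate in attribute_aggregate.items():
        columns = [f"{attribute}_{s}" for s in suffixes if f"{attribute}_{s}" in row]
        out[attribute] = aggregate([row[c] for c in columns])
        for c in columns:
            del out[c]  # drop the monthly columns
    return out

def first_non_null(values):
    return next((v for v in values if v is not None), None)

def last_non_null(values):
    return next((v for v in reversed(values) if v is not None), None)

# Hypothetical row; keys deliberately out of month order.
row = {"customer_id": "a", "plan_Feb": None, "plan_Jan": "basic",
       "plan_Mar": "premium", "plan_count_Jan": 1}
months = ["Jan", "Feb", "Mar"]
latest = collapse_monthly(row, {"plan": last_non_null}, months)
earliest = collapse_monthly(row, {"plan": first_non_null}, months)
```

In the notebook, the same idea would replace the list comprehension with `[attribute + '_' + s for s in suffixes if attribute + '_' + s in df_all.columns]`.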

In [16]:
# Sanity test: check if all expected columns are there
[f.name for f in df_all.schema.fields]

['customer_id',
 'churned',
 'period_count',
 'from_calls_Jan',
 'from_duration_Jan',
 'from_dropped_Jan',
 'to_calls_Jan',
 'to_duration_Jan',
 'to_dropped_Jan',
 'gender',
 'age',
 'type',
 'location',
 'callcenter_callcount',
 'text_package_deal',
 'voice_package_deal',
 '4g_handset_deal',
 'all_u_can_eat_data_deal',
 'accidental_damage_cover_deal',
 'europe_roamer_deal',
 'americas_roamer_deal',
 'asia_africa_roamer_deal',
 'extra_handset_deal',
 'musicsubscription_deal',
 'period',
 'from_calls_Feb',
 'from_duration_Feb',
 'from_dropped_Feb',
 'to_calls_Feb',
 'to_duration_Feb',
 'to_dropped_Feb',
 'from_calls_Mar',
 'from_duration_Mar',
 'from_dropped_Mar',
 'to_calls_Mar',
 'to_duration_Mar',
 'to_dropped_Mar',
 'from_calls_Apr',
 'from_duration_Apr',
 'from_dropped_Apr',
 'to_calls_Apr',
 'to_duration_Apr',
 'to_dropped_Apr',
 'from_calls_May',
 'from_duration_May',
 'from_dropped_May',
 'to_calls_May',
 'to_duration_May',
 'to_dropped_May',
 'from_calls_Jun',
 'from_duration_J
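Reading a long list of column names by eye is error-prone; a set comparison reports only the discrepancies. The sketch below is plain Python. In the notebook, `actual` would be `df_all.columns`, and the expected constant columns and monthly attribute names would have to be listed by hand (the values here are hypothetical).

```python
def missing_and_unexpected(actual, constant_cols, monthly_attrs, suffixes):
    """Compare actual column names against the expected layout.

    Returns (missing, unexpected), both sorted lists of column names.
    """
    expected = set(constant_cols) | {f"{a}_{s}" for a in monthly_attrs for s in suffixes}
    actual = set(actual)
    return sorted(expected - actual), sorted(actual - expected)

# Hypothetical example
actual = ["customer_id", "churned", "from_calls_Jan", "from_calls_Feb", "gender"]
missing, unexpected = missing_and_unexpected(
    actual,
    constant_cols=["customer_id", "churned", "gender", "age"],
    monthly_attrs=["from_calls"],
    suffixes=["Jan", "Feb"],
)
```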

<a id="Output"></a>
## Write the results to file
Write a Parquet file for ease of loading into Spark, and a compressed CSV file (with header) for any other purpose.

[Top **⤒**](#Top)

In [17]:
print('Number of partitions: {}'.format(df_all.rdd.getNumPartitions()))

Number of partitions: 7


In [18]:
filename = os.path.join(dir_path, out_file + '.parquet')
df_all.write.parquet(filename, mode='overwrite')
print('{} written.'.format(filename))

/user-home/1053/DSX_Projects/Big-Churn/datasets/subset_merge_agg_0k_35.parquet written.


### Write a single CSV file from a Spark DataFrame
This relies on a custom function, `spark_write_one_csv`. Its module must be importable through the Python path (`sys.path`), for example by installing it in the project's `scripts` directory and adding that directory to `sys.path`.
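Spark writes a CSV as a directory of part files, typically one per partition, rather than as a single file; even `df.coalesce(1).write.csv(...)` produces a directory containing one part file. The implementation of `spark_write_one_csv` is not shown in this notebook. As an illustration of one way to get a single file, the hypothetical helper below concatenates the part files into one gzip-compressed CSV with a single header line. It assumes the parts were written without headers and without compression to a local (not HDFS) directory, for example with `df.write.csv(path, header=False)`.

```python
import glob
import gzip
import os
import shutil

def merge_part_files(part_dir, out_path, header=None):
    """Concatenate headerless part-*.csv files into one gzip CSV.

    Hypothetical helper for illustration; not the notebook's spark_write_one_csv.
    Returns the number of part files merged.
    """
    # Sorted names keep the part order; hidden .crc files do not match the pattern.
    parts = sorted(glob.glob(os.path.join(part_dir, "part-*.csv")))
    with gzip.open(out_path, "wb") as out:
        if header is not None:
            out.write((",".join(header) + "\n").encode("utf-8"))
        for part in parts:
            with open(part, "rb") as src:
                shutil.copyfileobj(src, out)  # stream bytes without loading whole files
    return len(parts)
```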

In [19]:
# Set up logging to capture output from the function instead of letting it operate silently.
logger    = logging.getLogger('spark1csv')
handler   = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)   # Can set these to logging.DEBUG to get more verbose output
logger.setLevel(logging.INFO)
logger.addHandler(handler)

filename = os.path.join(dir_path, out_file + '.csv.gz')
spark_write_one_csv(df_all, filename) # Use default settings: gzip compression, add header

2019-03-22 04:30:43,057 - INFO - File already exists.
2019-03-22 04:30:43,058 - INFO - Existing file will be overwritten.
2019-03-22 04:30:44,100 - INFO - The file /user-home/1053/DSX_Projects/Big-Churn/datasets/subset_merge_agg_0k_35.csv.gz (0.1 MB) is now available as a single CSV file.


### Developed by IBM Data Science Elite Team, IBM Data Science and AI:
- Robert Uleman, Data Science Engineer

Copyright (c) 2019 IBM Corporation