In [1]:
# ===============
# Packages import
# ===============

from __future__ import division
from datetime import datetime
import os
import random
import pandas as pd
import numpy as np
import logging
import yaml
from dateutil.relativedelta import relativedelta
from scipy import signal
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql import Row
import pyspark.sql.types as pst
from pyspark.sql.functions import udf
from utils import *
import pydoop.hdfs as pydoop
from hops import hdfs

# Enable Arrow-based columnar data transfers
# Use Arrow for JVM <-> Python columnar transfers, and disable the silent fallback
# to the non-Arrow path so Arrow problems surface as errors.
# `spark` is the SparkSession provided by the notebook kernel.
arrow_settings = (
    ("spark.sql.execution.arrow.enabled", "true"),
    ("spark.sql.execution.arrow.fallback.enabled", "false"),
)
for conf_key, conf_value in arrow_settings:
    spark.conf.set(conf_key, conf_value)

# Working directory with a trailing slash, used as prefix for the config files
cwd = "{0}/".format(os.getcwd())

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
2708,application_1595489208176_0010,pyspark,idle,Link,Link


SparkSession available as 'spark'.


In [2]:
# ===========================
# Reading configuration files
# ===========================

# Configuration files live next to the notebook (cwd is defined in the imports cell)
data_conf = Get_Data_From_JSON(cwd + "data.json")
model_conf = Get_Data_From_JSON(cwd + "config.json")

start_date, end_date = data_conf['start_date'], data_conf['end_date']
# Window lengths in days (typically 365 of history for X and 92 predicted days for y)
N_days_X = int(data_conf['number_of_historical_days'])
N_days_y = int(data_conf['number_of_predicted_days'])

# First date of the (history + horizon) window that ends on end_date
start_date_for_prediction_dt = datetime.strptime(end_date, "%Y-%m-%d") - relativedelta(days=N_days_X + N_days_y)
start_date_for_prediction = start_date_for_prediction_dt.strftime("%Y-%m-%d")

# Date helpers from utils (this is where end_date_dt gets its value)
start_date_dt, end_date_dt, start_date_prediction, end_date_prediction, end_date_plusOneDay, end_date_minus_6month = dates_definitions(
    start_date, end_date, N_days_X, N_days_y)

# Daily calendar including both endpoints; its length is the length of every generated series
time_range = pd.date_range(start_date, end_date, freq='D')

# Type of dataset desired:
#  - serving_mode=False -> dataset to train a model on, 1e5 customers
#  - serving_mode=True  -> unseen dataset to serve the model on, 2.5e6 customers
# The customer count is derived from the mode so the two cannot get out of sync.
serving_mode = False  # True if creating data for serving
N_customers = int(2.5e6) if serving_mode else int(1e5)

In [5]:
# =========
# Functions
# =========

def time_series_generator(size=500,
                          cycle_period=30.5,
                          signal_type='sine',
                          salary=1,
                          trend=0.1,
                          noise=0.1,
                          offset=False,
                          spike=0,
                          prediction_days=92,
                          history_days=365):
    '''
    This function generates mock time series with noise
    :param (int) size: length of the time series
    :param (float) cycle_period: period of the signal (usually 30.5, the month period, in days)
    :param (string) signal_type: Type of signal, "sine", "sawtooth", "triangle", "square", "random_choice"
    :param (float) salary: Base scaling variable for the trend, default=1
    :param (float) trend: Exponential growth rate of the trend over the whole series
    :param (float) noise: Std of the Gaussian noise, relative to salary, default=0.1
    :param (boolean) offset: Use of random phase offset, makes seasonality
    :param (int) spike: Number of random level shifts; each adds a random amount from
                        its start day to the end of the series
    :param (int) prediction_days: length of the final window kept free of spikes
                                  (the period to be predicted), default=92
    :param (int) history_days: length of the window right before the prediction window
                               where spikes may start, default=365. The window is clipped
                               at the start of the series; if it is empty no spike is added.
    :return (list, int): balance for each day (rounded to 2 decimals) and the integer
                         code of the signal used (1=sine, 2=triangle, 3=square, 4=sawtooth)
    :raises ValueError: if signal_type is not one of the accepted names
    '''
    signal_codes = {'sine': 1, 'triangle': 2, 'square': 3, 'sawtooth': 4}
    signal_types = ['sine', 'sawtooth', 'triangle', 'square']
    if signal_type == 'random_choice':
        signal_type = random.choice(signal_types)
    elif signal_type not in signal_types:
        raise ValueError('{} is not a valid signal type'.format(signal_type))

    # in size = 635, and cycle_period = 30.5, we have ~ 21 periods (20.8)
    count_periods = size / cycle_period

    # 1. The trend: goes from sign*salary to sign*salary*exp(trend) across the series
    t = np.linspace(-0.5 * cycle_period * count_periods, 0.5 * cycle_period * count_periods, size)
    t_trend = np.linspace(0, 1, size)
    sign = random.choice([-1, 1])
    trend_ts = sign * salary * np.exp(trend * t_trend)

    # 2. The seasonality
    phase = np.random.uniform(-1, 1) * np.pi if offset else 0
    angle = 2 * np.pi * (1. / cycle_period) * t + phase
    if signal_type == 'sine':
        ts = 0.5 * salary * np.sin(angle)
    elif signal_type == 'sawtooth':
        ts = -0.5 * salary * signal.sawtooth(angle)
    elif signal_type == 'triangle':
        # NOTE(review): the "- 1" is not scaled by salary; maybe "- 0.5 * salary" was meant - confirm
        ts = 1 * salary * np.abs(signal.sawtooth(angle)) - 1
    else:  # 'square'
        ts = 0.5 * salary * signal.square(angle)

    # 3. The noise
    noise_ts = np.random.normal(0, noise * salary, size)

    ts = ts + trend_ts + noise_ts

    # 4. Level shifts ("spikes"), only inside [first_spike_time, last_spike_time).
    # The start is clamped at 0: a negative index would slice from the END of the
    # array and put the shift inside the prediction window.
    if spike > 0:
        last_spike_time = int(size) - prediction_days
        first_spike_time = max(0, last_spike_time - history_days)
        if first_spike_time < last_spike_time:
            for _ in range(spike):
                sign = random.choice([-1, 1])
                t_spike = np.random.randint(first_spike_time, last_spike_time)  # start day of the shift
                ts[t_spike:] = ts[t_spike:] + sign * np.random.normal(3 * salary, salary)

    return np.around(ts, decimals=2).tolist(), signal_codes[signal_type]

In [6]:
# ============================
# Generation of synthetic data (for both formats)
# ============================

# One row per synthetic customer; spark.range names its column "id", renamed to the real-data key.
# Target columns of the final table: 'primaryaccountholder', 'transactiondate', 'balance'
dff = spark.range(N_customers).withColumnRenamed("id", "primaryaccountholder")

def ts_generation():
    '''
    Draw one synthetic customer: random generator parameters, then one balance value
    for every day of `time_range`.
    :return (Row): Row(signal_type=<int code of the signal>, balance=<list of daily balances>)
    '''
    generator_kwargs = dict(
        size=len(time_range),
        cycle_period=30.5,
        signal_type='random_choice',
        salary=np.maximum(np.random.normal(15000, 5000), 100),  # floor the salary at 100
        trend=np.random.uniform(1, 2),
        noise=np.abs(np.random.normal(0, 0.01)) + 0.1,
        offset=True,
        spike=3,
    )
    balance, signal_code = time_series_generator(**generator_kwargs)
    return Row('signal_type', 'balance')(signal_code, balance)
    
# UDF output: integer signal-type code + the daily balance series
schema = pst.StructType([
    pst.StructField("signal_type", pst.IntegerType(), False),
    pst.StructField("balance", pst.ArrayType(pst.FloatType()), False)])

# The generator is random, so the UDF must be flagged non-deterministic. Otherwise Spark
# may collapse the projections below and expand `generation.*` into one UDF call per
# struct field. signal_type and balance would then come from different random draws,
# and the label would not describe the series it sits next to.
ts_generation_udf = F.udf(ts_generation, schema).asNondeterministic()

dff = dff.withColumn("generation", ts_generation_udf())

dff = dff.select('primaryaccountholder', "generation.*")

# A single row holding the list of dates from start_date to end_date (both included)
dff2 = spark.sql("SELECT sequence(to_date('{0}'), to_date('{1}'), interval 1 day) as transactiondate".format(start_date, end_date))

# Attach the same date list to every customer
timeseries_spark = dff2.crossJoin(dff)
timeseries_spark = timeseries_spark.select('primaryaccountholder','transactiondate','balance','signal_type')

# NOTE: nothing is cached, so every action (show, count, the write below) re-runs the
# random UDF. The rows displayed here are therefore not the rows that get saved.
timeseries_spark.show(5)
timeseries_spark.count()

+--------------------+--------------------+--------------------+-----------+
|primaryaccountholder|     transactiondate|             balance|signal_type|
+--------------------+--------------------+--------------------+-----------+
|                   0|[2018-12-01, 2018...|[23367.93, 23041....|          3|
|                   1|[2018-12-01, 2018...|[9823.69, 8871.93...|          2|
|                   2|[2018-12-01, 2018...|[9708.74, 8577.31...|          2|
|                   3|[2018-12-01, 2018...|[-4881.65, -5710....|          1|
|                   4|[2018-12-01, 2018...|[24137.48, 26038....|          2|
+--------------------+--------------------+--------------------+-----------+
only showing top 5 rows

100000

In [7]:
# ========================
# Saving the dataset
# ========================

# Destination table name from data.json, depending on the kind of dataset produced
table_key = 'table_to_score' if serving_mode else 'table_to_train_on'
table_out = data_conf['synthetic_data'][table_key]

# TODO: ideally write next to the notebook (cwd + "<table_out>.parquet");
# for now the project's training-datasets folder on HDFS is hard-coded.
output_path = "hdfs:///Projects/CashFlow_Algirdas/CashFlow_Algirdas_Training_Datasets/{0}.parquet".format(table_out)
(timeseries_spark.write
    .format("parquet")
    .mode("overwrite")
    .save(output_path))