In [10]:
!pip install -U "sagemaker>2.0"
!pip install --upgrade sagemaker

Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com


In [11]:
import logging
import sagemaker
from time import gmtime, strftime

sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

In [None]:
%%writefile preprocess.py

import io
import os
import sys
import boto3
import pickle
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from numpy.lib.stride_tricks import sliding_window_view

HISTORICAL_DATA_WINDOW = 14
FUTURE_PREDICTION_WINDOW = 3

train_frames_x_dir = "./train_frames_x"
test_frames_x_dir = "./test_frames_x"
train_frames_y_dir = "./train_frames_y"
test_frames_y_dir = "./test_frames_y"
train_frames_embed_dir = "./train_frames_embed"
test_frames_embed_dir = "./test_frames_embed"
train_frames_decoder_input_dir = "./train_frames_decoder_input"
test_frames_decoder_input_dir = "./test_frames_decoder_input"

def create_training_frames(partition):

  partition.drop(columns=["Unnamed: 0"], inplace=True)
  partition.sort_values(by=["date"], inplace=True)
  split_index = partition.iloc[0]["keywordId"]
    
  train_frames_X, train_frames_Y = [], []
  test_frames_X, test_frames_Y = [], []
  train_frames_embed, test_frames_embed = [], []
  train_frames_decoder_input, test_frames_decoder_input = [], []

  train_size = (int)(len(partition) * 0.7)
  train_data = partition[:train_size]
  test_data = partition[train_size-HISTORICAL_DATA_WINDOW-FUTURE_PREDICTION_WINDOW+1:]
  data_columns = partition.columns

  train_data_arr, test_data_arr = np.array(train_data), np.array(test_data)
  if train_data_arr.shape[0] < HISTORICAL_DATA_WINDOW or test_data_arr.shape[0] < HISTORICAL_DATA_WINDOW:
    return 0
    
  train_data_frame_X = sliding_window_view(train_data_arr, window_shape = (HISTORICAL_DATA_WINDOW, train_data_arr.shape[1]))
  test_data_frame_X = sliding_window_view(test_data_arr, window_shape = (HISTORICAL_DATA_WINDOW, test_data_arr.shape[1]))
  train_data_frame_X = np.squeeze(train_data_frame_X)[:-FUTURE_PREDICTION_WINDOW]
  test_data_frame_X = np.squeeze(test_data_frame_X)[:-FUTURE_PREDICTION_WINDOW]
  train_data_frames_X, test_data_frames_X = train_data_frame_X.copy(), test_data_frame_X.copy()
  for frame in train_data_frames_X:
    train_frames_X.append(pd.DataFrame(frame, columns=data_columns))
  for frame in test_data_frames_X:
    test_frames_X.append(pd.DataFrame(frame, columns=data_columns))

  train_data_frame_Y = sliding_window_view(train_data_arr, window_shape = (FUTURE_PREDICTION_WINDOW, train_data_arr.shape[1]))
  test_data_frame_Y = sliding_window_view(test_data_arr, window_shape = (FUTURE_PREDICTION_WINDOW, test_data_arr.shape[1]))
  train_data_frame_Y = np.squeeze(train_data_frame_Y)[HISTORICAL_DATA_WINDOW:]
  test_data_frame_Y = np.squeeze(test_data_frame_Y)[HISTORICAL_DATA_WINDOW:]
  train_data_frames_Y, test_data_frames_Y = train_data_frame_Y.copy(), test_data_frame_Y.copy()
  for frame in train_data_frames_Y:
    train_frames_Y.append(pd.DataFrame(frame, columns=data_columns))
  for frame in test_data_frames_Y:
    test_frames_Y.append(pd.DataFrame(frame, columns=data_columns))
    
  embedding_columns = ["keyword_length", "keyword_num_words", "budget", "matchType", "country_code", "campaign_type", "targeting_type", "budget_type", "adFormat", "tactic", "costType"]
  
  for i in range(len(train_frames_X)):
    train_frames_embed.append(train_frames_X[i].loc[0][embedding_columns])
    train_frames_X[i].drop(columns=embedding_columns, inplace=True)

  for i in range(len(test_frames_X)):
    test_frames_embed.append(test_frames_X[i].loc[0][embedding_columns])
    test_frames_X[i].drop(columns=embedding_columns, inplace=True)
    
  for i in range(len(train_frames_X)):
    train_frames_X[i].drop(columns=["keywordId", "date"], inplace=True)
    train_frames_Y[i].drop(columns=embedding_columns, inplace=True)
    train_frames_Y[i].drop(columns=["keywordId", "date", "year", "month", "day", "dayoftheweek", "clicks", "impressions", "orders", "campaign_sales_perc", "campaign_spend_perc", "account_sales_perc", "account_spend_perc"], inplace=True)

  for i in range(len(test_frames_X)):
    test_frames_X[i].drop(columns=["keywordId", "date"], inplace=True)
    test_frames_Y[i].drop(columns=embedding_columns, inplace=True)
    test_frames_Y[i].drop(columns=["keywordId", "date", "year", "month", "day", "dayoftheweek", "clicks", "impressions", "orders", "campaign_sales_perc", "campaign_spend_perc", "account_sales_perc", "account_spend_perc"], inplace=True)
  
  for i in range(len(train_frames_Y)):
      train_frames_decoder_input.append(train_frames_Y[i]["cpc"])
      train_frames_Y[i].drop(columns=["cpc"], inplace=True)

  for i in range(len(test_frames_Y)):
      test_frames_decoder_input.append(test_frames_Y[i]["cpc"])
      test_frames_Y[i].drop(columns=["cpc"], inplace=True)
    
  train_frames_X, train_frames_Y = np.array(train_frames_X), np.array(train_frames_Y)
  test_frames_X, test_frames_Y = np.array(test_frames_X), np.array(test_frames_Y)
  train_frames_embed, test_frames_embed = np.array(train_frames_embed), np.array(test_frames_embed)
  train_frames_decoder_input, test_frames_decoder_input = np.array(train_frames_decoder_input), np.array(test_frames_decoder_input)
  
  if not os.path.exists(train_frames_x_dir):
    os.makedirs(train_frames_x_dir)
  if not os.path.exists(test_frames_x_dir):
    os.makedirs(test_frames_x_dir)
  if not os.path.exists(train_frames_y_dir):
    os.makedirs(train_frames_y_dir)
  if not os.path.exists(test_frames_y_dir):
    os.makedirs(test_frames_y_dir)
  if not os.path.exists(train_frames_embed_dir):
    os.makedirs(train_frames_embed_dir)
  if not os.path.exists(test_frames_embed_dir):
    os.makedirs(test_frames_embed_dir)
  if not os.path.exists(train_frames_decoder_input_dir):
    os.makedirs(train_frames_decoder_input_dir)
  if not os.path.exists(test_frames_decoder_input_dir):
    os.makedirs(test_frames_decoder_input_dir)
    
#   train_frames_X.write.mode("overwrite").parquet(os.path.join(train_frames_x_dir, f"par_{split_index}.parquet"))
#   train_frames_Y.write.mode("overwrite").parquet(os.path.join(train_frames_y_dir, f"par_{split_index}.parquet"))
#   train_frames_embed.write.mode("overwrite").parquet(os.path.join(train_frames_embed_dir, f"par_{split_index}.parquet"))
#   train_frames_decoder_input.write.mode("overwrite").parquet(os.path.join(train_frames_decoder_input_dir, f"par_{split_index}.parquet"))
#   test_frames_X.write.mode("overwrite").parquet(os.path.join(test_frames_x_dir, f"par_{split_index}.parquet"))
#   test_frames_Y.write.mode("overwrite").parquet(os.path.join(test_frames_y_dir, f"par_{split_index}.parquet"))
#   test_frames_embed.write.mode("overwrite").parquet(os.path.join(test_frames_embed_dir, f"par_{split_index}.parquet"))
#   test_frames_decoder_input.write.mode("overwrite").parquet(os.path.join(test_frames_decoder_input_dir, f"par_{split_index}.parquet"))
  
  np.save(os.path.join(train_frames_x_dir, f"par_{split_index}.npy"), train_frames_X)
  np.save(os.path.join(test_frames_x_dir, f"par_{split_index}.npy"), test_frames_X)
  np.save(os.path.join(train_frames_y_dir, f"par_{split_index}.npy"), train_frames_Y)
  np.save(os.path.join(test_frames_y_dir, f"par_{split_index}.npy"), test_frames_Y)
  np.save(os.path.join(train_frames_embed_dir, f"par_{split_index}.npy"), train_frames_X)
  np.save(os.path.join(test_frames_embed_dir, f"par_{split_index}.npy"), test_frames_X)
  np.save(os.path.join(train_frames_decoder_input_dir, f"par_{split_index}.npy"), train_frames_Y)
  np.save(os.path.join(test_frames_decoder_input_dir, f"par_{split_index}.npy"), test_frames_Y)

  return 0


def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    args = parser.parse_args()
    
    save_bucket_name = args.s3_output_bucket
    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

#     # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
#     spark.sparkContext._jsc.hadoopConfiguration().set(
#         "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
#     )

#     # Downloading the data from S3 into a Dataframe
#     total_df = spark.read.csv(
#         ("s3://" + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv")),
#         header=False,
#         schema=schema,
#     )

    s3 = boto3.client('s3')
    obj = s3.get_object(Bucket=args.s3_input_bucket, Key = 'processed_training_data.csv')
    dataset = pd.read_csv(obj['Body'], nrows=10)
    data_columns = dataset.columns
    spark_df = spark.createDataFrame(dataset)
    
    spark_df = spark_df.pandas_api()
    grouped = spark_df.groupby("keywordId")
    df = grouped.apply(create_training_frames)
    print(df)
    
    train_frames_X, train_frames_Y = [], []
    test_frames_X, test_frames_Y = [], []
    train_frames_embed, test_frames_embed = [], []
    train_frames_decoder_input, test_frames_decoder_input = [], []
    
    files = sorted(glob.glob(train_frames_x_dir + '/*.npy'))
    for f in files:
      train_frames_X.append(np.load(f))
    train_frames_X = np.concatenate(train_frames_X, axis=0)
    my_array_data = io.BytesIO()
    pickle.dump(train_frames_X, my_array_data)
    my_array_data.seek(0)
    s3_client.upload_fileobj(my_array_data, save_bucket_name, "train_frames_x.pkl")
    del train_frames_X
    
    files = sorted(glob.glob(train_frames_y_dir + '/*.npy'))
    for f in files:
      train_frames_Y.append(np.load(f))
    train_frames_Y = np.concatenate(train_frames_Y, axis=0)
    my_array_data = io.BytesIO()
    pickle.dump(train_frames_Y, my_array_data)
    my_array_data.seek(0)
    s3_client.upload_fileobj(my_array_data, save_bucket_name, "train_frames_y.pkl")
    del train_frames_Y
    
    files = sorted(glob.glob(train_frames_embed_dir + '/*.npy'))
    for f in files:
      train_frames_embed.append(np.load(f))
    train_frames_embed = np.concatenate(train_frames_embed, axis=0)
    my_array_data = io.BytesIO()
    pickle.dump(train_frames_embed, my_array_data)
    my_array_data.seek(0)
    s3_client.upload_fileobj(my_array_data, save_bucket_name, "train_frames_embed.pkl")
    del train_frames_embed
    
    files = sorted(glob.glob(train_frames_decoder_input_dir + '/*.npy'))
    for f in files:
      train_frames_decoder_input.append(np.load(f))
    train_frames_decoder_input = np.concatenate(train_frames_decoder_input, axis=0)
    my_array_data = io.BytesIO()
    pickle.dump(train_frames_decoder_input, my_array_data)
    my_array_data.seek(0)
    s3_client.upload_fileobj(my_array_data, save_bucket_name, "train_frames_decoder_input.pkl")
    del train_frames_decoder_input
    
    files = sorted(glob.glob(train_frames_x_dir + '/*.npy'))
    for f in files:
      test_frames_X.append(np.load(f))
    test_frames_X = np.concatenate(test_frames_X, axis=0)
    my_array_data = io.BytesIO()
    pickle.dump(test_frames_X, my_array_data)
    my_array_data.seek(0)
    s3_client.upload_fileobj(my_array_data, save_bucket_name, "test_frames_x.pkl")
    del test_frames_X
    
    files = sorted(glob.glob(test_frames_y_dir + '/*.npy'))
    for f in files:
      test_frames_Y.append(np.load(f))
    test_frames_Y = np.concatenate(test_frames_Y, axis=0)
    my_array_data = io.BytesIO()
    pickle.dump(test_frames_Y, my_array_data)
    my_array_data.seek(0)
    s3_client.upload_fileobj(my_array_data, save_bucket_name, "test_frames_y.pkl")
    del test_frames_Y
    
    files = sorted(glob.glob(test_frames_embed_dir + '/*.npy'))
    for f in files:
      test_frames_embed.append(np.load(f))
    test_frames_embed = np.concatenate(test_frames_embed, axis=0)
    my_array_data = io.BytesIO()
    pickle.dump(test_frames_embed, my_array_data)
    my_array_data.seek(0)
    s3_client.upload_fileobj(my_array_data, save_bucket_name, "test_frames_embed.pkl")
    del test_frames_embed
    
    files = sorted(glob.glob(test_frames_decoder_input_dir + '/*.npy'))
    for f in files:
      test_frames_decoder_input.append(np.load(f))
    test_frames_decoder_input = np.concatenate(test_frames_decoder_input, axis=0)
    my_array_data = io.BytesIO()
    pickle.dump(test_frames_decoder_input, my_array_data)
    my_array_data.seek(0)
    s3_client.upload_fileobj(my_array_data, save_bucket_name, "test_frames_decoder_input.pkl")
    del test_frames_decoder_input
    
    
#     train_frames_X = spark.read.parquet(os.path.join(train_frames_x_dir, "*.parquet"))
#     train_frames_Y = spark.read.parquet(os.path.join(train_frames_y_dir, "*.parquet"))
#     train_frames_embed = spark.read.parquet(os.path.join(train_frames_embed_dir, "*.parquet"))
#     train_frames_decoder_input = spark.read.parquet(os.path.join(train_frames_decoder_input_dir, "*.parquet"))
#     test_frames_X = spark.read.parquet(os.path.join(test_frames_x_dir, "*.parquet"))
#     test_frames_Y = spark.read.parquet(os.path.join(test_frames_y_dir, "*.parquet"))
#     test_frames_embed = spark.read.parquet(os.path.join(test_frames_embed_dir, "*.parquet"))
#     test_frames_decoder_input = spark.read.parquet(os.path.join(test_frames_decoder_input_dir, "*.parquet"))
    
    return

if __name__ == "__main__":
    main()

In [None]:
from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.t3.medium",
    max_runtime_in_seconds=1200,
)


In [None]:
spark_processor.run(
    submit_app="preprocess.py",
    arguments=[
        "--s3_input_bucket",
        "random",
        "--s3_output_bucket",
        "training-data-lstm",
    ],
)

In [None]:
a = sagemaker_session.logs_for_processing_job("sm-spark-2023-07-14-06-12-26-648")

In [None]:
import boto3

sagemaker_client = boto3.client("sagemaker")
response = sagemaker_client.list_processing_jobs()

if "ProcessingJobSummaries" in response:
    processing_jobs = response["ProcessingJobSummaries"]
    for job in processing_jobs:
        job_name = job["ProcessingJobName"]
        print("Processing job name:", job_name)
else:
    print("No processing jobs found.")

In [None]:
def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    args = parser.parse_args()
    save_bucket_name = args.s3_output_bucket

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    s3 = boto3.client('s3')

    # Load the CSV file from S3 into a Pandas DataFrame
    obj = s3.get_object(Bucket=args.s3_input_bucket, Key='processed_training_data.csv')
    dataset = pd.read_csv(obj['Body'], nrows=10)

    # Create a Spark DataFrame from the Pandas DataFrame
    spark_df = spark.createDataFrame(dataset)

    # Perform groupby and apply a function using pandas_udf
    grouped_udf = pandas_udf(create_training_frames, returnType=spark_df.schema, functionType=PandasUDFType.GROUPED_MAP)
    df = spark_df.groupby("keywordId").apply(grouped_udf)

    # Convert train_frames_X, train_frames_Y, etc. to pickle files and upload to S3
    train_frames_x_dir = "/path/to/train_frames_x"
    train_frames_y_dir = "/path/to/train_frames_y"
    train_frames_embed_dir = "/path/to/train_frames_embed"
    train_frames_decoder_input_dir = "/path/to/train_frames_decoder_input"
    test_frames_x_dir = "/path/to/test_frames_x"
    test_frames_y_dir = "/path/to/test_frames_y"
    test_frames_embed_dir = "/path/to/test_frames_embed"
    test_frames_decoder_input_dir = "/path/to/test_frames_decoder_input"

    s3_client = boto3.client('s3')

    def upload_pickle(data, bucket, key):
        my_array_data = io.BytesIO()
        pickle.dump(data, my_array_data)
        my_array_data.seek(0)
        s3_client.upload_fileobj(my_array_data, bucket, key)

    train_frames_X = np.concatenate([np.load(f) for f in glob.glob(train_frames_x_dir + '/*.npy')], axis=0)
    upload_pickle(train_frames_X, save_bucket_name, "train_frames_x.pkl")

    train_frames_Y = np.concatenate([np.load(f) for f in glob.glob(train_frames_y_dir + '/*.npy')], axis=0)
    upload_pickle(train_frames_Y, save_bucket_name, "train_frames_y.pkl")

    train_frames_embed = np.concatenate([np.load(f) for f in glob.glob(train_frames_embed_dir + '/*.npy')], axis=0)
    upload_pickle(train_frames_embed, save_bucket_name, "train_frames_embed.pkl")

    train_frames_decoder_input = np.concatenate([np.load(f) for f in glob.glob(train_frames_decoder_input_dir + '/*.npy')], axis=0)
    upload_pickle(train_frames_decoder_input, save_bucket_name, "train_frames_decoder_input.pkl")

    test_frames_X = np.concatenate([np.load(f) for f in glob.glob(test_frames_x_dir + '/*.npy')], axis=0)
    upload_pickle(test_frames_X, save_bucket_name, "test_frames_x.pkl")

    test_frames_Y = np.concatenate([np.load(f) for f in glob.glob(test_frames_y_dir + '/*.npy')], axis=0)
    upload_pickle(test_frames_Y, save_bucket_name, "test_frames_y.pkl")

    test_frames_embed = np.concatenate([np.load(f) for f in glob.glob(test_frames_embed_dir + '/*.npy')], axis=0)
    upload_pickle(test_frames_embed, save_bucket_name, "test_frames_embed.pkl")

    test_frames_decoder_input = np.concatenate([np.load(f) for f in glob.glob(test_frames_decoder_input_dir + '/*.npy')], axis=0)
    upload_pickle(test_frames_decoder_input, save_bucket_name, "test_frames_decoder_input.pkl")

    return

if __name__ == "__main__":
    main()s

In [40]:
!pip install pyarrow

IOStream.flush timed out
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com


In [41]:
import pyarrow.hdfs as hdfs

# Create a connection to HDFS
fs = hdfs.connect(host='algo-1', port=8020)

# Check if the connection is successful
if fs.is_open:
    print("Connected to HDFS successfully!")
else:
    print("Failed to connect to HDFS.")

# Perform operations on HDFS
# For example, you can list files in a directory
#file_list = fs.ls('/')
#for file_path in file_list:
#    print(file_path)

# Close the connection to HDFS
#fs.close()


  fs = hdfs.connect(host='algo-1', port=8020)


FileNotFoundError: [Errno 2] No such file or directory: 'hadoop'

In [37]:
%%writefile preprocess.py

import io
import os
import sys
import boto3
import pickle
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from numpy.lib.stride_tricks import sliding_window_view

HISTORICAL_DATA_WINDOW = 14
FUTURE_PREDICTION_WINDOW = 3

train_frames_x_dir = "./train_frames_x"
test_frames_x_dir = "./test_frames_x"
train_frames_y_dir = "./train_frames_y"
test_frames_y_dir = "./test_frames_y"
train_frames_embed_dir = "./train_frames_embed"
test_frames_embed_dir = "./test_frames_embed"
train_frames_decoder_input_dir = "./train_frames_decoder_input"
test_frames_decoder_input_dir = "./test_frames_decoder_input"


def create_training_frames(key, iterator):
    partition = pd.concat(iterator)
    partition.drop(columns=["Unnamed: 0"], inplace=True)
    partition.sort_values(by=["date"], inplace=True)
    split_index = partition.iloc[0]["keywordId"]
    
    
    train_frames_X, train_frames_Y = [], []
    test_frames_X, test_frames_Y = [], []
    train_frames_embed, test_frames_embed = [], []
    train_frames_decoder_input, test_frames_decoder_input = [], []

    train_size = (int)(len(partition) * 0.7)
    train_data = partition[:train_size]
    test_data = partition[train_size - HISTORICAL_DATA_WINDOW - FUTURE_PREDICTION_WINDOW + 1:]
    data_columns = partition.columns
    
    if train_data_arr.shape[0] < HISTORICAL_DATA_WINDOW or test_data_arr.shape[0] < HISTORICAL_DATA_WINDOW:
        return []
    
    train_data_frame_X = sliding_window_view(train_data_arr, window_shape = (HISTORICAL_DATA_WINDOW, train_data_arr.shape[1]))
    test_data_frame_X = sliding_window_view(test_data_arr, window_shape = (HISTORICAL_DATA_WINDOW, test_data_arr.shape[1]))
    train_data_frame_X = np.squeeze(train_data_frame_X)[:-FUTURE_PREDICTION_WINDOW]
    test_data_frame_X = np.squeeze(test_data_frame_X)[:-FUTURE_PREDICTION_WINDOW]
    train_data_frames_X, test_data_frames_X = train_data_frame_X.copy(), test_data_frame_X.copy()
    for frame in train_data_frames_X:
        train_frames_X.append(pd.DataFrame(frame, columns=data_columns))
    for frame in test_data_frames_X:
        test_frames_X.append(pd.DataFrame(frame, columns=data_columns))

    train_data_frame_Y = sliding_window_view(train_data_arr, window_shape = (FUTURE_PREDICTION_WINDOW, train_data_arr.shape[1]))
    test_data_frame_Y = sliding_window_view(test_data_arr, window_shape = (FUTURE_PREDICTION_WINDOW, test_data_arr.shape[1]))
    train_data_frame_Y = np.squeeze(train_data_frame_Y)[HISTORICAL_DATA_WINDOW:]
    test_data_frame_Y = np.squeeze(test_data_frame_Y)[HISTORICAL_DATA_WINDOW:]
    train_data_frames_Y, test_data_frames_Y = train_data_frame_Y.copy(), test_data_frame_Y.copy()
    for frame in train_data_frames_Y:
        train_frames_Y.append(pd.DataFrame(frame, columns=data_columns))
    for frame in test_data_frames_Y:
        test_frames_Y.append(pd.DataFrame(frame, columns=data_columns))   
    
    
    embedding_columns = ["keyword_length", "keyword_num_words", "budget", "matchType", "country_code", "campaign_type",
                         "targeting_type", "budget_type", "adFormat", "tactic", "costType"]

    for i in range(len(train_frames_X)):
        train_frames_embed.append(train_frames_X[i].loc[0][embedding_columns])
        train_frames_X[i].drop(columns=embedding_columns, inplace=True)

    for i in range(len(test_frames_X)):
        test_frames_embed.append(test_frames_X[i].loc[0][embedding_columns])
        test_frames_X[i].drop(columns=embedding_columns, inplace=True)

    for i in range(len(train_frames_X)):
        train_frames_X[i].drop(columns=["keywordId", "date"], inplace=True)
        train_frames_Y[i].drop(columns=embedding_columns, inplace=True)
        train_frames_Y[i].drop(
            columns=["keywordId", "date", "year", "month", "day", "dayoftheweek", "clicks", "impressions", "orders",
                     "campaign_sales_perc", "campaign_spend_perc", "account_sales_perc", "account_spend_perc"],
            inplace=True)

    for i in range(len(test_frames_X)):
        test_frames_X[i].drop(columns=["keywordId", "date"], inplace=True)
        test_frames_Y[i].drop(columns=embedding_columns, inplace=True)
        test_frames_Y[i].drop(
            columns=["keywordId", "date", "year", "month", "day", "dayoftheweek", "clicks", "impressions", "orders",
                     "campaign_sales_perc", "campaign_spend_perc", "account_sales_perc", "account_spend_perc"],
            inplace=True)

    for i in range(len(train_frames_Y)):
        train_frames_decoder_input.append(train_frames_Y[i]["cpc"])
        train_frames_Y[i].drop(columns=["cpc"], inplace=True)

    for i in range(len(test_frames_Y)):
        test_frames_decoder_input.append(test_frames_Y[i]["cpc"])
        test_frames_Y[i].drop(columns=["cpc"], inplace=True)

    train_frames_X, train_frames_Y = np.array(train_frames_X), np.array(train_frames_Y)
    test_frames_X, test_frames_Y = np.array(test_frames_X), np.array(test_frames_Y)
    train_frames_embed, test_frames_embed = np.array(train_frames_embed), np.array(test_frames_embed)
    train_frames_decoder_input, test_frames_decoder_input = np.array(train_frames_decoder_input), np.array(
        test_frames_decoder_input)

    if not os.path.exists(train_frames_x_dir):
        os.makedirs(train_frames_x_dir)
    if not os.path.exists(test_frames_x_dir):
        os.makedirs(test_frames_x_dir)
    if not os.path.exists(train_frames_y_dir):
        os.makedirs(train_frames_y_dir)
    if not os.path.exists(test_frames_y_dir):
        os.makedirs(test_frames_y_dir)
    if not os.path.exists(train_frames_embed_dir):
        os.makedirs(train_frames_embed_dir)
    if not os.path.exists(test_frames_embed_dir):
        os.makedirs(test_frames_embed_dir)
    if not os.path.exists(train_frames_decoder_input_dir):
        os.makedirs(train_frames_decoder_input_dir)
    if not os.path.exists(test_frames_decoder_input_dir):
        os.makedirs(test_frames_decoder_input_dir)

    np.save(os.path.join(train_frames_x_dir, f"par_{split_index}.npy"), train_frames_X)
    np.save(os.path.join(test_frames_x_dir, f"par_{split_index}.npy"), test_frames_X)
    np.save(os.path.join(train_frames_y_dir, f"par_{split_index}.npy"), train_frames_Y)
    np.save(os.path.join(test_frames_y_dir, f"par_{split_index}.npy"), test_frames_Y)
    np.save(os.path.join(train_frames_embed_dir, f"par_{split_index}.npy"), train_frames_X)
    np.save(os.path.join(test_frames_embed_dir, f"par_{split_index}.npy"), test_frames_X)
    np.save(os.path.join(train_frames_decoder_input_dir, f"par_{split_index}.npy"), train_frames_Y)
    np.save(os.path.join(test_frames_decoder_input_dir, f"par_{split_index}.npy"), test_frames_Y)

    return []


def main():
    
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    args = parser.parse_args()
    
    save_bucket_name = args.s3_output_bucket
    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    
    s3 = boto3.client('s3')
    obj = s3.get_object(Bucket=args.s3_input_bucket, Key='processed_training_data.csv')
    dataset = pd.read_csv(obj['Body'], nrows=10)
    data_columns = dataset.columns
    spark_df = spark.createDataFrame(dataset)
    
    # Convert Spark DataFrame to Pandas DataFrame
    pandas_df = spark_df.toPandas()

    # Convert Pandas DataFrame back to Spark DataFrame and repartition
    spark_df = spark.createDataFrame(pandas_df)
    spark_df = spark_df.repartition(spark_df.rdd.getNumPartitions())

    # Apply the function to each partition using Spark
    spark_df.rdd.mapPartitionsWithIndex(create_training_frames).collect()
    
    # Apply the function to each partition using Spark
    spark.sparkContext.parallelize(partitions).mapPartitionsWithIndex(create_training_frames).collect()
    
    # Upload the resulting files to S3
    s3_client = boto3.client('s3')
    
    def upload_pickle(data, bucket, key):
        my_array_data = io.BytesIO()
        pickle.dump(data, my_array_data)
        my_array_data.seek(0)
        s3_client.upload_fileobj(my_array_data, bucket, key)
        
    files = sorted(glob.glob(train_frames_x_dir + '/*.npy'))
    train_frames_X = np.concatenate([np.load(f) for f in files], axis=0)
    upload_pickle(train_frames_X, save_bucket_name, "train_frames_x.pkl")

    files = sorted(glob.glob(train_frames_y_dir + '/*.npy'))
    train_frames_Y = np.concatenate([np.load(f) for f in files], axis=0)
    upload_pickle(train_frames_Y, save_bucket_name, "train_frames_y.pkl")

    files = sorted(glob.glob(train_frames_embed_dir + '/*.npy'))
    train_frames_embed = np.concatenate([np.load(f) for f in files], axis=0)
    upload_pickle(train_frames_embed, save_bucket_name, "train_frames_embed.pkl")

    files = sorted(glob.glob(train_frames_decoder_input_dir + '/*.npy'))
    train_frames_decoder_input = np.concatenate([np.load(f) for f in files], axis=0)
    upload_pickle(train_frames_decoder_input, save_bucket_name, "train_frames_decoder_input.pkl")

    files = sorted(glob.glob(test_frames_x_dir + '/*.npy'))
    test_frames_X = np.concatenate([np.load(f) for f in files], axis=0)
    upload_pickle(test_frames_X, save_bucket_name, "test_frames_x.pkl")

    files = sorted(glob.glob(test_frames_y_dir + '/*.npy'))
    test_frames_Y = np.concatenate([np.load(f) for f in files], axis=0)
    upload_pickle(test_frames_Y, save_bucket_name, "test_frames_y.pkl")

    files = sorted(glob.glob(test_frames_embed_dir + '/*.npy'))
    test_frames_embed = np.concatenate([np.load(f) for f in files], axis=0)
    upload_pickle(test_frames_embed, save_bucket_name, "test_frames_embed.pkl")

    files = sorted(glob.glob(test_frames_decoder_input_dir + '/*.npy'))
    test_frames_decoder_input = np.concatenate([np.load(f) for f in files], axis=0)
    upload_pickle(test_frames_decoder_input, save_bucket_name, "test_frames_decoder_input.pkl")

    return 

if __name__ == "__main__":
    main()

Overwriting preprocess.py


In [38]:
from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.t3.medium",
    max_runtime_in_seconds=1200,
)

In [39]:
spark_processor.run(
    submit_app="preprocess.py",
    arguments=[
        "--s3_input_bucket",
        "random",
        "--s3_output_bucket",
        "training-data-lstm",
    ],
)

Creating processing-job with name sm-spark-2023-07-18-06-44-57-714
INFO:sagemaker:Creating processing-job with name sm-spark-2023-07-18-06-44-57-714


...............................................................................[34m07-18 06:58 smspark.cli  INFO     Parsing arguments. argv: ['/usr/local/bin/smspark-submit', '/opt/ml/processing/input/code/preprocess.py', '--s3_input_bucket', 'random', '--s3_output_bucket', 'training-data-lstm'][0m
[34m07-18 06:58 smspark.cli  INFO     Raw spark options before processing: {'class_': None, 'jars': None, 'py_files': None, 'files': None, 'verbose': False}[0m
[34m07-18 06:58 smspark.cli  INFO     App and app arguments: ['/opt/ml/processing/input/code/preprocess.py', '--s3_input_bucket', 'random', '--s3_output_bucket', 'training-data-lstm'][0m
[34m07-18 06:58 smspark.cli  INFO     Rendered spark options: {'class_': None, 'jars': None, 'py_files': None, 'files': None, 'verbose': False}[0m
[34m07-18 06:58 smspark.cli  INFO     Initializing processing job.[0m
[34m07-18 06:58 smspark-submit INFO     {'current_host': 'algo-2', 'hosts': ['algo-1', 'algo-2']}[0m
[34m07-18 06:58 smspa

UnexpectedStatusException: Error for Processing job sm-spark-2023-07-18-06-44-57-714: Failed. Reason: AlgorithmError: See job logs for more information

In [15]:
!pip install currencyconverter
!pip install -U pandas==1.5.3

IOStream.flush timed out
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
IOStream.flush timed out
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com


In [27]:
import io
import os
import sys
import boto3
import pickle
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from numpy.lib.stride_tricks import sliding_window_view

In [28]:
s3 = boto3.client('s3')
obj = s3.get_object(Bucket='training-data-lstm', Key = 'processed_training_data.csv')

In [29]:
dataset = pd.read_csv(obj['Body'],nrows = 50)
dataset.drop_duplicates(subset=["keywordId", "date"], keep='first', inplace=True)
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()
spark_dataset = spark.createDataFrame(dataset)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [30]:
pandas_df = spark_dataset.toPandas()
pandas_df

Unnamed: 0.1,Unnamed: 0,keywordId,date,clicks,impressions,orders,budget,campaign_sales_perc,account_sales_perc,campaign_spend_perc,...,targeting_type,budget_type,adFormat,tactic,costType,cpc,year,month,day,dayoftheweek
0,1124466,45328926266934,2023-01-13,0,37,0,500.0,0.0,0.0,0.0,...,2,0,2,0,0,0.0,2023,1,13,4
1,1124467,72423678058542,2023-01-13,4,131,0,500.0,0.0,0.0,0.023576,...,2,0,2,0,0,0.036826,2023,1,13,4
2,1124468,167712247204160,2023-01-13,0,12,0,500.0,0.0,0.0,0.0,...,2,0,2,0,0,0.0,2023,1,13,4
3,1124469,264836381289639,2023-01-13,1,73,0,500.0,0.0,0.0,0.005894,...,2,0,2,0,0,0.036826,2023,1,13,4
4,1124470,246060444937210,2023-01-13,1,11,0,500.0,0.0,0.0,0.005894,...,2,0,2,0,0,0.036826,2023,1,13,4
5,1124471,69020599926806,2023-01-13,5,105,0,500.0,0.0,0.0,0.02947,...,2,0,2,0,0,0.036826,2023,1,13,4
6,1124472,172870446609549,2023-01-13,1,27,0,500.0,0.0,0.0,0.005894,...,2,0,2,0,0,0.036826,2023,1,13,4
7,1124473,275509676939985,2023-01-13,1,77,0,500.0,0.0,0.0,0.005894,...,2,0,2,0,0,0.036826,2023,1,13,4
8,1124474,62686449994060,2023-01-13,2,147,0,500.0,0.0,0.0,0.011788,...,2,0,2,0,0,0.036826,2023,1,13,4
9,1124475,185241824852577,2023-01-13,1,101,0,500.0,0.0,0.0,0.005894,...,2,0,2,0,0,0.036826,2023,1,13,4


In [31]:
spark_df = spark.createDataFrame(pandas_df)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [32]:
spark_df = spark_df.repartition(spark_df.rdd.getNumPartitions())

In [33]:
spark_df

DataFrame[Unnamed: 0: bigint, keywordId: bigint, date: string, clicks: bigint, impressions: bigint, orders: bigint, budget: double, campaign_sales_perc: double, account_sales_perc: double, campaign_spend_perc: double, account_spend_perc: double, spend_usd: double, sales_usd: double, keyword_length: bigint, keyword_num_words: bigint, matchType: bigint, country_code: bigint, campaign_type: bigint, targeting_type: bigint, budget_type: bigint, adFormat: bigint, tactic: bigint, costType: bigint, cpc: double, year: bigint, month: bigint, day: bigint, dayoftheweek: bigint]

In [8]:
import io
import os
import sys
import boto3
import pickle
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from numpy.lib.stride_tricks import sliding_window_view

HISTORICAL_DATA_WINDOW = 14
FUTURE_PREDICTION_WINDOW = 3

train_frames_x_dir = "./train_frames_x"
test_frames_x_dir = "./test_frames_x"
train_frames_y_dir = "./train_frames_y"
test_frames_y_dir = "./test_frames_y"
train_frames_embed_dir = "./train_frames_embed"
test_frames_embed_dir = "./test_frames_embed"
train_frames_decoder_input_dir = "./train_frames_decoder_input"
test_frames_decoder_input_dir = "./test_frames_decoder_input"


def create_training_frames(key, iterator):
    partition = pd.concat(iterator)
    partition.drop(columns=["Unnamed: 0"], inplace=True)
    partition.sort_values(by=["date"], inplace=True)
    split_index = partition.iloc[0]["keywordId"]
    
    
    train_frames_X, train_frames_Y = [], []
    test_frames_X, test_frames_Y = [], []
    train_frames_embed, test_frames_embed = [], []
    train_frames_decoder_input, test_frames_decoder_input = [], []

    train_size = (int)(len(partition) * 0.7)
    train_data = partition[:train_size]
    test_data = partition[train_size - HISTORICAL_DATA_WINDOW - FUTURE_PREDICTION_WINDOW + 1:]
    data_columns = partition.columns
    
    if train_data_arr.shape[0] < HISTORICAL_DATA_WINDOW or test_data_arr.shape[0] < HISTORICAL_DATA_WINDOW:
        return []
    
    train_data_frame_X = sliding_window_view(train_data_arr, window_shape = (HISTORICAL_DATA_WINDOW, train_data_arr.shape[1]))
    test_data_frame_X = sliding_window_view(test_data_arr, window_shape = (HISTORICAL_DATA_WINDOW, test_data_arr.shape[1]))
    train_data_frame_X = np.squeeze(train_data_frame_X)[:-FUTURE_PREDICTION_WINDOW]
    test_data_frame_X = np.squeeze(test_data_frame_X)[:-FUTURE_PREDICTION_WINDOW]
    train_data_frames_X, test_data_frames_X = train_data_frame_X.copy(), test_data_frame_X.copy()
    for frame in train_data_frames_X:
        train_frames_X.append(pd.DataFrame(frame, columns=data_columns))
    for frame in test_data_frames_X:
        test_frames_X.append(pd.DataFrame(frame, columns=data_columns))

    train_data_frame_Y = sliding_window_view(train_data_arr, window_shape = (FUTURE_PREDICTION_WINDOW, train_data_arr.shape[1]))
    test_data_frame_Y = sliding_window_view(test_data_arr, window_shape = (FUTURE_PREDICTION_WINDOW, test_data_arr.shape[1]))
    train_data_frame_Y = np.squeeze(train_data_frame_Y)[HISTORICAL_DATA_WINDOW:]
    test_data_frame_Y = np.squeeze(test_data_frame_Y)[HISTORICAL_DATA_WINDOW:]
    train_data_frames_Y, test_data_frames_Y = train_data_frame_Y.copy(), test_data_frame_Y.copy()
    for frame in train_data_frames_Y:
        train_frames_Y.append(pd.DataFrame(frame, columns=data_columns))
    for frame in test_data_frames_Y:
        test_frames_Y.append(pd.DataFrame(frame, columns=data_columns))   
    
    
    embedding_columns = ["keyword_length", "keyword_num_words", "budget", "matchType", "country_code", "campaign_type",
                         "targeting_type", "budget_type", "adFormat", "tactic", "costType"]

    for i in range(len(train_frames_X)):
        train_frames_embed.append(train_frames_X[i].loc[0][embedding_columns])
        train_frames_X[i].drop(columns=embedding_columns, inplace=True)

    for i in range(len(test_frames_X)):
        test_frames_embed.append(test_frames_X[i].loc[0][embedding_columns])
        test_frames_X[i].drop(columns=embedding_columns, inplace=True)

    for i in range(len(train_frames_X)):
        train_frames_X[i].drop(columns=["keywordId", "date"], inplace=True)
        train_frames_Y[i].drop(columns=embedding_columns, inplace=True)
        train_frames_Y[i].drop(
            columns=["keywordId", "date", "year", "month", "day", "dayoftheweek", "clicks", "impressions", "orders",
                     "campaign_sales_perc", "campaign_spend_perc", "account_sales_perc", "account_spend_perc"],
            inplace=True)

    for i in range(len(test_frames_X)):
        test_frames_X[i].drop(columns=["keywordId", "date"], inplace=True)
        test_frames_Y[i].drop(columns=embedding_columns, inplace=True)
        test_frames_Y[i].drop(
            columns=["keywordId", "date", "year", "month", "day", "dayoftheweek", "clicks", "impressions", "orders",
                     "campaign_sales_perc", "campaign_spend_perc", "account_sales_perc", "account_spend_perc"],
            inplace=True)

    for i in range(len(train_frames_Y)):
        train_frames_decoder_input.append(train_frames_Y[i]["cpc"])
        train_frames_Y[i].drop(columns=["cpc"], inplace=True)

    for i in range(len(test_frames_Y)):
        test_frames_decoder_input.append(test_frames_Y[i]["cpc"])
        test_frames_Y[i].drop(columns=["cpc"], inplace=True)

    train_frames_X, train_frames_Y = np.array(train_frames_X), np.array(train_frames_Y)
    test_frames_X, test_frames_Y = np.array(test_frames_X), np.array(test_frames_Y)
    train_frames_embed, test_frames_embed = np.array(train_frames_embed), np.array(test_frames_embed)
    train_frames_decoder_input, test_frames_decoder_input = np.array(train_frames_decoder_input), np.array(
        test_frames_decoder_input)

    if not os.path.exists(train_frames_x_dir):
        os.makedirs(train_frames_x_dir)
    if not os.path.exists(test_frames_x_dir):
        os.makedirs(test_frames_x_dir)
    if not os.path.exists(train_frames_y_dir):
        os.makedirs(train_frames_y_dir)
    if not os.path.exists(test_frames_y_dir):
        os.makedirs(test_frames_y_dir)
    if not os.path.exists(train_frames_embed_dir):
        os.makedirs(train_frames_embed_dir)
    if not os.path.exists(test_frames_embed_dir):
        os.makedirs(test_frames_embed_dir)
    if not os.path.exists(train_frames_decoder_input_dir):
        os.makedirs(train_frames_decoder_input_dir)
    if not os.path.exists(test_frames_decoder_input_dir):
        os.makedirs(test_frames_decoder_input_dir)

    np.save(os.path.join(train_frames_x_dir, f"par_{split_index}.npy"), train_frames_X)
    np.save(os.path.join(test_frames_x_dir, f"par_{split_index}.npy"), test_frames_X)
    np.save(os.path.join(train_frames_y_dir, f"par_{split_index}.npy"), train_frames_Y)
    np.save(os.path.join(test_frames_y_dir, f"par_{split_index}.npy"), test_frames_Y)
    np.save(os.path.join(train_frames_embed_dir, f"par_{split_index}.npy"), train_frames_X)
    np.save(os.path.join(test_frames_embed_dir, f"par_{split_index}.npy"), test_frames_X)
    np.save(os.path.join(train_frames_decoder_input_dir, f"par_{split_index}.npy"), train_frames_Y)
    np.save(os.path.join(test_frames_decoder_input_dir, f"par_{split_index}.npy"), test_frames_Y)

    return []


In [35]:
spark.sparkContext.parallelize(partitions).mapPartitionsWithIndex(create_training_frames).collect()

NameError: name 'partitions' is not defined

In [36]:
import io
import os
import sys
import boto3
import pickle
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from numpy.lib.stride_tricks import sliding_window_view

s3 = boto3.client('s3')
obj = s3.get_object(Bucket='training-data-lstm', Key='processed_training_data.csv')
dataset = pd.read_csv(obj['Body'], nrows=50)
dataset.drop_duplicates(subset=["keywordId", "date"], keep='first', inplace=True)

spark = SparkSession.builder.appName("DataProcessing").getOrCreate()
spark_dataset = spark.createDataFrame(dataset)
spark_df = spark_dataset.repartition(spark_df.rdd.getNumPartitions())

HISTORICAL_DATA_WINDOW = 14
FUTURE_PREDICTION_WINDOW = 3

train_frames_x_dir = "./train_frames_x"
test_frames_x_dir = "./test_frames_x"
train_frames_y_dir = "./train_frames_y"
test_frames_y_dir = "./test_frames_y"
train_frames_embed_dir = "./train_frames_embed"
test_frames_embed_dir = "./test_frames_embed"
train_frames_decoder_input_dir = "./train_frames_decoder_input"
test_frames_decoder_input_dir = "./test_frames_decoder_input"

def create_training_frames(partition):
    partition.drop(columns=["Unnamed: 0"], inplace=True)
    partition.sort_values(by=["date"], inplace=True)
    split_index = partition.iloc[0]["keywordId"]
    
    train_frames_X, train_frames_Y = [], []
    test_frames_X, test_frames_Y = [], []
    train_frames_embed, test_frames_embed = [], []
    train_frames_decoder_input, test_frames_decoder_input = [], []

    train_size = int(len(partition) * 0.7)
    train_data = partition[:train_size]
    test_data = partition[train_size - HISTORICAL_DATA_WINDOW - FUTURE_PREDICTION_WINDOW + 1:]
    data_columns = partition.columns
    
    if train_data.shape[0] < HISTORICAL_DATA_WINDOW or test_data.shape[0] < HISTORICAL_DATA_WINDOW:
        return []
    
    train_data_arr = train_data.to_numpy()
    test_data_arr = test_data.to_numpy()
    
    train_data_frame_X = sliding_window_view(train_data_arr, window_shape=(HISTORICAL_DATA_WINDOW, train_data_arr.shape[1]))
    test_data_frame_X = sliding_window_view(test_data_arr, window_shape=(HISTORICAL_DATA_WINDOW, test_data_arr.shape[1]))
    train_data_frame_X = np.squeeze(train_data_frame_X)[:-FUTURE_PREDICTION_WINDOW]
    test_data_frame_X = np.squeeze(test_data_frame_X)[:-FUTURE_PREDICTION_WINDOW]
    train_data_frames_X, test_data_frames_X = train_data_frame_X.copy(), test_data_frame_X.copy()
    
    for frame in train_data_frames_X:
        train_frames_X.append(pd.DataFrame(frame, columns=data_columns))
    for frame in test_data_frames_X:
        test_frames_X.append(pd.DataFrame(frame, columns=data_columns))

    train_data_frame_Y = sliding_window_view(train_data_arr, window_shape=(FUTURE_PREDICTION_WINDOW, train_data_arr.shape[1]))
    test_data_frame_Y = sliding_window_view(test_data_arr, window_shape=(FUTURE_PREDICTION_WINDOW, test_data_arr.shape[1]))
    train_data_frame_Y = np.squeeze(train_data_frame_Y)[HISTORICAL_DATA_WINDOW:]
    test_data_frame_Y = np.squeeze(test_data_frame_Y)[HISTORICAL_DATA_WINDOW:]
    train_data_frames_Y, test_data_frames_Y = train_data_frame_Y.copy(), test_data_frame_Y.copy()
    
    for frame in train_data_frames_Y:
        train_frames_Y.append(pd.DataFrame(frame, columns=data_columns))
    for frame in test_data_frames_Y:
        test_frames_Y.append(pd.DataFrame(frame, columns=data_columns))   
    
    embedding_columns = ["keyword_length", "keyword_num_words", "budget", "matchType", "country_code", "campaign_type",
                         "targeting_type", "budget_type", "adFormat", "tactic", "costType"]

    for i in range(len(train_frames_X)):
        train_frames_embed.append(train_frames_X[i].loc[0][embedding_columns])
        train_frames_X[i].drop(columns=embedding_columns, inplace=True)

    for i in range(len(test_frames_X)):
        test_frames_embed.append(test_frames_X[i].loc[0][embedding_columns])
        test_frames_X[i].drop(columns=embedding_columns, inplace=True)

    for i in range(len(train_frames_X)):
        train_frames_X[i].drop(columns=["keywordId", "date"], inplace=True)
        train_frames_Y[i].drop(columns=embedding_columns, inplace=True)
        train_frames_Y[i].drop(
            columns=["keywordId", "date", "year", "month", "day", "dayoftheweek", "clicks", "impressions", "orders",
                     "campaign_sales_perc", "campaign_spend_perc", "account_sales_perc", "account_spend_perc"],
            inplace=True)

    for i in range(len(test_frames_X)):
        test_frames_X[i].drop(columns=["keywordId", "date"], inplace=True)
        test_frames_Y[i].drop(columns=embedding_columns, inplace=True)
        test_frames_Y[i].drop(
            columns=["keywordId", "date", "year", "month", "day", "dayoftheweek", "clicks", "impressions", "orders",
                     "campaign_sales_perc", "campaign_spend_perc", "account_sales_perc", "account_spend_perc"],
            inplace=True)

    for i in range(len(train_frames_Y)):
        train_frames_decoder_input.append(train_frames_Y[i]["cpc"])
        train_frames_Y[i].drop(columns=["cpc"], inplace=True)

    for i in range(len(test_frames_Y)):
        test_frames_decoder_input.append(test_frames_Y[i]["cpc"])
        test_frames_Y[i].drop(columns=["cpc"], inplace=True)

    train_frames_X, train_frames_Y = np.array(train_frames_X), np.array(train_frames_Y)
    test_frames_X, test_frames_Y = np.array(test_frames_X), np.array(test_frames_Y)
    train_frames_embed, test_frames_embed = np.array(train_frames_embed), np.array(test_frames_embed)
    train_frames_decoder_input, test_frames_decoder_input = np.array(train_frames_decoder_input), np.array(
        test_frames_decoder_input)

    if not os.path.exists(train_frames_x_dir):
        os.makedirs(train_frames_x_dir)
    if not os.path.exists(test_frames_x_dir):
        os.makedirs(test_frames_x_dir)
    if not os.path.exists(train_frames_y_dir):
        os.makedirs(train_frames_y_dir)
    if not os.path.exists(test_frames_y_dir):
        os.makedirs(test_frames_y_dir)
    if not os.path.exists(train_frames_embed_dir):
        os.makedirs(train_frames_embed_dir)
    if not os.path.exists(test_frames_embed_dir):
        os.makedirs(test_frames_embed_dir)
    if not os.path.exists(train_frames_decoder_input_dir):
        os.makedirs(train_frames_decoder_input_dir)
    if not os.path.exists(test_frames_decoder_input_dir):
        os.makedirs(test_frames_decoder_input_dir)

    np.save(os.path.join(train_frames_x_dir, f"par_{split_index}.npy"), train_frames_X)
    np.save(os.path.join(test_frames_x_dir, f"par_{split_index}.npy"), test_frames_X)
    np.save(os.path.join(train_frames_y_dir, f"par_{split_index}.npy"), train_frames_Y)
    np.save(os.path.join(test_frames_y_dir, f"par_{split_index}.npy"), test_frames_Y)
    np.save(os.path.join(train_frames_embed_dir, f"par_{split_index}.npy"), train_frames_X)
    np.save(os.path.join(test_frames_embed_dir, f"par_{split_index}.npy"), test_frames_X)
    np.save(os.path.join(train_frames_decoder_input_dir, f"par_{split_index}.npy"), train_frames_Y)
    np.save(os.path.join(test_frames_decoder_input_dir, f"par_{split_index}.npy"), test_frames_Y)

    return []

partitions = spark_df.rdd.getNumPartitions()
spark_df.rdd.mapPartitionsWithIndex(create_training_frames).collect()


  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
[Stage 9:>                                                          (0 + 4) / 4]

23/07/18 06:15:00 ERROR Executor: Exception in task 2.0 in stage 9.0 (TID 30)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
TypeError: create_training_frames() takes 1 positional argument but 2 were given

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 1 times, most recent failure: Lost task 1.0 in stage 9.0 (TID 29) (ip-172-16-92-53.eu-west-1.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
TypeError: create_training_frames() takes 1 positional argument but 2 were given

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
TypeError: create_training_frames() takes 1 positional argument but 2 were given

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
