# Model Training Data Preparation 

In this notebook, we perform the following tasks:

    1. Download the feature data files (output of the ETL job) from the data14group1-ml S3 bucket.
    2. Create a Spark DataFrame and audit the data set.
    3. Randomly split the data set into train (80%) and validation (20%) sets.
    4. Upload the splits to S3 and delete the hidden files (the SageMaker estimator does not accept these files).

In [13]:
import boto3
import sagemaker
import os

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [14]:
# Store the current SageMaker session
session = sagemaker.Session()
# Define the global bucket name
bucket = "data14group1-ml"

### Download trainval data parquet files from S3 bucket 

In [3]:
# Create a Boto3 client
s3_client = boto3.client('s3')

# Define S3 bucket and folder path
bucket_name = "data14group1-ml"
s3_folder_path = 'data/trainval/'

# Define the local directory where files will be downloaded
local_dir = "data/"

# Ensure the local directory exists
os.makedirs(local_dir, exist_ok=True)

# List all objects in the S3 folder
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=s3_folder_path)

# Iterate through the objects and download each one
for obj in response.get('Contents', []):
    # Get the file path
    s3_file_path = obj['Key']
    file_name = os.path.basename(s3_file_path)

    # Define the local file path
    local_file_path = os.path.join(local_dir, file_name)

    # Download the file from S3
    s3_client.download_file(bucket_name, s3_file_path, local_file_path)
    print(f'Downloaded {s3_file_path} to {local_file_path}')

print("\ndata download complete")

Downloaded data/trainval/part-00000-45907027-bb5a-46b1-bc94-25f1cd988849-c000.snappy.parquet to data/part-00000-45907027-bb5a-46b1-bc94-25f1cd988849-c000.snappy.parquet
Downloaded data/trainval/part-00001-45907027-bb5a-46b1-bc94-25f1cd988849-c000.snappy.parquet to data/part-00001-45907027-bb5a-46b1-bc94-25f1cd988849-c000.snappy.parquet
Downloaded data/trainval/part-00002-45907027-bb5a-46b1-bc94-25f1cd988849-c000.snappy.parquet to data/part-00002-45907027-bb5a-46b1-bc94-25f1cd988849-c000.snappy.parquet
Downloaded data/trainval/part-00003-45907027-bb5a-46b1-bc94-25f1cd988849-c000.snappy.parquet to data/part-00003-45907027-bb5a-46b1-bc94-25f1cd988849-c000.snappy.parquet

data download complete
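
A single `list_objects_v2` call returns at most 1,000 keys, so the loop above is only complete for small folders such as this one. The sketch below shows one way the listing could follow pagination and skip folder placeholder keys. The helper names `list_downloadable_keys` and `local_path_for` are hypothetical, and the client is passed in as a parameter on the assumption that it behaves like `boto3.client('s3')`.

```python
import os


def list_downloadable_keys(s3_client, bucket_name, prefix):
    """Return every object key under `prefix`, following pagination.

    `s3_client` is assumed to behave like boto3.client("s3").
    Keys ending in "/" (folder placeholders) are skipped because they
    have no file name to download to.
    """
    keys = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # A page without matching objects has no "Contents" entry
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.endswith("/"):
                keys.append(key)
    return keys


def local_path_for(key, local_dir):
    """Map an S3 key to a flat local path, as the download loop above does."""
    return os.path.join(local_dir, os.path.basename(key))
```

Each returned key could then be passed to `s3_client.download_file(bucket_name, key, local_path_for(key, local_dir))`, exactly as in the cell above.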


### Load data into a Spark DataFrame

In [7]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("ReadParquetFromData").getOrCreate()

# Define file path
file_path = "data/"

# Read all Parquet files in the folder into a single DataFrame
data = spark.read.parquet(file_path)

print("data loading complete")

data loading complete


### Check data integrity

In [8]:
from pyspark.sql.functions import isnan, col

# Check data count (8474661)
print(f"data number of rows: {data.count()}")

data number of rows: 8474661


In [4]:
# Report the columns that contain NaN values
# (isnan() does not flag null values; those would need a separate isNull() check)
nan_count = 0
for c in data.columns:
    count = data.filter(isnan(col(c))).count()
    if count > 0:
        nan_count += 1
        print(f"{count} records have a NaN value in column: {c}")
if nan_count == 0:
    print("No NaN values detected")
else:
    print("Check records with NaN values")



No NaN values detected


                                                                                

### Split data into train set and validation set

In [9]:
# Split the data into training (80%) and validation (20%) sets
seed = 42
train, validation = data.randomSplit([0.8,0.2], seed=seed)
print("train and validation sets are randomly selected")

print(train.count())
print(validation.count())

train and validation sets are randomly selected
6778638
1696023
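
`randomSplit` uses the weights as sampling probabilities rather than exact quotas, so the split sizes are only approximately 80/20. A quick sanity check, using the row counts printed above, could confirm that no rows were lost and show how close the split is to the target. The variable names are illustrative.

```python
# Row counts reported by the cells above
total_rows = 8474661
train_rows = 6778638
validation_rows = 1696023

# Every row should land in exactly one of the two splits
assert train_rows + validation_rows == total_rows

train_fraction = train_rows / total_rows
validation_fraction = validation_rows / total_rows
print(f"train fraction:      {train_fraction:.4f}")
print(f"validation fraction: {validation_fraction:.4f}")
```

The fractions come out within about 0.02 percentage points of the requested 80% and 20%.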


                                                                                

### Save train and validation sets to local folders

In [10]:
file_path = "train/"
train.write.mode("overwrite").parquet(file_path)
print("train file saving complete")



train file saving complete


                                                                                

In [None]:
file_path = "validation/"
validation.write.mode("overwrite").parquet(file_path)
print("validation file saving complete")



### Upload to S3 bucket

In [15]:
prefix = "data"
session.upload_data(
    "train/", bucket=bucket, key_prefix=f"{prefix}/train"
)

session.upload_data(
    "validation/", bucket=bucket, key_prefix=f"{prefix}/validation"
)
print("upload complete")

upload complete


### Delete the hidden files in the train and validation folders in S3 before training

Model training will not work unless the hidden `.crc` files are deleted first.

In [17]:
s3_client = boto3.client('s3')
# Specify your bucket name and prefix
bucket = "data14group1-ml"
prefix_train = "data/train"
prefix_validation = "data/validation"

def delete_crc(bucket_name, prefix):
    # List and delete .crc files
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('.crc'):
                print(f'Deleting {key}')
                s3_client.delete_object(Bucket=bucket_name, Key=key)

delete_crc(bucket, prefix_train)
delete_crc(bucket, prefix_validation)

Deleting data/train/._SUCCESS.crc
Deleting data/train/.part-00000-1c50e827-e50f-4982-aa06-c4eae4d74e91-c000.snappy.parquet.crc
Deleting data/train/.part-00001-1c50e827-e50f-4982-aa06-c4eae4d74e91-c000.snappy.parquet.crc
Deleting data/train/.part-00002-1c50e827-e50f-4982-aa06-c4eae4d74e91-c000.snappy.parquet.crc
Deleting data/train/.part-00003-1c50e827-e50f-4982-aa06-c4eae4d74e91-c000.snappy.parquet.crc
Deleting data/validation/._SUCCESS.crc
Deleting data/validation/.part-00000-c3087eba-79fd-4c30-b3fc-65f1a2efc3fd-c000.snappy.parquet.crc
Deleting data/validation/.part-00001-c3087eba-79fd-4c30-b3fc-65f1a2efc3fd-c000.snappy.parquet.crc
Deleting data/validation/.part-00002-c3087eba-79fd-4c30-b3fc-65f1a2efc3fd-c000.snappy.parquet.crc
Deleting data/validation/.part-00003-c3087eba-79fd-4c30-b3fc-65f1a2efc3fd-c000.snappy.parquet.crc
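
An alternative to deleting the objects from S3 would be to remove the `.crc` files from the local `train/` and `validation/` folders before calling `upload_data`, so they never reach the bucket. The sketch below assumes the `.crc` files are created locally by the Spark write, as the deleted key names suggest. `remove_local_crc_files` is a hypothetical helper, not part of the original notebook.

```python
from pathlib import Path


def remove_local_crc_files(folder):
    """Delete every `.crc` file under `folder` and return the removed paths."""
    removed = []
    # Path.rglob matches dot-prefixed names too, unlike shell-style globbing.
    # The matches are collected into a list before anything is deleted.
    for path in list(Path(folder).rglob("*.crc")):
        if path.is_file():
            path.unlink()
            removed.append(path)
    return removed
```

Calling `remove_local_crc_files("train/")` and `remove_local_crc_files("validation/")` just before the upload cell would leave only the Parquet part files and the `_SUCCESS` marker in each folder.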


### Delete local data folders

In [18]:
!rm -r data train validation
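
`rm -r` reports an error if one of the folders is already gone, for example when this cell is run twice. A pure-Python cleanup that tolerates missing folders might look like the sketch below. `remove_folders` is a hypothetical helper name.

```python
import shutil


def remove_folders(folders):
    """Remove each folder tree, silently skipping folders that do not exist."""
    for folder in folders:
        # ignore_errors=True suppresses the error raised for a missing folder
        shutil.rmtree(folder, ignore_errors=True)
```

For this notebook the call would be `remove_folders(["data", "train", "validation"])`.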