In [1]:
pip install pandas fastparquet




In [2]:
pip install requests

Note: you may need to restart the kernel to use updated packages.


In [3]:
import os
import requests
from datetime import datetime

# Function to generate URLs for each month
def generate_price_data_urls(
    start_date,
    end_date,
    url_template=(
        "https://storage.data.gov.my/pricecatcher/"
        "pricecatcher_{year:04d}-{month:02d}.parquet"
    ),
):
    """Return one PriceCatcher Parquet URL per calendar month in a range.

    Both endpoints are inclusive and only their year and month are used, so a
    day-of-month or time-of-day on either argument never drops a month.

    Parameters
    ----------
    start_date, end_date : datetime.date or datetime.datetime
        First and last month to include. If ``end_date`` falls in an earlier
        month than ``start_date`` the result is empty.
    url_template : str, optional
        ``str.format`` template that receives integer ``year`` and ``month``
        fields. Defaults to the data.gov.my PriceCatcher monthly files.

    Returns
    -------
    list of str
        URLs in chronological order.
    """
    urls = []
    year, month = start_date.year, start_date.month
    last_month = (end_date.year, end_date.month)
    # Comparing (year, month) tuples instead of full datetimes means a time
    # component, or a date/datetime mix between the two arguments, cannot
    # exclude the final month or raise a comparison TypeError.
    while (year, month) <= last_month:
        urls.append(url_template.format(year=year, month=month))
        # Advance one month, rolling December over to January of the next year.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return urls

# Define the date range for price data (both ends inclusive, by month)
start_date = datetime(2022, 1, 1)
end_date = datetime(2024, 11, 1)  # Assuming data is up to November 2024

# Generate all URLs
price_data_urls = generate_price_data_urls(start_date, end_date)

# Create 'dataset' directory if it doesn't exist
dataset_dir = 'dataset'
os.makedirs(dataset_dir, exist_ok=True)

# (connect, read) timeouts in seconds. Without a timeout, requests can wait
# indefinitely on a stalled server, and the Timeout handler below never fires.
REQUEST_TIMEOUT = (10, 60)

# URLs that could not be downloaded; reported once the loop finishes.
failed_urls = []

# Fetch and save each Parquet file
for url in price_data_urls:
    # Extract filename from URL
    filename = url.split('/')[-1]
    file_path = os.path.join(dataset_dir, filename)
    # Stream into a temporary file and move it into place only after the whole
    # body has arrived, so an interrupted transfer never leaves a truncated
    # .parquet file in the dataset directory.
    tmp_path = file_path + '.part'
    try:
        print(f"Fetching data from: {url}")
        # The context manager closes the streamed response (releasing the
        # connection) even if writing fails partway through.
        with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) as response:
            response.raise_for_status()  # Raise an error for bad status codes
            # Write content to file in chunks to handle large files efficiently
            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # Filter out keep-alive chunks
                        f.write(chunk)
        os.replace(tmp_path, file_path)  # replaces any older copy of the file
        print(f"Successfully saved to: {file_path}")
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred for {url}: {http_err}")
        failed_urls.append(url)
    except requests.exceptions.ConnectionError as conn_err:
        print(f"Connection error occurred for {url}: {conn_err}")
        failed_urls.append(url)
    except requests.exceptions.Timeout as timeout_err:
        print(f"Timeout error occurred for {url}: {timeout_err}")
        failed_urls.append(url)
    except Exception as err:
        print(f"An error occurred for {url}: {err}")
        failed_urls.append(url)
    finally:
        # Discard any partial download left behind by a failure.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

if failed_urls:
    print(f"{len(failed_urls)} file(s) could not be downloaded:")
    for failed_url in failed_urls:
        print(f"  {failed_url}")
print("Data download completed.")

Fetching data from: https://storage.data.gov.my/pricecatcher/pricecatcher_2022-01.parquet
Successfully saved to: dataset\pricecatcher_2022-01.parquet
Fetching data from: https://storage.data.gov.my/pricecatcher/pricecatcher_2022-02.parquet
Successfully saved to: dataset\pricecatcher_2022-02.parquet
Fetching data from: https://storage.data.gov.my/pricecatcher/pricecatcher_2022-03.parquet
Successfully saved to: dataset\pricecatcher_2022-03.parquet
Fetching data from: https://storage.data.gov.my/pricecatcher/pricecatcher_2022-04.parquet
Successfully saved to: dataset\pricecatcher_2022-04.parquet
Fetching data from: https://storage.data.gov.my/pricecatcher/pricecatcher_2022-05.parquet
Successfully saved to: dataset\pricecatcher_2022-05.parquet
Fetching data from: https://storage.data.gov.my/pricecatcher/pricecatcher_2022-06.parquet
Successfully saved to: dataset\pricecatcher_2022-06.parquet
Fetching data from: https://storage.data.gov.my/pricecatcher/pricecatcher_2022-07.parquet
Successful

In [4]:
pip install dask[complete] fastparquet


Collecting dask[complete]
  Downloading dask-2024.11.2-py3-none-any.whl.metadata (3.7 kB)
Collecting cloudpickle>=3.0.0 (from dask[complete])
  Downloading cloudpickle-3.1.0-py3-none-any.whl.metadata (7.0 kB)
Collecting partd>=1.4.0 (from dask[complete])
  Downloading partd-1.4.2-py3-none-any.whl.metadata (4.6 kB)
Collecting toolz>=0.10.0 (from dask[complete])
  Downloading toolz-1.0.0-py3-none-any.whl.metadata (5.1 kB)
Collecting lz4>=4.3.2 (from dask[complete])
  Downloading lz4-4.3.3-cp312-cp312-win_amd64.whl.metadata (3.8 kB)
Collecting locket (from partd>=1.4.0->dask[complete])
  Downloading locket-1.0.0-py2.py3-none-any.whl.metadata (2.8 kB)
Collecting dask-expr<1.2,>=1.1 (from dask[complete])
  Downloading dask_expr-1.1.19-py3-none-any.whl.metadata (2.6 kB)
Collecting bokeh>=3.1.0 (from dask[complete])
  Downloading bokeh-3.6.1-py3-none-any.whl.metadata (12 kB)
Collecting distributed==2024.11.2 (from dask[complete])
  Downloading distributed-2024.11.2-py3-none-any.whl.metadata 

In [6]:
pip install pyarrow

Note: you may need to restart the kernel to use updated packages.


In [8]:
import os
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import logging

# Configure logging to a file.
# NOTE(review): logging.basicConfig does nothing if the root logger already has
# handlers (some notebook setups add one); pass force=True if the log stays empty.
logging.basicConfig(filename='preprocess_logs.log',
                    level=logging.INFO,
                    format='%(asctime)s %(levelname)s:%(message)s')

# Register a dask progress bar once per kernel session. Every
# ProgressBar().register() call adds another global callback, so registering on
# each re-run of this cell would make every later compute print one bar per run.
if 'progress_bar' not in globals():
    progress_bar = ProgressBar()
    progress_bar.register()

# Define input and output directories
input_dir = 'dataset'
output_dir = 'preprocessed_data'

# Create the output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Item codes to extract from the monthly files
target_item_codes = [1, 120, 1593, 1933, 1581]

# Glob pattern matching every downloaded monthly Parquet file
parquet_files = os.path.join(input_dir, '*.parquet')

# Read all Parquet files into a Dask DataFrame, selecting only relevant columns.
# `dtype=` / `assume_missing=` are read_csv options, not read_parquet ones: dask
# forwards unknown keyword arguments to the engine and its built-in engines
# ignore them, so they are not passed here. Column dtypes are enforced
# explicitly with astype after filtering.
try:
    logging.info("Starting to read Parquet files using pyarrow engine.")
    df = dd.read_parquet(
        parquet_files,
        columns=['date', 'item_code', 'price'],
        engine='pyarrow',
    )
    logging.info("Successfully read Parquet files.")
except Exception as e:
    # logging.exception logs at ERROR level and also records the traceback.
    logging.exception(f"An error occurred while reading Parquet files: {e}")
    raise  # bare raise re-raises with the original traceback intact

# Display the first few rows to verify
print("Initial DataFrame:")
print(df.head())

# Keep only rows for the target items.
logging.info(f"Filtering for item codes: {target_item_codes}")
is_target_item = df['item_code'].isin(target_item_codes)
df_filtered = df[is_target_item]

# Discard rows lacking any of the fields needed downstream.
logging.info("Dropping rows with missing values in essential columns.")
essential_columns = ['date', 'item_code', 'price']
df_filtered = df_filtered.dropna(subset=essential_columns)

# Normalise dtypes: parse dates (unparseable values become NaT and are then
# dropped), and pin the numeric columns to compact widths.
logging.info("Converting 'date' to datetime and ensuring correct data types.")
df_filtered = df_filtered.assign(date=dd.to_datetime(df_filtered['date'], errors='coerce'))
df_filtered = df_filtered.dropna(subset=['date'])
df_filtered = df_filtered.astype({'item_code': 'int32', 'price': 'float32'})

# Average price for every (date, item_code) pair.
logging.info("Grouping by 'date' and 'item_code' to calculate average price.")
group_keys = ['date', 'item_code']
df_grouped = (
    df_filtered
    .groupby(group_keys)
    .agg({'price': 'mean'})
    .reset_index()
    # Materialise once so head() and the per-item saves below reuse the result.
    .persist()
)
logging.info("Persisted the grouped DataFrame in memory.")

# Preview the aggregated result
print("Grouped DataFrame:")
print(df_grouped.head())

# Write one Parquet dataset per target item_code. Dask's to_parquet writes a
# directory of part files at each output path.
failed_item_codes = []
for item_code in target_item_codes:
    try:
        logging.info(f"Processing item_code {item_code}.")
        # Filter the DataFrame for the current item_code
        df_item = df_grouped[df_grouped['item_code'] == item_code]

        # Define the output file path
        output_file = os.path.join(output_dir, f'item_{item_code}.parquet')

        # overwrite=True clears an existing output directory first, so a re-run
        # cannot leave stale part files from an earlier run next to new ones.
        df_item.to_parquet(
            output_file,
            engine='pyarrow',
            compression='snappy',
            write_index=False,
            overwrite=True,
        )

        logging.info(f"Saved processed data for item_code {item_code} to {output_file}")
        print(f"Saved processed data for item_code {item_code} to {output_file}")
    except Exception as e:
        # Best effort: record the failure and continue with the other items.
        logging.error(f"An error occurred while saving item_code {item_code}: {e}")
        print(f"An error occurred while saving item_code {item_code}: {e}")
        failed_item_codes.append(item_code)

# Only report success when every item was written.
if failed_item_codes:
    logging.error(f"Data preprocessing finished with errors for item codes: {failed_item_codes}")
    print(f"Data preprocessing finished with errors for item codes: {failed_item_codes}")
else:
    logging.info("Data preprocessing completed successfully.")
    print("Data preprocessing completed successfully.")


Initial DataFrame:
[########################################] | 100% Completed | 222.99 ms
[########################################] | 100% Completed | 305.15 ms
[########################################] | 100% Completed | 403.77 ms
        date  item_code  price
0 2022-01-01          1    9.1
1 2022-01-01          9   36.0
2 2022-01-01         14   24.0
3 2022-01-01         16    4.3
4 2022-01-01         18    4.5
[########################################] | 100% Completed | 11.68 s
[########################################] | 100% Completed | 11.78 s
[########################################] | 100% Completed | 11.88 s
Grouped DataFrame:
[########################################] | 100% Completed | 108.66 ms
[########################################] | 100% Completed | 206.98 ms
[########################################] | 100% Completed | 304.54 ms
        date  item_code      price
0 2024-03-04          1   8.423158
1 2024-03-04       1581  26.000000
2 2024-03-05       1933   7.0

In [9]:
pip install pandas tabulate

Note: you may need to restart the kernel to use updated packages.


In [10]:
import os
import pandas as pd
import glob
from tabulate import tabulate  # Optional: For better-formatted tables

# Where the preprocessing step wrote its per-item outputs.
preprocessed_dir = 'preprocessed_data'

# Fail fast with a clear message when the preprocessing step has not run.
if not os.path.isdir(preprocessed_dir):
    raise FileNotFoundError(f"The directory '{preprocessed_dir}' does not exist.")

# One entry per item, e.g. item_1.parquet, item_120.parquet.
parquet_pattern = os.path.join(preprocessed_dir, 'item_*.parquet')
parquet_files = glob.glob(parquet_pattern)

if len(parquet_files) == 0:
    raise FileNotFoundError(f"No Parquet files found in '{preprocessed_dir}' with pattern 'item_*.parquet'.")

# Collect one {'Item Code', 'Record Count'} row per per-item dataset.
record_counts = []

for file_path in parquet_files:
    try:
        filename = os.path.basename(file_path)

        # Filenames follow 'item_<item_code>.parquet'. removeprefix/removesuffix
        # strip only the leading/trailing parts, whereas str.replace would also
        # delete matching text anywhere else in the name.
        item_code_str = filename.removeprefix('item_').removesuffix('.parquet')

        # Use an integer code when possible; otherwise keep the raw string.
        try:
            item_code = int(item_code_str)
        except ValueError:
            item_code = item_code_str

        # Load the dataset (pyarrow also reads dask-written directories). A
        # dedicated name avoids clobbering the notebook's `df` from earlier cells.
        item_df = pd.read_parquet(file_path, engine='pyarrow')  # Ensure 'pyarrow' is installed

        record_counts.append({'Item Code': item_code, 'Record Count': len(item_df)})

    except Exception as e:
        # Best effort: report the unreadable file and keep counting the rest.
        print(f"Error processing file '{file_path}': {e}")

# Build the summary table. Explicit columns keep it well-formed even if every
# file failed to load; an empty list would otherwise give a frame without an
# 'Item Code' column, and sort_values would raise KeyError.
counts_df = pd.DataFrame(record_counts, columns=['Item Code', 'Record Count'])

# Sort numerically by item code. Codes kept as strings (non-numeric filenames)
# become NaN under to_numeric and sort last, instead of making sort_values
# raise TypeError when comparing ints with strs.
counts_df = (
    counts_df
    .sort_values(by='Item Code', key=lambda codes: pd.to_numeric(codes, errors='coerce'))
    .reset_index(drop=True)
)

# Display the counts as a formatted table
print(tabulate(counts_df, headers='keys', tablefmt='psql', showindex=False))


+-------------+----------------+
|   Item Code |   Record Count |
|-------------+----------------|
|           1 |            992 |
|         120 |            988 |
|        1581 |            262 |
|        1593 |            164 |
|        1933 |            486 |
+-------------+----------------+


In [12]:
import os
import pandas as pd

# Input (per-item datasets) and output (train/validation/test files) locations.
preprocessed_dir, final_datasets_dir = 'preprocessed_data', 'final_datasets'
os.makedirs(final_datasets_dir, exist_ok=True)

# Items to split.
target_item_codes = [1, 120]

# Chronological boundaries: 2022-2023 feed training+validation, 2024 onward is test.
training_validation_start, training_validation_end = (
    pd.Timestamp('2022-01-01'),
    pd.Timestamp('2023-12-31'),
)
test_start = pd.Timestamp('2024-01-01')

# Share of the training+validation period intended for validation (80/20 split).
validation_ratio = 0.2

# Chronological train/validation split helper
def split_train_validation(df, train_end_date):
    """Split ``df`` chronologically around ``train_end_date``.

    Rows dated on or before ``train_end_date`` go to the training frame; rows
    dated strictly after it go to the validation frame. Rows whose ``date``
    satisfies neither comparison (e.g. NaT) appear in neither frame.

    Returns
    -------
    (train_df, validation_df) : tuple of DataFrame
        Independent copies, so later edits do not affect ``df``.
    """
    dates = df['date']
    on_or_before = dates <= train_end_date
    strictly_after = dates > train_end_date
    return df.loc[on_or_before].copy(), df.loc[strictly_after].copy()

# Iterate over each target item code
for item_code in target_item_codes:
    print(f"\nProcessing Item Code: {item_code}")

    # Construct the file path
    input_file = os.path.join(preprocessed_dir, f'item_{item_code}.parquet')

    # Check if the file exists
    if not os.path.exists(input_file):
        print(f"File not found: {input_file}. Skipping.")
        continue

    # Load the Parquet file into a Pandas DataFrame
    try:
        item_raw = pd.read_parquet(input_file, engine='pyarrow')
        print(f"Loaded data for item_code {item_code}: {len(item_raw)} records.")
    except Exception as e:
        print(f"Error loading {input_file}: {e}")
        continue

    # Data cleaning: each step builds a new frame (no inplace mutation of a
    # filtered frame, which can trigger SettingWithCopyWarning).

    # Ensure 'date' is datetime; unparseable values become NaT and are dropped below
    if not pd.api.types.is_datetime64_any_dtype(item_raw['date']):
        item_raw = item_raw.assign(date=pd.to_datetime(item_raw['date'], errors='coerce'))

    # Drop rows with missing essential data
    initial_count = len(item_raw)
    item_clean = item_raw.dropna(subset=['date', 'item_code', 'price'])
    print(f"Dropped {initial_count - len(item_clean)} records due to missing values.")

    # Enforce dtypes, keep only rows for the current item, and sort by date
    item_clean = (
        item_clean
        .astype({'item_code': int, 'price': float})
        .loc[lambda frame: frame['item_code'] == item_code]
        .sort_values(by='date')
    )

    # Split into Training+Validation and Test sets
    train_val_df = item_clean[
        (item_clean['date'] >= training_validation_start)
        & (item_clean['date'] <= training_validation_end)
    ].copy()
    test_df = item_clean[item_clean['date'] >= test_start].copy()

    print(f"Training+Validation records: {len(train_val_df)}")
    print(f"Test records: {len(test_df)}")

    # With no Training+Validation rows the quantile below is NaT and both splits
    # would be empty, so skip the item instead of writing empty files.
    if train_val_df.empty:
        print(f"No Training+Validation records for item_code {item_code}. Skipping.")
        continue

    # Hold out the most recent `validation_ratio` share of the
    # Training+Validation period (by date quantile) for validation.
    split_date = train_val_df['date'].quantile(1 - validation_ratio)
    split_date = pd.Timestamp(split_date.floor('D'))  # Ensure it's a date without time

    train_df, validation_df = split_train_validation(train_val_df, split_date)

    print(f"Training records: {len(train_df)}")
    print(f"Validation records: {len(validation_df)}")

    # Save the splits to separate Parquet files
    try:
        # Define output file paths
        train_file = os.path.join(final_datasets_dir, f'item_{item_code}_train.parquet')
        validation_file = os.path.join(final_datasets_dir, f'item_{item_code}_validation.parquet')
        test_file = os.path.join(final_datasets_dir, f'item_{item_code}_test.parquet')

        # Save to Parquet with Snappy compression
        train_df.to_parquet(train_file, engine='pyarrow', compression='snappy', index=False)
        validation_df.to_parquet(validation_file, engine='pyarrow', compression='snappy', index=False)
        test_df.to_parquet(test_file, engine='pyarrow', compression='snappy', index=False)

        print(f"Saved Training set to: {train_file}")
        print(f"Saved Validation set to: {validation_file}")
        print(f"Saved Test set to: {test_file}")
    except Exception as e:
        print(f"Error saving splits for item_code {item_code}: {e}")



Processing Item Code: 1
Loaded data for item_code 1: 992 records.
Dropped 0 records due to missing values.
Training+Validation records: 685
Test records: 307
Training records: 548
Validation records: 137
Saved Training set to: final_datasets\item_1_train.parquet
Saved Validation set to: final_datasets\item_1_validation.parquet
Saved Test set to: final_datasets\item_1_test.parquet

Processing Item Code: 120
Loaded data for item_code 120: 988 records.
Dropped 0 records due to missing values.
Training+Validation records: 682
Test records: 306
Training records: 545
Validation records: 137
Saved Training set to: final_datasets\item_120_train.parquet
Saved Validation set to: final_datasets\item_120_validation.parquet
Saved Test set to: final_datasets\item_120_test.parquet
