In [89]:
# Importing required libraries
import os
import pandas as pd
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import random


# Generate random dates in string format
def generate_random_date_string(start_date, end_date):
    delta = end_date - start_date
    random_days = random.randint(0, delta.days)
    random_date = start_date + timedelta(days=random_days)
    return random_date.strftime('%Y-%m-%d')

start_date = datetime(2020, 1, 1)
end_date = datetime(2023, 12, 31)


# Creating dummy data with a million records
num_records = 1000000
sale_ids = range(1, num_records + 1)
product_ids = [random.randint(1, 50) for _ in range(num_records)]
quantities_sold = [random.randint(1, 20) for _ in range(num_records)]
sale_dates = [generate_random_date_string(start_date, end_date) for _ in range(num_records)]
sale_amounts = [round(random.uniform(5.0, 1000.0), 2) for _ in range(num_records)]

# Create a DataFrame with the dummy data
sales_data = pd.DataFrame({
    'sale_id': sale_ids,
    'product_id': product_ids,
    'quantity_sold': quantities_sold,
    'sale_date': sale_dates,
    'sale_amount': sale_amounts
})

# Saving to CSV
file_name = 'sales_data.csv' #sales_data.csv has 1 million records
sales_data.to_csv(file_name, index=False)




In [90]:
# Print the datatype of each column
print("Datatype of the different columns in the Original Dataframe\n",sales_data.dtypes)
print("\nMemory usage of the different columns in the Original Dataframe\n",sales_data.memory_usage(deep=True))  # memory usage in bytes

Datatype of the different columns in the Original Dataframe
 sale_id            int64
product_id         int64
quantity_sold      int64
sale_date         object
sale_amount      float64
dtype: object

Memory usage of the different columns in the Original Dataframe
 Index                 128
sale_id           8000000
product_id        8000000
quantity_sold     8000000
sale_date        67000000
sale_amount       8000000
dtype: int64


In [91]:
# Reading the CSV file into a DataFrame
sales_data_df = pd.read_csv(file_name)


        sale_id  product_id  quantity_sold   sale_date  sale_amount
0             1          32             19  2023-09-16       540.07
1             2          36              1  2022-09-25       626.29
2             3          28              4  2022-10-29       745.31
3             4          46             10  2021-04-21       867.50
4             5          18             15  2020-01-06       486.18
...         ...         ...            ...         ...          ...
999995   999996           6             14  2021-03-25        87.02
999996   999997          46             10  2021-03-31       633.23
999997   999998          29             10  2022-02-09       775.78
999998   999999          32              1  2020-12-14       252.05
999999  1000000           8             11  2022-03-21       499.64

[1000000 rows x 5 columns]


#Improving efficiency by using better data types

Default Pandas data types are not very efficient. So to reduce the data usage we convert to more efficient data types.

In [92]:
# Print the datatype of each column
print("Datatype of the different columns in the Original Dataframe\n",sales_data_df.dtypes)
print("\nMemory usage of the different columns in the Original Dataframe\n",sales_data_df.memory_usage(deep=True))

Datatype of the different columns in the Original Dataframe
 sale_id            int64
product_id         int64
quantity_sold      int64
sale_date         object
sale_amount      float64
dtype: object

Memory usage of the different columns in the Original Dataframe
 Index                 128
sale_id           8000000
product_id        8000000
quantity_sold     8000000
sale_date        67000000
sale_amount       8000000
dtype: int64


In [93]:
#Copying the dataframe to a new one for new efficient data types
sales_data_df2=sales_data_df.copy()


In [94]:
# Checking range for integer columns to decide optimum data type
print("sale_id range:", sales_data_df2['sale_id'].min(), "-", sales_data_df2['sale_id'].max())
print("product_id range:", sales_data_df2['product_id'].min(), "-", sales_data_df2['product_id'].max())
print("quantity_sold range:", sales_data_df2['quantity_sold'].min(), "-", sales_data_df2['quantity_sold'].max())

# Checking precision for float columns
print("sale_amount precision check:")
print(sales_data_df2['sale_amount'].describe())

sale_id range: 1 - 1000000
product_id range: 1 - 50
quantity_sold range: 1 - 20
sale_amount precision check:
count    1000000.000000
mean         502.376477
std          287.448846
min            5.000000
25%          253.050000
50%          502.110000
75%          751.550000
max         1000.000000
Name: sale_amount, dtype: float64


Based on the identified data ranges of the various columns, we can make the following datatype conversions to optimize memory usage:



1.   sale_id: Since the range is from 1 to 1,000,000, we can use uint32 (unsigned 32-bit integer). This type can hold values from 0 to 4,294,967,295, which is more than enough for our range (1 to 1,000,000).
2.   product_id: Since the range is from 1 to 50, we can use uint8 (unsigned 8-bit integer). This type can hold values from 0 to 255, which is sufficient for our range (1 to 50).
3.   quantity_sold: Since the range is from 1 to 20, we can use uint8 (unsigned 8-bit integer). This type can hold values from 0 to 255, which is sufficient for our range (1 to 20).
4.   sale_date: Convert to datetime64[ns].This type efficiently handles datetime data.


sale_amount: Given the precision and range, we can use float32. But when converting from float64 to float32, minor variations occur due to the difference in precision between these two data types. float32 has fewer bits for precision, which can lead to small inaccuracies in the representation of the data. This can increase bigger errors when aggregating the sale_amount value so it was kept at float64.








In [95]:
# Convert integer columns to more efficient types
sales_data_df2['sale_id'] = pd.to_numeric(sales_data_df2['sale_id'], downcast='unsigned')
sales_data_df2['product_id'] = sales_data_df2['product_id'].astype('uint8')
sales_data_df2['quantity_sold'] = sales_data_df2['quantity_sold'].astype('uint8')

# Convert sale_date to datetime from string
sales_data_df2['sale_date'] = pd.to_datetime(sales_data_df2['sale_date'])

# Print the optimized dtypes and memory usage
print("\nOptimized dtypes:")
print(sales_data_df2.dtypes)
print("\nOptimized memory usage:")
print(sales_data_df2.memory_usage(deep=True))


Optimized dtypes:
sale_id                  uint32
product_id                uint8
quantity_sold             uint8
sale_date        datetime64[ns]
sale_amount             float64
dtype: object

Optimized memory usage:
Index                128
sale_id          4000000
product_id       1000000
quantity_sold    1000000
sale_date        8000000
sale_amount      8000000
dtype: int64


In [96]:
reduction = sales_data_df2.memory_usage(deep=True).sum() / sales_data_df.memory_usage(deep=True).sum()

print(f"Reduction in memory usage(%)-{(reduction*100):0.2f} %")

Reduction in memory usage(%)-22.22 %


# Splitting the 1 million row dataset to 50 parquet files for each product id

Since we have to do aggregation based on product id, we split the original dataset into separate parquet files. Then each of these files are processed separately using parallel processing.

In [97]:

# Ensure the 'product_data' directory exists
output_dir = 'product_data'
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Iterate over each product_id and save the corresponding dataframe to a Parquet file
for product_id in range(1, 51):  # Assuming product IDs are from 1 to 50
    product_df = sales_data_df2[sales_data_df2['product_id'] == product_id]
    if not product_df.empty:
        file_name = f'{output_dir}/product_{product_id}.parquet'
        product_df.to_parquet(file_name, index=False)
        print(f'Saved {file_name}')

Saved product_data/product_1.parquet
Saved product_data/product_2.parquet
Saved product_data/product_3.parquet
Saved product_data/product_4.parquet
Saved product_data/product_5.parquet
Saved product_data/product_6.parquet
Saved product_data/product_7.parquet
Saved product_data/product_8.parquet
Saved product_data/product_9.parquet
Saved product_data/product_10.parquet
Saved product_data/product_11.parquet
Saved product_data/product_12.parquet
Saved product_data/product_13.parquet
Saved product_data/product_14.parquet
Saved product_data/product_15.parquet
Saved product_data/product_16.parquet
Saved product_data/product_17.parquet
Saved product_data/product_18.parquet
Saved product_data/product_19.parquet
Saved product_data/product_20.parquet
Saved product_data/product_21.parquet
Saved product_data/product_22.parquet
Saved product_data/product_23.parquet
Saved product_data/product_24.parquet
Saved product_data/product_25.parquet
Saved product_data/product_26.parquet
Saved product_data/pr

In [98]:
# Ensure the 'processed_data' directory exists
output_dir = 'processed_data'
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Function to process each Parquet file
def process_parquet_file(product_id):
    input_file = f'product_data/product_{product_id}.parquet'
    output_file = f'{output_dir}/product_{product_id}_processed.parquet'

    # Read the Parquet file
    df = pd.read_parquet(input_file)

    # Add year_month column
    df['year_month'] = df['sale_date'].dt.to_period('M')

    # Calculate total sales amount per product per month
    result_df = df.groupby(['product_id', 'year_month']).agg(total_sales_amount=('sale_amount', 'sum')).reset_index()

    # Write the result to a new Parquet file
    result_df.to_parquet(output_file, index=False)
    print(f'Processed and saved {output_file}')
    return result_df,df

# List of product IDs (1 to 50)
product_ids = list(range(1, 51))

# Process files in parallel using ThreadPoolExecutor
#Each product file is processed separately in a parallel process using ThreadPoolExecutor
results = []
df_chunks = []
with ThreadPoolExecutor(max_workers=10) as executor:
     for res, df_chunk in executor.map(process_parquet_file, product_ids):
        results.append(res)
        df_chunks.append(df_chunk)

# Combining all results into a single DataFrames
aggregate_result = pd.concat(results)
final_df = pd.concat(df_chunks)

Processed and saved processed_data/product_2_processed.parquet
Processed and saved processed_data/product_1_processed.parquet
Processed and saved processed_data/product_5_processed.parquet
Processed and saved processed_data/product_3_processed.parquet
Processed and saved processed_data/product_6_processed.parquet
Processed and saved processed_data/product_4_processed.parquet
Processed and saved processed_data/product_7_processed.parquet
Processed and saved processed_data/product_9_processed.parquet
Processed and saved processed_data/product_8_processed.parquet
Processed and saved processed_data/product_10_processed.parquet
Processed and saved processed_data/product_11_processed.parquet
Processed and saved processed_data/product_14_processed.parquet
Processed and saved processed_data/product_12_processed.parquet
Processed and saved processed_data/product_15_processed.parquet
Processed and saved processed_data/product_13_processed.parquet
Processed and saved processed_data/product_16_pro

In [104]:
# Displaying the final result
print(aggregate_result.head(10))
# Print the datatype of each column
print("Datatype of the different columns in the Aggregate Result Dataframe\n",aggregate_result.dtypes)
print(f'Shape of Aggregate Result Dataframe-{aggregate_result.shape}')

print(f'\n Since there are 50 products sold in 4 years with 12 months each,\n we need to have maximum of 50*4*12=2400 rows in the aggregate result table')

   product_id year_month  total_sales_amount
0           1    2020-01           200965.95
1           1    2020-02           195312.01
2           1    2020-03           205651.19
3           1    2020-04           197658.97
4           1    2020-05           207338.64
5           1    2020-06           217745.13
6           1    2020-07           213160.15
7           1    2020-08           210664.96
8           1    2020-09           241199.34
9           1    2020-10           198068.03
Datatype of the different columns in the Aggregate Result Dataframe
 product_id                uint8
year_month            period[M]
total_sales_amount      float64
dtype: object
Shape of Aggregate Result Dataframe-(2400, 3)

 Since there are 50 products sold in 4 years with 12 months each,
 we need to have maximum of 50*4*12=2400 rows in the aggregate result table


In [100]:
# Displaying the final transformed dataframe
print(final_df.head(10))

# Print the datatype of each column
print("Datatype of the different columns in the Transformed Dataframe\n",final_df.dtypes)

print(f'Shape of Transformed Dataframe-{final_df.shape}')


   sale_id  product_id  quantity_sold  sale_date  sale_amount year_month
0       51           1             19 2023-02-08       577.39    2023-02
1      116           1              5 2022-08-19       335.59    2022-08
2      154           1             14 2021-05-20        98.07    2021-05
3      185           1             17 2023-08-09       917.19    2023-08
4      198           1             19 2020-12-25        35.09    2020-12
5      275           1             19 2021-08-12       914.90    2021-08
6      309           1              1 2021-04-11       525.18    2021-04
7      348           1             18 2023-01-08       213.05    2023-01
8      362           1              6 2023-07-13        65.70    2023-07
9      423           1              7 2023-06-19       882.21    2023-06
Datatype of the different columns in the Transformed Dataframe
 sale_id                  uint32
product_id                uint8
quantity_sold             uint8
sale_date        datetime64[ns]
sale_

   sale_id  product_id  quantity_sold  sale_date  sale_amount year_month
0        5          18             15 2020-01-06       486.18    2020-01
0        8          24             18 2021-04-13       956.60    2021-04
0        3          28              4 2022-10-29       745.31    2022-10
0        1          32             19 2023-09-16       540.07    2023-09
0        2          36              1 2022-09-25       626.29    2022-09
0        6          43              5 2022-07-16       926.20    2022-07
0        9          45             20 2023-06-25       949.86    2023-06
0        4          46             10 2021-04-21       867.50    2021-04
0        7          50              6 2022-01-22       691.37    2022-01
   sale_id  product_id  quantity_sold  sale_date  sale_amount
0        1          32             19 2023-09-16       540.07
1        2          36              1 2022-09-25       626.29
2        3          28              4 2022-10-29       745.31
3        4          46