In [3]:
# Mount google drive where dataset is stored
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [5]:

# !pip install modin[ray]
# !pip install dask

import pandas as pd
import dask.dataframe as dd
# import modin.pandas as mpd

# file path for dataset
file_path = '/content/drive/MyDrive/brewery_project/brewery_data_complete_extended.csv'

# Pandas: Time to read CSV using pandas
print("Pandas:")
%time df_pandas = pd.read_csv(file_path)

# Dask: Time to read CSV using dask
print("\nDask:")
%time df_dask = dd.read_csv(file_path)

# Explicitly compute the dataframe for dask
print("Dask Compute:")
%time df_dask.compute()

# Modin: Time to read CSV using modin (Modin did not work in this use case)
# print("\nModin:")
# %time df_modin = mpd.read_csv(file_path)


Pandas:
CPU times: user 57.7 s, sys: 13.1 s, total: 1min 10s
Wall time: 1min 33s

Dask:
CPU times: user 26.9 ms, sys: 348 ms, total: 375 ms
Wall time: 494 ms
Dask Compute:
CPU times: user 1min 13s, sys: 9.19 s, total: 1min 22s
Wall time: 1min 3s


Unnamed: 0,Batch_ID,Brew_Date,Beer_Style,SKU,Location,Fermentation_Time,Temperature,pH_Level,Gravity,Alcohol_Content,Bitterness,Color,Ingredient_Ratio,Volume_Produced,Total_Sales,Quality_Score,Brewhouse_Efficiency,Loss_During_Brewing,Loss_During_Fermentation,Loss_During_Bottling_Kegging
0,7870796,2020-01-01 00:00:19,Wheat Beer,Kegs,Whitefield,16,24.204251,5.289845,1.039504,5.370842,20,5,1:0.32:0.16,4666,2664.759345,8.577016,89.195882,4.104988,3.235485,4.663204
1,9810411,2020-01-01 00:00:31,Sour,Kegs,Whitefield,13,18.086763,5.275643,1.059819,5.096053,36,14,1:0.39:0.24,832,9758.801062,7.420541,72.480915,2.676528,4.246129,2.044358
2,2623342,2020-01-01 00:00:40,Wheat Beer,Kegs,Malleswaram,12,15.539333,4.778016,1.037476,4.824737,30,10,1:0.35:0.16,2115,11721.087016,8.451365,86.322144,3.299894,3.109440,3.033880
3,8114651,2020-01-01 00:01:37,Ale,Kegs,Rajajinagar,17,16.418489,5.345261,1.052431,5.509243,48,18,1:0.35:0.15,3173,12050.177463,9.671859,83.094940,2.136055,4.634254,1.489889
4,4579587,2020-01-01 00:01:43,Stout,Cans,Marathahalli,18,19.144908,4.861854,1.054296,5.133625,57,13,1:0.46:0.11,4449,5515.077465,7.895334,88.625833,4.491724,2.183389,2.990630
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
249995,4575645,2023-12-31 23:58:59,Stout,Bottles,Yelahanka,12,20.719151,4.944343,1.049876,5.437139,28,15,1:0.31:0.17,3054,6655.673836,6.264104,79.654815,4.612468,1.484389,1.118612
249996,2074362,2023-12-31 23:59:18,Wheat Beer,Bottles,Electronic City,19,16.970908,5.121963,1.037019,5.102428,47,6,1:0.30:0.19,1985,10440.357817,9.651172,89.119201,2.112756,1.150946,4.972988
249997,177617,2023-12-31 23:59:41,IPA,Bottles,Malleswaram,10,24.386184,4.747588,1.047490,5.025251,24,6,1:0.43:0.14,1982,17069.572855,9.060614,72.555455,3.393856,3.011241,2.115054
249998,7306891,2023-12-31 23:59:45,Ale,Kegs,Electronic City,18,18.056172,5.007909,1.046138,5.843181,58,17,1:0.41:0.25,4016,5316.412808,6.297562,87.650899,4.095874,4.711502,1.535620


In [6]:
# Validation on data column
# Clean column names to remove all special characters and whitespaces
df_dask.columns = df_dask.columns.str.replace('[^A-Za-z0-9]+', '_', regex=True).str.strip()
df_dask.columns

Index(['Batch_ID', 'Brew_Date', 'Beer_Style', 'SKU', 'Location',
       'Fermentation_Time', 'Temperature', 'pH_Level', 'Gravity',
       'Alcohol_Content', 'Bitterness', 'Color', 'Ingredient_Ratio',
       'Volume_Produced', 'Total_Sales', 'Quality_Score',
       'Brewhouse_Efficiency', 'Loss_During_Brewing',
       'Loss_During_Fermentation', 'Loss_During_Bottling_Kegging'],
      dtype='object')

In [7]:
df_dask.head()

Unnamed: 0,Batch_ID,Brew_Date,Beer_Style,SKU,Location,Fermentation_Time,Temperature,pH_Level,Gravity,Alcohol_Content,Bitterness,Color,Ingredient_Ratio,Volume_Produced,Total_Sales,Quality_Score,Brewhouse_Efficiency,Loss_During_Brewing,Loss_During_Fermentation,Loss_During_Bottling_Kegging
0,7870796,2020-01-01 00:00:19,Wheat Beer,Kegs,Whitefield,16,24.204251,5.289845,1.039504,5.370842,20,5,1:0.32:0.16,4666,2664.759345,8.577016,89.195882,4.104988,3.235485,4.663204
1,9810411,2020-01-01 00:00:31,Sour,Kegs,Whitefield,13,18.086763,5.275643,1.059819,5.096053,36,14,1:0.39:0.24,832,9758.801062,7.420541,72.480915,2.676528,4.246129,2.044358
2,2623342,2020-01-01 00:00:40,Wheat Beer,Kegs,Malleswaram,12,15.539333,4.778016,1.037476,4.824737,30,10,1:0.35:0.16,2115,11721.087016,8.451365,86.322144,3.299894,3.10944,3.03388
3,8114651,2020-01-01 00:01:37,Ale,Kegs,Rajajinagar,17,16.418489,5.345261,1.052431,5.509243,48,18,1:0.35:0.15,3173,12050.177463,9.671859,83.09494,2.136055,4.634254,1.489889
4,4579587,2020-01-01 00:01:43,Stout,Cans,Marathahalli,18,19.144908,4.861854,1.054296,5.133625,57,13,1:0.46:0.11,4449,5515.077465,7.895334,88.625833,4.491724,2.183389,2.99063


In [8]:
import yaml
# Load yaml file from google drive
yaml_file = '/content/drive/MyDrive/brewery_project/schema.yaml'

# Load schema from yaml file
try:
    with open(yaml_file, 'r') as f:
        schema = yaml.safe_load(f)
except FileNotFoundError:
    print(f"Error: File '{yaml_file}' not found.")
except Exception as e:
    print(f"An error occurred while loading the YAML file: {e}")
print(schema)

# Validating column names with schema
ingested_columns = df_dask.columns.str.strip().str.replace(' ', '_').tolist()
# Making sure is loaded correctly
schema_columns = [col.strip().replace(' ', '_') for col in schema['schema']['columns']]

# Checking if columns match
if sorted(ingested_columns) == sorted(schema_columns):
    print("Columns match the YAML schema.")
else:
    print("Columns do not match the YAML schema.")

{'schema': {'columns': ['Batch_ID', 'Brew_Date', 'Beer_Style', 'SKU', 'Location', 'Fermentation_Time', 'Temperature', 'pH_Level', 'Gravity', 'Alcohol_Content', 'Bitterness', 'Color', 'Ingredient_Ratio', 'Volume_Produced', 'Total_Sales', 'Quality_Score', 'Brewhouse_Efficiency', 'Loss_During_Brewing', 'Loss_During_Fermentation', 'Loss_During_Bottling_Kegging']}, 'file_name': 'brewery_data_complete_extended', 'inbound_delimiter': ',', 'outbound_delimiter': '|', 'skip_leading_rows': 1, 'table_name': 'brewery_data_complete_extended', 'file_format': 'csv', 'compression': 'gzip'}
Columns match the YAML schema.


In [21]:
# Validate missing or extra columns
missing_columns = set(schema_columns) - set(ingested_columns)
extra_columns = set(ingested_columns) - set(schema_columns)

if missing_columns:
    print(f"Missing columns from dataset: {missing_columns}")
else:
    print("No missing columns in the dataset.")

if extra_columns:
    print(f"Extra columns in dataset: {extra_columns}")
else:
    print("No extra columns in the dataset.")


No missing columns in the dataset.
No extra columns in the dataset.


In [10]:
# Validate for null values

# Calculate null values for each column
null_counts = df_dask.isnull().sum().compute()

# Print the null value counts
print("Null value counts per column:\n", null_counts)

# Check if any column has null values
if null_counts.any():
    print("\nThere are null values in the DataFrame.")
else:
    print("\nNo null values found in the DataFrame.")


Null value counts per column:
 Batch_ID                        0
Brew_Date                       0
Beer_Style                      0
SKU                             0
Location                        0
Fermentation_Time               0
Temperature                     0
pH_Level                        0
Gravity                         0
Alcohol_Content                 0
Bitterness                      0
Color                           0
Ingredient_Ratio                0
Volume_Produced                 0
Total_Sales                     0
Quality_Score                   0
Brewhouse_Efficiency            0
Loss_During_Brewing             0
Loss_During_Fermentation        0
Loss_During_Bottling_Kegging    0
dtype: int64

No null values found in the DataFrame.


In [18]:
# Check duplicates by using map_partitions with pandas' duplicated function
duplicate_rows = df_dask.map_partitions(lambda df: df[df.duplicated()])

# Compute the duplicate rows to bring them into memory
duplicate_rows = duplicate_rows.compute()

# Check if there are any duplicate rows and display them
if not duplicate_rows.empty:
    print("Duplicate rows found:")
    print(duplicate_rows)
else:
    print("No duplicate rows found.")


No duplicate rows found.


In [17]:
# Validating if there is any duplicate valyes in primary key column
primary_key = schema['schema'].get('primary_key')
if primary_key and df_dask[primary_key].nunique().compute() != df_dask.shape[0].compute():
    print(f"Schema validation failed: Duplicate values found in the primary key column: {primary_key}")
else:
    print(f"Schema validation passed: No duplicate values found in the primary key column: {primary_key}")


Schema validation passed: No duplicate values found in the primary key column: None


In [19]:
# Check for missing values (NaNs) in any column
missing_values = df_dask.isnull().sum().compute()

# Display columns with missing values
print("Columns with missing values:")
print(missing_values[missing_values > 0])


Columns with missing values:
Series([], dtype: int64)


In [23]:
# Output file path
output_file = '/content/drive/MyDrive/brewery_project/brewery_data_complete_extended.txt.gz'

# Write file in pipe separated text file gz format
df_dask.to_csv(output_file, sep='|', compression='gzip', single_file=True)

# Total number of rows
total_rows = df_dask.shape[0].compute()
print(f"Total number of rows in the DataFrame: {total_rows}")

# Total number of columns
total_columns = df_dask.shape[1]
print(f"Total number of columns in the DataFrame: {total_columns}")

Total number of rows in the DataFrame: 10000000
Total number of columns in the DataFrame: 20
