# Importing required libraries

In [1]:
import pandas as pd
import time
import yaml
import os
import modin.pandas as mpd
import ray
import ray.data
import dask.dataframe as dd
import warnings
warnings.filterwarnings("ignore")

# Reading the File with Different Approaches
In this section, we will explore various methods for reading the file. Each approach has its own advantages and use cases depending on the file format, size, and the requirements of the analysis.



### 1.Reading the file with pandas 

In [2]:
# Measure the time to read the file using pandas
start_time = time.time()
df_pandas = pd.read_csv('Trips_merged.csv')
end_time = time.time()
print(f"Pandas read time: {end_time - start_time} seconds")

Pandas read time: 352.2310583591461 seconds


### 2.Reading the file with Dask 

In [3]:
# Measure the time to read the file using dask
start_time = time.time()
df_dask = dd.read_csv('Trips_merged.csv')
#df_dask = df_dask.compute() # un comment it If your dataset is small enough to fit in memory.
end_time = time.time()
print(f"Dask read time: {end_time - start_time} seconds")

Dask read time: 0.38047361373901367 seconds


### 3.Reading the file with Ray 

In [4]:
# Initialize Ray
ray.init(ignore_reinit_error=True)

# Measure the time to read the file using ray
start_time = time.time()
ds_ray = ray.data.read_csv('Trips_merged.csv')
# df_ray = ds_ray.to_pandas()
end_time = time.time()
print(f"Ray read time: {end_time - start_time} seconds")

2024-08-12 22:24:02,967	INFO worker.py:1781 -- Started a local Ray instance.


Ray read time: 3.579085111618042 seconds


### 4.Reading the file with Modin 

In [5]:
# Measure the time to read the file using modin
ray.shutdown()
ray.init()
start_time = time.time()
df_modin = mpd.read_csv('Trips_merged.csv')
end_time = time.time()
print(f"Modin read time: {end_time - start_time} seconds")

2024-08-12 22:24:13,199	INFO worker.py:1781 -- Started a local Ray instance.


Modin read time: 363.9993145465851 seconds


**Conclusion** : 

- **Dask** is the **fastest**, achieving a read time of approximately 0.38 seconds, indicating high efficiency for this operation.
- Ray is also efficient but slightly slower than Dask, with a read time of about 3.58 seconds. It remains significantly faster than Pandas and Modin.
- Pandas has a read time of approximately 352.23 seconds, making it considerably slower than both Dask and Ray.
- **Modin** has the **slowest** read time at around 364.00 seconds.<br>

**Note:** Performance can vary based on the nature of the data and specific configurations, so it's important to consider these factors when choosing a tool for data processing.

# Creating Functions
In this section, we will define custom functions to streamline our data processing and analysis tasks. Creating a Python file **(functions.py)** for user-defined functions so that we can use these functions whenever and wherever needed.

In [6]:
%%writefile functions.py

import logging
import os
import yaml

#for basic validation
def clean_column_names(df):
    
    # Convert column names to lowercase
    df.columns = df.columns.str.lower()
    
    # Replace any non-alphanumeric characters with underscores
    df.columns= df.columns.str.replace('[^\w]','_',regex=True)
    
    # remove any spaces
    df.columns = df.columns.str.strip()
    
    return df

#for validating columns 
def validate_columns(df, Yaml_file_name):
    # checking if the length and file names are same
    if list(df.columns) == Yaml_file_name['columns'] and len(df.columns) == len(Yaml_file_name['columns']):
        print("column name and column length validation passed")
        return 1
    else:
        if list(df.columns) != Yaml_file_name['columns']:
            print("column names are different,please check column names in the uploaded file",set((df.columns))-set(Yaml_file_name['columns']))
        if len(df.columns) != len(Yaml_file_name['columns']):
            print("columns length are differnt, please add or remove extra columns.")                    
        return 0
    
# for opening/reading Yaml File
def read_config_file(filepath):
    with open(filepath, 'r') as file:
        try:
            return yaml.safe_load(file)
        except yaml.YAMLError as exc:
            logging.error(exc)


Writing functions.py


# Basic Validation
Instead of hard-coding the validation, we use custom **clean_column_names** function from the **functions.py** file. This function:

- Convert all column names to lowercase.
- Remove spaces.
- Replace non-alphanumeric characters with underscores.<br>

**Note**: You can add more validations if needed.

In [7]:
#importing the file which we created.
import functions as fn

# Performing basic validation using the function "clean_column_names" 
df_pandas=fn.clean_column_names(df_pandas)

In [8]:
df_pandas.head()

Unnamed: 0,trip_id,start_time,stop_time,bike_id,trip_duration,from_station_id,from_station_name,to_station_id,to_station_name,user_type,gender,birth_year
0,4118,2013-06-27T17:11:00Z,2013-06-27T17:16:00Z,480.0,300,85,Michigan Ave & Oak St,28,Larrabee St & Menomonee St,Casual,,
1,4275,2013-06-27T19:44:00Z,2013-06-27T19:45:00Z,77.0,60,32,Racine Ave & Congress Pkwy,32,Racine Ave & Congress Pkwy,Casual,,
2,4291,2013-06-27T19:58:00Z,2013-06-27T20:05:00Z,77.0,420,32,Racine Ave & Congress Pkwy,19,Loomis St & Taylor St,Casual,,
3,4316,2013-06-27T20:06:00Z,2013-06-27T20:09:00Z,77.0,180,19,Loomis St & Taylor St,19,Loomis St & Taylor St,Casual,,
4,4342,2013-06-27T20:13:00Z,2013-06-27T20:27:00Z,77.0,840,19,Loomis St & Taylor St,55,Halsted St & James M Rochford St,Casual,,


# Creating a YAML File
In this section, we will create a YAML file to define the structure and requirements of our dataset. This file will specify the expected columns, their names, and any additional metadata.

In [9]:
%%writefile Trips.yaml
file_type: csv
dataset_name: Cyclistic_trips
inbound_delimiter: ","
outbound_delimiter: "|"
columns: 
    - trip_id
    - start_time
    - stop_time
    - bike_id
    - trip_duration
    - from_station_id
    - from_station_name
    - to_station_id
    - to_station_name
    - user_type
    - gender
    - birth_year   

Writing Trips.yaml


# Validation (YAML File)
#### Column Validation Using Custom Function
In this section, we will use the custom **validate_columns function** to ensure that the columns in our dataset match the specifications defined in a YAML file. The function will check both the number of columns and their names.

- If the validation is successful, a success message will be printed.
- If the validation fails, an error message will be shown.

In [10]:
# loading the YAML file 
Trips_YAML_file = fn.read_config_file("Trips.yaml")
Trips_YAML_file

{'file_type': 'csv',
 'dataset_name': 'Cyclistic_trips',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'columns': ['trip_id',
  'start_time',
  'stop_time',
  'bike_id',
  'trip_duration',
  'from_station_id',
  'from_station_name',
  'to_station_id',
  'to_station_name',
  'user_type',
  'gender',
  'birth_year']}

In [11]:
# validating the columns using custom function
fn.validate_columns(df_pandas,Trips_YAML_file)

column name and column length validation passed


1

#### Example: Testing Failed Validation
To verify that our validation function correctly identifies errors, we’ll create a dataset with intentional mismatches. This example will help us ensure that the validation function properly detects and reports failures.

In [12]:
data = {
    'start_time': ['2024-01-01 08:00', '2024-01-01 09:00'],   # Correct column
    'stop_time': ['2024-01-01 08:30', '2024-01-01 09:30'],    # Correct column
    'bike_id': ['B123', 'B124'],                             # Correct column
    'duration': [30, 30],                                    # Incorrect column name (should be 'trip_duration')
    'from_station_name': ['Station A', 'Station B'],         # Correct column
    'to_station_id': [100, 101],                             # Correct column
    'user_type': ['Subscriber', 'Customer'],                 # Correct column
    'birth_year': [1985, 1990],                              # Correct column
    'extra_column': ['extra', 'data']                        # Extra column not in expected list
}

# Creating DataFrame
df_Example = pd.DataFrame(data)

In [13]:
# Example function call
fn.validate_columns(df_Example, Trips_YAML_file)

column names are different,please check column names in the uploaded file {'duration', 'extra_column'}
columns length are differnt, please add or remove extra columns.


0

# Converting CSV File to Pipe-Separated Text File (|) and Compressing to GZ Format

In [14]:
df_pandas.to_csv("output_file_gz_format.csv.gz",
          index=False,
          sep = ("|"),
          compression="gzip",
          )

# Summary
This section provides a summary of the dataset, including the total number of rows and columns, as well as the file sizes before and after compression.

In [15]:
print('Summary:')
print(f"Total rows: {'==>':<5} {len(df_pandas):,}")
print(f"Total columns: {'==>':<5} {len(df_pandas.columns):,}")
print(f"Original File size: {'==>':<5} {os.path.getsize('Trips_merged.csv'):,} bytes ({os.path.getsize('Trips_merged.csv')/(1024**3):.2f} GB)")
print(f"File size of pipe-separated text file (|) in gz format: {'==>':<5} {os.path.getsize('output_file_gz_format.csv.gz'):,} bytes ({os.path.getsize('output_file_gz_format.csv.gz')/(1024**3):.2f} GB)")

Summary:
Total rows: ==>   33,356,927
Total columns: ==>   12
Original File size: ==>   4,430,497,816 bytes (4.13 GB)
File size of pipe-separated text file (|) in gz format: ==>   856,806,404 bytes (0.80 GB)


# Dataset

I found the dataset(4GB) on Kaggle's site [Cyclistic dataset (clean & merge)](https://www.kaggle.com/datasets/algorismus/cyclistic-dataset-clean-merge/data)