In [2]:
pwd

'/Users/ponte/Data Wrangling'

In [2]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(ddf, table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    ddf.columns = ddf.columns.str.lower()
    ddf.columns = ddf.columns.str.replace('[^\w]', '_', regex=True)
    ddf.columns = list(map(lambda x: x.strip('_'), list(ddf.columns)))
    ddf.columns = list(map(lambda x: replacer(x, '_'), list(ddf.columns)))
    expected_col = list(map(lambda x: x.lower(), table_config['columns']))
    expected_col.sort()
    ddf.columns = list(map(lambda x: x.lower(), list(ddf.columns)))
    ddf = ddf[expected_col]
    if len(ddf.columns) == len(expected_col) and list(expected_col) == list(ddf.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(ddf.columns).difference(expected_col))
        print("Following File columns are not in the YAML file", mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(ddf.columns))
        print("Following YAML columns are not in the file uploaded", missing_YAML_file)
        logging.info(f'ddf columns: {ddf.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0


Overwriting testutility.py


## **Write YAML file**

In [52]:

%%writefile file.yaml
file_type: csv
dataset_name: en-fr
file_name: en-fr
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - changing_lives_changing_society_how_it_works_technology_drives_change_home_concepts_teachers_search_overview_credits_hhcc_web_reference_feedback_virtual_museum_of_canada_home_page

Overwriting file.yaml


In [53]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")
config_data['inbound_delimiter']

','

In [54]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'en-fr',
 'file_name': 'en-fr',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['changing_lives_changing_society_how_it_works_technology_drives_change_home_concepts_teachers_search_overview_credits_hhcc_web_reference_feedback_virtual_museum_of_canada_home_page']}

In [43]:
# Normal reading process of the file
import pandas as pd
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.max_colwidth', None)  # Show full width of each column
df_sample = pd.read_csv("en-fr.csv",delimiter=',')
df_sample.head()

Unnamed: 0,en,fr
0,Changing Lives | Changing Society | How It Works | Technology Drives Change Home | Concepts | Teachers | Search | Overview | Credits | HHCC Web | Reference | Feedback Virtual Museum of Canada Home Page,"Il a transformé notre vie | Il a transformé la société | Son fonctionnement | La technologie, moteur du changement Accueil | Concepts | Enseignants | Recherche | Aperçu | Collaborateurs | Web HHCC | Ressources | Commentaires Musée virtuel du Canada"
1,Site map,Plan du site
2,Feedback,Rétroaction
3,Credits,Crédits
4,Français,English


In [7]:
import time
import pandas as pd
# Start the timer
start_time = time.time()

# Import the CSV file using pandas
df_sample = pd.read_csv("en-fr.csv")

# End the timer
end_time = time.time()

# Calculate the time taken
time_taken = end_time - start_time

# Print the time taken
print("Time taken to import CSV file:", time_taken, "seconds")

Time taken to import CSV file: 1298.7667400836945 seconds


In [45]:
import dask.dataframe as dd

# Read the CSV file using Dask
ddf_sample = dd.read_csv("en-fr.csv", skiprows=1,delimiter=',')

# Display the first few rows of the Dask DataFrame
print(ddf_sample.head(20))


   Changing Lives | Changing Society | How It Works | Technology Drives Change Home | Concepts | Teachers | Search | Overview | Credits | HHCC Web | Reference | Feedback Virtual Museum of Canada Home Page  \
0                                                                                                                                                                                                   Site map   
1                                                                                                                                                                                                   Feedback   
2                                                                                                                                                                                                    Credits   
3                                                                                                                                                                       

In [28]:
import dask.dataframe as dd

# Read the CSV file using Dask
ddf_sample = dd.read_csv("en-fr.csv", skiprows=1)

# Display the first few rows of the Dask DataFrame
print(ddf_sample.tail(20))

       Changing Lives | Changing Society | How It Works | Technology Drives Change Home | Concepts | Teachers | Search | Overview | Credits | HHCC Web | Reference | Feedback Virtual Museum of Canada Home Page  \
150294  "India participated in the extensive discussio...                                                                                                                                                          
150295                 The following is the view of India                                                                                                                                                          
150296  Rule 1: Objective and Competence of the Confer...                                                                                                                                                          
150297  Albania, Algeria, Andorra, Angola, Antigua and...                                                                                               

In [5]:
import dask.dataframe as dd
import time

# Record the start time
start_time = time.time()

# Read the CSV file using Dask
ddf_sample = dd.read_csv("en-fr.csv")

# Trigger computation by computing the number of rows
num_rows = len(ddf_sample)

# Record the end time
end_time = time.time()

# Calculate the time taken
time_taken = end_time - start_time

# Display the number of rows and the time taken
print("Number of rows:", num_rows)
print("Time taken to read the file:", time_taken, "seconds")


Number of rows: 22520376
Time taken to read the file: 91.2523078918457 seconds


In [7]:
import ray
import pandas as pd
import time

# Start Ray
ray.init()

# Define a function to read the CSV file
@ray.remote
def read_csv(filename):
    return pd.read_csv(filename)

# Record the start time
start_time = time.time()

# Read the CSV file using Ray
# Note: Replace 'en-fr.csv' with your actual file path
result_id = read_csv.remote('en-fr.csv')
df_sample = ray.get(result_id)

# Record the end time
end_time = time.time()

# Calculate the time taken
time_taken = end_time - start_time

# Display the time taken
print("Time taken to read the file:", time_taken, "seconds")





2024-04-09 18:38:48,063	INFO worker.py:1752 -- Started a local Ray instance.


KeyboardInterrupt: 

In [None]:
import modin.pandas as pd
import time

# Record the start time
start_time = time.time()

# Read the CSV file using Modin
df_sample = pd.read_csv("en-fr.csv")

# Record the end time
end_time = time.time()

# Calculate the time taken
time_taken = end_time - start_time

# Display the time taken
print("Time taken to read the file:", time_taken, "seconds")


2024-04-09 20:39:30,990	INFO worker.py:1752 -- Started a local Ray instance.


In [20]:
print(type(ddf_sample))


<class 'dask.dataframe.core.DataFrame'>


In [55]:
util.col_header_val(ddf_sample, config_data)


column name and column length validation passed


1

In [56]:
print("columns of files are:" ,ddf_sample.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['changing_lives_changing_society_how_it_works_technology_drives_change_home_concepts_teachers_search_overview_credits_hhcc_web_reference_feedback_virtual_museum_of_canada_home_page', 'il_a_transformé_notre_vie_il_a_transformé_la_société_son_fonctionnement_la_technologie_moteur_du_changement_accueil_concepts_enseignants_recherche_aperçu_collaborateurs_web_hhcc_ressources_commentaires_musée_virtuel_du_canada'], dtype='object')
columns of YAML are: ['changing_lives_changing_society_how_it_works_technology_drives_change_home_concepts_teachers_search_overview_credits_hhcc_web_reference_feedback_virtual_museum_of_canada_home_page']


In [27]:
# Write the Dask DataFrame to a pipe-separated text file in gzip format
ddf_sample.to_csv("output_file.csv.gz", sep='|', compression='gzip', index=False)


['/Users/ponte/Data Wrangling/output_file.csv.gz/000.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/001.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/002.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/003.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/004.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/005.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/006.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/007.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/008.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/009.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/010.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/011.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/012.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/013.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/014.part',
 '/Users/ponte/Data Wrangling/output_file.csv.gz/015.part',
 '/Users/ponte/Data Wrangling/output_fil

In [44]:
import shutil
import glob

# Specify the path to the directory containing the part files
part_files_directory = 'output_file.csv.gz'  

# Specify the output path for the combined gzipped file
output_gz_file = 'combined_file.gz'  

# Use glob to get a list of all part files in the directory
part_files = glob.glob(part_files_directory + '/*.part*')

# Open the output gzipped file in binary write mode
with open(output_gz_file, 'wb') as output_file:
    # Iterate over each part file
    for part_file in part_files:
        # Open each part file in binary read mode
        with open(part_file, 'rb') as input_file:
            # Copy the contents of the part file to the output gzipped file
            shutil.copyfileobj(input_file, output_file)

print(f"Combined gzipped file saved to: {output_gz_file}")



Combined gzipped file saved to: combined_file.gz


In [49]:
import os

gz_file_path = "combined_file.gz"  

if os.path.isfile(gz_file_path):
    # Get the size of the gzipped file
    file_size = os.path.getsize(gz_file_path)

    # Convert the file size to gigabytes (GB)
    file_size_gb = file_size / (1024 * 1024 * 1024)

    # Report the file size
    print(f"File size of the gzipped file: {file_size_gb:.2f} GB")
else:
    print(f"The specified path '{gz_file_path}' is not a file.")


File size of the gzipped file: 2.48 GB


In [1]:
import pandas as pd

# Specify the path to the gzipped file
gz_file_path = 'combined_file.gz'  

# Read the CSV file with pipe (|) delimiter
df = pd.read_csv("combined_file.gz", sep='|', compression='gzip')

# Determine the number of rows and columns in the DataFrame
num_rows, num_columns = df.shape

# Print the number of rows and columns
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")


Number of rows: 22520505
Number of columns: 2
