<a href="https://colab.research.google.com/github/syedahmadsohail96/File-ingestion-and-schema-validation/blob/main/DataGlacier_Week6_SyedAhmad.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
import time

In [2]:
from google.colab import drive
drive.mount("/content/driveG")

Mounted at /content/driveG


In [3]:
import dask.dataframe as dd


# Measure the time taken to read the CSV file with Dask
start_time = time.time()
df = dd.read_csv('/content/driveG/MyDrive/2019-Oct.csv')
end_time = time.time()

print(f"Time taken to read file with Dask: {end_time - start_time} seconds")

Time taken to read file with Dask: 0.05585050582885742 seconds


In [14]:
import ray
import pandas as pd
import time

# Initialize Ray
ray.shutdown()
ray.init()

# Define a function to read CSV file with pandas
@ray.remote
def read_csv_with_pandas(file_path):
    df = pd.read_csv(file_path)
    return df

# Measure the time taken to read the CSV file with pandas
start_time = time.time()
csv_file_path = "/content/drive/MyDrive/2019-Oct.csv"
df_id = read_csv_with_pandas.remote(csv_file_path)
df = ray.get(df_id)
end_time = time.time()

print(f"Time taken with pandas: {end_time - start_time:.4f} seconds")
print(f"Number of rows: {len(df)}")



2023-03-13 00:00:56,737	INFO worker.py:1544 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


Time taken with pandas: 114.0412 seconds
Number of rows: 42448764


In [8]:

# Measure the time taken to read the CSV file with pandas
start_time = time.time()
df = pd.read_csv('/content/driveG/MyDrive/2019-Oct.csv')
end_time = time.time()

print(f'Time taken to read CSV with pandas: {end_time - start_time} seconds')

Time taken to read CSV with pandas: 83.03061485290527 seconds


# Considering the three methods computational efficiency analyzed with time, the fastest method was with Dask.

In [4]:
# get the number of columns
num_cols = len(df.columns)
print(f"Number of columns: {num_cols}")

# get the names of columns
col_names = df.columns.tolist()
print(f"Column names: {col_names}")

Number of columns: 9
Column names: ['event_time', 'event_type', 'product_id', 'category_id', 'category_code', 'brand', 'price', 'user_id', 'user_session']


In [5]:
# remove special characters and white spaces from column names
df.columns = df.columns.str.replace('[^a-zA-Z0-9]+', '_', regex=True)
df.columns = df.columns.str.strip()

# print the updated column names
print(df.columns)

Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')


In [6]:
%%writefile file.yaml
file_type: csv
dataset_name: 2019-Oct
file_name: 2019-Oct
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
  - event_time
  - event_type
  - product_id
  - category_id
  - category_code
  - brand
  - price
  - user_id
  - user_session

Overwriting file.yaml


In [7]:
import yaml

# read the schema.yml file
with open('/content/file.yaml', 'r') as file:
    schema = yaml.safe_load(file)

# retrieve the column names
columns = schema['columns']

# print the column names
print(columns)

['event_time', 'event_type', 'product_id', 'category_id', 'category_code', 'brand', 'price', 'user_id', 'user_session']


In [8]:

# Load the YAML file that defines the expected schema
with open('/content/file.yaml', 'r') as file:
    schema = yaml.safe_load(file)

# Load the input file into a DataFrame
df = pd.read_csv('/content/driveG/MyDrive/2019-Oct.csv', sep=schema['inbound_delimiter'])

# Check the number of columns
if len(df.columns) != len(schema['columns']):
    raise ValueError(f"Expected {len(schema['columns'])} columns, but found {len(df.columns)}")

# Check the column names
if set(df.columns) != set(schema['columns']):
    raise ValueError("Column names do not match the expected schema")
else:
  print("All Column names match")

All Column names match


In [10]:
import gzip


# define the output file name
output_file = 'output_DF_GZ.txt.gz'

# write the DataFrame to a gzipped pipe-separated text file
with gzip.open(output_file, 'wt', encoding='utf-8') as f:
    df.to_csv(f, sep='|', index=False)

In [None]:
import gzip

# specify the input file path
input_file = '/content/output_DF_GZ.txt.gz'

# read in the compressed file as a pandas DataFrame
with gzip.open(input_file, 'rt') as f:
    df = pd.read_csv(f, sep='|')

# calculate the mean values of each column
mean_values = df.mean()

# print the mean values of each column
print(mean_values)