In [7]:
!pip install modin
!pip install ray
!pip install dask

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [10]:
#Download dataset from Kaggle
!pip install -q kaggle
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d hm-land-registry/uk-housing-prices-paid
!unzip uk-housing-prices-paid.zip

mkdir: cannot create directory ‘/root/.kaggle’: File exists
uk-housing-prices-paid.zip: Skipping, found more recently modified local copy (use --force to force download)
Archive:  uk-housing-prices-paid.zip
  inflating: price_paid_records.csv  


# **Read CSV with Pandas, Dask, Modin/Ray**

In [2]:
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import ray
import time

In [3]:
csvfile='/content/price_paid_records.csv'

In [5]:
#Pandas to read the CSV

start = time.time()
df=pd.read_csv(csvfile)
end = time.time()
print('Time to read the CSV (pandas): ',end - start, 'seconds')

Time to read the CSV (pandas):  55.602303981781006 seconds


In [4]:
#Modin/Ray to read the CSV

start = time.time()
ray.shutdown()
ray.init()
mpd.read_csv(csvfile)
end = time.time()
print('Time to read the CSV (modin/ray): ',end - start, 'seconds')

[2m[36m(deploy_ray_func pid=3340)[0m tcmalloc: large alloc 1202847744 bytes == 0x3724000 @  0x7f08490ae1e7 0x4a3940 0x5b438c 0x5d0ccd 0x5939af 0x516337 0x549576 0x593fce 0x548ae9 0x51566f 0x549576 0x4bca8a 0x5134a6 0x549576 0x4bca8a 0x5134a6 0x4bc98a 0x7f0845d87e02 0x7f0845e1bdb6 0x7f0845d8e306 0x7f0845f026ab 0x7f0845e6388f 0x7f0845f2a7d3 0x7f0845f2b72a 0x7f0845f3d18e 0x7f0845f16530 0x7f084613af06 0x7f08460e7a3e 0x7f08460e7c96 0x7f084657ecab 0x7f084657fee1
[2m[36m(deploy_ray_func pid=3341)[0m tcmalloc: large alloc 1202847744 bytes == 0x42a0000 @  0x7f9e18a1e1e7 0x4a3940 0x5b438c 0x5d0ccd 0x5939af 0x516337 0x549576 0x593fce 0x548ae9 0x51566f 0x549576 0x4bca8a 0x5134a6 0x549576 0x4bca8a 0x5134a6 0x4bc98a 0x7f9e156f7e02 0x7f9e1578bdb6 0x7f9e156fe306 0x7f9e158726ab 0x7f9e157d388f 0x7f9e1589a7d3 0x7f9e1589b72a 0x7f9e158ad18e 0x7f9e15886530 0x7f9e15aaaf06 0x7f9e15a57a3e 0x7f9e15a57c96 0x7f9e15eeecab 0x7f9e15eefee1


Time to read the CSV (modin/ray):  97.11903548240662 seconds


In [5]:
#Dask to read the CSV

start = time.time()
dd.read_csv(csvfile)
end = time.time()
print('Time to read the CSV (dask): ',end - start, 'seconds')

Time to read the CSV (dask):  0.06782817840576172 seconds


**Dask took the least time reading the CSV file!**


# **Clean the Column Names**

In [6]:
# Remove spaces and special chars from the cols

data=dd.read_csv(csvfile)

data.columns=data.columns.str.lower()
data.columns = data.columns.str.replace(' ', '')
data.columns=data.columns.str.replace('[^\w]','_',regex=True)
print(data.columns)

Index(['transactionuniqueidentifier', 'price', 'dateoftransfer',
       'propertytype', 'old_new', 'duration', 'town_city', 'district',
       'county', 'ppdcategorytype', 'recordstatus_monthlyfileonly'],
      dtype='object')


# **Validation**



In [7]:
%%writefile utility.py
import yaml
import logging
import os
import subprocess
import pandas as pd
import re
import gc
import datetime

def read_config_file(filepath):
  with open(filepath, 'r') as stream:
    try:
      return yaml.safe_load(stream)
    except yaml.YAMLerror as exc:
      logging.error(exc)

def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df, table_config):
  df.columns=df.columns.str.lower()
  df.columns=df.columns.str.replace('[^\w]','_',regex=True)
  df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
  df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
  expected_col = list(map(lambda x: x.lower(), table_config['columns']))
  expected_col.sort()
  df.columns = list(map(lambda x: x.lower(), list(df.columns)))
  df=df.reindex(sorted(df.columns), axis=1)
  if len(df.columns) == len(expected_col) and list(expected_col) == list(df.columns):
    print("Column name and column length validation passed")
    return 1
  else:
    print("Column name and column length validation failed")
    mismatched_columns_file = list(set(df.columns).difference(expected_col))
    print("Following file columns are not in the YAML file", mismatched_columns_file)
    missing_YAML_file = list(set(expected_col).difference(df.columns))
    print("Following YAML columns are not in the file uploaded", missing_YAML_file)
    logging.info(f'df columns: {df.columns}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting utility.py


In [8]:
%%writefile store.yaml
file_type: csv
dataset_name: testfile
file_name: price_paid_records
table_name: endsurv
inbound_delimiter: ','
outbound_delimiter: '|'
skip_leading_rows: 1
columns:
  - transaction_unique_identifier
  - price
  - date_of_transfer
  - property_type
  - old_new
  - duration
  - town_city
  - district
  - county
  - ppdcategory_type
  - record_status_monthly_file_only

Overwriting store.yaml


In [9]:
# Read config file

import utility as util
config_data = util.read_config_file("store.yaml")

In [10]:
config_data['inbound_delimiter']

','

In [11]:
#data of the config file
config_data

{'columns': ['transaction_unique_identifier',
  'price',
  'date_of_transfer',
  'property_type',
  'old_new',
  'duration',
  'town_city',
  'district',
  'county',
  'ppdcategory_type',
  'record_status_monthly_file_only'],
 'dataset_name': 'testfile',
 'file_name': 'price_paid_records',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'endsurv'}

In [12]:
#Read the file using config file
file_type = config_data['file_type']
source_file = "/content/" + config_data['file_name'] + f'.{file_type}'
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()



Unnamed: 0,Transaction unique identifier,Price,Date of Transfer,Property Type,Old/New,Duration,Town/City,District,County,PPDCategory Type,Record Status - monthly file only
0,{81B82214-7FBC-4129-9F6B-4956B4A663AD},25000,1995-08-18 00:00,T,N,F,OLDHAM,OLDHAM,GREATER MANCHESTER,A,A
1,{8046EC72-1466-42D6-A753-4956BF7CD8A2},42500,1995-08-09 00:00,S,N,F,GRAYS,THURROCK,THURROCK,A,A
2,{278D581A-5BF3-4FCE-AF62-4956D87691E6},45000,1995-06-30 00:00,T,N,F,HIGHBRIDGE,SEDGEMOOR,SOMERSET,A,A
3,{1D861C06-A416-4865-973C-4956DB12CD12},43150,1995-11-24 00:00,T,N,F,BEDFORD,NORTH BEDFORDSHIRE,BEDFORDSHIRE,A,A
4,{DD8645FD-A815-43A6-A7BA-4956E58F1874},18899,1995-06-23 00:00,S,N,F,WAKEFIELD,LEEDS,WEST YORKSHIRE,A,A


In [13]:
#Validate the Header of the file
util.col_header_val(df,config_data)

Column name and column length validation passed


1

In [14]:
print("Columns of files are:" , df.columns)
print("Columns of YAML are:" , config_data['columns'])

Columns of files are: Index(['transaction_unique_identifier', 'price', 'date_of_transfer',
       'property_type', 'old_new', 'duration', 'town_city', 'district',
       'county', 'ppdcategory_type', 'record_status_monthly_file_only'],
      dtype='object')
Columns of YAML are: ['transaction_unique_identifier', 'price', 'date_of_transfer', 'property_type', 'old_new', 'duration', 'town_city', 'district', 'county', 'ppdcategory_type', 'record_status_monthly_file_only']


In [15]:
if util.col_header_val(df,config_data)==0:
    print("Validation failed")
    print("Columns of the file does not match the YAML")
else:
    print("Column validation passed")
    print('Preview of the data\n', df.head())

Column name and column length validation passed
Column validation passed
Preview of the data
             transaction_unique_identifier  price  date_of_transfer  \
0  {81B82214-7FBC-4129-9F6B-4956B4A663AD}  25000  1995-08-18 00:00   
1  {8046EC72-1466-42D6-A753-4956BF7CD8A2}  42500  1995-08-09 00:00   
2  {278D581A-5BF3-4FCE-AF62-4956D87691E6}  45000  1995-06-30 00:00   
3  {1D861C06-A416-4865-973C-4956DB12CD12}  43150  1995-11-24 00:00   
4  {DD8645FD-A815-43A6-A7BA-4956E58F1874}  18899  1995-06-23 00:00   

  property_type old_new duration   town_city            district  \
0             T       N        F      OLDHAM              OLDHAM   
1             S       N        F       GRAYS            THURROCK   
2             T       N        F  HIGHBRIDGE           SEDGEMOOR   
3             T       N        F     BEDFORD  NORTH BEDFORDSHIRE   
4             S       N        F   WAKEFIELD               LEEDS   

               county ppdcategory_type record_status_monthly_file_only  
0  

# **Save pipe separated file as .gz**

In [17]:
#csv to gz

import gzip
import csv

df.to_csv('dfgz.gz',
      sep='|',
      header=True,
      index=False,
      quoting=csv.QUOTE_ALL,
      compression='gzip',
      quotechar='"',
      doublequote=True,
      line_terminator='\n')

# **Summary of the File**

In [17]:
#Size of the CSV file

import os
print(str(os.path.getsize(csvfile)))

2405685902


In [18]:
#Number of rows/cols
print('Number of Rows: ',len(df.index))
print('Number of columns:', len(df.columns))

Number of Rows:  22489348
Number of columns: 11
