In [1]:
# Mount Google Drive inside the Colab runtime so the dataset folder is
# reachable under /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [2]:
# Work from the dataset folder on the mounted drive. Written as `%cd` (no
# space after `%`): newer IPython releases reject a space between `%` and the
# magic name.
%cd '/content/gdrive/My Drive/Datasets/'

/content/gdrive/My Drive/Datasets


In [3]:
# Install the extra packages needed by dask.dataframe (imported below as `dd`).
!pip install "dask[dataframe]"

Collecting fsspec>=0.6.0; extra == "dataframe"
[?25l  Downloading https://files.pythonhosted.org/packages/62/11/f7689b996f85e45f718745c899f6747ee5edb4878cadac0a41ab146828fa/fsspec-0.9.0-py3-none-any.whl (107kB)
[K     |████████████████████████████████| 112kB 10.3MB/s 
[?25hCollecting partd>=0.3.10; extra == "dataframe"
  Downloading https://files.pythonhosted.org/packages/41/94/360258a68b55f47859d72b2d0b2b3cfe0ca4fbbcb81b78812bd00ae86b7c/partd-1.2.0-py3-none-any.whl
Collecting locket
  Downloading https://files.pythonhosted.org/packages/50/b8/e789e45b9b9c2db75e9d9e6ceb022c8d1d7e49b2c085ce8c05600f90a96b/locket-0.2.1-py2.py3-none-any.whl
Installing collected packages: fsspec, locket, partd
Successfully installed fsspec-0.9.0 locket-0.2.1 partd-1.2.0


In [4]:
# Install pyaml; the `yaml` module imported from it is used to parse config.yaml.
! pip install pyaml

Collecting pyaml
  Downloading https://files.pythonhosted.org/packages/15/c4/1310a054d33abc318426a956e7d6df0df76a6ddfa9c66f6310274fb75d42/pyaml-20.4.0-py2.py3-none-any.whl
Installing collected packages: pyaml
Successfully installed pyaml-20.4.0


In [5]:
from pyaml import yaml
import dask.dataframe as dd

In [6]:
# Parse the validation settings (file format, separator, expected columns and
# the row/size limits) from the YAML file in the current working directory.
with open('config.yaml') as config_stream:
  config_file = yaml.safe_load(config_stream)

# Echo the parsed settings so the notebook records what was validated against.
print(config_file)

{'File format': 'tsv', 'Separator': '\t', 'Columns': ['Language', 'Source', 'Date', 'Text'], 'Number of columns': 4, 'Max number of rows admitted': 20000000, 'Max size admitted': 10}


In [7]:
def Pipeline(file_name: str) -> None:
  """Validate a delimited data file against the settings in ``config_file``.

  The file ``{file_name}.{File format}`` is opened with dask using the
  configured separator and has to pass three checks:

  1. its column names equal ``config_file['Columns']`` (same names, same order);
  2. its size on disk is below ``config_file['Max size admitted'] * 1e9`` bytes;
  3. its row count is below ``config_file['Max number of rows admitted']``.

  The checks run from cheapest to most expensive and stop at the first
  failure, so the row count is only computed when the other two checks passed.

  Args:
    file_name: Path of the file without its extension; the extension comes
      from ``config_file['File format']``.

  Returns:
    None. The verdict is printed: 'The file has passed the validation and can
    be compressed' when every check passes, 'The validation failed' otherwise.
  """
  import os

  # 1. read the file using the YAML configuration file
  file_format = config_file['File format']
  path = f'{file_name}.{file_format}'
  data = dd.read_csv(path, sep=config_file['Separator'])

  # 2. validate columns (comparing the lists also compares their lengths)
  columns_ok = list(config_file['Columns']) == list(data.columns)

  # 3. validate size: a single stat call, limit expressed in units of 1e9 bytes
  size_ok = columns_ok and (
      os.path.getsize(path) < config_file['Max size admitted'] * 1e9
  )

  # 4. validate number of rows: len() forces dask to actually process the
  #    file, so it is evaluated last and skipped once an earlier check failed
  rows_ok = size_ok and (
      len(data.iloc[:, 0]) < config_file['Max number of rows admitted']
  )

  if rows_ok:
    print('The file has passed the validation and can be compressed')
  else:
    print('The validation failed')

def summary(file_name):
  """Print the column count, row count and size in GB of a data file.

  The file ``{file_name}.{File format}`` is opened with dask using the
  separator from ``config_file``; its size is read from the file system and
  reported divided by 1e9.

  Args:
    file_name: Path of the file without its extension.

  Returns:
    None; the one-line report is printed.
  """
  import os

  source_path = f"{file_name}.{config_file['File format']}"
  data = dd.read_csv(source_path, sep=config_file['Separator'])

  num_of_cols = len(data.columns)
  # Rows are counted through the first column only.
  num_of_rows = len(data.iloc[:, 0])
  file_size = os.path.getsize(source_path)

  print(f'The file entered was {file_name}. It has {num_of_cols} columns and {num_of_rows} entries. It weighs {file_size / 1e9} GB.')

def compression(file_name, new_file_name, format):
  """Rewrite the source file with ``|`` as delimiter and gzip the result.

  Reads ``{file_name}.{File format}`` using the separator from
  ``config_file``, writes every row to ``{new_file_name}.{format}`` with ``|``
  as delimiter, then writes a gzip copy of that file to
  ``{new_file_name}.gz``. The uncompressed intermediate file stays on disk.

  Args:
    file_name: Source path without its extension.
    new_file_name: Output path without extension, shared by both outputs.
    format: Extension of the intermediate pipe-delimited file (e.g. 'csv').
      The name shadows the ``format`` builtin; it is kept so existing calls
      that pass it by keyword continue to work.

  Returns:
    None.
  """
  import csv
  import gzip
  import shutil

  source_path = f"{file_name}.{config_file['File format']}"
  converted_path = f'{new_file_name}.{format}'
  separator = config_file['Separator']

  # The csv module expects its files to be opened with newline=''. Without it,
  # universal-newline translation rewrites '\r' and '\r\n' that occur inside
  # quoted fields while reading, and on platforms whose line separator is
  # '\r\n' the writer's row terminator gets an extra '\r'.
  with open(source_path, 'r', newline='') as input_f, \
       open(converted_path, 'w', newline='') as output_f:
    csv_reader = csv.reader(input_f, delimiter=separator)
    csv_writer = csv.writer(output_f, delimiter='|')
    csv_writer.writerows(csv_reader)

  # Compress by streaming the converted file, not by loading it into memory.
  with open(converted_path, 'rb') as input_compressed, \
       gzip.open(f'{new_file_name}.gz', 'wb') as output_compressed:
    shutil.copyfileobj(input_compressed, output_compressed)

In [8]:
# Validate old-newspaper.tsv against the limits loaded from config.yaml.
Pipeline("old-newspaper")

The file has passed the validation and can be compressed


In [12]:
# Report the column count, row count and size on disk of old-newspaper.tsv.
summary('old-newspaper')

The file entered was old-newspaper. It has 4 columns and 16806041 entries. It weighs 6.024697599 GB.


In [None]:
# This works, but the run time grows with the size of the input file;
# in this case the input is a 6 GB file.
# (File name aligned with the 'old-newspaper' dataset used in the cells above.)
#compression('old-newspaper', 'old-newspaper-compressed', 'csv')