<a href="https://colab.research.google.com/github/mussb00/data-ingestion-pipeline/blob/main/data_ingestion_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -U -q Kaggle
!mkdir -p ~/.kaggle
!echo '{"username":"mussieberhane","key":"f3f6c8621a8237750399d4f577288380"}' > ~/.kaggle/kaggle.json
!chmod 600 ~/.kaggle/kaggle.json
!kaggle competitions download -c wikichallenge

In [None]:
!unzip /content/wikichallenge.zip -d /content

In [None]:
!pip install py7zr

In [None]:
import py7zr
from pathlib import Path

ARCHIVE_PATH = '/content/wikichallenge_data_all.7z'
EXTRACT_DIR = '/content/large_file'

# Extracting the archive is slow; skip it when an earlier run already produced comments.tsv
# so that Restart & Run All stays cheap.
if not (Path(EXTRACT_DIR) / 'comments.tsv').exists():
    with py7zr.SevenZipFile(ARCHIVE_PATH, mode='r') as archive:
        archive.extractall(EXTRACT_DIR)

In [None]:
from pathlib import Path
from pathlib import Path

# Size of the extracted comments file, in bytes.
comments_path = Path('/content/large_file/comments.tsv')
sz = comments_path.stat().st_size
print(sz)



> __Pandas can struggle to read large datasets in one go__



In [None]:
import pandas as pd
import time

# reading without using chunks
# Read from the directory the 7z archive is extracted into (/content/large_file).
# time.perf_counter is a monotonic, high-resolution clock meant for measuring intervals.
s_time = time.perf_counter()
df = pd.read_csv('/content/large_file/comments.tsv', sep='\t')
e_time = time.perf_counter()

print("Read without chunks: ", (e_time - s_time), "seconds")

Chunking means reading a large dataset in smaller, more manageable chunks and processing them one at a time, so only one chunk has to be held in memory during parsing. This lowers the memory needed to read the dataset. It can also make reading faster: as long as memory use stays within physical RAM, the operating system does not have to swap data out to virtual memory (disk), which is much slower. Note that if all the chunks are concatenated back into a single DataFrame, as below, the final result still needs as much memory as a one-shot read.


In [None]:
# reading with chunks
# read_csv(chunksize=...) returns a lazy reader: no rows are parsed until it is iterated.
# The concat therefore has to sit inside the timed region, otherwise only the creation of
# the reader would be measured.
CHUNK_SIZE = 1000  # rows per chunk

s_time_chunk = time.perf_counter()
with pd.read_csv('/content/large_file/comments.tsv', sep='\t', chunksize=CHUNK_SIZE) as reader:
    df = pd.concat(reader)
e_time_chunk = time.perf_counter()

print("Read with chunks: ", (e_time_chunk - s_time_chunk), "seconds")
df.sample(10, random_state=0)  # fixed seed so the preview is reproducible

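Dask is a parallel-computing library: it splits a computation into many smaller tasks, runs them across multiple CPU cores on a single host machine, and then combines the results. For large datasets this can speed up computation considerably. Note that Dask is lazy: `read_csv` only builds a task graph, and the data is actually read when a result is computed (for example with `len()`, `.compute()`, or `.head()`).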

In [None]:
!pip install dask

In [None]:
from dask import dataframe as df1

# read with dask
# df1.read_csv is lazy: it only builds a task graph. len() forces every partition to be
# read and parsed, so the measured time is comparable with the pandas reads above.
s_time_dask = time.perf_counter()
dask_df = df1.read_csv('/content/large_file/comments.tsv', sep='\t')
n_rows_dask = len(dask_df)
e_time_dask = time.perf_counter()

print("Read with dask: ", (e_time_dask - s_time_dask), "seconds")

In [None]:
# Preview the first 10 rows of the Dask DataFrame.
dask_df.head(n=10)

In [None]:
# dtype inferred for the 'comment' column.
# NOTE(review): file.yaml lists this column as 'comments' — confirm which name the TSV header uses.
dask_df.dtypes['comment']

In [None]:
## References
## - Working with large CSV files in Python: https://www.geeksforgeeks.org/working-with-large-csv-files-in-python/
## - Faster pandas with Dask (explains how Dask works): https://pythonspeed.com/articles/faster-pandas-dask/#:~:text=When%20data%20doesn't%20fit,can%20also%20become%20a%20bottleneck.

In [None]:
import os

In [None]:
%%writefile testutility.py 
import logging
import os
import yaml
import pandas as pd
import datetime
import re

def read_config_file(filepath):
    '''
    Load and return the YAML document stored at `filepath`.

    Parsing uses yaml.safe_load. If the content is not valid YAML the parser
    error is logged and None is returned; errors opening the file propagate.
    '''
    with open(filepath, 'r') as config_stream:
        try:
            parsed = yaml.safe_load(config_stream)
        except yaml.YAMLError as err:
            logging.error(err)
            return None
    return parsed
      logging.error(exc)

def replacer(string, char):
    '''
    Collapse every run of two or more consecutive copies of `char` in `string`
    into a single `char`, e.g. replacer('a__b', '_') -> 'a_b'.

    `char` is escaped before it is put into the pattern, so regex
    metacharacters such as '.', '*', '|' or '\\' are matched literally.
    An empty `char` leaves `string` unchanged.
    '''
    if not char:
        return string
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement inserts `char` verbatim; a plain replacement string
    # would have its backslashes interpreted as escapes / group references.
    return re.sub(pattern, lambda _match: char, string)

def _standardize_col_name(name):
    '''Lower-case `name`, turn non-word characters into '_', trim and collapse underscores.'''
    name = re.sub(r'[^\w]', '_', str(name).lower())
    return re.sub(r'_{2,}', '_', name.strip('_'))


def col_header_val(df,table_config):
    '''
    Standardize the column names of `df` and validate them against the YAML config.

    Each column name is lower-cased, every non-word character (whitespace,
    punctuation, ...) is replaced with '_', leading/trailing underscores are
    stripped and runs of underscores are collapsed to a single '_'. The
    standardized names are written back to ``df.columns`` (the caller's frame
    is renamed in place) and then compared, ignoring order, with the
    lower-cased names in ``table_config['columns']``.

    Parameters
    ----------
    df : DataFrame
        Frame whose header is checked. Only ``df.columns`` is read and
        assigned, so a pandas or Dask DataFrame can be passed.
    table_config : dict
        Parsed YAML config containing a ``columns`` list.

    Returns
    -------
    int
        1 if the column names and count match the config, otherwise 0
        (the differences are printed and logged).
    '''
    standardized = [_standardize_col_name(name) for name in df.columns]
    df.columns = standardized

    # Only the sorted names are needed for the comparison, so there is no need
    # to reindex (and copy) the frame itself.
    file_cols = sorted(standardized)
    expected_col = sorted(col.lower() for col in table_config['columns'])

    if file_cols == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = sorted(set(file_cols).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(file_cols))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {file_cols}')
    logging.info(f'expected columns: {expected_col}')
    return 0

def col_number_val(df, table_config):
    '''
    Check that the number of columns in `df` matches the YAML config.

    Parameters
    ----------
    df : DataFrame
        Frame whose column count is checked.
    table_config : dict
        Parsed YAML config containing a ``columns`` list.

    Returns
    -------
    int
        1 (with a "passed" message) when ``len(df.columns)`` equals
        ``len(table_config['columns'])``, otherwise 0 (with a "failed" message),
        mirroring col_header_val.
    '''
    expected = len(table_config['columns'])
    actual = len(df.columns)
    if actual == expected:
        print('column length validation passed')
        return 1
    print('column length validation failed')
    logging.info(f'df has {actual} columns, expected {expected}')
    return 0

def column_data_type_validator(df, table_config):
    '''
    Check that every column listed under ``table_config['datatype']`` exists in
    `df` and has the configured dtype.

    Each expected dtype is compared with ``==``/``!=`` against ``df[col].dtype``,
    so config values such as ``int64`` or ``O`` (object) can be used as written.

    Parameters
    ----------
    df : DataFrame
        Frame whose column dtypes are checked.
    table_config : dict
        Parsed YAML config containing a ``datatype`` mapping of column -> dtype.

    Returns
    -------
    int
        1 when every configured column is present with the expected dtype,
        otherwise 0, matching the pass/fail convention of col_header_val and
        col_number_val.
    '''
    mismatched = []
    for col, expected in table_config['datatype'].items():
        # A missing column counts as a failure instead of raising KeyError.
        if col not in df.columns or df[col].dtype != expected:
            mismatched.append(col)

    if not mismatched:
        print('datatype matches configuration')
        return 1
    print('at least one of the column data types are incorrect')
    logging.info(f'columns with missing or unexpected dtype: {mismatched}')
    return 0


In [None]:
!pip install pyyaml

In [None]:
%%writefile file.yaml
# Ingestion configuration for comments.tsv (the %%writefile magic must stay on the cell's first line).
file_type: tsv
file_name: comments
inbound_delimeter: "\t"
outbound_delimeter: "|"
columns:
  - revision_id
  # NOTE(review): the exploration cell reads dask_df['comment'] (singular);
  # confirm which name the TSV header actually uses.
  - comments
datatype:
  revision_id: int64 # integer
  comments: O # string/pandas object

In [None]:
import importlib

import testutility as util

# A plain import returns the module cached from the first run; reload so that changes
# written by re-running the %%writefile cell take effect without restarting the kernel.
util = importlib.reload(util)

config_data = util.read_config_file("file.yaml")
# read_config_file logs and returns None on invalid YAML; fail here rather than in a later cell.
assert config_data is not None, "file.yaml could not be parsed"


In [None]:
# Standardize dask_df's column names in place and check them against file.yaml (1 = pass, 0 = fail).
util.col_header_val(df=dask_df, table_config=config_data)

In [None]:
# write data to pipe separated text file in .gz format using pandas
# The delimiter is taken from the YAML config (outbound_delimeter) instead of being hard-coded,
# and the file name ends in .gz because its contents are gzip-compressed.
# compresslevel 1 favours speed over size; a fixed gzip mtime keeps the output identical across runs.
df.to_csv(
    'textfile.txt.gz',
    header=False,
    index=False,
    sep=config_data['outbound_delimeter'],
    mode='w',
    compression={'method': 'gzip', 'compresslevel': 1, 'mtime': 1},
)