#### Description

In [1]:
# Notebook banner: who maintains it and, in one line, what it does.
print(
    'Author: Leo Pauly (cnlp@leeds.ac.uk) & Nick Wilson (n.wilson@lubs.leeds.ac.uk)',
    'Description: Autmatic database update',
    sep='\n',
)

Author: Leo Pauly (cnlp@leeds.ac.uk) & Nick Wilson (n.wilson@lubs.leeds.ac.uk)
Description: Autmatic database update


#### Usage instructions & other info

1. Change/Add User (user variable options: 'leo','nick')
2. Assumed that new entry directory names always start with UKLTD_R_ or UKLTD_W_
3. File processed_dir_list_CI04.txt contains the processed directory list
4. Preferably install Python using Anaconda (all-in-one installation): https://www.anaconda.com/products/individual#windows
5. Run this if the 'pyarrow' module is missing:  !pip install pyarrow
6. Check base_dir variable
7. Databases are stored in the directory (Create one if missing) : UKLTD_Database 
8. Run this if the 'patool' module is missing:  !pip install patool pyunpack
9. Delete UKLTD_Database/CI04* directory and processed_dir_list_CI04.txt file (in UKLTD_Scripts dir) when running for the very first time

In [2]:
#!pip install pyarrow 
#!pip install patool pyunpack
#print('Extra modules installed')

#### Imports

In [3]:
import multiprocessing
import os
import shutil
import sys
import time
from zipfile import ZipFile

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyreadstat
import pyunpack
from dask.diagnostics import ProgressBar
from dask.distributed import Client
dask.config.set(scheduler='threads')

print('Python version:',sys.version)
num_processes = multiprocessing.cpu_count()
print('No: of logical CPU cores available:',num_processes)

CI04_header_dtypes={'CI04': 'str', 'REGNUM': 'str', 'SIC03': 'str', 'SIC07': 'str', 'UPLOAD': 'str'}
CI04_header_names=list(CI04_header_dtypes.keys())

Python version: 3.8.5 (default, Sep  3 2020, 21:29:08) [MSC v.1916 64 bit (AMD64)]
No: of logical CPU cores available: 8


In [4]:
## Selecting the user and deriving the file paths used by the rest of the notebook
user='leo' #(user variable options: 'leo','nick')

# One data root per known user. Every root ends with '/' because the cells below
# build paths by plain string concatenation with base_dir.
base_dir_by_user={
    'leo': 'C:/Users/cnlp/Research Fellowship/',
    'nick': '/Volumes/Pegasus32 R6/CreditSafe 2019 Zipped/',
}
if user not in base_dir_by_user:
    # Fail here with a clear message instead of a NameError on base_dir further down.
    raise ValueError('Unknown user {!r}; expected one of {}'.format(user, sorted(base_dir_by_user)))
base_dir=base_dir_by_user[user]

# Both working folders must exist before anything is opened inside them:
# the processed-directory list lives in UKLTD_Scripts, the database in UKLTD_Database.
os.makedirs(base_dir+'UKLTD_Database', exist_ok=True)
os.makedirs(base_dir+'UKLTD_Scripts', exist_ok=True)
# base_dir already ends with '/', so no extra separator is added here.
dir_list_file=base_dir+'UKLTD_Scripts/processed_dir_list_CI04.txt'
database_file_folder=base_dir+'UKLTD_Database/CI04/'

#### Checking new entries

In [5]:
## Checking for new downloads
# 'a+' creates the list file on a first run; seek(0) rewinds so the existing
# content can be read back.
with open(dir_list_file, 'a+') as fd:
    fd.seek(0)
    # Blank lines are dropped: an empty file would otherwise produce [''],
    # which is not a directory name.
    dir_list_old=[line for line in fd.read().split('\n') if line.strip()]

print('List of processed directores:',*dir_list_old,sep='\n')
# Candidates are directories (not stray files) under base_dir whose name starts with
# UKLTD_W or UKLTD_R. os.listdir returns names in arbitrary order, so they are sorted
# to make the ingestion order the same on every run and platform.
dir_list_new=sorted(
    dir_name for dir_name in os.listdir(base_dir)
    if dir_name.startswith(("UKLTD_W","UKLTD_R")) and os.path.isdir(os.path.join(base_dir,dir_name))
)
processed_dirs=set(dir_list_old)  # set membership instead of scanning the list per entry
entry_dir_list=[entry_dir for entry_dir in dir_list_new if entry_dir not in processed_dirs]
print('\nNew entries detected:',*entry_dir_list,sep='\n')

List of processed directores:


New entries detected:
UKLTD_W_20190602
UKLTD_W_20190609
UKLTD_W_20190616
UKLTD_W_7


#### Creating/Loading database 

In [6]:
## Open the existing parquet database if its folder is present; otherwise flag it for creation
if not os.path.exists(database_file_folder):
    print('Database not found.','Creating database! \n')
    database_missing=True
else:
    print('Database found.','Reading database! \n')
    start = time.process_time()
    with ProgressBar():
        df_database = dd.read_parquet(database_file_folder)
    elapsed = time.process_time() - start
    print('Time taken to read database {}:'.format(database_file_folder),elapsed,'s')
    database_missing=False
    


Database not found. Creating database! 



#### Updating database

Rules:

1. Concatenate new entries to the existing database
2. Run the de-duplicating function

In [7]:
def update_database(entry_dir): 
    """Unpack one entry directory's CI04 archive and add its rows to ``df_database``.

    The archive and text file names are built from the last two ``_``-separated
    parts of ``entry_dir`` (``UKLTD_W_20190602`` -> ``CI04_W_20190602.rar`` and
    ``CI04_W_20190602.txt``). Every row read is tagged in the ``UPLOAD`` column
    with the text file name up to its first ``.``.

    Reads the module-level ``base_dir``, ``CI04_header_names`` and
    ``CI04_header_dtypes``; rebinds the module-level ``df_database`` and
    ``database_missing``.

    Parameters
    ----------
    entry_dir : str
        Name of a directory directly under ``base_dir``.

    Returns
    -------
    None
        In every case. An entry whose archive is missing or cannot be extracted
        is reported and skipped, leaving ``df_database`` untouched.
    """
    global df_database
    global database_missing

    ## Checking and unzipping new entry file
    entry_parts=entry_dir.split('_')
    entry_stem="CI04_"+entry_parts[-2]+"_"+entry_parts[-1]
    entry_file=entry_stem+".txt"
    entry_file_zip=entry_stem+".rar"
    entry_path=base_dir+'{}/'.format(entry_dir)
    print('Unzipping:',entry_file_zip)
    # Test for the archive explicitly so that "not found" is only reported when
    # the file really is absent.
    if not os.path.isfile(entry_path+entry_file_zip):
        print("Rar file not found: {}".format(entry_file_zip))
        print('Skipping this entry:%s'%entry_dir)
        return None
    try:
        pyunpack.Archive(entry_path+entry_file_zip).extractall(entry_path)
    except Exception as err:
        # Still best-effort (the entry is skipped), but the real cause is shown and
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        print("Could not extract {}: {}".format(entry_file_zip,err))
        print('Skipping this entry:%s'%entry_dir)
        return None
    print('Entry file unzipped as:',entry_file)

    ## Reading new entry file
    start = time.process_time()
    with ProgressBar():
        df_entry_file=dd.read_csv(entry_path+entry_file,sep='|',names=CI04_header_names,dtype=CI04_header_dtypes,encoding='iso-8859-1')
    print('Time taken to read:',time.process_time() - start,'s')
    # Tag every row with the file it came from (file name without extension).
    df_entry_file['UPLOAD']=entry_file.split('.')[0]

    ## Adding new entry rows to main database after checking
    if(database_missing):
        # First entry ever: it becomes the database.
        df_database=df_entry_file
        database_missing=False
    else:
        print('Appending...!')
        start = time.process_time()
        with ProgressBar():
            # dd.concat stacks the frames row-wise; DataFrame.append was removed in
            # pandas 2.0 and is not available in current dask releases.
            df_database=dd.concat([df_database,df_entry_file])
        print('Time taken to append:',time.process_time() - start,'s')

In [8]:
## Feed every newly detected entry directory into the database, in list order
for entry_dir in entry_dir_list:
    print('\nReading from entry dir: {}'.format(entry_dir))
    update_database(entry_dir)


Reading from entry dir: UKLTD_W_20190602
Unzipping: CI04_W_20190602.rar
Entry file unzipped as: CI04_W_20190602.txt
Time taken to read: 0.015625 s

Reading from entry dir: UKLTD_W_20190609
Unzipping: CI04_W_20190609.rar
Entry file unzipped as: CI04_W_20190609.txt
Time taken to read: 0.015625 s
Appending...!
Time taken to append: 0.0 s

Reading from entry dir: UKLTD_W_20190616
Unzipping: CI04_W_20190616.rar
Entry file unzipped as: CI04_W_20190616.txt
Time taken to read: 0.0 s
Appending...!
Time taken to append: 0.0 s

Reading from entry dir: UKLTD_W_7
Unzipping: CI04_W_7.rar
Rar file not found: CI04_W_7.rar
Skipping this entry:UKLTD_W_7


In [9]:
## Columns compared when de-duplicating: every database column except the UPLOAD tag
CI04_header_names_dedup_list=df_database.columns.tolist()
CI04_header_names_dedup_list.remove('UPLOAD')
print('Original colomn list:',df_database.columns,'\n',)
print('Colomns to check while de-duplicating:',CI04_header_names_dedup_list)

Original colomn list: Index(['CI04', 'REGNUM', 'SIC03', 'SIC07', 'UPLOAD'], dtype='object') 

Colomns to check while de-duplicating: ['CI04', 'REGNUM', 'SIC03', 'SIC07']


In [10]:
## De-duplicating on the data columns, then restoring the original partition count
start = time.process_time()
partition_count=df_database.npartitions
df_deduplicated=df_database.drop_duplicates(subset=CI04_header_names_dedup_list)
df_database=df_deduplicated.repartition(npartitions=partition_count)
elapsed = time.process_time() - start
print('Time taken to de duplicate:',elapsed,'s')

Time taken to de duplicate: 0.0 s


In [11]:
## Resetting the index (the previous index is discarded, not kept as a column)
# NOTE(review): dask is understood to restart the index per partition rather than
# number rows globally - confirm if a unique row index is ever needed.
df_database=df_database.reset_index(drop=True)
partitions_to_write=df_database.npartitions
print('No: of partitions to be written:',partitions_to_write)

No: of partitions to be written: 3


#### Writing into disk

In [12]:
## Writing updated database to file
# When a database already existed, df_database is still a lazy graph that reads from
# database_file_folder. Writing straight into that folder would overwrite part files
# the same computation reads from, so the dataset is written to a staging folder and
# swapped in only after the write has finished.
final_folder=database_file_folder.rstrip('/')
staging_folder=final_folder+'_staging'
backup_folder=final_folder+'_previous'
shutil.rmtree(staging_folder, ignore_errors=True)  # leftovers from an interrupted run

start = time.process_time()
with ProgressBar():
    df_database.to_parquet(staging_folder)#,schema="infer")

# Swap: keep the previous dataset until the new one is in place, then drop it.
if os.path.isdir(final_folder):
    shutil.rmtree(backup_folder, ignore_errors=True)
    os.rename(final_folder, backup_folder)
os.rename(staging_folder, final_folder)
shutil.rmtree(backup_folder, ignore_errors=True)
print('Time taken to write:',time.process_time() - start,'s')

[########################################] | 100% Completed |  0.2s
Time taken to write: 0.203125 s


In [13]:
## Update processed directory list
# NOTE(review): entry_dir_list still contains entries that update_database skipped
# (missing or unreadable archive), so they are recorded as processed here and will
# not be retried on the next run - TODO confirm whether that is intended.
# Blank names are filtered out so the file never accumulates empty lines.
processed_dirs=[dir_name for dir_name in dir_list_old+entry_dir_list if dir_name.strip()]
with open(dir_list_file, 'w') as fd:
    fd.write('\n'.join(processed_dirs))
# Report success only once the file has actually been written.
print('Processed list updated')

Processed list updated
