# ETL Framework (Telecom Recommender Crawler)

---

## Description

Extract-Transform-Load (ETL) script that loads Excel spreadsheets (the most popular spreadsheet format) into an Oracle database, with support for compressed archives.

### Capabilities:

__Works with:__

- `.xls`, `.xlsx` files
- `.rar` archives
- `.zip` archives
- only `oracle` databases

__Can:__

+ select multiple sources for one table, as long as the sources have identical formatting (number and datatype of columns, start row number)
+ select specific sheet from each url source (best used if url source has only one spreadsheet)
+ encode specific NVAR columns into _unicode literals_ for the UNISTR() function, enabling safe transport of non-ASCII characters to the database (e.g. Kazakh letters)
+ __append__ data to database

__Can't:__

- __purge__ data from database
- select specific file in archive, nor specific sheet in it
- delete last rows

## 1. Initialize Hierarchy

In [2]:
import os
import sys
import requests
from bs4 import BeautifulSoup
import re
import zipfile
import rarfile
import urllib
import logging
import logging.config
from utils import make_log_dir, print_hierarchy, export_hierarchy
from utils import save_sources_config, read_sources_config
import pandas as pd
import numpy as np
from dbfill import DbFill
from time import time
from datetime import timedelta

def init_main():
    """Initialise the notebook-wide ``root_path`` and ``logger`` globals.

    ``root_path`` becomes the absolute path of the current working directory.
    ``logger`` is the ``'crawler'`` logger.  When ``conf/logging.conf`` exists
    it is configured from that file; otherwise a fallback configuration is
    applied: DEBUG level with a single handler writing to stdout.

    Safe to call repeatedly (e.g. when the notebook cell is re-run): the
    fallback handler is attached only while the logger has no handler yet, so
    log records are not emitted several times.
    """
    global root_path
    global logger

    # set root_path
    root_path = os.path.abspath(os.curdir)

    # set logging config path, you may use the template too
    log_config_file = os.path.join('conf', 'logging.conf')

    if os.path.exists(log_config_file):
        # create logger from the configuration file
        make_log_dir(log_config_file)
        logging.config.fileConfig(log_config_file)
        logger = logging.getLogger('crawler')
        return

    # create a backup logger
    logger = logging.getLogger('crawler')
    logger.setLevel(logging.DEBUG)

    # logging.getLogger returns the same object on every call, so adding a
    # handler unconditionally would duplicate every record after a re-run.
    if not logger.handlers:
        # console handler at debug level with the standard record layout
        ch = logging.StreamHandler(stream=sys.stdout)
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(
            logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        )
        logger.addHandler(ch)


init_main()

In [2]:
# Smoke test: emit one record at every severity level through the 'crawler'
# logger. Which of them show up depends on the levels set by the active
# logging configuration.
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')

2018-06-01 16:32:50,236 crawler      INFO     info message
2018-06-01 16:32:50,237 crawler      ERROR    error message
2018-06-01 16:32:50,238 crawler      CRITICAL critical message


### 1.1.1 STATGOV_COMPANIES

Download list of all taxpaying companies from stat.gov.kz

In [3]:
# 1. Define tables
# A single target table is fed by all business-registry files.
table_names = ['T_STATGOV_COMPANIES']

# Column names assigned to the loaded spreadsheet data, one list per table.
structures = [[
    "BIN", "Full_Name_Kz", "Full_Name_Ru", "Registration_Date",
    "OKED_1", "Activity_Kz", "Activity_Ru", "OKED_2",
    "KRP", "KRP_Name_Kz", "KRP_Name_Ru",
    "KATO", "Settlement_Kz", "Settlement_Ru",
    "Legal_address", "Head_FIO",
]]

# Column inspected for the first blank cell when the data is truncated.
index_cols = ["Full_Name_Ru"]

# 2. Define urls
root_url = "http://stat.gov.kz"

def list_urls_statgov_companies():
    """Scrape the stat.gov.kz business-register page for download links.

    :return: list with the ``href`` value (exactly as found in the page) of
             every ``<a>`` inside the publications container whose href
             matches ``ESTAT``
    :raises requests.HTTPError: if the registry page answers with an error
                                status
    :raises RuntimeError: if the publications container is not present in the
                          page (e.g. the site layout changed)
    """
    registry_url = "http://stat.gov.kz/faces/publicationsPage/publicationsOper/homeNumbersBusinessRegisters/homeNumbersBusinessRegistersReestr"
    container_id = "pt1:r1:0:j_id__ctru0pc3:pgl4"

    response = requests.get(registry_url)
    # fail early on 4xx/5xx instead of parsing an error page
    response.raise_for_status()

    soup = BeautifulSoup(response.text, 'lxml')
    container = soup.find("div", attrs={"id": container_id})
    if container is None:
        # soup.find returns None when nothing matches; report that clearly
        # rather than failing with "'NoneType' object has no attribute ..."
        raise RuntimeError(
            "publications container {!r} not found at {}".format(container_id, registry_url)
        )

    links = container.find_all("a", href=re.compile("ESTAT"))
    return [link.get('href') for link in links]

# Absolute download links: every registry file is a source of the one table.
urls = [[root_url + sub_url for sub_url in list_urls_statgov_companies()]]

# 3. Filter (one entry per source url)
sheets    = [[None] * len(urls[0])]   # None -> read every sheet
skip_rows = [[4] * len(urls[0])]      # leading rows skipped in each sheet
last_rows = [[None] * len(urls[0])]

# 4. Define temporary destination
store = 'data'

# 5. Aggregate: one settings dict per table, keyed by table name
statgov_comp_hierarchy = {}
for i, name in enumerate(table_names):
    statgov_comp_hierarchy[name] = dict(
        structure=structures[i],
        index_col=index_cols[i],
        urls=urls[i],
        sheet=sheets[i],
        skip_row=skip_rows[i],
        last_row=last_rows[i],
        path=[],  # filled in by the download step
        store=os.path.join(root_path, store, name),
    )

### 1.1.2 KGDGOV BAD TAXPAYERS

List all companies and persons with bad tax records and violations from kgd.gov.kz

In [4]:
# 1. Define tables
table_names = ["T_KGDGOV_PSEUDO_COMPANY",
               "T_KGDGOV_WRONG_ADDRESS",
               "T_KGDGOV_BANKRUPT",
               "T_KGDGOV_INACTIVE",
               "T_KGDGOV_INVALID_REGISTRATION",
               "T_KGDGOV_VIOLATION_TAX_CODE",
               "T_KGDGOV_TAX_ARREARS_150",          # MRP > 150
               "T_KGDGOV_TAX_ARREARS_10"]           # MRP > 10

# Column names per table, in the same order as table_names
structures = [
    ["Num", "BIN", "RNN", "taxpayer_organization", "taxpayer_name", "owner_name", "owner_IIN", "owner_RNN",
     "court_decision", "illegal_activity_start_date"],
    ["Num", "BIN", "RNN", "taxpayer_organization", "taxpayer_name", "owner_name", "owner_IIN", "owner_RNN",
     "inspection_act_no", "inspection_date"],
    ["Num", "BIN", "RNN", "taxpayer_organization", "taxpayer_name", "owner_name", "owner_IIN", "owner_RNN",
     "court_decision", "court_decision_date"],
    ["Num", "BIN", "RNN", "taxpayer_organization", "taxpayer_name", "owner_name", "owner_IIN", "owner_RNN",
     "order_no", "order_date"],
    ["Num", "BIN", "RNN", "taxpayer_organization", "taxpayer_name", "owner_name", "owner_IIN", "owner_RNN",
     "court_decision_no", "court_decision_date"],
    ["Num", "BIN", "RNN", "taxpayer_organization", "owner_name", "owner_IIN", "owner_RNN",
     "order_no", "order_date", "violation_type"],
    ["Num", "region", "office_of_tax_enforcement", "OTE_ID", "BIN", "RNN", "taxpayer_organization_ru",
     "taxpayer_organization_kz", "last_name_kz", "first_name_kz", "middle_name_kz", "last_name_ru", "first_name_ru",
     "middle_name_ru", "owner_IIN", "owner_RNN", "owner_name_kz", "owner_name_ru", "economic_sector", "total_due",
     "sub_total_main", "sub_total_late_fee", "sub_total_fine"],
    ["Num", "region", "office_of_tax_enforcement", "OTE_ID", "BIN", "RNN", "taxpayer_organization_ru",
     "taxpayer_organization_kz", "last_name_kz", "first_name_kz", "middle_name_kz", "last_name_ru", "first_name_ru",
     "middle_name_ru", "owner_IIN", "owner_RNN", "owner_name_kz", "owner_name_ru", "economic_sector", "total_due",
     "sub_total_main", "sub_total_late_fee", "sub_total_fine"]
]

# Every table is truncated at the first blank "Num" cell
index_cols = ['Num'] * len(table_names)

# 2. Define urls
root_url = "http://kgd.gov.kz/mobile_api/services/taxpayers_unreliable_exportexcel"
sub_urls = ["/PSEUDO_COMPANY/KZ_ALL/fileName/list_PSEUDO_COMPANY_KZ_ALL.xlsx",              # Taxpayers declared pseudo-companies (sham enterprises)
            "/WRONG_ADDRESS/KZ_ALL/fileName/list_WRONG_ADDRESS_KZ_ALL.xlsx",                # Taxpayers absent from their registered legal address
            "/BANKRUPT/KZ_ALL/fileName/list_BANKRUPT_KZ_ALL.xlsx",                          # Taxpayers declared bankrupt
            "/INACTIVE/KZ_ALL/fileName/list_INACTIVE_KZ_ALL.xlsx",                          # Taxpayers declared inactive
            "/INVALID_REGISTRATION/KZ_ALL/fileName/list_INVALID_REGISTRATION_KZ_ALL.xlsx",  # Taxpayers whose registration was declared invalid
            "/VIOLATION_TAX_CODE/KZ_ALL/fileName/list_VIOLATION_TAX_CODE_KZ_ALL.xlsx",      # Taxpayers reorganized in violation of the Tax Code
            "/TAX_ARREARS_150/KZ_ALL/fileName/list_TAX_ARREARS_150_KZ_ALL.xlsx",            # 1 Taxpayers with tax arrears
            "/TAX_ARREARS_150/KZ_ALL/fileName/list_TAX_ARREARS_150_KZ_ALL.xlsx"]            # 2 Taxpayers with tax arrears
urls = [[root_url+sub_url] for sub_url in sub_urls]

# 3. Filter
# Each table gets its OWN inner list: ``[[None]] * n`` would repeat a single
# list object n times, so changing one table's entry in place would silently
# change all of them.
# All tables but the last two read every sheet (None); the two arrears tables
# share one workbook (same url above) and select sheet 0 and sheet 1.
sheets    = [[None] for _ in table_names[:-2]] + [[0], [1]]
skip_rows = [[3], [3], [3], [3], [3], [3], [6], [6]]
last_rows = [[None] for _ in table_names]

# 4. Define temporary destination
store = 'data'

# 5. Aggregate: one settings dict per table, keyed by table name
kgdgov_hierarchy = {}
for i, name in enumerate(table_names):
    kgdgov_hierarchy[name] = dict(
        structure=structures[i],
        index_col=index_cols[i],
        urls=urls[i],
        sheet=sheets[i],
        skip_row=skip_rows[i],
        last_row=last_rows[i],
        path=[],  # filled in by the download step
        store=os.path.join(root_path, store, name),
    )

### 1.1.3 STATGOV CLASSIFIERS

List of classifiers from stat.gov.kz

In [5]:
# 1. Define tables
table_names = ["T_STATGOV_OKED",
               "T_STATGOV_KPVED",
               "T_STATGOV_KATO",
               "T_STATGOV_NVED",
               "T_STATGOV_KURK",
               "T_STATGOV_MKEIS"]

# Column names per table, in the same order as table_names
structures = [["Code", "Name_Kaz", "Name_Rus"],
              ["Code", "Name_Kaz", "Name_Rus"],
              ["te", "ab", "cd", "ef", "hij", "k", "name_kaz", "name_rus", "nn"],
              ["Code", "Name_Kaz", "Name_Rus"],
              ["Code", "Name_Rus", "Name_Kaz"],
              ["Code", "Name_Kaz", "Name_Rus"]]

index_cols = ["Code",
             "Code",
             "te",
             "Code",
             "Code",
             "Code"]

# 2. Define urls
root_url = "http://stat.gov.kz"
sub_urls = ["/getImg?id=ESTAT116572",  # XLSX:  General classifier of types of economic activity
            "/getImg?id=ESTAT116569",  # XLS:   Classifier of products by type of economic activity
            "/getImg?id=ESTAT245918",  # RAR:   Classifier of administrative-territorial objects
            "/getImg?id=ESTAT181313",  # XLSX:  Nomenclature of types of economic activity
            "/getImg?id=WC16200004875",  # XLS: Street codifier of the Republic of Kazakhstan
            "/getImg?id=ESTAT093569"]  # XLSX:  Interstate classifier of units of measurement and count
urls = [[root_url + url] for url in sub_urls]

# 3. Filter
# One entry per table (the lists used to hold 7 entries for 6 tables), and a
# separate inner list for each table: ``[[None]] * n`` would repeat a single
# list object, so an in-place change for one table would affect all of them.
sheets    = [[None] for _ in table_names]
skip_rows = [[3], [3], [1], [3], [1], [4]]
last_rows = [[None] for _ in table_names]

# 4. Define temporary destination
store = 'data'

# 5. Aggregate: one settings dict per table, keyed by table name
statgov_cl_hierarchy = {}
for i, name in enumerate(table_names):
    statgov_cl_hierarchy[name] = dict(
        structure=structures[i],
        index_col=index_cols[i],
        urls=urls[i],
        sheet=sheets[i],
        skip_row=skip_rows[i],
        last_row=last_rows[i],
        path=[],  # filled in by the download step
        store=os.path.join(root_path, store, name),
    )

## 2. Join Hierarchies

Let's join all our dataset sources into one tree (hierarchy)

In [6]:
# Merge the per-source hierarchies into one table -> settings mapping
table_hierarchy = {
    **kgdgov_hierarchy,
    **statgov_comp_hierarchy,
    **statgov_cl_hierarchy,
}

# Numbered overview of every table in the merged hierarchy
for i, table in enumerate(table_hierarchy.keys()):
    print('{:<3} {:<30}'.format(i + 1, table))

1   T_KGDGOV_PSEUDO_COMPANY       
2   T_KGDGOV_INACTIVE             
3   T_KGDGOV_VIOLATION_TAX_CODE   
4   T_KGDGOV_TAX_ARREARS_150      
5   T_STATGOV_KPVED               
6   T_STATGOV_KATO                
7   T_STATGOV_COMPANIES           
8   T_KGDGOV_WRONG_ADDRESS        
9   T_KGDGOV_INVALID_REGISTRATION 
10  T_STATGOV_NVED                
11  T_STATGOV_MKEIS               
12  T_STATGOV_OKED                
13  T_STATGOV_KURK                
14  T_KGDGOV_TAX_ARREARS_10       
15  T_KGDGOV_BANKRUPT             


In [7]:
# Persist the merged hierarchy to conf/sources.ini ...
save_sources_config(table_hierarchy, 'conf/sources.ini', file_format='ini')

# ... and read it straight back from the same file.
new_dict_from_config = read_sources_config('conf/sources.ini', 'ini')

# Round-trip check: the notebook displays whether the reloaded configuration
# equals the in-memory hierarchy.
new_dict_from_config == table_hierarchy

True

## 3. Download files and update hierarchy

Let's launch the download script

__TODO:__ _Later, this should be combined with data filtering and loading scripts for parallel execution_

In [8]:
# Purge old data if you need to:
!rm -r data

In [9]:
def retrieve_file_object(url):
    '''
    Request ``url`` and work out the attachment file name from the response

    The name is taken from the (URL-unquoted) ``Content-Disposition`` header:
    the ``UTF-8''<name>;`` form is preferred, ``filename="<name>"`` is the
    fallback.

    :param url: URL source of presumed downloadable content
    :return:    tuple (result, filename)
        WHERE
        requests.models.Response result
        str filename is the name of the attachment with extension, or None
            when the header is missing or holds no recognisable file name
    '''
    # Imported explicitly: a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import unquote

    # NOTE(review): verify=False switches off TLS certificate verification.
    result = requests.get(url, verify=False, stream=True)

    header = result.headers.get("content-disposition")
    if header is None:
        return result, None

    cont_disp = unquote(header)
    # patterns in order of preference; the first one that matches wins
    for pattern in ("UTF-8''(.*);", 'filename="(.+)"'):
        match = re.search(pattern, cont_disp)
        if match is not None:
            return result, match.group(1)

    return result, None


def download_extract_files(table_hierarchy, test=False):
    '''
    Download every source of every table and collect the spreadsheet paths.

    Spreadsheets (.xls/.xlsx) are kept as downloaded. RAR/ZIP archives are
    searched for spreadsheets, which are extracted into the table's store
    directory; the archive itself is deleted afterwards. For each table the
    "path", "sheet" and "skip_row" lists are rebuilt with one entry per
    spreadsheet found (all spreadsheets coming out of one archive inherit the
    sheet / skip_row settings of that archive's url).

    A source without a detectable attachment name is logged as an error and
    skipped. A download that is neither a spreadsheet nor a RAR/ZIP archive is
    logged as a warning, left on disk and not added to "path".

    :param dict table_hierarchy: table name -> settings dict holding at least
                                 "urls", "store", "sheet" and "skip_row"
    :param bool test: dry run - log the steps but write, extract and delete
                      nothing (archive contents are not inspected)
    :return: the same ``table_hierarchy`` object, updated in place
    '''
    spreadsheet_exts = (".xls", ".xlsx")
    archive_openers = {".rar": rarfile.RarFile, ".zip": zipfile.ZipFile}

    for table, table_info in table_hierarchy.items():
        # 1. Iterate through tables
        table_info["path"] = []  # reset paths
        temp_sheet = []          # sheet selector, one entry per spreadsheet
        temp_skip_row = []       # skip-row selector, one entry per spreadsheet

        for i, url in enumerate(table_info["urls"]):
            # 2. Iterate through urls for each table and download
            result, file_name = retrieve_file_object(url)
            if file_name is None:
                # without a name there is no extension to decide how to
                # handle the download (and os.path.join would fail on None)
                logger.error('{}: could not determine file name for {}, skipping'.format(table, url))
                continue
            file_path = os.path.join(table_info["store"], file_name)

            logger.debug('{}: downloading {}'.format(table, file_name))
            if not test:
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with open(file_path, 'wb') as f:
                    f.write(result.content)

            # compare extensions case-insensitively (".XLS" is a spreadsheet too)
            file_ext = os.path.splitext(file_path)[1].lower()

            # 3. (Extract and) Append path to spreadsheet
            if file_ext in spreadsheet_exts:
                logger.info('{}: saving {}'.format(table, file_name))
                table_info["path"].append(file_path)
                temp_sheet.append(table_info["sheet"][i])
                temp_skip_row.append(table_info["skip_row"][i])
                continue

            opener = archive_openers.get(file_ext)
            if opener is None:
                # Previously this case either raised NameError or re-used the
                # archive object left over from an earlier url.
                logger.warning('{}: unsupported file type {!r}, skipping {}'.format(table, file_ext, file_name))
                continue

            logger.debug("{}: checking contents of {}".format(table, file_name))
            if not test:
                # context manager closes the archive before it is deleted
                with opener(file_path) as archive:
                    for member in archive.namelist():
                        # check for excel and extract
                        if os.path.splitext(member)[1].lower() not in spreadsheet_exts:
                            continue
                        archive.extract(member, table_info["store"])
                        logger.info("{}: saving {} (from {})".format(table, member, file_name))
                        table_info["path"].append(os.path.join(table_info["store"], member))
                        temp_sheet.append(table_info["sheet"][i])
                        temp_skip_row.append(table_info["skip_row"][i])

            logger.debug("{}: removing {}".format(table, file_name))
            if not test:
                os.remove(file_path)

        table_info["sheet"] = temp_sheet
        table_info["skip_row"] = temp_skip_row

    return table_hierarchy

def download_extract_files_w_timing(table_hierarchy):
    """Run ``download_extract_files`` for real (not as dry run) and print how long it took.

    :param table_hierarchy: hierarchy handed on to ``download_extract_files``
    :return: whatever ``download_extract_files`` returned
    """
    from datetime import timedelta
    from time import time

    started = time()
    updated_hierarchy = download_extract_files(table_hierarchy, test=False)
    elapsed = timedelta(seconds=time() - started)
    print('time elapsed: {}'.format(elapsed))

    return updated_hierarchy


table_hierarchy = download_extract_files_w_timing(table_hierarchy)

2018-06-01 16:33:02,574 crawler      INFO     saving T_KGDGOV_PSEUDO_COMPANY/list_PSEUDO_COMPANY_KZ_ALL.xlsx
2018-06-01 16:33:03,203 crawler      INFO     saving T_KGDGOV_INACTIVE/list_INACTIVE_KZ_ALL.xlsx
2018-06-01 16:33:03,330 crawler      INFO     saving T_KGDGOV_VIOLATION_TAX_CODE/list_VIOLATION_TAX_CODE_KZ_ALL.xlsx
2018-06-01 16:33:03,686 crawler      INFO     saving T_KGDGOV_TAX_ARREARS_150/list_TAX_ARREARS_150_KZ_ALL.xlsx
2018-06-01 16:33:04,922 crawler      INFO     saving T_STATGOV_KPVED/КПВЭД_каз_рус_Изм№2.xls
2018-06-01 16:33:05,219 crawler      INFO     saving T_STATGOV_KATO/КАТО_12_17/katonew1.xls (from КАТО_12_17.rar)
2018-06-01 16:33:05,233 crawler      INFO     saving T_STATGOV_KATO/КАТО_12_17/katonew2.xls (from КАТО_12_17.rar)
2018-06-01 16:33:05,235 crawler      INFO     removing T_STATGOV_KATO/КАТО_12_17.rar
2018-06-01 16:33:06,367 crawler      INFO     saving T_STATGOV_COMPANIES/11.xls (from 11.zip)
2018-06-01 16:33:06,369 crawler      INFO     removing T_STATGOV_C

In [10]:
# Show the hierarchy as updated by the download step, export it, and write it
# back to conf/sources.ini so that later cells can reload it from that file.
print_hierarchy(table_hierarchy)
export_hierarchy(table_hierarchy)
save_sources_config(table_hierarchy, 'conf/sources.ini', file_format='ini')


table: T_KGDGOV_PSEUDO_COMPANY 
|
|--urls
|    |-http://kgd.gov.kz/mobile_api/services/taxpayers_unreliable_exportexcel/PSEUDO_COMPANY/KZ_ALL/fileName/list_PSEUDO_COMPANY_KZ_ALL.xlsx
|
|--index_col
|    |-Num
|
|--skip_row
|    |-3
|
|--path
|    |-/mnt/hdd/projects/crawler/data/T_KGDGOV_PSEUDO_COMPANY/list_PSEUDO_COMPANY_KZ_ALL.xlsx
|
|--sheet
|    |-None
|
|--last_row
|    |-None
|
|--structure
|    |-Num
|    |-BIN
|    |-RNN
|    |-taxpayer_organization
|    |-taxpayer_name
|    |-owner_name
|    |-owner_IIN
|    |-owner_RNN
|    |-court_decision
|    |-illegal_activity_start_date
|
|--store
|    |-/mnt/hdd/projects/crawler/data/T_KGDGOV_PSEUDO_COMPANY
|

table: T_KGDGOV_INACTIVE 
|
|--urls
|    |-http://kgd.gov.kz/mobile_api/services/taxpayers_unreliable_exportexcel/INACTIVE/KZ_ALL/fileName/list_INACTIVE_KZ_ALL.xlsx
|
|--index_col
|    |-Num
|
|--skip_row
|    |-3
|
|--path
|    |-/mnt/hdd/projects/crawler/data/T_KGDGOV_INACTIVE/list_INACTIVE_KZ_ALL.xlsx
|
|--sheet
|    |-None
|


## 4. Process files

In [16]:
def prepare_data(table_data:dict, table_name):
    """
    Load every spreadsheet of one table into a single all-string DataFrame

    For each file in ``table_data["path"]`` the selected sheet (or every sheet
    when the selection is None) is read without header row, skipping the
    configured number of leading rows. Only the first
    ``len(table_data["structure"])`` columns are kept and they are named after
    the structure. Missing cells and cells holding exactly 'nan' / 'None'
    become empty strings. Finally the data is cut off at the first row whose
    ``index_col`` cell is blank (that row and everything below is dropped).

    NOTE: If one url source was an archive with multiple spreadsheets
          then the options sheet and skip_row are valid for ALL those
          spreadsheets

    :param dict table_data: Dictionary containing single table's
                            structure, source paths
    :param table_name: Table name, used as prefix of the log messages
    :return: pandas.DataFrame with the columns of ``table_data["structure"]``;
             empty when the table has no source files
    """
    logger.info("{}: Pre-processing...".format(table_name))
    structure = table_data["structure"]
    n_cols = len(structure)

    frames = []
    for i, file in enumerate(table_data["path"]):
        selected_sheet = table_data["sheet"][i]
        # open the workbook once and close it when done
        with pd.ExcelFile(file) as xls:
            # Iterate through all sheets if no specific sheet selected
            # PANDAS BUG (pandas = 0.23.0):
            #    For some reason pandas fails to read all sheets if sheet_name=None
            sheet_names = xls.sheet_names if selected_sheet is None else [selected_sheet]
            for sheet in sheet_names:
                df = pd.read_excel(xls,
                                   sheet_name=sheet,
                                   index_col=None,
                                   skiprows=table_data["skip_row"][i],
                                   dtype=str,
                                   header=None)
                # Remove unnecessary columns just in case
                frames.append(df.iloc[:, :n_cols])

    if frames:
        # one concat instead of repeated DataFrame.append (removed in pandas 2.0)
        data = pd.concat(frames, ignore_index=True)
        data.columns = structure
    else:
        data = pd.DataFrame(columns=structure)

    # Blank out missing values. The pattern is anchored so that only cells that
    # consist of 'nan' / 'None' are cleared - an unanchored pattern would also
    # strip these letters out of real values (e.g. "Finance" -> "Fice").
    data = data.fillna('').replace(r'^(?:nan|None)$', '', regex=True)

    # Remove the first row with a blank index_col cell and everything below
    blank = data[table_data["index_col"]] == ''
    if blank.any():
        rows_b_trunc = len(data)
        first_blank = int(blank.values.argmax())  # position of the first True
        data = data.iloc[:first_blank]
        logger.warning('{}: truncated from {} to {}'.format(table_name, rows_b_trunc, len(data)))

    logger.info('{}: Pre-processing complete, {} rows'.format(table_name, len(data)))
    return data

In [17]:
# ONLY FOR TESTING THE ABOVE SCRIPT
def test_df_read(table_hierarchy):
    """Dry-run ``prepare_data`` over every table and print the total time.

    The prepared frames are discarded; a failure for one table is logged and
    does not stop the remaining tables.

    :param dict table_hierarchy: table name -> settings dict
    """
    from time import time
    from datetime import timedelta

    t0 = time()

    for table_name, table_data in table_hierarchy.items():
        try:
            prepare_data(table_data, table_name)
        except Exception as e:
            logger.error("{}: {}".format(table_name, e))

    print('time elapsed: {}'.format(timedelta(seconds=time()-t0)))


test_df_read(table_hierarchy)

2018-06-01 19:08:48,612 crawler      INFO     T_KGDGOV_INACTIVE: Pre-processing...
2018-06-01 19:10:37,887 crawler      INFO     T_KGDGOV_INACTIVE: Pre-processing complete, 114711 rows
2018-06-01 19:10:37,889 crawler      INFO     T_STATGOV_NVED: Pre-processing...
2018-06-01 19:10:39,340 crawler      INFO     T_STATGOV_NVED: Pre-processing complete, 2068 rows
2018-06-01 19:10:39,392 crawler      INFO     T_KGDGOV_PSEUDO_COMPANY: Pre-processing...
2018-06-01 19:10:41,163 crawler      INFO     T_KGDGOV_PSEUDO_COMPANY: Pre-processing complete, 1774 rows
2018-06-01 19:10:41,165 crawler      INFO     T_KGDGOV_VIOLATION_TAX_CODE: Pre-processing...
2018-06-01 19:10:41,448 crawler      INFO     T_KGDGOV_VIOLATION_TAX_CODE: Pre-processing complete, 193 rows
2018-06-01 19:10:41,451 crawler      INFO     T_KGDGOV_TAX_ARREARS_10: Pre-processing...
2018-06-01 19:11:00,160 crawler      INFO     T_KGDGOV_TAX_ARREARS_10: Pre-processing complete, 9254 rows
2018-06-01 19:11:00,162 crawler      INFO     

## 5. Filling in Database

In [9]:
# Purge data from database:
def purge_database():
    """Send every statement of ``sql/truncate_all.sql`` to the database and print the time taken.

    The file is split on ';'. Fragments that are empty after stripping
    whitespace are skipped, so the result does not depend on whether the file
    ends with a ';', a newline or neither.
    NOTE(review): a ';' inside a string literal or PL/SQL block would still be
    treated as a statement separator.
    """
    from dbfill import DbFill
    from time import time
    from datetime import timedelta

    t0 = time()
    db = DbFill("conf/database.ini")

    with open(os.path.join('sql','truncate_all.sql'), 'r') as sql_file:
        full_sql = sql_file.read()

    for sql_command in full_sql.split(';'):
        sql_command = sql_command.strip()
        # a blank statement will throw ORA-24373: invalid length specified for statement
        if sql_command:
            db.send_command(sql_command)

    print('time elapsed: {}'.format(timedelta(seconds=time()-t0)))


purge_database()

time elapsed: 0:00:01.706956


In [15]:
def fill_the_database(table_hierarchy):
    """Prepare every table's data and store it in the database, one table after another.

    For each table the spreadsheets are loaded with ``prepare_data``, handed
    to ``DbFill.fill_main_storage`` as a list of row dictionaries, and the row
    count reported by the database is compared with the number of prepared
    rows. A failure for one table is logged and the next table is processed.
    The total elapsed time is printed at the end.

    :param dict table_hierarchy: table name -> settings dict
    """
    from dbfill import DbFill
    from time import time
    from datetime import timedelta

    t0 = time()
    db = DbFill("conf/database.ini")

    for table_name, table_data in table_hierarchy.items():
        try:
            data = prepare_data(table_data, table_name)
            data_rows = len(data)
            structure = table_data["structure"]

            # list of {column: value} dictionaries, one per row; produced
            # directly instead of transposing the whole frame first
            data = data.to_dict('records')
            logger.debug("{}: Storing to database...".format(table_name))
            db.fill_main_storage(table_name, structure, data, "utf-8")

            # check for successful write to database
            # NOTE(review): presumably only meaningful when the table was empty
            # before the load - confirm against DbFill.get_num_rows
            db_rows = db.get_num_rows(table_name)
            if int(db_rows) == data_rows:
                logger.info("{}: Successfully stored in database!".format(table_name))
            else:
                logger.error("{}: Row count mismatch, something went wrong.".format(table_name))
        except Exception as e:
            logger.error("{}: {}".format(table_name, e))

    print('time elapsed: {}'.format(timedelta(seconds=time()-t0)))

In [16]:
# Run the sequential prepare-and-store pass over every table of the hierarchy.
fill_the_database(table_hierarchy)

2018-06-01 16:40:00,781 crawler      INFO     T_KGDGOV_PSEUDO_COMPANY: Pre-processing...
2018-06-01 16:40:01,830 crawler      INFO     T_KGDGOV_PSEUDO_COMPANY: Pre-processing complete, 1774 rows
2018-06-01 16:40:02,813 crawler      INFO     T_KGDGOV_PSEUDO_COMPANY: Successfully stored in database!
2018-06-01 16:40:02,814 crawler      INFO     T_KGDGOV_INACTIVE: Pre-processing...
2018-06-01 16:41:09,599 crawler      INFO     T_KGDGOV_INACTIVE: Pre-processing complete, 114711 rows
2018-06-01 16:41:44,767 crawler      INFO     T_KGDGOV_INACTIVE: Successfully stored in database!
2018-06-01 16:41:44,770 crawler      INFO     T_KGDGOV_VIOLATION_TAX_CODE: Pre-processing...
2018-06-01 16:41:44,906 crawler      INFO     T_KGDGOV_VIOLATION_TAX_CODE: Pre-processing complete, 193 rows
2018-06-01 16:41:45,112 crawler      INFO     T_KGDGOV_VIOLATION_TAX_CODE: Successfully stored in database!
2018-06-01 16:41:45,113 crawler      INFO     T_KGDGOV_TAX_ARREARS_150: Pre-processing...
2018-06-01 16:41:5

# Non-critical code

---

## Helpers


In [8]:
# Scratch check of the RAR handling: extract every spreadsheet found in one
# locally stored archive into the directory named by `store`.
archive = rarfile.RarFile('statgov_class_data/КАТО_12_17.rar')
for idx, f in enumerate(archive.namelist()):
    # check for excel and extract
    _, f_ext = os.path.splitext(f)
    if f_ext in (".xls", ".xlsx"):
        archive.extract(f, store)

In [104]:
# NOTE(review): stale scratch call - prepare_data as defined in this notebook
# takes two required arguments (table_data, table_name), and `files_dict` is
# not defined in the visible part of the notebook. TODO confirm/update before
# re-running this cell.
df = prepare_data(files_dict['TEST_CR_STATGOV_OKED'])
print(len(df))
df.tail()


997


Unnamed: 0,Code,Name_Kaz,Name_Rus
992,9820,Жеке тұтыну үшін қызметтер өндіру жөніндегі үй...,Деятельность домашних хозяйств по производству...
993,U,АУМАҚТАН ТЫС ҰЙЫМДАРДЫҢ ЖӘНЕ ОРГАНДАРДЫҢ ҚЫЗМЕТІ,ДЕЯТЕЛЬНОСТЬ ЭКСТЕРРИТОРИАЛЬНЫХ ОРГАНИЗАЦИЙ И ...
994,99,Аумақтан тыс ұйымдардың қызметі,Деятельность экстерриториальных организаций
995,990,Аумақтан тыс ұйымдардың қызметі,Деятельность экстерриториальных организаций
996,9900,Аумақтан тыс ұйымдардың қызметі,Деятельность экстерриториальных организаций


In [106]:
# Slice up to (not including) the first row whose 'Code' cell is blank.
# When no cell is blank, `.index[0]` raises IndexError and df is left as is.
try:
    df = df.loc[: df[(df['Code'] == '')].index[0]-1, :]
except IndexError:
    pass
print(len(df))
df.tail()

997


Unnamed: 0,Code,Name_Kaz,Name_Rus
992,9820,Жеке тұтыну үшін қызметтер өндіру жөніндегі үй...,Деятельность домашних хозяйств по производству...
993,U,АУМАҚТАН ТЫС ҰЙЫМДАРДЫҢ ЖӘНЕ ОРГАНДАРДЫҢ ҚЫЗМЕТІ,ДЕЯТЕЛЬНОСТЬ ЭКСТЕРРИТОРИАЛЬНЫХ ОРГАНИЗАЦИЙ И ...
994,99,Аумақтан тыс ұйымдардың қызметі,Деятельность экстерриториальных организаций
995,990,Аумақтан тыс ұйымдардың қызметі,Деятельность экстерриториальных организаций
996,9900,Аумақтан тыс ұйымдардың қызметі,Деятельность экстерриториальных организаций


## Features [WIP]

In [11]:
# Reload the hierarchy from the INI file written earlier in the notebook.
table_hierarchy = read_sources_config('conf/sources.ini', 'ini')

# (table_info, table_name) tuples - the argument order of prep_and_upload,
# as needed for Pool.starmap.
list_of_th = [(table_info, table_name) for table_name, table_info in table_hierarchy.items()]

In [12]:
def prep_and_upload(table_info, table_name):
    """Prepare the data of one table and store it in the database.

    Opens its own ``DbFill`` handle, so it can be used as a worker function
    (see ``do_multiprocessing``). Errors raised while preparing or storing
    the data are logged, not propagated.

    :param dict table_info: settings dict of the table (structure, paths, ...)
    :param table_name: name of the target table, also used as log prefix
    :return: None
    """
    db = DbFill("conf/database.ini")

    try:
        data = prepare_data(table_info, table_name)
        data_rows = len(data)
        structure = table_info["structure"]

        # list of {column: value} dictionaries, one per row; produced
        # directly instead of transposing the whole frame first
        data = data.to_dict('records')
        logger.debug("{}: Storing to database...".format(table_name))
        db.fill_main_storage(table_name, structure, data, "utf-8")

        # check for successful write to database
        db_rows = db.get_num_rows(table_name)
        if int(db_rows) == data_rows:
            logger.info("{}: Successfully stored in database!".format(table_name))
        else:
            logger.error("{}: Row count mismatch, something went wrong.".format(table_name))
    except Exception as e:
        logger.error("{}: {}".format(table_name, e))

In [None]:
def do_multiprocessing(table_hierarchy, purge=True):
    """Prepare and upload all tables in parallel, one worker process per CPU core.

    :param dict table_hierarchy: table name -> settings dict
    :param bool purge: when True, first send every statement of
                       ``sql/truncate_all.sql`` to the database
    :return: None; the elapsed time is printed
    """
    from multiprocessing import Pool, cpu_count

    t0 = time()

    # Purge old data
    if purge:
        db = DbFill("conf/database.ini")
        with open(os.path.join('sql','truncate_all.sql'), 'r') as sql_file:
            full_sql = sql_file.read()
        for sql_command in full_sql.split(';'):
            sql_command = sql_command.strip()
            # a blank statement will throw ORA-24373: invalid length specified for statement
            if sql_command:
                db.send_command(sql_command)

    # Upload new data
    list_of_th = [(table_info, table_name) for table_name, table_info in table_hierarchy.items()]

    # starmap blocks until every table is done; leaving the ``with`` block
    # then shuts the worker processes down instead of leaking them
    with Pool(cpu_count()) as pool:
        pool.starmap(func=prep_and_upload, iterable=list_of_th)
    print("time elapsed: {}".format(timedelta(seconds=time()-t0)))


do_multiprocessing(table_hierarchy)

2018-06-01 18:24:09,448 crawler      INFO     T_STATGOV_KATO: Pre-processing...
2018-06-01 18:24:09,450 crawler      INFO     T_KGDGOV_INVALID_REGISTRATION: Pre-processing...
2018-06-01 18:24:09,448 crawler      INFO     T_KGDGOV_WRONG_ADDRESS: Pre-processing...
2018-06-01 18:24:09,457 crawler      INFO     T_KGDGOV_VIOLATION_TAX_CODE: Pre-processing...
2018-06-01 18:24:09,613 crawler      INFO     T_KGDGOV_VIOLATION_TAX_CODE: Pre-processing complete, 193 rows
2018-06-01 18:24:09,777 crawler      INFO     T_KGDGOV_VIOLATION_TAX_CODE: Successfully stored in database!
2018-06-01 18:24:09,988 crawler      INFO     T_STATGOV_KPVED: Pre-processing...
2018-06-01 18:24:10,248 crawler      INFO     T_STATGOV_KPVED: Pre-processing complete, 5429 rows
2018-06-01 18:24:10,585 crawler      INFO     T_KGDGOV_INVALID_REGISTRATION: Pre-processing complete, 1904 rows
2018-06-01 18:24:10,712 crawler      INFO     T_STATGOV_KATO: Pre-processing complete, 16799 rows
2018-06-01 18:24:11,358 crawler      I

Process ForkPoolWorker-20:
Process ForkPoolWorker-18:
Process ForkPoolWorker-17:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Process ForkPoolWorker-19:
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/p

KeyboardInterrupt: 

  File "/usr/lib/python3.5/multiprocessing/pool.py", line 47, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
KeyboardInterrupt
  File "<ipython-input-12-6471cd094912>", line 13, in prep_and_upload
    db.fill_main_storage(table_name, structure, data, "utf-8")
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/mnt/hdd/projects/crawler/dbfill.py", line 111, in fill_main_storage
    data = self._kaz_encode(data)
  File "/mnt/hdd/projects/crawler/dbfill.py", line 89, in _kaz_encode
    try:
KeyboardInterrupt
KeyboardInterrupt
Process ForkPoolWorker-21:
Process ForkPoolWorker-22:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.p

In [None]:
# Multiprocessing of EXTRACT process [WIP]

from multiprocessing import Pool, cpu_count

def download_and_extract_one(url):
    """Download one ZIP archive and unpack it into ``<root_path>/excel``. [WIP]

    The archive is stored as ``<root_path>/zip/<ESTAT id>.zip``, where the id
    is the part of ``url`` starting at ``ESTAT``.

    :param url: download link containing an ``ESTAT...`` identifier
    :return: path of the directory the archive was extracted into
    :raises ValueError: if ``url`` contains no ``ESTAT`` identifier
    """
    match = re.search('ESTAT.+', url)
    if match is None:
        raise ValueError("no ESTAT identifier in url: {}".format(url))
    archive_name = match.group()

    zip_dir = os.path.join(root_path, 'zip')
    excel_dir = os.path.join(root_path, 'excel')
    archive_path = os.path.join(zip_dir, archive_name + '.zip')
    os.makedirs(zip_dir, exist_ok=True)

    # FROM WEBSITE TO ZIP ARCHIVE
    ufr = requests.get(url, verify=False, stream=True)
    with open(archive_path, 'wb') as f:
        print('downloading {}.zip'.format(archive_name))
        # explicit chunk size: the default of iter_content is 1 byte per chunk
        for data in ufr.iter_content(chunk_size=64 * 1024):
            f.write(data)

    # FROM ZIP ARCHIVE TO XLS
    print('Exctracting {}.zip'.format(archive_name))
    with zipfile.ZipFile(archive_path, 'r') as f:
        f.extractall(excel_dir)

    return excel_dir

def process_urls(urls):
    # [WIP] Download and extract all urls in parallel, one worker process per
    # CPU core.
    pool = Pool(cpu_count())
    files_list = pool.map(download_and_extract_one, urls)
    # NOTE(review): `df_list` and `structure` are not defined in this function
    # and `files_list` is never used - the lines below look copied from
    # process_excels. TODO confirm the intended result before using this.
    df = pd.concat(df_list, ignore_index=True)
    df.columns = structure
    return df

# NOTE(review): WIP leftovers - `list_urls`, `registry`, `process_excels2` and
# `download_and_extract` are not defined in the visible part of the notebook.
# TODO confirm/replace before running this cell.
urls = [root_url + url for url in list_urls(registry)]

# NOTE(review): the notebook's top-level `from time import time` binds `time`
# to the function, so `time.time()` fails unless `time` is re-imported as a
# module first.
t0 = time.time()
df2 = process_excels2(urls)
print("time elapsed:", time.time()-t0, "s")

download_and_extract([root_url + url for url in list_urls(registry)])

# for hierarchies
# iterate through this with additional level of parallelization using urls (e.g. STATGOV_COMPANIES has 16 sources!)
# one single-entry {table_name: settings} dict per table
list_of_tables = [{k: v} for k,v in table_hierarchy.items()]

In [None]:
# Multiprocessing of TRANSFORM (data read+filter+load preparation) [WIP]

def reader(file):
    """Read every sheet of one Excel workbook into a single all-string DataFrame. [WIP]

    The first 4 rows of each sheet are skipped and no header row is expected.
    Missing cells and cells holding exactly 'nan' / 'None' become empty
    strings.

    :param file: path of the workbook
    :return: pandas.DataFrame with the rows of all sheets, re-indexed from 0
    """
    # open the workbook once and close it when done
    with pd.ExcelFile(file) as xls:
        frames = [
            pd.read_excel(xls, sheet_name=sheet, index_col=None, skiprows=4, dtype=str, header=None)
            for sheet in xls.sheet_names
        ]
    if not frames:
        return pd.DataFrame(dtype=str)

    # one concat instead of repeated DataFrame.append (removed in pandas 2.0)
    data = pd.concat(frames, ignore_index=True)
    # Anchored pattern: only cells that consist of 'nan' / 'None' are cleared,
    # the letters are not stripped out of real values (e.g. "Finance").
    return data.fillna('').replace(r'^(?:nan|None)$', '', regex=True)

def process_excels(file_list):
    """Read all workbooks in parallel and combine them into one DataFrame. [WIP]

    :param file_list: paths of the workbooks, each handed to ``reader``
    :return: pandas.DataFrame with the rows of all workbooks, re-indexed from 0
    """
    # map blocks until every file is read; leaving the ``with`` block then
    # shuts the worker processes down instead of leaking them
    with Pool(cpu_count()) as pool:
        df_list = pool.map(reader, file_list)
    df = pd.concat(df_list, ignore_index=True)
    # NOTE(review): `structure` is taken from the notebook's global namespace
    # and is not defined in the visible cells - TODO confirm where it is set.
    df.columns = structure
    return df

In [10]:
def check_on_tables():
    """Print the row count the database reports for every table listed in conf/sources.ini."""
    configured_tables = read_sources_config('conf/sources.ini', 'ini')
    database = DbFill("conf/database.ini")

    for name in configured_tables:
        row_count = database.get_num_rows(name)
        print("{} rows in {}.".format(row_count, name))


check_on_tables()

0 rows in T_KGDGOV_INACTIVE.
0 rows in T_STATGOV_NVED.
0 rows in T_KGDGOV_PSEUDO_COMPANY.
0 rows in T_KGDGOV_VIOLATION_TAX_CODE.
0 rows in T_KGDGOV_TAX_ARREARS_10.
0 rows in T_STATGOV_OKED.
0 rows in T_STATGOV_MKEIS.
0 rows in T_KGDGOV_BANKRUPT.
0 rows in T_STATGOV_KURK.
0 rows in T_STATGOV_KATO.
0 rows in T_STATGOV_KPVED.
0 rows in T_KGDGOV_TAX_ARREARS_150.
0 rows in T_STATGOV_COMPANIES.
0 rows in T_KGDGOV_WRONG_ADDRESS.
0 rows in T_KGDGOV_INVALID_REGISTRATION.
