# ETL 00: Reading and Pre-processing NFT data (raw --> stage)
Reads the extended transaction file(s) and creates the base files for the transaction, token and collection tables, then saves them to daily parquet files. <br>
<br>
Either of two modules can be loaded as "**ers**":
 * ETL_00_rawToStage_new: the pandas version of the loaders
 * ETL_00_rawToStage_pyspark: the pyspark version of the loaders

Because both modules expose the same function names, the rest of the notebook works unchanged with either one.

In [1]:
_version = "PANDAS" # could be "PYSPARK" for GCP and "PANDAS" for local environment

In [3]:
# loading ETL related libraries
import pandas as pd
import numpy as np

# core libraries
from datetime import datetime, timedelta
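import importlib as imp  # the stdlib "imp" module was removed in Python 3.12; importlib.reload replaces imp.reload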

# OS related
from os import listdir, makedirs, remove, path

# parallel programming related
from multiprocessing import Pool
import subprocess

# visualization related
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns

# settings
pd.set_option('display.max_rows', 200)
pd.set_option('display.max_columns', 50)

# src loader functions, paths, config values
import src.config as cf

if _version == "PYSPARK":
    import src.ETL_00_rawToStage_pyspark as ers
else:
    import src.ETL_00_rawToStage_new as ers

In [101]:
ers = imp.reload(ers)
cf = imp.reload(cf)

## Testing: preprocessing one file
Running the loader on a single file makes it possible to test it and to inspect the resulting tables.

In [4]:
# _testfilepath = cf.PATH_00RAW_API_dump_main + 'events_20170101_000000_20180101_000000/eventresponse_1498953600_1499558400.pickle'
# ers._processOneFile(path_in_file_pkl=_testfilepath, path_out_trx_folder_pq='./data/01_stage/trx/', 
#                     path_out_token_folder_pq='./data/01_stage/token/', path_out_collection_folder_pq='./data/01_stage/collection/',
#                     _verbose=False)

# pdf_tokens = pd.read_parquet('./data/01_stage/token/')
# pdf_collections = pd.read_parquet('./data/01_stage/collection/')
# pdf_trxs = pd.read_parquet('./data/01_stage/trx/year=2021/')

In [5]:
# pdf_tokens.head(2)

In [6]:
# pdf_collections.head(2)

In [7]:
# pdf_trxs.head(2)

## Running the Preprocessing
Runs the preprocessing ETL on the given list of folders. Only the direct content of each folder is checked; subfolders are not traversed. Unless the "overwrite" mode is used, a source file that has already been preprocessed is ignored. Be careful: the "overwrite" mode deletes all files in the given output folders!
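
The file-selection rule described above can be sketched roughly as follows. This is only an illustration, not the actual `ers.StageLoader` implementation: the helper name `select_files_to_process`, the `already_processed` set and the `.pickle` filter are assumptions, and the sketch does not delete anything from the output folders.

```python
from os import listdir, path


def select_files_to_process(folder_list, already_processed, mode="append"):
    """Return the source files a run would pick up (hypothetical helper)."""
    selected = []
    for folder in folder_list:
        # Only the direct content of the folder is inspected; subfolders are skipped.
        for name in sorted(listdir(folder)):
            full_path = path.join(folder, name)
            if not path.isfile(full_path) or not name.endswith(".pickle"):
                continue
            # In append mode, files recorded as already processed are ignored.
            if mode == "append" and full_path in already_processed:
                continue
            selected.append(full_path)
    return selected
```

With `mode="append"` a second run over the same folder would return only the files that are new since the previous run, which is why re-running the cells below should be cheap.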

In [110]:
ers = imp.reload(ers)

ers.StageLoader(path_in_folder_list=['./data/00_dump/events_20170101_000000_20180101_000000/'], 
                path_out_trx_folder_pq='./data/01_stage/trx/', path_out_token_folder_pq='./data/01_stage/token/', 
                path_out_collection_folder_pq='./data/01_stage/collection/', path_io_meta_folder='./data/_meta/', 
                _mode='append', _njobs=4, _verbose=False)

In [111]:
ers.StageLoader(path_in_folder_list=['./data/00_dump/events_20180101_000000_20190101_000000/', 
                                     './data/00_dump/events_20190101_000000_20200101_000000/', 
                                     './data/00_dump/events_20200101_000000_20210101_000000/', 
                                     './data/00_dump/events_20210101_000000_20210401_000000/', 
                                     './data/00_dump/events_20210401_000000_20210701_000000/'], 
                path_out_trx_folder_pq='./data/01_stage/trx/', path_out_token_folder_pq='./data/01_stage/token/', 
                path_out_collection_folder_pq='./data/01_stage/collection/', path_io_meta_folder='./data/_meta/', 
                _mode='append', _njobs=4, _verbose=False)

## Exploring the resulting tables
A few examples that check for possible data issues.
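
As a starting point, a check of this kind could look like the sketch below. The helper `summarize_issues` and the column names `trx_id` and `price` are assumptions made for illustration; the real stage tables may use different columns, and a toy DataFrame stands in for them here.

```python
import pandas as pd


def summarize_issues(pdf, key_cols):
    """Count missing values per column and duplicated keys (hypothetical helper)."""
    return {
        "rows": len(pdf),
        "nulls_per_column": pdf.isna().sum().to_dict(),
        "duplicated_keys": int(pdf.duplicated(subset=key_cols).sum()),
    }


# Toy stand-in for a stage table; the column names are assumptions.
pdf_demo = pd.DataFrame({
    "trx_id": [1, 2, 2, 3],
    "price": [0.5, None, 1.2, 3.0],
})
report = summarize_issues(pdf_demo, key_cols=["trx_id"])
print(report)
```

The same helper could be applied to a table loaded with `pd.read_parquet` from one of the stage folders, passing whichever columns are expected to identify a row uniquely.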