In [0]:
from glob import glob
import shutil
import concurrent.futures
import requests
import os

In [0]:
# Notebook parameters, exposed as Databricks widgets.
dbutils.widgets.dropdown("scale_factor", "10", ["10", "100", "1000", "5000", "10000", "20000"])
dbutils.widgets.text("existing_files_directory", "/Volumes/tpcdi/tpcdi_raw_data/tpcdi_volume/")
dbutils.widgets.text("parquet_files_directory", "/Volumes/tpcdi/tpcdi_raw_data/tpcdi_volume/splitparquet")
dbutils.widgets.text("catalog", "tpcdi")

catalog = dbutils.widgets.get("catalog")
scale_factor = dbutils.widgets.get("scale_factor")

# Source root. Downstream code splices it as f"{tpcdi_directory}sf=<n>/...",
# so it has to end with a slash; add one if the widget value lacks it
# (otherwise the path silently becomes ".../tpcdi_volumesf=10/...").
tpcdi_directory = dbutils.widgets.get("existing_files_directory")
if tpcdi_directory and not tpcdi_directory.endswith("/"):
  tpcdi_directory += "/"

# Target root for this scale factor. "/sf=<n>" is appended here, so strip any
# trailing slash first to avoid a "//sf=<n>" segment.
staging_dir = dbutils.widgets.get("parquet_files_directory").rstrip("/") + f"/sf={scale_factor}"

# Batch1 tables that are split into several Parquet files.
# Each entry is [table name, number of output files, source file extension, schema]:
#   - the table name and extension together form the source file name pattern
#     ("<name>.<extension>");
#   - "csv" sources are read with a ',' separator, everything else with '|';
#   - the schema is the column list handed to the reader for that source file.
tables_ls = [
    ["HR", 16, "csv", "employeeid BIGINT, managerid BIGINT, employeefirstname STRING, employeelastname STRING, employeemi STRING, employeejobcode STRING , employeebranch STRING, employeeoffice STRING, employeephone STRING"],
    ["CashTransaction", 400, "txt", "accountid BIGINT, ct_dts TIMESTAMP, ct_amt DOUBLE, ct_name STRING"],
    ["DailyMarket", 600, "txt", "dm_date DATE, dm_s_symb STRING, dm_close DOUBLE, dm_high DOUBLE, dm_low DOUBLE, dm_vol INT"],
    ["HoldingHistory", 60, "txt", "hh_h_t_id BIGINT, hh_t_id BIGINT, hh_before_qty INT, hh_after_qty INT"],
    ["Prospect", 40, "csv", "agencyid STRING, lastname STRING, firstname STRING, middleinitial STRING, gender STRING, addressline1 STRING, addressline2 STRING, postalcode STRING, city STRING, state STRING, country STRING, phone STRING, income STRING, numbercars STRING, numberchildren STRING, maritalstatus STRING, age STRING, creditrating STRING, ownorrentflag STRING, employer STRING, numbercreditcards STRING, networth STRING"],
    ["TradeHistory", 200, "txt", "tradeid BIGINT, th_dts TIMESTAMP, status STRING"],
    ["Trade", 300, "txt", "t_id BIGINT, t_dts TIMESTAMP, t_st_id STRING, t_tt_id STRING, t_is_cash TINYINT, t_s_symb STRING, quantity INT, bidprice DOUBLE, t_ca_id BIGINT, executedby STRING, tradeprice DOUBLE, fee DOUBLE, commission DOUBLE, tax DOUBLE"],
    ["WatchHistory", 300, "txt", "w_c_id BIGINT, w_s_symb STRING, w_dts TIMESTAMP, w_action STRING"]
]

In [0]:
def rewrite_files(file_cnt, delimiter, tbl, filetype, raw_schema, batchnum):
  """Read one delimited source table and stage it as Parquet.

  The source is ``<tpcdi_directory>sf=<scale_factor>/Batch<batchnum>/``,
  restricted to files named ``<tbl>.<filetype>``. It is parsed as headerless
  CSV with the given separator and an explicit schema (no inference).

  Args:
    file_cnt: Number of partitions the data is repartitioned into before writing.
    delimiter: Field separator of the source file.
    tbl: Table name; used in the source file name and the staging folder name.
    filetype: Source file extension (without the dot).
    raw_schema: Column definitions passed to ``read_files`` as the schema.
    batchnum: Batch number selecting the ``Batch<n>`` source and staging folders.

  Returns:
    None. The result is written, overwriting any previous output, to
    ``<staging_dir>/_stage/Batch<batchnum>/<tbl>``.

  Relies on the notebook-level ``spark``, ``tpcdi_directory``, ``scale_factor``
  and ``staging_dir``.
  """
  source_query = f"""
  select * FROM read_files(
    "{tpcdi_directory}sf={scale_factor}/Batch{batchnum}/",
    format => "csv",
    inferSchema => False,
    header => False,
    sep => "{delimiter}",
    fileNamePattern => "{tbl}.{filetype}",
    schema => "{raw_schema}"
  )
  """
  stage_path = f"{staging_dir}/_stage/Batch{batchnum}/{tbl}"

  raw_rows = spark.sql(source_query)
  raw_rows.repartition(file_cnt).write.mode("overwrite").parquet(stage_path)

In [0]:
# One worker per entry in tables_ls, so every Batch1 table is converted in parallel.
threads = len(tables_ls)
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
  futures = []
  for tbl in tables_ls:
    # "csv" sources are comma-separated; anything else is read as pipe-delimited.
    if tbl[2] == "csv":
      delimiter = ','
    else:
      delimiter = '|'
    futures.append(
      executor.submit(
        rewrite_files,
        file_cnt=tbl[1],
        delimiter=delimiter,
        tbl=tbl[0],
        filetype=tbl[2],
        raw_schema=tbl[3],
        batchnum=1,
      )
    )
  # Report each job as it finishes. Only a connect timeout is tolerated here;
  # any other exception raised inside a worker propagates out of this cell.
  for future in concurrent.futures.as_completed(futures):
    try:
      print(future.result())
    except requests.ConnectTimeout:
      print("ConnectTimeout.")

In [0]:
def move_file(src_loc: str, tgt_loc: str) -> None:
  """Copy a single file from ``src_loc`` to ``tgt_loc`` using ``dbutils.fs.cp``.

  Despite the name, the source file is left in place: the delete call below is
  commented out, so this is a copy rather than a move.
  """
  dbutils.fs.cp(src_loc, tgt_loc)
  # dbutils.fs.rm(src_loc)

In [0]:
# Pair every staged part file of each Batch1 table with its final name,
# "<table>_<n>.parquet", numbering from 1 in the order glob returns the files.
files_map = []
for tbl in tables_ls:
  staged_parts = glob(f"{staging_dir}/_stage/Batch1/{tbl[0]}/part-00*.parquet")
  files_map.extend(
    [file, f"{staging_dir}/Batch1/{tbl[0]}_{file_num}.parquet"]
    for file_num, file in enumerate(staged_parts, start=1)
  )

In [0]:
# Copy the staged part files to their final names, at most 64 copies in flight.
threads = 64
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
  futures = [
    executor.submit(move_file, src_loc=src, tgt_loc=tgt)
    for src, tgt in files_map
  ]
  # Only a connect timeout is tolerated; other worker errors propagate.
  for future in concurrent.futures.as_completed(futures):
    try:
      print(future.result())
    except requests.ConnectTimeout:
      print("ConnectTimeout.")

In [0]:
# Tables that are written as a single Parquet file each.
# NOTE: this rebinds tables_ls, replacing the multi-file Batch1 list defined earlier.
# Each entry is [batch number, table name, schema]. The cell that consumes this
# list reads every entry as a '|'-delimited "<table name>.txt" file and writes
# it with a file count of 1.
tables_ls = [
  [1, 'BatchDate', 'batchdate DATE'],
  [1, 'Date', 'sk_dateid BIGINT, datevalue DATE, datedesc STRING, calendaryearid INT, calendaryeardesc STRING, calendarqtrid INT, calendarqtrdesc STRING, calendarmonthid INT, calendarmonthdesc STRING, calendarweekid INT, calendarweekdesc STRING, dayofweeknum INT, dayofweekdesc STRING, fiscalyearid INT, fiscalyeardesc STRING, fiscalqtrid INT, fiscalqtrdesc STRING, holidayflag BOOLEAN'],
  [1, 'Industry', 'in_id STRING, in_name STRING, in_sc_id STRING'],
  [1, 'StatusType', 'st_id STRING, st_name STRING'],
  [1, 'TaxRate', 'tx_id STRING, tx_name STRING, tx_rate FLOAT'],
  [1, 'Time', 'sk_timeid BIGINT, timevalue STRING, hourid INT, hourdesc STRING, minuteid INT, minutedesc STRING, secondid INT, seconddesc STRING, markethoursflag BOOLEAN, officehoursflag BOOLEAN'],
  [1, 'TradeType', 'tt_id STRING, tt_name STRING, tt_is_sell INT, tt_is_mrkt INT'],
  [2, 'Account', 'cdc_flag STRING, cdc_dsn BIGINT, accountid BIGINT, brokerid BIGINT, customerid BIGINT, accountdesc STRING, taxstatus TINYINT, status STRING'],
  [2, 'BatchDate', 'batchdate DATE'],
  [2, 'CashTransaction', 'cdc_flag STRING, cdc_dsn BIGINT, accountid BIGINT, ct_dts TIMESTAMP, ct_amt DOUBLE, ct_name STRING'],
  [2, 'Customer', 'cdc_flag STRING, cdc_dsn BIGINT, customerid BIGINT, taxid STRING, status STRING, lastname STRING, firstname STRING, middleinitial STRING, gender STRING, tier TINYINT, dob DATE, addressline1 STRING, addressline2 STRING, postalcode STRING, city STRING, stateprov STRING, country STRING, c_ctry_1 STRING, c_area_1 STRING, c_local_1 STRING, c_ext_1 STRING, c_ctry_2 STRING, c_area_2 STRING, c_local_2 STRING, c_ext_2 STRING, c_ctry_3 STRING, c_area_3 STRING, c_local_3 STRING, c_ext_3 STRING, email1 STRING, email2 STRING, lcl_tx_id STRING, nat_tx_id STRING'],
  [2, 'DailyMarket', 'cdc_flag STRING, cdc_dsn BIGINT, dm_date DATE, dm_s_symb STRING, dm_close DOUBLE, dm_high DOUBLE, dm_low DOUBLE, dm_vol INT'],
  [2, 'HoldingHistory', 'cdc_flag STRING, cdc_dsn BIGINT, hh_h_t_id BIGINT, hh_t_id BIGINT, hh_before_qty INT, hh_after_qty INT'],
  [2, 'Trade', 'cdc_flag STRING, cdc_dsn BIGINT, tradeid BIGINT, t_dts TIMESTAMP, status STRING, t_tt_id STRING, cashflag TINYINT, t_s_symb STRING, quantity INT, bidprice DOUBLE, t_ca_id BIGINT, executedby STRING, tradeprice DOUBLE, fee DOUBLE, commission DOUBLE, tax DOUBLE'],
  [2, 'WatchHistory', 'cdc_flag STRING, cdc_dsn BIGINT, w_c_id BIGINT, w_s_symb STRING, w_dts TIMESTAMP, w_action STRING'],
  [3, 'Account','cdc_flag STRING, cdc_dsn BIGINT, accountid BIGINT, brokerid BIGINT, customerid BIGINT, accountdesc STRING, taxstatus TINYINT, status STRING'],
  [3, 'BatchDate', 'batchdate DATE'],
  [3, 'CashTransaction', 'cdc_flag STRING, cdc_dsn BIGINT, accountid BIGINT, ct_dts TIMESTAMP, ct_amt DOUBLE, ct_name STRING'],
  [3, 'Customer', 'cdc_flag STRING, cdc_dsn BIGINT, customerid BIGINT, taxid STRING, status STRING, lastname STRING, firstname STRING, middleinitial STRING, gender STRING, tier TINYINT, dob DATE, addressline1 STRING, addressline2 STRING, postalcode STRING, city STRING, stateprov STRING, country STRING, c_ctry_1 STRING, c_area_1 STRING, c_local_1 STRING, c_ext_1 STRING, c_ctry_2 STRING, c_area_2 STRING, c_local_2 STRING, c_ext_2 STRING, c_ctry_3 STRING, c_area_3 STRING, c_local_3 STRING, c_ext_3 STRING, email1 STRING, email2 STRING, lcl_tx_id STRING, nat_tx_id STRING'],
  [3, 'DailyMarket', 'cdc_flag STRING, cdc_dsn BIGINT, dm_date DATE, dm_s_symb STRING, dm_close DOUBLE, dm_high DOUBLE, dm_low DOUBLE, dm_vol INT'],
  [3, 'HoldingHistory', 'cdc_flag STRING, cdc_dsn BIGINT, hh_h_t_id BIGINT, hh_t_id BIGINT, hh_before_qty INT, hh_after_qty INT'],
  [3, 'Trade', 'cdc_flag STRING, cdc_dsn BIGINT, tradeid BIGINT, t_dts TIMESTAMP, status STRING, t_tt_id STRING, cashflag TINYINT, t_s_symb STRING, quantity INT, bidprice DOUBLE, t_ca_id BIGINT, executedby STRING, tradeprice DOUBLE, fee DOUBLE, commission DOUBLE, tax DOUBLE'],
  [3, 'WatchHistory', 'cdc_flag STRING, cdc_dsn BIGINT, w_c_id BIGINT, w_s_symb STRING, w_dts TIMESTAMP, w_action STRING']
]

In [0]:
# Convert every single-file table in parallel, one worker per entry.
# These sources are all pipe-delimited .txt files and each becomes one Parquet file.
threads = len(tables_ls)
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
  futures = []
  for tbl in tables_ls:
    futures.append(
      executor.submit(
        rewrite_files,
        file_cnt=1,
        delimiter='|',
        tbl=tbl[1],
        filetype='txt',
        raw_schema=tbl[2],
        batchnum=tbl[0],
      )
    )
  # Only a connect timeout is tolerated; other worker errors propagate.
  for future in concurrent.futures.as_completed(futures):
    try:
      print(future.result())
    except requests.ConnectTimeout:
      print("ConnectTimeout.")

In [0]:
# Pair the staged part file of each single-file table with its final name,
# "<staging_dir>/Batch<n>/<table>.parquet".
# Written as a comprehension so no loop variable leaks into the notebook
# namespace; the previous loop variable was called `dir`, which shadowed the
# builtin dir() for every later cell.
files_map = [
  [file, f"{staging_dir}/Batch{batch_no}/{tbl_name}.parquet"]
  for batch_no, tbl_name, _schema in tables_ls
  for file in glob(f"{staging_dir}/_stage/Batch{batch_no}/{tbl_name}/part-00*.parquet")
]

In [0]:
# Copy the staged single-file tables to their final names, at most 64 copies in flight.
threads = 64
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
  futures = [
    executor.submit(move_file, src_loc=src, tgt_loc=tgt)
    for src, tgt in files_map
  ]
  # Only a connect timeout is tolerated; other worker errors propagate.
  for future in concurrent.futures.as_completed(futures):
    try:
      print(future.result())
    except requests.ConnectTimeout:
      print("ConnectTimeout.")

In [0]:
# Read the Batch1 FINWIRE files (names of the form FINWIRE<4 digits>Q<1-4>) as
# plain text and stage them as 400 Parquet part files.
finwire_source = f"{tpcdi_directory}sf={scale_factor}/Batch1/FINWIRE[0-9][0-9][0-9][0-9]Q[1-4]"
finwire_stage = f"{staging_dir}/_stage/Batch1/FINWIRE"

df = spark.sql(f"select * FROM text.`{finwire_source}`")
(
  df.repartition(400)
  .write
  .mode("overwrite")
  .parquet(finwire_stage)
)

In [0]:
# Pair each staged FINWIRE part file with its final name, "FINWIRE_<n>.parquet",
# numbering from 1 in the order glob returns the files.
files_map = [
  [file, f"{staging_dir}/Batch1/FINWIRE_{file_num}.parquet"]
  for file_num, file in enumerate(
    glob(f"{staging_dir}/_stage/Batch1/FINWIRE/part-00*.parquet"), start=1
  )
]

In [0]:
# Copy the staged FINWIRE part files to their final names, at most 64 copies in flight.
threads = 64
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
  futures = [
    executor.submit(move_file, src_loc=src, tgt_loc=tgt)
    for src, tgt in files_map
  ]
  # Only a connect timeout is tolerated; other worker errors propagate.
  for future in concurrent.futures.as_completed(futures):
    try:
      print(future.result())
    except requests.ConnectTimeout:
      print("ConnectTimeout.")

In [0]:
# Prospect is the one comma-separated table that also appears in Batch2 and
# Batch3; convert both batches in parallel, one worker each.
threads = 2
# [table name, number of output files, source file extension, schema]
tbl = [
    "Prospect",
    40,
    "csv",
    "agencyid STRING, lastname STRING, firstname STRING, middleinitial STRING, gender STRING, addressline1 STRING, addressline2 STRING, postalcode STRING, city STRING, state STRING, country STRING, phone STRING, income STRING, numbercars STRING, numberchildren STRING, maritalstatus STRING, age STRING, creditrating STRING, ownorrentflag STRING, employer STRING, numbercreditcards STRING, networth STRING",
]

with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
  futures = []
  for batchnum in (2, 3):
    futures.append(
      executor.submit(
        rewrite_files,
        file_cnt=tbl[1],
        delimiter=',',
        tbl=tbl[0],
        filetype=tbl[2],
        raw_schema=tbl[3],
        batchnum=batchnum,
      )
    )
  # Only a connect timeout is tolerated; other worker errors propagate.
  for future in concurrent.futures.as_completed(futures):
    try:
      print(future.result())
    except requests.ConnectTimeout:
      print("ConnectTimeout.")

In [0]:
# Pair each staged Prospect part file from Batch2 and Batch3 with its final
# name, "<table>_<n>.parquet"; numbering restarts at 1 for each batch.
files_map = []
for batchnum in (2, 3):
  staged_parts = glob(f"{staging_dir}/_stage/Batch{batchnum}/{tbl[0]}/part-00*.parquet")
  files_map.extend(
    [file, f"{staging_dir}/Batch{batchnum}/{tbl[0]}_{file_num}.parquet"]
    for file_num, file in enumerate(staged_parts, start=1)
  )

In [0]:
# Copy the staged Prospect files to their final names.
# Pool size is clamped to 1..64:
#   - ThreadPoolExecutor raises ValueError for max_workers=0, which is what
#     len(files_map) yields when no staged files were found;
#   - 64 is the cap used by the other copy cells in this notebook.
threads = max(1, min(64, len(files_map)))
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
  futures = []
  for file in files_map:
    futures.append(executor.submit(move_file, src_loc=file[0], tgt_loc=file[1]))
  # Only a connect timeout is tolerated; other worker errors propagate.
  for future in concurrent.futures.as_completed(futures):
    try:
      print(future.result())
    except requests.ConnectTimeout:
      print("ConnectTimeout.")

In [0]:
# Sanity check on the Batch1 output before the staging area is discarded.
# 2323 matches the file counts configured above for Batch1: 1916 part files
# across the multi-file tables, 7 single-file tables and 400 FINWIRE files.
expected_file_cnt = 2323
file_cnt = len(glob(f"{staging_dir}/Batch1/*.parquet"))

print('Total Converted Parquet files: ' + str(file_cnt))

# The staging folder is removed only when the count matches, so a failed run
# keeps its intermediate files for inspection.
if file_cnt == expected_file_cnt:
  print('Number of Parquet files matches the expected amount of ' + str(expected_file_cnt))
  dbutils.fs.rm(f"{staging_dir}/_stage", recurse=True)
else:
  print('Number of Parquet files does not match the expected amount of ' + str(expected_file_cnt))