# CONVERT FILES WITH DUCK_DB

In [1]:
import duckdb
import os
import time
import gc

In [2]:
# Base name handed to shutil.make_archive in the archive cell further down.
# NOTE(review): the raw-string prefix combined with doubled backslashes leaves
# literal double backslashes in the value - confirm this is intended.
ZIP_PATH = r"..\data_processed\\guo_subs_europee\\"
# Directory handed to shutil.make_archive as the folder to archive.
ZIP_FOLDER = "subs_eu"

In [3]:
def time_it(func):
    """Decorator that prints how long each successful call to *func* takes.

    The wrapped function's return value is passed through unchanged. If
    *func* raises, the exception propagates and nothing is printed.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same call signature and metadata as *func*.
    """
    # Imported locally so the decorator does not depend on a file-level import.
    import functools

    # functools.wraps keeps __name__/__doc__ of the decorated function, so
    # tracebacks and introspection still show the real function.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measurement cannot be skewed by
        # system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_time
        print(f"Execution time: {elapsed_time:.4f} seconds")
        return result
    return wrapper

## Ownership history

In [4]:
# Notebook-level DuckDB connection.
# NOTE(review): get_ownership_data opens its own connection for every call, so
# in the visible code this handle is only ever closed at the end of the batch
# cell - confirm whether it is still needed.
conn = duckdb.connect()

In [5]:
# path = "E:\dati_moody\ownership_history\links_2022\part-00000-8f9cac6d-cf88-4461-91b4-13c784cdf6a9-c000.snappy.parquet" 

In [6]:
# Full list of 27 two-letter country codes. Each code is used as the prefix
# that guo_25 must start with in the ownership query.
eu27_countries = [
    "AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI",
    "FR", "DE", "GR", "HU", "IE", "IT", "LV", "LT", "LU",
    "MT", "NL", "PL", "PT", "RO", "SK", "SI", "ES", "SE",
]

# Override: the list above is immediately replaced, so only "BE" is processed.
# Remove this assignment to run every country in the full list.
eu27_countries = ["BE"]

In [7]:

# Parquet glob interpolated into query(), where it is joined twice: once on the
# subsidiary BvD ID and once on the guo_25 BvD ID.
TEMP_TABLE_FIRMOGRAPHICS = "..\\data_processed\\firmographics_processed\\*.parquet" 
# Parquet glob interpolated into query() as the source of number_of_employees
# and closing_date.
TEMP_TABLE_KEY_FINANCIALS = "..\\data_raw\\key_financials\\key_financials_eur\\*.parquet"

def query_test(year, country, path):
    """Build a small sample query (LIMIT 100) over one ownership-links source.

    Args:
        year: Value emitted verbatim as the constant ``year`` column.
        country: Prefix that ``guo_25`` must start with.
        path: Parquet path or glob placed in the FROM clause.

    Returns:
        The SQL text selecting 'GUO 25' and 'ISH' relations for *country*.
    """
    return f"""
        SELECT
            main.subsidiary_bvd_id,
            main.guo_25,
            main.type_of_relation,
            main.shareholder_bvd_id,
            {year} AS year,
        FROM
            '{path}' AS main 
        WHERE 
            main."guo_25" LIKE '{country}%'
        AND    
            main."type_of_relation" IN ('GUO 25', 'ISH')
        LIMIT 100

    """

'''
Now take the BvD ID from guo_25.
'''



def query(year, country, path):
    """Build the ownership query for one country and one year.

    The ownership links read from *path* are restricted to 'GUO 25' relations
    whose ``guo_25`` starts with *country*. They are left-joined with:

    * firmographics of the subsidiary (nuts2, NACE 4-digit code),
    * firmographics of the GUO (nuts2, NACE 4-digit code, type of entity,
      status),
    * key financials of the subsidiary (number of employees, closing date)
      whose closing date falls in *year*.

    Args:
        year: Value emitted as the constant ``year`` column and compared with
            the year of the financial closing date.
        country: Prefix that ``guo_25`` must start with.
        path: Parquet path or glob of the ownership links.

    Returns:
        The SQL text of the query.
    """
    sql = f"""
        SELECT
            main.subsidiary_bvd_id,
            main.guo_25,
            main.type_of_relation,
           -- main.shareholder_bvd_id,
            firmographics_sub.nuts2 AS subsidiary_nuts2,
            firmographics_sub.nace_rev_2_core_code_4_digits_ AS subsidiary_nace4,
            firmographics_guo.nuts2 AS guo_nuts2,
            firmographics_guo.nace_rev_2_core_code_4_digits_ AS guo_nace4,
            firmographics_guo.type_of_entity AS guo_type_of_entity,
            firmographics_guo.status AS guo_status,
            {year} AS year,
            key_financials.number_of_employees AS sub_number_of_employees,
            key_financials.closing_date AS sub_closing_date,
        FROM 
            '{path}' AS main
        LEFT JOIN 
            '{TEMP_TABLE_FIRMOGRAPHICS}' AS firmographics_sub
        ON 
            main.subsidiary_bvd_id = firmographics_sub.bvd_id_number
        LEFT JOIN
            '{TEMP_TABLE_FIRMOGRAPHICS}' AS firmographics_guo
        ON
            main.guo_25 = firmographics_guo.bvd_id_number
        LEFT JOIN (
            SELECT 
                number_of_employees, 
                closing_date,
                bvd_id_number,
                EXTRACT(YEAR FROM closing_date) AS financial_year  
            FROM 
                '{TEMP_TABLE_KEY_FINANCIALS}' 
        ) AS key_financials
        ON 
            main.subsidiary_bvd_id = key_financials.bvd_id_number
        AND 
            EXTRACT(YEAR FROM key_financials.closing_date) = {year} 
        WHERE 
            main."type_of_relation" IN ('GUO 25')
        AND 
            main."guo_25" LIKE '{country}%'
    """
    return sql
 # ISH

# def get_ownership_data(year, country, path=None):
#     print(f"{country} - {year}...")
#     query_ = query(year, country, path)
#     conn = duckdb.connect()
#     try:
#         df = conn.execute(query_).fetchdf()
#     finally:
#         conn.close()
#     return df



def get_ownership_data(year, country, path=None):
    """Run the ownership query for one country/year and return the result.

    A progress line is printed first. The query runs on a fresh DuckDB
    connection that is closed when the ``with`` block exits.

    Args:
        year: Year forwarded to ``query``.
        country: Country prefix forwarded to ``query``.
        path: Parquet path or glob of the ownership links, forwarded to
            ``query``.

    Returns:
        The query result as returned by ``fetchdf()``.
    """
    print(f"{country} - {year}...")
    sql = query(year, country, path)
    with duckdb.connect() as connection:
        # Cap the memory this connection may use at 1GB.
        connection.execute("PRAGMA memory_limit='1GB';")
        return connection.execute(sql).fetchdf()



def convert_to_stata(df, output_path, country, year):
    """Write *df* to ``<output_path>/<country>_<year>.csv`` without the index.

    Despite its name, this function writes a CSV file, not a Stata file.

    Args:
        df: pandas DataFrame to export.
        output_path: Destination directory; created if it does not exist.
        country: Country code used in the file name.
        year: Year used in the file name.
    """
    os.makedirs(output_path, exist_ok=True)
    # DataFrame.to_csv takes ``index`` to control the index column; the
    # previous ``write_index`` keyword is not accepted and raised TypeError.
    df.to_csv(os.path.join(output_path, f"{country}_{year}.csv"), index=False)
    # Drop the local reference and collect promptly; the caller's own
    # reference keeps the frame alive until it is released there too.
    del df
    gc.collect()


@time_it
def fetch_and_convert_to_stata(year, country, path, output_path):
    """Fetch the ownership data for one country/year and export it to disk.

    Args:
        year: Year to query.
        country: Country prefix to query.
        path: Parquet path or glob of the ownership links.
        output_path: Directory that receives the exported file.
    """
    frame = get_ownership_data(year, country, path)
    convert_to_stata(frame, output_path, country, year)
    # Release the result before the next task allocates its own.
    del frame
    gc.collect()




### Test

In [8]:
# path = "..\\data_raw\\ownership_history\\links_2007\\*.parquet"
# country = "IT"
# year = "2018"
# path = "..\\data_raw\\ownership_history\\links_2018\\*.parquet"
# output_path = r"..\data_processed\\guo_subs_europee\\subs_eu"


# df = get_ownership_data(
#     path=path,
#     year=year,
#     country=country,
# )
# df

# fetch_and_convert_to_stata(country=country, year=year, path=path, output_path=output_path)

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json

# Wall-clock start of the whole batch, used for the final summary line.
total_start = time.time()

# Maps "<country> - <year>" to the error message of a failed task.
errors = {}
OUTPUT_PATH = r"..\data_processed\\guo_subs_europee\\subs_eu"


def process_country_year(country, year):
    """Export one country/year combination, recording any failure in ``errors``.

    Exceptions are caught so that one failing combination does not abort the
    rest of the batch; the message is stored under the key
    "<country> - <year>".

    Args:
        country: Country prefix to process.
        year: Year whose ``links_<year>`` parquet files are read.
    """
    # Each task is timed from its own start. Measuring from a shared
    # batch-level start time made every worker report the time elapsed since
    # the batch began instead of its own duration.
    task_start = time.time()
    try:
        path = f"..\\data_raw\\ownership_history\\links_{year}\\*.parquet"
        fetch_and_convert_to_stata(year, country, path, OUTPUT_PATH)
        duration = time.time() - task_start
        print(f"Data for {country} - {year} has been converted to Stata in {duration:.2f} seconds.")
    except Exception as e:
        errors[f"{country} - {year}"] = str(e)
        print(f"Error {country} - {year}")


# Fan out one task per (country, year) pair over a thread pool.
with ThreadPoolExecutor() as executor:
    futures = [
        executor.submit(process_country_year, country, year)
        for country in eu27_countries
        for year in range(2007, 2022)
    ]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Error: {e}")

total_end = time.time()
total_duration = total_end - total_start
print(f"Total execution time: {total_duration:.2f} seconds.")

# Persist the collected failures so they can be inspected after the run.
with open("errors_subs.json", "w") as f:
    json.dump(errors, f)

conn.close()

BE - 2007...
BE - 2008...
BE - 2009...
BE - 2010...
BE - 2011...
BE - 2012...
BE - 2013...
BE - 2014...
BE - 2015...
BE - 2016...
BE - 2017...
BE - 2018...


In [None]:
# import time
# import json

# errors = {}
# OUTPUT_PATH = r"..\data_processed\\guo_subs_europee\\subs_eu"

# total_start = time.time()
# start_time = time.time()

# for country in eu27_countries:
#     for year in range(2007, 2022):
#         year = str(year)
#         try:
#             start_time = time.time()
#             path = f"..\\data_raw\\ownership_history\\links_{year}\\*.parquet"
#             fetch_and_convert_to_stata(year, country, path, OUTPUT_PATH)
#             end_time = time.time()
#             duration = end_time - start_time
#             print(f"Data for {country} - {year} has been converted to Stata in {duration:.2f} seconds.")
#         except Exception as e:
#             errors[f"{country} - {year}"] = str(e)
#             print(f"Error {country} - {year}")
#             continue

# total_end = time.time()
# total_duration = total_end - total_start
# print(f"Total execution time: {total_duration:.2f} seconds.")

# with open("errors_subs.json", "w") as f:
#     json.dump(errors, f)

# conn.close()

In [None]:
import shutil

# Archive the exported folder ZIP_PATH/ZIP_FOLDER (the directory OUTPUT_PATH
# points at) into ZIP_PATH/ZIP_FOLDER.zip.
# The previous call passed ZIP_PATH as the archive base name, which produced a
# file named ".zip" inside that directory, and passed ZIP_FOLDER as root_dir,
# which was resolved against the current working directory rather than
# ZIP_PATH.
shutil.make_archive(
    os.path.join(ZIP_PATH, ZIP_FOLDER),
    'zip',
    root_dir=ZIP_PATH,
    base_dir=ZIP_FOLDER,
)

In [None]:
import json

# Reload the error log written by the batch cell of this notebook. That cell
# saves to "errors_subs.json"; the previous "errors.json" is never written
# here, so it could only show results from some other run.
with open("errors_subs.json", "r") as f:
    errors = json.load(f)

errors