# Extract from database

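This notebook contains all queries to the source database. It reads the yearly `DP1610_MAASTRICHT1_<year>.txt` extract files with Dask and writes the derived datasets as CSV files to `data/processed/`.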

In [1]:
# %%
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from numpy import log10, arange
import matplotlib.pyplot as plt

In [2]:
import dask
from dask.distributed import Client, LocalCluster

# Dask configuration, applied globally for this kernel.
# NOTE: the worker-memory fractions and the dashboard link below equal the values
# dask.distributed ships as defaults; they are spelled out only to make them explicit.
dask.config.set({
    # Auto-chunking target for dask *arrays*; dataframes read with read_table
    # are partitioned by its `blocksize` argument instead.
    'array.chunk-size': '50MB',
    # Fractions of each worker's memory_limit:
    'distributed.worker.memory.target': 0.6,      # aim to stay below this
    'distributed.worker.memory.spill': 0.7,       # spill to disk from here
    'distributed.worker.memory.pause': 0.8,       # pause worker threads from here
    'distributed.worker.memory.terminate': 0.95,  # terminate the worker from here
    # URL template used when printing the dashboard link.
    'distributed.dashboard.link': '{scheme}://{host}:{port}/status',
})

# Local cluster: 4 single-threaded worker processes with 3.75 GB each (~15 GB total).
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=1,
    memory_limit='3.75GB',
)

# Creating the Client registers the distributed scheduler as the default for
# every .compute() in the cells below.
# NOTE(review): dask.diagnostics.ProgressBar, used in later cells, only reports
# progress for the single-machine schedulers; with this Client active, watch
# the dashboard (or use dask.distributed.progress) instead.
client = Client(cluster)


## General settings

In [3]:
# Directory holding the raw yearly extract files (DP1610_MAASTRICHT1_<year>.txt).
drive_path = '/media/matias/Elements/export_france/data/type1/DP1610_MAASTRICHT1_1997_2013/'
# Directory that receives every derived CSV written by this notebook.
save_path = './../../data/processed/'

# Column layout of the raw ';'-separated files (header-less), in file order.
colnames = [u'YEAR', u'MONTH', u'FLUX', u'ID', u'DEPT', u'CN ID 8', u'CPA6',
            u'PYOD', u'PAYP', u'VAT', u'PRIFAC', u'DEVFAC', u'VFTE', u'VART',
            u'D_MASSE', u'MASSE', u'USUP', u'USUP_MT']
# Column name -> 0-based position in the file. Derived from `colnames` so the
# mapping cannot drift from the list (a hard-coded range(18) would silently
# drop names if columns were ever added).
colname_no = {name: pos for pos, name in enumerate(colnames)}

# # Function to read data for specified columns and years
# def get_data(columns, drive_path, start_year=1997, end_year=2014):
#     df_list = []
#     for y in range(start_year, end_year):
#         df = dd.read_table(f'{drive_path}DP1610_MAASTRICHT1_{y}.txt', 
#                            usecols=list(map(colname_no.get, columns)),
#                            delimiter=';', header=None, dtype={4: 'object', 6: 'object', 8: 'object', 9: 'object'}, 
#                            blocksize='50MB')
#         df.columns = columns  # Assigning column names
#         df_list.append(df)
#     return dd.concat(df_list)


def get_data(columns, drive_path, start_year=1997, end_year=2014):
    """Lazily read selected columns from the yearly Maastricht extract files.

    Parameters
    ----------
    columns : list of str
        Column names to load; each must be a key of ``colname_no``. The
        returned frame has its columns in exactly this order, whatever order
        they have in the file.
    drive_path : str
        Directory (with trailing separator) containing the
        ``DP1610_MAASTRICHT1_<year>.txt`` files.
    start_year, end_year : int
        Years read are ``range(start_year, end_year)``; ``end_year`` is
        exclusive, so ``end_year=1999`` reads 1997 and 1998 only.

    Returns
    -------
    dask.dataframe.DataFrame
        All selected years concatenated.

    Raises
    ------
    KeyError
        If a requested column is not in ``colname_no``.
    """
    columns = list(columns)
    unknown = [c for c in columns if c not in colname_no]
    if unknown:
        raise KeyError(f'Unknown column(s): {unknown}')

    # pandas ignores the order of `usecols`: the selected columns come back in
    # file order and `names` is assigned to them positionally. Passing a list
    # that is not in file order would therefore put the wrong label on the
    # data. Pass both in file order, then restore the caller's order below.
    file_order = sorted(columns, key=colname_no.get)
    positions = [colname_no[c] for c in file_order]

    df_list = []
    for y in range(start_year, end_year):
        df = dd.read_table(
            f'{drive_path}DP1610_MAASTRICHT1_{y}.txt',
            usecols=positions,
            delimiter=';',
            header=None,
            names=file_order,
            # Integer keys are file positions: DEPT, CPA6, PAYP, VAT read as strings.
            dtype={4: 'object', 6: 'object', 8: 'object', 9: 'object'},
            blocksize='50MB',
        )
        df_list.append(df[columns])
    return dd.concat(df_list)

In [4]:
## Experiment for block size

# import dask.dataframe as dd
# import time

# def perform_computation(blocksize):
#     ddf = dd.read_table(f'{drive_path}DP1610_MAASTRICHT1_{y}.txt', 
#                         usecols=list(map(colname_no.get, columns)),
#                         delimiter=';', header=None, dtype={4: 'object', 6: 'object', 8: 'object', 9: 'object'}, 
#                         blocksize=blocksize)
#     ddf.columns = columns
#     start_time = time.time()
#     result = ddf.groupby('YEAR')['VART'].mean().compute()
#     return time.time() - start_time

# # Experiment with different blocksizes
# blocksizes = ['10MB', '20MB', '30MB']
# results = {}

# for blocksize in blocksizes:
#     print(f"Testing blocksize: {blocksize}")
#     execution_time = perform_computation(blocksize)
#     print(f"Blocksize: {blocksize}, Execution Time: {execution_time:.2f} seconds")
#     results[blocksize] = execution_time

# # Analyzing results
# print("Experiment Results:")
# for blocksize, execution_time in results.items():
#     print(f"Blocksize: {blocksize}, Execution Time: {execution_time:.2f} seconds")


# # Experiment Results:
# # Blocksize: 10MB, Execution Time: 86.68 seconds
# # Blocksize: 20MB, Execution Time: 60.58 seconds
# # Blocksize: 30MB, Execution Time: 62.73 seconds

In [5]:


def chunk(s):
    '''
    Per-partition ("map") step of the ``tunique`` distinct-count aggregation.

    Parameters
    ----------
    s : pandas SeriesGroupBy
        The aggregated column, grouped by the groupby keys within one partition.

    Returns
    -------
    pandas.Series
        For each group, a list of its distinct non-missing values.
    '''
    # Drop missing values first. Iterating a float Series yields a fresh NaN
    # object per missing entry, and NaN != NaN, so set() would keep every one
    # of them and the final count would depend on the column dtype. Excluding
    # NaN matches pandas' nunique() default (dropna=True).
    return s.apply(lambda x: list(set(x.dropna())))


def agg(s):
    '''
    Cross-partition ("reduce") step of the ``tunique`` aggregation.

    Regroups the per-partition lists by every level of their index and sums
    each group; for lists, summing means concatenating them.
    '''
    # NOTE: `_selected_obj` is a private pandas attribute holding the Series
    # underlying the groupby object.
    merged = s._selected_obj
    every_level = list(range(merged.index.nlevels))
    return merged.groupby(level=every_level).sum()


def finalize(s):
    '''
    Final step of the ``tunique`` aggregation: replace each group's merged
    list of values by the number of distinct values it contains.
    '''
    def _distinct_count(values):
        return len(set(values))

    return s.apply(_distinct_count)

# Distinct-count aggregation usable inside dask groupby .agg() specifications.
tunique = dd.Aggregation(
    name='tunique',
    chunk=chunk,
    agg=agg,
    finalize=finalize,
)



## Build datasets
### - Price and quantities

In [6]:
# columns = [u'YEAR', u'MONTH', u'FLUX', u'ID', u'CN ID 8', u'PYOD', u'VART', u'MASSE', u'USUP', u'USUP_MT']


# # Function to process price and quantities dataset
# def process_price_quantities(columns, drive_path, end_year):
#     data = get_data(columns, drive_path, end_year=end_year)
#     # display(data.head())
#     grouped = data[data.FLUX == 2].groupby(['ID', 'CN ID 8', 'MONTH', 'YEAR'])
#     yearly_qv = grouped[['VART', 'MASSE']].sum().compute()
#     yearly_qv.to_csv(f'{save_path}units_qv.csv')
#     return yearly_qv

# yearly_qv = process_price_quantities(columns, drive_path, end_year=2014)
# print(yearly_qv.head())

# # with ProgressBar():
# #     yearly_details = data_.loc[data_.FLUX == 2].head(1000).groupby(['ID', 'CN ID 8', 'YEAR']).agg(
# #         {'VART': sum, 'MASSE': sum, 'USUP': tunique, 'USUP': first, 'USUP_MT': sum}).compute()
# # yearly_details.to_csv(save_path + 'units_detail.csv')

### - Firm sizes

In [7]:
# Total value (VART) per firm (ID) and per partner (VAT), by flow direction and year.
# MONTH is not needed for these yearly totals, so it is not read.
columns = [u'YEAR', u'FLUX', u'ID', u'VAT', u'VART']
data = get_data(columns, drive_path, end_year=1999)

data['IMPORT'] = data['FLUX'] % 2  # 1 for odd FLUX codes, 0 for even

firm_sizes = data.groupby(['ID', 'IMPORT', 'YEAR'])[['VART']].sum().reset_index()
buyr_sizes = data.groupby(['VAT', 'IMPORT', 'YEAR'])[['VART']].sum().reset_index()

# Compute both results in one dask.compute call: the two graphs share the
# read_table tasks, so the raw files are read once instead of twice.
with ProgressBar():
    firm_sizes, buyr_sizes = dask.compute(firm_sizes, buyr_sizes)

firm_sizes.to_csv(save_path + 'firm_sizes_99.csv', index=False)
buyr_sizes.to_csv(save_path + 'buyr_sizes_99.csv', index=False)

### - Value of buyer-seller links

In [None]:
# One row per (flow direction, year, firm ID, partner VAT) with the summed value.
columns = [u'YEAR', u'FLUX', u'ID', u'VAT', u'VART']
data = get_data(columns, drive_path, end_year=1999)
data['IMPORT'] = data['FLUX'] % 2  # 1 for odd FLUX codes, 0 for even

link_keys = ['IMPORT', 'YEAR', 'ID', 'VAT']
links = (
    data
    .groupby(link_keys)['VART']
    .sum()
    .reset_index()
)

with ProgressBar():
    out = links.compute()
out.to_csv(f'{save_path}buyer_seller_link_value.csv', index=False)

[########################################] | 100% Completed | 122.56 s


In [None]:
pd.read_csv(f'{save_path}buyer_seller_link_value.csv').head()

Unnamed: 0,IMPORT,YEAR,ID,VAT,VART
0,0,1997,215,IT0018705,569748
1,0,1997,330,IT0224285,14459
2,0,1997,413,AT0026337,69955
3,0,1997,413,BE1043987,41595
4,0,1997,413,DE0161168,294


### - Sourcing info

In [None]:
# columns = [u'YEAR', u'MONTH', u'FLUX', u'ID', u'CN ID 8', 'PYOD', u'VART']

# data = get_data(columns, drive_path, end_year = 1999)

# data['IMPORT'] = data['FLUX'] % 2
# data['QUARTER'] = ((data['MONTH'] -1)// 3) + 1

# CN_full = pd.read_csv('./../../data/processed/CN_full.csv', encoding = 'utf-8')
# data = data.merge(CN_full[['CN ID 8', 'CN ID 4', 'CN label 4']])#.persist()

# # Compute and save
# sourcing_strategies = data.loc[data.IMPORT == 1].groupby(['YEAR', 'ID', 'CN ID 4', 'PYOD'])[['VART']].sum() #rm QUARTER for yearly dataset
# with ProgressBar():
#     out = sourcing_strategies.compute()
# out.to_csv(save_path + 'sourcing_strategies_99.csv')

# export_bundles = data.loc[data.IMPORT == 0].groupby(['YEAR', 'ID', 'CN ID 4', 'PYOD'])[['VART']].sum()
# with ProgressBar():
#     out2 = export_bundles.compute()
# out2.to_csv(save_path + 'export_bundles_99.csv')

# # Compute and save
# sourcing_strategies_qr = data.loc[data.IMPORT == 1].groupby(['YEAR', 'QUARTER','ID', 'CN ID 4', 'PYOD'])[['VART']].sum() #rm QUARTER for yearly dataset
# with ProgressBar():
#     out = sourcing_strategies_qr.compute()
# out.to_csv(save_path + 'sourcing_strategies_99_qr.csv')

# export_bundles_qr = data.loc[data.IMPORT == 0].groupby(['YEAR', 'QUARTER', 'ID', 'CN ID 4', 'PYOD'])[['VART']].sum()
# with ProgressBar():
#     out2 = export_bundles_qr.compute()
# out2.to_csv(save_path + 'export_bundles_99_qr.csv')

[########################################] | 100% Completed | 187.61 s
[########################################] | 100% Completed | 340.92 s


In [None]:
import pandas as pd
from dask.diagnostics import ProgressBar

def compute_and_save_grouped_data(data, group_cols, agg_col, save_path, file_name, extra_group_cols=None):
    """Sum ``agg_col`` by the given keys, compute the result and write it to CSV.

    Parameters
    ----------
    data : dask.dataframe.DataFrame
        Frame to aggregate.
    group_cols : list of str
        Grouping keys. Not modified.
    agg_col : str
        Column to sum.
    save_path : str
        Output directory prefix (with trailing separator).
    file_name : str
        Output file stem; ``.csv`` is appended.
    extra_group_cols : list of str, optional
        Additional keys appended after ``group_cols``.

    Returns
    -------
    pandas.DataFrame
        The computed sums, indexed by the grouping keys (also written to CSV).
    """
    # Build a new key list rather than extending the caller's list in place,
    # so a list passed in (or reused across calls) is never modified.
    keys = list(group_cols) + list(extra_group_cols or [])

    grouped_data = data.groupby(keys)[[agg_col]].sum()
    with ProgressBar():
        output = grouped_data.compute()
    output.to_csv(f'{save_path}{file_name}.csv')
    return output

def add_import_quarter_columns(data):
    """Return a copy of ``data`` with IMPORT and QUARTER columns added.

    IMPORT is ``FLUX % 2`` (1 for odd FLUX codes, 0 for even). QUARTER maps
    MONTH 1-3 -> 1, 4-6 -> 2, 7-9 -> 3 and 10-12 -> 4.

    The input frame is left untouched, so callers' frames are not altered as a
    side effect. Works for both pandas and dask DataFrames.
    """
    return data.assign(
        IMPORT=data['FLUX'] % 2,
        QUARTER=((data['MONTH'] - 1) // 3) + 1,
    )

# Main processing function
def process_sourcing_export(data, drive_path, end_year, save_path,
                            columns=(u'YEAR', u'MONTH', u'FLUX', u'ID', u'CN ID 8', u'PYOD', u'VART'),
                            cn_path='./../../data/processed/CN_full.csv'):
    """Build the yearly and quarterly sourcing (import) and export-bundle tables.

    Reads the raw files and adds IMPORT/QUARTER. It then attaches the 4-digit CN
    code from ``cn_path`` and writes four CSVs to ``save_path``:
    ``sourcing_strategies_99`` and ``export_bundles_99`` (VART summed by YEAR,
    ID, CN ID 4, PYOD), plus their ``_qr`` variants, which add QUARTER to the key.

    Parameters
    ----------
    data : object
        Ignored; kept only so existing calls keep working. The data is always
        (re)loaded with ``get_data``.
    drive_path : str
        Directory with the raw yearly files.
    end_year : int
        Exclusive upper bound on the years read (passed to ``get_data``).
    save_path : str
        Output directory prefix (with trailing separator).
    columns : sequence of str, optional
        Raw columns to load. Passed explicitly instead of reading a global.
    cn_path : str, optional
        CSV providing 'CN ID 4' and 'CN label 4' for each 'CN ID 8'.
    """
    raw = get_data(list(columns), drive_path, end_year=end_year)
    enriched = add_import_quarter_columns(raw)

    # Inner join on the 8-digit code: rows whose code is absent from the CN
    # table are dropped.
    CN_full = pd.read_csv(cn_path, encoding='utf-8')
    enriched = enriched.merge(CN_full[['CN ID 8', 'CN ID 4', 'CN label 4']], on='CN ID 8')

    base_keys = ['YEAR', 'ID', 'CN ID 4', 'PYOD']
    for import_flag, stem in ((1, 'sourcing_strategies_99'), (0, 'export_bundles_99')):
        subset = enriched[enriched.IMPORT == import_flag]
        # A fresh key list per call, so no call can see keys added by another.
        compute_and_save_grouped_data(subset, list(base_keys), 'VART', save_path, stem)
        compute_and_save_grouped_data(subset, list(base_keys), 'VART', save_path, stem + '_qr',
                                      extra_group_cols=['QUARTER'])

# Usage. drive_path and save_path are the values set in the "General settings"
# cell (the placeholder assignments previously here were a SyntaxError).
columns = [u'YEAR', u'MONTH', u'FLUX', u'ID', u'CN ID 8', 'PYOD', u'VART']
end_year = 1999

# The first argument is not used (the function loads the data itself), so pass
# None rather than whatever `data` an earlier cell happened to leave behind.
process_sourcing_export(None, drive_path, end_year, save_path)


In [None]:
pd.read_csv(f'{save_path}export_bundles_99.csv').head()

Unnamed: 0,YEAR,ID,CN ID 4,PYOD,VART
0,1997,0,705,CH,71651
1,1997,0,709,CH,45022
2,1997,0,1806,JP,25972
3,1997,0,2204,BS,2389
4,1997,0,2204,CA,1512


### - Bernard's margins

In [None]:
import pandas as pd
from dask.diagnostics import ProgressBar

# Columns needed, listed in file order (as in `colnames`). If `names` is
# assigned positionally to columns returned in file order, an out-of-order
# list mislabels data; the previous order swapped the VART and VAT columns.
columns = [u'YEAR', u'FLUX', u'ID', u'CN ID 8', u'PYOD', u'VAT', u'VART']

# Load the data (end_year is exclusive: 1997-1998)
data = get_data(columns, drive_path, end_year=1999)

# IMPORT = 1 for odd FLUX codes, 0 for even
data['IMPORT'] = data['FLUX'] % 2

# Per firm, flow direction and year: distinct partners (VAT), countries (PYOD),
# products (CN ID 8) and total value.
# NOTE(review): the determinants-of-diversification cell computes the same
# aggregation with the keys in another order; one could reuse the other's CSV.
grouped = data.groupby(['IMPORT', 'YEAR', 'ID']).agg(
    {'VAT': tunique, 'PYOD': tunique, 'CN ID 8': tunique, 'VART': 'sum'})

# Compute the results
with ProgressBar():
    result = grouped.compute()

# Save the results to CSV
result.to_csv(save_path + 'bernards_margins.csv')


[                                        ] | 0% Completed | 120.90 ms

[##                                      ] | 5% Completed | 21.22 sms


KeyboardInterrupt: 

### - Kramarz's determinants of diversification

In [None]:
# Listed in file order (as in `colnames`). If `names` is assigned positionally
# to columns returned in file order, an out-of-order list mislabels data; the
# previous order (VAT before PYOD) swapped the VAT and PYOD columns in the output.
columns = [u'YEAR', u'FLUX', u'ID', u'CN ID 8', u'PYOD', u'VAT', u'VART']

data = get_data(columns, drive_path, end_year=1999)
data['IMPORT'] = data['FLUX'] % 2  # 1 for odd FLUX codes, 0 for even

grouped = data.groupby(['ID', 'YEAR', 'IMPORT'])

# Total value plus distinct partners, products and countries per firm-year-flow.
with ProgressBar():
    df = grouped.agg({'VART': 'sum', u'VAT': tunique, 'CN ID 8': tunique, u'PYOD': tunique}).compute()

df.to_csv(save_path + 'dets_of_diversification.csv')

[                                        ] | 0% Completed | 120.35 ms

[########################################] | 100% Completed | 15m 26s


In [None]:
# pd.read_csv(save_path + 'dets_of_diversification.csv').head()

### - Degree distribution

In [None]:
import pandas as pd
from numpy import arange, log10
from dask.diagnostics import ProgressBar

# Settings
center_years = arange(1997, 2000, 2)   # window centres: 1997, 1999
window = 1                             # window width in years
gap = (window - 1) / 2                 # years on each side of the centre
n_per_bin = 200                        # firms drawn (with replacement) per ID-degree bin
sample_seed = 0                        # fixed seed so the firm sample is reproducible

# Load the data explicitly instead of relying on the `data` an earlier cell
# left behind. get_data's end_year is exclusive, so read one year past the
# last year any window reaches.
columns = [u'YEAR', u'ID', u'VAT']
data = get_data(columns, drive_path, end_year=int(max(center_years) + gap) + 1)

ID_degree_res = []
VAT_degree_res = []

for Yc in center_years:
    # Symmetric window around the centre year: |YEAR - Yc| <= gap. (A
    # one-sided `YEAR - Yc <= gap` would also keep every earlier year.)
    data_sec = data.loc[(data.YEAR - Yc).abs() <= gap]

    # ID degree: number of distinct VATs per firm ID within the window.
    print(f"Processing ID degree for year {Yc} and window {window}")
    with ProgressBar():
        ID_deg = data_sec.groupby(['ID']).agg({'VAT': tunique}).compute().reset_index()
    ID_deg.columns = ['ID', 'ID_degree']
    ID_deg['center_year'] = Yc
    ID_deg['window'] = window
    # Bin after computing: pd.cut needs concrete values, not a lazy dask Series.
    ID_deg['bin'] = pd.cut(log10(ID_deg['ID_degree']), bins=arange(-.49, 5.99, .25))

    filename_ID = f'{save_path}ID_deg_{Yc}_{window}.csv'
    ID_deg.to_csv(filename_ID, index=False)
    print(f"Saved to {filename_ID}")
    ID_degree_res.append(ID_deg)

    # VAT degree: number of distinct sampled firm IDs per VAT, using a
    # degree-stratified sample of firms from the same window.
    print(f"Processing VAT degree for year {Yc} and window {window}")
    sampling = ID_deg.groupby('bin', observed=True).sample(
        n=n_per_bin, replace=True, random_state=sample_seed)
    sample_ids = sampling['ID'].unique()
    data_sec_sample = data_sec.loc[data_sec.ID.isin(sample_ids)]

    with ProgressBar():
        VAT_deg = data_sec_sample.groupby(['VAT']).agg({'ID': tunique}).compute().reset_index()
    VAT_deg.columns = ['VAT', 'VAT_degree']
    VAT_deg['center_year'] = Yc
    VAT_deg['window'] = window

    filename_VAT = f'{save_path}VAT_deg_{Yc}_{window}.csv'
    VAT_deg.to_csv(filename_VAT, index=False)
    print(f"Saved to {filename_VAT}")
    VAT_degree_res.append(VAT_deg)


Processing ID degree for year 1997 and window 1


KeyboardInterrupt: 

In [None]:
# # window = 3
# # assortativity_res = []
# ID_degree_res = []
# VAT_degree_res = []

# for window in [1]:
#     gap = (window-1)/2
#     center_years = arange(1997, 2000, 2)
#     print window

#     for Yc in center_years:
#         print Yc
#         data_sec = data.loc[data.YEAR - Yc <= gap]
# #         data_sec.groupby(['ID', 'VAT']).agg({'VART': sum })

#         data_sec_by_ID = data_sec.groupby(['ID']).agg({'VAT': tunique, 'VART': sum})

#         ID_degree = data_sec_by_ID[['VAT']].reset_index()
#         ID_degree.columns = [u'ID', u'ID_degree']
#         ID_degree['center_year'] = Yc
#         ID_degree['window'] = window
        
#         with ProgressBar():
#             ID_deg = ID_degree.compute()
#             ID_deg['bin'] = pd.cut(log10(ID_deg['ID_degree']), bins = arange(-.49, 5.99, .25))
#             ID_deg.to_csv(save_path + 'ID_deg_'+str(Yc)+'_'+str(window)+'.csv', index = False)
# #         ID_degree_res += [ID_degree]     

# #         ID_deg = pd.read_csv()
#         sampling = ID_deg.groupby(['bin'], observed = True).apply(lambda x: x.sample(200, replace = True))

#         data_sec_sample = data_sec.loc[data_sec.ID.isin(sampling['ID'].values)]
#         data_sec_by_VAT = data_sec_sample.groupby(['VAT']).agg({'ID': tunique, 'VART': sum})

#         VAT_degree = data_sec_by_VAT[['ID']].reset_index()
#         VAT_degree.columns = [u'VAT', u'VAT_degree']
#         VAT_degree['center_year'] = Yc
#         VAT_degree['window'] = window
#         VAT_degree_res += [VAT_degree]
#         with ProgressBar():
#             VAT_deg = VAT_degree.compute()
#             VAT_deg.to_csv(save_path + 'VAT_deg_'+str(Yc)+'_'+str(window)+'.csv', index = False)

1
1997
[########################################] | 100% Completed |  7min  3.7s
[########################################] | 100% Completed |  4min 30.6s
1999
[########################################] | 100% Completed | 14min  4.1s
[########################################] | 100% Completed |  8min  6.8s


In [None]:
# pd.read_csv(save_path + 'ID_deg_'+str(Yc)+'_'+str(window)).head()

In [None]:
# fig, ax = plt.subplots(1)
# df_degrees.groupby('VAT_degree_bin')['ID_degree','VAT_degree'].quantile(.25).plot(x = 'VAT_degree', y = 'ID_degree', marker = '', ax = ax)
# df_degrees.groupby('VAT_degree_bin')['ID_degree','VAT_degree'].quantile(.5).plot(x = 'VAT_degree', y = 'ID_degree', marker = '', ax = ax)
# df_degrees.groupby('VAT_degree_bin')['ID_degree','VAT_degree'].quantile(.75).plot(x = 'VAT_degree', y = 'ID_degree', marker = '', ax = ax)

# # df_degrees.groupby('ID_nunique_bin')['VAT_nunique','ID_nunique'].mean().plot(x = 'ID_nunique', y = 'VAT_nunique', marker = 'o', ax = ax)
# df_degrees.groupby('ID_nunique')['VAT_nunique'].median().plot(x = 'index', y = 'VAT_nunique', marker = '.', linewidth = 0, ax = ax)
# ax.set_xscale('log')
# ax.set_yscale('log')

NameError: name 'df_degrees' is not defined

### - Buyers and sellers by foreign country

In [None]:
columns = [u'YEAR', u'FLUX', u'ID', u'PYOD', u'VART']
data = get_data(columns, drive_path, end_year=1999)
data['IMPORT'] = data['FLUX'] % 2  # 1 for odd FLUX codes, 0 for even

# Firm-level totals per flow direction, year and country (PYOD).
data_by_dest = data.groupby(['IMPORT', 'YEAR', 'ID', 'PYOD'])['VART'].sum().reset_index()

# Per flow direction, country and year: number of distinct firms and total
# value. IMPORT stays in the key so imports and exports are not pooled
# (previously it was dropped here, mixing both directions).
result = data_by_dest.groupby(['IMPORT', 'PYOD', 'YEAR']).agg({'ID': tunique, 'VART': 'sum'})

with ProgressBar():
    out = result.compute()

out.to_csv(save_path + 'destination.csv')

[                                        ] | 0% Completed | 3.16 s ms


KeyboardInterrupt: 

### - Size distribution of firms

In [None]:
import pandas as pd
import numpy as np
from dask.diagnostics import ProgressBar

# ---- Settings ---------------------------------------------------------------
drive_path = '/media/matias/Elements/export_france/data/type1/DP1610_MAASTRICHT1_1997_2013/'  # raw extract files
end_year = 1999                          # exclusive upper bound passed to get_data
save_path = './../../data/processed/'    # output directory
n_bins = 20                              # number of equal-width bins on the log scale

# ---- Yearly totals per firm and flow direction ------------------------------
columns = ['YEAR', 'FLUX', 'ID', 'VART']
data = get_data(columns, drive_path, end_year=end_year)
data['IMPORT'] = data['FLUX'] % 2

grouped_data = (
    data
    .groupby(['IMPORT', 'YEAR', 'ID'])['VART']
    .sum()
    .reset_index()
)

# Keep strictly positive totals only, so the log10 below is always finite.
positive = grouped_data['VART'] > 0
with ProgressBar():
    filtered_data = grouped_data[positive].compute()

# ---- Median yearly size per firm and flow direction -------------------------
median_sizes = (
    filtered_data
    .groupby(['ID', 'IMPORT'])['VART']
    .median()
    .reset_index()
    .rename(columns={'VART': 'exp_mma'})
)

# log10 of the median (rounded to 3 decimals), cut into n_bins equal-width
# categories labelled 0 .. n_bins-1.
median_sizes['log_exp_mma'] = np.log10(median_sizes['exp_mma']).round(3)
median_sizes['exp_mma_cat'] = pd.cut(median_sizes['log_exp_mma'], n_bins, labels=range(n_bins))

# 'median_sizes' plays the role 'sizes_index' had in the earlier script.
median_sizes.to_csv(f'{save_path}sizes_index.csv', index=False)


[########################################] | 100% Completed | 103.46 s


In [None]:
median_sizes.iloc[:5]

Unnamed: 0,ID,IMPORT,exp_mma,log_exp_mma,exp_mma_cat
0,0,0,32949420.0,7.518,14
1,0,1,38070333.0,7.581,14
2,18,1,2047.0,3.311,6
3,157,1,77846.0,4.891,9
4,215,0,732059.5,5.865,11


# Older stuff

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from numpy import log10, arange

# Load the data and create time periods
links = pd.read_csv(save_path + 'buyer_seller_link_value.csv')
# Two-year periods: (YEAR - 1996) // 2 puts 1997 in period 0 and 1998-1999 in period 1.
links['PERIOD'] = (links['YEAR'] - 1996) // 2

# Calculate degrees
# Degree = number of distinct VATs per firm ID and period.
# NOTE(review): IMPORT is not part of the key, so import and export links are pooled.
degrees = links.groupby(['PERIOD', 'ID'])[['VAT']].nunique().rename(columns = {'VAT': 'ID_degree'})

# Log transform and binning
# Bin edges every 0.25 in log10(degree), from -0.25 to 4.25.
degrees['log_ID_degree'] = log10(degrees['ID_degree'])
degrees['bin_ID_degree'] = pd.cut(degrees['log_ID_degree'], arange(-.25, 4.5, 0.25))

# Calculate degree distribution
# Number of firms per (PERIOD, degree bin).
degree_dist = degrees.reset_index().groupby(['PERIOD', 'bin_ID_degree'])[['ID']].count()

# Visualization
# Left panel: log10 of the firm count per degree bin, one plot call per period.
# NOTE(review): bins with zero firms (if the categorical grouping keeps them)
# give log10(0) = -inf; the plotted series are unlabelled; axs[1] is not used
# in the lines visible here.
fig, axs = plt.subplots(1, 2, figsize =(15, 6))
ax = axs[0]
for t in links['PERIOD'].unique():
    log10(degree_dist.loc[t]).reset_index().plot(marker='o', linewidth=0, ax=ax, mec='None')
