In [None]:
import csv
import json
from collections import defaultdict
import sys

if sys.version_info >= (3, 8):
    from typing import TypedDict  # pylint: disable=no-name-in-module
else:
    from typing_extensions import TypedDict

In [None]:
# Input: the toflit18 "all flows" CSV export, relative to this notebook.
file = "../../data/toflit18_all_flows.csv"

# CSV column used as the product label.
# Alternative classification: "product_sitc_FR"
product_classification = "product_sitc_simplEN"

# The free ports ("ports francs") studied in this notebook.
port_francs = [
    "Lorient",
    "Bayonne",
    "Marseille",
    "Dunkerque",
]

# One independent, initially empty dict per free port.
data_by_port_francs = {port_name: {} for port_name in port_francs}

In [None]:
class DataPoint(TypedDict, total=False):
    """Per-product aggregate for one free port.

    Presumably describes the entries of ``report_port_francs[port][product]``
    (the key names match) -- TODO confirm, the type is not referenced elsewhere
    in the visible cells.

    ``total=False`` because entries are filled incrementally: a product that
    was only imported has no ``export_dk`` key, and vice versa.
    Amounts are ``float`` because they are sums of ``float(row["value"])``.
    """

    # Product label taken from the selected classification column.
    product: str
    # Summed value of the "Imports" rows declared by the port's customs office.
    import_dk: float
    # NOTE(review): never written by the visible code -- confirm it is still needed.
    import_dkb: float
    # Summed value of the "Exports" rows declared by the port's customs office.
    export_dk: float
    # NOTE(review): never written by the visible code -- confirm it is still needed.
    export_dkb: float

In [None]:
# Accumulators filled while reading the CSV. Each one is keyed by free port,
# then by product (the inner defaultdict creates the per-product dict on demand).

# port -> product -> {"product", "import_dk", "export_dk"}: flows declared by the port's own office.
report_port_francs = {port_name: defaultdict(dict) for port_name in port_francs}
# port -> product -> customs region -> {"product", "region", "import"}: mirror flows seen from France.
report_others = {port_name: defaultdict(dict) for port_name in port_francs}
# port -> product -> partner class -> summed value of what the port imports.
import_by_product_by_partner = {port_name: defaultdict(dict) for port_name in port_francs}
# port -> product -> partner class -> summed value of what the port exports.
export_by_product_by_partner = {port_name: defaultdict(dict) for port_name in port_francs}


def import_partner_class(partner):
    """Collapse a partner grouping into one of three coarse classes.

    Returns "colonies" for "Asie", "Afrique" and "Amériques", "France" for
    "France", and "Monde" for every other value.
    """
    colonial_groupings = ("Asie", "Afrique", "Amériques")
    if partner == "France":
        return "France"
    return "colonies" if partner in colonial_groupings else "Monde"

# Customs regions that declared imports from Dunkerque or Bayonne (mirror flows).
regions = set()
# Trade partners (declared for inspection; the loop below does not populate it).
partners = set()

# `file` is the toflit18 "all flows" CSV.
# newline="" is what the csv module asks for, and the encoding is explicit
# because labels such as "Amériques" are non-ASCII and must not depend on the
# platform default.
with open(file, "r", encoding="utf-8", newline="") as flows_file:
    reader = csv.DictReader(flows_file)
    # @todo filter by source type to be clean ? ("best_guess_region_prodxpart" ?)
    for row in reader:
        if row["year"] != "1789":
            continue

        product = row[product_classification]
        region = row["customs_region_grouping"]
        partner = row["partner_simplification"]
        direction = row["export_import"]
        # Rows with an empty value cell count as 0.
        value = float(row["value"]) if row["value"] else 0.0

        # Mirror flows: France imports from Dunkerque or Bayonne,
        # i.e. the port exports to France.
        if direction == "Imports" and partner in ["Dunkerque", "Bayonne"]:
            # The region is recorded even when the row has no product label.
            regions.add(region)
            if product:
                mirror = report_others[partner][product].setdefault(region, {})
                mirror["product"] = product
                mirror["region"] = region
                mirror["import"] = mirror.get("import", 0) + value

                exports_of_port = export_by_product_by_partner[partner][product]
                exports_of_port["France"] = exports_of_port.get("France", 0) + value

        # Mirror flows: France exports to a free port,
        # i.e. the port imports from France.
        if direction == "Exports" and partner in port_francs and product:
            imports_of_port = import_by_product_by_partner[partner][product]
            imports_of_port["France"] = imports_of_port.get("France", 0) + value

        # Flows declared by the free port's own customs office.
        # "Port franc De Bayonne" is folded into "Bayonne"; the membership test
        # must use the normalised name, otherwise those rows are dropped.
        office = row["customs_office"]
        if office == "Port franc De Bayonne":
            office = "Bayonne"
        if office in port_francs and product:
            office_report = report_port_francs[office][product]
            office_report["product"] = product
            partner_class = import_partner_class(row["partner_grouping"])
            if direction == "Exports":
                office_report["export_dk"] = office_report.get("export_dk", 0) + value
                exports_of_port = export_by_product_by_partner[office][product]
                exports_of_port[partner_class] = exports_of_port.get(partner_class, 0) + value
            elif direction == "Imports":
                office_report["import_dk"] = office_report.get("import_dk", 0) + value
                imports_of_port = import_by_product_by_partner[office][product]
                imports_of_port[partner_class] = imports_of_port.get(partner_class, 0) + value
                   

In [None]:
# Inspection cell: display the imports accumulator
# (port -> product -> partner class -> summed value).
import_by_product_by_partner
# partners

In [None]:
# port -> flat list of {"value", "product", "partner", "group"} records to export.
all_data = {p: [] for p in port_francs}
all_exp_to_fr = 0


def get_products(d):
    """Return the set of product labels used as second-level keys of ``d`` (port -> product -> ...)."""
    return {product for product_dict in d.values() for product in product_dict}


def _flow_record(value, product, partner, group):
    """Build one exported record; ``group`` is "import" or "export"."""
    return {
        "value": value,
        "product": product,
        "partner": partner,
        "group": group,
    }


# NOTE(review): a product that only appears in the "France exports to a free
# port" mirror flows is in neither report and is therefore skipped here --
# confirm this is intended.
products = get_products(report_port_francs) | get_products(report_others)

# Sorted so the exported JSON is identical from one run to the next
# (iteration order of a set of strings varies between interpreter runs).
for product in sorted(products):
    for port in port_francs:
        # Flows for one product within one port. `.get` avoids creating empty
        # entries in the defaultdict accumulators as a side effect of reading.
        imports_by_partner = import_by_product_by_partner[port].get(product, {})
        exports_by_partner = export_by_product_by_partner[port].get(product, {})
        total_declared_import = sum(imports_by_partner.values())
        total_declared_export = sum(exports_by_partner.values())

        # IMPORTS
        is_colonial = False
        for partner, value in imports_by_partner.items():
            if partner == "colonies":
                # A product is considered colonial when the colonies account
                # for at least half of its declared imports.
                # The total can be 0 (rows with an empty value count as 0), so
                # guard the division instead of raising ZeroDivisionError.
                # @todo understand that better
                is_colonial = (
                    total_declared_import != 0
                    and value / total_declared_import >= 0.5
                )
            if value != 0:
                all_data[port].append(_flow_record(value, product, partner, "import"))

        # EXPORTS
        for partner, value in exports_by_partner.items():
            if value != 0:
                all_data[port].append(_flow_record(value, product, partner, "export"))

        # Disabled alternatives kept for reference:
        #   - add the exports to France from report_others[port][product] (sum of "import")
        #   - take the totals from report_port_francs[port][product]
        #     ("export_dk", and "import_dk" + imports from France)

        # Whatever was imported but not re-exported (or the reverse) is
        # attributed to an unknown partner on the deficient side.
        unknown_export = total_declared_import - total_declared_export
        if unknown_export != 0:
            if port == "Dunkerque" and not is_colonial:
                # Dunkerque, non-colonial product: could be re-export or fraud.
                unknown_partner = "Re-exp ou fraude ?"
            else:
                # Any other port, or a colonial product in Dunkerque.
                unknown_partner = "Fraude ?"
            all_data[port].append(_flow_record(
                abs(unknown_export),
                product,
                unknown_partner,
                "export" if unknown_export > 0 else "import",
            ))

with open("data/import_export_ports_francs.json", "w") as f:
    json.dump(all_data, f)

In [None]:
# Inspection cell: display the records written to data/import_export_ports_francs.json.
all_data

In [None]:
# Persist the Dunkerque slice of the free-port report
# (product -> {"product", "import_dk", "export_dk"}).
# The previous name `report_dk` is not defined anywhere in this notebook and
# raised NameError on a fresh kernel.
# NOTE(review): report_port_francs["Dunkerque"] is assumed to be its
# replacement, based on the output file name -- confirm.
with open("data/report_dunkerque.json", "w") as f:
    json.dump(report_port_francs["Dunkerque"], f)

In [None]:
# Persist the mirror-flow report
# (port -> product -> customs region -> {"product", "region", "import"}).
with open("data/report_other_regions.json", "w") as report_file:
    json.dump(report_others, report_file)