# Fusion - Working w/ Fusion File System (FFS)

In [2]:
import ast
import asyncio
import json

from fusion import Fusion

# Package only for running async in notebook
import nest_asyncio
nest_asyncio.apply()

In [3]:
# Path of the distributions listing for one dataset series member; passed to the cat examples below.
test_path = "common/datasets/ISS_ESG_CNTRY_RTNG_SSF/datasetseries/20250101/distributions"

## Creating FFS Instances

The simplest way to create a synchronous FFS instance is to call the Fusion method below:

In [4]:
# Fusion instance setup (synchronous).
f_inst1 = Fusion()
# The file system obtained here is the one used by the synchronous examples.
sync_ffs = f_inst1.get_fusion_filesystem()


In [5]:
# To create an async Fusion file system, instantiate the FusionHTTPFileSystem class directly.
from fusion.fusion_filesystem import FusionHTTPFileSystem

# A second Fusion client supplies the root URL and credentials handed to the file system.
f_inst2 = Fusion()
as_async = True  # forwarded as the `asynchronous` argument below

async_ffs = FusionHTTPFileSystem(
    client_kwargs={
        "root_url": f_inst2.root_url,
        "credentials": f_inst2.credentials,
    },
    asynchronous=as_async
)

## Executing Basic File System Operations

File system operations like "cat", "ls", "info", and "find" can easily be executed by calling their associated methods on your file system instance. 

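The output of many operations is JSON. Byte-type outputs can be converted to a string with bytes.decode(), and the resulting JSON string can be parsed into a dictionary with json.loads (ast.literal_eval also works, but only when the payload contains no JSON-only literals such as true, false, or null).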

### Sync

In [9]:
# .cat returns the raw response body as bytes.
cat_output_bytes = sync_ffs.cat(test_path)
cat_output_str = cat_output_bytes.decode("UTF-8")
# The payload is JSON, so parse it with json.loads. ast.literal_eval only accepts
# Python literals and raises ValueError on JSON's true / false / null.
cat_output_parsed = json.loads(cat_output_str)

print(f"Raw .cat output {type(cat_output_bytes)}: {cat_output_bytes}")
print(f"Cast .cat output {type(cat_output_str)}: {cat_output_str}")
print(f"Parsed .cat output {type(cat_output_parsed)}: {cat_output_parsed}")

Raw .cat output <class 'bytes'>: b'{"resources":[{"title":"CSV","fileExtension":".csv","description":"Snapshot data will be in a tabular, comma separated format.","mediaType":"text/csv; header=present; charset=utf-8","identifier":"csv","@id":"csv/"},{"title":"Parquet","fileExtension":".parquet","description":"Snapshot data will be in a parquet format.","mediaType":"application/parquet; header=present","identifier":"parquet","@id":"parquet/"}],"@context":{"@base":"https://fusion.jpmorgan.com/api/v1/","@vocab":"https://www.w3.org/ns/dcat3.jsonld"},"description":"A list of available distributions","@id":"distributions/","identifier":"distributions","title":"Distributions"}'
Cast .cat output <class 'str'>: {"resources":[{"title":"CSV","fileExtension":".csv","description":"Snapshot data will be in a tabular, comma separated format.","mediaType":"text/csv; header=present; charset=utf-8","identifier":"csv","@id":"csv/"},{"title":"Parquet","fileExtension":".parquet","description":"Snapshot dat

### Async

Using the async versions of the FFS methods requires more active management. An aiohttp.ClientSession must be opened before any method is executed, and all execution must occur in an async context.

In [4]:
# Path of the CSV distribution downloaded by the async_read example; async_read prefixes it with the client's root_url.
target = "catalogs/common/datasets/FXO_SP/datasetseries/20230726/distributions/csv"

In [None]:
# Function to run async functions in a synchronous context
def execute_coroutine(coroute):
    """Run a coroutine to completion from synchronous code and return its result.

    When an event loop is already running (as in a notebook), this relies on
    ``nest_asyncio.apply()`` having been called so that ``run_until_complete``
    can be re-entered on that loop.

    Args:
        coroute (coroutine): An async function's returned coroutine.

    Returns:
        Result of coroutine execution.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions no longer create an event loop implicitly.
        # Create one and register it so later calls reuse the same loop
        # (objects such as HTTP sessions stay bound to the loop that made them).
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coroute)

# Example async function wrapping a basic fs operation we might use, here file downloading.
async def async_read(ffs: FusionHTTPFileSystem, client: Fusion, path: str):
    """Download a file's contents asynchronously through the given file system.

    Args:
        ffs (FusionHTTPFileSystem): File system used to open the file.
        client (Fusion): Fusion instance whose root_url is prepended to ``path``.
        path (str): Location of the file, given without the root_url prefix.

    Returns:
        Whatever the opened file's ``read()`` yields (the file is opened in "rb" mode).
    """
    full_url = client.root_url + path
    remote_file = await ffs.open_async(full_url, "rb")
    # Returning from inside the block still runs the file's async exit handler.
    async with remote_file:
        return await remote_file.read()

In [12]:
# Before using any async FFS method, the async method .set_session() must be called first.
sess = execute_coroutine(async_ffs.set_session())

# Execute cat asynchronously
# _cat, like other _methods, is asynchronous. Calling it by itself only returns a coroutine.
# To actually execute the code, either "await" the coroutine in an asynchronous context
# or, in a synchronous context (like this notebook), use a function like execute_coroutine.
cat_coroutine = async_ffs._cat(test_path) # Returns a coroutine but does not yet execute code
cat_async_bytes = execute_coroutine(cat_coroutine)
cat_async_str = cat_async_bytes.decode("UTF-8")
# The payload is JSON, so parse it with json.loads. ast.literal_eval only accepts
# Python literals and raises ValueError on JSON's true / false / null.
cat_async_parsed = json.loads(cat_async_str)

print(f"Raw .cat output {type(cat_async_bytes)}: {cat_async_bytes}")
print(f"Cast .cat output {type(cat_async_str)}: {cat_async_str}")
print(f"Parsed .cat output {type(cat_async_parsed)}: {cat_async_parsed}")

Raw .cat output <class 'bytes'>: b'{"@id":"distributions/","title":"Distributions","identifier":"distributions","resources":[{"fileExtension":".csv","mediaType":"text/csv; header=present; charset=utf-8","identifier":"csv","@id":"csv/","description":"Snapshot data will be in a tabular, comma separated format.","title":"CSV"},{"fileExtension":".parquet","mediaType":"application/parquet; header=present","identifier":"parquet","@id":"parquet/","description":"Snapshot data will be in a parquet format.","title":"Parquet"}],"description":"A list of available distributions","@context":{"@base":"https://fusion.jpmorgan.com/api/v1/","@vocab":"https://www.w3.org/ns/dcat3.jsonld"}}'
Cast .cat output <class 'str'>: {"@id":"distributions/","title":"Distributions","identifier":"distributions","resources":[{"fileExtension":".csv","mediaType":"text/csv; header=present; charset=utf-8","identifier":"csv","@id":"csv/","description":"Snapshot data will be in a tabular, comma separated format.","title":"CSV

In [None]:
# Calling async_read() only creates a coroutine; nothing is read until it is executed.
task_perform_async_read = async_read(async_ffs, f_inst2, target)
# Run the coroutine to completion and keep what it returns.
result = execute_coroutine(task_perform_async_read)
result

b'instrument_name,currency_pair,term,product,date,fx_rate\nUSDAED | Spot,USDAED,Spot,FXSpot,20230726,3.673025\nUSDARS | Spot,USDARS,Spot,FXSpot,20230726,272.5\nUSDCHF | Spot,USDCHF,Spot,FXSpot,20230726,0.8632\nUSDCNY | Spot,USDCNY,Spot,FXSpot,20230726,7.1504\nUSDDKK | Spot,USDDKK,Spot,FXSpot,20230726,6.7345\nUSDHUF | Spot,USDHUF,Spot,FXSpot,20230726,347.17\nUSDIDO | Spot,USDIDO,Spot,FXSpot,20230726,15032.0\nUSDIDR | Spot,USDIDR,Spot,FXSpot,20230726,15032.0\nUSDILS | Spot,USDILS,Spot,FXSpot,20230726,3.6945\nUSDMXN | Spot,USDMXN,Spot,FXSpot,20230726,16.835\nUSDPAB | Spot,USDPAB,Spot,FXSpot,20230726,1.0\nUSDPHF | Spot,USDPHF,Spot,FXSpot,20230726,54.62\nUSDPHP | Spot,USDPHP,Spot,FXSpot,20230726,54.62\nUSDSAR | Spot,USDSAR,Spot,FXSpot,20230726,3.7511\nUSDTHB | Spot,USDTHB,Spot,FXSpot,20230726,34.309\nUSDTRY | Spot,USDTRY,Spot,FXSpot,20230726,26.9495\nUSDUAH | Spot,USDUAH,Spot,FXSpot,20230726,36.75145\nXAUUSD | Spot,XAUUSD,Spot,FXSpot,20230726,1969.0\nAUDUSD | Spot,AUDUSD,Spot,FXSpot,2023072

In [37]:
# Decode the downloaded bytes so the text prints with real line breaks instead of \n escapes.
print(result.decode("UTF-8"))

instrument_name,currency_pair,term,product,date,fx_rate
USDAED | Spot,USDAED,Spot,FXSpot,20230726,3.673025
USDARS | Spot,USDARS,Spot,FXSpot,20230726,272.5
USDCHF | Spot,USDCHF,Spot,FXSpot,20230726,0.8632
USDCNY | Spot,USDCNY,Spot,FXSpot,20230726,7.1504
USDDKK | Spot,USDDKK,Spot,FXSpot,20230726,6.7345
USDHUF | Spot,USDHUF,Spot,FXSpot,20230726,347.17
USDIDO | Spot,USDIDO,Spot,FXSpot,20230726,15032.0
USDIDR | Spot,USDIDR,Spot,FXSpot,20230726,15032.0
USDILS | Spot,USDILS,Spot,FXSpot,20230726,3.6945
USDMXN | Spot,USDMXN,Spot,FXSpot,20230726,16.835
USDPAB | Spot,USDPAB,Spot,FXSpot,20230726,1.0
USDPHF | Spot,USDPHF,Spot,FXSpot,20230726,54.62
USDPHP | Spot,USDPHP,Spot,FXSpot,20230726,54.62
USDSAR | Spot,USDSAR,Spot,FXSpot,20230726,3.7511
USDTHB | Spot,USDTHB,Spot,FXSpot,20230726,34.309
USDTRY | Spot,USDTRY,Spot,FXSpot,20230726,26.9495
USDUAH | Spot,USDUAH,Spot,FXSpot,20230726,36.75145
XAUUSD | Spot,XAUUSD,Spot,FXSpot,20230726,1969.0
AUDUSD | Spot,AUDUSD,Spot,FXSpot,20230726,0.67525
EURUSD | Sp

### More Async Reading with Fusion Module

In [6]:
### To stream a file using an async generator:

# In an async context:
target = "common/datasets/ISS_ESG_CNTRY_RTNG_SSF/datasetseries/20250101/distributions/csv"
async_generator = f_inst1._async_stream_file(target, chunk_size=100) # returns AsyncGenerator[bytes, None]

# Working with output as async generator
# Printing the first 5 chunks
counter = 0
async for chunk in async_generator:
    print(chunk)
    counter += 1
    if counter >= 5:
        break

b'as_at_date,as_of_date,entity_id,entity_name,parent_entity_id,parent_entity_name,final_parent_entity_'
b'id,final_parent_entity_name,iss_issr_id,iss_issr_name,lei,gics_sctr,gics_sctr_code,gics_industry_grp'
b',gics_industry_grp_code,gics_industry,gics_industry_code,gics_sub_industry,gics_sub_industry_code,cm'
b'pny_ticker,cmpny_isin,cmpny_cusip,cmpny_sedol,cmpny_cins,inst_class,cmpny_exchg_lvl_figi,metrics_by_'
b'proxy_ind,proxy_entity_id,proxy_entity_name,agrcltre_prdctn_num,agrcltre_prdctn_wt,agrcltrl_prdctn,b'


In [7]:
### To asynchronously get entire file instead of streaming
file_as_bytes = await f_inst1._async_get_file(target, chunk_size=1000)
char_limit = 700 # Set a display limit just so notebook doesn't have to render entire file
print(f"First {char_limit} chars of file: {file_as_bytes[:char_limit]}")

First 700 chars of file: b'as_at_date,as_of_date,entity_id,entity_name,parent_entity_id,parent_entity_name,final_parent_entity_id,final_parent_entity_name,iss_issr_id,iss_issr_name,lei,gics_sctr,gics_sctr_code,gics_industry_grp,gics_industry_grp_code,gics_industry,gics_industry_code,gics_sub_industry,gics_sub_industry_code,cmpny_ticker,cmpny_isin,cmpny_cusip,cmpny_sedol,cmpny_cins,inst_class,cmpny_exchg_lvl_figi,metrics_by_proxy_ind,proxy_entity_id,proxy_entity_name,agrcltre_prdctn_num,agrcltre_prdctn_wt,agrcltrl_prdctn,biodvrsty,biodvrsty_num,biodvrsty_wt,clmte_chg,clmte_chg_and_energy,clmte_chg_and_energy_num,clmte_chg_and_energy_wt,clmte_chg_num,clmte_chg_wt,cntry_env_rtng,cntry_env_rtng_num,cntry_env_rtng_wt,cntry'


The above async examples are modified to run in a notebook.

See below for a pattern more applicable to execution in a .py file.

In [None]:
import asyncio

from fusion import Fusion
from fusion.fusion_filesystem import FusionHTTPFileSystem


async def main():
    """Run a few async FFS operations inside a single HTTP client session.

    Top-level ``await`` / ``async with`` are only valid inside a coroutine in a
    .py file, so the whole workflow lives in this function.

    Returns:
        Tuple of (cat output, exists output, downloaded file contents).
    """
    # Basic Fusion/ffs setup
    f_inst3 = Fusion()
    as_async = True
    async_ffs = FusionHTTPFileSystem(
        client_kwargs={
            "root_url": f_inst3.root_url,
            "credentials": f_inst3.credentials,
        },
        asynchronous=as_async
    )

    test_path = "common/datasets/ISS_ESG_CNTRY_RTNG_SSF/datasetseries/20250101/distributions"

    # An HTTP client session must be opened before executing any async methods on FFS
    sess = await async_ffs.set_session()

    # To avoid resource leaks, the session should be used in an "async with" block.
    # This ensures resources are cleaned up, including when errors arise. Manually
    # opening and then closing with .close() is available at the user's own peril.
    async with sess:
        cat_output = await async_ffs._cat(test_path)
        exists_output = await async_ffs._exists(test_path)
        target = "catalogs/common/datasets/FXO_SP/datasetseries/20230726/distributions/csv"
        # async_read is the helper coroutine defined earlier in this notebook;
        # a standalone module needs its own copy of it.
        async_download = await async_read(async_ffs, f_inst3, target)

    return cat_output, exists_output, async_download


if __name__ == "__main__":
    # asyncio.run() starts an event loop, runs main() to completion, then closes the loop.
    # Inside a notebook this only works because nest_asyncio.apply() was called earlier.
    cat_output, exists_output, async_download = asyncio.run(main())
