From 91addbb47911301cf70e8da380dd07fde40cee5c Mon Sep 17 00:00:00 2001
From: Ian Wang <22849821+wangyinz@users.noreply.github.com>
Date: Sat, 18 May 2024 03:53:40 -0500
Subject: [PATCH] Handles import with exception (#539)

* handles import with exception

* Remove debris from previous change

* reformat with black
---
 python/mspasspy/db/database.py    | 17 +++-----------
 python/mspasspy/io/distributed.py | 38 ++++++++++++++++++++++++++-----
 2 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/python/mspasspy/db/database.py b/python/mspasspy/db/database.py
index fdfe1e922..c4173cffe 100644
--- a/python/mspasspy/db/database.py
+++ b/python/mspasspy/db/database.py
@@ -14,23 +14,12 @@
 import fcntl
 
 try:
-    import dask.bag as daskbag
+    import dask.dataframe as daskdf
 
     _mspasspy_has_dask = True
 except ImportError:
     _mspasspy_has_dask = False
 
-try:
-    import dask.dataframe as daskdf
-except ImportError:
-    _mspasspy_has_dask = False
-
-try:
-    import pyspark
-
-    _mspasspy_has_pyspark = True
-except ImportError:
-    _mspasspy_has_pyspark = False
 
 import gridfs
 import pymongo
@@ -5864,7 +5853,7 @@ def save_dataframe(
         """
         dbcol = self[collection]
 
-        if parallel:
+        if parallel and _mspasspy_has_dask:
             df = daskdf.from_pandas(df, chunksize=1, sort=False)
 
         if not one_to_one:
@@ -5880,7 +5869,7 @@ def save_dataframe(
             df[key] = df[key].mask(df[key] == val, None)
         """
 
-        if parallel:
+        if parallel and _mspasspy_has_dask:
             df = daskdf.from_pandas(df, chunksize=1, sort=False)
             df = df.apply(lambda x: x.dropna(), axis=1).compute()
         else:
diff --git a/python/mspasspy/io/distributed.py b/python/mspasspy/io/distributed.py
index 0d7586151..6861ca50c 100644
--- a/python/mspasspy/io/distributed.py
+++ b/python/mspasspy/io/distributed.py
@@ -22,8 +22,24 @@
     ErrorLogger,
 )
 
-import dask
-import pyspark
+try:
+    import dask
+
+    _mspasspy_has_dask = True
+except ImportError:
+    _mspasspy_has_dask = False
+try:
+    import pyspark
+
+    _mspasspy_has_pyspark = True
+except ImportError:
+    _mspasspy_has_pyspark = False
+
+if not _mspasspy_has_dask and not _mspasspy_has_pyspark:
+    message = "{} requires either the dask or pyspark module. Please install dask or pyspark.".format(
+        __name__
+    )
+    raise ModuleNotFoundError(message)
 
 
 def read_ensemble_parallel(
@@ -460,6 +476,11 @@
         message += "Unsupported value for scheduler={}\n".format(scheduler)
         message += "Must be either 'dask' or 'spark'"
         raise ValueError(message)
+    if scheduler == "spark" and not _mspasspy_has_pyspark:
+        print(
+            "WARNING(read_distributed_data): pyspark not found; will use dask instead. The scheduler argument is ignored."
+        )
+        scheduler = "dask"
 
     if isinstance(data, list):
         ensemble_mode = True
@@ -491,9 +512,10 @@
         ensemble_mode = False
         dataframe_input = False
         db = data
-    elif isinstance(
-        data,
-        (pd.DataFrame, pyspark.sql.dataframe.DataFrame, dask.dataframe.core.DataFrame),
+    elif (
+        isinstance(data, pd.DataFrame)
+        or (_mspasspy_has_dask and isinstance(data, dask.dataframe.core.DataFrame))
+        or (_mspasspy_has_pyspark and isinstance(data, pyspark.sql.dataframe.DataFrame))
     ):
         ensemble_mode = False
         dataframe_input = True
@@ -539,7 +561,6 @@
                 )
                 raise TypeError(message)
             container_partitions = container_to_merge.getNumPartitions()
-
         else:
             if not isinstance(container_to_merge, dask.bag.core.Bag):
                 message += (
@@ -1512,6 +1533,11 @@ class method `save_data`. See the docstring for details but the
             )
             message += "Must be either dask or spark"
             raise ValueError(message)
+        if scheduler == "spark" and not _mspasspy_has_pyspark:
+            print(
+                "WARNING(write_distributed_data): pyspark not found; will use dask instead. The scheduler argument is ignored."
+            )
+            scheduler = "dask"
     else:
         scheduler = "dask"
     # This use of the collection name to establish the schema is
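
The guard pattern both files now share is easiest to see in isolation. The sketch below is illustrative only, not MsPASS code: the dask and pyspark module names are real, but the _has_dask/_has_pyspark flags and the pick_scheduler helper are hypothetical stand-ins for the logic read_distributed_data and write_distributed_data now apply.

    # Minimal sketch of the optional-import pattern this patch introduces.
    # Assumes only the standard library plus optionally installed dask/pyspark.
    try:
        import dask  # noqa: F401  # optional dependency; record availability

        _has_dask = True
    except ImportError:
        _has_dask = False

    try:
        import pyspark  # noqa: F401  # optional dependency; record availability

        _has_pyspark = True
    except ImportError:
        _has_pyspark = False

    # Fail loudly at import time only when neither framework is available.
    if not _has_dask and not _has_pyspark:
        raise ModuleNotFoundError(
            "{} requires either the dask or pyspark module".format(__name__)
        )


    def pick_scheduler(scheduler="dask"):
        # Hypothetical helper mirroring the fallback added to both functions:
        # validate the name, then degrade to dask with a warning when spark
        # was requested but pyspark is not installed.
        if scheduler not in ("dask", "spark"):
            raise ValueError(
                "scheduler must be 'dask' or 'spark', got {}".format(scheduler)
            )
        if scheduler == "spark" and not _has_pyspark:
            print("WARNING(pick_scheduler): pyspark not found; will use dask instead.")
            scheduler = "dask"
        return scheduler

On a machine with dask installed but no pyspark, pick_scheduler("spark") prints the warning and returns "dask", which is the same degraded behavior the two new guards give the MsPASS entry points.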