Enabling Spark based HDFS I/O for running ml4ir @rev balikasg@ #44

Merged
merged 24 commits into master from ashish/spark_io on Jul 17, 2020

Conversation

lastmansleeping (Collaborator)

The PR includes changes required to read the following:

  • data (any directory -> copied over to local FS)
  • feature config + model config (any YAML/JSON file)
  • dataframes (e.g. vocabulary files)

and to write the following back to HDFS:

  • models
  • logs, metrics, etc.

@lastmansleeping lastmansleeping self-assigned this Jul 13, 2020
@lastmansleeping lastmansleeping changed the title Enabling Spark based HDFS I/O for running ml4ir Enabling Spark based HDFS I/O for running ml4ir @rev balikasg@ Jul 13, 2020
@lastmansleeping lastmansleeping requested review from balikasg and jakemannix and removed request for balikasg July 13, 2020 14:25
return SparkConfigHolder.HADOOP_CONFIG


def get_hdfs():
Collaborator Author:
@balikasg This is the main module to review. I had to jump some hoops to get access to the hadoop File system since it is only available in spark's java based APIs.
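
For context, a minimal sketch of the kind of JVM hop this involves (variable names here are illustrative, not the PR's exact code):

    from pyspark.sql import SparkSession

    # Hedged sketch: reach the Hadoop FileSystem through Spark's py4j gateway,
    # since pyspark exposes it only via the underlying Java APIs.
    spark = SparkSession.builder.getOrCreate()
    hadoop_config = spark._jsc.hadoopConfiguration()
    hdfs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_config)
    local_fs = spark._jvm.org.apache.hadoop.fs.FileSystem.getLocal(hadoop_config)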

Collaborator:
Thanks for the pointer!
From what I understand, this is not a Flowsnake v2 thing: we are expecting HDFS paths and handling them as such.

self.data_dir: str = data_dir
self.logger = logger

# If data directory is a HDFS path, first copy to local file system
Contributor:
I think maybe we can reorganize this a bit. This has references to sparkIO directly in this class, which we don't want. We should encapsulate the IO into a class (ahh! Jake is suggesting a class?!?) or a function which is passed in.

We can chat about that later on if it's not clear.

Collaborator Author:
(Haha. I went overboard with NOT making spark_io a class this time.)
I'm not sure how we can eliminate a reference to spark_io here though. I can wrap it under file_io.copy_dir(src, dest) and then internally call spark_io like I do for the other I/O codepaths, if you like.

Does that work?

Contributor:
What I mean is you could pass into RelevanceDataset's constructor an object or function which provides IO. If it's an object which you pass in, the (abstract) class of the object would be DataIO (or something like that), and some particular implementations would be LocalIO and SparkIO. Then RelevanceDataset has no references to spark, but whoever instantiates a RelevanceDataset may choose to create a spark-based DataIO and pass it into the constructor.
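
A hedged sketch of that injection idea (the method and constructor signatures below are illustrative, not the PR's actual interfaces):

    from abc import ABC, abstractmethod

    class DataIO(ABC):
        """Abstract I/O handler; concrete subclasses decide where the bytes live."""

        @abstractmethod
        def copy_dir(self, src: str, dest: str) -> None:
            ...

    class LocalIO(DataIO):
        def copy_dir(self, src: str, dest: str) -> None:
            pass  # e.g. a plain shutil.copytree based copy

    class SparkIO(DataIO):
        def copy_dir(self, src: str, dest: str) -> None:
            pass  # e.g. HDFS -> local copy via the Hadoop FileSystem APIs

    class RelevanceDataset:
        def __init__(self, data_dir: str, file_io: DataIO):
            # No spark reference here; the caller decides which DataIO to inject
            self.data_dir = data_dir
            self.file_io = file_io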

Collaborator Author:
I understand what you are suggesting. But I don't get how you would do this specific task - "copying files from HDFS to the local filesystem" - which is specific to the spark_io case.

Collaborator Author:
Keep in mind that for tfrecord, we don't do any explicit read (using file_io or spark_io) at all. It's handled by tensorflow.

Collaborator Author:
Alternatively, I can do the copying in the main pipeline and force RelevanceDataset to work with only file_io (i.e. local file I/O). This might be cleaner. My reasoning for adding the copying to RelevanceDataset was that if a user wanted to create a RelevanceDataset using data stored on HDFS, then they can already do it. They don't have to write their own version of spark_io. Maybe passing the file handler object into RelevanceDataset is a good middle ground. Not entirely convinced.

@lastmansleeping lastmansleeping (Collaborator Author) commented Jul 14, 2020
@balikasg Synced with @jakemannix offline, and he requested a few structural changes to the code. Will update you when done, but not changing any functionality. So you can still review it when you are free and try training models.

MODELS = "models"
LOGS = "logs"
DATA = "data"
TEMP_DATA = "data/temp"
Collaborator:
since they are temp, should we move this to /tmp/data? do we have cleaning in place?

Collaborator:
although I don't know if flowsnake has /tmp

Collaborator Author:
I explicitly delete the temp directory once done.

@@ -465,13 +473,11 @@ def parse_config(
tfrecord_type: str, feature_config, logger: Optional[Logger] = None
) -> FeatureConfig:
if feature_config.endswith(".yaml"):
feature_config = file_io.read_yaml(feature_config)
if logger:
logger.info("Reading feature config from YAML file : {}".format(feature_config))
Collaborator:
I am just wondering here. Not for this PR.
But the "if logger" check repeats a lot in ml4ir. Why not agree that we always need a logger and instantiate one from the beginning? Or decorate key functions/classes to avoid such checks.

Collaborator Author:
Yeah. Let me file an issue for this. It looks to me like tech debt building up.

Contributor:
I agree - we should just always have a Logger - if none is configured, use a dummy / noop logger or something.
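
A minimal sketch of that default (the helper name is hypothetical):

    import logging

    def get_logger(logger: logging.Logger = None) -> logging.Logger:
        """Return the given logger, or a no-op logger so callers can drop the `if logger:` checks."""
        if logger is not None:
            return logger
        noop = logging.getLogger("ml4ir.noop")  # hypothetical logger name
        noop.addHandler(logging.NullHandler())
        noop.propagate = False
        return noop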

if logger:
logger.info("Reading feature config from YAML string")
if logger:
logger.info("Feature Config \n{}".format(json.dumps(feature_config, indent=4)))
Collaborator:
btw these are very nice, but not related to the functionality this PR adds!
Thanks for the effort though! We could also consider logging this at debug level, as it is often long.

Collaborator Author:
True. Can change.


if infile.startswith(HDFS_PREFIX):
# NOTE: Move to fully spark dataframe based operations in the future
return spark_io.read_df(infile).toPandas()
Collaborator:
is this lazy? Does it scale or OOM?

Collaborator Author:
This is not lazy. The goal is to only use this for small files like vocabularies, which you really need in memory. For training data, we should use the lazy loading provided by TFRecordDataset.
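
To illustrate the contrast (file path and batch size below are placeholders): toPandas() materializes the whole dataframe on the driver, while tf.data streams records on demand:

    import tensorflow as tf

    # Lazy: records are read batch by batch as the iterator advances,
    # never fully materialized in memory.
    dataset = tf.data.TFRecordDataset(["data/train/part-00000.tfrecord"])
    dataset = dataset.batch(128).prefetch(tf.data.experimental.AUTOTUNE)
    for serialized_batch in dataset.take(1):
        pass  # parse with tf.io.parse_example(...) per the feature config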


if infile.startswith(HDFS_PREFIX):
# NOTE: Move to fully spark dataframe based operations in the future
return spark_io.read_df(infile).toPandas()
elif infile.endswith(".gz"):
fp = gzip.open(os.path.expanduser(infile), "rb")
Collaborator:
I have the impression that pandas handles gz files out of the box.

Collaborator Author:
I used this method from our other projects. So my memory might not be completely correct - I don't think pandas can read gz files with the C engine. Can check though.
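
For reference, recent pandas versions do infer gzip compression from the .gz suffix (whether that plays well with this project's quoting/escaping is a separate question):

    import pandas as pd

    # compression="infer" is the default, so the .gz suffix is enough
    df = pd.read_csv("vocabulary.csv.gz")  # placeholder path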

fp.close()
return output
if outfile:
fp = open(outfile, "w")
Collaborator:
Here, using with open(...) is the more Pythonic way. Why not write directly with pd.to_csv instead?
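
A sketch of that suggestion (assuming df is the pandas dataframe being written; the escaping concerns in the reply below would still apply):

    import pandas as pd

    def write_df(df: pd.DataFrame, outfile: str) -> None:
        # The context manager closes the handle even on exceptions
        with open(outfile, "w") as fp:
            df.to_csv(fp, index=False)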

Collaborator Author:
After spending a lot of time with our ranking data, this combination of read_df and write_df was the only one where the two were compatible with each other and with the upstream spark jobs. If you notice, I need to replace \\ with \\\\ on the next line for compatibility with re-reading. I can add a FIXME here for the future though.

Collaborator Author:
(Again, this method was written a year ago for some of our other projects. Might not be necessary to do all this hacky read write for ml4ir)

if self.data_format == DataFormatKey.CSV:
file_io.rm_dir(os.path.join(self.data_dir, "tfrecord"))
file_io.rm_dir(DefaultDirectoryKey.TEMP_DATA)
Collaborator:
Just saw this, there is cleaning!

@balikasg balikasg (Collaborator) left a comment
This PR looks good, I like the new design!
I have left a few non-blocking comments, so I am approving

"""
if outfile and outfile.startswith(HDFS_PREFIX):

class FileIO(object):
Collaborator:
I like this! Defining FileIO and then having LocalIO and SparkIO is clean, slick, etc. Nice!

class LocalIO(FileIO):
"""Class defining the file I/O handler methods for the local file system"""

def make_directory(self, dir_path: str, clear_dir: bool = False) -> str:
Collaborator:
clear_dir is an overloaded name in this implementation. There are both functions and variables defined with the same name. Can we consider changing one of the two? For example, this bool arg can be renamed to remove_dir_content or something else equally descriptive. clear_dir_content would also work I guess.

Collaborator Author:
Fixed.

)
self.local_fs = self.hdfs.getLocal(self.hadoop_config)

def make_directory(self, dir_path: str, clear_dir: bool = False) -> str:
Collaborator:
same here with clear_dir..

Returns:
python dictionary
"""
self.log("Reading JSON file : {}".format(infile))
Collaborator:
nice usage of self.log here! +1

Returns:
pandas dataframe
"""
raise NotImplementedError
Collaborator:
Since SparkIO inherits from FileIO, isn't it redundant to override the NotImplementedError methods that were defined there? Any particular reason for it? It occurs a few times in SparkIO.

Collaborator:
Another question (if you know): wouldn't just passing in the list of files do the trick? Like:

(self.spark_session.read.format("csv")
            .option("header", "true")
            .option("inferschema", "true")
            .option("mode", "DROPMALFORMED")
            .option("mergeSchema", "true")
            .load(infiles)
            .toPandas())

I am looking at this question.

Collaborator Author:
Added.

@lastmansleeping (Collaborator Author)
> This PR looks good, I like the new design!
> I have left a few non-blocking comments, so I am approving

Thanks @balikasg. Will address the comments and fix the test failures and then merge the PR.

@lastmansleeping (Collaborator Author)
@jakemannix I have made the requested changes. Had to change a lot of files though.

@jakemannix jakemannix (Contributor) left a comment
Ok this about covers my concerns. We're still coupled, but the coupling is loose enough that it's mostly just package restructuring left.

if self.args.file_handler == FileHandlerKey.LOCAL:
self.file_io = self.local_io
elif self.args.file_handler == FileHandlerKey.SPARK:
self.file_io = SparkIO(self.logger)
Contributor:
Ok so this is basically what I was looking for, yes. Technically, we need to make one last step of separation: pull spark_io.py out of ml4ir/base and move it into a new python package (ml4ir-spark), and perhaps put the pipeline in its own package as well (ml4ir-apps), and show how you could allow people to run pipelines without having to pip install pyspark at all, if not needed.

But folding that into the next ticket should be fine.
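
For the "no pip install pyspark unless needed" part, one common pattern is an optional extra in setup.py (a sketch only; the comment above actually suggests separate packages, which goes further):

    from setuptools import setup, find_packages

    setup(
        name="ml4ir",
        packages=find_packages(),
        install_requires=["tensorflow", "pandas"],  # core deps only (illustrative)
        extras_require={"spark": ["pyspark"]},      # installed via: pip install ml4ir[spark]
    )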

@lastmansleeping lastmansleeping merged commit d47950a into master Jul 17, 2020
@lastmansleeping lastmansleeping deleted the ashish/spark_io branch July 17, 2020 07:40
parser.add_argument('-max_num_records', default=MAX_NUM_RECORDS)
parser.add_argument('-num_samples', default=NUM_SAMPLES)
parser.add_argument('-random_state', default=RANDOM_STATE)
parser.add_argument("-data_dir", default=DATA_DIR)
Collaborator:
help messages?
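
For example (the descriptions below are placeholders, not taken from the code):

    parser.add_argument("-max_num_records", default=MAX_NUM_RECORDS,
                        help="Maximum number of records to keep per query")
    parser.add_argument("-data_dir", default=DATA_DIR,
                        help="Directory to write the generated data to")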
