[HLS-387] GFF3 Data Source #182
Conversation
Signed-off-by: kianfar77 <kiavash.kianfar@databricks.com>
Codecov Report
@@ Coverage Diff @@
## master #182 +/- ##
==========================================
+ Coverage 93.64% 93.79% +0.14%
==========================================
Files 86 87 +1
Lines 4077 4222 +145
Branches 365 389 +24
==========================================
+ Hits 3818 3960 +142
- Misses 259 262 +3
Continue to review full report at Codecov.
core/src/main/scala/io/projectglow/gff/GffRowToInternalRowConverter.scala
I had some preliminary comments but it looks like a great start to me!
One question: are bgzip files with a .gz extension handled correctly now? I'm not sure if we'll know they're splittable.
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
  createRelation(sqlContext, parameters, null)
Think it would be better to infer schema here and then call the method below.
Done
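A minimal sketch of the infer-then-delegate approach suggested above; `inferSchema` is a hypothetical helper standing in for however the data source actually derives the schema:

```scala
// Hypothetical sketch: infer the schema first, then delegate to the
// overload that takes an explicit schema, instead of passing null.
override def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String]): BaseRelation = {
  val inferredSchema = inferSchema(sqlContext, parameters) // hypothetical helper
  createRelation(sqlContext, parameters, inferredSchema)
}
```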
    val sqlContext: SQLContext,
    path: String,
    requiredSchema: StructType
) extends BaseRelation with TableScan { // TODO: investigate use of PrunedFilteredScan
IMO the column pruning would be helpful and probably very simple to implement. I doubt the filters are worth spending time on.
Incorporated column pruning. Skipped filters.
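The column pruning discussed above can be sketched with Spark's `PrunedScan` trait, which hands `buildScan` only the columns the query actually needs. Class and helper names here are illustrative, not the PR's actual implementation:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.{BaseRelation, PrunedScan}
import org.apache.spark.sql.types.StructType

// Illustrative relation showing the PrunedScan contract. parseGff is a
// hypothetical helper that fully parses the file into a DataFrame.
class GffRelation(
    val sqlContext: SQLContext,
    path: String,
    override val schema: StructType)
  extends BaseRelation with PrunedScan {

  // Spark passes only the requested column names, so the scan can project
  // before materializing rows.
  override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
    val df = parseGff(sqlContext, path) // hypothetical full parse
    df.select(requiredColumns.map(col): _*).rdd
  }
}
```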
gffOfficialAttributeFields.foldLeft(Seq[StructField]()) { (s, f) =>
  if (attributeTags
    .map(t => deUnderscore(t.toLowerCase))
    .contains(f.name.toLowerCase)) {
FYI, I think Spark SQL is already case-insensitive.
True, but case sensitivity against the tag names still needs to be handled. Revised this section based on your other comments.
}

def filterCommentsAndFasta(df: DataFrame): DataFrame = {
  df.where(
There's an option in the CSV reader to drop rows that don't have the right number of fields. Might save you some code here.
As well as an option to filter out comments.
Used the option to filter out the comments.
Used DROPMALFORMED. This has issues in Spark 2.4.4 and earlier, which I dealt with and documented in the code.
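The CSV reader options discussed above can be sketched as follows; `gffStringSchema` is an assumed nine-column schema, and the exact option set in the PR may differ:

```scala
// Let Spark's CSV reader do the filtering itself: lines starting with '#'
// (GFF3 comments/directives) are dropped via the comment option, and rows
// that don't match the nine-column schema (e.g. FASTA lines) are dropped
// via DROPMALFORMED mode.
val df = spark
  .read
  .option("sep", "\t")             // GFF3 is tab-delimited
  .option("comment", "#")          // drop comment and directive lines
  .option("mode", "DROPMALFORMED") // drop rows without the expected columns
  .schema(gffStringSchema)         // assumed 9-column schema
  .csv(path)
```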
)
.collect()(0).getAs[Seq[String]](0)
// generate the schema. The field names will be case and underscore insensitive |
Code style thing: I think this would be more concise and clear if you normalized all the names together and then used sortBy to put them in the order you want.
Revised.
}
)
.drop(attributesField.name, attributesMapColumnName) // TODO: Complete the option of keeping attributes field
.rdd
For an easy performance gain, we can override needConversion and return queryExecution.toRdd.
Great. Done
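A minimal sketch of the `needConversion` pattern suggested above; the DataFrame source is elided, and the cast is the standard trick for returning Catalyst rows from a `BaseRelation`:

```scala
// With needConversion = false, Spark treats the rows from buildScan as
// InternalRows and skips the Row -> InternalRow conversion step.
override def needConversion: Boolean = false

override def buildScan(): RDD[Row] = {
  val df: DataFrame = ??? // the parsed GFF DataFrame from the surrounding code
  // queryExecution.toRdd yields RDD[InternalRow]; the cast is safe to hand
  // back here precisely because needConversion is false.
  df.queryExecution.toRdd.asInstanceOf[RDD[Row]]
}
```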
gffBaseSchema
  .fields
  .toSeq
  .map(f => StructField(f.name, StringType))
Why do you have to parse everything as a string at first? You can't parse them as the proper data types?
I used nullValue option of csv reader and was able to do it.
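The `nullValue` approach mentioned above can be sketched like this; `typedGffSchema` is a hypothetical schema with, e.g., start/end as `LongType` and score as `DoubleType`:

```scala
// GFF3 uses "." for missing values. Mapping "." to null lets the CSV
// reader parse columns with their proper types directly, instead of
// reading everything as strings and casting afterwards.
val df = spark
  .read
  .option("sep", "\t")
  .option("nullValue", ".")
  .schema(typedGffSchema) // hypothetical typed 9-column schema
  .csv(path)
```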
As we discussed, it is handled but not in a splittable fashion; will address in a separate PR by extending the CSV data source.
Made a first pass. I think the high level is really solid! Most of my comments are about code style and readability.
I'm a little surprised there aren't more edge cases to look at. We should be sure we've thought hard about the kinds of "weird" GFFs we'll encounter.
override def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String]): BaseRelation = {
  val path = parameters.get("path")
Can you handle this validation logic in a common place, e.g. checkAndGetPath(options: Map[String, String]): String?
Done
val attributeFields = attributeTags
  .foldLeft(Seq[StructField]()) { (s, t) =>
    val officialIdx = officialFieldNames.indexOf(normalizeString(t))
nit: might be a little cleaner to gffOfficialAttributeFields.find(f => f.name == normalizeString(t)).map(_.dataType).getOrElse(StringType)
Done
.sortBy(_.name)(FieldNameOrdering)

StructType(
  gffBaseSchema.fields.dropRight(1) ++ attributeFields
nit: comment that attributes is the last field in gffBaseSchema.fields
Done
}

def filterFastaLines(df: DataFrame): DataFrame = {
  df.where(
This is kind of a confusing way to write this filter. fold is specifically called out in our style guide as a hard-to-understand pattern: https://github.com/databricks/scala-style-guide. Coalesce might be a good alternative here.
Used Coalesce
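One possible shape of the coalesce-based filter discussed above; it assumes FASTA lines parse into rows where every column after the first is null, which may differ from the PR's actual condition:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col}

// After a 9-column tab split, a FASTA line leaves all columns but the
// first null, so coalescing over the remaining columns identifies real
// GFF records without a fold.
def filterFastaLines(df: DataFrame): DataFrame = {
  val nonFirstCols = df.columns.tail.map(col)
  df.where(coalesce(nonFirstCols: _*).isNotNull)
}
```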
    s :+ StructField(t, StringType)
  }
}
.sortBy(_.name)(FieldNameOrdering)
nit: I think the custom ordering is not necessary. It might be simpler to use the natural ordering on whether the field is in the official attributes and then the name.
attributes.sortBy(field => (!officialAttributes.contains(field), field.name))
Used your suggestion but with indexOf because I want the official fields to be in a set order as well.
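The indexOf-based ordering described above might look like this; `officialFieldNames` is assumed from the surrounding code to list the official attribute names in their canonical order:

```scala
import org.apache.spark.sql.types.StructField

// Official attributes sort first, in the canonical order given by
// officialFieldNames; unofficial attributes (indexOf == -1) sort after
// all of them, alphabetically by name.
val sorted = attributeFields.sortBy { f =>
  val idx = officialFieldNames.indexOf(f.name)
  (if (idx == -1) Int.MaxValue else idx, f.name)
}
```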
spark.conf.set(columnPruningConf, originalColumnPruning)

val attributeFields = attributeTags
  .foldLeft(Seq[StructField]()) { (s, t) =>
Why is this a fold instead of a simple map?
A remnant of the older version! Didn't notice that the accumulator does nothing. Changed.
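The fold-to-map change could look like the following; `normalizeString` and `gffOfficialAttributeFields` are assumed from the surrounding code, and this combines the reviewer's earlier `find`/`getOrElse` suggestion:

```scala
import org.apache.spark.sql.types.{StringType, StructField}

// The fold only appended to its accumulator, so a map expresses the same
// thing: each tag becomes a StructField, typed by the matching official
// attribute if there is one, else StringType.
val attributeFields = attributeTags.map { t =>
  val dataType = gffOfficialAttributeFields
    .find(f => f.name == normalizeString(t))
    .map(_.dataType)
    .getOrElse(StringType)
  StructField(t, dataType)
}
```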
  :+ StructField("Is_circular", BooleanType)
)

test("Schema inference") {
Are there any particular edge cases we should look for here? E.g., no attributes fields, or attributes fields containing weird characters (can there be escaped = or ; inside attribute values)?
The only unescaped special character allowed in GFF3 is space. I added tags to the test schema that include spaces; values containing spaces are also covered in my test files. = and ; and other special characters must be percent-encoded in GFF3, and I read them in their percent-encoded form as well.
GFF3 must have nine columns, so files with no attribute column are not valid; I also checked that with an online GFF validator.
Please see my comment in the tests. The GFF3 format is pretty restrictive; I could not think of cases other than those mentioned that are both weird and valid.
Just had two comments on testing. The main code looks great!
assert(dfRows.sameElements(expectedRows))
}

test("Read gff with user-specified schema containing attributesField") {
What's the expected behavior if the user provides a schema with fields that are not actually in the attributes map? We should have a test for that.
The column will be added. All entries will be null. Added a test for that.
}
assert(e.getMessage.contains("GFF data source does not support writing!"))
}
We should have a test for column pruning where we select only a few columns from the inferred schema.
Added the "column pruning" test
LGTM!
* prep
* schema inferrer
* field order
* cleanup
* reader
* more reader
* projection
* works
* sbt
* test schema
* test schema
* test read
* updateToken
* more tests
* filess
* filtered schema
* revert
* datasource api
* comments and more
* working version
* working version
* final and tests
* case test
* comments
* empty
* tests added

Signed-off-by: kianfar77 <kiavash.kianfar@databricks.com>
Signed-off-by: Henry Davidge <hhd@databricks.com>
What changes are proposed in this pull request?
This PR adds the "gff" datasource to glow for reading GFF3 (https://github.com/The-Sequence-Ontology/Specifications/blob/master/gff3.md) files. The usage will be like:
val df = spark.read.format("gff").load(path)
The data source is able to infer the schema or accept a user-specified schema. It flattens the attributes field by creating a column for each tag that appears in the attributes column of the gff file.
The inferred schema has base fields corresponding to the first eight columns of GFF3 (seqId, source, type, start, end, score, strand, and phase), followed by any official attribute field (among id, name, alias, parent, target, gap, derivesfrom, note, dbxref, ontologyterm, and iscircular) that appears in the GFF tags, followed by any unofficial attribute field that appears in the tags. In the inferred schema, the base and official fields keep the order listed above; the unofficial fields are in alphabetical order.
A user-specified schema can contain any subset of fields corresponding to the nine columns of GFF3 (named seqId, source, type, start, end, score, strand, phase, and attributes), plus the official and unofficial attribute fields. The names of the official and unofficial fields should match the tag names in a case- and underscore-insensitive fashion.
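The description above can be illustrated with a hypothetical user-specified schema; the tag names and file path here are made up for illustration:

```scala
import org.apache.spark.sql.types._

// A subset of the base columns plus one official and one unofficial
// attribute tag. Field names match tags case- and underscore-insensitively.
val userSchema = StructType(Seq(
  StructField("seqId", StringType),
  StructField("start", LongType),
  StructField("end", LongType),
  StructField("ID", StringType),            // official attribute tag
  StructField("my_custom_tag", StringType)  // unofficial attribute tag
))

val df = spark.read.format("gff").schema(userSchema).load("path/to/file.gff3")
```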
How is this patch tested?
(Details)