diff --git a/README.md b/README.md
index 04973fb..98f4e6a 100644
--- a/README.md
+++ b/README.md
@@ -11,13 +11,17 @@ A functional parser of tables, implemented in Scala.
 Typically, the input is in the form of a "CSV" (comma-separated-values) file.
 However, it is perfectly possible to parse other formats.
+Indeed, as of 1.3.0, it is possible to parse Parquet files.
+Please see the **[TableParser Parquet Design](docs/design/TableParserParquetDesign.md)**
+design document for more information.
 
 _TableParser_ aims to make it as simple as possible to ingest a fully typed tabular dataset.
 The principal mechanism for this is the use of case classes to specify the types of fields in the dataset.
 All conversions from strings to standard types are performed automatically.
 For non-standard types, it suffices simply to provide an implicit converter of the form _String=>T_.
-It is possible to parse sequences of _String_ (one per row)--the typical situation for a CSV file--or sequences of
+It is possible to parse sequences of _String_ (one per row)--
+the typical situation for a CSV file--or sequences of
 _Seq\[String]_ (where the table corresponds to a matrix of cells).
 This library makes extensive use of type classes and other implicit mechanisms.
@@ -26,33 +30,37 @@ There is a row-parser configuration mechanism that allows the programmer to vary
 the regular expressions for recognizing strings and delimiters and also to vary the quote character.
-In addition to parsing, _TableParser_ provides a mechanism for rendering a table in _hierarchical_ form (for example for
+In addition to parsing,
+_TableParser_ provides a mechanism for rendering a table in _hierarchical_ form (for example, for
 XML or HTML).
-An output structure which is itself tabular or sequence-oriented can be generated quite easily using the rows of the table,
-together with something like, for instance, a JSON writer.
+An output structure which is itself tabular or sequence-oriented can be generated
+quite easily using the rows of the table, together with something like, for instance, a JSON writer.
 
 Package Structure
 ===========
-As of version 1.2.1, the code is made up of four packages: _core_, _cats_, _zio_, and _spark_.
+As of version 1.3.0, the code is made up of five packages: _core_, _cats_, _parquet_, _zio_, and _spark_.
 Most of the remainder of this README file refers to the _core_ package.
 The _cats_ package is for use with Cats Effect, particularly _IO_.
 All usage of _cats-effect IO_ and encryption has been moved into the _cats_ package.
 The _spark_ package is for use with Apache Spark (beginning with 1.2.0).
 The _zio_ package is for use with ZIO.
+The _parquet_ package is for use with Apache Parquet (beginning with 1.3.0).
 
 Quick Intro
 ===========
 The simplest way to get an introduction to _TableParser_ is to consult the _movie.sc_
 and _airbnb.sc_ worksheets (the latter is in the _cats_ package).
 These give detailed descriptions of each stage of the process.
-Another way to see how it works is to look at this application Pairings which takes a CSV file, parses it, transforms the data,
+Another way to see how it works is to look at the application _Pairings_, which takes a CSV file,
+parses it, transforms the data,
 and outputs a JSON file.
 This way of parsing is a little different from what is shown in the worksheets.
 But both are effective.
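The parse/transform/output flow of such an application can be pictured with a tiny, self-contained stand-in. All names below are hypothetical; this is not the actual Pairings or TableParser code, just a sketch of the shape of the pipeline:

```scala
// A miniature stand-in for the parse/transform/output pipeline described above.
// Hypothetical names throughout; the real application uses TableParser's API.
case class Player(first: String, last: String)

// Parse: one Player per CSV line (no header, no quoting, for brevity).
def parsePlayers(lines: Seq[String]): Seq[Player] =
  lines.map { w =>
    val fields = w.split(",")
    Player(fields(0), fields(1))
  }

// Output: render a Player as a JSON object (hand-rolled for the sketch).
def toJson(p: Player): String =
  s"""{"first":"${p.first}","last":"${p.last}"}"""

val json: Seq[String] = parsePlayers(Seq("Adam,Sullivan", "Amy,Avagadro")).map(toJson)
```

The real library replaces the hand-rolled `split` with its configurable row and cell parsers, but the three stages are the same.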
-The minimum code necessary to parse the CSV file as a table of "Player"s, using as many defaults as possible is: +The minimum code necessary to parse the CSV file as a table of "Player"s, +using as many defaults as possible is: case class Player(first: String, last: String) @@ -62,14 +70,18 @@ The minimum code necessary to parse the CSV file as a table of "Player"s, using val pty: Try[Table[Player]] = Table.parseFile("players.csv") -The _TableParserHelper_ used here is an abstract subclass of _CellParsers_ and is customized for the row type (in this -case, _Player_). -In particular, it defines an implicit _TableParser\[Table\[X]]_ where _X_ is the row type (_Player_ in this example). -This assumes that the source input file ("players.csv") contains a header row which includes column names corresponding to the parameters +The _TableParserHelper_ used here is an abstract subclass of _CellParsers_, +and is customized for the row type (in this case, _Player_). +In particular, it defines an implicit _TableParser\[Table\[X]]_, +where _X_ is the row type (_Player_ in this example). +This assumes that the source input file ("players.csv") +contains a header row which includes column names corresponding to the parameters of the case class _Player_ (in this case "first" and "last"). -If, for example, your CSV file does not have a header row, then you make a minor change to the line _object Player..._ +If, for example, your CSV file does not have a header row, +then you make a minor change to the line _object Player..._ -The input file looks something like this (the first and last columns are required, others are ignored): +The input file looks something like this (the first and last columns are required, +others are ignored): Id,First,Last, 1,Adam,Sullivan, @@ -85,16 +97,19 @@ For another simple use case _TableParser_, please see my blog at: https://scalap # User Guide -This version of the README.md file refers to version: 1.2.6. 
+This version of the README.md file refers to version: 1.3.0.
 
 See release notes below for history.
 
-Parsing
-=======
+Parsing Row-oriented Tables
+===========================
 
 The _Table_ trait expresses the result of parsing from a serialized representation of a table.
 Each row is represented by a parametric type _Row_.
-Typically, this _Row_ type is a case class with one parameter corresponding to one column in the table file.
+Typically,
+this _Row_ type is a case class with one parameter corresponding to one column in the table file.
 However, some table files will have too many columns to be practical for this correspondence.
 In such a situation, you have two choices:
 (1) parsing each row as a list of String (also known as a "raw" row);
@@ -113,24 +128,30 @@ This analysis will give you a list of columns, each showing its name, whether it
 is optional (i.e., contains nulls), and (if it's a numerical column), its range, mean, and standard deviation.
-Incidentally, this raw parser has three signatures, one for resources, one for files, and one for a sequence of Strings.
+Incidentally, this raw parser has three signatures, one for resources, one for files,
+and one for a sequence of Strings.
 And the default for raw row parsing is to allow quoted strings to span multiple lines.
 
-But, if not parsing as raw rows, you will need to design a class hierarchy to model the columns of the table.
+But, if not parsing as raw rows,
+you will need to design a class hierarchy to model the columns of the table.
 _TableParser_ will take care of any depth of case classes/tuples.
-Currently, there is a limit of 13 parameters per case class/tuple so with a depth of _h_ classes/tuples you could
+Currently, there is a limit of 13 parameters per case class/tuple,
+so with a depth of _h_ classes/tuples you could
 theoretically handle _13^h_ attributes altogether.
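The nesting idea can be illustrated with a sketch (hypothetical classes, not from the library): grouping fields into nested case classes keeps each class under the 13-parameter limit while allowing many leaf attributes overall.

```scala
// Sketch: nested case classes (hypothetical) work around the 13-parameter limit.
// Each class stays small, but the leaves multiply: with depth h and up to 13
// parameters per level, up to 13^h leaf attributes are theoretically possible.
case class Name(first: String, last: String)
case class Address(city: String, zip: String)
case class Person(name: Name, address: Address) // 2 parameters here, 4 leaves below

val p = Person(Name("Adam", "Sullivan"), Address("Boston", "02115"))
val leafCount = math.pow(13, 2).toInt // theoretical maximum for depth 2
```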
-The names of the parameters of a case class do not necessarily have to be the same as the column from which the value derives.
+The names of the parameters of a case class do not necessarily
+have to be the same as the column from which the value derives.
 The _ColumnHelper_ class is available to manage the mapping between parameters and columns.
 
 The result of parsing a table file (CSV, etc.) will be a _Table\[Row]_, wrapped in _Try_.
-There are object methods to parse most forms of text: _File, Resource, InputStream, URL, Seq\[String]_, etc. (see _Table_ below).
+There are object methods to parse most forms of text:
+_File, Resource, InputStream, URL, Seq\[String]_, etc. (see _Table_ below).
 
-The parser responsible for parsing the contents of a cell is called _CellParser\[T]_ where _T_ is the type of the value
+The parser responsible for parsing the contents of a cell is called _CellParser\[T]_,
+where _T_ is the type of the value
 in the cell in question.
-_T_ is covariant so that if you have alternative parsers which generate different subclasses of trait, for instance,
-this can be done.
+_T_ is covariant so that, for instance, you can have alternative parsers
+which generate different subclasses of a common trait.
 In order for _TableParser_ to know how to construct a case class (or tuple) from a set of values,
 an implicit instance of _CellParser\[T]_ must be in scope.
@@ -142,9 +163,10 @@ and where _T_ is the type to be constructed:
 
 Typically, the function _f_ is the _apply_ method of the case class _T_,
 although you may have to explicitly refer to a particular function/method with a specific signature.
-When you have created a companion object to the case class, you will simply use the method name (typically _apply_) as in
-_Name.apply_ (see example below).
-If you have created additional apply methods, you will need to define a function of a specific type and pass that in.
+When you have created a companion object to the case class,
+you will simply use the method name (typically _apply_) as in _Name.apply_ (see example below).
+If you have created additional apply methods,
+you will need to define a function of a specific type and pass that in.
 Or, more simply, do as for _ratingParser_ in the example below.
 
 Note that _P1_, _P2_, ... _Pn_ each have a context bound on _CellParser_ (that's to say, there is implicit
@@ -154,6 +176,25 @@ _T_ is bound to be a subtype of _Product_ and has two context bounds: _ClassTag_
 See the section on _CellParsers_ below.
 
+Parsing Column-oriented Tables
+==============================
+
+### Parquet parsing
+
+The mechanisms described above are primarily designed for parsing row-oriented tables such as CSV.
+However, as of version 1.3.0, it is possible to parse Parquet files.
+Please see the **[TableParser Parquet Design](docs/design/TableParserParquetDesign.md)**
+design document for more information.
+
+Much of the core functionality is applicable to column-oriented tables as well.
+The most significant differences are:
+- the header is always known and is defined by the Parquet file itself;
+- the concept of the _RowParser_ does not translate to column-oriented tables.
+
+For an example of how to parse a Parquet file,
+see the _YellowTaxiTrip_ example in **[YellowTaxiTrip](parquet/src/test/scala/com/phasmidsoftware/tableparser/parquet/YellowTaxiTrip.scala)**.
+This is from the NYC Taxi dataset.
+
 ## Table
 
 The _Table_ class, which implements _Iterable\[Row]_, also has several methods for manipulation:
@@ -177,7 +218,8 @@ The _Table_ class, which implements _Iterable\[Row]_, also has several methods f
 * lazy val shuffle: Table\[Row]
 
-It is to be expected that _join_ methods will be added later (based upon the second signature of processRows).
+It is to be expected that _join_ methods will be added later
+(based upon the second signature of processRows).
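Because _Table\[Row]_ implements _Iterable\[Row]_, collection-style operations come essentially for free. A minimal model (a hypothetical _MiniTable_, not the library's class) shows the idea:

```scala
// Hypothetical MiniTable standing in for Table[Row]: by implementing
// Iterable[A], methods such as filter, map, and take are inherited.
case class Movie(title: String, year: Int)

class MiniTable[A](rows: Seq[A]) extends Iterable[A] {
  def iterator: Iterator[A] = rows.iterator
}

val table = new MiniTable(Seq(Movie("Alien", 1979), Movie("Up", 2009), Movie("Her", 2013)))
val recent = table.filter(_.year >= 2000).map(_.title).toList
```

The library's own _Table_ implementations add the table-specific methods listed above on top of this _Iterable_ behavior.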
 The following **object** methods are available for parsing text:
 
 * def parse\[T: TableParser](ws: Seq\[String]): Try\[T]
@@ -187,20 +229,24 @@ The following **object** methods are available for parsing text:
 * def parse\[T: TableParser](u: URI, enc: String): Try\[T]
 * def parseInputStream\[T: TableParser](i: InputStream)(implicit codec: Codec): Try\[T]
 * def parseInputStream\[T: TableParser](i: InputStream, enc: String): Try\[T]
-* def parseFile\[T: TableParser](f: File)(implicit codec: Codec): Try\[T]
-* def parseFile\[T: TableParser](f: File, enc: String): Try\[T]
-* def parseFile\[T: TableParser](pathname: String)(implicit codec: Codec): Try\[T]
-* def parseFile\[T: TableParser](pathname: String, enc: String): Try\[T]
+* def parseFile\[T: TableParser](path: Path)(implicit codec: Codec): Try\[T]
+* def parseFile\[T: TableParser](path: Path, enc: String): Try\[T]
 * def parseResource\[T: TableParser](s: String, clazz: Class\[_] = getClass)(implicit codec: Codec): Try\[T]
 * def parseResource\[T: TableParser](u: URL, enc: String): Try\[T]
 * def parseResource\[T: TableParser](u: URL)(implicit codec: Codec): Try\[T]
 * def parseSequence\[T: TableParser](wss: Seq\[Seq\[String]]): Try\[T]
+* @deprecated def parseFile\[T: TableParser](f: File)(implicit codec: Codec): Try\[T]
+* @deprecated def parseFile\[T: TableParser](f: File, enc: String): Try\[T]
+* @deprecated def parseFile\[T: TableParser](pathname: String)(implicit codec: Codec): Try\[T]
+* @deprecated def parseFile\[T: TableParser](pathname: String, enc: String): Try\[T]
 
 Please note that, in the case of a parameter being an _AutoCloseable_ object such as _InputStream_ or _Source_,
 it is the caller's responsibility to close it after parsing.
-However, if the parameter is a File, or filename, or URL/URI, then any Source object that is instantiated within
+However, if the parameter is a _Path_, a _File_, a filename, or a _URL_/_URI_,
+then any _Source_ object that is instantiated within
 the parse method will be closed.
-This applies also to the _parseInputStream_ methods: the internally defined _Source_ will be closed (but not the stream).
+This applies also to the _parseInputStream_ methods:
+the internally defined _Source_ will be closed (but not the stream).
 
 Additionally, there is an implicit class called _ImplicitParser_ (defined in the _TableParser_ companion object)
 which allows for expressions such as:
@@ -231,30 +277,41 @@ _TableParser_ is defined thus:
 
       def parse(xs: Iterator[Input], n: Int = headerRowsToRead): Try[Table]
     }
 
-The type _Row_ defines the specific row type of the resulting _Table_ (for example, _Movie_, in the example below).
-The type _Input_ defines the input type, typically _String_, but there are also alternatives such as _Seq\[String]_.
-_hasHeader_ is used to define if there is a header row in the first line of the file (or sequence of strings) to be parsed.
-_forgiving_, which defaults to _false_, can be set to _true_ if you expect that some rows will not parse, but where this
+The type _Row_ defines the specific row type of the resulting _Table_
+(for example, _Movie_, in the example below).
+The type _Input_ defines the input type, typically _String_,
+but there are also alternatives such as _Seq\[String]_.
+_maybeHeader_ is an optional parameter that, if defined,
+specifies the (fixed) header to be used for parsing.
+In such a case, the system will not look for header rows in the input.
+_headerRowsToRead_ is the number of header rows to be read,
+as the header sometimes extends beyond one line.
+_forgiving_, which defaults to _false_,
+can be set to _true_ if you expect that some rows will not parse, but where this
 will not invalidate your dataset as a whole.
 _multiline_ is used to allow (or disallow when false) quoted strings to span multiple lines.
 In forgiving mode, any exceptions thrown in the parsing of a row are collected and then logged.
 _rowParser_ is the specific parser for the _Row_ type (see below).
 _builder_ is used by the _parse_ method.
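Forgiving mode, as described above, can be pictured with a small sketch (hypothetical code, not the library's internals): each row yields a _Try_, and failed rows are set aside rather than aborting the whole parse.

```scala
import scala.util.{Failure, Success, Try}

// Sketch of forgiving parsing: successes form the table, while failures
// are collected separately (the library logs them).
def parseRow(w: String): Try[Int] = Try(w.trim.toInt)

val rows = List("1", "2", "not-a-number", "4")
val attempts = rows.map(parseRow)
val parsed = attempts.collect { case Success(n) => n }
val errors = attempts.collect { case Failure(e) => e.getMessage }
```

In non-forgiving mode, by contrast, the first failure invalidates the whole result.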
-_parse_ is the main method of _TableParser_ and takes a _Seq\[String]_ and yields a _Try\[Table]_.
+_parse_ is the main method of _TableParser_ and takes an _Iterator\[Input]_ and yields a _Try\[Table]_.
+You can also specify the number of header rows to be read, but this defaults to _headerRowsToRead_.
 
-The predicate is used to filter rows (which are the results of parsing).
+The _predicate_ is used to filter rows (which are the results of parsing).
 By default, all rows are included.
 _TableParser_ also provides a method (_sampler_) to create a random sampling function.
-Note, however, that a significant part of the time for building a table from a large file is just reading and parsing the file.
+Note, however,
+that a significant part of the time for building a table from a large file is just reading and parsing the file.
 Sampling will not reduce this portion of the time.
 
-Associated with _TableParser_ is an abstract class called _TableParserHelper_, whose purpose is to make your coding job
-easier.
-_TableParserHelper_ is designed to be extended (i.e., subclassed) by the companion object of the case class that you
+Associated with _TableParser_ is an abstract class called _TableParserHelper_,
+whose purpose is to make your coding job easier.
+_TableParserHelper_ is designed to be extended (i.e., subclassed)
+by the companion object of the case class that you
 wish to parse from a row of your input.
 Doing it this way makes it easier for the implicit _TableParser_ instance to be found.
-You can also set up your application along the lines of the examples below, such as the Movie example.
+You can also set up your application along the lines of the examples below,
+such as the Movie example.
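The companion-object pattern can be sketched with a hypothetical type class (not the real _TableParser_ trait): because the implicit instance lives in _Player_'s companion object, implicit search finds it without any import.

```scala
// Hypothetical RowReader type class illustrating implicit resolution via
// the companion object (the real library uses TableParser/CellParser).
trait RowReader[T] { def read(w: String): T }

case class Player(first: String, last: String)

object Player {
  implicit val playerReader: RowReader[Player] = new RowReader[Player] {
    def read(w: String): Player = {
      val fields = w.split(",")
      Player(fields(0), fields(1))
    }
  }
}

// No import needed: the implicit is found in Player's companion object.
def readAll[T](ws: Seq[String])(implicit r: RowReader[T]): Seq[T] = ws.map(r.read)

val players = readAll[Player](Seq("Adam,Sullivan"))
```

This is why extending _TableParserHelper_ from the companion object, as described above, lets client code call _Table.parseFile_ without explicitly importing a parser.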
The constructor for _TableParserHelper_ takes two parameters, both of which can be defaulted: * sourceHasHeaderRow: Boolean = true @@ -264,8 +321,10 @@ The constructor for _TableParserHelper_ takes two parameters, both of which can _RowParser_ is a trait that defines how a line of text is to be parsed as a _Row_. _Row_ is a parametric type that, in subtypes of _RowParser_, is context-bound to _CellParser_. -A second parametric type _Input_ is defined: this will take on values of _String_ or _Seq\[String]_, according to the form of input. -Typically, the _StandardRowParser_ is used, which takes as its constructor parameter a _LineParser_. +A second parametric type _Input_ is defined: +this will take on values of _String_ or _Seq\[String]_, according to the form of input. +Typically, the _StandardRowParser_ is used, +which takes as its constructor parameter a _LineParser_. The methods of _RowParser_ are: @@ -276,14 +335,19 @@ The methods of _RowParser_ are: def parseHeader(w: String): Try[Header] The parseIndexed method is useful when we care about the sequential aspect of the input. -This is particularly important if strings are allowed to spread over newlines (as in the Airbnb dataset). +This is particularly important if strings are allowed to spread over newlines +(as in the Airbnb dataset). ## LineParser The _LineParser_ takes five parameters: two regexes, a String and two Chars. -These define, respectively, the delimiter regex, the string regex, list enclosures, the list separator, and the quote character. -Rather than invoke the constructor directly, it is easier to invoke the companion object's _apply_ method, which takes a single implicit parameter: a _RowConfig_. -Two consecutive quote characters, within a quoted string, will be parsed as a single quote character. +These define, respectively, the delimiter regex, the string regex, list enclosures, +the list separator, and the quote character. 
+Rather than invoke the constructor directly, +it is easier to invoke the companion object's _apply_ method, +which takes a single implicit parameter: a _RowConfig_. +Two consecutive quote characters, within a quoted string, +will be parsed as a single quote character. The _LineParser_ constructor will perform some basic checks that its parameters are consistent. ## StringsParser @@ -316,31 +380,36 @@ There are a number of methods which return an instance of _CellParser_ for vario * etc. including other ways to instantiate a ColumnHelper\[T]. The methods of form _cellParserN_ are the parsers that are used to parse into case classes. -Ensure that you have the correct number for N: the number of fields/parameters in the case class you are instantiating. +Ensure that you have the correct number for N: +the number of fields/parameters in the case class you are instantiating. If you don't, the compiler, or your IDE, will warn you. -In some situations, the reflection code is unable to get the field names in order (for example, when there are public -lazy values). +In some situations, +the reflection code is unable to get the field names in order (for example, +when there are public lazy values). In such a case, add the second parameter to _explicitly_ define the order of the field names. Normally, of course, you can leave this parameter unset. -There is one additional method to handle the situation where you want to vary the parser for a set of cells according +There is one additional method to handle the situation where you want to vary the parser +for a set of cells according to the value in another (key) column: _cellParser2Conditional_. -In this case, you must supply a _Map_ which specifies which parser is to be used for each possible value of the key column. +In this case, you must supply a _Map_, +which specifies which parser is to be used for each possible value of the key column. 
If the value in that column is not one of the keys of the map, an exception will be thrown. For an example of this, please see the example in _CellParsersSpec_ ("conditionally parse"). ### Implicits -Keep in mind when using implicit values that the best practice is to define an implicit involving a type T, +Keep in mind when using implicit values that the best practice +is to define an implicit involving a type T, for example, _CellParser\[T]_, in the companion object of _T_. -This will tend to eliminate any ambiguously defined implicits, and it also tends to avoid any problems with -initialization. +This will tend to eliminate any ambiguously defined implicits, +and it also tends to avoid any problems with initialization. If you still run into initialization problems, try defining the troublemaker as lazy. -It also relieves you from having to make up names for the implicit values (which the compiler more or less ignores, -anyway). +It also relieves you from having to make up names for the implicit values +(which the compiler more or less ignores, anyway). Just ensure that the name is valid, doesn't invoke a recursion, and is not in conflict with another name. -If you look in the example of _Principal_ (below), you will see that this is also the place to define optional parsers, -sequential parsers, etc. +If you look in the example of _Principal_ (below), +you will see that this is also the place to define optional parsers, sequential parsers, etc. ## Caveats @@ -387,13 +456,15 @@ The following is the required code: } We define a missing object because that is sometimes convenient to use with Spark. -Similarly, the header object is the standard header strings which can be used when reading a CSV file without a header. +Similarly, the header object is the standard header strings, +which can be used when reading a CSV file without a header. The (implicit) _helper_ is used to map the names of columns appropriately. 
 The (implicit) _parser_ is the required _CellParser_ for _Movie_.
 The (implicit) _renderer_ is used to render a _Movie_ as a CSV file.
 The (implicit) _generator_ is used for outputting a _Movie_ in other format(s).
-Each of the case classes referenced in the delcaration of _Movie_ will also need a similar companion object.
+Each of the case classes referenced in the declaration of _Movie_
+will also need a similar companion object.
 For example, the _Principal_, whose case class is defined thus:
 
     case class Principal(name: Name, facebookLikes: Int)
@@ -426,7 +497,8 @@ The other case classes look like this:
 
     case class Name(first: String, middle: Option[String], last: String, suffix: Option[String])
     case class Rating(code: String, age: Option[Int])
 
-Consult the actual code in _Movie.scala_ for the details of what is required in the corresponding companion objects.
+Consult the actual code in _Movie.scala_ for the details
+of what is required in the corresponding companion objects.
 
 The _Movie_ object has additional code like this:
@@ -446,21 +518,24 @@ The _Movie_ object has additional code like this:
 
     implicit object MovieTableParser extends MovieTableParser
   }
 
-We use the `forgiving` mode for `MovieTableParser` because we expect that there will be many rows which cannot be
-parsed.
+We use the `forgiving` mode for `MovieTableParser` because we expect
+that there will be many rows which cannot be parsed.
 In this code, _helper_, and the other columnHelpers, specify parameter-column mappings.
 Note that _helper_ for _Principal_ has an extra parameter at the start of the parameter list:
 
     Some("$x_$c")
 
 which is an (optional) formatter for the purpose of prefixing a string to column names.
-That's because there are several "Principal" parameters in a _Movie_, and each one has its own set of attributes.
-In this format parameter, "$x" is substituted by the prefix (the optional value passed into the lookup method)
+That's because there are several "Principal" parameters in a _Movie_,
+and each one has its own set of attributes.
+In this format parameter, "$x" is substituted by the prefix
+(the optional value passed into the lookup method)
 while "$c" represents the translated column name.
 
 A couple of parameters of _Movie_ are actually attribute sets (_AttributeSet_).
 These are basically lists of _String_ within one column value.
-Such lists are parsed as lists as they are parsed from the original strings and then returned as strings
+Such lists are parsed as lists from the original strings
+and then returned as strings
 in the form "{" element "," element ... "}"
 The parsing from the original string obeys the _RowConfig_ parameters of _listSep_ and _listEnclosure_.
@@ -474,7 +549,8 @@ A parameter can be optional, for example, in the _Movie_ example, the _Productio
 
     case class Production(country: String, budget: Option[Int], gross: Int, title_year: Int)
 
 In this example, some movies do not have a budget provided.
-All you have to do is declare it optional in the case class and _TableParser_ will specify it as _Some(x)_ if valid, else _None_.
+All you have to do is declare it optional in the case class,
+and _TableParser_ will specify it as _Some(x)_ if valid, else _None_.
 
 Note that there is a default, implicit _RowConfig_ object defined in the object _RowConfig_.
@@ -496,9 +572,11 @@ then you should define the following instead of the _MovieTableParser_:
 
 This example has two variations on the earlier theme of the _Movies_ example:
 (1) each row (a Submission) has an unknown number of _Question_ parameters;
-(2) instead of reading each row from a single String, we read each row from a sequence of Strings, each corresponding to a cell.
+(2) instead of reading each row from a single String, we read each row from a sequence of Strings, +each corresponding to a cell. -The example comes from a report on the submissions to a Scala exam. Only one question is included in this example. +The example comes from a report on the submissions to a Scala exam. +Only one question is included in this example. case class Submission(username: String, lastName: String, firstName: String, questions: Seq[Question]) @@ -544,26 +622,31 @@ To test this example, we run a unit test as follows (using scalatest): } } -Note the use of _cellParserRepetition_. The parameter allows the programmer to define the start value of the sequence number for the columns. +Note the use of _cellParserRepetition_. +The parameter allows the programmer to define the start value of the sequence number for the +columns. In this case, we use the default value: 1 and so don't have to explicitly specify it. -Also, note that the instance of _ColumnHelper_ defined here has the formatter defined as "$c $x" which is in the -opposite order from the _Movie_ example. +Also, note that the instance of _ColumnHelper_ defined here has the formatter defined as "$c $x", +which is in the opposite order from the _Movie_ example. Rendering ========= _TableParser_ provides a general mechanism for rendering (serializing to text) tables. -Indeed, _Table\[Row]_ extends _Renderable\[Row]_ which supports the _render(implicit rs: StringRenderer\[Row])_ method. +Indeed, _Table\[Row]_ extends _Renderable\[Row]_, +which supports the _render(implicit rs: StringRenderer\[Row])_ method. There are two distinct mechanisms for rendering a table: * one to a straight serialized output, for example, when rendering a table as a CSV file. * the other to a hierarchical (i.e., tree-structured) output, such as an HTML file. 
 ## Non-hierarchical output
 
-For this type of output, the application programmer must provide an instance of _Writer\[O]_ which is, for example a _StringBuilder_,
-_BufferedOutput_, or perhaps an I/O Monad.
+For this type of output,
+the application programmer must provide an instance of _Writable\[O]_ which is,
+for example, a _StringBuilder_, _BufferedOutput_, or perhaps an I/O Monad.
 
-The non-hierarchical output does not support the same customization of renderings as does the hierarchical output.
+The non-hierarchical output does not support the same customization
+of renderings as does the hierarchical output.
 It's intended more as a straight, quick-and-dirty output mechanism to a CSV file.
 Here, for example, is an appropriate definition.
@@ -582,8 +665,10 @@ And then, following this, you will write something like the following code:
 
     print(table.render.toString)
 
 The _Writable_ object will take care of inserting the delimiter and quotes as appropriate.
-Columns will appear in the same order as the parameters of _Row_ type (which must be either a _Product_, such as a case class, or an _Array_ or a _Seq_).
-If you need to change the order of the rows, you will need to override the _writeRow_ method of _Writable_.
+Columns will appear in the same order as the parameters of the _Row_ type
+(which must be either a _Product_, such as a case class, or an _Array_ or a _Seq_).
+If you need to change the order of the columns,
+you will need to override the _writeRow_ method of _Writable_.
 
 ## Hierarchical rendering
 
 One of the instance methods of _Table\[Row]_ is a method as follows:
 
     def renderHierarchical\[U: TreeWriter](style: String)(implicit rr: HierarchicalRenderer[Row]): U
 
-Providing that you have defined an implicit object of type _TreeWriter\[U]_ and a _HierarchicalRenderer\[Row]_,
-then the _renderHierarchical_ method will produce an instance of _U_ which will be a tree containing all the rows of this table.
+Provided that you have defined an implicit object of type _TreeWriter\[U]_
+and a _HierarchicalRenderer\[Row]_,
+the _renderHierarchical_ method will produce an instance of _U_ which
+will be a tree containing all the rows of this table.
 
 What sort of type is _U_? An XML node would be appropriate.
-The specifications use a type called HTML which is provided in package _parse.render.tag_ more as an exemplar rather than something definitive.
+The specifications use a type called _HTML_ which is provided in package _parse.render.tag_
+more as an exemplar than as something definitive.
 
     case class HTML(tag: String, content: Option[String], attributes: Map[String, String], hs: Seq[HTML])
 
 The example _TreeWriter_ for this type is reproduced here:
 
     implicit object TreeWriterHTML$ extends TreeWriterHTML$
 
-If we have a row type as for example:
+If we have a row type as, for example:
 
     case class Complex(r: Double, i: Double)
 
-Then, we should define appropriate renderers along the following likes:
+Then, we should define appropriate renderers along the following lines:
 
     implicit val valueRenderer: HierarchicalRenderer[Double] = renderer("td")
     implicit val complexRenderer: HierarchicalRenderer[Complex] = renderer2("tr")(Complex)
 
 We can then write something like:
 
     val table = HeadedTable(Seq(Complex(0, 1), Complex(-1, 0)), Header.create("r", "i"))
     val h = table.renderHierarchical("table", Map("border" -> "1"))
 
-The result of this will be an HTML tree which can be written out thus as a string:
+The result of this will be an HTML tree that can be written out as a string, thus:
@@ -635,32 +723,40 @@
-1.0 0.0
-As with the parsing methods, the conversion between instances of types (especially case classes) and Strings is hierarchical (recursive). +As with the parsing methods, +the conversion between instances of types (especially case classes) and +Strings is hierarchical (recursive). -If you need to set HTML attributes for a specific type, for example a row in the above example, then an attribute map can be defined for the _renderer2_ method. +If you need to set HTML attributes for a specific type, for example, a row in the above example, +then an attribute map can be defined for the _renderer2_ method. ## CSV Rendering -If you simply need to write a table to CSV (comma-separated value) format as a _String_, then use the _toCsv_ method of _Table\[T]_. -Note that there is also an object method of _Table_ called _toCsvRow_ which can be used for instances of _Table\[Row]_. -More control can be gained by using _CsvTableStringRenderer\[T]_ or _CsvTableFileRenderer\[T]_ for a particular type _T_. +If you simply need to write a table to CSV (comma-separated value) format as a _String_, +then use the _toCsv_ method of _Table\[T]_. +Note that there is also an object method of _Table_ called _toCsvRow_, +which can be used for instances of _Table\[Row]_. +More control can be gained by using _CsvTableStringRenderer\[T]_ or _CsvTableFileRenderer\[T]_ +for a particular type _T_. 
These require customizable (implicit) evidence parameters and are defined as follows: case class CsvTableStringRenderer[T: CsvRenderer : CsvGenerator]()(implicit csvAttributes: CsvAttributes) extends CsvTableRenderer[T, StringBuilder]()(implicitly[CsvRenderer[T]], implicitly[CsvGenerator[T]], Writable.stringBuilderWritable(csvAttributes.delimiter, csvAttributes.quote), csvAttributes) - case class CsvTableFileRenderer[T: CsvRenderer : CsvGenerator](file: File)(implicit csvAttributes: CsvAttributes) + case class CsvTableFileRenderer[T: CsvRenderer : CsvGenerator](path: Path)(implicit csvAttributes: CsvAttributes) extends CsvTableRenderer[T, FileWriter]()(implicitly[CsvRenderer[T]], implicitly[CsvGenerator[T]], Writable.fileWritable(path.toFile), csvAttributes) abstract class CsvTableRenderer[T: CsvRenderer : CsvGenerator, O: Writable]()(implicit csvAttributes: CsvAttributes) extends Renderer[Table[T], O] {...} -_CsvRenderer\[T]_ determines the layout of the rows, while _CsvGenerator\[T]_ determines the header. +_CsvRenderer\[T]_ determines the layout of the rows, +while _CsvGenerator\[T]_ determines the header. _CsvAttributes_ specify the delimiter and quote characters for the output. Instances of each can be created using methods in _CsvRenderers_ and _CsvGenerators_ respectively. Appropriate methods are: * sequenceRenderer, optionRenderer, renderer1, renderer2, renderer3, etc. up to renderer12. * sequenceGenerator, optionGenerator, generator1, generator2, generator3, etc. up to generator12. -In some situations, you will want to omit values (and corresponding header columns) when outputting a CSV file. +In some situations, +you will want to omit values (and corresponding header columns) when outputting a CSV file.
You may use the following methods (from the same types as above): def skipRenderer[T](alignment: Int = 1)(implicit ca: CsvAttributes): CsvRenderer[T] @@ -668,27 +764,34 @@ You may use the following methods (from the same types as above): Note that, when rendering a CSV row, you may want to simply render some number of delimiters (this would be in the case where you have a fixed header). -You can use the _alignment_ parameter of _skipRenderer_ to ensure alignment is correct. +You can use the _alignment_ parameter of _skipRenderer_ to ensure the alignment is correct. -As usual, the standard types are pre-defined for both _CsvRenderer\[T]_ and _CsvGenerator\[T]_ (for Int, Double, etc.). +As usual, +instances of both _CsvRenderer\[T]_ and _CsvGenerator\[T]_ are pre-defined for the standard types (Int, Double, etc.). The methods mentioned above render tables in the form of CSV Strings. -However, there are also methods available to render tables as a _File_: _writeCSVFile_ and _writeCSVFileRow_. +However, there are also methods available to render tables to a _File_: +_writeCSVFile_ and _writeCSVFileRow_. These utilize the type _CsvTableFileRenderer\[T]_ mentioned above. -If you wish to output only a subset of rows, then you should use one of the methods defined in _Table_ such as _take_. +If you wish to output only a subset of rows, +then you should use one of the methods defined in _Table_ such as _take_. ## Other String Rendering -Apart from CSV, there is currently only one implementation of _String_ rendering, and that is _Json_ rendering. -Although Json is indeed a hierarchical serialization format, the manner of creating a Json string masks the hierarchical aspects. -The implemented Json reader/writer is Spray Json but that could easily be changed in the future. +Apart from CSV, there is currently only one implementation of _String_ rendering, +and that is _Json_ rendering.
+Although JSON is indeed a hierarchical serialization format, +the manner of creating a JSON string masks the hierarchical aspects. +The implemented JSON reader/writer is _spray-json_, but that could easily be changed in the future. -Although this section is concerned with rendering, it is also true, of course, to say that tables can be read from Json strings. +Although this section is concerned with rendering, it is also true, of course, +to say that tables can be read from JSON strings. The following example from _JsonRendererSpec.scala_ shows how we can take the following steps (for the definitions of _Player_, _Partnership_, please see the spec file itself): -* read a table of players from a list of Strings (there are, as shown above, other signatures of parse for files, URLs, etc.); +* read a table of players from a list of _Strings_ + * (there are, as shown above, other signatures of parse for files, URLs, etc.); * convert to a table of partnerships; * write the resulting table to a Json string; * check the accuracy of the Json string; @@ -703,8 +806,10 @@ The following example from _JsonRendererSpec.scala_ shows how we can take the fo Release Notes ============= +V1.2.6 -> V1.3.0 +* Now supports parsing of Parquet files; _Path_ is the primary type for accessing files. V1.2.5 -> V1.2.6 -* Non-functional change to TableParser. The only difference is that CircleCI is not using Java 17 +* Non-functional change to _TableParser_. The only difference is that CircleCI is not using Java 17. V1.2.3 -> V1.2.5 * Dependency updates, e.g., use Spark 4.0.1 (not 4.1.1) and Spark is now "provided."
V1.2.1 -> V1.2.3 diff --git a/build.sbt b/build.sbt index 37be492..103d2c8 100755 --- a/build.sbt +++ b/build.sbt @@ -1,5 +1,5 @@ ThisBuild / organization := "com.phasmidsoftware" -ThisBuild / version := "1.2.6" +ThisBuild / version := "1.3.0" ThisBuild / scalaVersion := "2.13.17" ThisBuild / scalacOptions ++= Seq("-encoding", "UTF-8", "-unchecked", "-deprecation") ThisBuild / scalacOptions ++= Seq("-java-output-version", "17") @@ -54,6 +54,26 @@ lazy val cats = project.dependsOn(core).settings( ) ) +lazy val parquet = project.dependsOn(core).settings( +name := "tableparser-parquet", + libraryDependencies ++= Seq( + "org.apache.parquet" % "parquet-column" % "1.15.2", + "org.apache.parquet" % "parquet-hadoop" % "1.15.2", + "org.apache.hadoop" % "hadoop-common" % "3.4.1" % "provided", + "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.4.1" % Test, + "org.scalatest" %% "scalatest" % scalaTestVersion % Test + ) +) + +lazy val spark = project.dependsOn(core).settings( + name := "tableparser-spark", + libraryDependencies ++= Seq( + "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", + "org.slf4j" % "slf4j-simple" % "2.0.17" % Test, + "org.scalatest" %% "scalatest" % scalaTestVersion % Test + ) +) + lazy val zio = project.dependsOn(core).settings( name := "tableparser-zio", libraryDependencies ++= Seq( @@ -68,17 +88,8 @@ lazy val zio = project.dependsOn(core).settings( ) ) -lazy val spark = project.dependsOn(core).settings( - name := "tableparser-spark", - libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", - "org.slf4j" % "slf4j-simple" % "2.0.17" % Test, - "org.scalatest" %% "scalatest" % scalaTestVersion % Test - ) -) - lazy val root = (project in file(".")) - .aggregate(core, cats, zio, spark) + .aggregate(core, cats, parquet, spark, zio) .settings( name := "TableParser", publish / skip := true diff --git a/cats/src/main/scala/com/phasmidsoftware/tableparser/cats/table/TableCrypt.scala 
b/cats/src/main/scala/com/phasmidsoftware/tableparser/cats/table/TableCrypt.scala index 8e3f43a..76c7686 100644 --- a/cats/src/main/scala/com/phasmidsoftware/tableparser/cats/table/TableCrypt.scala +++ b/cats/src/main/scala/com/phasmidsoftware/tableparser/cats/table/TableCrypt.scala @@ -25,6 +25,7 @@ object TableCrypt { * @tparam A the cipher algorithm (for which there must be evidence of HexEncryption[A]). * @param csvAttributes implicit value of CsvAttributes. */ + @deprecated("Use writeCSVFileEncrypted(Path) instead", "1.3.0") def writeCSVFileEncrypted[A: HexEncryption, Row](table: Table[Row])(file: File)(implicit renderer: CsvRenderer[Row], generator: CsvGenerator[Row], hasKey: HasKey[Row], csvAttributes: CsvAttributes): Unit = CsvTableEncryptedFileRenderer[Row, A](file).render(table) } diff --git a/core/src/main/scala/com/phasmidsoftware/tableparser/core/examples/crime/Crime.scala b/core/src/main/scala/com/phasmidsoftware/tableparser/core/examples/crime/Crime.scala index ad22b44..b1df0a4 100644 --- a/core/src/main/scala/com/phasmidsoftware/tableparser/core/examples/crime/Crime.scala +++ b/core/src/main/scala/com/phasmidsoftware/tableparser/core/examples/crime/Crime.scala @@ -7,7 +7,7 @@ import scala.util.Try /** * This example of table parsing is based on the Kaggle data set: - * [[https://www.kaggle.com/datasets/marshuu/crimes-in-uk-2023?select=2023-01-metropolitan-street.csv]] + * [[https://www.kaggle.com/datasets/marshuu/crimes-in-uk-2023?select=2023-01-metropolitan-street.csv Metropolitan Crime Data]] * * The file under resources is an edited version of the Metropolitan Crime Statistics 2023-01 (only the first 5,000 rows) * diff --git a/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/ColumnHelper.scala b/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/ColumnHelper.scala index d31d605..4bbbc08 100644 --- a/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/ColumnHelper.scala +++ 
b/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/ColumnHelper.scala @@ -73,5 +73,9 @@ object ColumnHelper { /** * Precede each upper case letter (or digit) with _. */ - val camelToSnakeCaseColumnNameMapper: String => String = _.replaceAll("([A-Z\\d])", "_$1") + val camelToSnakeCaseColumnNameMapper: String => String = + _.replaceAll("([A-Z\\d])", "_$1") + + val camelToSnakeCaseColumnNameMapperLower: String => String = + camelToSnakeCaseColumnNameMapper andThen (_.toLowerCase) } \ No newline at end of file diff --git a/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/Parseable.scala b/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/Parseable.scala index 92eb1bc..19a4971 100644 --- a/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/Parseable.scala +++ b/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/Parseable.scala @@ -6,6 +6,7 @@ package com.phasmidsoftware.tableparser.core.parse import java.io.File import java.net.URL +import java.nio.file.Path import org.joda.time.LocalDate import scala.annotation.implicitNotFound import scala.util.parsing.combinator.JavaTokenParsers @@ -290,6 +291,23 @@ object Parseable { implicit object ParseableURL extends ParseableURL + /** + * A trait representing a path that can be parsed from a String. + * + * This type class extends `Parseable[java.nio.file.Path]`, enabling the parsing of `String` inputs + * into `java.nio.file.Path` objects. It provides a default implementation of the `parse` + * method, leveraging utility methods such as `lift` and `parseAndRecover` to handle parsing + * and recovery in case of errors. + */ + trait ParseablePath extends Parseable[Path] { + def parse(s: String, optModifier: Option[String]): Try[Path] = + parseAndRecover(s)(lift(java.nio.file.Paths.get(_)))( + w => s"ParseablePath: cannot interpret '$w' as a Path" + ) + } + + implicit object ParseablePath extends ParseablePath + /** * Parser of File. 
*/ @@ -452,6 +470,8 @@ object ParseableOption { implicit object ParseableOptionFile extends ParseableOption[File] + implicit object ParseableOptionPath extends ParseableOption[Path] + } /** diff --git a/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/TableBuilder.scala b/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/TableBuilder.scala new file mode 100644 index 0000000..fc92873 --- /dev/null +++ b/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/TableBuilder.scala @@ -0,0 +1,43 @@ +package com.phasmidsoftware.tableparser.core.parse + +import com.phasmidsoftware.tableparser.core.parse.TableParser.includeAll +import com.phasmidsoftware.tableparser.core.table.Header +import scala.util.Try + +/** + * Minimal trait expressing the table-building contract, independent + * of how the input source is read. + * + * TableParser extends this for string-based sources. + * ParquetTableParser extends this directly for Parquet sources. + * Future source types (JDBC ResultSet, JSON, etc.) can do the same. + * + * @tparam Table the table type to be built. + */ +trait TableBuilder[Table] { + + /** + * The row type. + */ + type Row + + /** + * Method to construct a Table based on the given iterator of rows and the given header. + * + * @param rows an iterator of Row objects representing the data rows. + * @param header a Header object representing the table's column headers. + * @return the constructed Table based on the input rows and header. + */ + protected def builder(rows: Iterator[Row], header: Header): Table + + /** + * If true, individual row failures are logged but do not + * cause the overall parse to fail. + */ + protected val forgiving: Boolean = false + + /** + * Predicate to filter rows. Defaults to including all rows. 
+ */ + protected val predicate: Try[Row] => Boolean = includeAll +} \ No newline at end of file diff --git a/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/TableParser.scala b/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/TableParser.scala index 0d0a6ed..792f88f 100644 --- a/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/TableParser.scala +++ b/core/src/main/scala/com/phasmidsoftware/tableparser/core/parse/TableParser.scala @@ -4,7 +4,6 @@ package com.phasmidsoftware.tableparser.core.parse -import com.phasmidsoftware.tableparser.core.parse.TableParser.includeAll import com.phasmidsoftware.tableparser.core.table._ import com.phasmidsoftware.tableparser.core.util.{Joinable, TeeIterator, TryUsing} import org.slf4j.{Logger, LoggerFactory} @@ -19,24 +18,18 @@ import scala.util.{Failure, Random, Success, Try} * @tparam Table the Table type. */ @implicitNotFound(msg = "Cannot find an implicit instance of TableParser[${Table}]. Typically, you should define an instance of StringTableParser or StringsTableParser.") -trait TableParser[Table] { - - /** - * The row type. - */ - type Row - +trait TableParser[Table] extends TableBuilder[Table] { /** * The input type, typically `String` or `Strings`. */ type Input /** - * This variable determines if there is a programmed, i.e., fixed, header for the parser. - * If its value is None, it signifies that we must look to the first line(s) of data - * for an appropriate header. + * Method to define a row parser. + * + * @return a RowParser[Row, Input]. */ - protected val maybeHeader: Option[Header] = None + val rowParser: RowParser[Row, Input] /** * This indicates the number of header rows which must be read from the input. @@ -48,20 +41,11 @@ trait TableParser[Table] { val headerRowsToRead: Int = 1 /** - * Method to construct a Table based on the provided rows and header. - * - * @param rows an iterator of Row objects representing the data rows. 
- * @param header a Header object representing the table's column headers. - * @return the constructed Table based on the input rows and header. - */ - protected def builder(rows: Iterator[Row], header: Header): Table - - /** - * Method to determine how errors are handled. - * - * @return true if individual errors are logged but do not cause parsing to fail. + * This variable determines if there is a programmed, i.e., fixed, header for the parser. + * If its value is None, it signifies that we must look to the first line(s) of data + * for an appropriate header. */ - protected val forgiving: Boolean = false + protected val maybeHeader: Option[Header] = None /** * Value to determine whether it is acceptable to have a quoted string span more than one line. @@ -71,20 +55,7 @@ trait TableParser[Table] { protected val multiline: Boolean = false /** - * Function to determine whether or not a row should be included in the table. - * Typically used for random sampling. - */ - protected val predicate: Try[Row] => Boolean = includeAll - - /** - * Method to define a row parser. - * - * @return a RowParser[Row, Input]. - */ - val rowParser: RowParser[Row, Input] - - /** - * Method to parse a table based on a sequence of Inputs. + * Method to parse a table based on an iterator of Inputs. * * @param xs the sequence of Inputs, one for each row * @param n the number of rows to drop (length of the header). 
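The division of labor described above — read `headerRowsToRead` rows for the header, then treat the remainder as data rows — can be sketched with a trivial, self-contained fragment (the comma splitting here is a stand-in for the real _RowParser_ and its configurable delimiters):

```scala
// Toy illustration: one header row followed by data rows
val lines: List[String] = List("r,i", "0.0,1.0", "-1.0,0.0")
val headerRowsToRead = 1

// The header comes from the first line(s); the rest are data rows
val header: Seq[String] = lines.head.split(",").toSeq
val rows: Seq[Seq[Double]] =
  lines.drop(headerRowsToRead).map(_.split(",").toSeq.map(_.toDouble))

println(header.mkString("|")) // r|i
println(rows.length)          // 2
```

A parser configured with `maybeHeader = Some(header)` would skip this first step and treat every input line as data.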
diff --git a/core/src/main/scala/com/phasmidsoftware/tableparser/core/render/CsvRenderer.scala b/core/src/main/scala/com/phasmidsoftware/tableparser/core/render/CsvRenderer.scala index d638e61..f988727 100644 --- a/core/src/main/scala/com/phasmidsoftware/tableparser/core/render/CsvRenderer.scala +++ b/core/src/main/scala/com/phasmidsoftware/tableparser/core/render/CsvRenderer.scala @@ -4,6 +4,7 @@ import com.phasmidsoftware.tableparser.core.parse.{StringList, Strings} import com.phasmidsoftware.tableparser.core.table._ import com.phasmidsoftware.tableparser.core.write.Writable import java.io.{File, FileWriter} +import java.nio.file.Path import org.joda.time.LocalDate import scala.reflect.ClassTag import scala.util.Try @@ -167,8 +168,14 @@ case class CsvTableStringRenderer[T]()(implicit z1: CsvRenderer[T], z2: CsvGener * * TODO merge this with CsvTableEncryptedFileRenderer to avoid duplicate code. * - * @param file the file to which the table will be written. + * @param path the path to which the table will be written. * @param csvAttributes implicit instance of CsvAttributes. * @tparam T the type of object to be rendered, must provide evidence of CsvRenderer[T] amd CsvGenerator[T]. 
*/ -case class CsvTableFileRenderer[T: CsvRenderer : CsvGenerator](file: File)(implicit csvAttributes: CsvAttributes) extends CsvTableRenderer[T, FileWriter]()(implicitly[CsvRenderer[T]], implicitly[CsvGenerator[T]], Writable.fileWritable(file)) +case class CsvTableFileRenderer[T: CsvRenderer : CsvGenerator](path: Path)(implicit csvAttributes: CsvAttributes) extends CsvTableRenderer[T, FileWriter]()(implicitly[CsvRenderer[T]], implicitly[CsvGenerator[T]], Writable.fileWritable(path.toFile)) + +object CsvTableFileRenderer { + @deprecated("Use apply(path: Path) instead.", "1.3.0") + def apply[T: CsvRenderer : CsvGenerator](file: File)(implicit csvAttributes: CsvAttributes): CsvTableFileRenderer[T] = + CsvTableFileRenderer(file.toPath)(implicitly[CsvRenderer[T]], implicitly[CsvGenerator[T]], csvAttributes) +} diff --git a/core/src/main/scala/com/phasmidsoftware/tableparser/core/table/Analysis.scala b/core/src/main/scala/com/phasmidsoftware/tableparser/core/table/Analysis.scala index 976813a..19efc2a 100644 --- a/core/src/main/scala/com/phasmidsoftware/tableparser/core/table/Analysis.scala +++ b/core/src/main/scala/com/phasmidsoftware/tableparser/core/table/Analysis.scala @@ -202,6 +202,7 @@ object Statistics { * resource file path before running the application.
*/ object Main extends App { +// doMain(FP.resource[Crime]("2023-01-metropolitan-street.csv")) doMain(FP.resource[Crime]("2023-01-metropolitan-street-sample.csv")) /** diff --git a/core/src/main/scala/com/phasmidsoftware/tableparser/core/table/Table.scala b/core/src/main/scala/com/phasmidsoftware/tableparser/core/table/Table.scala index bb5a63b..19a7269 100644 --- a/core/src/main/scala/com/phasmidsoftware/tableparser/core/table/Table.scala +++ b/core/src/main/scala/com/phasmidsoftware/tableparser/core/table/Table.scala @@ -12,6 +12,7 @@ import com.phasmidsoftware.tableparser.core.util.{Reflection, TryUsing} import com.phasmidsoftware.tableparser.core.write.{Node, TreeWriter, Writable} import java.io.{File, InputStream} import java.net.{URI, URL} +import java.nio.file.{Files, Path, Paths} import scala.annotation.unused import scala.io.{Codec, Source} import scala.language.postfixOps @@ -345,8 +346,21 @@ trait Table[Row] extends Iterable[Row] { * @param generator implicit value of CsvProductGenerator[Row]. * @param csvAttributes implicit value of CsvAttributes. */ + @deprecated("Use writeCSVFile(Path) instead", "1.3.0") def writeCSVFile(file: File)(implicit renderer: CsvRenderer[Row], generator: CsvGenerator[Row], csvAttributes: CsvAttributes): Unit = - CsvTableFileRenderer[Row](file).render(this) + CsvTableFileRenderer[Row](file.toPath).render(this) + + /** + * Writes the contents of the current table to a CSV file at the specified path. + * + * @param path The file system path where the CSV file will be written. + * @param renderer Implicit parameter that provides a mechanism to render rows to CSV format. + * @param generator Implicit parameter that is used to generate CSV rows from the data. + * @param csvAttributes Implicit parameter containing attributes for customizing the CSV formatting. + * @return Unit, indicating the operation does not return a value. 
+ */ + def writeCSVFile(path: Path)(implicit renderer: CsvRenderer[Row], generator: CsvGenerator[Row], csvAttributes: CsvAttributes): Unit = + CsvTableFileRenderer[Row](path).render(this) def maybeColumnNames: Option[Seq[String]] = maybeHeader map (_.xs) @@ -479,36 +493,70 @@ object Table { } /** - * Method to parse a table from a File. - * - * NOTE: you should use parseFile(String) if you have a pathname in String form. + * Parses a file at the given path into an instance of type T using the provided implicit TableParser. * - * TESTME + * @param path The path to the file that needs to be parsed. + * @param codec The character codec to be used for reading the file. It is provided implicitly. + * @tparam T The type to which the file content will be parsed. + * @return A Try containing the parsed instance of type T if successful, or a Failure if an error occurs. + */ + def parseFile[T: TableParser](path: Path)(implicit codec: Codec): Try[T] = + parse(sourceFromFile(path.toFile)) + + /** + * Parses a file located at the specified path using the provided encoding and a typeclass instance + * for parsing the file content into the desired type. * - * @param f the File (call by name in case there is an exception thrown while constructing the file). - * @param enc the explicit encoding. - * @tparam T the type of the resulting table. - * @return an Try[T] + * @param path The path to the file that needs to be parsed. + * @param enc The encoding used to read the file. + * @tparam T The type to which the file content will be parsed, requiring an implicit TableParser instance. + * @return A Try containing the parsed result of type T if successful, or a failure if an error occurs during parsing. */ - def parseFile[T: TableParser](f: => File, enc: String): Try[T] = { + def parseFile[T: TableParser](path: Path, enc: String): Try[T] = { implicit val codec: Codec = Codec(enc) - parseFile(f) + parseFile(path) } /** - * Method to parse a table from an File. 
+ * Method to parse a table from a File. * * NOTE: you should use parseFile(String) if you have a pathname in String form. * * TESTME * - * @param f the File (call by name in case there is an exception thrown while constructing the file). - * @param codec (implicit) the encoding. + * @param f the File (call by name in case there is an exception thrown while constructing the file). + * @param enc the explicit encoding. * @tparam T the type of the resulting table. * @return an Try[T] */ + @deprecated("Use parseFile(Path) instead", "1.3.0") def parseFile[T: TableParser](f: => File)(implicit codec: Codec): Try[T] = - parse(sourceFromFile(f)) + parseFile(f.toPath) + + @deprecated("Use parseFile(Path) instead", "1.3.0") + def parseFile[T: TableParser](pathname: String)(implicit codec: Codec): Try[T] = + parseFile(Paths.get(pathname)) + + @deprecated("Use parseFile(Path, String) instead", "1.3.0") + def parseFile[T: TableParser](f: => File, enc: String): Try[T] = { + implicit val codec: Codec = Codec(enc) + parseFile(f.toPath) + } +// +// /** +// * Method to parse a table from an File. +// * +// * NOTE: you should use parseFile(String) if you have a pathname in String form. +// * +// * TESTME +// * +// * @param f the File (call by name in case there is an exception thrown while constructing the file). +// * @param codec (implicit) the encoding. +// * @tparam T the type of the resulting table. +// * @return an Try[T] +// */ +// def parseFile[T: TableParser](f: => File)(implicit codec: Codec): Try[T] = +// parse(sourceFromFile(f)) /** * Method to parse a table from a File. @@ -518,24 +566,25 @@ object Table { * @tparam T the type of the resulting table. * @return an Try[T] */ + @deprecated("Use parseFile(Path, String) instead", "1.3.0") def parseFile[T: TableParser](pathname: String, enc: String): Try[T] = { implicit val codec: Codec = Codec(enc) - parseFile(pathname) + parseFile(Paths.get(pathname)) } +// +// /** +// * Method to parse a table from an File. 
+// * +// * @param pathname the file pathname. +// * @param codec (implicit) the encoding. +// * @tparam T the type of the resulting table. +// * @return a Try[T] +// */ +// def parseFile[T: TableParser](pathname: String)(implicit codec: Codec): Try[T] = +// parse(sourceFromFilename(pathname)) /** - * Method to parse a table from an File. - * - * @param pathname the file pathname. - * @param codec (implicit) the encoding. - * @tparam T the type of the resulting table. - * @return a Try[T] - */ - def parseFile[T: TableParser](pathname: String)(implicit codec: Codec): Try[T] = - parse(sourceFromFilename(pathname)) - - /** - * Method to parse a table from an File. + * Method to parse a table from a resource. * * @param w the resource name. * @param clazz the class for which the resource should be sought (should default to the calling class but doesn't). @@ -589,17 +638,41 @@ object Table { } /** - * Method to parse a table from a File as a table of Seq[String]. + * Parses a file at the given path into a `Table[RawRow]` using the specified conditions and options. * - * @param f the file. - * @param maybeFixedHeader an optional fixed header. If None (the default), we expect to find the header defined in the first line of the file. - * @param forgiving forcing (defaults to true). If true (the default) then an individual malformed row will not prevent subsequent rows being parsed. - * @param codec (implicit) the encoding. - * @return an Try of Table[RawRow] where RawRow is a Seq[String]. 
+ * @param path the file path to be parsed + * @param predicate a filtering function applied to each `RawRow` wrapped in a `Try`, determining whether the row is included + * @param maybeFixedHeader an optional header to be used instead of inferring it from the file + * @param forgiving a flag indicating whether parsing should continue with warnings or fail on errors + * @param codec the character encoding to be used for reading the file + * @return a `Try` containing the parsed `Table[RawRow]` on success or an exception on failure */ + def parsePathRaw(path: Path, predicate: Try[RawRow] => Boolean, maybeFixedHeader: Option[Header] = None, forgiving: Boolean = true)(implicit codec: Codec): Try[Table[RawRow]] = { + implicit val z: TableParser[Table[RawRow]] = RawTableParser(predicate, maybeFixedHeader, forgiving) + parseFile[Table[RawRow]](path) + } + + /** + * Parses a file to produce a `Table` of `RawRow`, applying a predicate to filter rows + * and optionally using a fixed header. This method is deprecated and should be replaced + * with `parsePathRaw(Path, Try[RawRow] => Boolean, Option[Header] = None, Boolean = true)`.
+ * + * @param f the input file to parse + * @param predicate a function that determines whether a given `Try[RawRow]` + * should be included in the output table + * @param maybeFixedHeader an optional fixed header to be used instead of extracting + * it from the file + * @param forgiving a flag indicating if parsing should continue despite errors, + * default is true + * @param codec an implicit `Codec` used to handle the file's character + * encoding + * @return a `Try` containing the parsed `Table` of `RawRow` if + * successful, or an error otherwise + */ + @deprecated("Use parsePathRaw(Path, Try[RawRow] => Boolean, Option[Header] = None, Boolean = true) instead", "1.3.0") def parseFileRaw(f: File, predicate: Try[RawRow] => Boolean, maybeFixedHeader: Option[Header] = None, forgiving: Boolean = true)(implicit codec: Codec): Try[Table[RawRow]] = { implicit val z: TableParser[Table[RawRow]] = RawTableParser(predicate, maybeFixedHeader, forgiving) - parseFile[Table[RawRow]](f) + parseFile[Table[RawRow]](f.toPath) } /** @@ -611,7 +684,7 @@ object Table { */ def parseFileRaw(pathname: String, predicate: Try[RawRow] => Boolean)(implicit codec: Codec): Try[Table[RawRow]] = { implicit val z: TableParser[Table[RawRow]] = RawTableParser(predicate, None) - parseFile[Table[RawRow]](pathname) + parseFile[Table[RawRow]](Paths.get(pathname)) } /** @@ -714,9 +787,24 @@ object Table { * @param f a File. * @return an Try[Source]. */ + private def sourceFromFile(path: Path)(implicit codec: Codec): Try[Source] = + sourceFromPath(path) + private def sourceFromFile(f: => File)(implicit codec: Codec): Try[Source] = Try(Source.fromFile(f)) + /** + * Creates a `Source` from the given file path. + * NOTE The input stream must be closed by the caller to avoid resource leaks.
+ * + * @param path the file path from which the source is created + * @param codec an implicit codec specifying the character encoding to use + * @return a `Try` containing the `Source` if successful, or a `Failure` if an error occurs + */ + private def sourceFromPath(path: Path)(implicit codec: Codec): Try[Source] = { + Try(Source.fromInputStream(Files.newInputStream(path))(codec)) + } + /** * Method to open source defined by a File. * diff --git a/core/src/test/scala/com/phasmidsoftware/tableparser/core/parse/CellParserSpec.scala b/core/src/test/scala/com/phasmidsoftware/tableparser/core/parse/CellParserSpec.scala index 889ddaf..3eae282 100644 --- a/core/src/test/scala/com/phasmidsoftware/tableparser/core/parse/CellParserSpec.scala +++ b/core/src/test/scala/com/phasmidsoftware/tableparser/core/parse/CellParserSpec.scala @@ -7,6 +7,7 @@ package com.phasmidsoftware.tableparser.core.parse import com.phasmidsoftware.tableparser.core.table.{Header, Row} import java.io.File import java.net.URL +// TODO replace org.joda references with java.time import org.joda.time.LocalDate import org.scalatest.flatspec import org.scalatest.matchers.should diff --git a/core/src/test/scala/com/phasmidsoftware/tableparser/core/table/TableSpec.scala b/core/src/test/scala/com/phasmidsoftware/tableparser/core/table/TableSpec.scala index 0b59023..0d58f9a 100644 --- a/core/src/test/scala/com/phasmidsoftware/tableparser/core/table/TableSpec.scala +++ b/core/src/test/scala/com/phasmidsoftware/tableparser/core/table/TableSpec.scala @@ -122,13 +122,6 @@ class TableSpec extends flatspec.AnyFlatSpec with should.Matchers { } } - it should "parse from null File" in { - import IntPair._ - - val f: String = null - Table.parseFile(new File(f)) should matchPattern { case Failure(_) => } - } - it should "parse from Resource" in { import IntPair._ diff --git a/docs/design/TableParserParquetDesign.md b/docs/design/TableParserParquetDesign.md new file mode 100644 index 0000000..c769b01 --- /dev/null +++ 
b/docs/design/TableParserParquetDesign.md @@ -0,0 +1,365 @@ +# TableParser Parquet Module — Design Document + +**Project:** TableParser +**Module:** `parquet` +**Version:** 1.0 +**Status:** Implemented + +--- + +## 1. Motivation + +TableParser's core value proposition is type-safe ingestion of tabular data into typed case classes. Currently this is achieved for CSV and other text-based formats. Parquet has become the dominant format for large-scale tabular data (it is the default format in Apache Spark pipelines, and the format used by the NYC TLC taxi dataset and many other public datasets). The TLC no longer provides CSV downloads at all. + +The motivation for supporting Parquet is identical to the motivation for supporting CSV in the Spark context: `spark.read.parquet(...)` produces an untyped `DataFrame`, losing all type safety. TableParser's Parquet module restores that type safety by ingesting Parquet data directly into a `Table[Row]` backed by typed case classes. + +--- + +## 2. Scope + +### Implemented in this iteration +- Reading a single `.parquet` file into a `Table[Row]` +- Schema validation: Parquet file schema vs. 
target case class, failing fast on mismatch +- Canonical type mapping from Parquet physical and logical types to Scala types +- Column name mapping via the existing `ColumnHelper` mechanism, including a new `camelToSnakeCaseColumnNameMapperLower` mapper added to `core` +- A new `ParquetParserException` for all Parquet-specific failures +- A committed binary test fixture (NYC Yellow Taxi January 2024 data, trimmed to 1,000 rows) +- A `TableBuilder` trait extracted from `TableParser` in `core` to cleanly support non-string source types + +### Deferred +- Reading a Parquet dataset (directory of part files) — `ParquetReader` requires additional work for directory support +- Writing a `Table[Row]` to Parquet (output direction) +- Spark module integration (Parquet-aware `Dataset[Row]` path) +- Backfilling `java.nio.file.Path` into the existing `core` module API (noted as desirable, separate PR) +- `Analysis` support on Parquet-sourced tables (see Section 8) +- Parallel row group reading + +--- + +## 3. Module Structure + +A new top-level SBT module `parquet`, alongside the existing `core`, `cats`, `zio`, and `spark` modules. 
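
For downstream projects, the module would be consumed as its own artifact. A minimal sketch of the dependency declaration follows; note that the organisation and version shown are illustrative assumptions (derived from the package prefix and release number mentioned in this document), not taken from the actual build:

```scala
// Hypothetical coordinates: organisation and version are illustrative only.
libraryDependencies += "com.phasmidsoftware" %% "tableparser-parquet" % "1.3.0"
```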
+ +``` +tableparser-parquet + src/ + main/scala/com/phasmidsoftware/tableparser/parquet/ + ParquetParser.scala -- main entry point + ParquetTableParser.scala -- TableBuilder instance for Parquet + ParquetRowParser.scala -- row parser for Parquet SimpleGroup records + ParquetCellConverter.scala -- type class for direct typed value extraction + ParquetTypeMapper.scala -- Parquet type → Scala type mapping + ParquetSchemaValidator.scala -- schema validation logic + ParquetParserException.scala -- exception type + test/scala/com/phasmidsoftware/tableparser/parquet/ + ParquetParserSpec.scala + test/resources/ + taxi_sample.parquet -- committed binary fixture (1,000 rows) +``` + +In `build.sbt`: + +```scala +lazy val parquet = project.dependsOn(core).settings( + name := "tableparser-parquet", + libraryDependencies ++= Seq( + "org.apache.parquet" % "parquet-column" % "1.15.2", + "org.apache.parquet" % "parquet-hadoop" % "1.15.2", + "org.apache.hadoop" % "hadoop-common" % "3.4.1" % "provided", + "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.4.1" % Test, + "org.scalatest" %% "scalatest" % scalaTestVersion % Test + ) +) +``` + +Note: `parquet-avro` is NOT used. The module works directly with `parquet-mr`'s native `SimpleGroup` / `GroupReadSupport` API, avoiding the Avro object model entirely. `hadoop-mapreduce-client-core` is required at test runtime but not in production (where Hadoop is expected on the classpath). + +The root aggregator in `build.sbt` is updated to include `parquet`: + +```scala +lazy val root = (project in file(".")) + .aggregate(core, cats, zio, spark, parquet) + .settings( + name := "TableParser", + publish / skip := true + ) +``` + +--- + +## 4. Core Refactoring: TableBuilder + +A prerequisite to implementing the `parquet` module cleanly was extracting a `TableBuilder` trait from `TableParser` in `core`. The existing `TableParser` trait is built around an `Iterator[String]` input pipeline which has no meaning for Parquet sources. 
Rather than force-fitting `ParquetTableParser` into that hierarchy (with stub implementations of `rowParser`, `parse(Iterator)` etc.), a thin base trait was extracted: + +```scala +trait TableBuilder[Table] { + type Row + protected def builder(rows: Iterator[Row], header: Header): Table + protected val forgiving: Boolean = false + protected val predicate: Try[Row] => Boolean = includeAll +} +``` + +`TableParser` now extends `TableBuilder`, so all existing code is unchanged. `ParquetTableParser` extends `TableBuilder` directly, gaining `builder`, `forgiving`, and `predicate` without inheriting the string-parsing machinery. + +--- + +## 5. API + +### 5.1 Entry Point + +```scala +object ParquetParser { + def parse[Row <: Product : ClassTag]( + path: Path + )(implicit helper: ColumnHelper[Row]): Try[Table[Row]] +} +``` + +The call site looks like: + +```scala +import com.phasmidsoftware.tableparser.parquet.ParquetParser + +implicit val helper: ColumnHelper[YellowTaxiTrip] = + columnHelper(camelToSnakeCaseColumnNameMapperLower, ...) + +val result: Try[Table[YellowTaxiTrip]] = + ParquetParser.parse[YellowTaxiTrip](Path.of("data/taxi_sample.parquet")) +``` + +`java.nio.file.Path` is used exclusively. There is no `parseParquetResource` method — test fixtures are accessed via `Path.of(getClass.getResource(...).toURI)`. + +### 5.2 ColumnHelper and Name Mapping + +A new mapper was added to `ColumnHelper` in `core`: + +```scala +val camelToSnakeCaseColumnNameMapperLower: String => String = + camelToSnakeCaseColumnNameMapper andThen (_.toLowerCase) +``` + +This correctly maps `passengerCount` → `passenger_count`. The existing `camelToSnakeCaseColumnNameMapper` is unchanged (it does not lowercase). + +Real-world Parquet schemas often have inconsistent casing (e.g. TLC uses `VendorID`, `RatecodeID`, `Airport_fee`). 
These are handled via aliases in `ColumnHelper`: + +```scala +implicit val helper: ColumnHelper[YellowTaxiTrip] = + columnHelper( + camelToSnakeCaseColumnNameMapperLower, + "vendorId" -> "VendorID", + "ratecodeId" -> "RatecodeID", + "puLocationId" -> "PULocationID", + "doLocationId" -> "DOLocationID", + "airportFee" -> "Airport_fee" + ) +``` + +--- + +## 6. Schema Validation + +### 6.1 Behaviour + +On opening a Parquet file, the schema is read from the footer metadata and validated against the target case class before any rows are parsed. If validation fails, a `ParquetParserException` is thrown and no rows are read. + +Validation checks: +- Every parameter of the target case class has a corresponding column in the Parquet schema (after `ColumnHelper` name mapping is applied) +- Every such column has a supported Parquet type (via `ParquetTypeMapper`) +- Any `OPTIONAL` Parquet column must map to an `Option[T]` parameter — mapping to a non-`Option` parameter throws `ParquetParserException` + +Columns present in the Parquet schema but absent from the case class are silently ignored, consistent with existing CSV behaviour. + +Note: In practice, real-world Parquet datasets (including all TLC Yellow Taxi 2024 data) may mark every column as `OPTIONAL`. Users should declare all fields as `Option[T]` unless they have specific knowledge that a column is `REQUIRED`. + +### 6.2 ParquetParserException + +```scala +case class ParquetParserException(msg: String, cause: Option[Throwable] = None) + extends Exception(msg, cause.orNull) +``` + +--- + +## 7. 
Type Mapping + +### 7.1 Canonical Mapping + +Defined in `ParquetTypeMapper`: + +| Parquet Physical Type | Logical Type Annotation | Scala Type | +|-------------------------|--------------------------|-----------------------------| +| `BOOLEAN` | — | `Boolean` | +| `INT32` | — | `Int` | +| `INT32` | `DATE` | `java.time.LocalDate` | +| `INT32` | `DECIMAL(p,s)` | `BigDecimal` | +| `INT64` | — | `Long` | +| `INT64` | `TIMESTAMP_MILLIS/MICROS`| `java.time.Instant` | +| `INT64` | `DECIMAL(p,s)` | `BigDecimal` | +| `FLOAT` | — | `Float` | +| `DOUBLE` | — | `Double` | +| `BINARY` | `STRING` (or none) | `String` | +| `FIXED_LEN_BYTE_ARRAY` | `DECIMAL(p,s)` | `BigDecimal` | + +Note: `BINARY` without a logical type annotation is treated as `String`. The `large_string` type reported by PyArrow is an Arrow concept; at the Parquet level it is `BINARY` with `StringLogicalTypeAnnotation`. + +### 7.2 Optional Fields and Type Erasure + +A key implementation detail: for `Option[T]` fields, generic type reflection (`getActualTypeArguments`) returns `java.lang.Object` at runtime due to JVM type erasure. The inner type cannot be recovered this way. + +The solution is to use the Parquet schema itself as the authoritative source of type information. `convertOption` takes the `PrimitiveType` from the schema rather than the field's generic type: + +```scala +def convertOption( + group: SimpleGroup, + columnName: String, + parquetType: PrimitiveType +): Try[Any] = + if (group.getFieldRepetitionCount(columnName) == 0) Success(None) + else ParquetTypeMapper.mapType(parquetType) match { + case Left(ex) => Failure(ex) + case Right(clazz) => convertByClass(group, columnName, clazz).map(Some(_)) + } +``` + +`convertByClass` matches against both primitive and boxed types (e.g. `classOf[Int]` and `classOf[java.lang.Integer]`) to handle JVM boxing correctly. + +### 7.3 BigDecimal Scale Limitation + +`BigDecimalConverter` currently hardcodes scale to `0`. 
The correct scale is available in `DecimalLogicalTypeAnnotation`, but passing `PrimitiveType` through to all converters is deferred. This converter will produce incorrect results for Parquet decimals with non-zero scale.
+
+---
+
+## 8. Analysis Support
+
+`Analysis` in `core` operates on `RawTable` (`Table[RawRow]`), not on typed `Table[Row]`. A `Table[YellowTaxiTrip]` produced from a Parquet source therefore does not support `Analysis` directly. This was not apparent at design time and is noted here as a correction.
+
+Supporting `Analysis` on Parquet-sourced tables is deferred. Options include a separate raw Parquet read path or a dedicated column statistics mechanism in the `parquet` module.
+
+---
+
+## 9. Row Reading
+
+### 9.1 ParquetCellConverter
+
+A new type class `ParquetCellConverter[T]` was introduced to extract typed values directly from a `SimpleGroup`, bypassing the `String`-based `CellParser` machinery of `core` entirely:
+
+```scala
+trait ParquetCellConverter[T] {
+  def convert(group: SimpleGroup, fieldName: String): Try[T]
+}
+```
+
+Instances are provided for `Boolean`, `Int`, `Long`, `Float`, `Double`, `String`, `Instant`, `LocalDate`, and `BigDecimal`. The companion object provides `convertField`, `convertOption`, and `convertByClass` dispatch methods.
+
+### 9.2 Read Path
+
+```
+java.nio.file.Path
+  → HadoopPath
+  → ParquetFileReader.open (schema from footer)
+  → ParquetSchemaValidator.validate
+  → StandardParquetRowParser.apply (converters built once via reflection)
+  → ParquetReader[Group] / GroupReadSupport
+  → Iterator.unfold → Iterator[Row]
+  → builder → HeadedTable[Row] (forces materialisation into Content)
+```
+
+Reflection cost (field inspection, converter construction) is paid once at parser construction time, not per row.
+
+### 9.3 Resource Management
+
+`ParquetReader` is `AutoCloseable`.
The `Iterator.unfold` approach produces a lazy iterator, but `builder` immediately materialises it into `Content` (via `HeadedTable`), which exhausts the reader. If `builder` is overridden to return a lazy structure, resource management must be revisited. + +### 9.4 Dataset Support (Deferred) + +`ParquetReader` with `GroupReadSupport` does not handle directories natively — it requires a single file path. Directory (dataset) support is deferred and requires enumerating part files and reading them sequentially or in parallel. + +--- + +## 10. Testing + +### 10.1 Test Fixture + +`taxi_sample.parquet` — 1,000 rows from NYC TLC Yellow Taxi January 2024, generated via: + +```python +import pyarrow.parquet as pq +pq.write_table( + pq.read_table('yellow_tripdata_2024-01.parquet').slice(0, 1000), + 'taxi_sample.parquet' +) +``` + +### 10.2 Test Case Class + +All 19 columns in the 2024 TLC Yellow Taxi schema are `OPTIONAL`. The case class reflects this: + +```scala +case class YellowTaxiTrip( + vendorId: Option[Int], + tpepPickupDatetime: Option[Instant], + tpepDropoffDatetime: Option[Instant], + passengerCount: Option[Long], + tripDistance: Option[Double], + ratecodeId: Option[Long], + storeAndFwdFlag: Option[String], + puLocationId: Option[Int], + doLocationId: Option[Int], + paymentType: Option[Long], + fareAmount: Option[Double], + extra: Option[Double], + mtaTax: Option[Double], + tipAmount: Option[Double], + tollsAmount: Option[Double], + improvementSurcharge: Option[Double], + totalAmount: Option[Double], + congestionSurcharge: Option[Double], + airportFee: Option[Double] +) +``` + +### 10.3 Test Scenarios + +- Happy path: parse `taxi_sample.parquet` into `Table[YellowTaxiTrip]`, verify 1,000 rows and spot-check typed field values +- Header: verify 19 columns in the header +- Schema mismatch: supply a case class with an unknown column name, expect `ParquetParserException` +- OPTIONAL/non-Option mismatch: pending (all columns happen to be OPTIONAL in the fixture) +- 
Dataset (multi-file): pending — deferred until dataset support is implemented +- Analysis: pending — deferred (see Section 8) + +--- + +## 11. Open Questions and Deferred Decisions + +| Topic | Status | Notes | +|-------|--------|-------| +| Parquet dataset (directory) support | Deferred | `ParquetReader` needs directory handling | +| Parquet output (write direction) | Deferred | Separate design document when in scope | +| Spark module Parquet integration | Deferred | Depends on this module being stable first | +| `Path` backfill into `core` | Deferred | Desirable; separate PR to avoid scope creep | +| Parallel row group reading | Deferred | Revisit once baseline reading is working | +| `LIST` and `MAP` Parquet types | Deferred | No built-in mapping; custom `ParquetTypeMapper` possible | +| `BigDecimal` scale from `DecimalLogicalTypeAnnotation` | Deferred | Currently hardcoded to 0 | +| `Analysis` on Parquet-sourced tables | Deferred | Requires raw read path or separate statistics mechanism | +| Encryption | Deferred | Out of scope for initial iteration | + +--- + +## 12. 
Summary of New Types + +| Type | Location | Purpose | +|------|----------|---------| +| `ParquetParserException` | `parquet` module | All Parquet-specific failures | +| `ParquetTableParser[R]` | `parquet` module | `TableBuilder` instance for Parquet sources | +| `ParquetRowParser[Row]` / `StandardParquetRowParser[Row]` | `parquet` module | Converts `SimpleGroup` records to typed `Row` | +| `ParquetCellConverter[T]` | `parquet` module | Type class for direct typed value extraction from `SimpleGroup` | +| `ParquetTypeMapper` | `parquet` module | Canonical Parquet→Scala type mapping | +| `ParquetSchemaValidator` | `parquet` module | Validates Parquet schema against case class at open time | +| `ParquetParser` | `parquet` module | Entry point: `parse[Row](path)` | +| `TableBuilder[Table]` | `core` module | Thin base trait extracted from `TableParser`; extended by `ParquetTableParser` | + +### Changes to `core` +- `TableBuilder[Table]` trait added +- `TableParser[Table]` now extends `TableBuilder[Table]` +- `camelToSnakeCaseColumnNameMapperLower` added to `ColumnHelper` \ No newline at end of file diff --git a/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetCellConverter.scala b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetCellConverter.scala new file mode 100644 index 0000000..c294c89 --- /dev/null +++ b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetCellConverter.scala @@ -0,0 +1,137 @@ +package com.phasmidsoftware.tableparser.parquet + +import java.time.{Instant, LocalDate} +import org.apache.parquet.example.data.simple.SimpleGroup +import scala.util.{Failure, Success, Try} + +/** + * Type class for converting a field value from a Parquet SimpleGroup + * directly to a Scala type T, without going through String. + * + * @tparam T the target Scala type. + */ +trait ParquetCellConverter[T] { + /** + * Extract and convert the value of the named field from the given SimpleGroup. 
+ * + * @param group the SimpleGroup representing one Parquet row. + * @param fieldName the Parquet column name. + * @return a Try[T]. + */ + def convert(group: SimpleGroup, fieldName: String): Try[T] +} + +object ParquetCellConverter { + + implicit object BooleanConverter extends ParquetCellConverter[Boolean] { + def convert(group: SimpleGroup, fieldName: String): Try[Boolean] = + Try(group.getBoolean(fieldName, 0)) + } + + implicit object IntConverter extends ParquetCellConverter[Int] { + def convert(group: SimpleGroup, fieldName: String): Try[Int] = + Try(group.getInteger(fieldName, 0)) + } + + implicit object LongConverter extends ParquetCellConverter[Long] { + def convert(group: SimpleGroup, fieldName: String): Try[Long] = + Try(group.getLong(fieldName, 0)) + } + + implicit object FloatConverter extends ParquetCellConverter[Float] { + def convert(group: SimpleGroup, fieldName: String): Try[Float] = + Try(group.getFloat(fieldName, 0)) + } + + implicit object DoubleConverter extends ParquetCellConverter[Double] { + def convert(group: SimpleGroup, fieldName: String): Try[Double] = + Try(group.getDouble(fieldName, 0)) + } + + implicit object StringConverter extends ParquetCellConverter[String] { + def convert(group: SimpleGroup, fieldName: String): Try[String] = + Try(group.getBinary(fieldName, 0).toStringUsingUTF8) + } + + implicit object InstantConverter extends ParquetCellConverter[Instant] { + def convert(group: SimpleGroup, fieldName: String): Try[Instant] = + Try { + val micros = group.getLong(fieldName, 0) + // TIMESTAMP_MICROS: microseconds since epoch + Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1000L) + } + } + + implicit object LocalDateConverter extends ParquetCellConverter[LocalDate] { + def convert(group: SimpleGroup, fieldName: String): Try[LocalDate] = + // DATE: days since Unix epoch (1970-01-01) stored as INT32 + Try(LocalDate.ofEpochDay(group.getInteger(fieldName, 0).toLong)) + } + + implicit object BigDecimalConverter 
extends ParquetCellConverter[BigDecimal] {
+    // NOTE: scale hardcoded to 0 pending access to PrimitiveType metadata.
+    // Will produce incorrect results unless Parquet decimal has scale 0.
+    def convert(group: SimpleGroup, fieldName: String): Try[BigDecimal] =
+      Try {
+        val binary = group.getBinary(fieldName, 0)
+        BigDecimal(new java.math.BigDecimal(
+          new java.math.BigInteger(binary.getBytes), 0
+        ))
+      }
+  }
+
+  /**
+   * Dispatch to the correct converter based on the field's runtime type.
+   * Used by ParquetRowParser for non-optional fields.
+   */
+  def convertField(
+    group: SimpleGroup,
+    columnName: String,
+    field: java.lang.reflect.Field
+  ): Try[Any] =
+    convertByClass(group, columnName, field.getType)
+
+  /**
+   * Handle OPTIONAL columns.
+   * Returns None if the field has no value in this row, Some(value) otherwise.
+   * The inner type of Option[T] is taken from the Parquet schema's
+   * PrimitiveType, since JVM type erasure prevents recovering it via
+   * generic reflection (see Section 7.2 of the design document).
+   */
+  def convertOption(
+    group: SimpleGroup,
+    columnName: String,
+    parquetType: org.apache.parquet.schema.PrimitiveType
+  ): Try[Any] =
+    if (group.getFieldRepetitionCount(columnName) == 0)
+      Success(None)
+    else
+      ParquetTypeMapper.mapType(parquetType) match {
+        case Left(ex) => Failure(ex)
+        case Right(clazz) => convertByClass(group, columnName, clazz).map(Some(_))
+      }
+
+  /**
+   * Dispatch to the correct converter by Class[_] directly.
+   * Used for both non-optional fields and Option inner types.
+ */ + def convertByClass( + group: SimpleGroup, + columnName: String, + clazz: Class[_] + ): Try[Any] = + clazz match { + case c if c == classOf[Boolean] || c == classOf[java.lang.Boolean] => BooleanConverter.convert(group, columnName) + case c if c == classOf[Int] || c == classOf[java.lang.Integer] => IntConverter.convert(group, columnName) + case c if c == classOf[Long] || c == classOf[java.lang.Long] => LongConverter.convert(group, columnName) + case c if c == classOf[Float] || c == classOf[java.lang.Float] => FloatConverter.convert(group, columnName) + case c if c == classOf[Double] || c == classOf[java.lang.Double] => DoubleConverter.convert(group, columnName) + case c if c == classOf[String] => StringConverter.convert(group, columnName) + case c if c == classOf[Instant] => InstantConverter.convert(group, columnName) + case c if c == classOf[LocalDate] => LocalDateConverter.convert(group, columnName) + case c if c == classOf[BigDecimal] || c == classOf[java.math.BigDecimal] => BigDecimalConverter.convert(group, columnName) + case other => + Failure(ParquetParserException( + s"No ParquetCellConverter available for type '${other.getName}' " + + s"on column '$columnName'" + )) + } +} \ No newline at end of file diff --git a/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetParser.scala b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetParser.scala new file mode 100644 index 0000000..2394a71 --- /dev/null +++ b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetParser.scala @@ -0,0 +1,45 @@ +package com.phasmidsoftware.tableparser.parquet + +import com.phasmidsoftware.tableparser.core.parse.ColumnHelper +import com.phasmidsoftware.tableparser.core.table.Table +import java.nio.file.Path +import scala.reflect.ClassTag +import scala.util.Try + +/** + * Entry point for parsing Parquet files into typed Tables. 
+ *
+ * Typical usage:
+ * {{{
+ *   import com.phasmidsoftware.tableparser.parquet.ParquetParser
+ *
+ *   implicit val helper: ColumnHelper[YellowTaxiTrip] =
+ *     columnHelper(camelToSnakeCaseColumnNameMapperLower)
+ *
+ *   val result: Try[Table[YellowTaxiTrip]] =
+ *     ParquetParser.parse[YellowTaxiTrip](Path.of("data/taxi_sample.parquet"))
+ * }}}
+ */
+object ParquetParser {
+
+  /**
+   * Parse a single Parquet file into a Table[Row].
+   * (Dataset directories of part files are not yet supported.)
+   *
+   * @param path a java.nio.file.Path to a single .parquet file.
+   * @param helper the ColumnHelper providing parameter-to-column name mapping.
+   * @tparam Row the target case class type.
+   * @return a Try[Table[Row]].
+   */
+  def parse[Row <: Product : ClassTag](
+    path: Path
+  )(implicit helper: ColumnHelper[Row]): Try[Table[Row]] = {
+
+    val capturedHelper = helper // capture before entering anonymous class
+
+    val parser = new ParquetTableParser[Row] {
+      val helper: ColumnHelper[Row] = capturedHelper
+    }
+
+    parser.parseParquet(path)
+  }
+}
\ No newline at end of file
diff --git a/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetParserException.scala b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetParserException.scala
new file mode 100644
index 0000000..68f8fc5
--- /dev/null
+++ b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetParserException.scala
@@ -0,0 +1,14 @@
+package com.phasmidsoftware.tableparser.parquet
+
+/**
+ * Exception thrown for all Parquet-specific failures, including:
+ * - Schema mismatch between case class and Parquet file
+ * - Schema mismatch between part files in a dataset (future dataset support)
+ * - Unsupported Parquet type with no registered mapping
+ * - Corrupt or unreadable Parquet file
+ *
+ * @param msg the error message.
+ * @param cause an optional underlying cause.
+ */ +case class ParquetParserException(msg: String, cause: Option[Throwable] = None) + extends Exception(msg, cause.orNull) \ No newline at end of file diff --git a/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetRowParser.scala b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetRowParser.scala new file mode 100644 index 0000000..16eef72 --- /dev/null +++ b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetRowParser.scala @@ -0,0 +1,105 @@ +package com.phasmidsoftware.tableparser.parquet + +import com.phasmidsoftware.tableparser.core.parse.ColumnHelper +import org.apache.parquet.example.data.simple.SimpleGroup +import org.apache.parquet.schema.MessageType +import scala.reflect.ClassTag +import scala.util.{Failure, Success, Try} + +/** + * Converts a Parquet SimpleGroup record directly into a typed Row, + * bypassing the String-based CellParser machinery of core. + * + * @tparam Row the target case class type. + */ +trait ParquetRowParser[Row] { + + /** + * Convert a SimpleGroup (one Parquet record) into a Try[Row]. + * + * @param schema the Parquet MessageType, used to look up field metadata. + * @param helper the ColumnHelper providing parameter-to-column name mapping. + * @param group the SimpleGroup representing one row. + * @return a Try[Row]. + */ + def parse(schema: MessageType, helper: ColumnHelper[Row])(group: SimpleGroup): Try[Row] +} + +/** + * Standard implementation of ParquetRowParser. + * Uses reflection to identify case class parameters and + * ParquetCellConverter to extract typed values from each field. + * + * @param converters a sequence of (columnName, converter) pairs, + * one per case class parameter, in declaration order. + * @tparam Row the target case class type, must be a Product (case class). 
+ */ +class StandardParquetRowParser[Row <: Product : ClassTag]( + converters: Seq[(String, SimpleGroup => Try[Any])] + ) extends ParquetRowParser[Row] { + + def parse(schema: MessageType, helper: ColumnHelper[Row])(group: SimpleGroup): Try[Row] = + Try { + // Extract all field values in parameter order + val values: Seq[Any] = converters.map { case (columnName, converter) => + converter(group) match { + case Success(v) => v + case Failure(e) => throw ParquetParserException( + s"Failed to convert field '$columnName': ${e.getClass.getSimpleName}: ${e.getMessage}", + Some(e) + ) + } + } + // Invoke the case class constructor via reflection + // NOTE that this may fail if there are additional constructors defined for the case class. + val ctor = implicitly[ClassTag[Row]].runtimeClass.getConstructors.head + ctor.newInstance(values.map(_.asInstanceOf[AnyRef]): _*).asInstanceOf[Row] + } +} + +/** + * Companion object providing a factory method to build a + * StandardParquetRowParser from the case class structure and schema. + */ +object StandardParquetRowParser { + + /** + * Build a StandardParquetRowParser for a given case class type. + * + * Reflects on the case class fields, maps each parameter name to + * a Parquet column name via ColumnHelper, and builds a converter + * function for each field using ParquetCellConverter. + * + * @param schema the Parquet MessageType (already validated). + * @param helper the ColumnHelper providing name mapping. + * @tparam Row the target case class type. + * @return a StandardParquetRowParser[Row]. 
+ */ + def apply[Row <: Product : ClassTag]( + schema: MessageType, + helper: ColumnHelper[Row] + ): StandardParquetRowParser[Row] = { + + val fields = implicitly[ClassTag[Row]] + .runtimeClass + .getDeclaredFields + .toSeq + + val converters: Seq[(String, SimpleGroup => Try[Any])] = + fields.map { field => + val columnName = helper.lookup(None, field.getName) + val isOption = field.getType == classOf[Option[_]] + + val converter: SimpleGroup => Try[Any] = + if (isOption) { + val pt = schema.getType(schema.getFieldIndex(columnName)).asPrimitiveType() + group => ParquetCellConverter.convertOption(group, columnName, pt) + } else + group => ParquetCellConverter.convertField(group, columnName, field) + + columnName -> converter + } + + new StandardParquetRowParser[Row](converters) + } +} \ No newline at end of file diff --git a/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetSchemaValidator.scala b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetSchemaValidator.scala new file mode 100644 index 0000000..fab5c8b --- /dev/null +++ b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetSchemaValidator.scala @@ -0,0 +1,81 @@ +package com.phasmidsoftware.tableparser.parquet + +import com.phasmidsoftware.tableparser.core.parse.ColumnHelper +import org.apache.parquet.schema.{MessageType, PrimitiveType => ParquetPrimitiveType} +import scala.reflect.ClassTag +import scala.util.Try + +/** + * Validates a Parquet MessageType schema against a target case class, + * prior to reading any rows. + */ +object ParquetSchemaValidator { + + /** + * Validates a Parquet MessageType schema against the target case class Row. 
+ * + * Checks: + * - Every parameter of Row has a corresponding column in the Parquet schema + * (after ColumnHelper name mapping is applied) + * - Every such column has a supported Parquet type (via ParquetTypeMapper) + * - Any OPTIONAL Parquet column maps to an Option[T] parameter in Row + * + * Columns present in the Parquet schema but absent from Row are silently + * ignored, consistent with existing CSV behaviour. + * + * @param schema the MessageType read from the Parquet file footer. + * @param helper the ColumnHelper providing parameter-to-column name mapping. + * @tparam Row the target case class type. + * @return Success(()) if valid, Failure(ParquetParserException) if not. + */ + def validate[Row: ClassTag]( + schema: MessageType, + helper: ColumnHelper[Row] + ): Try[Unit] = Try { + + // Build a map of Parquet column name -> PrimitiveType from the schema + val parquetColumns: Map[String, ParquetPrimitiveType] = + (0 until schema.getFieldCount) + .map(schema.getType) + .collect { case pt: ParquetPrimitiveType => pt.getName -> pt } + .toMap + + // Get the declared fields of the target case class + val fields = implicitly[ClassTag[Row]] + .runtimeClass + .getDeclaredFields + .toSeq + + fields.foreach { field => + // Map case class parameter name to expected Parquet column name + val columnName = helper.lookup(None, field.getName) + + // Check column exists in Parquet schema + val parquetType = parquetColumns.getOrElse(columnName, + throw ParquetParserException( + s"Column '$columnName' (mapped from parameter '${field.getName}') " + + s"not found in Parquet schema. 
Available columns: " +
+          s"${parquetColumns.keys.mkString(", ")}"
+        )
+      )
+
+      // Check type is supported
+      ParquetTypeMapper.mapType(parquetType) match {
+        case Left(ex) => throw ex
+        case Right(_) => ()
+      }
+
+      // Check optionality compatibility:
+      // OPTIONAL Parquet column must map to Option[T] in the case class
+      val isScalaOption = field.getType == classOf[Option[_]]
+      val isParquetOptional = ParquetTypeMapper.isOptional(parquetType)
+
+      if (!isScalaOption && isParquetOptional)
+        throw ParquetParserException(
+          s"Column '$columnName' is OPTIONAL in Parquet schema but " +
+          s"parameter '${field.getName}' is not Option[_]. " +
+          s"Wrap the parameter type in Option to handle null values."
+        )
+    }
+  }
+}
\ No newline at end of file
diff --git a/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetTableParser.scala b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetTableParser.scala
new file mode 100644
index 0000000..6ed90cd
--- /dev/null
+++ b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetTableParser.scala
@@ -0,0 +1,130 @@
+package com.phasmidsoftware.tableparser.parquet
+
+import com.phasmidsoftware.tableparser.core.parse.{ColumnHelper, TableBuilder}
+import com.phasmidsoftware.tableparser.core.table.{Content, HeadedTable, Header, Table}
+import java.nio.file.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path => HadoopPath}
+import org.apache.parquet.example.data.Group
+import org.apache.parquet.example.data.simple.SimpleGroup
+import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.hadoop.example.GroupReadSupport
+import org.apache.parquet.schema.MessageType
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+/**
+ * A TableBuilder for Parquet sources.
+ *
+ * Reads a single Parquet file into a Table[Row] using
+ * ParquetSchemaValidator, StandardParquetRowParser, and ParquetCellConverter.
+ *
+ * Extends TableBuilder directly rather than TableParser, since the Parquet
+ * read path does not involve Iterator[String], header rows, or LineParser.
+ *
+ * @tparam R the target case class type.
+ */
+abstract class ParquetTableParser[R <: Product : ClassTag]
+  extends TableBuilder[Table[R]] {
+
+  type Row = R
+
+  /**
+   * The ColumnHelper providing parameter-to-column name mapping.
+   * Users provide this in their concrete instance, exactly as for CSV.
+   */
+  val helper: ColumnHelper[Row]
+
+  /**
+   * Build a Table[Row] from parsed rows and a Header derived from
+   * the Parquet schema. Can be overridden if a different Table
+   * implementation is needed.
+   */
+  protected def builder(rows: Iterator[Row], header: Header): Table[Row] =
+    HeadedTable(Content(rows), header)
+
+  /**
+   * Parse a single Parquet file into a Table[Row].
+   *
+   * Schema validation is performed before any rows are read.
+   * Fails fast with ParquetParserException on any schema mismatch
+   * or unsupported type.
+   *
+   * @param path a java.nio.file.Path to a single .parquet file
+   *             (dataset directories are not yet supported).
+   * @return a Try[Table[Row]].
+ */ + def parseParquet(path: Path): Try[Table[Row]] = Try { + val conf = new Configuration() + val hadoopPath = new HadoopPath(path.toUri) + + // Read schema from Parquet footer metadata + val schema: MessageType = readSchema(hadoopPath, conf) + + // Validate schema against the target case class before reading any rows + ParquetSchemaValidator.validate[Row](schema, helper) match { + case Failure(exception) => + throw exception + case Success(_) => + } + + // Build the Header from Parquet column names + val header: Header = headerFromSchema(schema) + + // Build the row parser once, paying reflection cost here not per row + val rowParser: StandardParquetRowParser[Row] = + StandardParquetRowParser[Row](schema, helper) + + // Read and materialise all rows -- materialisation into Content + // via builder ensures the ParquetReader is exhausted before we return, + // avoiding any resource leak from a partially-consumed iterator. + val rows: Iterator[Row] = readRows(hadoopPath, conf, schema, rowParser) + + builder(rows, header) + } + + // ── private helpers ────────────────────────────────────────────────────── + + private def headerFromSchema(schema: MessageType): Header = + Header( + (0 until schema.getFieldCount).map(schema.getFieldName), + Nil + ) + + private def readSchema(path: HadoopPath, conf: Configuration): MessageType = { + val reader = org.apache.parquet.hadoop.ParquetFileReader.open( + org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(path, conf) + ) + try reader.getFooter.getFileMetaData.getSchema + finally reader.close() + } + + private def readRows( + path: HadoopPath, + conf: Configuration, + schema: MessageType, + rowParser: StandardParquetRowParser[Row] + ): Iterator[Row] = { + val reader: ParquetReader[Group] = + ParquetReader + .builder(new GroupReadSupport(), path) + .withConf(conf) + .build() + + // Unfold the reader into an Iterator[Row]. 
+ // builder() will force full materialisation into Content, + // ensuring the reader is fully consumed and closed. + Iterator.unfold(reader) { r => + Option(r.read()) match { + case Some(group) => + val row = rowParser + .parse(schema, helper)(group.asInstanceOf[SimpleGroup]) + .fold(e => throw e, identity) + Some(row -> r) + case None => + r.close() // close the reader once the stream is exhausted + None + } + } + // NOTE: builder() materialises the iterator eagerly into Content via + // HeadedTable, so the reader is exhausted -- and therefore closed -- + // before parseParquet returns. If you ever change builder() to return + // a lazy structure, resource management must be revisited. + } +} \ No newline at end of file diff --git a/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetTypeMapper.scala b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetTypeMapper.scala new file mode 100644 index 0000000..2d520f2 --- /dev/null +++ b/parquet/src/main/scala/com/phasmidsoftware/tableparser/parquet/ParquetTypeMapper.scala @@ -0,0 +1,53 @@ +package com.phasmidsoftware.tableparser.parquet + +import java.time.{Instant, LocalDate} +import org.apache.parquet.schema.LogicalTypeAnnotation._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema.{PrimitiveType => ParquetPrimitiveType} + +/** + * Maps Parquet primitive types to Scala types, taking into account + * both physical type and logical type annotation. + */ +object ParquetTypeMapper { + + /** + * Maps a Parquet PrimitiveType to the corresponding Scala Class. + * Used during schema validation to verify type compatibility. + * + * @param pt the Parquet PrimitiveType from the schema. + * @return Right(Class[_]) if the type is supported, + * Left(ParquetParserException) if not. 
+ */ + def mapType(pt: ParquetPrimitiveType): Either[ParquetParserException, Class[_]] = + (pt.getPrimitiveTypeName, Option(pt.getLogicalTypeAnnotation)) match { + case (BOOLEAN, _) => Right(classOf[Boolean]) + case (INT32, Some(_: DateLogicalTypeAnnotation)) => Right(classOf[LocalDate]) + case (INT32, Some(_: DecimalLogicalTypeAnnotation)) => Right(classOf[BigDecimal]) + case (INT32, _) => Right(classOf[Int]) + case (INT64, Some(_: TimestampLogicalTypeAnnotation)) => Right(classOf[Instant]) + case (INT64, Some(_: DecimalLogicalTypeAnnotation)) => Right(classOf[BigDecimal]) + case (INT64, _) => Right(classOf[Long]) + case (FLOAT, _) => Right(classOf[Float]) + case (DOUBLE, _) => Right(classOf[Double]) + case (BINARY, _) => Right(classOf[String]) + case (FIXED_LEN_BYTE_ARRAY, Some(_: DecimalLogicalTypeAnnotation)) => + Right(classOf[BigDecimal]) + case _ => + Left(ParquetParserException( + s"Unsupported Parquet type: ${pt.getPrimitiveTypeName} " + + s"with logical type: ${pt.getLogicalTypeAnnotation}" + )) + } + + /** + * Returns true if the Parquet column is OPTIONAL (i.e. nullable). + * Used during schema validation to check Option[T] compatibility. + * + * @param pt the Parquet PrimitiveType from the schema. + * @return true if the column repetition is OPTIONAL. 
+ */ + def isOptional(pt: ParquetPrimitiveType): Boolean = + pt.getRepetition == Repetition.OPTIONAL +} \ No newline at end of file diff --git a/parquet/src/test/resources/make_dataset.py b/parquet/src/test/resources/make_dataset.py new file mode 100644 index 0000000..922ac87 --- /dev/null +++ b/parquet/src/test/resources/make_dataset.py @@ -0,0 +1,13 @@ +import pyarrow.parquet as pq +import os + +# Create the output directory if it doesn't exist +os.makedirs('taxi_sample_dataset', exist_ok=True) + +table = pq.read_table('taxi_sample.parquet') + +pq.write_table(table.slice(0, 500), 'taxi_sample_dataset/part-0000.parquet') +pq.write_table(table.slice(500, 1000), 'taxi_sample_dataset/part-0001.parquet') + +print("Done. Two part files written to taxi_sample_dataset/") + diff --git a/parquet/src/test/resources/make_sample.py b/parquet/src/test/resources/make_sample.py new file mode 100644 index 0000000..3e382dc --- /dev/null +++ b/parquet/src/test/resources/make_sample.py @@ -0,0 +1,13 @@ +import pyarrow.parquet as pq + +table = pq.read_table('yellow_tripdata_2024-01.parquet') + +print(table.schema) + +pq.write_table( + table.slice(0, 1000), + 'taxi_sample.parquet' +) + +print(f"Done. 
Original rows: {table.num_rows}, Sample rows: 1000") + diff --git a/parquet/src/test/resources/taxi_sample.parquet b/parquet/src/test/resources/taxi_sample.parquet new file mode 100644 index 0000000..82b70aa Binary files /dev/null and b/parquet/src/test/resources/taxi_sample.parquet differ diff --git a/parquet/src/test/resources/taxi_sample_dataset/part-0000.parquet b/parquet/src/test/resources/taxi_sample_dataset/part-0000.parquet new file mode 100644 index 0000000..e6ccd6b Binary files /dev/null and b/parquet/src/test/resources/taxi_sample_dataset/part-0000.parquet differ diff --git a/parquet/src/test/resources/taxi_sample_dataset/part-0001.parquet b/parquet/src/test/resources/taxi_sample_dataset/part-0001.parquet new file mode 100644 index 0000000..86f2fa4 Binary files /dev/null and b/parquet/src/test/resources/taxi_sample_dataset/part-0001.parquet differ diff --git a/parquet/src/test/resources/yellow_tripdata_2024-01.parquet b/parquet/src/test/resources/yellow_tripdata_2024-01.parquet new file mode 100644 index 0000000..b755fc2 Binary files /dev/null and b/parquet/src/test/resources/yellow_tripdata_2024-01.parquet differ diff --git a/parquet/src/test/resources/yellow_tripdata_2025-01.parquet b/parquet/src/test/resources/yellow_tripdata_2025-01.parquet new file mode 100644 index 0000000..f89625d Binary files /dev/null and b/parquet/src/test/resources/yellow_tripdata_2025-01.parquet differ diff --git a/parquet/src/test/scala/com/phasmidsoftware/tableparser/parquet/ParquetParserSpec.scala b/parquet/src/test/scala/com/phasmidsoftware/tableparser/parquet/ParquetParserSpec.scala new file mode 100644 index 0000000..3e3cc38 --- /dev/null +++ b/parquet/src/test/scala/com/phasmidsoftware/tableparser/parquet/ParquetParserSpec.scala @@ -0,0 +1,119 @@ +package com.phasmidsoftware.tableparser.parquet + +import com.phasmidsoftware.tableparser.core.parse.ColumnHelper.camelToSnakeCaseColumnNameMapper +import com.phasmidsoftware.tableparser.core.parse.{CellParsers, ColumnHelper} 
+import com.phasmidsoftware.tableparser.core.table.Table +import java.nio.file.{Path, Paths} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import scala.util.{Success, Try} + +class ParquetParserSpec extends AnyFlatSpec with Matchers with CellParsers { + + // Path to the committed test fixture + private val samplePath: Path = + Paths.get(getClass.getResource("/taxi_sample.parquet").toURI) + + behavior of "YellowTaxiTrip model" + + it should "print the Parquet schema for inspection" in { + import org.apache.hadoop.conf.Configuration + import org.apache.hadoop.fs.{Path => HadoopPath} + import org.apache.parquet.hadoop.ParquetFileReader + import org.apache.parquet.hadoop.util.HadoopInputFile + val conf = new Configuration() + val reader = ParquetFileReader.open( + HadoopInputFile.fromPath(new HadoopPath(samplePath.toUri), conf) + ) + val schema = reader.getFooter.getFileMetaData.getSchema + reader.close() + (0 until schema.getFieldCount).foreach { i => + val t = schema.getType(i) + println(s"${t.getRepetition} ${t.getName}") + } + succeed + } + + behavior of "ParquetParser" + + // ── Happy path ───────────────────────────────────────────────────────────── + + it should "parse taxi_sample.parquet into a Table[YellowTaxiTrip]" in { + import YellowTaxiTrip.helper + val result: Try[Table[YellowTaxiTrip]] = + ParquetParser.parse[YellowTaxiTrip](samplePath) + result shouldBe a[Success[_]] + } + + it should "produce exactly 1000 rows" in { + import YellowTaxiTrip.helper + val result = ParquetParser.parse[YellowTaxiTrip](samplePath) + result.get.size shouldBe 1000 + } + + it should "parse typed field values correctly for the first row" in { + import YellowTaxiTrip.helper + val firstRow = ParquetParser.parse[YellowTaxiTrip](samplePath).get.head + // vendorId should be 1 or 2 (Creative Mobile Technologies or VeriFone) + firstRow.vendorId should (be(Some(1)) or be(Some(2))) + // tripDistance should be non-negative + 
firstRow.tripDistance.get should be >= 0.0 + // fareAmount should be non-negative + firstRow.fareAmount.get should be >= 0.0 + // pickup should not be after dropoff (zero-duration trips do occur) + firstRow.tpepPickupDatetime.zip(firstRow.tpepDropoffDatetime).forall { case (p, d) => !p.isAfter(d) } shouldBe true + // storeAndFwdFlag should be Y or N + firstRow.storeAndFwdFlag should (be(Some("Y")) or be(Some("N"))) + } + + it should "have a header with 19 columns" in { + import YellowTaxiTrip.helper + val table = ParquetParser.parse[YellowTaxiTrip](samplePath).get + table.maybeHeader.map(_.size) shouldBe Some(19) + } + + // ── Schema validation ────────────────────────────────────────────────────── + + it should "fail with ParquetParserException for an unknown column name" in { + case class BadSchema(nonExistentColumn: String) + implicit val badHelper: ColumnHelper[BadSchema] = + columnHelper(camelToSnakeCaseColumnNameMapper) + val result = ParquetParser.parse[BadSchema](samplePath) + result.failed.get shouldBe a[ParquetParserException] + result.failed.get.getMessage should include("non_Existent_Column") + } + + it should "fail with ParquetParserException for a non-Option field mapping to an OPTIONAL column" in { + // congestion_surcharge is OPTIONAL in the schema but declared as Double here. + // In the actual taxi data it happens to be non-null in all sample rows, + // but the validator should catch the schema-level mismatch. + // NOTE: this test assumes congestion_surcharge is OPTIONAL in the Parquet schema. + // If it is REQUIRED in the 2024 file, adjust accordingly after inspecting the schema. 
+ pending + } + + // ── Dataset (multi-file) ─────────────────────────────────────────────────── + + it should "parse a two-part dataset directory" in { + pending + // ParquetReader requires directory support -- to be implemented + import YellowTaxiTrip.helper + // This test requires taxi_sample_dataset/ with two part files + // generated from taxi_sample.parquet -- see design document section 9. + val datasetPath = Paths.get(getClass.getResource("/taxi_sample_dataset").toURI) + val result = ParquetParser.parse[YellowTaxiTrip](datasetPath) + result shouldBe a[Success[_]] + result.get.size shouldBe 1000 + } + + // ── Analysis ────────────────────────────────────────────────────────────── + + it should "support Analysis on a Parquet-sourced table" in { + pending + import YellowTaxiTrip.helper + val table = ParquetParser.parse[YellowTaxiTrip](samplePath).get + // Analysis should run without throwing +// val analysis = Analysis(table.asInstanceOf[Table[YellowTaxiTrip]]) +// analysis should not be null + } +} \ No newline at end of file diff --git a/parquet/src/test/scala/com/phasmidsoftware/tableparser/parquet/YellowTaxiTrip.scala b/parquet/src/test/scala/com/phasmidsoftware/tableparser/parquet/YellowTaxiTrip.scala new file mode 100644 index 0000000..60dd147 --- /dev/null +++ b/parquet/src/test/scala/com/phasmidsoftware/tableparser/parquet/YellowTaxiTrip.scala @@ -0,0 +1,39 @@ +package com.phasmidsoftware.tableparser.parquet + +import com.phasmidsoftware.tableparser.core.parse.ColumnHelper.camelToSnakeCaseColumnNameMapperLower +import com.phasmidsoftware.tableparser.core.parse.{CellParsers, ColumnHelper} +import java.time.Instant + +case class YellowTaxiTrip( + vendorId: Option[Int], + tpepPickupDatetime: Option[Instant], + tpepDropoffDatetime: Option[Instant], + passengerCount: Option[Long], + tripDistance: Option[Double], + ratecodeId: Option[Long], + storeAndFwdFlag: Option[String], + puLocationId: Option[Int], + doLocationId: Option[Int], + paymentType: 
Option[Long], + fareAmount: Option[Double], + extra: Option[Double], + mtaTax: Option[Double], + tipAmount: Option[Double], + tollsAmount: Option[Double], + improvementSurcharge: Option[Double], + totalAmount: Option[Double], + congestionSurcharge: Option[Double], + airportFee: Option[Double] + ) + +object YellowTaxiTrip extends CellParsers { + implicit val helper: ColumnHelper[YellowTaxiTrip] = + columnHelper( + camelToSnakeCaseColumnNameMapperLower, + "vendorId" -> "VendorID", + "ratecodeId" -> "RatecodeID", + "puLocationId" -> "PULocationID", + "doLocationId" -> "DOLocationID", + "airportFee" -> "Airport_fee" + ) +} diff --git a/spark/src/main/scala/com/phasmidsoftware/tableparser/spark/DatasetMapper.scala b/spark/src/main/scala/com/phasmidsoftware/tableparser/spark/DatasetMapper.scala index 0647bcc..d846e83 100644 --- a/spark/src/main/scala/com/phasmidsoftware/tableparser/spark/DatasetMapper.scala +++ b/spark/src/main/scala/com/phasmidsoftware/tableparser/spark/DatasetMapper.scala @@ -63,7 +63,7 @@ class DatasetMapper[T](f: String => Try[T])(missing: T)(implicit sparkSession: S * */ object DatasetMapper extends App { - implicit val spark: SparkSession = SparkSession.builder.appName("DatasetMapper").master("local[*]").getOrCreate() + implicit val spark: SparkSession = SparkSession.builder().appName("DatasetMapper").master("local[*]").getOrCreate() implicit val encoder: Encoder[Movie] = Encoders.product[Movie] println(s"Current Directory is: ${System.getProperty("user.dir")}") new DatasetMapper[Movie](MovieDatabase.parser.parse(header))(Movie.missing).doMain(filename) @@ -80,7 +80,7 @@ object MovieDatabase { import com.phasmidsoftware.tableparser.core.examples.Movie._ val parser: RowParser[Movie, String] = implicitly[HeadedCSVTableParser[Movie]].rowParser - val header: Header = Header.create((Movie.header.split(',')): _*) + val header: Header = Header.create(Movie.header.split(','): _*) // NOTE: I don't know if there's a way to specify a classpath resource in 
Spark so, for now, we define a totally non-portable filename private val home = System.getProperties.getOrDefault("user.home", "/Users/rhillyardXX") diff --git a/spark/src/main/scala/com/phasmidsoftware/tableparser/spark/DatasetParser.scala b/spark/src/main/scala/com/phasmidsoftware/tableparser/spark/DatasetParser.scala index 18714d7..f1235fd 100644 --- a/spark/src/main/scala/com/phasmidsoftware/tableparser/spark/DatasetParser.scala +++ b/spark/src/main/scala/com/phasmidsoftware/tableparser/spark/DatasetParser.scala @@ -97,7 +97,7 @@ object DatasetParser extends App { } } - val spark: SparkSession = SparkSession.builder.appName("DatasetParser").master("local[*]").getOrCreate() + val spark: SparkSession = SparkSession.builder().appName("DatasetParser").master("local[*]").getOrCreate() doMain(spark) } diff --git a/spark/src/test/scala/com/phasmidsoftware/tableparser/spark/DatasetMapperSpec.scala b/spark/src/test/scala/com/phasmidsoftware/tableparser/spark/DatasetMapperSpec.scala index 8cd0eb4..433f6a9 100644 --- a/spark/src/test/scala/com/phasmidsoftware/tableparser/spark/DatasetMapperSpec.scala +++ b/spark/src/test/scala/com/phasmidsoftware/tableparser/spark/DatasetMapperSpec.scala @@ -10,7 +10,7 @@ class DatasetMapperSpec extends AnyFlatSpec with should.Matchers with Serializab behavior of "DatasetMapper" it should "create a Movie Dataset" in { - implicit val spark: SparkSession = SparkSession.builder.appName("DatasetMapper").master("local[*]").getOrCreate() + implicit val spark: SparkSession = SparkSession.builder().appName("DatasetMapper").master("local[*]").getOrCreate() implicit val encoder: Encoder[Movie] = Encoders.product[Movie] println(s"Current Directory is: ${System.getProperty("user.dir")}") diff --git a/spark/src/test/scala/com/phasmidsoftware/tableparser/spark/DatasetParserSpec.scala b/spark/src/test/scala/com/phasmidsoftware/tableparser/spark/DatasetParserSpec.scala index f0e082d..e3740ac 100644 --- 
a/spark/src/test/scala/com/phasmidsoftware/tableparser/spark/DatasetParserSpec.scala +++ b/spark/src/test/scala/com/phasmidsoftware/tableparser/spark/DatasetParserSpec.scala @@ -10,7 +10,7 @@ class DatasetParserSpec extends AnyFlatSpec { // TODO figure out why this doesn't work: we do the same thing in DatasetParser and it works fine. ignore should "doMain" in { - val spark: SparkSession = SparkSession.builder.appName("DatasetParser").master("local[*]").getOrCreate() + val spark: SparkSession = SparkSession.builder().appName("DatasetParser").master("local[*]").getOrCreate() doMain(spark) } diff --git a/zio/src/test/scala/com/phasmidsoftware/tableparser/zio/parse/RawParsersSpec.scala b/zio/src/test/scala/com/phasmidsoftware/tableparser/zio/parse/RawParsersSpec.scala index 69b65eb..669cc98 100644 --- a/zio/src/test/scala/com/phasmidsoftware/tableparser/zio/parse/RawParsersSpec.scala +++ b/zio/src/test/scala/com/phasmidsoftware/tableparser/zio/parse/RawParsersSpec.scala @@ -31,6 +31,7 @@ class RawParsersSpec extends flatspec.AnyFlatSpec with should.Matchers { triedString match { case Success(s) if s == "Doug Walker" => println(s"success: $s"); true case Failure(e) => fail(s"failure: $e") + case _ => fail("unexpected") } } }