DataFrame IO

DataFrame IO allows you to move and transform data between different data stores with Apache Spark by specifying source, transform and sink URIs.

The URI schema is

protocol://host/path?queryParam1=value1&queryParam2=value2

The protocol decides which source or sink is actually used. Currently, the following sources and sinks are available (each is described in detail below):

  • console
  • values
  • text
  • parquet
  • delta
  • xlsx
  • hive
  • kafka
  • solr

For transformations, the following options are available:

  • identity
  • sql
  • sql-file

Running

The etl module contains a main class called dev.mauch.spark.dfio.ETL. You can invoke it, e.g., via

mill etl.run --help
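For example, a pipeline that reads a CSV file, applies a SQL file to it and writes the result to Parquet could be described by URIs like the following (the paths are illustrative; see --help for how to pass the URIs to the main class):

text:///data/input.csv # registered under the default name "source"
sql-file:///queries/transform.sql # the SQL can reference the table source; the result is registered as "sink"
parquet:///data/output # persists the DataFrame named "sink"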

Development

This project uses Mill as its build tool. You can build the standalone JAR using

mill etl.assembly
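The assembled JAR can then be submitted to a Spark cluster, roughly like this (the JAR path below is an assumption and depends on your Mill version and module layout):

spark-submit --class dev.mauch.spark.dfio.ETL out/etl/assembly.dest/out.jar <source/transform/sink URIs>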

Source and Sink URI schemas

The URI schema for sources and sinks generally looks like this

dfToReadInto+sourceType://some-host/some-path?additional=parameters
sourceType://some-host/some-path?additional=parameters

dfToPersist+sinkType://some-host/some-path?additional=parameters
sinkType://some-host/some-path?additional=parameters

The possible options for sourceType and sinkType are listed below. dfToReadInto is the name under which the result of reading the source is registered. If not specified, it defaults to "source". dfToPersist is the name of the DataFrame that should be persisted to the sink. If not specified, it defaults to "sink".

Both dfToReadInto and dfToPersist are optional.
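For example, the following pair of URIs reads a CSV file into a DataFrame named users and persists a DataFrame named result to Parquet (users and result are arbitrary example names):

users+text:///data/users.csv
result+parquet:///data/output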

Console

console://anything

The source returns an empty DataFrame.

The sink prints an excerpt of the DataFrame to the console by calling .show().

Values

values:///?header=foo,bar&values=1,2;3,4

The source returns a DataFrame with column names specified in header and values specified in values.
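For the example above this yields a DataFrame with the columns foo and bar and the rows (1, 2) and (3, 4).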

The sink prints an excerpt of the DataFrame to the console by calling .show().

Text

text:///path/to/some.csv

Reads/writes CSV or TSV files, determined by file extension.

Parquet

parquet:///path/to/parquet/directory

Reads/writes Parquet files in a given directory.

Delta

delta:///path/to/delta/directory

Reads/writes a Delta table in the given directory.

Excel

xlsx:///path/to/some.xlsx

Reads/writes the given Excel file.

Hive

hive:///some_db/some_table?partitionCols=my_col_a,my_col_b&checkpointCol=my_timestamp_col

Reads/writes the given table. When writing, data is partitioned by the given partitionCols. The checkpointCol is used to filter data by recency.

Kafka

kafka://my_broker:1234/some_topic?serde=avro

Reads/writes the given Kafka topic. The serde parameter determines how data is (de-)serialized. Valid options are

  • json
  • avro

avro can be used with or without a schema registry. When using a schema registry, the following configuration is required:

spark.kafka.schema.registry.url=http://my_schema_registry:8081
# For Basic Auth
spark.kafka.schema.registry.basic.auth.credentials.source="USER_INFO"
spark.kafka.schema.registry.basic.auth.user.info=my_user:my_password
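Assuming these are regular Spark configuration properties, they can be passed like any other Spark setting, e.g. via spark-submit:

spark-submit --conf spark.kafka.schema.registry.url=http://my_schema_registry:8081 ...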

For details check KafkaDataFrameSource.

Solr

solr:///some_collection?batchSize=5000

Reads/writes the given Solr collection. The batchSize parameter specifies the number of documents written at once per Spark worker and can be used for performance tuning.

The following configuration options can be used:

# Required
spark.solr.zookeeperUrl=my_zookeeper:2181/solr
# Optional, defaults to jaas.conf
spark.solr.loginConfig=my_jaas.conf
# Optional, defaults to dataframe-io
spark.solr.appName=my-copy-data-app

Transformation URI Schemas

The URI schema for transformations generally looks like this

sourceName+sinkName+transformationType://some-host/some-path?additional=parameters

The possible options for transformationType are listed below. The sourceName is used to read a previously registered intermediate DataFrame for transformations that expect their input to be passed explicitly. Transformations like the two SQL transformations instead reference the name of the intermediate DataFrame directly in the query. By default it is "source".

The sinkName is used to register the result under a name. By default it is "sink".

sourceName and sinkName can be specified or omitted in the following ways:

transformationType:// # No sourceName or sinkName, they default to "source" and "sink"
sourceName+transformationType:// # Only sourceName, sink defaults to "sink"
sourceName+sinkName+transformationType:// # Both sourceName and sinkName given

Identity

sourceName+sinkName+identity:///

This transformation can be used to rename a DataFrame from sourceName to sinkName.
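For example, source+users+identity:/// re-registers the DataFrame that was read under the default name source under the new name users.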

SQL

sql:///SELECT%20foo%20AS%20bar%20FROM%20sourceName

This transformation allows you to apply inline SQL to its input. The SQL has to be URL-encoded (e.g. a space is encoded as %20), so this is only practical for short queries.
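For example, the query SELECT foo, bar FROM source WHERE foo > 1 would be written as:

sql:///SELECT%20foo%2C%20bar%20FROM%20source%20WHERE%20foo%20%3E%201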

SQL-File

sql-file:///path/to/query.sql

This transformation allows you to apply SQL from a file to its input. It assumes that the table(s) referenced in the SQL have been registered in a previous step using the extended dfToReadInto+sourceType:// schema described above.
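For example (paths and names are illustrative), the following pipeline registers a Parquet source under the name users, aggregates it with a SQL file whose query references the table users, and prints the result:

users+parquet:///data/users
users+result+sql-file:///queries/aggregate_users.sql
result+console://out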
