
Lots of progress on example datasources and writing
spirom committed Apr 8, 2018
1 parent dc8c92d commit 5a43c14
Showing 38 changed files with 1,746 additions and 113 deletions.
9 changes: 9 additions & 0 deletions .idea/hydra.xml


17 changes: 1 addition & 16 deletions .idea/misc.xml


9 changes: 9 additions & 0 deletions .idea/scala_compiler.xml


28 changes: 23 additions & 5 deletions README.md
@@ -124,7 +124,8 @@ These can be found under [src/main/java/examples](src/main/java/examples).
<tr><th>File</th><th>What's Illustrated</th></tr>

<tr>
<td><a href="src/main/java/examples/Basic.java">Basic.java</a></td>
<td><a href="src/main/java/examples/JBasic.java">JBasic.java</a><br/>
<a href="src/main/scala/examples/SBasic.java">SBasic.java</a></td>
<td>
<p>Simplest example that uses direct ExampleDB calls to populate a table and then
uses the SimpleRowDataSource to query it from Spark. Since that data source is
@@ -134,14 +135,14 @@ the table name is not specified in the Spark code.</p>
</td>
</tr>
<tr>
<td><a href="src/main/java/examples/ReadNamedTable.java">ReadNamedTable.java</a></td>
<td><a href="src/main/java/examples/JReadNamedTable.java">JReadNamedTable.java</a></td>
<td>
<p>Instead uses the FlexibleRowDataSource to infer the schema of a specified table
and query it, again sequentially, again resulting in a Dataset with a single partition.</p>
</td>
</tr>
<tr>
<td><a href="src/main/java/examples/ReadParallel.java">ReadParallel.java</a></td>
<td><a href="src/main/java/examples/JReadParallel.java">JReadParallel.java</a></td>
<td>
<p>Uses the ParallelRowDataSource to infer the schema of a specified table
and query it, this time in parallel, resulting in Datasets with multiple partitions.
@@ -150,20 +151,37 @@ specifying a partition count.</p>
</td>
</tr>
<tr>
<td><a href="src/main/java/examples/ReadPartitionAware.java">ReadPartitionAware.java</a></td>
<td><a href="src/main/java/examples/JReadPartitionAware.java">ReadPartitionAware.java</a></td>
<td>
<p>Uses the PartitioningRowDataSource to avoid a shuffle in a grouping/aggregation query
against a table that is clustered on the grouping column. It achieves this by using the
SupportsReportPartitioning mixin for the DataSourceReader interface.</p>
</td>
</tr>
<tr>
<td><a href="src/main/java/examples/ReadPartitionAware_Mismatch.java">ReadPartitionAware_Mismatch.java</a></td>
<td><a href="src/main/java/examples/JReadPartitionAware_Mismatch.java">ReadPartitionAware_Mismatch.java</a></td>
<td>
<p>This uses the same data source as the previous example but doesn't cluster the table, thus
illustrating the shuffle that takes place.</p>
</td>
</tr>
<tr>
<td><a href="src/main/java/examples/JReadWriteParallel.java">JReadWriteParallel.java</a><br>
<a href="src/main/scala/examples/SReadWriteParallel.java">SReadWriteParallel.java</a>
</td>
<td>
<p>This illustrates updates using the simplest update-capable data source example, the ParallelRowReadWriteDataSource.</p>
<p>First a dataframe is created that is used to populate a table for the first time. At that
point the newly created table's database schema is calculated from the dataframe schema.
Notice that even though we create a dataframe with six partitions, later reads from the
table always return dataframes with four partitions. This is because all tables in
ExampleDB advertise four partitions by default, and that default would have to be
overridden when reading to obtain different partitioning. However, the partitioning of the
dataframe DOES impact update parallelism -- notice from the log output that six tasks write to six temporary tables --
and all six would have run in parallel had these examples not been limited to four executors.</p>
<p>We then put all four settings of SaveMode through their paces and see their impact.</p>
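<p>A hedged sketch of this read/write pattern appears just after this table.</p>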
</td>
</tr>
</table>
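
To make the table concrete, here is a minimal read/write sketch in the spirit of these examples. This is not code from the repository: the `format` names match the data source classes above, but the option keys (`host`, `port`, `table`), the port value, and the table names are illustrative assumptions.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SketchReadWrite {
    public static void main(String[] args) {
        // The examples are described as running with 4 executors.
        SparkSession spark = SparkSession.builder()
                .master("local[4]")
                .appName("example-sketch")
                .getOrCreate();

        // Read a named table through the parallel source; option keys are assumed.
        Dataset<Row> df = spark.read()
                .format("datasources.ParallelRowDataSource")
                .option("host", "localhost")
                .option("port", "20100")
                .option("table", "theTable")
                .load();

        // ExampleDB tables advertise 4 partitions by default, so expect 4 here
        // unless that default is overridden when reading.
        System.out.println(df.rdd().getNumPartitions());

        // Write through the update-capable source; each partition of the
        // dataframe writes in its own task (to its own temporary table).
        df.write()
                .format("datasources.ParallelRowReadWriteDataSource")
                .option("host", "localhost")
                .option("port", "20100")
                .option("table", "aCopy")
                .mode(SaveMode.Append)  // also try Overwrite, ErrorIfExists, Ignore
                .save();

        spark.stop();
    }
}
```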

# Logging
26 changes: 26 additions & 0 deletions pom.xml
@@ -85,6 +85,32 @@
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <configuration>
          <scalaVersion>2.11.12</scalaVersion>
        </configuration>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>
</project>
11 changes: 4 additions & 7 deletions spark-data-sources.iml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8" inherit-compiler-output="false">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
@@ -9,12 +9,8 @@
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/target/generated-sources/protobuf/grpc-java" isTestSource="false" generated="true" />
<sourceFolder url="file://$MODULE_DIR$/target/generated-sources/protobuf/java" isTestSource="false" generated="true" />
<excludeFolder url="file://$MODULE_DIR$/target/classes" />
<excludeFolder url="file://$MODULE_DIR$/target/protoc-dependencies" />
<excludeFolder url="file://$MODULE_DIR$/target/protoc-plugins" />
<excludeFolder url="file://$MODULE_DIR$/target/surefire" />
<excludeFolder url="file://$MODULE_DIR$/target/surefire-reports" />
<excludeFolder url="file://$MODULE_DIR$/target/test-classes" />
<sourceFolder url="file://$MODULE_DIR$/src/main/scala" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
@@ -198,5 +194,6 @@
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.assertj:assertj-core:3.9.1" level="project" />
<orderEntry type="library" name="scala-sdk-2.11.12" level="application" />
</component>
</module>
4 changes: 2 additions & 2 deletions src/main/java/datasources/FlexibleRowDataSource.java
@@ -40,7 +40,7 @@ public DataSourceReader createReader(DataSourceOptions options) {
}

/**
* This is how Spark discovers the source's schema by requesting a schema from ExampleDB,
* This is how Spark discovers the source table's schema by requesting a schema from ExampleDB,
* and how it obtains the reader factories to be used by the executors to create readers.
* In this case only one reader factory is created, supporting just one executor, so the
* resulting Dataset will have only a single partition -- that's why this DataSource
@@ -67,7 +67,7 @@ public StructType readSchema() {
DBClientWrapper db = new DBClientWrapper(_host, _port);
db.connect();
try {
_schema = db.getSchema(_table);
_schema = db.getSparkSchema(_table);
} catch (UnknownTableException ute) {
throw new RuntimeException(ute);
} finally {
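
To make the surrounding comments concrete, here is a hedged skeleton of the Spark 2.3 DataSourceV2 read path this file implements: schema discovery via `readSchema()` plus a single reader factory, which is why the resulting Dataset has exactly one partition. The `DBClientWrapper` calls mirror the diff above; `disconnect()`, the class names, and the no-op reader are illustrative assumptions, not this repository's code.

```java
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.types.StructType;

// Hedged sketch of a single-partition reader (Spark 2.3 DataSourceV2 API).
class SketchReader implements DataSourceReader {
    private final String host;
    private final int port;
    private final String table;

    SketchReader(String host, int port, String table) {
        this.host = host;
        this.port = port;
        this.table = table;
    }

    @Override
    public StructType readSchema() {
        // Ask ExampleDB for the table's schema converted to a Spark schema,
        // as in the getSparkSchema() call shown in the diff above.
        DBClientWrapper db = new DBClientWrapper(host, port);
        db.connect();
        try {
            return db.getSparkSchema(table);
        } catch (UnknownTableException ute) {
            throw new RuntimeException(ute);
        } finally {
            db.disconnect(); // assumed cleanup call
        }
    }

    @Override
    public List<DataReaderFactory<Row>> createDataReaderFactories() {
        // A single factory means a single read task, hence one partition.
        return Collections.singletonList(new SketchReaderFactory());
    }
}

// Top-level factory so it serializes without capturing the reader instance.
class SketchReaderFactory implements DataReaderFactory<Row> {
    @Override
    public DataReader<Row> createDataReader() {
        // An empty reader: this sketch returns no rows.
        return new DataReader<Row>() {
            @Override public boolean next() { return false; }
            @Override public Row get() { throw new IllegalStateException("no rows"); }
            @Override public void close() { }
        };
    }
}
```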
4 changes: 2 additions & 2 deletions src/main/java/datasources/ParallelRowDataSource.java
@@ -44,7 +44,7 @@ public DataSourceReader createReader(DataSourceOptions options) {
}

/**
* This is how Spark discovers the source's schema by requesting a schema from ExampleDB,
* This is how Spark discovers the source table's schema by requesting a schema from ExampleDB,
* and how it obtains the reader factories to be used by the executors to create readers.
* Notice that one factory is created for each partition.
*/
@@ -71,7 +71,7 @@ public StructType readSchema() {
DBClientWrapper db = new DBClientWrapper(_host, _port);
db.connect();
try {
_schema = db.getSchema(_table);
_schema = db.getSparkSchema(_table);
} catch (UnknownTableException ute) {
throw new RuntimeException(ute);
} finally {
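
In contrast with the single-factory source above, the comment in this file notes that one factory is created per partition. The fragment below sketches the shape of that method against the Spark 2.3 API; `partitionCount`, `table`, and `SketchPartitionFactory` are illustrative stand-ins, not names from this repository.

```java
// Hedged sketch: one DataReaderFactory per partition means Spark schedules
// one read task per partition, yielding a multi-partition Dataset.
@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
    List<DataReaderFactory<Row>> factories = new ArrayList<>(partitionCount);
    for (int p = 0; p < partitionCount; p++) {
        // Each factory captures the table name and its own partition index,
        // so each executor-side reader fetches just one partition's rows.
        factories.add(new SketchPartitionFactory(table, p));
    }
    return factories;
}
```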
