
"joinLeft" is not working when joining a clock with a parque-based DataFrame #57

Open
LeoDashTM opened this issue Oct 26, 2018 · 15 comments

@LeoDashTM

@icexelloss

Even though the "timeColumn" argument error can be bypassed by renaming the column in question to time, leftJoin is not working for me:

print( sc.version )
print( tm )

n = df.filter( df['Container'] == 'dbc94d4e3af6' ).select( tm, 'MemPercentG', 'CpuPercentG' )

from ts.flint import FlintContext, clocks
from ts.flint import utils
  
fc = FlintContext( sqlContext )

r = fc.read \
    .option('isSorted', False) \
    .option('timeUnit', 's') \
    .dataframe( n )

r.show(truncate=False)
print( r )
r.printSchema()

l = clocks.uniform(fc, '30s', begin_date_time='2018-8-1 5:55:35', end_date_time='2018-08-01 05:59:05')
print( type( l ) )
print( l )
l.printSchema()
# l.show(truncate=False)
j = l.leftJoin( r )

With the output being:

2.3.1
time
+-------------------+---------------+------------+
|time               |MemPercentG    |CpuPercentG |
+-------------------+---------------+------------+
|2018-08-01 05:55:35|0.0030517578125|0.002331024 |
|2018-08-01 05:58:05|0.0030517578125|0.0031538776|
|2018-08-01 05:59:05|0.0030517578125|0.0030176123|
+-------------------+---------------+------------+

TimeSeriesDataFrame[time: timestamp, MemPercentG: double, CpuPercentG: float]
root
 |-- time: timestamp (nullable = true)
 |-- MemPercentG: double (nullable = true)
 |-- CpuPercentG: float (nullable = true)

<class 'ts.flint.dataframe.TimeSeriesDataFrame'>
TimeSeriesDataFrame[time: timestamp]
root
 |-- time: timestamp (nullable = true)

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.codegen.ExprCode.value()Ljava/lang/String;
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-911439891027714> in <module>()
     22 l.printSchema()
     23 # l.show(truncate=False)
---> 24 j = l.leftJoin( r )
     25 
     26 

/databricks/python/lib/python3.5/site-packages/ts/flint/dataframe.py in leftJoin(self, right, tolerance, key, left_alias, right_alias)
    606         tolerance = self._timedelta_ns('tolerance', tolerance, default='0ns')
    607         scala_key = utils.list_to_seq(self._sc, key)
--> 608         tsrdd = self.timeSeriesRDD.leftJoin(right.timeSeriesRDD, tolerance, scala_key, left_alias, right_alias)
    609         return TimeSeriesDataFrame._from_tsrdd(tsrdd, self.sql_ctx)
    610 

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3324.leftJoin.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.codegen.ExprCode.value()Ljava/lang/String;
	at org.apache.spark.sql.TimestampCast$class.doGenCode(TimestampCast.scala:77)
	at org.apache.spark.sql.NanosToTimestamp.doGenCode(TimestampCast.scala:31)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:111)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:108)
	at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:143)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24$$anonfun$apply$5.apply(CodeGenerator.scala:1367)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24$$anonfun$apply$5.apply(CodeGenerator.scala:1366)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24.apply(CodeGenerator.scala:1366)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$24.apply(CodeGenerator.scala:1366)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.withSubExprEliminationExprs(CodeGenerator.scala:1227)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressionsForWholeStageWithCSE(CodeGenerator.scala:1365)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:67)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
	at org.apache.spark.sql.execution.RDDScanExec.consume(ExistingRDD.scala:176)
	at org.apache.spark.sql.execution.RDDScanExec.doProduce(ExistingRDD.scala:221)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.RDDScanExec.produce(ExistingRDD.scala:176)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:50)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:40)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:530)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:582)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:150)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:138)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:108)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:108)
	at com.twosigma.flint.timeseries.NormalizedDataFrameStore.toOrderedRdd(TimeSeriesStore.scala:252)
	at com.twosigma.flint.timeseries.NormalizedDataFrameStore.orderedRdd(TimeSeriesStore.scala:237)
	at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.orderedRdd(TimeSeriesRDD.scala:1346)
	at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.leftJoin(TimeSeriesRDD.scala:1515)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)

By the way, what is the way to even display the contents of the clock DataFrame?
The second-to-last line (the commented-out .show call) errors out, so I don't understand how TimeSeriesDataFrame can be inheriting from a regular DataFrame, for which that method is available...
The display method also fails...
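In case it helps anyone, the only alternative I can think of for inspecting the clock's rows is going through pandas (a sketch; toPandas() is inherited from the plain Spark DataFrame API, so it may well hit the same underlying error as show()):

# Sketch: materialize the clock's rows as a pandas DataFrame instead of calling show().
pdf = l.toPandas()
print(pdf.head())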

Anyway, what is wrong with the leftJoin here? The clock is on the left, as you indicated it should be. Swapping the left and right data frames also does not help.
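For completeness, this is the shape of the call I expect to need once the crash itself is resolved (a sketch based on the ts-flint leftJoin signature; without a tolerance the join only matches exactly equal timestamps, while a tolerance lets each clock tick pick up the most recent row within that window):

# Sketch of the intended join: clock on the left, with a tolerance window.
j = l.leftJoin(r, tolerance='30s')  # '30s' is illustrative; any pandas-style offset string
j.show(truncate=False)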

Is this reproducible for you?

Please advise whether I'm not using/calling it correctly or whether it's a bug.

I installed the Flint libraries (both the Scala and the Python ones) on Databricks via its UI, from the respective online repos, which might be dated. I can try installing the latest builds from the freshest source code if you think that will help.

Thanks.

@LeoDashTM (Author)

Update: dir() does show that leftJoin is present on both l and r.
I can even print the two methods, but invoking it on one with the other as the argument does not work for some reason... Any ideas?
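Concretely, the check was just standard Python introspection:

# Both frames expose the method object; it is only the call itself that fails.
print('leftJoin' in dir(l), 'leftJoin' in dir(r))
print(l.leftJoin)
print(r.leftJoin)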

@LeoDashTM (Author)

Some googling around suggests there may be an incompatibility between the framework and the library versions, but your front page does say that the library works with Spark 2.3, and the library's version (per Python) is 0.6.0.
Is there a more thorough way to figure out whether the framework and the library are compatible with one another, @icexelloss? Thanks.
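For instance, would something like this be the right way to confirm the versions in play (a sketch; it assumes the Python package is installed under the name ts-flint)?

# Sketch: print the Spark version and the installed ts-flint package version.
print(sc.version)                                           # Spark version on the driver
import pkg_resources                                        # assumes the package name is "ts-flint"
print(pkg_resources.get_distribution('ts-flint').version)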

@LeoDashTM (Author)

The error trace clearly goes through this line:
https://github.com/apache/spark/blob/master/python/pyspark/sql/utils.py#L60

@icexelloss (Member)

Are you using the Databricks jar? The Apache jar doesn't work on the Databricks runtime for some reason.

@LeoDashTM (Author)

Are you using the Databricks jar? The Apache jar doesn't work on the Databricks runtime for some reason.

Li Jin, I appreciate your reply.
I'm not sure what you mean by the "Databricks jar".

I downloaded the Flint jar file that is currently loaded onto Databricks; its size is 2171677 bytes, and here is its manifest file:

Manifest-Version: 1.0
Implementation-Title: flint
Implementation-Version: 0.6.0
Specification-Vendor: com.twosigma
Specification-Title: flint
Implementation-Vendor-Id: com.twosigma
Specification-Version: 0.6.0
Implementation-URL: https://github.com/twosigma/flint
Implementation-Vendor: com.twosigma

As I stated previously:
I installed the Flint libraries (both the Scala and the Python ones) on Databricks via its UI, from the respective online repos, which might be dated. I can try installing the latest builds from the freshest source code if you think that will help.
Or, if you could provide me with a jar that you think should work, I could try installing and using it instead.

Please assist further!
Thanks.

@LeoDashTM (Author)

I built from source (the master branch); the resulting jar (flint-assembly-0.6.0-SNAPSHOT.jar) is 8075230 bytes.

@icexelloss (Member)

There are issues with using the standard Flint jar on the Databricks platform, so they built a specific jar for Flint 0.6 for their platform here:

https://github.com/databricks/databricks-accelerators/tree/master/projects/databricks-flint

@LeoDashTM (Author)

There are issues with using the standard Flint jar on the Databricks platform, so they built a specific jar for Flint 0.6 for their platform here:

https://github.com/databricks/databricks-accelerators/tree/master/projects/databricks-flint

I downloaded the one from Databricks; its size is 2252415 bytes and its name is flint_0_6_0_databricks.jar.
Still the same error... What gives?

@icexelloss (Member)

@LeoDashTM Could you please try this in the local pyspark notebook with --packages?

@LeoDashTM (Author)

@LeoDashTM Could you please try this in the local pyspark notebook with --packages?

I'm not too familiar yet with local pyspark, but see the second comment of the issue I opened with Databricks: databricks/databricks-accelerators#1
I used the --jars argument there.

OK, actually I just tried:
spark-submit --packages com.twosigma:flint:0.6.0 test.py
and that did seem to produce the results I want!
So is it because locally I'm running 2.3.2 and it's 2.3.1 on Databricks?
I'm not sure I can upgrade Databricks instances from 2.3.1 to 2.3.2; 2.3.1 is the highest stable version of Spark available on DB at the moment...
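For reference, a minimal local test.py along these lines reproduces the scenario (a sketch: the parquet path is a placeholder, and the column names mirror the snippet in the first comment):

# Minimal local reproduction sketch, run with:
#   spark-submit --packages com.twosigma:flint:0.6.0 test.py
from pyspark.sql import SparkSession, SQLContext
from ts.flint import FlintContext, clocks

spark = SparkSession.builder.appName('flint-leftjoin-test').getOrCreate()
fc = FlintContext(SQLContext(spark.sparkContext))

# Placeholder path; the real file is the parquet attached to the Databricks issue.
df = spark.read.parquet('/path/to/metrics.parquet')
n = df.filter(df['Container'] == 'dbc94d4e3af6').select('time', 'MemPercentG', 'CpuPercentG')

r = fc.read.option('isSorted', False).option('timeUnit', 's').dataframe(n)
l = clocks.uniform(fc, '30s',
                   begin_date_time='2018-08-01 05:55:35',
                   end_date_time='2018-08-01 05:59:05')

l.leftJoin(r).show(truncate=False)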

The data file is attached to the first comment, and both have the reproducible code (the first for DB and the second for a local installation). Are you able to run either of them against Spark 2.3.1 and see the issue?
I really need it to work against that version, you see.

Thanks.

@LeoDashTM (Author)

Update.
I just installed Spark 2.3.1 locally; running pyspark --packages com.twosigma:flint:0.6.0 and then executing each line by hand seems to work correctly.
So the problem is indeed with the flavor of Spark installed on Databricks then? Is that the right conclusion?

@icexelloss (Member)

icexelloss commented Nov 1, 2018 via email

@LeoDashTM (Author)

LeoDashTM commented Nov 13, 2018

With Databricks' latest commit (databricks/databricks-accelerators@d23782a), the printing of the clocks and the joining both seem to be working.

The results of creating and printing a clock are a bit unexpected to me:

>>> l = clocks.uniform(fc, '30s', begin_date_time='2018-8-1 5:55:35', end_date_time='2018-08-01 05:56:05')
>>> l.show( truncate = False )
+-----------------------+
|time                   |
+-----------------------+
|50552-02-05 14:23:200.0|
|50552-02-05 22:43:200.0|
+-----------------------+
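For what it's worth, here is a quick way to see the raw epoch values behind that time column (a sketch using plain Spark casts, nothing Flint-specific; casting a timestamp column to long yields epoch seconds):

# Dump the underlying epoch values to tell whether the stored times are wrong
# or only the rendering is off.
from pyspark.sql import functions as F
l.withColumn('epoch_seconds', F.col('time').cast('long')).show(truncate=False)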

Are these results as expected, @icexelloss? Are you getting the exact same thing on your setup? And if this is as expected, then how do I create a clock with the range I specified via begin_date_time and end_date_time?
If not, what is the deal? Any idea?
Thanks.

@LeoDashTM (Author)

OK, I tried with Two Sigma's official latest build and the data frame is displayed as expected, so it's a Databricks issue again...

@LeoDashTM (Author)

Same issue as #58?
@icexelloss?
