
Follow the deprecation of Spark 2.x support by td-pyspark #94

Merged
takuti merged 11 commits into master from td-pyspark-3 on Jan 2, 2021

Conversation

takuti (Contributor) commented Dec 24, 2020

Resolve #92

Manual test:

$ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_275
Branch HEAD
Compiled by user ubuntu on 2020-08-28T07:36:48Z
Revision 2b147c4cd50da32fe2b4167f97c8142102a0510d
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.
import pytd
import pandas as pd
from pytd.writer import SparkWriter

client = pytd.Client(database='sample_datasets')
df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})

# 1) SparkWriter pointed at a locally built td-spark jar
writer = SparkWriter(td_spark_path='~/tmp/td_spark.jar')
client.load_table_from_dataframe(df, 'takuti.foo', writer=writer, if_exists='overwrite')

# 2) Writer specified by name ('spark')
client.load_table_from_dataframe(df, 'takuti.foo', writer='spark', if_exists='overwrite')

# 3) Default SparkWriter (no explicit jar path)
writer = SparkWriter()
client.load_table_from_dataframe(df, 'takuti.bar', writer=writer, if_exists='overwrite')

takuti requested a review from chezou on December 24, 2020 at 21:54
takuti (Contributor, Author) commented Dec 24, 2020

Lint errors causing CI failure should be resolved by #95

chezou (Member) left a comment

Just confirming, is it okay to pass Nullable type?

pytd/spark.py Outdated
@@ -23,8 +23,8 @@ def download_td_spark(spark_binary_version="2.11", version="latest", destination
     destination : str, optional
         Where a downloaded jar file to be stored.
     """
-    td_spark_jar_name = "td-spark-assembly_{}-{}.jar".format(
-        spark_binary_version, version
+    td_spark_jar_name = "td-spark-assembly-{}_spark{}.jar".format(
chezou (Member): Since Python 3.5 is deprecated, how about using f-string?

takuti (Contributor, Author): Sounds good. Updated the supported Python versions accordingly... 45c1552 & 2ff0eae
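
For reference, a minimal sketch of the f-string form suggested above; the helper name, parameters, and defaults here are assumptions for illustration, not the final code in those commits:

```python
# Hypothetical sketch of the f-string variant discussed above; the
# function name and parameter defaults are assumptions, not the pytd code.
def build_td_spark_jar_name(version="latest", spark_version="3.0.1"):
    return f"td-spark-assembly-{version}_spark{spark_version}.jar"

print(build_td_spark_jar_name())  # td-spark-assembly-latest_spark3.0.1.jar
```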

`SparkWriter` does not take `apikey` and `endpoint`
Since td_pyspark 19.11.0, the jar download URLs have changed as part of Spark 3.x support:
* td-spark-assembly-latest_spark2.4.7.jar (Spark 2.4.7, Scala 2.11)
* td-spark-assembly-latest_spark3.0.1.jar (Spark 3.0.1, Scala 2.12)

https://treasure-data.github.io/td-spark/release_notes.html#v19-11-0
> Spark 2.4.x + Scala 2.11 support is deprecated as of December 2020. It is planned to migrate to Spark 3.x + Scala 2.12 for 2021.

https://treasure-data.github.io/td-spark/release_notes.html

> In Spark 3.0, PySpark requires a PyArrow version of 0.12.1 or higher

https://spark.apache.org/docs/3.0.0-preview/pyspark-migration-guide.html#upgrading-from-pyspark-24-to-30
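
As a side note, a minimal guard for the PyArrow requirement quoted above could look like the sketch below (not part of this PR; assumes the `packaging` package is available):

```python
# Sketch: fail fast when the installed PyArrow is older than what
# Spark 3.0's Arrow-based conversion requires (>= 0.12.1).
import pyarrow
from packaging import version

if version.parse(pyarrow.__version__) < version.parse("0.12.1"):
    raise RuntimeError(
        f"Spark 3.0 needs PyArrow >= 0.12.1, found {pyarrow.__version__}"
    )
```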
takuti (Contributor, Author) commented Dec 25, 2020

@chezou Thank you for reviewing.

> Just confirming, is it okay to pass Nullable type?

Can I confirm you're speaking about passing Nullable type of...what?

chezou (Member) commented Dec 25, 2020

Sorry, I meant how a DataFrame with complex types, one that includes NA values as in https://pandas.pydata.org/pandas-docs/stable/user_guide/integer_na.html, will be treated with Spark 3.x.
I know we added conversion code within pytd, though.
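
For context, this is the kind of frame in question; pandas' nullable integer dtype keeps missing values as `pd.NA` and the column stays `Int64` instead of falling back to `float64` (plain pandas behavior, independent of pytd):

```python
import pandas as pd

# Nullable integer column: the missing entry stays <NA> and the dtype
# remains Int64 rather than being coerced to float64.
df = pd.DataFrame({"a": pd.array([1, None, 3], dtype="Int64")})
print(df.dtypes)    # a    Int64
print(df["a"][1])   # <NA>
```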

chezou (Member) left a comment

Confirmed on my local env. We need to replace `spark.sql.execution.arrow.enabled` with `spark.sql.execution.arrow.pyspark.enabled` due to deprecation.

Also, we need to test on Python 3.8, since Python 3.9 is supported by neither PySpark nor pyarrow yet.

(.venv) ➜  pytd git:(td-pyspark-3) python
Python 3.8.7 (v3.8.7:6503f05dd5, Dec 21 2020, 12:45:15)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pytd
>>> import pandas as pd
>>> df = pd.DataFrame(data=[{'a':1,'b':2}, {'a':3,'b':4,'c':5}], dtype=pd.Int64Dtype())
>>> client = pytd.Client()
>>> client.load_table_from_dataframe(df, "aki.spark3", "spark", if_exists="overwrite")
Downloading td-spark...
Completed to download
20/12/28 17:29:14 WARN Utils: Your hostname, TD-0866.local resolves to a loopback address: 127.0.0.1; using 192.168.101.1 instead (on interface lo0)
20/12/28 17:29:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/12/28 17:29:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/12/28 17:29:17 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
20/12/28 17:29:17 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
20/12/28 17:29:17 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.
2020-12-28T17:29:17.799+0900 debug [spark] Loading com.treasuredata.spark package - (package.scala:23)
2020-12-28T17:29:17.814+0900  info [spark] td-spark version:20.12.0, revision:4f257db, build_time:2020-12-04T04:22:18.306+0000 - (package.scala:24)
2020-12-28T17:29:19.618+0900  info [TDServiceConfig] td-spark site: us - (TDServiceConfig.scala:36)
20/12/28 17:29:20 WARN config: No Client EndPointIdentificationAlgorithm configured for SslContextFactory@4d5d36b9[provider=null,keyStore=null,trustStore=null]
2020-12-28T17:29:24.777+0900  warn [DefaultSource] Dropping aki.spark3 (Overwrite mode) - (DefaultSource.scala:98)
2020-12-28T17:29:25.472+0900  info [TDWriter] Uploading data to aki.spark3 (mode: Overwrite) - (TDWriter.scala:68)
2020-12-28T17:29:27.015+0900  info [TDWriter] [txx:85a0b1e5] Starting a new transaction for updating aki.spark3 - (TDWriter.scala:90)
2020-12-28T17:29:34.108+0900  info [TDWriter] [txx:85a0b1e5] Finished uploading 1 partitions (2 records, size:152B) to aki.spark3 - (TDWriter.scala:127)
>>> client.query("select * from aki.spark3")
{'data': [[1, 2, None, 1609144165], [3, 4, 5, 1609144165]], 'columns': ['a', 'b', 'c', 'time']}

takuti and others added 4 commits January 1, 2021 12:15
The currently required versions of PySpark/pyarrow do not support this Python version.

Co-authored-by: Aki Ariga <ariga@treasure-data.com>
`spark.sql.execution.arrow.enabled` has been deprecated and replaced
with `spark.sql.execution.arrow.pyspark.enabled` since Spark 3.0

https://github.com/apache/spark/blob/v3.0.1/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L1768-L1773
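
For illustration, the same rename applied to a plain SparkSession configuration (a sketch only, not the actual pytd code):

```python
# Illustrative sketch: enabling Arrow-based conversion under Spark 3.x.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[*]")
    # Deprecated since Spark 3.0:
    # .config("spark.sql.execution.arrow.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)
```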
takuti (Contributor, Author) commented Jan 1, 2021

Thank you @chezou for the in-depth review.

Made some modifications. Could you take another look?

takuti merged commit f223e75 into master on Jan 2, 2021
takuti deleted the td-pyspark-3 branch on January 2, 2021 at 04:00
Successfully merging this pull request may close these issues.

Support Spark 3.0 via td-pyspark-ea