# spark.py
from typing import List, Union
import os

import pandas as pd
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
import py4j

from wmfdata import conda
from wmfdata.utils import (
    check_kerberos_auth,
    ensure_list,
    print_err,
    python_version
)

"""
Predefined Spark sessions and configs for use with the create_session and run functions.
"""
PREDEFINED_SPARK_SESSIONS = {
    "local": {
        "master": "local[2]",  # [2] means use 2 local worker threads
        "config": {
            "spark.driver.memory": "4g",
        }
    },
    "yarn-regular": {
        "master": "yarn",
        "config": {
            "spark.driver.memory": "2g",
            "spark.dynamicAllocation.maxExecutors": 64,
            "spark.executor.memory": "8g",
            "spark.executor.cores": 4,
            "spark.sql.shuffle.partitions": 256
        }
    },
    "yarn-large": {
        "master": "yarn",
        "config": {
            "spark.driver.memory": "4g",
            "spark.dynamicAllocation.maxExecutors": 128,
            "spark.executor.memory": "8g",
            "spark.executor.cores": 4,
            "spark.sql.shuffle.partitions": 512
        }
    }
}

# Add simple session type aliases
PREDEFINED_SPARK_SESSIONS["regular"] = PREDEFINED_SPARK_SESSIONS["yarn-regular"]
PREDEFINED_SPARK_SESSIONS["large"] = PREDEFINED_SPARK_SESSIONS["yarn-large"]
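
# For example, the aliases resolve to the same predefined configs:
#
#   PREDEFINED_SPARK_SESSIONS["regular"] is PREDEFINED_SPARK_SESSIONS["yarn-regular"]  # True
#   create_session("regular")  # equivalent to create_session("yarn-regular")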

# Any environment variables here will be set in all Spark processes
# via spark.executorEnv and spark.yarn.appMasterEnv settings.
ENV_VARS_TO_PROPAGATE = [
    # Always propagate http proxy settings.
    # -Djava.net.useSystemProxies=true should be set in extraJavaOptions for this to work.
    # WMF sets java.net.useSystemProxies=true in spark-defaults.conf, so we don't need to set it here.
    'http_proxy',
    'https_proxy',
    'no_proxy',
]
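
# For example, if https_proxy is set in the environment when a session is created,
# the session builder will receive both of these settings (see the loop in
# create_custom_session below):
#
#   spark.executorEnv.https_proxy       -> os.environ["https_proxy"]
#   spark.yarn.appMasterEnv.https_proxy -> os.environ["https_proxy"]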

def get_active_session():
    """
    DEPRECATED: Returns the active session if there is one and None otherwise.

    This is a holdover from before Spark 3, when there was no native getActiveSession function.
    """
    print_err(
        "get_active_session has been deprecated and will be removed in the next major version. "
        "Use pyspark.sql.SparkSession.getActiveSession instead."
    )

    return SparkSession.getActiveSession()
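
# As the deprecation message says, the native PySpark API is the preferred replacement:
#
#   from pyspark.sql import SparkSession
#   session = SparkSession.getActiveSession()  # returns None if no session is active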

def create_custom_session(
    master="local[2]",
    app_name="wmfdata-custom",
    spark_config={},
    ship_python_env=False,
    conda_pack_kwargs={}
):
    """
    Creates a new Spark session, stopping any existing session first.

    Use this instead of create_session if you'd rather have manual control over
    your SparkSession configuration.

    Arguments:
    * `master`: passed to SparkSession.builder.master().
      If this is "yarn", a conda env is active, and ship_python_env=False,
      remote executors will be configured to use conda.conda_base_env_prefix().
      This defaults to '/opt/conda-analytics', which should be installed on all
      analytics cluster worker nodes.
      If your conda environment has required packages that are not in that base
      environment, set ship_python_env=True.
    * `app_name`: passed to SparkSession.builder.appName().
    * `spark_config`: passed to SparkSession.builder.config().
    * `ship_python_env`: If master='yarn' and this is True, a conda env will be packed
      and shipped to remote Spark executors. This is useful if your conda env
      has Python or other packages that the executors will need to do their work.
      If True, this will fail if conda and conda_pack are not installed.
    * `conda_pack_kwargs`: Args to pass to conda_pack.pack(). If none are given, this will
      call conda_pack.pack() with no args, causing the default currently active
      conda environment to be packed.
      You can pack and ship any conda environment by setting appropriate args here.
      See https://conda.github.io/conda-pack/api.html#pack
    """
    check_kerberos_auth()

    session = SparkSession.getActiveSession()
    if session:
        session.stop()

    if master == "yarn":
        if ship_python_env:
            # The path to our packed conda environment.
            conda_packed_file = conda.pack(**conda_pack_kwargs)
            # This will be used as the unpacked directory name in the YARN working directory.
            conda_packed_name = os.path.splitext(os.path.basename(conda_packed_file))[0]
            # Ship conda_packed_file to each YARN worker, unless a previous session has done it already.
            conda_spark_archive = f"{conda_packed_file}#{conda_packed_name}"

            # Spark config values persist within the Python process even after sessions are
            # stopped. If a previous session in this process shipped the environment, the file
            # will already be present in `spark.yarn.dist.archives`. If we blindly add it a
            # second time, it will cause an error.
            previous_files_shipped = pyspark.SparkConf().get("spark.yarn.dist.archives")
            is_archive_set = (
                previous_files_shipped is not None
                and conda_spark_archive in previous_files_shipped
            )

            if not is_archive_set:
                if "spark.yarn.dist.archives" in spark_config:
                    spark_config["spark.yarn.dist.archives"] += f",{conda_spark_archive}"
                else:
                    spark_config["spark.yarn.dist.archives"] = conda_spark_archive

            print(f"Shipping {conda_packed_file} to remote Spark executors.")

            # Workers should use python from the unpacked conda env.
            os.environ["PYSPARK_PYTHON"] = f"{conda_packed_name}/bin/python3"

        # Else if conda is active, use the Python in the standard conda base
        # environment, as this should exist on all worker nodes.
        elif conda.is_active():
            os.environ["PYSPARK_PYTHON"] = os.path.join(
                conda.conda_base_env_prefix(), "bin", "python3"
            )

        # Else use the system python. We can't use any current conda or virtualenv python,
        # as these won't be present on the remote YARN workers.
        # The python version workers use must be the same as the currently
        # running python version, so only set this if that version of python
        # (e.g. python3.7) is installed on the system.
        elif os.path.isfile(f"/usr/bin/python{python_version()}"):
            os.environ["PYSPARK_PYTHON"] = f"/usr/bin/python{python_version()}"

    builder = (
        SparkSession.builder
        .master(master)
        .appName(app_name)
    )

    # All ENV_VARS_TO_PROPAGATE should be set in all Spark processes.
    for var in ENV_VARS_TO_PROPAGATE:
        if var in os.environ:
            builder.config(f"spark.executorEnv.{var}", os.environ[var])
            # NOTE: Setting the var in appMasterEnv will only have an effect if
            # running in yarn cluster mode.
            builder.config(f"spark.yarn.appMasterEnv.{var}", os.environ[var])

    # Apply any provided spark configs.
    for k, v in spark_config.items():
        builder.config(k, v)

    return builder.getOrCreate()
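
# Example usage (illustrative only; the app name and config values are arbitrary):
#
#   spark = create_custom_session(
#       master="yarn",
#       app_name="my-custom-analysis",
#       spark_config={
#           "spark.driver.memory": "4g",
#           "spark.executor.memory": "8g",
#       },
#       ship_python_env=True,  # pack and ship the active conda env to executors
#   )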

def create_session(
    type="yarn-regular",
    app_name=None,
    extra_settings={},
    ship_python_env=False,
):
    """
    Creates a new Spark session based on one of the PREDEFINED_SPARK_SESSIONS
    types, stopping any existing session first.

    Arguments:
    * `type`: the type of Spark session to create.
        * "local": Run the command in a local Spark process. Use this for
          prototyping or querying small-ish data (less than a couple of GB).
        * "yarn-regular": the default; able to use up to 15% of Hadoop cluster
          resources.
        * "yarn-large": for queries which require more processing (e.g. joins) or
          which access more data; able to use up to 30% of Hadoop cluster
          resources.
    * `app_name`: the name to give the Spark application. Defaults to
      "wmfdata-{type}".
    * `extra_settings`: A dict of additional Spark configs to use when creating
      the Spark session. These will override the defaults specified by `type`.
    * `ship_python_env`: If the session runs on YARN and this is True, a conda env
      will be packed and shipped to remote Spark executors. This is useful if your
      active conda env has Python or other packages that the executors will need
      to do their work.
    """
    if type not in PREDEFINED_SPARK_SESSIONS:
        raise ValueError(
            "'{}' is not a valid predefined Spark session type. Must be one of {}".format(
                type, list(PREDEFINED_SPARK_SESSIONS.keys())
            )
        )

    if app_name is None:
        app_name = "wmfdata-{}".format(type)

    # Copy the predefined config so extra settings don't permanently modify
    # PREDEFINED_SPARK_SESSIONS for later calls.
    config = PREDEFINED_SPARK_SESSIONS[type]["config"].copy()
    # Add in any extra settings, overwriting if applicable
    config.update(extra_settings)

    return create_custom_session(
        master=PREDEFINED_SPARK_SESSIONS[type]["master"],
        app_name=app_name,
        spark_config=config,
        ship_python_env=ship_python_env
    )
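
# Example usage (illustrative only; the extra setting shown is arbitrary):
#
#   spark = create_session(
#       type="yarn-large",
#       extra_settings={"spark.sql.shuffle.partitions": 1024},
#   )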

def run(commands: Union[str, List[str]]) -> pd.DataFrame:
    """
    Runs SQL commands against the Hive tables in the Data Lake using the
    PySpark SQL interface.

    Note: this function will use the existing Spark session if there is one and
    otherwise create a predefined "yarn-regular" session. If you want to use
    a different type of session, use `create_session` or `create_custom_session`
    first.

    Note: this function loads all the output into memory on the client. If
    your command produces many gigabytes of output, it could cause an
    out-of-memory error.

    Arguments:
    * `commands`: the SQL to run. A string for a single command or a list of
      strings for multiple commands within the same session (useful for things
      like setting session variables). Returning results from more than one
      query is *not* supported; only results from the last results-producing
      command will be returned.
    """
    commands = ensure_list(commands)

    session = SparkSession.getActiveSession()
    if not session:
        session = create_session()

    overall_result = None
    for cmd in commands:
        cmd_result = session.sql(cmd)
        # If the result has columns, the command was a query and therefore
        # results-producing. If not, it was a DDL or DML command and not
        # results-producing.
        if len(cmd_result.columns) > 0:
            overall_result = cmd_result

    if overall_result is not None:
        overall_result = overall_result.toPandas()

    return overall_result
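
# Example usage (illustrative only; the table and query are placeholders):
#
#   edits = run(
#       "SELECT wiki_db, COUNT(*) AS edit_count FROM wmf.mediawiki_history GROUP BY wiki_db"
#   )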