Separate spark functionality from engine
Signed-off-by: egor <egor@sourced.tech>
egor committed Mar 2, 2018
1 parent eb1f8b9 commit 99a3ec3
Showing 6 changed files with 143 additions and 101 deletions.
26 changes: 17 additions & 9 deletions sourced/ml/cmd_entries/repos2bow.py
@@ -6,7 +6,7 @@
from sourced.ml.models import OrderedDocumentFrequencies, QuantizationLevels
from sourced.ml.transformers import Ignition, UastExtractor, UastDeserializer, Uast2Quant, \
BagFeatures2DocFreq, BagFeatures2TermFreq, Uast2BagFeatures, HeadFiles, TFIDF, Cacher, \
Indexer, UastRow2Document, BOWWriter, Moder
Indexer, UastRow2Document, BOWWriter, Moder, create_parquet_loader
from sourced.ml.utils import create_engine
from sourced.ml.utils.engine import pipeline_graph, pause

@@ -17,20 +17,28 @@ def add_bow_args(my_parser: argparse.ArgumentParser):
my_parser.add_argument(
"--batch", default=BOWWriter.DEFAULT_CHUNK_SIZE, type=int,
help="The maximum size of a single BOW file in bytes.")
my_parser.add_argument(
"--parquet", action="store_true", help="If it's parquet input.")


@pause
def repos2bow_entry_template(args, select=HeadFiles, cache_hook=None):
log = logging.getLogger("repos2bow")
engine = create_engine("repos2bow-%s" % uuid4(), **args.__dict__)
extractors = create_extractors_from_args(args)
session_name = "repos2bow-%s" % uuid4()
if args.parquet:
start_point = create_parquet_loader(session_name, **args.__dict__)
root = start_point
else:
engine = create_engine(session_name, **args.__dict__)
root = engine

start_point = Ignition(engine, explain=args.explain) \
.link(select()) \
.link(UastExtractor(languages=args.languages))

uast_extractor = start_point.link(Moder(args.mode)).link(Cacher.maybe(args.persist))

ignition = Ignition(engine, explain=args.explain)
uast_extractor = ignition \
.link(select()) \
.link(UastExtractor(languages=args.languages)) \
.link(Moder(args.mode)) \
.link(Cacher.maybe(args.persist))
if cache_hook is not None:
uast_extractor.link(cache_hook()).execute()
# We link UastRow2Document after Cacher here because cache_hook() may want to have all possible
@@ -65,7 +73,7 @@ def repos2bow_entry_template(args, select=HeadFiles, cache_hook=None):
.link(Indexer(Uast2BagFeatures.Columns.token, df_model.order)) \
.link(BOWWriter(document_indexer, df_model, args.bow, args.batch)) \
.execute()
pipeline_graph(args, log, ignition)
pipeline_graph(args, log, root)


def repos2bow_entry(args):
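
Since the diff view flattens indentation, the new dispatch reads more clearly restated as a sketch (all names are taken from the diff above; the exact formatting in the file may differ slightly):

    # Sketch: the pipeline root now depends on the new --parquet flag.
    session_name = "repos2bow-%s" % uuid4()
    if args.parquet:
        start_point = create_parquet_loader(session_name, **args.__dict__)
        root = start_point                    # the ParquetLoader heads the pipeline
    else:
        engine = create_engine(session_name, **args.__dict__)
        root = engine                         # the Engine remains the root
        start_point = Ignition(engine, explain=args.explain) \
            .link(select()) \
            .link(UastExtractor(languages=args.languages))
    uast_extractor = start_point.link(Moder(args.mode)).link(Cacher.maybe(args.persist))
    # ... the TF-IDF / BOW stages that follow are unchanged ...
    pipeline_graph(args, log, root)           # the graph is now rooted at `root`, not `ignition`
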
2 changes: 1 addition & 1 deletion sourced/ml/transformers/__init__.py
@@ -1,6 +1,6 @@
from sourced.ml.transformers.basic import Sampler, Collector, First, Identity, Cacher, Ignition, \
HeadFiles, UastExtractor, FieldsSelector, ParquetSaver, ParquetLoader, UastDeserializer, \
Counter
Counter, create_parquet_loader
from sourced.ml.transformers.indexer import Indexer
from sourced.ml.transformers.tfidf import TFIDF
from sourced.ml.transformers.transformer import Transformer
14 changes: 13 additions & 1 deletion sourced/ml/transformers/basic.py
@@ -1,10 +1,11 @@
import logging
from typing import Union

from pyspark import StorageLevel, Row, RDD
from pyspark.sql import DataFrame

from sourced.ml.transformers.transformer import Transformer
from sourced.ml.utils import EngineConstants
from sourced.ml.utils import EngineConstants, assemble_spark_config, create_spark


class Sampler(Transformer):
@@ -157,6 +158,17 @@ def __call__(self, _):
raise ValueError


def create_parquet_loader(session_name, repositories, config=None, memory="", packages=None,
**spark_kwargs):
config, packages = assemble_spark_config(config=config, packages=packages, memory=memory)
session = create_spark(session_name, config=config, packages=packages,
**spark_kwargs)
log = logging.getLogger("parquet")
log.info("Initializing on %s", repositories)
parquet = ParquetLoader(session, repositories)
return parquet


class UastDeserializer(Transformer):
def __setstate__(self, state):
super().__setstate__(state)
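
For illustration, a minimal way to call the new helper directly (the session name and Parquet path below are hypothetical; any extra keyword arguments fall through to create_spark):

    # Hypothetical values: builds a Spark session and returns a ParquetLoader
    # that serves as the head of a transformer pipeline.
    loader = create_parquet_loader(
        "repos2bow-demo",              # session_name, forwarded to create_spark
        "/data/uast_rows.parquet",     # repositories: path to the Parquet input
        memory="4G,10G,2G",            # expanded by assemble_spark_config
        spark="local[*]")              # remaining spark_kwargs go to create_spark
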
5 changes: 3 additions & 2 deletions sourced/ml/utils/__init__.py
@@ -1,4 +1,5 @@
from sourced.ml.utils.bigartm import install_bigartm
from sourced.ml.utils.engine import create_engine, create_spark, add_engine_args, add_spark_args, \
EngineConstants
from sourced.ml.utils.spark import add_spark_args, create_spark, assemble_spark_config, \
SparkDefault
from sourced.ml.utils.engine import add_engine_args, create_engine, EngineConstants
from sourced.ml.utils.pickleable_logger import PickleableLogger
101 changes: 13 additions & 88 deletions sourced/ml/utils/engine.py
@@ -1,55 +1,8 @@
import functools
import logging
import os
import sys

os.environ["PYSPARK_PYTHON"] = sys.executable

import pyspark # nopep8
from pyspark.sql import SparkSession # nopep8
from sourced.engine import Engine # nopep8


class SparkDefault:
"""
Default arguments for create_spark function and __main__
"""
MASTER_ADDRESS = "local[*]"
LOCAL_DIR = "/tmp/spark"
LOG_LEVEL = "WARN"
CONFIG = []
PACKAGES = []


def add_spark_args(my_parser, default_packages=None):
my_parser.add_argument(
"-s", "--spark", default=SparkDefault.MASTER_ADDRESS,
help="Spark's master address.")
my_parser.add_argument(
"--config", nargs="+", default=SparkDefault.CONFIG,
help="Spark configuration (key=value).")
my_parser.add_argument(
"-m", "--memory",
help="Handy memory config for spark. -m 4G,10G,2G is equivalent to "
"--config spark.executor.memory=4G "
"--config spark.driver.memory=10G "
"--config spark.driver.maxResultSize=2G."
"Numbers are floats separated by commas.")
if default_packages is None:
default_packages = SparkDefault.PACKAGES
my_parser.add_argument(
"--package", nargs="+", default=default_packages, dest="packages",
help="Additional Spark packages.")
my_parser.add_argument(
"--spark-local-dir", default=SparkDefault.LOCAL_DIR,
help="Spark local directory.")
my_parser.add_argument("--spark-log-level", default=SparkDefault.LOG_LEVEL, choices=(
"ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"),
help="Spark log level")
persistences = [att for att in pyspark.StorageLevel.__dict__.keys() if "__" not in att]
my_parser.add_argument(
"--persist", default=None, choices=persistences,
help="Spark persistence type (StorageLevel.*).")
from sourced.engine import Engine
from sourced.ml.utils import add_spark_args, create_spark, assemble_spark_config


class EngineConstants:
@@ -88,52 +41,24 @@ def add_engine_args(my_parser, default_packages=None):
help="Print the PySpark execution plans.")


def create_spark(session_name,
spark=SparkDefault.MASTER_ADDRESS,
spark_local_dir=SparkDefault.LOCAL_DIR,
config=SparkDefault.CONFIG,
packages=SparkDefault.PACKAGES,
spark_log_level=SparkDefault.LOG_LEVEL,
**_): # **kwargs are discarded for convenience
log = logging.getLogger("spark")
log.info("Starting %s on %s", session_name, spark)
builder = SparkSession.builder.master(spark).appName(session_name)
builder = builder.config(
"spark.jars.packages", ",".join(packages))
builder = builder.config("spark.local.dir", spark_local_dir)
for cfg in config:
builder = builder.config(*cfg.split("=", 1))
session = builder.getOrCreate()
session.sparkContext.setLogLevel(spark_log_level)
# Hide py4j verbose logging (It appears in travis mostly)
logging.getLogger("py4j").setLevel(logging.WARNING)
return session
def add_engine_dependencies(engine=EngineDefault.VERSION, config=None, packages=None):
config.append("spark.tech.sourced.engine.cleanup.skip=true")
packages.append("tech.sourced:engine:" + engine)


def add_bblfsh_dependencies(bblfsh, config=None):
config.append("spark.tech.sourced.bblfsh.grpc.host=" + bblfsh)


def create_engine(session_name, repositories,
bblfsh=EngineDefault.BBLFSH,
engine=EngineDefault.VERSION,
config=None, packages=None, memory="",
repository_format="siva", **spark_kwargs):
if config is None:
config = []
if packages is None:
packages = []
config.append("spark.tech.sourced.bblfsh.grpc.host=" + bblfsh)
# TODO(vmarkovtsev): figure out why is this option needed
config.append("spark.tech.sourced.engine.cleanup.skip=true")
packages.append("tech.sourced:engine:" + engine)
memory_conf = []
if memory:
memory = memory.split(",")
err = "Expected 3 memory parameters but got {}. " \
"Please check --help to see how -m/--memory should be used."
assert len(memory) == 3, err.format(len(memory))
memory_conf.append("spark.executor.memory=" + memory[0])
memory_conf.append("spark.driver.memory=" + memory[1])
memory_conf.append("spark.driver.maxResultSize=" + memory[2])
session = create_spark(session_name, config=config + memory_conf, packages=packages,
**spark_kwargs)
config, packages = assemble_spark_config(config=config, packages=packages, memory=memory)
add_engine_dependencies(engine=engine, config=config, packages=packages)
add_bblfsh_dependencies(bblfsh=bblfsh, config=config)
session = create_spark(session_name, config=config, packages=packages, **spark_kwargs)
log = logging.getLogger("engine")
log.info("Initializing on %s", repositories)
engine = Engine(session, repositories, repository_format)
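
The two extracted helpers only append to mutable config/packages lists, so they compose independently of create_engine. A small sketch with illustrative values (the engine version and bblfsh host are assumptions):

    # Build the same dependency lists create_engine now assembles internally.
    config, packages = [], []
    add_engine_dependencies(engine="0.5.4", config=config, packages=packages)
    add_bblfsh_dependencies(bblfsh="localhost", config=config)
    # config   == ["spark.tech.sourced.engine.cleanup.skip=true",
    #              "spark.tech.sourced.bblfsh.grpc.host=localhost"]
    # packages == ["tech.sourced:engine:0.5.4"]
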
96 changes: 96 additions & 0 deletions sourced/ml/utils/spark.py
@@ -0,0 +1,96 @@
import logging
import os
import sys

os.environ["PYSPARK_PYTHON"] = sys.executable

import pyspark # nopep8
from pyspark.sql import SparkSession # nopep8


class SparkDefault:
"""
Default arguments for create_spark function and __main__
"""
MASTER_ADDRESS = "local[*]"
LOCAL_DIR = "/tmp/spark"
LOG_LEVEL = "WARN"
CONFIG = []
PACKAGES = []


def add_spark_args(my_parser, default_packages=None):
my_parser.add_argument(
"-s", "--spark", default=SparkDefault.MASTER_ADDRESS,
help="Spark's master address.")
my_parser.add_argument(
"--config", nargs="+", default=SparkDefault.CONFIG,
help="Spark configuration (key=value).")
my_parser.add_argument(
"-m", "--memory",
help="Handy memory config for spark. -m 4G,10G,2G is equivalent to "
"--config spark.executor.memory=4G "
"--config spark.driver.memory=10G "
"--config spark.driver.maxResultSize=2G."
"Numbers are floats separated by commas.")
if default_packages is None:
default_packages = SparkDefault.PACKAGES
my_parser.add_argument(
"--package", nargs="+", default=default_packages, dest="packages",
help="Additional Spark packages.")
my_parser.add_argument(
"--spark-local-dir", default=SparkDefault.LOCAL_DIR,
help="Spark local directory.")
my_parser.add_argument("--spark-log-level", default=SparkDefault.LOG_LEVEL, choices=(
"ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"),
help="Spark log level")
persistences = [att for att in pyspark.StorageLevel.__dict__.keys() if "__" not in att]
my_parser.add_argument(
"--persist", default=None, choices=persistences,
help="Spark persistence type (StorageLevel.*).")


def create_spark(session_name,
spark=SparkDefault.MASTER_ADDRESS,
spark_local_dir=SparkDefault.LOCAL_DIR,
config=SparkDefault.CONFIG,
packages=SparkDefault.PACKAGES,
spark_log_level=SparkDefault.LOG_LEVEL,
**_): # **kwargs are discarded for convenience
log = logging.getLogger("spark")
log.info("Starting %s on %s", session_name, spark)
builder = SparkSession.builder.master(spark).appName(session_name)
builder = builder.config("spark.jars.packages", ",".join(packages))
builder = builder.config("spark.local.dir", spark_local_dir)
for cfg in config:
builder = builder.config(*cfg.split("=", 1))
session = builder.getOrCreate()
session.sparkContext.setLogLevel(spark_log_level)
# Hide py4j verbose logging (It appears in travis mostly)
logging.getLogger("py4j").setLevel(logging.WARNING)
return session


def assemble_spark_config(config=None, packages=None, memory: str = ""):
"""
Assemble configuration for a Spark session
:param config: configuration to send to spark session
:param packages: packages to send to spark session
:param memory: string with memory configuration for spark
:return: config, packages
"""
if config is None:
config = []
if packages is None:
packages = []
memory_conf = []
if memory:
memory = memory.split(",")
err = "Expected 3 memory parameters but got {}. " \
"Please check --help to see how -m/--memory should be used."
assert len(memory) == 3, err.format(len(memory))
memory_conf.append("spark.executor.memory=" + memory[0])
memory_conf.append("spark.driver.memory=" + memory[1])
memory_conf.append("spark.driver.maxResultSize=" + memory[2])
config = config + memory_conf
return config, packages
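
To make the -m/--memory shorthand concrete, this is what assemble_spark_config produces for it (the extra --config entry is illustrative):

    config, packages = assemble_spark_config(config=["spark.ui.enabled=false"],
                                             memory="4G,10G,2G")
    # config   == ["spark.ui.enabled=false",
    #              "spark.executor.memory=4G",
    #              "spark.driver.memory=10G",
    #              "spark.driver.maxResultSize=2G"]
    # packages == []
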
