diff --git a/sourced/ml/cmd_entries/repos2bow.py b/sourced/ml/cmd_entries/repos2bow.py
index 86a488ae..291b9437 100644
--- a/sourced/ml/cmd_entries/repos2bow.py
+++ b/sourced/ml/cmd_entries/repos2bow.py
@@ -6,7 +6,7 @@ from sourced.ml.models import OrderedDocumentFrequencies, QuantizationLevels
 from sourced.ml.transformers import Ignition, UastExtractor, UastDeserializer, Uast2Quant, \
     BagFeatures2DocFreq, BagFeatures2TermFreq, Uast2BagFeatures, HeadFiles, TFIDF, Cacher, \
-    Indexer, UastRow2Document, BOWWriter, Moder
+    Indexer, UastRow2Document, BOWWriter, Moder, create_parquet_loader
 from sourced.ml.utils import create_engine
 from sourced.ml.utils.engine import pipeline_graph, pause
@@ -17,20 +17,28 @@ def add_bow_args(my_parser: argparse.ArgumentParser):
     my_parser.add_argument(
         "--batch", default=BOWWriter.DEFAULT_CHUNK_SIZE, type=int,
         help="The maximum size of a single BOW file in bytes.")
+    my_parser.add_argument(
+        "--parquet", action="store_true", help="Read the input from Parquet files.")
 
 
 @pause
 def repos2bow_entry_template(args, select=HeadFiles, cache_hook=None):
     log = logging.getLogger("repos2bow")
-    engine = create_engine("repos2bow-%s" % uuid4(), **args.__dict__)
     extractors = create_extractors_from_args(args)
+    session_name = "repos2bow-%s" % uuid4()
+    if args.parquet:
+        start_point = create_parquet_loader(session_name, **args.__dict__)
+        root = start_point
+    else:
+        engine = create_engine(session_name, **args.__dict__)
+        root = engine
+
+        start_point = Ignition(engine, explain=args.explain) \
+            .link(select()) \
+            .link(UastExtractor(languages=args.languages))
+
+    uast_extractor = start_point.link(Moder(args.mode)).link(Cacher.maybe(args.persist))
-    ignition = Ignition(engine, explain=args.explain)
-    uast_extractor = ignition \
-        .link(select()) \
-        .link(UastExtractor(languages=args.languages)) \
-        .link(Moder(args.mode)) \
-        .link(Cacher.maybe(args.persist))
     if cache_hook is not None:
         uast_extractor.link(cache_hook()).execute()
     # We link UastRow2Document after Cacher here because cache_hook() may want to have all possible
@@ -65,7 +73,7 @@ def repos2bow_entry_template(args, select=HeadFiles, cache_hook=None):
         .link(Indexer(Uast2BagFeatures.Columns.token, df_model.order)) \
         .link(BOWWriter(document_indexer, df_model, args.bow, args.batch)) \
         .execute()
-    pipeline_graph(args, log, ignition)
+    pipeline_graph(args, log, root)
 
 
 def repos2bow_entry(args):
diff --git a/sourced/ml/transformers/__init__.py b/sourced/ml/transformers/__init__.py
index 66435365..69c669ba 100644
--- a/sourced/ml/transformers/__init__.py
+++ b/sourced/ml/transformers/__init__.py
@@ -1,6 +1,6 @@
 from sourced.ml.transformers.basic import Sampler, Collector, First, Identity, Cacher, Ignition, \
     HeadFiles, UastExtractor, FieldsSelector, ParquetSaver, ParquetLoader, UastDeserializer, \
-    Counter
+    Counter, create_parquet_loader
 from sourced.ml.transformers.indexer import Indexer
 from sourced.ml.transformers.tfidf import TFIDF
 from sourced.ml.transformers.transformer import Transformer
diff --git a/sourced/ml/transformers/basic.py b/sourced/ml/transformers/basic.py
index d78645b6..17ec1a51 100644
--- a/sourced/ml/transformers/basic.py
+++ b/sourced/ml/transformers/basic.py
@@ -1,10 +1,11 @@
+import logging
 from typing import Union
 
 from pyspark import StorageLevel, Row, RDD
 from pyspark.sql import DataFrame
 
 from sourced.ml.transformers.transformer import Transformer
-from sourced.ml.utils import EngineConstants
+from sourced.ml.utils import EngineConstants, assemble_spark_config, create_spark
 
 
 class Sampler(Transformer):
@@ -157,6 +158,17 @@ def __call__(self, _):
         raise ValueError
 
 
+def create_parquet_loader(session_name, repositories, config=None, memory="", packages=None,
+                          **spark_kwargs):
+    config, packages = assemble_spark_config(config=config, packages=packages, memory=memory)
+    session = create_spark(session_name, config=config, packages=packages,
+                           **spark_kwargs)
+    log = logging.getLogger("parquet")
+    log.info("Initializing on %s", repositories)
+    parquet = ParquetLoader(session, repositories)
+    return parquet
+
+
 class UastDeserializer(Transformer):
     def __setstate__(self, state):
         super().__setstate__(state)
diff --git a/sourced/ml/utils/__init__.py b/sourced/ml/utils/__init__.py
index 9f020374..af02020a 100644
--- a/sourced/ml/utils/__init__.py
+++ b/sourced/ml/utils/__init__.py
@@ -1,4 +1,5 @@
 from sourced.ml.utils.bigartm import install_bigartm
-from sourced.ml.utils.engine import create_engine, create_spark, add_engine_args, add_spark_args, \
-    EngineConstants
+from sourced.ml.utils.spark import add_spark_args, create_spark, assemble_spark_config, \
+    SparkDefault
+from sourced.ml.utils.engine import add_engine_args, create_engine, EngineConstants
 from sourced.ml.utils.pickleable_logger import PickleableLogger
diff --git a/sourced/ml/utils/engine.py b/sourced/ml/utils/engine.py
index 7f87f53d..6388f2b4 100644
--- a/sourced/ml/utils/engine.py
+++ b/sourced/ml/utils/engine.py
@@ -1,55 +1,8 @@
 import functools
 import logging
-import os
-import sys
-
-os.environ["PYSPARK_PYTHON"] = sys.executable
-
-import pyspark  # nopep8
-from pyspark.sql import SparkSession  # nopep8
-from sourced.engine import Engine  # nopep8
-
-
-class SparkDefault:
-    """
-    Default arguments for create_spark function and __main__
-    """
-    MASTER_ADDRESS = "local[*]"
-    LOCAL_DIR = "/tmp/spark"
-    LOG_LEVEL = "WARN"
-    CONFIG = []
-    PACKAGES = []
-
-
-def add_spark_args(my_parser, default_packages=None):
-    my_parser.add_argument(
-        "-s", "--spark", default=SparkDefault.MASTER_ADDRESS,
-        help="Spark's master address.")
-    my_parser.add_argument(
-        "--config", nargs="+", default=SparkDefault.CONFIG,
-        help="Spark configuration (key=value).")
-    my_parser.add_argument(
-        "-m", "--memory",
-        help="Handy memory config for spark. -m 4G,10G,2G is equivalent to "
-             "--config spark.executor.memory=4G "
-             "--config spark.driver.memory=10G "
-             "--config spark.driver.maxResultSize=2G."
- "Numbers are floats separated by commas.") - if default_packages is None: - default_packages = SparkDefault.PACKAGES - my_parser.add_argument( - "--package", nargs="+", default=default_packages, dest="packages", - help="Additional Spark packages.") - my_parser.add_argument( - "--spark-local-dir", default=SparkDefault.LOCAL_DIR, - help="Spark local directory.") - my_parser.add_argument("--spark-log-level", default=SparkDefault.LOG_LEVEL, choices=( - "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"), - help="Spark log level") - persistences = [att for att in pyspark.StorageLevel.__dict__.keys() if "__" not in att] - my_parser.add_argument( - "--persist", default=None, choices=persistences, - help="Spark persistence type (StorageLevel.*).") +from sourced.engine import Engine +from sourced.ml.utils import add_spark_args, create_spark, assemble_spark_config class EngineConstants: @@ -88,26 +41,13 @@ def add_engine_args(my_parser, default_packages=None): help="Print the PySpark execution plans.") -def create_spark(session_name, - spark=SparkDefault.MASTER_ADDRESS, - spark_local_dir=SparkDefault.LOCAL_DIR, - config=SparkDefault.CONFIG, - packages=SparkDefault.PACKAGES, - spark_log_level=SparkDefault.LOG_LEVEL, - **_): # **kwargs are discarded for convenience - log = logging.getLogger("spark") - log.info("Starting %s on %s", session_name, spark) - builder = SparkSession.builder.master(spark).appName(session_name) - builder = builder.config( - "spark.jars.packages", ",".join(packages)) - builder = builder.config("spark.local.dir", spark_local_dir) - for cfg in config: - builder = builder.config(*cfg.split("=", 1)) - session = builder.getOrCreate() - session.sparkContext.setLogLevel(spark_log_level) - # Hide py4j verbose logging (It appears in travis mostly) - logging.getLogger("py4j").setLevel(logging.WARNING) - return session +def add_engine_dependencies(engine=EngineDefault.VERSION, config=None, packages=None): + config.append("spark.tech.sourced.engine.cleanup.skip=true") + packages.append("tech.sourced:engine:" + engine) + + +def add_bblfsh_dependencies(bblfsh, config=None): + config.append("spark.tech.sourced.bblfsh.grpc.host=" + bblfsh) def create_engine(session_name, repositories, @@ -115,25 +55,10 @@ def create_engine(session_name, repositories, engine=EngineDefault.VERSION, config=None, packages=None, memory="", repository_format="siva", **spark_kwargs): - if config is None: - config = [] - if packages is None: - packages = [] - config.append("spark.tech.sourced.bblfsh.grpc.host=" + bblfsh) - # TODO(vmarkovtsev): figure out why is this option needed - config.append("spark.tech.sourced.engine.cleanup.skip=true") - packages.append("tech.sourced:engine:" + engine) - memory_conf = [] - if memory: - memory = memory.split(",") - err = "Expected 3 memory parameters but got {}. " \ - "Please check --help to see how -m/--memory should be used." 
-        assert len(memory) == 3, err.format(len(memory))
-        memory_conf.append("spark.executor.memory=" + memory[0])
-        memory_conf.append("spark.driver.memory=" + memory[1])
-        memory_conf.append("spark.driver.maxResultSize=" + memory[2])
-    session = create_spark(session_name, config=config + memory_conf, packages=packages,
-                           **spark_kwargs)
+    config, packages = assemble_spark_config(config=config, packages=packages, memory=memory)
+    add_engine_dependencies(engine=engine, config=config, packages=packages)
+    add_bblfsh_dependencies(bblfsh=bblfsh, config=config)
+    session = create_spark(session_name, config=config, packages=packages, **spark_kwargs)
     log = logging.getLogger("engine")
     log.info("Initializing on %s", repositories)
     engine = Engine(session, repositories, repository_format)
diff --git a/sourced/ml/utils/spark.py b/sourced/ml/utils/spark.py
new file mode 100644
index 00000000..533b8311
--- /dev/null
+++ b/sourced/ml/utils/spark.py
@@ -0,0 +1,96 @@
+import logging
+import os
+import sys
+
+os.environ["PYSPARK_PYTHON"] = sys.executable
+
+import pyspark  # nopep8
+from pyspark.sql import SparkSession  # nopep8
+
+
+class SparkDefault:
+    """
+    Default arguments for create_spark function and __main__
+    """
+    MASTER_ADDRESS = "local[*]"
+    LOCAL_DIR = "/tmp/spark"
+    LOG_LEVEL = "WARN"
+    CONFIG = []
+    PACKAGES = []
+
+
+def add_spark_args(my_parser, default_packages=None):
+    my_parser.add_argument(
+        "-s", "--spark", default=SparkDefault.MASTER_ADDRESS,
+        help="Spark's master address.")
+    my_parser.add_argument(
+        "--config", nargs="+", default=SparkDefault.CONFIG,
+        help="Spark configuration (key=value).")
+    my_parser.add_argument(
+        "-m", "--memory",
+        help="Handy memory config for spark. -m 4G,10G,2G is equivalent to "
+             "--config spark.executor.memory=4G "
+             "--config spark.driver.memory=10G "
+             "--config spark.driver.maxResultSize=2G. "
+ "Numbers are floats separated by commas.") + if default_packages is None: + default_packages = SparkDefault.PACKAGES + my_parser.add_argument( + "--package", nargs="+", default=default_packages, dest="packages", + help="Additional Spark packages.") + my_parser.add_argument( + "--spark-local-dir", default=SparkDefault.LOCAL_DIR, + help="Spark local directory.") + my_parser.add_argument("--spark-log-level", default=SparkDefault.LOG_LEVEL, choices=( + "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"), + help="Spark log level") + persistences = [att for att in pyspark.StorageLevel.__dict__.keys() if "__" not in att] + my_parser.add_argument( + "--persist", default=None, choices=persistences, + help="Spark persistence type (StorageLevel.*).") + + +def create_spark(session_name, + spark=SparkDefault.MASTER_ADDRESS, + spark_local_dir=SparkDefault.LOCAL_DIR, + config=SparkDefault.CONFIG, + packages=SparkDefault.PACKAGES, + spark_log_level=SparkDefault.LOG_LEVEL, + **_): # **kwargs are discarded for convenience + log = logging.getLogger("spark") + log.info("Starting %s on %s", session_name, spark) + builder = SparkSession.builder.master(spark).appName(session_name) + builder = builder.config("spark.jars.packages", ",".join(packages)) + builder = builder.config("spark.local.dir", spark_local_dir) + for cfg in config: + builder = builder.config(*cfg.split("=", 1)) + session = builder.getOrCreate() + session.sparkContext.setLogLevel(spark_log_level) + # Hide py4j verbose logging (It appears in travis mostly) + logging.getLogger("py4j").setLevel(logging.WARNING) + return session + + +def assemble_spark_config(config=None, packages=None, memory: str = ""): + """ + Assemble configuration for a Spark session + :param config: configuration to send to spark session + :param packages: packages to send to spark session + :param memory: string with memory configuration for spark + :return: config, packages + """ + if config is None: + config = [] + if packages is None: + packages = [] + memory_conf = [] + if memory: + memory = memory.split(",") + err = "Expected 3 memory parameters but got {}. " \ + "Please check --help to see how -m/--memory should be used." + assert len(memory) == 3, err.format(len(memory)) + memory_conf.append("spark.executor.memory=" + memory[0]) + memory_conf.append("spark.driver.memory=" + memory[1]) + memory_conf.append("spark.driver.maxResultSize=" + memory[2]) + config = config + memory_conf + return config, packages