In [1]:
import pyspark.sql
import json
import os
import pathlib
from pyspark.sql.types import *


## Schemas

In [2]:
TimeType = LongType
ConfigType = lambda: StructType([
    StructField('problem-file', StringType(), False),
    StructField('argmap', MapType(StringType(), StringType()), False),
    StructField('initialization-ms', TimeType()),
    StructField('registered-instructions', ArrayType(StringType())),
    StructField('version-number', StringType()),
    StructField('git-hash', StringType()),
    StructField('uuid', StringType(), False),
])

In [3]:
GenomeType = lambda: StructType([
    StructField('instruction', StringType()),
    StructField('random-insertion', BooleanType()),
    StructField('silent', BooleanType()),
    StructField('close', IntegerType()),
    StructField('parent-uuid', StringType()),
])
ErrorType = DoubleType
ErrorsType = lambda: ArrayType(ErrorType())
IndividualType = lambda: StructType([
    StructField('genome', ArrayType(GenomeType())),
    StructField('program', StringType()),
    StructField('grain-size', IntegerType()),
    StructField('errors', ErrorsType()),
    StructField('total-error', ErrorType()),
    StructField('normalized-error', ErrorType()),
    StructField('meta-errors', ErrorsType()),
    StructField('history', ArrayType(ErrorType())),
    StructField('uuid', StringType()),
    StructField('parent-uuids', ArrayType(StringType())),
    StructField('genetic-operators', StringType()),
    StructField('is-random-replacement', BooleanType()),
    StructField('age', DoubleType()),
    StructField('weighted-error', ErrorType()),

])
PercentType = lambda: DecimalType(precision=3, scale=2)
BestType = lambda: StructType([
    StructField('individual', IndividualType()),
    StructField('mean-error', DoubleType()),
    StructField('genome-size', IntegerType()),
    StructField('program-size', IntegerType()),
    StructField('percent-parens', PercentType()),
    StructField('test-errors', ErrorsType()),
    StructField('mean-test-error', DoubleType()),
])
# PopulationType = lambda: StructType([
#     StructField('mean-total-error', DoubleType()),
#     StructField('median-total-error', DoubleType()),
#     StructField('mean-genome-size', DoubleType()),
#     StructField('mean-program-size', DoubleType()),
# ]) 
generation_schema = pyspark.sql.types.StructType([
    StructField('config', ConfigType(), True),
    StructField('config-uuid', StringType()),
    StructField('outcome', StringType(), False),
    StructField('epsilons', ErrorType()),
    StructField('population', ArrayType(IndividualType()), False),
    StructField('index', IntegerType()),
    StructField('best', BestType())
])

## Spark

In [4]:
spark = pyspark.sql.SparkSession.builder \
     .master("local") \
     .config("spark.driver.memory", "8G") \
     .getOrCreate()
#      .config("spark.executor.memory", "4G") \


In [5]:
# spark.sparkContext.setLogLevel("ALL")

In [7]:
# configs_host, configs_port = os.environ['CONFIGS_HOST'].split(":")
# generations_host, generations_port = os.environ['GENERATIONS_HOST'].split(":")

# configs_str = spark \
#     .readStream \
#     .format("socket") \
#     .option("host", configs_host) \
#     .option("port", int(configs_port)) \
#     .load()

# generations_str = spark \
#     .readStream \
#     .format("socket") \
#     .option("host", generations_host) \
#     .option("port", int(generations_port)) \
#     .load()

In [6]:
# configs_str_query_console = configs_str \
#     .writeStream \
#     .format("console") \
#     .start()

In [21]:
# generations_str_query_console = generations_str \
#     .writeStream \
#     .format("console") \
#     .start()

In [8]:
# configs = configs_str\
#     .select(
#         pyspark.sql.functions.from_json(
#             'value',
#             config_schema
#         ).alias('json')
#     ) \
#     .select("json.*")

# generations = generations_str.select(
#     pyspark.sql.functions.from_json(
#         'value',
#         generation_schema,
#         options={
#             "columnNameOfCorruptRecord": "error"
#         }
#     ).alias('json')
# ).select("json.*")

In [7]:
input_folder = pathlib.Path(os.environ['INPUT_FOLDER'])

In [8]:
generationsStream = spark \
    .readStream \
    .schema(generation_schema) \
    .json(str(input_folder), mode="DROPMALFORMED")

In [10]:
output_folder = pathlib.Path(os.environ['OUTPUT_FOLDER'])

In [11]:
output_generations_folder = output_folder / "generations"

output_generations_checkpoint = output_folder / "generations_checkpoint"

In [10]:
# configs_query_console = configs \
#     .writeStream \
#     .trigger(processingTime="10 seconds") \
#     .format("console") \
#     .start()

In [11]:
# generations_query_console = generations \
#     .writeStream \
#     .trigger(processingTime="10 seconds") \
#     .format("console") \
#     .start()

In [13]:
# configs_query = configs \
#     .writeStream \
#     .start(
#         path=str(configs_uri),
#         format="parquet",
# #         partitionBy=["label"],
#         checkpointLocation=configs_checkpoint_uri,
#         queryName='configs'
#     )

generations_query = generationsStream \
    .writeStream \
    .start(
        path=str(output_generations_folder),
        format="parquet",
#         partitionBy=["config-uuid", "outcome"],
        checkpointLocation=str(output_generations_checkpoint),
        queryName='generations'
    )    

## Other Stuff

In [28]:
generations = spark.read.parquet(str(output_generations_folder))

In [31]:
generations \
    .select(
        "config-uuid",
        "config.argmap.label",
        "config.problem-file",
        "index",
        "config.argmap.age-mediated-parent-selection",
        "config.argmap.age-combining-function",
        "config.argmap.genetic-operator-probabilities",
        "config.argmap.age-mediated-parent-selection",
        "best.mean-error",
        "best.mean-test-error"
#         minimum_mean_udf("population.errors").alias("best-fitness")
    ).toPandas()

Unnamed: 0,config-uuid,label,problem-file,index,age-mediated-parent-selection,age-combining-function,genetic-operator-probabilities,age-mediated-parent-selection.1,mean-error,mean-test-error
0,99624b47-7f6b-4e53-b271-9aacb20ad8bb,,,0,,,,,4.835,5.1145
1,99624b47-7f6b-4e53-b271-9aacb20ad8bb,,,2,,,,,3.745,3.1325
2,99624b47-7f6b-4e53-b271-9aacb20ad8bb,,,1,,,,,3.745,3.1325
3,9916a404-f399-46f7-a8cc-e4d97f49508d,,clojush.problems.software.replace-space-with-n...,0,False,average,"{""alternation"":0.2,""uniform-mutation"":0.2,""uni...",False,4.93,5.015
4,9916a404-f399-46f7-a8cc-e4d97f49508d,,clojush.problems.software.replace-space-with-n...,1,False,average,"{""alternation"":0.2,""uniform-mutation"":0.2,""uni...",False,3.98,3.234
5,99624b47-7f6b-4e53-b271-9aacb20ad8bb,,,4,,,,,2.255,1.959
6,99624b47-7f6b-4e53-b271-9aacb20ad8bb,,,3,,,,,2.56,2.257
