In [1]:
import numpy as np
from time import time
import code.profiles.generation as gen
from code.profiles.definitions import Profile
from code.profiles.vcrDetection import detectVCRProperty, detectCRProperty, detectVRProperty, createGPEnv
from code.utils import getNumpyColumns

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, FloatType

# conf=SparkConf()
# conf.set("spark.executor.memory", "6g")
# conf.set("spark.driver.memory", "6g")
# conf.set("spark.cores.max", "16")

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
spark

In [3]:
def run(C:int=3, V:int=3, loadPath:str="", rangeS:int=0, rangeE:int=0):

    statistics = {}
    candidatesIds = ["A", "B", "C", "D", "E", "F", "G"][:C]
    votersIds = ["0", "1", "2", "3", "4", "5", "6"][:V]

    gpEnv = createGPEnv()
    if loadPath == "":
        allProfiles = gen.parallelProfileGeneration(cpu=16, candidatesNumber=C, voterNumber=V)
        if rangeS == 0 and rangeE == 0:
            rangeE = allProfiles.shape[0] + 1
        profilesRDD = sc.parallelize(allProfiles[rangeS:rangeE])
    else:
        allProfiles = np.load(loadPath)
        if rangeS == 0 and rangeE == 0:
            rangeE = allProfiles.shape[0] + 1
        profilesRDD = sc.parallelize(allProfiles[rangeS:rangeE])

    print("After Load", rangeS, rangeE)

    vcrProfilesRDD = profilesRDD \
        .map(lambda p: np.array(p).reshape(C,V)) \
        .map(lambda p: (p,detectVCRProperty(gpEnv=gpEnv, A=p, C=candidatesIds, V=votersIds))) \
        # .filter(lambda pRes: pRes[1][0] == 2) \

    return {}, vcrProfilesRDD.take(1)

    # vcrNCOPProfilesRDD = vcrProfilesRDD \
    #     .filter(lambda pRes: not detectCRProperty(gpEnv=gpEnv, A=pRes[0], C=candidatesIds, V=votersIds)) \
    #     .filter(lambda pRes: not detectVRProperty(gpEnv=gpEnv, A=pRes[0], C=candidatesIds, V=votersIds)) \
    #     .map(lambda pRes: Profile.fromILPRes(pRes[0], pRes[1][1], candidatesIds, votersIds))
    #
    # NPRow = Row("rangeS", "rangeE", *tuple(getNumpyColumns(C,V)))
    # schema = StructType([StructField("rangeS", FloatType(), False),
    #                      StructField("rangeE", FloatType(), False)] +
    #                     [StructField(n, FloatType(), False) for n in getNumpyColumns(2,2)])
    #
    # vcrNCOPNumpyRows = vcrNCOPProfilesRDD \
    #     .map(lambda profile: profile.asNumpy().tolist()) \
    #     .map(lambda a: NPRow(rangeS, rangeE, *tuple(a))) \
    #
    # statistics["VCR"] = vcrProfilesRDD.count()
    # print(statistics["VCR"])
    #
    # print("After VCR Count")
    #
    # statistics["NCOPVCR"] = vcrNCOPProfilesRDD.count()
    # print(statistics["NCOPVCR"])
    #
    # spark.createDataFrame(statistics.items(), ["key", "value"]) \
    #     .repartition(1) \
    #     .write.format("com.databricks.spark.csv") \
    #     .option("header", "true") \
    #     .save("resources/{}C{}V-stats.csv".format(C,V), mode="append") \
    #
    # print("After Stats save")
    #
    # spark.createDataFrame(vcrNCOPNumpyRows, schema) \
    #     .write.format("com.databricks.spark.csv") \
    #     .option("header", "true") \
    #     .save("resources/{}C{}V".format(C,V), mode="append") \
    #
    # print("After save")
    #
    #
    # return statistics, vcrNCOPNumpyRows

In [None]:
base = "resources/profiles/profiles-5C-5V/all/P55-{}.npy"
for i in range(0,32):
    if (i != 7):
        print("SET : ", i)
        startTime = time()
        stats, vcrNCOPProfiles = run(C=5, V=5, loadPath=base.format(i), limit=1048580) # 2^20 + 4
        endTime = time()
        print("TIME : ", endTime - startTime)
        print(stats)


In [4]:
stats, vcrNCOPProfiles = run(C=2, V=2)

After Load 0 17


Traceback (most recent call last):
  File "/home/jasiek/.local/share/virtualenvs/master-thesis-code-oOGc7iSH/lib/python3.8/site-packages/pyspark/serializers.py", line 468, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/home/jasiek/.local/share/virtualenvs/master-thesis-code-oOGc7iSH/lib/python3.8/site-packages/pyspark/cloudpickle.py", line 1097, in dumps
    cp.dump(obj)
  File "/home/jasiek/.local/share/virtualenvs/master-thesis-code-oOGc7iSH/lib/python3.8/site-packages/pyspark/cloudpickle.py", line 357, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.8/pickle.py", line 485, in dump
    self.save(obj)
  File "/usr/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/python3.8/pickle.py", line 899, in save_tuple
    save(element)
  File "/usr/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/jasiek/.local/share/v

PicklingError: Could not serialize object: TypeError: cannot pickle 'PyCapsule' object

In [None]:
def loadProfiles(C:int=3, V:int=3):
    vcrNCOPProfilesDF = spark.read.format("csv")\
        .option("inferSchema", "true") \
        .option("header", "true") \
        .load("resources/{}C{}V".format(C,V))

    return vcrNCOPProfilesDF.rdd \
        .map(lambda r: np.array(r, dtype=np.float)) \
        .map(lambda npProf: Profile.fromNumpy(npProf)) \

def loadStatistics(C:int=3, V:int=3):
    return spark.read.format("csv")\
        .option("inferSchema", "true") \
        .option("header", "true") \
        .load("resources/{}C{}V-stats.csv".format(C,V))



In [None]:
loadStatistics(5,5).groupBy('key').sum().collect()

In [None]:
base = "resources/profiles/profiles-5C-5V/all/P55-{}.npy"
np.load(base.format(7), 'r').shape


In [None]:
StructType([StructField("rangeS", FloatType(), False), StructField("rangeE", FloatType(), False)] +
           [StructField(n, FloatType(), False) for n in getNumpyColumns(2,2)])















