In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction as UDF
from pyspark.sql.functions import size, col
from pyspark.sql.types import ArrayType, StringType, IntegerType
import time
    
def do_one_round(spark_executor_cores = 4, spark_cores_max = 16, *,
                 master_url = "spark://192.168.2.61:7077",
                 data_path = 'hdfs://192.168.2.184:9000/RC_2009-09'):
    """Run one timed feature-engineering + logistic-regression round on Spark.

    The round reads the Reddit comment dump at ``data_path``, derives a few
    numeric features, undersamples the non-deleted class, trains a logistic
    regression that predicts the ``deleted`` flag, and reports timings.

    Args:
        spark_executor_cores: value for ``spark.executor.cores`` - the number
            of cores to use on each executor (worker).
        spark_cores_max: value for ``spark.cores.max`` - the maximum amount of
            CPU cores to request for the application from across the cluster
            (not from each machine).
        master_url: Spark master to connect to.
        data_path: location of the JSON comment dump to read.

    Returns:
        ``[spark_cores_max, spark_executor_cores, total_reg, total_exec]``
        where ``total_reg`` is the wall-clock time in seconds from just before
        the model is fitted until the test accuracy is computed (this includes
        drawing the ROC plot), and ``total_exec`` is the time in seconds from
        just after the session is created until it has been stopped.
    """
    # Import everything before any timer starts, so the one-off import cost
    # does not land inside the measurements of the first round only.
    import string
    import matplotlib.pyplot as plt
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import VectorAssembler

    print(f"RUNNING with spark_executor_cores = {spark_executor_cores}; spark_cores_max = {spark_cores_max}")

    ss = (
        SparkSession.builder
        .master(master_url)
        .appName("irina_strong_scaling")
        .config("spark.dynamicAllocation.enabled", True)
        .config("spark.executor.cores", spark_executor_cores)
        .config("spark.cores.max", spark_cores_max)
        .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
        .config("spark.shuffle.service.enabled", False)
        .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
        .config("spark.driver.port", 9998)
        .config("spark.blockManager.port", 10005)
        .getOrCreate()
    )

    # Always stop the session, even when a step fails, so a failed round does
    # not leave a live session behind for the next call to pick up.
    try:
        # The total-time clock starts after the session exists, i.e. session
        # start-up is deliberately not part of total_exec.
        start_time = time.time()

        # Read file
        df = ss.read.json(data_path)
        df.printSchema()

        # Drop unused columns
        df = df.drop(
            'archived', 'author', 'author_flair_css_class', 'author_flair_text',
            'created_utc', 'edited', 'gilded', 'id', 'link_id', 'name',
            'parent_id', 'removal_reason', 'retrieved_on', 'score_hidden',
            'subreddit_id', 'distinguished', 'controversiality',
        )
        df.printSchema()

        # Tokenize comment body: strip punctuation, lower-case, split on whitespace.
        strip_punctuation = str.maketrans('', '', string.punctuation)

        def tokenize(text):
            # A null body would otherwise raise inside the UDF and fail the job.
            if text is None:
                return []
            return text.translate(strip_punctuation).lower().split()

        udf_tokenize = UDF(tokenize, ArrayType(StringType()))
        df = df.withColumn('body', udf_tokenize(df.body))

        # Comment length feature (number of tokens)
        df = df.withColumn('comment_length', size(df.body))

        # Deleted comment feature: the body consists of the single token
        # 'removed' or 'deleted' once punctuation has been stripped.
        def check_deleted(tokens):
            return 1 if tokens in (['removed'], ['deleted']) else 0

        udf_deleted = UDF(check_deleted, IntegerType())
        df = df.withColumn('deleted', udf_deleted(df.body))

        df.select('deleted').show()

        # Controversial words feature.
        # The words are whitespace-separated on purpose: joining
        # comma-terminated fragments without a space fuses the words at every
        # fragment boundary once the punctuation is removed.
        controversial = frozenset((
            "abuse administration afghanistan aid america "
            "american army attack attacks authorities authority ban banks benefits bill bills "
            "border budget campaign candidate candidates catholic china chinese church "
            "concerns congress conservative control country court crime criminal crisis cuts "
            "debate debt defense deficit democrats disease dollar drug drugs economy education "
            "egypt election elections enforcement fighting finance fiscal force funding "
            "gas government gun health immigration inaccuracies india insurance investigation "
            "investigators iran israel job jobs judge justice killing korea labor land "
            "law lawmakers laws lawsuit leadership legislation marriage media mexico military "
            "money murder nation nations news obama offensive officials oil parties "
            "peace police policies policy politics poll power president prices primary prison "
            "progress race reform republican republicans restrictions rule rules ruling russia "
            "russian school security senate sex shooting society spending strategy strike support "
            "syria syrian tax taxes threat trial unemployment union usa victim victims "
            "violence vote voters war washington weapons world"
        ).split())
        semi_controversial = frozenset((
            "account advantage amount attorney chairman "
            "charge charges cities class comment companies cost credit delays effect expectations "
            "families family february germany goal housing information investment "
            "markets numbers oklahoma parents patients population price projects raise rate "
            "reason sales schools sector shot source sources status stock store worth"
        ).split())

        def controversial_words(tokens):
            # 3 points per controversial word, 1 point per semi-controversial word.
            score = 0
            for word in tokens or ():
                if word in controversial:
                    score += 3
                elif word in semi_controversial:
                    score += 1
            return score

        udf_controversial = UDF(controversial_words, IntegerType())
        df = df.withColumn('cont_word_score', udf_controversial('body'))
        df.select('subreddit').show()

        df.printSchema()

        controversial_subreddit_names = frozenset([
            "Conservative", "syriancivilwar", "conspiracy", "SeattleWA",
            "ukpolitics", "canada", "DC_cinematics", "worldnews", "bestof",
            "news", "europe", "Austin", "Documentaries", "PublicFreakout",
            "Portland", "JusticeServed", "JoeRogan", "toronto", "Games",
            "vancouver",
        ])

        # Check if the comment was posted on a potentially controversial
        # subreddit (exact, case-sensitive name match).
        def is_controversial_subreddit(name):
            return 1 if name in controversial_subreddit_names else 0

        udf_subreddits = UDF(is_controversial_subreddit, IntegerType())
        # Use the subreddit UDF here, not the word-score UDF.
        df = df.withColumn('cont_subreddits', udf_subreddits(df.subreddit))

        # Drop the columns the features were derived from, and cast the vote
        # counters from long to int.
        df = df.drop("subreddit", "body")
        for name in ('downs', 'ups', 'score'):
            df = df.withColumn(name, df[name].cast("int"))

        df.printSchema()

        # Ratio positive class / negative class
        positive = df.filter(col("deleted") == 1)
        negative = df.filter(col("deleted") == 0)
        ratio = positive.count() / negative.count()
        print("ratio: {}".format(ratio))

        # Undersample the negative class down to roughly the positive count.
        # The fraction is capped at 1.0 because sampling without replacement
        # cannot take more than the whole set.
        sampled_df = negative.sample(False, min(1.0, ratio))
        undersampled_df = sampled_df.union(positive)

        # Check that undersampling worked
        n_deleted = undersampled_df.select("deleted").where(undersampled_df["deleted"] == 1).count()
        n_not_deleted = undersampled_df.select("deleted").where(undersampled_df["deleted"] == 0).count()
        print(f"undersampled class counts: deleted = {n_deleted}; not deleted = {n_not_deleted}")
        undersampled_df.show()

        # score, ups and downs are correlated but they may work differently
        # with different versions of reddit comments.
        # Put the features into a single vector column.
        numeric_cols = ['ups', 'downs', 'score', 'cont_word_score', 'cont_subreddits']
        assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
        pipeline = Pipeline(stages = [assembler])
        pipeline_model = pipeline.fit(undersampled_df)
        model_df = pipeline_model.transform(undersampled_df)

        # Check that it worked
        model_df.select("features").show()

        # Train-test split
        train, test = model_df.randomSplit([0.8, 0.2], seed = 3)

        # Train logistic regression
        reg_time = time.time()
        lr = LogisticRegression(featuresCol = 'features', labelCol = 'deleted', maxIter=10)
        lr_model = lr.fit(train)

        # Check whether the model is better than a random classifier.
        training_summary = lr_model.summary
        roc = training_summary.roc.toPandas()
        # FPR is plotted on the x axis and TPR on the y axis; label accordingly.
        plt.plot(roc['FPR'], roc['TPR'])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('ROC Curve')
        plt.show()
        print('Training set areaUnderROC: ' + str(training_summary.areaUnderROC))

        # Predictions on the test dataset
        predictions = lr_model.transform(test)

        # Accuracy on the test dataset
        accuracy = predictions.filter(predictions.deleted == predictions.prediction).count() / float(predictions.count())
        total_reg = time.time() - reg_time

        print(f"Test set accuracy: {accuracy}")
        # Coefficients are in the order of numeric_cols.
        print(f"Coefficients: {lr_model.coefficients}")
    finally:
        ss.stop()

    total_exec = time.time() - start_time
    print(f"SETUP:\nspark_executor_cores = {spark_executor_cores}\nspark_cores_max = {spark_cores_max}")
    print(f"LOG_REG execution time: {total_reg}")
    print(f"TOTAL execution time: {total_exec}")

    return [spark_cores_max, spark_executor_cores, total_reg, total_exec]

In [None]:
# Benchmark sweep: call do_one_round for every total core count, several times each.
# do_one_round(spark_executor_cores = 4, spark_cores_max = 16)
# spark_executor_cores - max cores on each worker
# spark_cores_max - max cores on the whole cluster

num_cores_config = [1, 2, 4, 8, 12, 16]
n_iters = 3

results = []
for c in num_cores_config:
    # Per-executor cores are capped at 4; smaller totals use c cores per executor.
    spark_executor_cores = min(c, 4)
    for _ in range(n_iters):
        print(f"START COMPUTE WITH CORES NUM {c}")
        t = do_one_round(spark_cores_max = c, spark_executor_cores = spark_executor_cores)
        print(f"RESULT: {t}")
        # Append the row itself (not a one-element list wrapping it) so every
        # entry of results has the shape announced by the FORMAT line below.
        results.append(t)
print("FORMAT: [spark_cores_max, spark_executor_cores, total_reg, total_exec]")
print(results)