In [56]:
%%bash
hdfs dfs -D dfs.replication=1 -cp -f data/*.jsonl hdfs://nn:9000/
hdfs dfs -D dfs.replication=1 -cp -f data/*.csv hdfs://nn:9000/

In [57]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
         .master("spark://boss:7077")
         .config("spark.executor.memory", "1G")
         .config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
         .enableHiveSupport()
         .getOrCreate())

In [58]:
problems_df = spark.read.json("hdfs://nn:9000/problems.jsonl")

problems_df.limit(5).show()

+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|cf_contest_id|cf_index|cf_points|cf_rating|        cf_tags|difficulty|generated_tests|is_description_translated|memory_limit_bytes|                name|private_tests|problem_id|public_tests|source|time_limit|
+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|          322|       A|    500.0|     1000|            [0]|         7|             93|                    false|         256000000|322_A. Ciel and D...|           45|         1|           2|     2|         1|
|          760|       D|   1000.0|     1600|         [1, 2]|        10|             51|                    false|         256000000|  760_D. Travel Card|       

In [59]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
         .master("spark://boss:7077")
         .config("spark.executor.memory", "1G")
         .config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
         .enableHiveSupport()
         .getOrCreate())

In [60]:
problems_df.limit(5).show()

+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|cf_contest_id|cf_index|cf_points|cf_rating|        cf_tags|difficulty|generated_tests|is_description_translated|memory_limit_bytes|                name|private_tests|problem_id|public_tests|source|time_limit|
+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|          322|       A|    500.0|     1000|            [0]|         7|             93|                    false|         256000000|322_A. Ciel and D...|           45|         1|           2|     2|         1|
|          760|       D|   1000.0|     1600|         [1, 2]|        10|             51|                    false|         256000000|  760_D. Travel Card|       

In [80]:
#q1
problems_rdd = problems_df.rdd

filtered_rdd = problems_rdd.filter(lambda row: 
    row.cf_rating is not None and row.cf_rating >= 1600 and
    row.private_tests is not None and row.private_tests > 0 and
    row.name is not None and "_A." in row.name
)

filtered_count = filtered_rdd.count()
filtered_count


217

In [81]:
#q2
from pyspark.sql.functions import expr
filtered_df = problems_df.filter(expr("cf_rating >= 1600 AND private_tests > 0 AND name LIKE '%_A.%'"))
filtered_count_df = filtered_df.count()
filtered_count_df  

217

In [82]:
#q3 
problems_df.write.mode("overwrite").saveAsTable("problems_table")
result = spark.sql("""
    SELECT COUNT(*) AS count
    FROM problems_table
    WHERE name LIKE '%_A.%'
      AND cf_rating >= 1600
      AND private_tests > 0
""")
result.first()['count']

                                                                                

217

In [71]:
#q4

solutions_df = spark.read.json("hdfs://nn:9000/solutions.jsonl")
solutions_df.show(5)  

(solutions_df.write
    .mode("overwrite")
    .format("parquet")  
    .bucketBy(4, "language") 
    .sortBy("language")  
    .saveAsTable("solutions_bucketed_parquet"))

spark.sql("SHOW TABLES").show()

query_plan = spark.sql("""
    EXPLAIN FORMATTED
    SELECT language, COUNT(*)
    FROM solutions_bucketed_parquet
    GROUP BY language
""")

query_plan.show(truncate=False)

+----------+--------+----------+--------------------+
|is_correct|language|problem_id|            solution|
+----------+--------+----------+--------------------+
|      true| PYTHON2|         1|n,m = [int(nm) fo...|
|      true| PYTHON2|         1|mn = map(int, raw...|
|      true| PYTHON2|         1|n,m=map(int, raw_...|
|      true|    JAVA|         1|import java.io.Bu...|
|      true|    JAVA|         1|import java.util....|
+----------+--------+----------+--------------------+
only showing top 5 rows



                                                                                

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|      problems_table|      false|
|  default|           solutions|      false|
|  default|solutions_buckete...|      false|
|         |           languages|       true|
|         |       problem_tests|       true|
|         |             sources|       true|
|         |                tags|       true|
+---------+--------------------+-----------+

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [72]:
#q5 
languages_df = spark.read.csv("hdfs://nn:9000/languages.csv", header=True)
languages_df.createOrReplaceTempView("languages")
problem_tests_df = spark.read.csv("hdfs://nn:9000/problem_tests.csv", header=True)
problem_tests_df.createOrReplaceTempView("problem_tests")
sources_df = spark.read.csv("hdfs://nn:9000/sources.csv", header=True)
sources_df.createOrReplaceTempView("sources")
tags_df = spark.read.csv("hdfs://nn:9000/tags.csv", header=True)
tags_df.createOrReplaceTempView("tags")

q5_answer = {
    'problems': False,  
    'solutions': False, 
    'languages': True,  
    'problem_tests': True,  
    'sources': True,  
    'tags': True  
}

q5_answer

{'problems': False,
 'solutions': False,
 'languages': True,
 'problem_tests': True,
 'sources': True,
 'tags': True}

In [73]:
#q6
solutions_df.write.mode("overwrite").saveAsTable("solutions")
codeforces_id_df = spark.sql("SELECT source FROM sources WHERE source_name = 'CODEFORCES'")
codeforces_id = codeforces_id_df.collect()[0][0]

q6_result = spark.sql(f"""
    SELECT COUNT(*) as count
    FROM solutions s
    JOIN problems_table p ON s.problem_id = p.problem_id
    WHERE s.language = 'PYTHON3'
    AND s.is_correct = TRUE
    AND p.source = {codeforces_id}
""")

count = q6_result.collect()[0][0]
count  

                                                                                

10576

In [49]:
#q7
from pyspark.sql.functions import when, col

problems_df = problems_df.withColumn(
    "difficulty_category",
    when(col("difficulty") <= 5, "Easy").when(col("difficulty") <= 10, "Medium").otherwise("Hard"),
)

q7_result = problems_df.groupBy("difficulty_category").count()

q7_dict = {row.difficulty_category: row["count"] for row in q7_result.collect()}

print(q7_dict)

{'Easy': 409, 'Medium': 5768, 'Hard': 2396}


In [53]:
#q8

from pyspark.sql import functions as F
import time


problem_tests = spark.table("problem_tests") 

filtered_problem_tests = problem_tests.filter(F.col("is_generated") == False)

def compute_averages(df):
    start_time = time.time()
    averages = df.select(F.avg("input_chars"), F.avg("output_chars")).collect()[0]
    end_time = time.time()
    return end_time - start_time

times = []

times.append(compute_averages(filtered_problem_tests))

filtered_problem_tests.cache()

times.append(compute_averages(filtered_problem_tests))

times.append(compute_averages(filtered_problem_tests))

filtered_problem_tests.unpersist()

print(times)

[0.3829801082611084, 0.5719237327575684, 0.160003662109375]


In [84]:
#q9
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator

codeforces_problems = problems_df.filter(F.col("source") == 2).cache()

train_df = codeforces_problems.filter(
    (F.col("cf_rating") > 0) & 
    (F.col("problem_id") % 2 == 0) & 
    F.col("difficulty").isNotNull() &
    F.col("time_limit").isNotNull() &
    F.col("memory_limit_bytes").isNotNull()
).cache()

test_df = codeforces_problems.filter(
    (F.col("cf_rating") > 0) & 
    (F.col("problem_id") % 2 != 0) & 
    F.col("difficulty").isNotNull() &
    F.col("time_limit").isNotNull() &
    F.col("memory_limit_bytes").isNotNull()
).cache()

assembler = VectorAssembler(inputCols=["difficulty", "time_limit", "memory_limit_bytes"], outputCol="features")
dt = DecisionTreeRegressor(featuresCol="features", labelCol="cf_rating", maxDepth=5)
pipeline = Pipeline(stages=[assembler, dt])

model = pipeline.fit(train_df)

predictions = model.transform(test_df)

if "prediction" in predictions.columns:
    print("Prediction column exists.")
else:
    print("Prediction column does not exist.")

evaluator = RegressionEvaluator(labelCol="cf_rating", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

r2_float = float(r2)
print(f"R² value: {r2_float}")  
r2_float

25/04/01 16:46:54 WARN CacheManager: Asked to cache already cached data.
25/04/01 16:46:54 WARN CacheManager: Asked to cache already cached data.
25/04/01 16:46:54 WARN CacheManager: Asked to cache already cached data.


Prediction column exists.
R² value: 0.5929835263198762


0.5929835263198762

In [55]:
#q10

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

codeforces_problems = problems_df.filter(F.col("source") == 2)  

train_df = codeforces_problems.filter((F.col("cf_rating") > 0) & (F.col("problem_id") % 2 == 0))
test_df = codeforces_problems.filter((F.col("cf_rating") > 0) & (F.col("problem_id") % 2 != 0))
missing_df = codeforces_problems.filter(F.col("cf_rating") == 0)

assembler = VectorAssembler(inputCols=["difficulty", "time_limit", "memory_limit_bytes"], outputCol="features")

dt = DecisionTreeRegressor(featuresCol="features", labelCol="cf_rating", maxDepth=5)

pipeline = Pipeline(stages=[assembler, dt])

model = pipeline.fit(train_df)

predictions_missing = model.transform(missing_df)

avg_train = train_df.agg(F.avg("cf_rating")).collect()[0][0]
avg_test = test_df.agg(F.avg("cf_rating")).collect()[0][0]
avg_missing_predictions = predictions_missing.agg(F.avg("prediction")).collect()[0][0]

print((avg_train, avg_test, avg_missing_predictions))

(1887.9377431906614, 1893.1106471816283, 1950.4728638818783)
