In [1]:
!hdfs dfs -D dfs.replication=1 -cp -f data/*.jsonl hdfs://nn:9000/
!hdfs dfs -D dfs.replication=1 -cp -f data/*.csv hdfs://nn:9000/

In [2]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
         .master("spark://boss:7077")
         .config("spark.executor.memory", "1G")
         .config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")
         .enableHiveSupport()
         .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/02 16:02:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
problems_df = spark.read.format("json").load("data/problems.jsonl")

                                                                                

In [4]:
!hdfs dfs -ls data

Found 6 items
-rw-rw-r--   1 20278 20278         75 2025-04-01 01:18 data/languages.csv
-rw-rw-r--   1 20278 20278   12731821 2025-04-01 01:22 data/problem_tests.csv
-rw-rw-r--   1 20278 20278    2854960 2025-04-01 01:22 data/problems.jsonl
-rw-rw-r--   1 20278 20278  177825918 2025-04-01 01:22 data/solutions.jsonl
-rw-rw-r--   1 20278 20278        101 2025-04-01 01:18 data/sources.csv
-rw-rw-r--   1 20278 20278        555 2025-04-01 01:22 data/tags.csv


In [5]:
problems_df.limit(5).show()

[Stage 1:>                                                          (0 + 1) / 1]

+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|cf_contest_id|cf_index|cf_points|cf_rating|        cf_tags|difficulty|generated_tests|is_description_translated|memory_limit_bytes|                name|private_tests|problem_id|public_tests|source|time_limit|
+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|          322|       A|    500.0|     1000|            [0]|         7|             93|                    false|         256000000|322_A. Ciel and D...|           45|         1|           2|     2|         1|
|          760|       D|   1000.0|     1600|         [1, 2]|        10|             51|                    false|         256000000|  760_D. Travel Card|       

                                                                                

### Part 1

In [6]:
#q1
problems_df.rdd.filter(lambda row: row.cf_rating >= 1600 and 
                                   row.private_tests and 
                                   "_A." in row.name).count()

                                                                                

217

In [7]:
#q2
from pyspark.sql.functions import col, expr
problems_df.filter((col("cf_rating") >= 1600) &
                               (col("private_tests") != 0) &
                               (col("name").contains("_A."))).count()

217

In [8]:
#q3
problems_df.write.mode("overwrite").saveAsTable("problems")
print(spark.sql("""
    SELECT COUNT(*) 
    FROM problems 
    WHERE cf_rating >= 1600 AND private_tests != 0 AND name LIKE '%_A.%'
""").toPandas().iloc[0, 0])

25/04/02 16:02:24 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/04/02 16:02:24 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/04/02 16:02:28 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/04/02 16:02:28 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.18.0.5
25/04/02 16:02:28 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
25/04/02 16:02:33 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
25/04/02 16:02:33 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
25/04/02 16:02:33 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/04/02 16:02:33 WARN 

217


                                                                                

### Part 2

In [9]:
#q4
solutions_df = spark.read.format("json").load("data/solutions.jsonl")
(solutions_df.write
    .bucketBy(4, "language")
    .mode("overwrite")
    .saveAsTable("solutions")
)

spark.sql("""
            SELECT language, COUNT(*)
            FROM solutions
            GROUP BY language
""").explain("formatted")

                                                                                

== Physical Plan ==
AdaptiveSparkPlan (4)
+- HashAggregate (3)
   +- HashAggregate (2)
      +- Scan parquet spark_catalog.default.solutions (1)


(1) Scan parquet spark_catalog.default.solutions
Output [1]: [language#230]
Batched: true
Bucketed: true
Location: InMemoryFileIndex [hdfs://nn:9000/user/hive/warehouse/solutions]
ReadSchema: struct<language:string>
SelectedBucketsCount: 4 out of 4

(2) HashAggregate
Input [1]: [language#230]
Keys [1]: [language#230]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#237L]
Results [2]: [language#230, count#238L]

(3) HashAggregate
Input [2]: [language#230, count#238L]
Keys [1]: [language#230]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#228L]
Results [2]: [language#230, count(1)#228L AS count(1)#233L]

(4) AdaptiveSparkPlan
Output [2]: [language#230, count(1)#233L]
Arguments: isFinalPlan=false




In [10]:
#q5
languages_df = (spark.read.format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .load(("data/languages.csv"))) 
languages_df.createOrReplaceTempView("languages")

problem_tests_df = (spark.read.format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .load(("data/problem_tests.csv"))) 
problem_tests_df.createOrReplaceTempView("problem_tests")

sources_df = (spark.read.format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .load(("data/sources.csv"))) 
sources_df.createOrReplaceTempView("sources")

tags_df = (spark.read.format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .load(("data/tags.csv"))) 
tags_df.createOrReplaceTempView("tags")

tables = spark.sql("SHOW TABLES")
p = tables.toPandas()
dict(zip(p['tableName'], p['isTemporary']))

                                                                                

{'problems': False,
 'solutions': False,
 'languages': True,
 'problem_tests': True,
 'sources': True,
 'tags': True}

In [11]:
spark.sql(""" SELECT * FROM problems""").show()

+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|cf_contest_id|cf_index|cf_points|cf_rating|        cf_tags|difficulty|generated_tests|is_description_translated|memory_limit_bytes|                name|private_tests|problem_id|public_tests|source|time_limit|
+-------------+--------+---------+---------+---------------+----------+---------------+-------------------------+------------------+--------------------+-------------+----------+------------+------+----------+
|          322|       A|    500.0|     1000|            [0]|         7|             93|                    false|         256000000|322_A. Ciel and D...|           45|         1|           2|     2|         1|
|          760|       D|   1000.0|     1600|         [1, 2]|        10|             51|                    false|         256000000|  760_D. Travel Card|       

In [12]:
spark.sql(""" SELECT * FROM solutions""").show()

+----------+--------+----------+--------------------+
|is_correct|language|problem_id|            solution|
+----------+--------+----------+--------------------+
|     false| PYTHON3|      1682|n = int(input())\...|
|     false| PYTHON3|      1682|# coding=utf-8\n\...|
|     false|    JAVA|      1682|var n = readline(...|
|     false|    JAVA|      1682|import java.util....|
|     false|    JAVA|      1682|import java.util....|
|      true| PYTHON3|      1683|from math import ...|
|      true| PYTHON3|      1683|n=int(input())\na...|
|      true| PYTHON3|      1683|from collections ...|
|      true|    JAVA|      1683|import java.util....|
|     false|    JAVA|      1683|import java.io.Ou...|
|     false| PYTHON3|      1683|import sys\ninput...|
|     false| PYTHON3|      1683|import math\nn=in...|
|     false| PYTHON3|      1683|from sys import s...|
|      true|    JAVA|      1686|import java.util....|
|      true|    JAVA|      1686|import java.util....|
|      true|    JAVA|      1

In [13]:
spark.sql(""" SELECT * FROM sources""").show()

+------+--------------+
|source|   source_name|
+------+--------------+
|     0|UNKNOWN_SOURCE|
|     1|      CODECHEF|
|     2|    CODEFORCES|
|     3|   HACKEREARTH|
|     4|       CODEJAM|
|     5|       ATCODER|
|     6|          AIZU|
+------+--------------+



In [14]:
#q6
spark.sql("""
    SELECT *
    FROM solutions 
    INNER JOIN problems ON solutions.problem_id = problems.problem_id
    INNER JOIN sources ON problems.source = sources.source
    WHERE solutions.language = 'PYTHON3' 
    AND solutions.is_correct = 1
    AND sources.source_name = 'CODEFORCES'
""").count()

10576

In [15]:
#q7
query = spark.sql("""
    SELECT
    CASE 
         WHEN difficulty <= 5 THEN 'Easy'
         WHEN difficulty <= 10 THEN 'Medium'
         ELSE 'Hard'
        END AS category,
    COUNT(*) as count
    FROM problems
    GROUP BY 
        CASE 
            WHEN difficulty <= 5 THEN 'Easy'
            WHEN difficulty <= 10 THEN 'Medium'
            ELSE 'Hard'
        END
""").toPandas()

dict(zip(query['category'], query['count']))

{'Easy': 409, 'Medium': 5768, 'Hard': 2396}

In [16]:
#q8

import time

result_df = problem_tests_df.filter("is_generated = False")

#1. compute the averages
start = time.time()
result_df.selectExpr("avg(input_chars) as average_input", "avg(output_chars) as average_output").collect()
t1 = time.time() - start

#2. make a call to cache the data
result_df.cache()

#3. compute the averages, after caching
start = time.time()
result_df.selectExpr("avg(input_chars) as average_input", "avg(output_chars) as average_output").collect()
t2 = time.time() - start

#4. uncache the data

#we used this reference to uncache the data - https://sparkbyexamples.com/spark/spark-drop-dataframe-from-cache/
result_df = result_df.unpersist()

#5. compute the average, after uncaching
start = time.time()
result_df.selectExpr("avg(input_chars) as average_input", "avg(output_chars) as average_output").collect()
t3 = time.time() - start

[t1, t2, t3]

                                                                                

[0.908815860748291, 1.3678522109985352, 0.5365347862243652]

### Part 4

In [17]:
problems_df.toPandas()

Unnamed: 0,cf_contest_id,cf_index,cf_points,cf_rating,cf_tags,difficulty,generated_tests,is_description_translated,memory_limit_bytes,name,private_tests,problem_id,public_tests,source,time_limit
0,322,A,500.0,1000,[0],7,93,False,256000000,322_A. Ciel and Dancing,45,1,2,2,1
1,760,D,1000.0,1600,"[1, 2]",10,51,False,256000000,760_D. Travel Card,4,2,2,2,2
2,569,E,1500.0,2600,"[3, 0]",11,99,False,256000000,569_E. New Language,17,3,3,2,2
3,447,B,1000.0,1000,"[0, 4]",8,100,False,256000000,447_B. DZY Loves Strings,13,4,1,2,1
4,1292,B,750.0,1700,"[5, 6, 7, 0, 4]",8,91,False,256000000,1292_B. Aroma's Search,131,5,3,2,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8568,471,E,2500.0,2700,"[11, 24]",11,59,False,512000000,471_E. MUH and Lots and Lots of Segments,8,13320,2,2,2
8569,285,D,2000.0,1900,"[18, 16, 2, 4, 28]",10,3,False,256000000,285_D. Permutation Sum,14,13322,2,2,3
8570,1217,F,0.0,2600,"[11, 19, 24, 14, 15]",12,33,False,256000000,1217_F. Forced Online Queries Problem,6,13324,2,2,5
8571,1427,E,1500.0,2500,"[18, 6, 17, 29, 8]",11,55,False,256000000,1427_E. Xum,52,13325,2,2,2


In [18]:
temp_df = spark.sql("""
    SELECT *
    FROM problems
    INNER JOIN sources ON problems.source = sources.source
    WHERE sources.source_name = 'CODEFORCES'
""")

temp_df

DataFrame[cf_contest_id: bigint, cf_index: string, cf_points: double, cf_rating: bigint, cf_tags: array<bigint>, difficulty: bigint, generated_tests: bigint, is_description_translated: boolean, memory_limit_bytes: bigint, name: string, private_tests: bigint, problem_id: bigint, public_tests: bigint, source: bigint, time_limit: bigint, source: int, source_name: string]

In [19]:
train = temp_df.filter((temp_df["cf_rating"] > 0) & 
                              (temp_df["problem_id"] % 2 == 0))

test = temp_df.filter((temp_df["cf_rating"] > 0) & 
                             (temp_df["problem_id"] % 2 != 0))

missing = temp_df.filter((temp_df["cf_rating"] == 0))

In [20]:
#q9
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator


va = VectorAssembler(inputCols=["difficulty", "time_limit", "memory_limit_bytes"], outputCol="features")
dt = DecisionTreeRegressor(featuresCol="features", labelCol="cf_rating", maxDepth=5)
pipe = Pipeline(stages=[va, dt])
model = pipe.fit(train)

r2_score = RegressionEvaluator(labelCol="cf_rating", predictionCol="prediction", metricName="r2")
r2_score.evaluate(model.transform(test))

0.5929835263198762

In [21]:
#q10
from pyspark.sql.functions import avg

train_avg = train.select(avg("cf_rating")).collect()[0][0]
test_avg = test.select(avg("cf_rating")).collect()[0][0]

pred = model.transform(missing)
missing_avg = pred.select(avg("prediction")).collect()[0][0]

(train_avg, test_avg, missing_avg)

(1887.9377431906614, 1893.1106471816283, 1950.4728638818783)