# Install Dependencies

In [4]:
!pip --default-timeout=100 install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple


# Read Json

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("COMP5349 A2 Data Loading") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory",'8g') \
    .getOrCreate()

data = "Assignment_2_data/test.json"
init_df = spark.read.json(data)

In [44]:
from pyspark.sql.functions import explode, explode_outer
from pyspark.sql.functions import udf

data_df= init_df.select((explode("data").alias('data')))
paragraph_df = data_df.select("data.title", explode("data.paragraphs").alias("paragraph"))

contract_num = 102
# data_df.printSchema()
# paragraph_df.printSchema()
# paragraph_df.show(5)

In [7]:
paragraph_unrolled_df = paragraph_df.select("title", "paragraph.context" , explode("paragraph.qas").alias("qas")) \
                                    .select("title", "context", "qas.id", "qas.question", "qas.is_impossible", explode_outer("qas.answers").alias("answer")) \
                                    .select("title", "context", "id", "question", "is_impossible", "answer.answer_start", "answer.text")

# paragraph_unrolled_df.show(5)
# paragraph_unrolled_df.printSchema()

In [8]:
import re
from pyspark.sql.types import ArrayType, StringType, LongType, BooleanType, IntegerType
from pyspark.sql.functions import split

@udf(returnType = ArrayType(StringType()))
def segmentContext(str):
    res = [repr(i)+','+str[i:i+4096] for i in range(0, len(str) - 2048, 2048)]
    return res

# res = segmentContext("1111122222\n\n\n\n\n333334444455")
# print(res)

paragraph_preproc_df = paragraph_unrolled_df.withColumn("sequence", explode(segmentContext("context"))).drop("context") \
                                            .withColumn("index", split("sequence",",").getItem(0)) \
                                            .withColumn("source", split("sequence",",").getItem(1)) \
                                            .drop("sequence")
# paragraph_preproc_df.show(5)
# paragraph_preproc_df.printSchema()

In [46]:
from pyspark.sql.functions import col, count
from pyspark.sql.window import Window

@udf(returnType = LongType())
def getEndPos(startPos, text):
    length = len(text)
    return startPos + length

@udf(returnType = StringType())
def getOverlappedPos(start, end, index):
    index = int(index)
    if start > index and start < index + 4096:
        if end < index + 4096 :
            return str(start % 4096) + ','+ str(end % 4096)
        else:
            return str(start % 4096) + ','+ str(4096)
    if end > index and end < index + 4096:
        if start < index:
            return str(index % 4096) + ','+ str(end % 4096)
    else:
        return "0,0"


ps_preproc_df = paragraph_preproc_df.filter("is_impossible==False").drop("is_impossible") \
                                         .withColumn("answer_end", getEndPos("answer_start", "text")) \
                                         .withColumn("overlapped", getOverlappedPos("answer_start", "answer_end", "index")) \
                                         .drop("answer_start", "answer_end") \
                                         .withColumn("answer_start", split("overlapped",",").getItem(0).cast('int')) \
                                         .withColumn("answer_end", split("overlapped",",").getItem(1).cast('int')) \
                                         .drop("overlapped", "index")

positive_sample_df = ps_preproc_df.filter((col("answer_start") != 0) | (col("answer_end") != 0)) \
                                       .select("id", "source", "question", "answer_start", "answer_end") \
                                       .orderBy(col("id").asc())

window = Window.partitionBy(["id"])
ps_question_count = positive_sample_df.withColumn("n", count("id").over(window)) \
                                      .select("id", "question", "n") \
                                      .dropDuplicates(["id"])
ps_question_count_list = ps_question_count.collect()


ps_all_count = positive_sample_df.groupBy("question").count() \
                                 .withColumnRenamed("count", "n")

ps_all_count_list = ps_all_count.collect()

# positive_sample_df.show(5)


+--------------------+--------------------+--------------------+------------+----------+
|                  id|              source|            question|answer_start|answer_end|
+--------------------+--------------------+--------------------+------------+----------+
|ACCELERATEDTECHNO...|Parties also decl...|Highlight the par...|        1498|      1781|
|ACCELERATEDTECHNO...|ed      by writte...|Highlight the par...|        1498|      1781|
|ACCELERATEDTECHNO...|he responsibiliti...|Highlight the par...|        1193|      1568|
|ACCELERATEDTECHNO...|lectible Concepts...|Highlight the par...|        1193|      1568|
|ACCELERATEDTECHNO...|he responsibiliti...|Highlight the par...|         997|      1192|
+--------------------+--------------------+--------------------+------------+----------+
only showing top 5 rows



In [48]:
from pyspark.sql.functions import row_number

@udf(returnType = IntegerType())
def getAvgInOther(id, question):
    answer_num = 0
    res = 0
    # To get the question numbers in the local contract
    for row in ps_question_count_list:
        if id == row["id"]:
            answer_num = int(row["n"])
    
    # Use the total question reduce the local question number to get the number of 
    # that question in other contracts
    for row in ps_all_count_list:
        if question == row["question"]:
             res = int(row["n"]) - answer_num
    return int(res / contract_num - 1)

impossible_negative_preproc_df = paragraph_preproc_df.filter("is_impossible==true").fillna(0).drop("text", "is_impossible", "index") \
                                                     .withColumnRenamed("source", "im_source") \
                                                     .withColumnRenamed("answer_start", "im_answer_start")

# To remove the duplicate sequences with the positive samples and distinct the values
# Use a join and null check to realize the prior purpose
impossible_negative_sample_df = impossible_negative_preproc_df.join(positive_sample_df, ["id","question"], "outer") \
                                                            .filter("source is null or im_source != source") \
                                                            .drop("title","source", "answer_start", "answer_end") \
                                                            .withColumnRenamed("im_answer_start", "answer_start") \
                                                            .withColumnRenamed("im_answer_start", "answer_start") \
                                                            .withColumnRenamed("im_source", "source") \
                                                            .withColumn("answer_end", col("answer_start")) \
                                                            .select("id", "source", "question", "answer_start", "answer_end") \
                                                            .orderBy(col("id"))

# impossible_negative_sample_df.show(10)
# impossible_negative_sample_df.describe(["id"]).show()

window = Window.partitionBy(["id"]).orderBy(col("id"))
impossible_negative_samples = impossible_negative_sample_df.withColumn("row", row_number().over(window)) \
                                 .dropDuplicates(["id", "source"]) \
                                 .filter(col("row") <= getAvgInOther("id", "question")) \
                                 .drop("id", "row")



In [49]:
@udf(returnType = IntegerType())
def getQuestionNum(id):
    for row in ps_question_count_list:
        if row["id"] == id:
            return int(row["n"])
    return 0

possible_negative_preproc_df = ps_preproc_df.filter((col("answer_start") == 0) & (col("answer_end") == 0)).drop("title", "text") \
                                            .withColumnRenamed("source", "p_source") \
                                            .withColumnRenamed("answer_start", "p_answer_start") \
                                            .withColumnRenamed("answer_end", "p_answer_end")

# possible_negative_preproc_df.show(50)
possible_negative_sample_df = possible_negative_preproc_df.join(positive_sample_df, ["id", "question"], "outer") \
                                                          .filter("source is null or p_source != source") \
                                                          .drop("source", "answer_start", "answer_end") \
                                                          .withColumnRenamed("p_source", "source") \
                                                          .withColumnRenamed("p_answer_start", "answer_start") \
                                                          .withColumnRenamed("p_answer_end", "answer_end")
#                                                           .orderBy(col("id"))

# possible_negative_sample_df.show(20)
# possible_negative_sample_df.describe(["id"]).show()

window = Window.partitionBy(["id"]).orderBy(col("id"))
possible_negative_samples = possible_negative_sample_df.withColumn("row", row_number().over(window)) \
                                 .dropDuplicates(["id", "source"]) \
                                 .filter(col("row") <= getQuestionNum("id")) \
                                 .drop("id", "row")

# possible_negative_samples.show(10)


In [53]:
positive_samples = positive_sample_df.drop("id")

output_sample = positive_samples.union(possible_negative_samples).union(impossible_negative_samples)
output_sample.write.mode('Overwrite').json("result")

Py4JJavaError: An error occurred while calling o2671.json.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:736)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:271)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:287)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:178)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:182)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:763)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:548)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:569)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:592)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:689)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1886)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1846)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1819)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:335)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:344)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:898)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:468)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:439)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:516)
	... 22 more
